我有一个大数据集,需要根据特定参数将其分为组。我希望这项工作能够尽可能有效地处理。我可以设想两种方法

选项1 - 从原始RDD和过滤器创建地图

def customMapper(record):
    if passesSomeTest(record):
        return (1,record)
    else:
        return (0,record)

mappedRdd = rddIn.map(lambda x: customMapper(x))
rdd0 = mappedRdd.filter(lambda x: x[0]==0).cache()
rdd1 = mappedRdd.filter(lambda x: x[1]==1).cache()

选项2 - 直接过滤原始RDD

def customFilter(record):
    return passesSomeTest(record)

rdd0 = rddIn.filter(lambda x: customFilter(x)==False).cache()
rdd1 = rddIn.filter(customFilter).cache()

拳头方法必须在原始数据集的所有记录上进行次要,其中第二个只能在正常情况下两次这样做,但是,Spark在幕后进行了一些图形构建,因此我可以想象它们是有效地以相同的方式完成。我的问题是:

有帮助吗?

解决方案

首先,让我告诉你我不是火花专家。在过去的几个月中,我一直在使用它很多,我相信我现在理解了它,但是我可能错了。

因此,回答您的问题:

a。)它们是等效的,但不是您看到的方式;如果您想知道,Spark不会优化图形,但是 customMapper 在两种情况下仍将执行两次;这是由于火花的事实, rdd1rdd2 是两个完全不同的RDD,它将构建从叶子开始的转换图。因此,选项1将转化为:

rdd0 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==0).cache()
rdd1 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==1).cache()

如你所说, customMapper 被执行两次(此外, rddIn 将被读取两次,这意味着,如果它来自数据库,则可能更慢)。

b。)有一种方法,您只需要移动 cache() 在正确的位置:

mappedRdd = rddIn.map(lambda x: customMapper(x)).cache()
rdd0 = mappedRdd.filter(lambda x: x[0]==0)
rdd1 = mappedRdd.filter(lambda x: x[0]==1)

通过这样做,我们告诉Spark,它可以存储 mappedRdd;然后,它将使用这些部分结果 rdd1rdd2. 。从火花的角度来看,这等同于:

mappedRdd = rddIn.map(lambda x: customMapper(x)).saveAsObjectFile('..')
# forget about everything
rdd0 = sc.objectFile('..').filter(lambda x: x[0]==0)
rdd1 = sc.objectFile('..').filter(lambda x: x[0]==1)
许可以下: CC-BY-SA归因
scroll top