火花,最佳地将单个RDD分为两个
-
16-10-2019 - |
题
我有一个大数据集,需要根据特定参数将其分为组。我希望这项工作能够尽可能有效地处理。我可以设想两种方法
选项1 - 从原始RDD和过滤器创建地图
def customMapper(record):
if passesSomeTest(record):
return (1,record)
else:
return (0,record)
mappedRdd = rddIn.map(lambda x: customMapper(x))
rdd0 = mappedRdd.filter(lambda x: x[0]==0).cache()
rdd1 = mappedRdd.filter(lambda x: x[1]==1).cache()
选项2 - 直接过滤原始RDD
def customFilter(record):
return passesSomeTest(record)
rdd0 = rddIn.filter(lambda x: customFilter(x)==False).cache()
rdd1 = rddIn.filter(customFilter).cache()
拳头方法必须在原始数据集的所有记录上进行次要,其中第二个只能在正常情况下两次这样做,但是,Spark在幕后进行了一些图形构建,因此我可以想象它们是有效地以相同的方式完成。我的问题是:
解决方案
首先,让我告诉你我不是火花专家。在过去的几个月中,我一直在使用它很多,我相信我现在理解了它,但是我可能错了。
因此,回答您的问题:
a。)它们是等效的,但不是您看到的方式;如果您想知道,Spark不会优化图形,但是 customMapper
在两种情况下仍将执行两次;这是由于火花的事实, rdd1
和 rdd2
是两个完全不同的RDD,它将构建从叶子开始的转换图。因此,选项1将转化为:
rdd0 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==0).cache()
rdd1 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==1).cache()
如你所说, customMapper
被执行两次(此外, rddIn
将被读取两次,这意味着,如果它来自数据库,则可能更慢)。
b。)有一种方法,您只需要移动 cache()
在正确的位置:
mappedRdd = rddIn.map(lambda x: customMapper(x)).cache()
rdd0 = mappedRdd.filter(lambda x: x[0]==0)
rdd1 = mappedRdd.filter(lambda x: x[0]==1)
通过这样做,我们告诉Spark,它可以存储 mappedRdd
;然后,它将使用这些部分结果 rdd1
和 rdd2
. 。从火花的角度来看,这等同于:
mappedRdd = rddIn.map(lambda x: customMapper(x)).saveAsObjectFile('..')
# forget about everything
rdd0 = sc.objectFile('..').filter(lambda x: x[0]==0)
rdd1 = sc.objectFile('..').filter(lambda x: x[0]==1)