Spark, optimally splitting a single RDD into two
16-10-2019
Question
I have a large dataset that I need to split into groups according to specific parameters, and I want the job to process as efficiently as possible. I can envision two ways of doing so:
Option 1 - Create map from original RDD and filter
def customMapper(record):
    if passesSomeTest(record):
        return (1, record)
    else:
        return (0, record)

mappedRdd = rddIn.map(lambda x: customMapper(x))
rdd0 = mappedRdd.filter(lambda x: x[0] == 0).cache()
rdd1 = mappedRdd.filter(lambda x: x[0] == 1).cache()
Option 2 - Filter original RDD directly
def customFilter(record):
    return passesSomeTest(record)

rdd0 = rddIn.filter(lambda x: customFilter(x) == False).cache()
rdd1 = rddIn.filter(customFilter).cache()
The first method has to iterate over all the records of the original dataset three times, where the second only has to do so twice. Under normal circumstances, however, Spark does some behind-the-scenes graph building, so I could imagine that they are effectively done in the same way. My questions are: a.) Is one method more efficient than the other, or does Spark's graph building make them equivalent? b.) Is it possible to do this split in a single pass?
Solution
First of all let me tell you that I'm not a Spark expert; I've been using it quite a lot in the last few months, and I believe I now understand it, but I may be wrong.
So, answering your questions:
a.) They are equivalent, but not in the way you're seeing it; Spark will not optimize the graph if you are wondering, and the customMapper will still be executed twice in both cases. This is due to the fact that, for Spark, rdd0 and rdd1 are two completely different RDDs, and it will build the transformation graph bottom-up starting from the leaves; so Option 1 will translate to:
rdd0 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==0).cache()
rdd1 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==1).cache()
As you said, customMapper is executed twice (moreover, rddIn will also be read twice, which means that if it comes from a database, it may be even slower).
b.) There is a way: you just have to move cache() to the right place:
mappedRdd = rddIn.map(lambda x: customMapper(x)).cache()
rdd0 = mappedRdd.filter(lambda x: x[0]==0)
rdd1 = mappedRdd.filter(lambda x: x[0]==1)
By doing this, we are telling Spark that it can store the partial results of mappedRdd; it will then use these partial results for both rdd0 and rdd1. From Spark's point of view this is equivalent to:
mappedRdd = rddIn.map(lambda x: customMapper(x)).saveAsObjectFile('..')
# forget about everything
rdd0 = sc.objectFile('..').filter(lambda x: x[0]==0)
rdd1 = sc.objectFile('..').filter(lambda x: x[0]==1)