Question

I have a large dataset that I need to split into groups according to specific parameters. I want the job to process as efficiently as possible. I can envision two ways of doing so

Option 1 - Create map from original RDD and filter

def customMapper(record):
    if passesSomeTest(record):
        return (1,record)
    else:
        return (0,record)

mappedRdd = rddIn.map(lambda x: customMapper(x))
rdd0 = mappedRdd.filter(lambda x: x[0]==0).cache()
rdd1 = mappedRdd.filter(lambda x: x[1]==1).cache()

Option 2 - Filter original RDD directly

def customFilter(record):
    return passesSomeTest(record)

rdd0 = rddIn.filter(lambda x: customFilter(x)==False).cache()
rdd1 = rddIn.filter(customFilter).cache()

The fist method has to itterate over all the records of the original data set 3 times, where the second only has to do so twice, under normal circumstances, however, spark does some behind the scenes graph building, so I could imagine that they are effectively done in the same way. My questions are: a.) Is one method more efficient than the other, or does the spark graph building make them equivalent b.) Is it possible to do this split in a single pass

Was it helpful?

Solution

First of all let me tell you that I'm not a Spark expert; I've been using it quite a lot in the last few months, and I believe I now understand it, but I may be wrong.

So, answering your questions:

a.) they are equivalent, but not in the way you're seeing it; Spark will not optimize the graph if you are wondering, but the customMapper will still be executed twice in both cases; this is due to the fact that for spark, rdd1 and rdd2 are two completely different RDDs, and it will build the transformation graph bottom-up starting from the leafs; so Option 1 will translate to:

rdd0 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==0).cache()
rdd1 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==1).cache()

As you said, customMapper is executed twice (moreover, also rddIn will be read twice, which means that if it comes from a database, it may be even slower).

b.) there is a way, you just have to move cache() in the right place:

mappedRdd = rddIn.map(lambda x: customMapper(x)).cache()
rdd0 = mappedRdd.filter(lambda x: x[0]==0)
rdd1 = mappedRdd.filter(lambda x: x[0]==1)

By doing this, we are telling spark that it can store the partial results of mappedRdd; it will then use these partial results both for rdd1 and rdd2. From the spark point of view this is equivalent to:

mappedRdd = rddIn.map(lambda x: customMapper(x)).saveAsObjectFile('..')
# forget about everything
rdd0 = sc.objectFile('..').filter(lambda x: x[0]==0)
rdd1 = sc.objectFile('..').filter(lambda x: x[0]==1)
Licensed under: CC-BY-SA with attribution
Not affiliated with datascience.stackexchange
scroll top