Spark、単一のRDDを2つに最適に分割します
-
16-10-2019 - |
質問
特定のパラメーターに従ってグループに分割する必要がある大きなデータセットがあります。仕事を可能な限り効率的に処理してほしい。私はそうする2つの方法を想像することができます
オプション1 - 元のRDDとフィルターからマップを作成します
def customMapper(record):
if passesSomeTest(record):
return (1,record)
else:
return (0,record)
mappedRdd = rddIn.map(lambda x: customMapper(x))
rdd0 = mappedRdd.filter(lambda x: x[0]==0).cache()
rdd1 = mappedRdd.filter(lambda x: x[1]==1).cache()
オプション2 - 元のRDDを直接フィルタリングします
def customFilter(record):
return passesSomeTest(record)
rdd0 = rddIn.filter(lambda x: customFilter(x)==False).cache()
rdd1 = rddIn.filter(customFilter).cache()
拳法は、元のデータセットのすべての記録を3回ittateしなければなりません。2番目は通常の状況では2回しか必要ありませんが、Sparkは舞台裏のグラフの構築を行うので、それらがあると想像できます。同じ方法で効果的に行われます。私の質問は次のとおりです。a。)1つの方法は他の方法よりも効率的です。または、Sparkグラフの構築はそれらを同等のbにします。)1回のパスでこの分割を行うことは可能ですか
解決
まず第一に、私は火花の専門家ではないことを教えてください。私はここ数ヶ月でかなり多く使用してきましたが、今は理解していると思いますが、間違っているかもしれません。
だから、あなたの質問に答える:
a。)それらは同等ですが、あなたがそれを見ている方法ではありません。あなたが疑問に思っている場合、Sparkはグラフを最適化しませんが、 customMapper
どちらの場合も2回実行されます。これは、火花のために、 rdd1
と rdd2
2つの完全に異なるRDDであり、リーフから始まる変換グラフボトムアップを構築します。したがって、オプション1は次のように翻訳します。
rdd0 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==0).cache()
rdd1 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==1).cache()
あなたが言ったように、 customMapper
さらに2回実行されます(さらに) rddIn
2回読み取られます。つまり、データベースから来る場合、さらに遅くなる可能性があります)。
b。)方法があります、あなたはただ動かなければなりません cache()
正しい場所に:
mappedRdd = rddIn.map(lambda x: customMapper(x)).cache()
rdd0 = mappedRdd.filter(lambda x: x[0]==0)
rdd1 = mappedRdd.filter(lambda x: x[0]==1)
これを行うことにより、私たちはそれがの部分的な結果を保存できることをSparkに伝えています mappedRdd
;その後、これらの部分的な結果の両方を使用します rdd1
と rdd2
. 。スパークの観点からは、これは次のと同等です。
mappedRdd = rddIn.map(lambda x: customMapper(x)).saveAsObjectFile('..')
# forget about everything
rdd0 = sc.objectFile('..').filter(lambda x: x[0]==0)
rdd1 = sc.objectFile('..').filter(lambda x: x[0]==1)