質問

特定のパラメーターに従ってグループに分割する必要がある大きなデータセットがあります。仕事を可能な限り効率的に処理してほしい。私はそうする2つの方法を想像することができます

オプション1 - 元のRDDとフィルターからマップを作成します

def customMapper(record):
    if passesSomeTest(record):
        return (1,record)
    else:
        return (0,record)

mappedRdd = rddIn.map(lambda x: customMapper(x))
rdd0 = mappedRdd.filter(lambda x: x[0]==0).cache()
rdd1 = mappedRdd.filter(lambda x: x[1]==1).cache()

オプション2 - 元のRDDを直接フィルタリングします

def customFilter(record):
    return passesSomeTest(record)

rdd0 = rddIn.filter(lambda x: customFilter(x)==False).cache()
rdd1 = rddIn.filter(customFilter).cache()

拳法は、元のデータセットのすべての記録を3回ittateしなければなりません。2番目は通常の状況では2回しか必要ありませんが、Sparkは舞台裏のグラフの構築を行うので、それらがあると想像できます。同じ方法で効果的に行われます。私の質問は次のとおりです。a。)1つの方法は他の方法よりも効率的です。または、Sparkグラフの構築はそれらを同等のbにします。)1回のパスでこの分割を行うことは可能ですか

役に立ちましたか?

解決

まず第一に、私は火花の専門家ではないことを教えてください。私はここ数ヶ月でかなり多く使用してきましたが、今は理解していると思いますが、間違っているかもしれません。

だから、あなたの質問に答える:

a。)それらは同等ですが、あなたがそれを見ている方法ではありません。あなたが疑問に思っている場合、Sparkはグラフを最適化しませんが、 customMapper どちらの場合も2回実行されます。これは、火花のために、 rdd1rdd2 2つの完全に異なるRDDであり、リーフから始まる変換グラフボトムアップを構築します。したがって、オプション1は次のように翻訳します。

rdd0 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==0).cache()
rdd1 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==1).cache()

あなたが言ったように、 customMapper さらに2回実行されます(さらに) rddIn 2回読み取られます。つまり、データベースから来る場合、さらに遅くなる可能性があります)。

b。)方法があります、あなたはただ動かなければなりません cache() 正しい場所に:

mappedRdd = rddIn.map(lambda x: customMapper(x)).cache()
rdd0 = mappedRdd.filter(lambda x: x[0]==0)
rdd1 = mappedRdd.filter(lambda x: x[0]==1)

これを行うことにより、私たちはそれがの部分的な結果を保存できることをSparkに伝えています mappedRdd;その後、これらの部分的な結果の両方を使用します rdd1rdd2. 。スパークの観点からは、これは次のと同等です。

mappedRdd = rddIn.map(lambda x: customMapper(x)).saveAsObjectFile('..')
# forget about everything
rdd0 = sc.objectFile('..').filter(lambda x: x[0]==0)
rdd1 = sc.objectFile('..').filter(lambda x: x[0]==1)
ライセンス: CC-BY-SA帰属
所属していません datascience.stackexchange
scroll top