Spark, le fractionnement de manière optimale une seule RDD en deux
-
16-10-2019 - |
Question
J'ai un grand ensemble de données que j'ai besoin de se scinder en groupes en fonction de paramètres spécifiques. Je veux que le travail de traiter le plus efficacement possible. Je peux imaginer deux façons de le faire
1 - Créer carte de RDD et le filtre d'origine
def customMapper(record):
if passesSomeTest(record):
return (1,record)
else:
return (0,record)
mappedRdd = rddIn.map(lambda x: customMapper(x))
rdd0 = mappedRdd.filter(lambda x: x[0]==0).cache()
rdd1 = mappedRdd.filter(lambda x: x[1]==1).cache()
Option 2 - Filtre RDD originale directement
def customFilter(record):
return passesSomeTest(record)
rdd0 = rddIn.filter(lambda x: customFilter(x)==False).cache()
rdd1 = rddIn.filter(customFilter).cache()
La méthode de poing doit itterate sur tous les enregistrements des données originales 3 fois, où le second n'a que de le faire deux fois, dans des circonstances normales, cependant, une étincelle fait un peu dans les coulisses bâtiment graphique, pour que je puisse imaginer qu'ils sont effectivement fait de la même manière. Mes questions sont les suivantes: a.) est une méthode plus efficace que l'autre, ou ne le bâtiment graphique d'allumage de les rendre équivalentes b.) Est-il possible de faire cette division en une seule passe
La solution
Tout d'abord laissez-moi vous dire que je ne suis pas un expert Spark; Je l'ai utilisé beaucoup dans les derniers mois, et je crois que je comprends maintenant, mais je peux me tromper.
Alors, répondre à vos questions:
a) ils sont équivalents, mais pas dans la façon dont vous le voir. Spark ne sera pas optimiser le graphique si vous vous demandez, mais le customMapper
sera toujours exécuté deux fois dans les deux cas; cela est dû au fait que l'étincelle, rdd1
et rdd2
deux RDD complètement différents, et il va construire la base vers le sommet du graphe de transformation à partir des feuilles; si l'option 1 se traduira par:
rdd0 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==0).cache()
rdd1 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==1).cache()
Comme vous l'avez dit, customMapper
est exécutée deux fois (d'ailleurs, aussi rddIn
sera lu deux fois, ce qui signifie que si elle provient d'une base de données, il peut être encore plus lent).
b) il existe un moyen, il vous suffit de déplacer cache()
au bon endroit:
mappedRdd = rddIn.map(lambda x: customMapper(x)).cache()
rdd0 = mappedRdd.filter(lambda x: x[0]==0)
rdd1 = mappedRdd.filter(lambda x: x[0]==1)
En faisant cela, nous disons étincelle qui peut stocker les résultats partiels de mappedRdd
; il utilisera ensuite ces résultats partiels à la fois pour rdd1
et rdd2
. Du point de vue étincelle, cela équivaut à:
mappedRdd = rddIn.map(lambda x: customMapper(x)).saveAsObjectFile('..')
# forget about everything
rdd0 = sc.objectFile('..').filter(lambda x: x[0]==0)
rdd1 = sc.objectFile('..').filter(lambda x: x[0]==1)