Question

J'ai un grand ensemble de données que j'ai besoin de se scinder en groupes en fonction de paramètres spécifiques. Je veux que le travail de traiter le plus efficacement possible. Je peux imaginer deux façons de le faire

1 - Créer carte de RDD et le filtre d'origine

def customMapper(record):
    if passesSomeTest(record):
        return (1,record)
    else:
        return (0,record)

mappedRdd = rddIn.map(lambda x: customMapper(x))
rdd0 = mappedRdd.filter(lambda x: x[0]==0).cache()
rdd1 = mappedRdd.filter(lambda x: x[1]==1).cache()

Option 2 - Filtre RDD originale directement

def customFilter(record):
    return passesSomeTest(record)

rdd0 = rddIn.filter(lambda x: customFilter(x)==False).cache()
rdd1 = rddIn.filter(customFilter).cache()

La méthode de poing doit itterate sur tous les enregistrements des données originales 3 fois, où le second n'a que de le faire deux fois, dans des circonstances normales, cependant, une étincelle fait un peu dans les coulisses bâtiment graphique, pour que je puisse imaginer qu'ils sont effectivement fait de la même manière. Mes questions sont les suivantes: a.) est une méthode plus efficace que l'autre, ou ne le bâtiment graphique d'allumage de les rendre équivalentes b.) Est-il possible de faire cette division en une seule passe

Était-ce utile?

La solution

Tout d'abord laissez-moi vous dire que je ne suis pas un expert Spark; Je l'ai utilisé beaucoup dans les derniers mois, et je crois que je comprends maintenant, mais je peux me tromper.

Alors, répondre à vos questions:

a) ils sont équivalents, mais pas dans la façon dont vous le voir. Spark ne sera pas optimiser le graphique si vous vous demandez, mais le customMapper sera toujours exécuté deux fois dans les deux cas; cela est dû au fait que l'étincelle, rdd1 et rdd2 deux RDD complètement différents, et il va construire la base vers le sommet du graphe de transformation à partir des feuilles; si l'option 1 se traduira par:

rdd0 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==0).cache()
rdd1 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==1).cache()

Comme vous l'avez dit, customMapper est exécutée deux fois (d'ailleurs, aussi rddIn sera lu deux fois, ce qui signifie que si elle provient d'une base de données, il peut être encore plus lent).

.

b) il existe un moyen, il vous suffit de déplacer cache() au bon endroit:

mappedRdd = rddIn.map(lambda x: customMapper(x)).cache()
rdd0 = mappedRdd.filter(lambda x: x[0]==0)
rdd1 = mappedRdd.filter(lambda x: x[0]==1)

En faisant cela, nous disons étincelle qui peut stocker les résultats partiels de mappedRdd; il utilisera ensuite ces résultats partiels à la fois pour rdd1 et rdd2. Du point de vue étincelle, cela équivaut à:

mappedRdd = rddIn.map(lambda x: customMapper(x)).saveAsObjectFile('..')
# forget about everything
rdd0 = sc.objectFile('..').filter(lambda x: x[0]==0)
rdd1 = sc.objectFile('..').filter(lambda x: x[0]==1)
Licencié sous: CC-BY-SA avec attribution
scroll top