Spark, optimal ein einzelnes RDD in zwei Teile aufteilt
-
16-10-2019 - |
Frage
Ich habe einen großen Datensatz, den ich gemäß bestimmten Parametern in Gruppen teilen muss. Ich möchte, dass der Job so effizient wie möglich verarbeitet. Ich kann mir zwei Möglichkeiten vorstellen, dies zu tun
Option 1 - Erstellen Sie die Karte aus Original RDD und Filter
def customMapper(record):
if passesSomeTest(record):
return (1,record)
else:
return (0,record)
mappedRdd = rddIn.map(lambda x: customMapper(x))
rdd0 = mappedRdd.filter(lambda x: x[0]==0).cache()
rdd1 = mappedRdd.filter(lambda x: x[1]==1).cache()
Option 2 - FILDEN SIE URIGUM RDD Direkt
def customFilter(record):
return passesSomeTest(record)
rdd0 = rddIn.filter(lambda x: customFilter(x)==False).cache()
rdd1 = rddIn.filter(customFilter).cache()
Die Faustmethode muss alle Datensätze des ursprünglichen Datensatzes dreimal übertreffen, wobei die zweite unter normalen Umständen nur zweimal tun muss effektiv auf die gleiche Weise gemacht. Meine Fragen sind: a.) Ist eine methode effizienter als die andere oder macht das Funkengrafiker sie äquivalent b.
Lösung
Lassen Sie mich Ihnen zunächst sagen, dass ich kein Funkenexperte bin. Ich habe es in den letzten Monaten ziemlich viel benutzt und ich glaube, ich verstehe es jetzt, aber ich kann mich irren.
Beantworten Sie also Ihre Fragen:
a.) Sie sind gleichwertig, aber nicht so, wie Sie es sehen; Spark wird das Diagramm nicht optimieren, wenn Sie sich wundern, aber die customMapper
rdd1
und rdd2
sind zwei völlig unterschiedliche RDDs, und es baut das Bottom-up der Transformationsgrafik aus den Blättern auf. Option 1 übersetzt also:
rdd0 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==0).cache()
rdd1 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==1).cache()
Wie du gesagt hast, customMapper
wird zweimal ausgeführt (darüber hinaus auch rddIn
wird zweimal gelesen, was bedeutet, dass sie, wenn sie aus einer Datenbank stammt, noch langsamer sein kann.
b.) Es gibt einen Weg, man muss sich nur bewegen cache()
am richtigen Platz:
mappedRdd = rddIn.map(lambda x: customMapper(x)).cache()
rdd0 = mappedRdd.filter(lambda x: x[0]==0)
rdd1 = mappedRdd.filter(lambda x: x[0]==1)
Auf diese Weise sagen wir Spark, dass es die Teilergebnisse von speichern kann mappedRdd
; Es wird diese Teilergebnisse dann für beide verwenden rdd1
und rdd2
. Aus der Sichtweise entspricht dies:
mappedRdd = rddIn.map(lambda x: customMapper(x)).saveAsObjectFile('..')
# forget about everything
rdd0 = sc.objectFile('..').filter(lambda x: x[0]==0)
rdd1 = sc.objectFile('..').filter(lambda x: x[0]==1)