Frage

Ich habe einen großen Datensatz, den ich gemäß bestimmten Parametern in Gruppen teilen muss. Ich möchte, dass der Job so effizient wie möglich verarbeitet. Ich kann mir zwei Möglichkeiten vorstellen, dies zu tun

Option 1 - Erstellen Sie die Karte aus Original RDD und Filter

def customMapper(record):
    if passesSomeTest(record):
        return (1,record)
    else:
        return (0,record)

mappedRdd = rddIn.map(lambda x: customMapper(x))
rdd0 = mappedRdd.filter(lambda x: x[0]==0).cache()
rdd1 = mappedRdd.filter(lambda x: x[1]==1).cache()

Option 2 - FILDEN SIE URIGUM RDD Direkt

def customFilter(record):
    return passesSomeTest(record)

rdd0 = rddIn.filter(lambda x: customFilter(x)==False).cache()
rdd1 = rddIn.filter(customFilter).cache()

Die Faustmethode muss alle Datensätze des ursprünglichen Datensatzes dreimal übertreffen, wobei die zweite unter normalen Umständen nur zweimal tun muss effektiv auf die gleiche Weise gemacht. Meine Fragen sind: a.) Ist eine methode effizienter als die andere oder macht das Funkengrafiker sie äquivalent b.

War es hilfreich?

Lösung

Lassen Sie mich Ihnen zunächst sagen, dass ich kein Funkenexperte bin. Ich habe es in den letzten Monaten ziemlich viel benutzt und ich glaube, ich verstehe es jetzt, aber ich kann mich irren.

Beantworten Sie also Ihre Fragen:

a.) Sie sind gleichwertig, aber nicht so, wie Sie es sehen; Spark wird das Diagramm nicht optimieren, wenn Sie sich wundern, aber die customMapper rdd1 und rdd2 sind zwei völlig unterschiedliche RDDs, und es baut das Bottom-up der Transformationsgrafik aus den Blättern auf. Option 1 übersetzt also:

rdd0 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==0).cache()
rdd1 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==1).cache()

Wie du gesagt hast, customMapper wird zweimal ausgeführt (darüber hinaus auch rddIn wird zweimal gelesen, was bedeutet, dass sie, wenn sie aus einer Datenbank stammt, noch langsamer sein kann.

b.) Es gibt einen Weg, man muss sich nur bewegen cache() am richtigen Platz:

mappedRdd = rddIn.map(lambda x: customMapper(x)).cache()
rdd0 = mappedRdd.filter(lambda x: x[0]==0)
rdd1 = mappedRdd.filter(lambda x: x[0]==1)

Auf diese Weise sagen wir Spark, dass es die Teilergebnisse von speichern kann mappedRdd; Es wird diese Teilergebnisse dann für beide verwenden rdd1 und rdd2. Aus der Sichtweise entspricht dies:

mappedRdd = rddIn.map(lambda x: customMapper(x)).saveAsObjectFile('..')
# forget about everything
rdd0 = sc.objectFile('..').filter(lambda x: x[0]==0)
rdd1 = sc.objectFile('..').filter(lambda x: x[0]==1)
Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit datascience.stackexchange
scroll top