Domanda

Ho un grande insieme di dati che ho bisogno di dividere in gruppi in base a parametri specifici. Voglio che il lavoro di elaborare nel modo più efficiente possibile. Posso immaginare due modi di fare così

1 - Crea mappa da RDD originale e filtro

def customMapper(record):
    if passesSomeTest(record):
        return (1,record)
    else:
        return (0,record)

mappedRdd = rddIn.map(lambda x: customMapper(x))
rdd0 = mappedRdd.filter(lambda x: x[0]==0).cache()
rdd1 = mappedRdd.filter(lambda x: x[1]==1).cache()

2 - Filtro RDD originale direttamente

def customFilter(record):
    return passesSomeTest(record)

rdd0 = rddIn.filter(lambda x: customFilter(x)==False).cache()
rdd1 = rddIn.filter(customFilter).cache()

Il metodo pugno deve itterate su tutte le registrazioni dei dati originali, situata a 3 volte, in cui il secondo solo ha a che fare in modo due volte, in circostanze normali, tuttavia, scintilla fa alcuni dietro l'edificio quinte grafico, così ho potuto immaginare che essi sono effettivamente fatte nello stesso modo. Le mie domande sono: a.) è un metodo più efficiente rispetto all'altro, o se l'edificio grafico scintilla renderli equivalente b.) E 'possibile fare questa scissione in un singolo passaggio

È stato utile?

Soluzione

Prima di tutto lasciate che vi dica che io non sono un esperto di Spark; Ho usato un bel po 'negli ultimi mesi, e credo che ora capisco, ma potrei sbagliarmi.

Quindi, rispondendo alle vostre domande:

a) sono equivalenti, ma non nel modo in cui si sta vedendo.; Spark non ottimizzerà il grafico se vi state chiedendo, ma la customMapper sarà ancora eseguito due volte in entrambi i casi; ciò è dovuto al fatto che per scintilla, rdd1 e rdd2 sono due RDDs completamente diversi, e sarà costruire il grafico trasformazione bottom-up a partire dai fogli; così Opzione 1 si tradurrà in:

rdd0 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==0).cache()
rdd1 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==1).cache()

Come hai detto, customMapper viene eseguita due volte (due volte di più, anche rddIn sarà letto, il che significa che se viene da un database, può essere anche più lento).

.

b) c'è un modo, basta spostare cache() nel posto giusto:

mappedRdd = rddIn.map(lambda x: customMapper(x)).cache()
rdd0 = mappedRdd.filter(lambda x: x[0]==0)
rdd1 = mappedRdd.filter(lambda x: x[0]==1)

In questo modo, stiamo dicendo scintilla che può memorizzare i risultati parziali di mappedRdd; sarà poi utilizzare questi risultati parziali sia per rdd1 e rdd2. Dal punto di vista scintilla questo equivale a:

mappedRdd = rddIn.map(lambda x: customMapper(x)).saveAsObjectFile('..')
# forget about everything
rdd0 = sc.objectFile('..').filter(lambda x: x[0]==0)
rdd1 = sc.objectFile('..').filter(lambda x: x[0]==1)
Autorizzato sotto: CC-BY-SA insieme a attribuzione
scroll top