Pregunta

Tengo un gran conjunto de datos que tengo que dividir en grupos de acuerdo a los parámetros específicos. Quiero que el trabajo para procesar la mayor eficiencia posible. Puedo prever dos maneras de hacerlo

1 - Crear un mapa de RDD original y filtro

def customMapper(record):
    if passesSomeTest(record):
        return (1,record)
    else:
        return (0,record)

mappedRdd = rddIn.map(lambda x: customMapper(x))
rdd0 = mappedRdd.filter(lambda x: x[0]==0).cache()
rdd1 = mappedRdd.filter(lambda x: x[1]==1).cache()

Opción 2 - Filtro RDD original directamente

def customFilter(record):
    return passesSomeTest(record)

rdd0 = rddIn.filter(lambda x: customFilter(x)==False).cache()
rdd1 = rddIn.filter(customFilter).cache()

El método puño tiene que itterate sobre todos los registros de los datos originales set 3 veces, en el que el segundo sólo hay que hacerlo dos veces, en circunstancias normales, sin embargo, la chispa tiene alguna detrás del edificio escenas gráfica, por lo que se podía imaginar que se llevan a cabo de manera efectiva de la misma manera. Mis preguntas son: a.) Es un método más eficiente que el otro, o no el edificio gráfico chispa que sean equivalentes b.) ¿Es posible hacer esto dividida en una sola pasada

¿Fue útil?

Solución

En primer lugar déjeme decirle que no soy un experto en la chispa; He estado usando mucho en los últimos meses, y yo creo que ahora entiendo, pero puedo estar equivocado.

Así que, respondiendo a sus preguntas:

a) son equivalentes, pero no en la forma en que lo están viendo.; Spark no optimizará la gráfica si se preguntan, pero el customMapper todavía será ejecutada dos veces en ambos casos; esto es debido al hecho de que para la chispa, rdd1 y rdd2 dos DDR completamente diferentes, y se va a construir el gráfico de transformación de abajo hacia arriba a partir de las hojas; por lo que la opción 1 se traducirá en:

rdd0 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==0).cache()
rdd1 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==1).cache()

Como usted ha dicho, customMapper se ejecuta dos veces (por otra parte, también será leído rddIn dos veces, lo que significa que si se trata de una base de datos, puede ser incluso más lento).

.

b) hay una manera, sólo hay que mover cache() en el lugar correcto:

mappedRdd = rddIn.map(lambda x: customMapper(x)).cache()
rdd0 = mappedRdd.filter(lambda x: x[0]==0)
rdd1 = mappedRdd.filter(lambda x: x[0]==1)

Al hacer esto, le estamos diciendo a la chispa que puede almacenar los resultados parciales de mappedRdd; se utilizará entonces estos resultados parciales tanto para rdd1 y rdd2. Desde el punto de vista de chispa esto es equivalente a:

mappedRdd = rddIn.map(lambda x: customMapper(x)).saveAsObjectFile('..')
# forget about everything
rdd0 = sc.objectFile('..').filter(lambda x: x[0]==0)
rdd1 = sc.objectFile('..').filter(lambda x: x[0]==1)
Licenciado bajo: CC-BY-SA con atribución
scroll top