Вопрос

У меня есть большой набор данных, который мне нужно разделить на группы в соответствии с конкретными параметрами. Я хочу, чтобы работа обрабатывала как можно более эффективно. Я могу представить два способа сделать это

Опция 1 - Создать карту из исходного RDD и фильтра

def customMapper(record):
    if passesSomeTest(record):
        return (1,record)
    else:
        return (0,record)

mappedRdd = rddIn.map(lambda x: customMapper(x))
rdd0 = mappedRdd.filter(lambda x: x[0]==0).cache()
rdd1 = mappedRdd.filter(lambda x: x[1]==1).cache()

Вариант 2 - Отфильтруйте оригинал RDD напрямую

def customFilter(record):
    return passesSomeTest(record)

rdd0 = rddIn.filter(lambda x: customFilter(x)==False).cache()
rdd1 = rddIn.filter(customFilter).cache()

Метод кулака имеет для работы со всеми записями исходного набора данных 3 раза, где второй должен сделать это только дважды, при нормальных обстоятельствах, однако, Spark делает некоторые за кулисами, так что я мог себе представить, что они есть эффективно сделано таким же образом. Мои вопросы: a.) - это один метод более эффективный, чем другой, или здание Graph Graph делает их эквивалентными b.) Возможно ли сделать это разделить за один проход

Это было полезно?

Решение

Прежде всего, позвольте мне сказать вам, что я не эксперт по иске; Я использовал это довольно много в последние несколько месяцев, и я верю, что теперь понимаю это, но я могу ошибаться.

Итак, отвечая на ваши вопросы:

а.) Они эквивалентны, но не так, как вы это видите; Spark не оптимизирует график, если вам интересно, но customMapper все еще будет выполнен дважды в обоих случаях; Это связано с тем, что для искры, rdd1 а также rdd2 два совершенно разных RDD, и он построит график преобразования снизу вверх, начиная с листьев; Итак, вариант 1 будет переведен в:

rdd0 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==0).cache()
rdd1 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==1).cache()

Как ты сказал, customMapper выполняется дважды (кроме того, также rddIn будет прочитан дважды, что означает, что если это будет из базы данных, это может быть еще медленнее).

б.) Есть способ, вам просто нужно двигаться cache() В правильном месте:

mappedRdd = rddIn.map(lambda x: customMapper(x)).cache()
rdd0 = mappedRdd.filter(lambda x: x[0]==0)
rdd1 = mappedRdd.filter(lambda x: x[0]==1)

Делая это, мы говорим Spark, что он может сохранить частичные результаты mappedRdd; Затем он будет использовать эти частичные результаты как для rdd1 а также rdd2. Анкет С точки зрения искры это эквивалентно:

mappedRdd = rddIn.map(lambda x: customMapper(x)).saveAsObjectFile('..')
# forget about everything
rdd0 = sc.objectFile('..').filter(lambda x: x[0]==0)
rdd1 = sc.objectFile('..').filter(lambda x: x[0]==1)
Лицензировано под: CC-BY-SA с атрибуция
Не связан с datascience.stackexchange
scroll top