Каков наилучший способ разделить большие файлы в Python для многопроцессорной обработки?

StackOverflow https://stackoverflow.com/questions/1823300

Вопрос

Я сталкиваюсь со множеством "смущающе параллельных" проектов, которые я хотел бы распараллелить с multiprocessing модуль.Однако они часто включают чтение огромных файлов (более 2 Гб), их построчную обработку, выполнение базовых вычислений, а затем запись результатов.Каков наилучший способ разделить файл и обработать его с помощью многопроцессорного модуля Python?Следует Queue или JoinableQueue в multiprocessing быть использованным?Или тот Queue сам модуль?Или, должен ли я сопоставить файл, который можно повторять по пулу процессов, используя multiprocessing?Я экспериментировал с этими подходами, но при построчном распределении данных накладные расходы огромны.Я остановился на облегченной конструкции трубчатых фильтров, используя cat file | process1 --out-file out1 --num-processes 2 | process2 --out-file out2, который передает определенный процент входных данных первого процесса непосредственно на второй вход (см. этот пост), но я бы хотел, чтобы решение полностью содержалось на Python.

Удивительно, но документация Python не предлагает канонического способа сделать это (несмотря на длинный раздел о руководствах по программированию в multiprocessing документация).

Спасибо, Винс

Дополнительная информация:Время обработки для каждой строки различно.Некоторые проблемы возникают быстро и едва ли не связаны с вводом-выводом, некоторые связаны с процессором.Связанные с процессором, не зависящие от задач, получат преимущество от распараллеливания, так что даже неэффективные способы назначения данных функции обработки все равно будут выгодны с точки зрения времени настенных часов.

Ярким примером является скрипт, который извлекает поля из строк, проверяет наличие различных побитовых флагов и записывает строки с определенными флагами в новый файл в совершенно новом формате.Это похоже на проблему, связанную с вводом-выводом, но когда я запустил ее с моей дешевой параллельной версией с каналами, это было примерно на 20% быстрее.Когда я запускаю его с пулом и картой, или ставлю в очередь в multiprocessing это всегда более чем на 100% медленнее.

Это было полезно?

Решение

Одна из лучших архитектур уже является частью ОС Linux.Никаких специальных библиотек не требуется.

Вам нужен дизайн "веером".

  1. "Основная" программа создает ряд подпроцессов, соединенных каналами.

  2. Основная программа считывает файл, записывая строки в каналы, выполняя минимальную фильтрацию, необходимую для передачи строк соответствующим подпроцессам.

Каждый подпроцесс, вероятно, должен представлять собой конвейер из отдельных процессов, которые читают и записывают из stdin.

Вам не нужна структура данных очереди, это именно то, что представляет собой конвейер в памяти - очередь байтов между двумя параллельными процессами.

Другие советы

Одна из стратегий заключается в присвоении каждому рабочему смещению, поэтому, если у вас восемь рабочих процессов, вы присваиваете им номера от 0 до 7.Рабочий номер 0 считывает первую запись, обрабатывает ее, затем пропускает 7 и переходит к обработке 8-й записи и т.д., рабочий номер 1 считывает вторую запись, затем пропускает 7 и обрабатывает 9-ю запись.........

У этой схемы есть ряд преимуществ.Не имеет значения, насколько велик файл, работа всегда распределяется равномерно, процессы на одном компьютере будут выполняться примерно с одинаковой скоростью и использовать одни и те же области буфера, так что вы не понесете чрезмерных затрат на ввод-вывод.Пока файл не был обновлен, вы можете повторно запускать отдельные потоки для восстановления после сбоев.

Вы не упоминаете, как вы обрабатываете строки;возможно, это самая важная информация.

Является ли каждая строка независимой?Зависит ли вычисление от того, что одна строка идет перед следующей?Должны ли они обрабатываться блоками?Сколько времени занимает обработка каждой строки?Существует ли этап обработки, который должен включать "все" данные в конце?Или можно отбросить промежуточные результаты и сохранить только текущий итог?Можно ли изначально разделить файл, разделив размер файла на количество потоков?Или она растет по мере того, как вы ее обрабатываете?

Если строки независимы и файл не увеличивается, единственная координация, которая вам нужна, - это распределение "начальных адресов" и "длин" для каждого из рабочих элементов;они могут независимо открывать файл и выполнять поиск в нем, а затем вы должны просто координировать их результаты;возможно, ожидая, пока N результатов вернутся в очередь.

Если строки не являются независимыми, ответ будет сильно зависеть от структуры файла.

Я знаю, что вы специально спрашивали о Python, но я посоветую вам взглянуть на Hadoop (http://hadoop.apache.org/):он реализует алгоритм Map and Reduce, который был специально разработан для решения такого рода проблем.

Удачи

Это во многом зависит от формата вашего файла.

Есть ли смысл разделять его где-нибудь?Или вам нужно разделить его на новой строке?Или вам нужно убедиться, что вы разделили его в конце определения объекта?

Вместо разделения файла вам следует использовать несколько считывателей для одного и того же файла, используя os.lseek чтобы перейти к соответствующей части файла.

Обновить:Постер добавил, что он хочет разделиться на новые линии.Тогда я предлагаю следующее:

Допустим, у вас есть 4 процесса.Тогда простое решение - обратиться к операционной системе.просматривайте 0%, 25%, 50% и 75% файла и считывайте байты, пока не дойдете до первой новой строки.Это ваша отправная точка для каждого процесса.Для этого вам не нужно разделять файл, просто найдите нужное место в большом файле в каждом процессе и начните чтение оттуда.

Фредрик Ландх Некоторые заметки о бенчмарке Wide Finder Тима Брэя это интересное чтение об очень похожем варианте использования, с большим количеством полезных советов.Различные другие авторы также реализовали то же самое, на некоторые из них даны ссылки из статьи, но вы можете попробовать поискать в Google "python wide finder" или что-то еще, чтобы найти еще.(где -то также было решение, основанное на multiprocessing модуль, но, похоже, он больше не доступен)

Если время выполнения велико, вместо того, чтобы каждый процесс считывал свою следующую строку через Queue, пусть процессы считывают пакеты строк.Таким образом, накладные расходы амортизируются по нескольким строкам (напримертысячи или более).

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top