マルチプロセッシングのためにPythonで大きなファイルを分割する最良の方法は何ですか?
-
22-07-2019 - |
質問
多くの「恥ずかしいほど平行」に出くわします。 multiprocessing
モジュールで並列化するプロジェクト。ただし、多くの場合、巨大なファイル(2 GBを超える)を読み取り、それらを1行ずつ処理し、基本的な計算を実行してから結果を書き込みます。ファイルを分割してPythonのマルチプロセッシングモジュールを使用して処理する最良の方法は何ですか? multiprocessing
の Queue
または JoinableQueue
を使用する必要がありますか?または、 Queue
モジュール自体ですか?または、 multiprocessing
を使用してプロセスのプールで反復可能なファイルをマップする必要がありますか?私はこれらのアプローチを試しましたが、データを行ごとに分散する際のオーバーヘッドは計り知れません。 catファイルを使用して、軽量のパイプフィルターの設計に着手しました。 process1 --out-file out1 --num-processes 2 | process2 --out-file out2
。最初のプロセスの入力の特定の割合を2番目の入力に直接渡します(この投稿)が、ソリューションを完全にPythonに含めたい。
驚くべきことに、Pythonのドキュメントは、これを行うための標準的な方法を提案していません( multiprocessing
ドキュメントのプログラミングガイドラインに関する長いセクションにもかかわらず)。
ありがとう、 ビンス
追加情報:行ごとの処理時間は異なります。いくつかの問題は高速で、ほとんどI / Oバウンドではなく、CPUバウンドです。 CPUに依存する非依存タスクは、並列化によりポストを獲得します。そのため、処理機能にデータを割り当てる非効率的な方法でも、壁時計時間の観点からは有益です。
主な例は、行からフィールドを抽出し、さまざまなビットごとのフラグをチェックし、特定のフラグを持つ行をまったく新しい形式で新しいファイルに書き込むスクリプトです。これはI / Oバウンドの問題のように見えますが、パイプを使用する安価な同時バージョンで実行すると、約20%高速になりました。プールとマップ、または multiprocessing
のキューで実行すると、常に100%以上遅くなります。
解決
最高のアーキテクチャの1つはすでにLinux OSの一部です。特別なライブラリは必要ありません。
「ファンアウト」が必要な場合デザイン。
-
" main"プログラムはパイプで接続されたいくつかのサブプロセスを作成します。
-
メインプログラムはファイルを読み取り、適切なサブプロセスに行を処理するために必要な最小限のフィルタリングを行うパイプに行を書き込みます。
各サブプロセスは、stdinから読み取りおよび書き込みを行う個別のプロセスのパイプラインである必要があります。
キューデータ構造は必要ありません。これはインメモリパイプラインとまったく同じです。2つの同時プロセス間のバイトのキューです。
他のヒント
1つの戦略は、各ワーカーにオフセットを割り当てることです。したがって、8つのワーカープロセスを割り当てる場合、0〜7の番号を割り当てます。ワーカー番号0は最初のレコードを読み取り、7をスキップして8番目のレコードを処理します。ワーカー番号1は2番目のレコードを読み取り、7をスキップして9番目のレコードを処理します.........
このスキームには多くの利点があります。ファイルの大きさは問題ではなく、常に均等に分割され、同じマシン上のプロセスはほぼ同じ速度で処理され、同じバッファー領域を使用するため、過度のI / Oオーバーヘッドが発生しません。ファイルが更新されていない限り、個々のスレッドを再実行して障害から回復できます。
行の処理方法については言及しないでください。おそらく最も重要な情報です。
各行は独立していますか?計算は次の行の前に来る行に依存していますか?ブロックで処理する必要がありますか?各行の処理にはどれくらい時間がかかりますか? 「すべて」を組み込む必要がある処理ステップはありますか?最後のデータ?または、中間結果を破棄して、現在の合計だけを維持できますか?ファイルサイズをスレッド数で除算することにより、最初にファイルを分割できますか?または、処理するにつれて成長しますか?
行が独立していて、ファイルが大きくならない場合、必要な調整は「開始アドレス」を整理することだけです。および「長さ」各労働者に。彼らは独立してファイルを開いてシークすることができます。その後、単に結果を調整するだけです。おそらく、N個の結果がキューに戻るのを待つことによって。
行が独立していない場合、答えはファイルの構造に大きく依存します。
Pythonについて具体的に尋ねられたことは知っていますが、Hadoop( http:// hadoopをご覧になることをお勧めします。 apache.org/ ):この種の問題に対処するために特別に設計されたMap and Reduceアルゴリズムを実装しています。
幸運
ファイルの形式に大きく依存します。
どこかで分割しても意味がありますか?または、新しい行で分割する必要がありますか?または、オブジェクト定義の最後で分割することを確認する必要がありますか?
ファイルを分割する代わりに、 os.lseek
を使用してファイルの適切な部分にジャンプし、同じファイルで複数のリーダーを使用する必要があります。
更新:ポスターは、新しい行で分割したいことを追加しました。次に、以下を提案します。
4つのプロセスがあるとします。次に、os.lseekをファイルの0%、25%、50%、および75%に設定し、最初の新しい行に達するまでバイトを読み取るという単純な解決策があります。それが各プロセスの出発点です。これを行うためにファイルを分割する必要はありません。各プロセスで大きなファイルの適切な場所を探し、そこから読み取りを開始するだけです。
Fredrik Lundhの Tim BrayのWide Finder Benchmarkに関するいくつかのメモは興味深い読み物です。非常によく似たユースケースについて、多くの良いアドバイスがあります。他のさまざまな著者も同じことを実装しており、一部は記事からリンクされていますが、「python wide finder」のグーグル検索を試してみてください。または何かを見つけるために何か。 ( multiprocessing
モジュールに基づいたソリューションもありましたが、もう利用できないようです)
実行時間が長い場合、各プロセスに Queue
を介して次の行を読み込ませる代わりに、プロセスに行のバッチを読み込ませます。これにより、オーバーヘッドは複数の行(たとえば、数千行以上)で償却されます。