IObservable および Reactive Framework を使用した foreach の書き換え
-
18-09-2019 - |
質問
私はEntity Frameworkを使用したVS2008を使用しています。WHERE IN 機能に esql を使用してデータベースからオブジェクトにアクセスしています。大量の ID を select ステートメントに渡すので、それを 800 個のセットに分割します。次に、各チャンクの結果を結合します。私の目標は、同期的に待つのではなく、各チャンクの結果を並行して取得することです。Reactive Framework をインストールしましたが、ForkJoin を使用する必要があると確信しています。ただし、この関数を使用するために変換する方法がわかりません。私の既存のコードは次のとおりです。
public static IList<TElement> SelectWhereIn<TElement, TValue>(this ObjectContext context, string fieldName, IList<TValue> idList)
{
var chunkedIds = idList.Split(CHUNK_SIZE);
string entitySetName = typeof(TElement).Name + "Set";
var retList = new List<TElement>();
foreach (var idChunk in chunkedIds)
{
string delimChunk = string.Join(",", idChunk.Select(x => x.ToString()).ToArray());
ObjectQuery<TElement> query = context.CreateQuery<TElement>("SELECT VALUE x FROM " + entitySetName + " AS x");
query = query.Where("it." + fieldName + " IN {" + delimChunk + "}");
retList.AddRange(query);
}
return retList;
}
ありがとう!
編集>>>コードを変更して、以下のように貧しい人間を使用します。
public static IList<TElement> SelectWhereIn<TElement, TValue>(this ObjectContext context, string fieldName, IList<TValue> idList)
{
var chunkedIds = idList.Split(CHUNK_SIZE);
string entitySetName = typeof(TElement).Name + "Set";
var chunkLists = new List<IEnumerable<TElement>>();
Parallel.ForEach(chunkedIds, idChunk =>
{
string delimChunk = string.Join(",", idChunk.Select(x => x.ToString()).ToArray());
ObjectQuery<TElement> query = context.CreateQuery<TElement>("SELECT VALUE x FROM " + entitySetName + " AS x");
query = query.Where("it." + fieldName + " IN {" + delimChunk + "}");
chunkLists.Add(query.ToList());
});
var retList = new List<TElement>();
foreach (var chunkList in chunkLists)
{
retList.AddRange(chunkList);
}
return retList;
}
初めてでもうまくいきました。しかし、2 回目に実行すると、次のエラーが発生しました。
接続が閉じられていませんでした。接続の現在の状態は接続中です。説明:現在の Web リクエストの実行中に、ハンドルされない例外が発生しました。エラーとコード内のどこでエラーが発生したかの詳細については、スタック トレースを確認してください。
例外の詳細:System.InvalidOperationException:接続が閉じられていませんでした。接続の現在の状態は接続中です。
ソースエラー:
49行目:foreach(var iasyncresult in ructeslist)行50:{行51:del.EndInvoke(iAsyncResult);52行目:iAsyncResult.AsyncWaitHandle.Close();53行目:}
興味深いことに、B/C Emre (ライブラリの作成者) は、安全性を高めるためにこれらのコード行をどのように追加したかについて元の投稿を編集しています。私は正しく使っていますか?それとも結局のところ、彼の v1 の方が安全だったのでしょうか?
解決
VS2010 には PLINQ が備わっています。拡張機能の使用 AsParallel().WithDegreeOfParallelism(nbProcessors)
あなたが必要なことをするでしょう。
VS2008では、私は使用しました Poor Man's Parallel.ForEach Iterator by Emre Aydinceren 以前、パフォーマンスのボトルネックを回避しようとしていたときに、試してみました。
編集:あなたが追加したエラーへの反応として、それは暗闇の中のランダムなショットである可能性がありますが、スレッドごとに個別のコンテキストがありますか?そのようです:
Parallel.ForEach(chunkedIds, idChunk =>
{
ObjectContext context = new MyContext(connStr);//depending what's your config
// like, with or w/o conn string
string delimChunk = string.Join(",", idChunk.Select(x => x.ToString()).ToArray());
ObjectQuery<TElement> query = context.CreateQuery<TElement>("SELECT VALUE x FROM " + entitySetName + " AS x");
query = query.Where("it." + fieldName + " IN {" + delimChunk + "}");
chunkLists.Add(query.ToList());
});
いくつかの調整が必要な場合があります (新しいコンテキストをインスタンス化するために拡張されたコンテキストから接続文字列を取得するなど)。