Переписывание foreach с использованием IObservable и Реактивного фреймворка
-
18-09-2019 - |
Вопрос
Я нахожусь в VS2008 с Entity Framework.Я получаю доступ к объектам из базы данных, используя esql для определения функциональности WHERE IN.Я передаю тонну идентификаторов оператору select, поэтому разбиваю его на наборы по 800.Затем я объединяю результаты вместе из каждого фрагмента.Моя цель - получать результаты для каждого блока параллельно, а не ждать синхронно.Я установил Reactive Framework и почти уверен, что мне нужно использовать forkJoin.Однако я не могу понять, как преобразовать эту функцию, чтобы использовать ее.Вот мой существующий код:
public static IList<TElement> SelectWhereIn<TElement, TValue>(this ObjectContext context, string fieldName, IList<TValue> idList)
{
var chunkedIds = idList.Split(CHUNK_SIZE);
string entitySetName = typeof(TElement).Name + "Set";
var retList = new List<TElement>();
foreach (var idChunk in chunkedIds)
{
string delimChunk = string.Join(",", idChunk.Select(x => x.ToString()).ToArray());
ObjectQuery<TElement> query = context.CreateQuery<TElement>("SELECT VALUE x FROM " + entitySetName + " AS x");
query = query.Where("it." + fieldName + " IN {" + delimChunk + "}");
retList.AddRange(query);
}
return retList;
}
Спасибо!
РЕДАКТИРОВАТЬ >>> Я изменил код, чтобы использовать Poor Man's, как показано ниже:
public static IList<TElement> SelectWhereIn<TElement, TValue>(this ObjectContext context, string fieldName, IList<TValue> idList)
{
var chunkedIds = idList.Split(CHUNK_SIZE);
string entitySetName = typeof(TElement).Name + "Set";
var chunkLists = new List<IEnumerable<TElement>>();
Parallel.ForEach(chunkedIds, idChunk =>
{
string delimChunk = string.Join(",", idChunk.Select(x => x.ToString()).ToArray());
ObjectQuery<TElement> query = context.CreateQuery<TElement>("SELECT VALUE x FROM " + entitySetName + " AS x");
query = query.Where("it." + fieldName + " IN {" + delimChunk + "}");
chunkLists.Add(query.ToList());
});
var retList = new List<TElement>();
foreach (var chunkList in chunkLists)
{
retList.AddRange(chunkList);
}
return retList;
}
В первый раз это сработало отлично.Но во второй раз, когда я запустил его, я получил эту ошибку:
Соединение не было закрыто.Текущее состояние соединения - это подключение.Описание:Необработанное исключение возникло во время выполнения текущего веб-запроса.Пожалуйста, просмотрите трассировку стека для получения дополнительной информации об ошибке и о том, где она возникла в коде.
Сведения об исключении:Система.Исключение InvalidOperationException:Соединение не было закрыто.Текущее состояние соединения - это подключение.
Ошибка источника:
Строка 49:foreach (переменная IAsyncResult в списке результатов) Строка 50:{ Строка 51:del.Завершение вызова(IAsyncResult);Строка 52:IAsyncResult.AsyncWaitHandle.Закрыть();Строка 53:}
Интересно, что b / c Emre (автор библиотеки) отредактировал свой оригинальный пост, рассказывая о том, как он добавил эти строки кода для дополнительной безопасности.Правильно ли я его использую?Или его v1 все-таки был безопаснее?
Решение
У VS2010 действительно есть это с PLINQ.Использование расширений AsParallel().WithDegreeOfParallelism(nbProcessors)
сделал бы то, что вам нужно.
С VS2008 я использовал Параллель бедняка.Итератор ForEach от Эмре Айдинсерена в прошлом, когда я пытался обойти узкое место в производительности, попробуйте попробовать.
Редактировать:В ответ на ошибку, которую вы добавили, это может быть случайный снимок в темноте, но отдельные контексты для каждого потока ?Вот так:
Parallel.ForEach(chunkedIds, idChunk =>
{
ObjectContext context = new MyContext(connStr);//depending what's your config
// like, with or w/o conn string
string delimChunk = string.Join(",", idChunk.Select(x => x.ToString()).ToArray());
ObjectQuery<TElement> query = context.CreateQuery<TElement>("SELECT VALUE x FROM " + entitySetName + " AS x");
query = query.Where("it." + fieldName + " IN {" + delimChunk + "}");
chunkLists.Add(query.ToList());
});
Возможно, вам придется изменить некоторые вещи (например, взять строку подключения из расширенного контекста для создания новых контекстов).