Переписывание foreach с использованием IObservable и Реактивного фреймворка

StackOverflow https://stackoverflow.com/questions/2201753

Вопрос

Я нахожусь в VS2008 с Entity Framework.Я получаю доступ к объектам из базы данных, используя esql для определения функциональности WHERE IN.Я передаю тонну идентификаторов оператору select, поэтому разбиваю его на наборы по 800.Затем я объединяю результаты вместе из каждого фрагмента.Моя цель - получать результаты для каждого блока параллельно, а не ждать синхронно.Я установил Reactive Framework и почти уверен, что мне нужно использовать forkJoin.Однако я не могу понять, как преобразовать эту функцию, чтобы использовать ее.Вот мой существующий код:

    public static IList<TElement> SelectWhereIn<TElement, TValue>(this ObjectContext context, string fieldName, IList<TValue> idList)
    {
        var chunkedIds = idList.Split(CHUNK_SIZE);
        string entitySetName = typeof(TElement).Name + "Set";
        var retList = new List<TElement>();
        foreach (var idChunk in chunkedIds)
        {
            string delimChunk = string.Join(",", idChunk.Select(x => x.ToString()).ToArray());
            ObjectQuery<TElement> query = context.CreateQuery<TElement>("SELECT VALUE x FROM " + entitySetName + " AS x");
            query = query.Where("it." + fieldName + " IN {" + delimChunk + "}");
            retList.AddRange(query);
        }
        return retList;
    }

Спасибо!

РЕДАКТИРОВАТЬ >>> Я изменил код, чтобы использовать Poor Man's, как показано ниже:

    public static IList<TElement> SelectWhereIn<TElement, TValue>(this ObjectContext context, string fieldName, IList<TValue> idList)
    {
        var chunkedIds = idList.Split(CHUNK_SIZE);
        string entitySetName = typeof(TElement).Name + "Set";
        var chunkLists = new List<IEnumerable<TElement>>();
        Parallel.ForEach(chunkedIds, idChunk =>
        {
            string delimChunk = string.Join(",", idChunk.Select(x => x.ToString()).ToArray());
            ObjectQuery<TElement> query = context.CreateQuery<TElement>("SELECT VALUE x FROM " + entitySetName + " AS x");
            query = query.Where("it." + fieldName + " IN {" + delimChunk + "}");
            chunkLists.Add(query.ToList());
        });
        var retList = new List<TElement>();
        foreach (var chunkList in chunkLists)
        {
            retList.AddRange(chunkList);
        }
        return retList;
    }

В первый раз это сработало отлично.Но во второй раз, когда я запустил его, я получил эту ошибку:

Соединение не было закрыто.Текущее состояние соединения - это подключение.Описание:Необработанное исключение возникло во время выполнения текущего веб-запроса.Пожалуйста, просмотрите трассировку стека для получения дополнительной информации об ошибке и о том, где она возникла в коде.

Сведения об исключении:Система.Исключение InvalidOperationException:Соединение не было закрыто.Текущее состояние соединения - это подключение.

Ошибка источника:

Строка 49:foreach (переменная IAsyncResult в списке результатов) Строка 50:{ Строка 51:del.Завершение вызова(IAsyncResult);Строка 52:IAsyncResult.AsyncWaitHandle.Закрыть();Строка 53:}

Интересно, что b / c Emre (автор библиотеки) отредактировал свой оригинальный пост, рассказывая о том, как он добавил эти строки кода для дополнительной безопасности.Правильно ли я его использую?Или его v1 все-таки был безопаснее?

Это было полезно?

Решение

У VS2010 действительно есть это с PLINQ.Использование расширений AsParallel().WithDegreeOfParallelism(nbProcessors) сделал бы то, что вам нужно.

С VS2008 я использовал Параллель бедняка.Итератор ForEach от Эмре Айдинсерена в прошлом, когда я пытался обойти узкое место в производительности, попробуйте попробовать.

Редактировать:В ответ на ошибку, которую вы добавили, это может быть случайный снимок в темноте, но отдельные контексты для каждого потока ?Вот так:

Parallel.ForEach(chunkedIds, idChunk =>
    {
        ObjectContext context = new MyContext(connStr);//depending what's your config
                                                       // like, with or w/o conn string

        string delimChunk = string.Join(",", idChunk.Select(x => x.ToString()).ToArray());
        ObjectQuery<TElement> query = context.CreateQuery<TElement>("SELECT VALUE x FROM " + entitySetName + " AS x");
        query = query.Where("it." + fieldName + " IN {" + delimChunk + "}");
        chunkLists.Add(query.ToList());
    });

Возможно, вам придется изменить некоторые вещи (например, взять строку подключения из расширенного контекста для создания новых контекстов).

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top