Вопрос

Я думаю, мне, возможно, придется переосмыслить свой дизайн.Мне трудно выявить ошибку, из-за которой мой компьютер полностью зависает, иногда выдавая HRESULT 0x8007000E из VS 2010.

У меня есть консольное приложение (которое я позже преобразую в сервис), которое обрабатывает передачу файлов на основе очереди базы данных.

Я ограничиваю количество потоков, разрешенных для передачи.Это связано с тем, что некоторые системы, к которым мы подключаемся, могут содержать только определенное количество подключений от определенных учетных записей.

Например, Система А может принимать только 3 одновременных соединения (что означает 3 отдельных потока).Каждый из этих потоков имеет свой собственный уникальный объект соединения, поэтому у нас не должно возникнуть проблем с синхронизацией, поскольку они не используют совместное соединение.

Мы хотим обрабатывать файлы из этих систем циклически.Так, например, мы разрешим 3 соединения, которые могут передавать до 100 файлов за одно соединение.Это означает, что для перемещения 1000 файлов из системы А мы можем обработать только 300 файлов за цикл, поскольку разрешено 3 потока по 100 файлов в каждом.Следовательно, за время существования этой передачи у нас будет 10 потоков.Мы можем запускать только 3 одновременно.Итак, будет 3 цикла, и последний цикл будет использовать только 1 поток для передачи последних 100 файлов.(3 потока x 100 файлов = 300 файлов за цикл)

Текущая архитектура в качестве примера:

  1. System.Threading.Timer проверяет очередь каждые 5 секунд на наличие необходимых действий, вызывая GetScheduledTask().
  2. Если делать нечего, GetScheduledTask() просто ничего не делает.
  3. Если есть работа, создайте поток ThreadPool для ее обработки [Рабочий поток A]
  4. Рабочий поток А видит, что нужно передать 1000 файлов.
  5. Рабочий поток A видит, что в системе, из которой он получает файлы, может быть запущено только 3 потока.
  6. Рабочий поток A запускает три новых рабочих потока [B,C,D] и передает
  7. Рабочий поток A ожидает B, C, D [WaitHandle.WaitAll(transfersArray)]
  8. Рабочий поток A видит, что в очереди еще есть файлы (теперь их должно быть 700).
  9. Рабочий поток A создает новый массив для ожидания [transfersArray = new TransferArray[3] это максимум для системы А, но может варьироваться в зависимости от системы
  10. Рабочий поток A запускает три новых рабочих потока [B,C,D] и ждет их. [WaitHandle.WaitAll(transfersArray)]
  11. Процесс повторяется до тех пор, пока не останется файлов для перемещения.
  12. Рабочая нить A сигнализирует о завершении

Я использую ManualResetEvent для обработки сигналов.

Мои вопросы:

  1. Есть ли какие-либо явные обстоятельства, которые могут вызвать утечку ресурсов или проблему, с которой я столкнулся?
  2. Должен ли я перебирать массив после каждого WaitHandle.WaitAll(array) и позвони array[index].Dispose()?
  3. Число дескрипторов этого процесса в диспетчере задач медленно увеличивается.
  4. Я вызываю первоначальное создание рабочего потока A из System.Threading.Timer.Будут ли с этим какие-то проблемы?Код этого таймера:

(Некоторый код класса для планирования)

private ManualResetEvent _ResetEvent;

private void Start()
{
    _IsAlive = true;
    ManualResetEvent transferResetEvent = new ManualResetEvent(false);
    //Set the scheduler timer to 5 second intervals
    _ScheduledTasks = new Timer(new TimerCallback(ScheduledTasks_Tick), transferResetEvent, 200, 5000);
}

private void ScheduledTasks_Tick(object state)
{
    ManualResetEvent resetEvent = null;
    try
    {
        resetEvent = (ManualResetEvent)state;
        //Block timer until GetScheduledTasks() finishes
        _ScheduledTasks.Change(Timeout.Infinite, Timeout.Infinite);
        GetScheduledTasks();
    }
    finally
    {
        _ScheduledTasks.Change(5000, 5000);
        Console.WriteLine("{0} [Main] GetScheduledTasks() finished", DateTime.Now.ToString("MMddyy HH:mm:ss:fff"));
        resetEvent.Set();
    }
}


private void GetScheduledTask()
{
    try 
    { 
        //Check to see if the database connection is still up
        if (!_IsAlive)
        {
            //Handle
            _ConnectionLostNotification = true;
            return;
        }

        //Get scheduled records from the database
        ISchedulerTask task = null;

        using (DataTable dt = FastSql.ExecuteDataTable(
                _ConnectionString, "hidden for security", System.Data.CommandType.StoredProcedure,
                new List<FastSqlParam>() { new FastSqlParam(ParameterDirection.Input, SqlDbType.VarChar, "@ProcessMachineName", Environment.MachineName) })) //call to static class
        {
            if (dt != null)
            {
                if (dt.Rows.Count == 1)
                {  //Only 1 row is allowed
                    DataRow dr = dt.Rows[0];

                    //Get task information
                    TransferParam.TaskType taskType = (TransferParam.TaskType)Enum.Parse(typeof(TransferParam.TaskType), dr["TaskTypeId"].ToString());
                    task = ScheduledTaskFactory.CreateScheduledTask(taskType);

                    task.Description = dr["Description"].ToString();
                    task.IsEnabled = (bool)dr["IsEnabled"];
                    task.IsProcessing = (bool)dr["IsProcessing"];
                    task.IsManualLaunch = (bool)dr["IsManualLaunch"];
                    task.ProcessMachineName = dr["ProcessMachineName"].ToString();
                    task.NextRun = (DateTime)dr["NextRun"];
                    task.PostProcessNotification = (bool)dr["NotifyPostProcess"];
                    task.PreProcessNotification = (bool)dr["NotifyPreProcess"];
                    task.Priority = (TransferParam.Priority)Enum.Parse(typeof(TransferParam.SystemType), dr["PriorityId"].ToString());
                    task.SleepMinutes = (int)dr["SleepMinutes"];
                    task.ScheduleId = (int)dr["ScheduleId"];
                    task.CurrentRuns = (int)dr["CurrentRuns"];
                    task.TotalRuns = (int)dr["TotalRuns"];

                    SchedulerTask scheduledTask = new SchedulerTask(new ManualResetEvent(false), task);
                    //Queue up task to worker thread and start
                    ThreadPool.QueueUserWorkItem(new WaitCallback(this.ThreadProc), scheduledTask);     
                }
            }
        }

    }
    catch (Exception ex)
    {
        //Handle
    }
}

private void ThreadProc(object taskObject)
{
    SchedulerTask task = (SchedulerTask)taskObject;
    ScheduledTaskEngine engine = null;
    try
    {
        engine = SchedulerTaskEngineFactory.CreateTaskEngine(task.Task, _ConnectionString);
        engine.StartTask(task.Task);    
    }
    catch (Exception ex)
    {
        //Handle
    }
    finally
    {
        task.TaskResetEvent.Set();
        task.TaskResetEvent.Dispose();
    }
}
Это было полезно?

Решение 4

Оказывается, источник этой странной проблемы был связан не с архитектурой, а с преобразованием решения из версии 3.5 в версию 4.0.Я воссоздал решение, не внося никаких изменений в код, и проблема больше никогда не возникала.

Другие советы

0x8007000E — ошибка нехватки памяти.Это, а также количество дескрипторов, похоже, указывают на утечку ресурсов.Убедитесь, что вы избавляетесь от каждого объекта, реализующего IDisposable.Сюда входят массивы ManualResetEventвы используете.

Если у вас есть время, вы также можете перейти на использование .NET 4.0. Task сорт;он был разработан для более четкой обработки подобных сложных сценариев.Определяя ребенка Task объектов, вы можете уменьшить общее количество потоков (потоки довольно дороги не только из-за планирования, но и из-за их стекового пространства).

Я ищу ответы на аналогичную проблему (количество дескрипторов увеличивается с течением времени).

Я изучил архитектуру вашего приложения и хотел бы предложить вам кое-что, что могло бы вам помочь:

Слышали ли вы о IOCP (порты завершения ввода-вывода).

Я не уверен в сложности реализации этого с помощью C#, но на C/C++ это проще простого.Используя это, вы создаете уникальный пул потоков (количество потоков в этом пуле, в целом, определяется как 2 x количество процессоров или процессоров ядер на ПК или на сервере). работа.См. справку по этим функциям:CreateIoCompletionPort();PostQueuedCompletionStatus();GetQueuedCompletionStatus();

В общем, создание потоков и выход из них на лету может занять много времени и привести к снижению производительности и фрагментации памяти.В MSDN и Google есть тысячи литературы по IOCP.

Я думаю, вам следует полностью пересмотреть свою архитектуру.Тот факт, что вы можете иметь только 3 одновременных соединения, практически заставляет вас использовать 1 поток для генерации списка файлов и 3 потока для их обработки.Ваш поток-производитель вставит все файлы в очередь, а три потока-потребителя выйдут из очереди и продолжат обработку по мере поступления элементов в очередь.Блокирующая очередь может существенно упростить код.Если вы используете .NET 4.0, вы можете воспользоваться преимуществами БлокированиеСбор сорт.

public class Example
{
    private BlockingCollection<string> m_Queue = new BlockingCollection<string>();

    public void Start()
    {
        var threads = new Thread[] 
            { 
                new Thread(Producer), 
                new Thread(Consumer), 
                new Thread(Consumer), 
                new Thread(Consumer) 
            };
        foreach (Thread thread in threads)
        {
            thread.Start();
        }
    }

    private void Producer()
    {
        while (true)
        {
            Thread.Sleep(TimeSpan.FromSeconds(5));
            ScheduledTask task = GetScheduledTask();
            if (task != null)
            {
                foreach (string file in task.Files)
                {
                    m_Queue.Add(task);
                }
            }
        }
    }

    private void Consumer()
    {
        // Make a connection to the resource that is assigned to this thread only.
        while (true)
        {
            string file = m_Queue.Take();
            // Process the file.
        }
    }
}

В приведенном выше примере я определенно слишком упростил ситуацию, но надеюсь, что вы уловили общую идею.Обратите внимание, что это намного проще, поскольку в синхронизации потоков не так уж много (большинство из них будет встроено в очередь блокировки) и, конечно же, нет никакой пользы от WaitHandle объекты.Очевидно, вам придется добавить правильные механизмы для корректного закрытия потоков, но это должно быть довольно легко.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top