Pergunta

Eu acho que eu precise re-pensar o meu projeto.Eu estou tendo um tempo difícil estreitamento um bug que está causando o meu computador para travar completamente, às vezes jogando um HRESULT 0x8007000E do VS 2010.

Eu tenho uma aplicação de consola (que eu vou mais tarde, converter para um serviço) que lida com a transferência de arquivos com base em um banco de dados de fila.

Eu sou a otimização de threads autorizado a transferir.Isto é porque alguns sistemas que estão a ligar só pode conter um certo número de ligações de determinadas contas.

Por exemplo, o Sistema pode aceitar apenas 3 conexões simultâneas (o que significa que 3 segmentos separados).Cada um desses segmentos tem o seu próprio e único objeto de conexão, por isso não devemos executar em qualquer problemas de sincronização, pois eles não têm o compartilhamento de uma conexão.

Queremos processar os arquivos desses sistemas em ciclos.Assim, por exemplo, vamos permitir-3 conexões que podem transferir até 100 arquivos por conexão.Isso significa que, para mover 1000 arquivos do Sistema, só podemos processar 300 arquivos por ciclo, uma vez que 3 segmentos são permitidos com 100 arquivos de cada um.Portanto, durante a vida útil da transferência, teremos 10 segmentos.Só podemos executar 3 ao mesmo tempo.Assim, não será de 3 ciclos, e o último ciclo vai usar apenas 1 thread para transferir os últimos 100 arquivos.(3 segmentos x 100 arquivos = 300 arquivos por ciclo)

A atual arquitetura por exemplo é:

  1. Um Sistema.Threading.Timer verifica a fila a cada 5 segundos por algo para fazer, chamando GetScheduledTask()
  2. Se não há nada, GetScheduledTask() simplesmente não faz nada
  3. Se há trabalho, criar um pool de threads thread para processar o trabalho [de Trabalho A Thread]
  4. O trabalho de Um Segmento, vê-se que há 1000 arquivos para transferência
  5. O trabalho de enfiar Um vê que ele pode ter apenas 3 threads em execução para o sistema é a obtenção de arquivos a partir de
  6. Trabalho Thread inicia três novos segmentos de trabalho [B,C,D] e transferências
  7. Trabalho Thread aguarda B,C,D [WaitHandle.WaitAll(transfersArray)]
  8. O trabalho de Um Segmento, vê-se que há ainda mais arquivos na fila (deve ser de 700 agora)
  9. Trabalho Thread cria uma nova matriz para esperar [transfersArray = new TransferArray[3] qual é o máximo para o Sistema, mas pode variar em sistema de
  10. Trabalho Thread inicia três novos segmentos de trabalho [B,C,D] e espera [WaitHandle.WaitAll(transfersArray)]
  11. O processo se repete até que não existam mais ficheiros para mover.
  12. O trabalho de Um Segmento sinais de que ele é feito

Eu estou usando ManualResetEvent para lidar com a sinalização.

Minhas perguntas são:

  1. Existe alguma flagrante circunstância que poderia causar um vazamento de recursos ou problema que estou enfrentando?
  2. Devo loop thru a matriz depois de cada WaitHandle.WaitAll(array) e chamada de array[index].Dispose()?
  3. A contagem de Identificador sob o Gestor de Tarefas para este processo arrasta-se lentamente até
  4. Eu estou chamando a criação inicial da Thread de Trabalho de um de Um Sistema.Threading.Temporizador.Lá vai haver problemas com isso?O código para que o timer é:

(Alguns de classe de código para agendamento)

private ManualResetEvent _ResetEvent;

private void Start()
{
    _IsAlive = true;
    ManualResetEvent transferResetEvent = new ManualResetEvent(false);
    //Set the scheduler timer to 5 second intervals
    _ScheduledTasks = new Timer(new TimerCallback(ScheduledTasks_Tick), transferResetEvent, 200, 5000);
}

private void ScheduledTasks_Tick(object state)
{
    ManualResetEvent resetEvent = null;
    try
    {
        resetEvent = (ManualResetEvent)state;
        //Block timer until GetScheduledTasks() finishes
        _ScheduledTasks.Change(Timeout.Infinite, Timeout.Infinite);
        GetScheduledTasks();
    }
    finally
    {
        _ScheduledTasks.Change(5000, 5000);
        Console.WriteLine("{0} [Main] GetScheduledTasks() finished", DateTime.Now.ToString("MMddyy HH:mm:ss:fff"));
        resetEvent.Set();
    }
}


private void GetScheduledTask()
{
    try 
    { 
        //Check to see if the database connection is still up
        if (!_IsAlive)
        {
            //Handle
            _ConnectionLostNotification = true;
            return;
        }

        //Get scheduled records from the database
        ISchedulerTask task = null;

        using (DataTable dt = FastSql.ExecuteDataTable(
                _ConnectionString, "hidden for security", System.Data.CommandType.StoredProcedure,
                new List<FastSqlParam>() { new FastSqlParam(ParameterDirection.Input, SqlDbType.VarChar, "@ProcessMachineName", Environment.MachineName) })) //call to static class
        {
            if (dt != null)
            {
                if (dt.Rows.Count == 1)
                {  //Only 1 row is allowed
                    DataRow dr = dt.Rows[0];

                    //Get task information
                    TransferParam.TaskType taskType = (TransferParam.TaskType)Enum.Parse(typeof(TransferParam.TaskType), dr["TaskTypeId"].ToString());
                    task = ScheduledTaskFactory.CreateScheduledTask(taskType);

                    task.Description = dr["Description"].ToString();
                    task.IsEnabled = (bool)dr["IsEnabled"];
                    task.IsProcessing = (bool)dr["IsProcessing"];
                    task.IsManualLaunch = (bool)dr["IsManualLaunch"];
                    task.ProcessMachineName = dr["ProcessMachineName"].ToString();
                    task.NextRun = (DateTime)dr["NextRun"];
                    task.PostProcessNotification = (bool)dr["NotifyPostProcess"];
                    task.PreProcessNotification = (bool)dr["NotifyPreProcess"];
                    task.Priority = (TransferParam.Priority)Enum.Parse(typeof(TransferParam.SystemType), dr["PriorityId"].ToString());
                    task.SleepMinutes = (int)dr["SleepMinutes"];
                    task.ScheduleId = (int)dr["ScheduleId"];
                    task.CurrentRuns = (int)dr["CurrentRuns"];
                    task.TotalRuns = (int)dr["TotalRuns"];

                    SchedulerTask scheduledTask = new SchedulerTask(new ManualResetEvent(false), task);
                    //Queue up task to worker thread and start
                    ThreadPool.QueueUserWorkItem(new WaitCallback(this.ThreadProc), scheduledTask);     
                }
            }
        }

    }
    catch (Exception ex)
    {
        //Handle
    }
}

private void ThreadProc(object taskObject)
{
    SchedulerTask task = (SchedulerTask)taskObject;
    ScheduledTaskEngine engine = null;
    try
    {
        engine = SchedulerTaskEngineFactory.CreateTaskEngine(task.Task, _ConnectionString);
        engine.StartTask(task.Task);    
    }
    catch (Exception ex)
    {
        //Handle
    }
    finally
    {
        task.TaskResetEvent.Set();
        task.TaskResetEvent.Dispose();
    }
}
Foi útil?

Solução 4

Acontece que a origem deste problema estranho que não está relacionada com a arquitetura, mas sim por causa da conversão de solução de 3.5 a 4.0.Eu re-criado a solução, a efectuar nenhuma alteração de código, e o problema nunca ocorreu novamente.

Outras dicas

0x8007000E é um fora-de-erro de memória.Que e a contagem de identificador parecem apontar para uma fuga de recursos.Garantir que você está descartando qualquer objeto que implementa IDisposable.Isso inclui as matrizes de ManualResetEvents que você está usando.

Se você tiver tempo, você também poderá converter usando o .NET 4.0 Task de classe;ele foi projetado para lidar com cenários complexos como este, muito mais limpa.Pela definição de criança Task objetos, você pode reduzir o volume geral de contagem de segmentos (threads são muito caros, não só devido agendamento, mas também por causa de seu espaço de pilha).

Estou a procura de respostas para um problema semelhante (número de Identificadores de aumentar ao longo do tempo).

Dei uma olhada no seu aplicativo de arquitetura e gostaria de sugerir-lhe algo que poderia ajudá-lo:

Você já ouviu falar sobre o IOCP (Entrada e Saída de Conclusão de Portas).

Eu não tenho certeza de que a dificuldade para implementar isso usando C#, mas em C/C++ é um pedaço de bolo.Usando isso, você pode criar um único pool de thread (O número de segmentos em que a piscina é em geral definido como 2 x o número de processadores ou núcleos de processadores no computador ou servidor) Você associar este conjunto para uma Alça IOCP e o conjunto faz a obra.Consulte a ajuda para estas funções:CreateIoCompletionPort();PostQueuedCompletionStatus();GetQueuedCompletionStatus();

Em Geral, a criação e sair de threads na mosca pode ser demorado e leva a penalidades de desempenho e fragmentação de memória.Existem milhares de literatura sobre o IOCP no MSDN e no google.

Eu acho que você deveria rever seus arquitetura completamente.O fato de que você só pode ter 3, simultaneamente, conexões é quase implorando para que você use 1 thread para gerar a lista de arquivos e 3 segmentos para processá-los.O produtor thread seria inserir todos os arquivos em uma fila e o 3 do consumidor threads irá retirar da fila e continuar o processamento, como itens de chegar na fila.Uma fila de bloquear significativamente pode simplificar o código.Se você estiver usando .NET 4.0, em seguida, você pode aproveitar o BlockingCollection de classe.

public class Example
{
    private BlockingCollection<string> m_Queue = new BlockingCollection<string>();

    public void Start()
    {
        var threads = new Thread[] 
            { 
                new Thread(Producer), 
                new Thread(Consumer), 
                new Thread(Consumer), 
                new Thread(Consumer) 
            };
        foreach (Thread thread in threads)
        {
            thread.Start();
        }
    }

    private void Producer()
    {
        while (true)
        {
            Thread.Sleep(TimeSpan.FromSeconds(5));
            ScheduledTask task = GetScheduledTask();
            if (task != null)
            {
                foreach (string file in task.Files)
                {
                    m_Queue.Add(task);
                }
            }
        }
    }

    private void Consumer()
    {
        // Make a connection to the resource that is assigned to this thread only.
        while (true)
        {
            string file = m_Queue.Take();
            // Process the file.
        }
    }
}

Eu definitivamente simplista coisas no exemplo acima, mas eu espero que você tenha a idéia geral.Observe como isto é muito mais simples, não há muito na forma de sincronização de thread (a maioria vai ser incorporado no bloqueio de fila) e, claro, não há utilização de WaitHandle os objetos.Obviamente, você teria que adicionar o correto mecanismos para encerrar as threads normalmente, mas que deve ser bastante fácil.

Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top