Domanda

Penso che potrei aver bisogno di ripensare al mio design. Sto avendo difficoltà a restringere un bug che sta facendo appendere completamente il mio computer, a volte lanciare un hresult 0x8007000e da VS 2010.

Ho un'applicazione console (che succederò in seguito in un servizio) che gestisce il trasferimento di file in base a una coda di database.

I Accolgorei i fili autorizzati a trasferirsi. Questo perché alcuni sistemi che stiamo connettendo possono contenere solo un determinato numero di connessioni da determinati account.

Ad esempio, il sistema A può accettare solo 3 connessioni simultanee (che significa 3 fili separati). Ognuno di questi thread ha il proprio oggetto di connessione univoco, quindi non dovremmo eseguire alcun problema di sincronizzazione poiché non condividono una connessione.

Vogliamo elaborare i file da tali sistemi in cicli. Quindi, ad esempio, consentiremo 3 connessioni che possono trasferire fino a 100 file per connessione. Ciò significa, per spostare 1000 file dal sistema A, possiamo elaborare solo 300 file per ciclo, poiché sono consentiti 3 discussioni con 100 file ciascuno. Pertanto, per tutta la durata di questo trasferimento, avremo 10 fili. Possiamo solo eseguire 3 alla volta. Quindi, ci saranno 3 cicli e l'ultimo ciclo utilizzerà 1 thread per trasferire gli ultimi 100 file. (3 filetti x 100 file= 300 file per ciclo)

L'architettura corrente per esempio è:

    .
  1. A System.Threading.timer controlla la coda ogni 5 secondi per qualcosa da fare chiamando GiastcheduledTask ()
  2. Se non c'è niente da fare, gestcheduledtask () semplicemente non fa nulla
  3. Se c'è lavoro, creare un thread threadpool per elaborare il lavoro [Impianto di lavoro A]
  4. Discussione del lavoro A vede che ci sono 1000 file da trasferire
  5. Discussione del lavoro A vede che può avere solo 3 discussioni in esecuzione sul sistema che sta ricevendo file da
  6. Iscrizione di lavoro A inizia tre nuovi filetti di lavoro [B, C, D] e trasferimenti
  7. Filetto da lavoro A Waits for B, C, D [WaitHandle.WaitAll(transfersArray)]
  8. Filo di lavoro A vede che ci sono ancora più file in coda (dovrebbe essere 700 ora)
  9. Discussione del lavoro A Crea una nuova matrice per attendere [transfersArray = new TransferArray[3] che è il massimo per il sistema A, ma potrebbe variare sul sistema
  10. Discussione del lavoro Avvia tre nuovi filetti di lavoro [B, C, D] e attendono loro [WaitHandle.WaitAll(transfersArray)]
  11. Il processo si ripete fino a quando non ci sono più file da spostare.
  12. Lavorare un segnali che è fatto
  13. Sto usando manualresetent per gestire la segnalazione.

    Le mie domande sono:

      .
    1. C'è qualche circostanza raffreddale che causerebbe una perdita di risorse o un problema che sto vivendo?
    2. dovrei loop attraverso l'array dopo ogni WaitHandle.WaitAll(array) e chiamare array[index].Dispose()?
    3. Il conteggio della maniglia sotto il Task Manager per questo processo si insinua lentamente
    4. Sto chiamando la creazione iniziale del filo del lavoratore A da un sistema.Threading.timer. Ci saranno problemi con questo? Il codice per quel timer è:
    5. (alcuni codice classe per la pianificazione)

      private ManualResetEvent _ResetEvent;
      
      private void Start()
      {
          _IsAlive = true;
          ManualResetEvent transferResetEvent = new ManualResetEvent(false);
          //Set the scheduler timer to 5 second intervals
          _ScheduledTasks = new Timer(new TimerCallback(ScheduledTasks_Tick), transferResetEvent, 200, 5000);
      }
      
      private void ScheduledTasks_Tick(object state)
      {
          ManualResetEvent resetEvent = null;
          try
          {
              resetEvent = (ManualResetEvent)state;
              //Block timer until GetScheduledTasks() finishes
              _ScheduledTasks.Change(Timeout.Infinite, Timeout.Infinite);
              GetScheduledTasks();
          }
          finally
          {
              _ScheduledTasks.Change(5000, 5000);
              Console.WriteLine("{0} [Main] GetScheduledTasks() finished", DateTime.Now.ToString("MMddyy HH:mm:ss:fff"));
              resetEvent.Set();
          }
      }
      
      
      private void GetScheduledTask()
      {
          try 
          { 
              //Check to see if the database connection is still up
              if (!_IsAlive)
              {
                  //Handle
                  _ConnectionLostNotification = true;
                  return;
              }
      
              //Get scheduled records from the database
              ISchedulerTask task = null;
      
              using (DataTable dt = FastSql.ExecuteDataTable(
                      _ConnectionString, "hidden for security", System.Data.CommandType.StoredProcedure,
                      new List<FastSqlParam>() { new FastSqlParam(ParameterDirection.Input, SqlDbType.VarChar, "@ProcessMachineName", Environment.MachineName) })) //call to static class
              {
                  if (dt != null)
                  {
                      if (dt.Rows.Count == 1)
                      {  //Only 1 row is allowed
                          DataRow dr = dt.Rows[0];
      
                          //Get task information
                          TransferParam.TaskType taskType = (TransferParam.TaskType)Enum.Parse(typeof(TransferParam.TaskType), dr["TaskTypeId"].ToString());
                          task = ScheduledTaskFactory.CreateScheduledTask(taskType);
      
                          task.Description = dr["Description"].ToString();
                          task.IsEnabled = (bool)dr["IsEnabled"];
                          task.IsProcessing = (bool)dr["IsProcessing"];
                          task.IsManualLaunch = (bool)dr["IsManualLaunch"];
                          task.ProcessMachineName = dr["ProcessMachineName"].ToString();
                          task.NextRun = (DateTime)dr["NextRun"];
                          task.PostProcessNotification = (bool)dr["NotifyPostProcess"];
                          task.PreProcessNotification = (bool)dr["NotifyPreProcess"];
                          task.Priority = (TransferParam.Priority)Enum.Parse(typeof(TransferParam.SystemType), dr["PriorityId"].ToString());
                          task.SleepMinutes = (int)dr["SleepMinutes"];
                          task.ScheduleId = (int)dr["ScheduleId"];
                          task.CurrentRuns = (int)dr["CurrentRuns"];
                          task.TotalRuns = (int)dr["TotalRuns"];
      
                          SchedulerTask scheduledTask = new SchedulerTask(new ManualResetEvent(false), task);
                          //Queue up task to worker thread and start
                          ThreadPool.QueueUserWorkItem(new WaitCallback(this.ThreadProc), scheduledTask);     
                      }
                  }
              }
      
          }
          catch (Exception ex)
          {
              //Handle
          }
      }
      
      private void ThreadProc(object taskObject)
      {
          SchedulerTask task = (SchedulerTask)taskObject;
          ScheduledTaskEngine engine = null;
          try
          {
              engine = SchedulerTaskEngineFactory.CreateTaskEngine(task.Task, _ConnectionString);
              engine.StartTask(task.Task);    
          }
          catch (Exception ex)
          {
              //Handle
          }
          finally
          {
              task.TaskResetEvent.Set();
              task.TaskResetEvent.Dispose();
          }
      }
      
      .

È stato utile?

Soluzione 4

Si scopre che la fonte di questo strano problema non era correlata all'architettura ma piuttosto a causa della conversione della soluzione da 3,5 a 4.0.Ho ricreato la soluzione, eseguendo modifiche al codice e il problema non si è mai verificato di nuovo.

Altri suggerimenti

0x8007000e è un errore fuori memoria.Quello e il conteggio della maniglia sembra puntare a una perdita di risorse.Assicurarsi di essere smaltimento di ogni oggetto che implementa IDisposable.Questo include gli array di ManualResetEvents che stai utilizzando.

Se hai tempo, è possibile anche convertirsi utilizzando la classe .NET 4.0 Task;È stato progettato per gestire scenari complessi come questo molto più pulito.Definizione degli oggetti Task del bambino, è possibile ridurre il numero totale di filefili (i fili sono piuttosto costosi non solo a causa della pianificazione ma anche a causa del loro spazio dello stack).

Sto cercando risposte a un problema simile (maniglie contare crescente nel tempo).

Ho dato un'occhiata alla tua architettura dell'applicazione e mi piace suggerire qualcosa che potrebbe aiutarti:

Hai sentito parlare di IOCP (porti di completamento dell'output di ingresso).

Non sono sicuro della propulsione di implementare questo usando C # ma in c / c ++ è un pezzo di torta. Usando ciò si crea un pool di filettatura unico (il numero di thread in quel pool è in generale definito come 2 x il numero di processori o core dei processori nel PC o nel server) Associare questa piscina a una maniglia IOCP e la piscina fa il lavoro. Vedi l'aiuto per queste funzioni: Creatoocompilazioneportport (); PosqueuedCompletionStatus (); GequeuedCompletionStatetus ();

In generale la creazione ed uscendo i thread al volo potrebbe richiedere tempo e conduce a sanzioni di prestazione e frammentazione della memoria. Ci sono migliaia di letteratura su IOCP in MSDN e in Google.

Penso che dovresti riconsiderare del tutto l'architettura. Il fatto che è possibile avere solo 3 connessioni simultanee si suppone quasi di utilizzare 1 thread per generare l'elenco di file e 3 fili per elaborarli. Il thread del produttore inserirebbe tutti i file in una coda e i 3 fili dei consumatori si dequerano e continuano a elaborare come elementi arrivano in coda. Una coda di blocco può semplificare significativamente il codice. Se stai utilizzando .NET 4.0, è possibile usufruire del BlockingCollection classe.

public class Example
{
    private BlockingCollection<string> m_Queue = new BlockingCollection<string>();

    public void Start()
    {
        var threads = new Thread[] 
            { 
                new Thread(Producer), 
                new Thread(Consumer), 
                new Thread(Consumer), 
                new Thread(Consumer) 
            };
        foreach (Thread thread in threads)
        {
            thread.Start();
        }
    }

    private void Producer()
    {
        while (true)
        {
            Thread.Sleep(TimeSpan.FromSeconds(5));
            ScheduledTask task = GetScheduledTask();
            if (task != null)
            {
                foreach (string file in task.Files)
                {
                    m_Queue.Add(task);
                }
            }
        }
    }

    private void Consumer()
    {
        // Make a connection to the resource that is assigned to this thread only.
        while (true)
        {
            string file = m_Queue.Take();
            // Process the file.
        }
    }
}
.

Ho sicuramente semplificato le cose nell'esempio sopra, ma spero che tu abbia l'idea generale. AVVISO Come è molto più semplice in quanto non c'è molto nel modo di sincronizzazione del filo (la maggior parte sarà incorporata nella coda di blocco) e, naturalmente, non ci sono uso di oggetti WaitHandle. Ovviamente dovresti aggiungere i meccanismi corretti per spegnere i fili con grazia, ma dovrebbe essere abbastanza facile.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top