Frage

Ich denke, ich muss mein Design möglicherweise überdenken.Es fällt mir schwer, einen Fehler einzugrenzen, der dazu führt, dass mein Computer völlig hängen bleibt und manchmal ein HRESULT 0x8007000E von VS 2010 auslöst.

Ich habe eine Konsolenanwendung (die ich später in einen Dienst umwandeln werde), die die Übertragung von Dateien basierend auf einer Datenbankwarteschlange übernimmt.

Ich drossle die Threads, die übertragen werden dürfen.Dies liegt daran, dass einige Systeme, zu denen wir eine Verbindung herstellen, nur eine bestimmte Anzahl von Verbindungen von bestimmten Konten aus aufnehmen können.

Beispielsweise kann System A nur drei gleichzeitige Verbindungen akzeptieren (was drei separate Threads bedeutet).Jeder dieser Threads hat sein eigenes einzigartiges Verbindungsobjekt, daher sollten keine Synchronisierungsprobleme auftreten, da sie keine gemeinsame Verbindung haben.

Wir wollen die Dateien aus diesen Systemen zyklisch verarbeiten.So lassen wir beispielsweise drei Verbindungen zu, die bis zu 100 Dateien pro Verbindung übertragen können.Das bedeutet, dass wir zum Verschieben von 1000 Dateien von System A nur 300 Dateien pro Zyklus verarbeiten können, da 3 Threads mit jeweils 100 Dateien zulässig sind.Daher werden wir über die Lebensdauer dieser Übertragung 10 Threads haben.Wir können nur 3 gleichzeitig ausführen.Es gibt also drei Zyklen, und im letzten Zyklus wird nur ein Thread zum Übertragen der letzten 100 Dateien verwendet.(3 Threads x 100 Dateien = 300 Dateien pro Zyklus)

Die aktuelle Architektur ist beispielsweise:

  1. Ein System.Threading.Timer überprüft die Warteschlange alle 5 Sekunden auf etwas, was zu tun ist, indem er GetScheduledTask() aufruft.
  2. Wenn nichts zu tun ist, führt GetScheduledTask() einfach nichts aus
  3. Wenn Arbeit vorhanden ist, erstellen Sie einen ThreadPool-Thread, um die Arbeit zu verarbeiten [Arbeitsthread A]
  4. Arbeitsthread A sieht, dass 1000 Dateien übertragen werden müssen
  5. Arbeitsthread A stellt fest, dass auf dem System, von dem er Dateien abruft, nur drei Threads ausgeführt werden können
  6. Arbeitsthread A startet drei neue Arbeitsthreads [B,C,D] und überträgt sie
  7. Arbeitsthread A wartet auf B,C,D [WaitHandle.WaitAll(transfersArray)]
  8. Arbeitsthread A sieht, dass sich noch mehr Dateien in der Warteschlange befinden (sollten jetzt 700 sein)
  9. Arbeitsthread A erstellt ein neues Array, auf das gewartet werden soll [transfersArray = new TransferArray[3] Dies ist das Maximum für System A, kann jedoch je nach System variieren
  10. Arbeitsthread A startet drei neue Arbeitsthreads [B,C,D] und wartet auf diese [WaitHandle.WaitAll(transfersArray)]
  11. Der Vorgang wird wiederholt, bis keine Dateien mehr zum Verschieben vorhanden sind.
  12. Arbeitsfaden A signalisiert, dass er fertig ist

Ich verwende ManualResetEvent, um die Signalisierung zu verarbeiten.

Meine Fragen sind:

  1. Gibt es einen eklatanten Umstand, der zu einem Ressourcenleck oder einem Problem führen würde?
  2. Soll ich das Array nach jedem durchlaufen? WaitHandle.WaitAll(array) und Ruf an array[index].Dispose()?
  3. Die Handle-Anzahl im Task-Manager für diesen Prozess steigt langsam an
  4. Ich rufe die anfängliche Erstellung von Worker-Thread A aus einem System.Threading.Timer auf.Wird es dabei irgendwelche Probleme geben?Der Code für diesen Timer lautet:

(Einige Klassencodes für die Planung)

private ManualResetEvent _ResetEvent;

private void Start()
{
    _IsAlive = true;
    ManualResetEvent transferResetEvent = new ManualResetEvent(false);
    //Set the scheduler timer to 5 second intervals
    _ScheduledTasks = new Timer(new TimerCallback(ScheduledTasks_Tick), transferResetEvent, 200, 5000);
}

private void ScheduledTasks_Tick(object state)
{
    ManualResetEvent resetEvent = null;
    try
    {
        resetEvent = (ManualResetEvent)state;
        //Block timer until GetScheduledTasks() finishes
        _ScheduledTasks.Change(Timeout.Infinite, Timeout.Infinite);
        GetScheduledTasks();
    }
    finally
    {
        _ScheduledTasks.Change(5000, 5000);
        Console.WriteLine("{0} [Main] GetScheduledTasks() finished", DateTime.Now.ToString("MMddyy HH:mm:ss:fff"));
        resetEvent.Set();
    }
}


private void GetScheduledTask()
{
    try 
    { 
        //Check to see if the database connection is still up
        if (!_IsAlive)
        {
            //Handle
            _ConnectionLostNotification = true;
            return;
        }

        //Get scheduled records from the database
        ISchedulerTask task = null;

        using (DataTable dt = FastSql.ExecuteDataTable(
                _ConnectionString, "hidden for security", System.Data.CommandType.StoredProcedure,
                new List<FastSqlParam>() { new FastSqlParam(ParameterDirection.Input, SqlDbType.VarChar, "@ProcessMachineName", Environment.MachineName) })) //call to static class
        {
            if (dt != null)
            {
                if (dt.Rows.Count == 1)
                {  //Only 1 row is allowed
                    DataRow dr = dt.Rows[0];

                    //Get task information
                    TransferParam.TaskType taskType = (TransferParam.TaskType)Enum.Parse(typeof(TransferParam.TaskType), dr["TaskTypeId"].ToString());
                    task = ScheduledTaskFactory.CreateScheduledTask(taskType);

                    task.Description = dr["Description"].ToString();
                    task.IsEnabled = (bool)dr["IsEnabled"];
                    task.IsProcessing = (bool)dr["IsProcessing"];
                    task.IsManualLaunch = (bool)dr["IsManualLaunch"];
                    task.ProcessMachineName = dr["ProcessMachineName"].ToString();
                    task.NextRun = (DateTime)dr["NextRun"];
                    task.PostProcessNotification = (bool)dr["NotifyPostProcess"];
                    task.PreProcessNotification = (bool)dr["NotifyPreProcess"];
                    task.Priority = (TransferParam.Priority)Enum.Parse(typeof(TransferParam.SystemType), dr["PriorityId"].ToString());
                    task.SleepMinutes = (int)dr["SleepMinutes"];
                    task.ScheduleId = (int)dr["ScheduleId"];
                    task.CurrentRuns = (int)dr["CurrentRuns"];
                    task.TotalRuns = (int)dr["TotalRuns"];

                    SchedulerTask scheduledTask = new SchedulerTask(new ManualResetEvent(false), task);
                    //Queue up task to worker thread and start
                    ThreadPool.QueueUserWorkItem(new WaitCallback(this.ThreadProc), scheduledTask);     
                }
            }
        }

    }
    catch (Exception ex)
    {
        //Handle
    }
}

private void ThreadProc(object taskObject)
{
    SchedulerTask task = (SchedulerTask)taskObject;
    ScheduledTaskEngine engine = null;
    try
    {
        engine = SchedulerTaskEngineFactory.CreateTaskEngine(task.Task, _ConnectionString);
        engine.StartTask(task.Task);    
    }
    catch (Exception ex)
    {
        //Handle
    }
    finally
    {
        task.TaskResetEvent.Set();
        task.TaskResetEvent.Dispose();
    }
}
War es hilfreich?

Lösung 4

Es stellte sich heraus, dass die Ursache dieses seltsamen Problems nicht in der Architektur lag, sondern vielmehr in der Konvertierung der Lösung von 3.5 auf 4.0.Ich habe die Lösung neu erstellt, ohne Codeänderungen vorzunehmen, und das Problem ist nie wieder aufgetreten.

Andere Tipps

0x8007000E ist ein Fehler wegen unzureichendem Arbeitsspeicher.Dies und die Anzahl der Handles scheinen auf ein Ressourcenleck hinzuweisen.Stellen Sie sicher, dass Sie jedes implementierte Objekt entsorgen IDisposable.Dazu gehören die Arrays von ManualResetEvents, das du verwendest.

Wenn Sie Zeit haben, möchten Sie möglicherweise auch auf die Verwendung von .NET 4.0 umsteigen Task Klasse;Es wurde entwickelt, um komplexe Szenarien wie dieses viel sauberer zu bewältigen.Durch die Definition von Kind Task Objekte können Sie die Gesamtzahl der Threads reduzieren (Threads sind nicht nur wegen der Planung, sondern auch wegen ihres Stapelplatzes ziemlich teuer).

Ich suche nach Antworten auf ein ähnliches Problem (Anzahl der Handles nimmt mit der Zeit zu).

Ich habe mir Ihre Anwendungsarchitektur angesehen und möchte Ihnen etwas vorschlagen, das Ihnen weiterhelfen könnte:

Haben Sie schon von IOCP (Input Output Completion Ports) gehört?

Ich bin mir nicht sicher, wie schwierig es ist, dies mit C# zu implementieren, aber in C/C++ ist es ein Kinderspiel.Durch die Verwendung dieses Jahres erstellen Sie einen eindeutigen Thread -Pool (die Anzahl der Threads in diesem Pool ist im Allgemeinen als 2 x definiert. arbeiten.Sehen Sie sich die Hilfe zu diesen Funktionen an:CreateIoCompletionPort();PostQueuedCompletionStatus();GetQueuedCompletionStatus();

Im Allgemeinen kann das schnelle Erstellen und Beenden von Threads zeitaufwändig sein und zu Leistungseinbußen und Speicherfragmentierung führen.Es gibt Tausende von Literatur zu IOCP in MSDN und in Google.

Ich denke, Sie sollten Ihre Architektur völlig überdenken.Die Tatsache, dass Sie nur drei gleichzeitige Verbindungen haben können, schreit fast dazu, einen Thread zum Generieren der Dateiliste und drei Threads für deren Verarbeitung zu verwenden.Ihr Produzenten-Thread würde alle Dateien in eine Warteschlange einfügen und die drei Konsumenten-Threads werden aus der Warteschlange entfernt und mit der Verarbeitung fortfahren, sobald Elemente in der Warteschlange ankommen.Eine Blockierungswarteschlange kann den Code erheblich vereinfachen.Wenn Sie .NET 4.0 verwenden, können Sie die Vorteile nutzen BlockingCollection Klasse.

public class Example
{
    private BlockingCollection<string> m_Queue = new BlockingCollection<string>();

    public void Start()
    {
        var threads = new Thread[] 
            { 
                new Thread(Producer), 
                new Thread(Consumer), 
                new Thread(Consumer), 
                new Thread(Consumer) 
            };
        foreach (Thread thread in threads)
        {
            thread.Start();
        }
    }

    private void Producer()
    {
        while (true)
        {
            Thread.Sleep(TimeSpan.FromSeconds(5));
            ScheduledTask task = GetScheduledTask();
            if (task != null)
            {
                foreach (string file in task.Files)
                {
                    m_Queue.Add(task);
                }
            }
        }
    }

    private void Consumer()
    {
        // Make a connection to the resource that is assigned to this thread only.
        while (true)
        {
            string file = m_Queue.Take();
            // Process the file.
        }
    }
}

Ich habe die Dinge im obigen Beispiel definitiv zu stark vereinfacht, aber ich hoffe, dass Sie eine allgemeine Vorstellung davon bekommen.Beachten Sie, dass dies viel einfacher ist, da der Thread-Synchronisierung nicht viel im Wege steht (die meisten werden in die Blockierungswarteschlange eingebettet) und es natürlich keinen Nutzen von hat WaitHandle Objekte.Natürlich müssten Sie die richtigen Mechanismen hinzufügen, um die Threads ordnungsgemäß herunterzufahren, aber das sollte ziemlich einfach sein.

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top