Question

Je pense que je devrai peut-être repenser ma conception.J'ai du mal à identifier un bug qui provoque le blocage complet de mon ordinateur, lançant parfois un HRESULT 0x8007000E à partir de VS 2010.

J'ai une application console (que je convertirai plus tard en service) qui gère le transfert de fichiers en fonction d'une file d'attente de base de données.

Je limite les threads autorisés à transférer.En effet, certains systèmes auxquels nous nous connectons ne peuvent contenir qu'un certain nombre de connexions provenant de certains comptes.

Par exemple, le système A ne peut accepter que 3 connexions simultanées (ce qui signifie 3 threads distincts).Chacun de ces threads a son propre objet de connexion unique, nous ne devrions donc pas rencontrer de problèmes de synchronisation puisqu'ils ne partagent pas de connexion.

Nous voulons traiter les fichiers de ces systèmes par cycles.Ainsi, par exemple, nous autoriserons 3 connexions pouvant transférer jusqu'à 100 fichiers par connexion.Cela signifie que pour déplacer 1 000 fichiers du système A, nous ne pouvons traiter que 300 fichiers par cycle, puisque 3 threads sont autorisés avec 100 fichiers chacun.Par conséquent, sur la durée de vie de ce transfert, nous aurons 10 threads.Nous ne pouvons en exécuter que 3 à la fois.Il y aura donc 3 cycles, et le dernier cycle n'utilisera qu'un seul thread pour transférer les 100 derniers fichiers.(3 threads x 100 fichiers = 300 fichiers par cycle)

L'architecture actuelle par exemple est :

  1. Un System.Threading.Timer vérifie la file d'attente toutes les 5 secondes pour quelque chose à faire en appelant GetScheduledTask()
  2. S'il n'y a rien à faire, GetScheduledTask() ne fait tout simplement rien
  3. S'il y a du travail, créez un thread ThreadPool pour traiter le travail [Work Thread A]
  4. Le fil de travail A détecte qu'il y a 1 000 fichiers à transférer.
  5. Le fil de travail A voit qu'il ne peut y avoir que 3 threads exécutés sur le système à partir duquel il récupère les fichiers.
  6. Le fil de travail A démarre trois nouveaux fils de travail [B, C, D] et les transferts
  7. Le fil de travail A attend B, C, D [WaitHandle.WaitAll(transfersArray)]
  8. Le fil de travail A voit qu'il y a encore plus de fichiers dans la file d'attente (devrait être 700 maintenant)
  9. Le fil de travail A crée un nouveau tableau à attendre [transfersArray = new TransferArray[3] qui est le maximum pour le système A, mais peut varier selon le système
  10. Le fil de travail A démarre trois nouveaux fils de travail [B, C, D] et les attend [WaitHandle.WaitAll(transfersArray)]
  11. Le processus se répète jusqu'à ce qu'il n'y ait plus de fichiers à déplacer.
  12. Le fil de travail A signale que c'est terminé

J'utilise ManualResetEvent pour gérer la signalisation.

Mes questions sont :

  1. Y a-t-il une circonstance flagrante qui pourrait provoquer une fuite de ressources ou un problème que je rencontre ?
  2. Dois-je parcourir le tableau après chaque WaitHandle.WaitAll(array) et appelle array[index].Dispose()?
  3. Le nombre de handles sous le Gestionnaire des tâches pour ce processus augmente lentement
  4. J'appelle la création initiale du Worker Thread A à partir d'un System.Threading.Timer.Est-ce qu'il va y avoir des problèmes avec ça ?Le code de cette minuterie est :

(Un code de classe pour la planification)

private ManualResetEvent _ResetEvent;

private void Start()
{
    _IsAlive = true;
    ManualResetEvent transferResetEvent = new ManualResetEvent(false);
    //Set the scheduler timer to 5 second intervals
    _ScheduledTasks = new Timer(new TimerCallback(ScheduledTasks_Tick), transferResetEvent, 200, 5000);
}

private void ScheduledTasks_Tick(object state)
{
    ManualResetEvent resetEvent = null;
    try
    {
        resetEvent = (ManualResetEvent)state;
        //Block timer until GetScheduledTasks() finishes
        _ScheduledTasks.Change(Timeout.Infinite, Timeout.Infinite);
        GetScheduledTasks();
    }
    finally
    {
        _ScheduledTasks.Change(5000, 5000);
        Console.WriteLine("{0} [Main] GetScheduledTasks() finished", DateTime.Now.ToString("MMddyy HH:mm:ss:fff"));
        resetEvent.Set();
    }
}


private void GetScheduledTask()
{
    try 
    { 
        //Check to see if the database connection is still up
        if (!_IsAlive)
        {
            //Handle
            _ConnectionLostNotification = true;
            return;
        }

        //Get scheduled records from the database
        ISchedulerTask task = null;

        using (DataTable dt = FastSql.ExecuteDataTable(
                _ConnectionString, "hidden for security", System.Data.CommandType.StoredProcedure,
                new List<FastSqlParam>() { new FastSqlParam(ParameterDirection.Input, SqlDbType.VarChar, "@ProcessMachineName", Environment.MachineName) })) //call to static class
        {
            if (dt != null)
            {
                if (dt.Rows.Count == 1)
                {  //Only 1 row is allowed
                    DataRow dr = dt.Rows[0];

                    //Get task information
                    TransferParam.TaskType taskType = (TransferParam.TaskType)Enum.Parse(typeof(TransferParam.TaskType), dr["TaskTypeId"].ToString());
                    task = ScheduledTaskFactory.CreateScheduledTask(taskType);

                    task.Description = dr["Description"].ToString();
                    task.IsEnabled = (bool)dr["IsEnabled"];
                    task.IsProcessing = (bool)dr["IsProcessing"];
                    task.IsManualLaunch = (bool)dr["IsManualLaunch"];
                    task.ProcessMachineName = dr["ProcessMachineName"].ToString();
                    task.NextRun = (DateTime)dr["NextRun"];
                    task.PostProcessNotification = (bool)dr["NotifyPostProcess"];
                    task.PreProcessNotification = (bool)dr["NotifyPreProcess"];
                    task.Priority = (TransferParam.Priority)Enum.Parse(typeof(TransferParam.SystemType), dr["PriorityId"].ToString());
                    task.SleepMinutes = (int)dr["SleepMinutes"];
                    task.ScheduleId = (int)dr["ScheduleId"];
                    task.CurrentRuns = (int)dr["CurrentRuns"];
                    task.TotalRuns = (int)dr["TotalRuns"];

                    SchedulerTask scheduledTask = new SchedulerTask(new ManualResetEvent(false), task);
                    //Queue up task to worker thread and start
                    ThreadPool.QueueUserWorkItem(new WaitCallback(this.ThreadProc), scheduledTask);     
                }
            }
        }

    }
    catch (Exception ex)
    {
        //Handle
    }
}

private void ThreadProc(object taskObject)
{
    SchedulerTask task = (SchedulerTask)taskObject;
    ScheduledTaskEngine engine = null;
    try
    {
        engine = SchedulerTaskEngineFactory.CreateTaskEngine(task.Task, _ConnectionString);
        engine.StartTask(task.Task);    
    }
    catch (Exception ex)
    {
        //Handle
    }
    finally
    {
        task.TaskResetEvent.Set();
        task.TaskResetEvent.Dispose();
    }
}
Était-ce utile?

La solution 4

Il s'avère que la source de cet étrange problème n'était pas liée à l'architecture mais plutôt à la conversion de la solution de 3.5 à 4.0.J'ai recréé la solution sans effectuer de modification de code et le problème ne s'est plus jamais reproduit.

Autres conseils

0x8007000E est une erreur de mémoire insuffisante.Cela et le nombre de handles semblent indiquer une fuite de ressources.Assurez-vous de disposer de chaque objet qui implémente IDisposable.Cela inclut les tableaux de ManualResetEvents que vous utilisez.

Si vous avez le temps, vous souhaiterez peut-être également passer à l'utilisation de .NET 4.0. Task classe;il a été conçu pour gérer des scénarios complexes comme celui-ci de manière beaucoup plus propre.En définissant l'enfant Task objets, vous pouvez réduire votre nombre global de threads (les threads sont assez chers non seulement en raison de la planification mais également en raison de leur espace de pile).

Je cherche des réponses à un problème similaire (le nombre de poignées augmente avec le temps).

J'ai jeté un œil à l'architecture de votre application et j'aimerais vous suggérer quelque chose qui pourrait vous aider :

Avez-vous entendu parler des IOCP (Input Output Completion Ports).

Je ne suis pas sûr de la difficulté d'implémenter cela en utilisant C# mais en C/C++ c'est un jeu d'enfant.En utilisant cela, vous créez un pool de threads unique (le nombre de threads dans ce pool est en général défini comme 2 x le nombre de processeurs ou de cœurs de processeurs sur le PC ou le serveur), vous associez ce pool à une poignée IOCP et le pool fait le fait le fait que le pool fait le fait que travail.Voir l'aide pour ces fonctions :CreateIoCompletionPort();PostQueuedCompletionStatus();GetQueuedCompletionStatus();

En général, la création et la fermeture de threads à la volée peuvent prendre du temps et entraîner des pénalités de performances et une fragmentation de la mémoire.Il existe des milliers de publications sur l'IOCP dans MSDN et Google.

Je pense que vous devriez reconsidérer complètement votre architecture.Le fait que vous ne puissiez avoir que 3 connexions simultanément vous supplie presque d'utiliser 1 thread pour générer la liste des fichiers et 3 threads pour les traiter.Votre thread producteur insèrera tous les fichiers dans une file d'attente et les 3 threads consommateurs seront retirés de la file d'attente et continueront le traitement à mesure que les éléments arriveront dans la file d'attente.Une file d'attente de blocage peut simplifier considérablement le code.Si vous utilisez .NET 4.0, vous pouvez profiter de BlocageCollection classe.

public class Example
{
    private BlockingCollection<string> m_Queue = new BlockingCollection<string>();

    public void Start()
    {
        var threads = new Thread[] 
            { 
                new Thread(Producer), 
                new Thread(Consumer), 
                new Thread(Consumer), 
                new Thread(Consumer) 
            };
        foreach (Thread thread in threads)
        {
            thread.Start();
        }
    }

    private void Producer()
    {
        while (true)
        {
            Thread.Sleep(TimeSpan.FromSeconds(5));
            ScheduledTask task = GetScheduledTask();
            if (task != null)
            {
                foreach (string file in task.Files)
                {
                    m_Queue.Add(task);
                }
            }
        }
    }

    private void Consumer()
    {
        // Make a connection to the resource that is assigned to this thread only.
        while (true)
        {
            string file = m_Queue.Take();
            // Process the file.
        }
    }
}

J'ai certainement simplifié à l'extrême les choses dans l'exemple ci-dessus, mais j'espère que vous comprenez l'idée générale.Remarquez à quel point cela est beaucoup plus simple car il n'y a pas beaucoup de synchronisation des threads (la plupart seront intégrés dans la file d'attente de blocage) et bien sûr, il n'y a aucune utilité de WaitHandle objets.Évidemment, vous devrez ajouter les mécanismes appropriés pour arrêter les threads correctement, mais cela devrait être assez simple.

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top