Как задачи в задаче параллельной библиотеки влияют на активность?

StackOverflow https://stackoverflow.com/questions/4340948

  •  30-09-2019
  •  | 
  •  

Вопрос

Перед использованием параллельной библиотеки задачи я часто использую CorrelationManager.actixiveID, чтобы отслеживать отслеживание / сообщение об ошибке с несколькими потоками.

AficeID хранится в потоке локальное хранилище, поэтому каждый нить получает свою собственную копию. Идея состоит в том, что когда вы стреляете поток (активность), вы назначаете новую активность. Активность будет записана в журналы с любой другой информацией о трассировке, что позволяет выделить информацию о трассировке для одной «активности». Это действительно полезно с WCF, поскольку активность может быть перенесена на сервисный компонент.

Вот пример того, о чем я говорю:

static void Main(string[] args)
{
    ThreadPool.QueueUserWorkItem(new WaitCallback((o) =>
    {
        DoWork();
    }));
}

static void DoWork()
{
    try
    {
        Trace.CorrelationManager.ActivityId = Guid.NewGuid();
        //The functions below contain tracing which logs the ActivityID.
        CallFunction1();
        CallFunction2();
        CallFunction3();
    }
    catch (Exception ex)
    {
        Trace.Write(Trace.CorrelationManager.ActivityId + " " + ex.ToString());
    }
}

Теперь, с TPL, мое понимание состоит в том, что несколько задач обмениваются потоками. Значит ли это, что активность подвержена повторному среднему заданию (по другой задаче)? Есть ли новый механизм для борьбы с трассировкой активности?

Это было полезно?

Решение

Я провел некоторые эксперименты, и оказывается, предположение, в моем вопросе неверно - несколько задач, созданных с TPL, не работают в одном и том же потоке одновременно.

ThreadLocalStorage безопасен в использовании с TPL в .NET 4.0, поскольку нить может использоваться только одной задачей одновременно.

Предположение о том, что задачи могут делиться потоками одновременно были основаны на собеседовании, о котором я слышал C # 5.0 на Dotnetrocks. (Извините, я не могу вспомнить, что показать это было) - так что мой вопрос может (или не может) стать актуальным в ближайшее время.

Мой эксперимент начинает ряд задач, и записывает, сколько задач бежала, как долго они взяли, и сколько потоков были потреблены. Код ниже, если кто-нибудь хотел бы повторить его.

class Program
{
    static void Main(string[] args)
    {
        int totalThreads = 100;
        TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
        Task task = null;
        Stopwatch stopwatch = new Stopwatch();
        stopwatch.Start();
        Task[] allTasks = new Task[totalThreads];
        for (int i = 0; i < totalThreads; i++)
        {
            task = Task.Factory.StartNew(() =>
           {
               DoLongRunningWork();
           }, taskCreationOpt);

            allTasks[i] = task;
        }

        Task.WaitAll(allTasks);
        stopwatch.Stop();

        Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
        Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
        Console.ReadKey();
    }


    private static List<int> threadIds = new List<int>();
    private static object locker = new object();
    private static void DoLongRunningWork()
    {
        lock (locker)
        {
            //Keep a record of the managed thread used.
            if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
                threadIds.Add(Thread.CurrentThread.ManagedThreadId);
        }
        Guid g1 = Guid.NewGuid();
        Trace.CorrelationManager.ActivityId = g1;
        Thread.Sleep(3000);
        Guid g2 = Trace.CorrelationManager.ActivityId;
        Debug.Assert(g1.Equals(g2));
    }
}

Выход (конечно, это будет зависеть от машины) было:

Completed 100 tasks in 23097 milliseconds
Used 23 threads

Изменение задачCreationOPT для задачCreationOptions.longrunning дал разные результаты:

Completed 100 tasks in 3458 milliseconds 
Used 100 threads

Другие советы

Пожалуйста, простите мою публикацию этого как ответ, так как не на самом деле не отвечает на ваш вопрос, однако это связано с вашим вопросом, так как он имеет дело с поведением корреляции и потоков / задачах / etc. Я смотрел на использование корреляционногоМаджера LogicalOperationStack (а также StartLogicalOperation/StopLogicalOperation Методы) для предоставления дополнительного контекста в многопотативных сценариях.

Я взял ваш пример и немного изменил его, чтобы добавить возможность выполнять работу параллельно, используя параллельно. Кроме того, я использую StartLogicalOperation/StopLogicalOperation к кронштейну (внутренне) DoLongRunningWork. Отказ Концептуально, DoLongRunningWork Что-то вроде каждый раз, когда он выполнен:

DoLongRunningWork
  StartLogicalOperation
  Thread.Sleep(3000)
  StopLogicalOperation

Я обнаружил, что если я добавлю эти логические операции к вашему коду (более или менее как есть), все логические операторы остаются в синхронизации (всегда ожидаемое количество операций на стеке и значения операций на стеке всегда как ожидал).

В некоторых собственных тестировании я обнаружил, что это не всегда так. Стек логической операции стал «поврежден». Лучшее объяснение, которое я мог придумать, является то, что «объединение» обратная часть информации CallContext в «родительский» контекст резьбы, когда «ребенок» выходит из выходов «дочерней», вызывала «старую» информацию об контексте дочерней резьбы (логическую операцию), чтобы быть « Унаследовано «другим братом детской нити.

Проблема также может быть связана с тем, что параллельно. Для по-видимому, использует основной поток (по крайней мере, в примере кода, как написано) в качестве одного из «рабочих потоков» (или что их следует вызывать в параллельном домене). Всякий раз, когда выполняется DolongrunningWork, начинается новая логическая операция (в начале) и остановилась (в конце) (то есть нажатой на логический запас и выскочил от него). Если основной нить уже имеет логическую операцию, и если Dolongrunningwork выполняет на главной ните, то начата новая логическая операция, поэтому логика управления основным потоком теперь имеет две операции. Любые последующие исполнения DolongrunningWorkwork (до тех пор, пока эта «Итерация» долонгуннингва.

Мне потребовалось много времени, чтобы выяснить, почему поведение логикатации было отличным в моем примере, чем в моей модифицированной версии вашего примера. Наконец я увидел, что в моем коде я пробил всю программу в логическую операцию, тогда как в моей модифицированной версии вашей тестовой программы я не сделал. Подразделение заключается в том, что в моей программе тестирования каждый раз, когда была выполнена моя «работа» (аналогично долонгуннингом), уже существует логическая операция. В моей модифицированной версии вашей программы тестирования я не пробил всю программу в логическую операцию.

Итак, когда я изменил свою программу тестирования, чтобы сконструировать всю программу в логическую операцию, и если я использую параллельно. Forfor, я столкнулся с тем же проблемой.

Используя концептуальную модель выше, это будет успешно работать:

Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation

Хотя это в конечном итоге утверждается из-за, по-видимому, из-за логика синхронизации:

StartLogicalOperation
Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation
StopLogicalOperation

Вот моя выборка программы. Это похоже на вас в том, что у него есть метод долонгуннинга, который манипулирует активность, а также логикал. У меня также есть два аромата пинания долонгуннинга. Один аромат использует задачи, которые используются параллельно. For. Каждый аромат также может быть выполнен таким образом, чтобы вся параллелизованная операция заключена в логическую операцию или нет. Таким образом, в общей сложности 4 способа выполнить параллельную работу. Чтобы попробовать каждую, просто вотворить желаемое «использовать ...» метод, перекомпилировать и прогон. UseTasks, UseTasks(true), а также UseParallelFor Все должны запускать до завершения. UseParallelFor(true) Будет утверждать в какой-то момент, потому что логика - не имеет ожидаемого количества записей.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace CorrelationManagerParallelTest
{
  class Program 
  {     
    static void Main(string[] args)     
    { 
      //UseParallelFor(true) will assert because LogicalOperationStack will not have expected
      //number of entries, all others will run to completion.

      UseTasks(); //Equivalent to original test program with only the parallelized
                      //operation bracketed in logical operation.
      ////UseTasks(true); //Bracket entire UseTasks method in logical operation
      ////UseParallelFor();  //Equivalent to original test program, but use Parallel.For
                             //rather than Tasks.  Bracket only the parallelized
                             //operation in logical operation.
      ////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation
    }       

    private static List<int> threadIds = new List<int>();     
    private static object locker = new object();     

    private static int mainThreadId = Thread.CurrentThread.ManagedThreadId;

    private static int mainThreadUsedInDelegate = 0;

    // baseCount is the expected number of entries in the LogicalOperationStack
    // at the time that DoLongRunningWork starts.  If the entire operation is bracketed
    // externally by Start/StopLogicalOperation, then baseCount will be 1.  Otherwise,
    // it will be 0.
    private static void DoLongRunningWork(int baseCount)     
    {
      lock (locker)
      {
        //Keep a record of the managed thread used.             
        if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
          threadIds.Add(Thread.CurrentThread.ManagedThreadId);

        if (Thread.CurrentThread.ManagedThreadId == mainThreadId)
        {
          mainThreadUsedInDelegate++;
        }
      }         

      Guid lo1 = Guid.NewGuid();
      Trace.CorrelationManager.StartLogicalOperation(lo1);

      Guid g1 = Guid.NewGuid();         
      Trace.CorrelationManager.ActivityId = g1;

      Thread.Sleep(3000);         

      Guid g2 = Trace.CorrelationManager.ActivityId;
      Debug.Assert(g1.Equals(g2));

      //This assert, LogicalOperation.Count, will eventually fail if there is a logical operation
      //in effect when the Parallel.For operation was started.
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1));
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1));

      Trace.CorrelationManager.StopLogicalOperation();
    } 

    private static void UseTasks(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
      Task task = null;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Task[] allTasks = new Task[totalThreads];
      for (int i = 0; i < totalThreads; i++)
      {
        task = Task.Factory.StartNew(() =>
        {
          DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
        }, taskCreationOpt);
        allTasks[i] = task;
      }
      Task.WaitAll(allTasks);

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

    private static void UseParallelFor(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Parallel.For(0, totalThreads, i =>
      {
        DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
      });

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

  } 
}

Вся в этом вопросе, если логика для логика можно использовать с параллельным. Для (и / или других конструкций потоков / задач) или как его можно использовать, вероятно, заслуживает собственного вопроса. Может быть, я опубликую вопрос. Между тем, мне интересно, если у вас есть какие-либо мысли по этому поводу (или, интересно, если бы вы рассмотрели использование логика, поскольку активность, кажется, безопасна).

РЕДАКТИРОВАТЬ

Увидеть мой ответ на этот вопрос Для получения дополнительной информации об использовании логикатации и / или CallContext.logicalSetData с некоторыми из различных потоков / ThreadPool / Task / Parallel CONTSTRUPTS.

См. Также мой вопрос здесь на так о логическом стандарте и параллельных расширениях:CorrelationManager.logicalOperationStack совместим с параллельными задачами, потоками, потоками и т. Д.

Наконец, см. Также мой вопрос здесь на Forum Microsoft Parallel Extensions:http://social.msdn.microsoft.com/forums/en-us/parallelextensions/Thrad/7c5c3051-133b-4814-9db0-fc0039b4f9d9.

В моем тестировании он выглядит как Trace.correlationManager.logicalOperationStackStack может быть поврежден при использовании параллельных. Для работы Parallel.invoke, если вы запускаете логическую работу в главной ните, а затем запускать / остановить логические операции в делегате. В моих тестах (см. Любое из двух ссылок выше). Логический запас всегда должен иметь ровно 2 записи, когда DolongrunningWork выполняется (если я запускаю логическую операцию в основном потоке перед началом пикирования Dolongrunningwork с использованием различных методов). Итак, «испорченным», я имею в виду, что логика, в конечном итоге будет много более 2 записей.

Из того, что я могу сказать, это, вероятно, потому, что параллель. Для Parallel.invoke Используйте основную нить как один из «рабочих» потоков для выполнения действия Dolongrunningwork.

Использование стека, хранящегося в CallContext.logicalSetData, чтобы имитировать поведение логика управления (аналогично логическому стандарту Log4net. Если я использую такой стек, чтобы поддерживать контекст, он становится поврежденным (т. Е. Не имеет ожидаемого количества записей) практически во всех сценариях, где у меня есть «логическая операция» в основном потоке и логической операции в каждой итерации / Выполнение делегата Dolongrunningwork.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top