タスクパラレルライブラリのタスクはActivityIDにどのように影響しますか?

StackOverflow https://stackoverflow.com/questions/4340948

  •  30-09-2019
  •  | 
  •  

質問

タスクパラレルライブラリを使用する前に、複数のスレッドを使用したトレース/エラーレポートを追跡するために、CorrelationManager.ActivityIDを使用することがよくありました。

ActivityIDはスレッドローカルストレージに保存されるため、各スレッドは独自のコピーを取得します。アイデアは、スレッド(アクティビティ)を起動すると、新しいActivityIDを割り当てるということです。 ActivityIDは、他のトレース情報を使用してログに書き込まれ、単一の「アクティビティ」のトレース情報を選出することが可能になります。 ActivityIDをサービスコンポーネントに引き継ぐことができるため、これはWCFで非常に役立ちます。

これが私が話していることの例です:

static void Main(string[] args)
{
    ThreadPool.QueueUserWorkItem(new WaitCallback((o) =>
    {
        DoWork();
    }));
}

static void DoWork()
{
    try
    {
        Trace.CorrelationManager.ActivityId = Guid.NewGuid();
        //The functions below contain tracing which logs the ActivityID.
        CallFunction1();
        CallFunction2();
        CallFunction3();
    }
    catch (Exception ex)
    {
        Trace.Write(Trace.CorrelationManager.ActivityId + " " + ex.ToString());
    }
}

現在、TPLでは、複数のタスクがスレッドを共有することを理解しています。これは、ActivityIDが(別のタスクによる)ミッドタスクの再活性化を起こしやすいことを意味しますか?アクティビティトレースに対処するための新しいメカニズムはありますか?

役に立ちましたか?

解決

私はいくつかの実験を実行しましたが、それは私の質問の仮定が間違っていることがわかりました - TPLで作成された複数のタスクは同じスレッドで同時に実行されません。

ThreadLocalStorageは、.NET 4.0のTPLで使用しても安全です。これは、スレッドは一度に1つのタスクでのみ使用できるためです。

タスクがスレッドを同時に共有できるという仮定は、私が聞いたインタビューに基づいていました C#5.0 の上 dotnetrocks (申し訳ありませんが、それがどのショーであったか覚えていません) - 私の質問はすぐに関連するかもしれません(またはそうでないかもしれません)。

私の実験は多くのタスクを開始し、実行されたタスクの数、かかった時間、および消費されたスレッドの数を記録します。誰かが繰り返したい場合は、コードを以下に示します。

class Program
{
    static void Main(string[] args)
    {
        int totalThreads = 100;
        TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
        Task task = null;
        Stopwatch stopwatch = new Stopwatch();
        stopwatch.Start();
        Task[] allTasks = new Task[totalThreads];
        for (int i = 0; i < totalThreads; i++)
        {
            task = Task.Factory.StartNew(() =>
           {
               DoLongRunningWork();
           }, taskCreationOpt);

            allTasks[i] = task;
        }

        Task.WaitAll(allTasks);
        stopwatch.Stop();

        Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
        Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
        Console.ReadKey();
    }


    private static List<int> threadIds = new List<int>();
    private static object locker = new object();
    private static void DoLongRunningWork()
    {
        lock (locker)
        {
            //Keep a record of the managed thread used.
            if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
                threadIds.Add(Thread.CurrentThread.ManagedThreadId);
        }
        Guid g1 = Guid.NewGuid();
        Trace.CorrelationManager.ActivityId = g1;
        Thread.Sleep(3000);
        Guid g2 = Trace.CorrelationManager.ActivityId;
        Debug.Assert(g1.Equals(g2));
    }
}

出力(もちろん、これはマシンによって異なります)は次のとおりです。

Completed 100 tasks in 23097 milliseconds
Used 23 threads

taskcreationoptをtaskcreationoptions.longrunningに変更すると、さまざまな結果が得られました。

Completed 100 tasks in 3458 milliseconds 
Used 100 threads

他のヒント

あなたの質問に対する答えではないので、これを答えとして投稿することを許してください。ただし、相関マネージャーの動作とスレッド/タスク/などを扱っているため、質問に関連しています。私はCorrelationManagerの使用を検討しています LogicalOperationStack (と StartLogicalOperation/StopLogicalOperation 方法)マルチスレッドシナリオで追加のコンテキストを提供します。

私はあなたの例を取り、それをわずかに変更して、並列を使用して並行して作業を実行する機能を追加しました。また、私は使用します StartLogicalOperation/StopLogicalOperation ブラケットへ(内部) DoLongRunningWork. 。概念的には、 DoLongRunningWork それが実行されるたびにこのようなことをします:

DoLongRunningWork
  StartLogicalOperation
  Thread.Sleep(3000)
  StopLogicalOperation

これらの論理操作をコード(多かれ少なかれ)に追加すると、すべての論理オペラティンが同期し続けることがわかりました(常にスタックでの操作の予想数とスタックの操作の値は常にのようです期待される)。

私自身のテストのいくつかで、私はこれが常にそうではないことを発見しました。論理操作スタックは「破損」していました。私が思いつくことができる最良の説明は、「子」スレッドが終了したときに、「子」スレッドのコンテキストにcallcontext情報の「マージ」を「古い」子スレッドコンテキスト情報(論理操作)が原因であることです。別の兄弟の子供の糸によって継承されました。

問題は、並列であるという事実にも関連している可能性があります。明らかに、メインスレッド(少なくとも書かれたコードでは、ワーカースレッド」の1つ(または並列ドメインで呼び出されるべきもの)の1つとして使用しているようです。 Dolongrunningworkが実行されるたびに、新しい論理操作が開始され(最初に)(最後に)停止します(つまり、論理オペレーションスタックに押し込まれ、そこから飛び出しました)。メインスレッドに既に論理操作が有効であり、メインスレッドでドロンランニングワークが実行された場合、新しい論理操作が開始されるため、メインスレッドの論理オペレーションスタックに2つの操作があります。 Dolongrunningworkのその後の実行は(このDolongrunningworkの「反復」がメインスレッドで実行されている限り)は、(明らかに)メインスレッドの論理オペレーションスタック(予想される操作だけでなく、2つの操作がある)を継承します。

私の例の修正バージョンの例よりも、私の例の挙動があなたの例の挙動が異なっていた理由を理解するのに長い時間がかかりました。最後に、私のコードでは、論理操作でプログラム全体をブラケットに入れていたのが見られましたが、あなたのテストプログラムの変更されたバージョンではそうしませんでした。意味は、私のテストプログラムでは、私の「仕事」が実行されるたびに(Dolongrunningworkに類似)、すでに論理的な操作が有効になっています。テストプログラムの私の修正バージョンでは、論理操作でプログラム全体をブラケットしていませんでした。

したがって、テストプログラムを修正して、論理操作でプログラム全体をブラケットし、並列を使用している場合、まったく同じ問題に遭遇しました。

上記の概念モデルを使用すると、これは正常に実行されます。

Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation

これは、明らかに同期がないため、最終的にはAssertがあります。

StartLogicalOperation
Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation
StopLogicalOperation

これが私のサンプルプログラムです。それは、ActivityIDと論理オペレーションスタックを操作するドロンランニングワーク方法を持っているという点であなたのものに似ています。また、Dolongrunningworkの蹴りの2つのフレーバーもあります。 1つのフレーバーは、Parallel.forを使用するタスクを使用します。各フレーバーは、並列化された操作全体が論理操作に囲まれるかどうかを実行することもできます。したがって、並列操作を実行する合計4つの方法があります。それぞれを試すには、希望する「使用...」メソッド、再コンパイル、および実行するだけです。 UseTasks, UseTasks(true), 、 と UseParallelFor すべて完了する必要があります。 UseParallelFor(true) LogicalOperationStackには予想されるエントリの数がないため、ある時点でアサートします。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace CorrelationManagerParallelTest
{
  class Program 
  {     
    static void Main(string[] args)     
    { 
      //UseParallelFor(true) will assert because LogicalOperationStack will not have expected
      //number of entries, all others will run to completion.

      UseTasks(); //Equivalent to original test program with only the parallelized
                      //operation bracketed in logical operation.
      ////UseTasks(true); //Bracket entire UseTasks method in logical operation
      ////UseParallelFor();  //Equivalent to original test program, but use Parallel.For
                             //rather than Tasks.  Bracket only the parallelized
                             //operation in logical operation.
      ////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation
    }       

    private static List<int> threadIds = new List<int>();     
    private static object locker = new object();     

    private static int mainThreadId = Thread.CurrentThread.ManagedThreadId;

    private static int mainThreadUsedInDelegate = 0;

    // baseCount is the expected number of entries in the LogicalOperationStack
    // at the time that DoLongRunningWork starts.  If the entire operation is bracketed
    // externally by Start/StopLogicalOperation, then baseCount will be 1.  Otherwise,
    // it will be 0.
    private static void DoLongRunningWork(int baseCount)     
    {
      lock (locker)
      {
        //Keep a record of the managed thread used.             
        if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
          threadIds.Add(Thread.CurrentThread.ManagedThreadId);

        if (Thread.CurrentThread.ManagedThreadId == mainThreadId)
        {
          mainThreadUsedInDelegate++;
        }
      }         

      Guid lo1 = Guid.NewGuid();
      Trace.CorrelationManager.StartLogicalOperation(lo1);

      Guid g1 = Guid.NewGuid();         
      Trace.CorrelationManager.ActivityId = g1;

      Thread.Sleep(3000);         

      Guid g2 = Trace.CorrelationManager.ActivityId;
      Debug.Assert(g1.Equals(g2));

      //This assert, LogicalOperation.Count, will eventually fail if there is a logical operation
      //in effect when the Parallel.For operation was started.
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1));
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1));

      Trace.CorrelationManager.StopLogicalOperation();
    } 

    private static void UseTasks(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
      Task task = null;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Task[] allTasks = new Task[totalThreads];
      for (int i = 0; i < totalThreads; i++)
      {
        task = Task.Factory.StartNew(() =>
        {
          DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
        }, taskCreationOpt);
        allTasks[i] = task;
      }
      Task.WaitAll(allTasks);

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

    private static void UseParallelFor(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Parallel.For(0, totalThreads, i =>
      {
        DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
      });

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

  } 
}

logicaloperationStackを並行して使用できるかどうかのこの問題全体(および/または他のスレッド/タスク構成)またはそれをどのように使用できるかは、おそらく独自の質問に値するでしょう。たぶん私は質問を投稿します。それまでの間、私はあなたがこれについて何か考えがあるのだろうかと思います(または、ActivityIDが安全であるように見えるので、あなたはLogicalOperationStackを使用することを検討したのだろうかと思います)。

編集

の私の答えを見てください この質問 LogicalOperationStackおよび/またはCallContext.LogicalSetDataの使用の詳細については、さまざまなThreadPool/Task/Parallel Contstructsを使用しています。

So ElogicalOperationStackとParallel Extensionsについての私の質問も参照してください。correlationmanager.logicaloperationstackは、並列と互換性があります。

最後に、Microsoftの並列拡張フォーラムに関する私の質問も参照してください。http://social.msdn.microsoft.com/forums/en-us/parallectensions/thread/7c5c3051-133b-4814-9db0-fc0039b4f9d9d9

私のテストでは、trace.correlationmanager.logicaloperationStackが並列またはparallel.invokeを使用すると破損するように見えます。私のテスト(上記の2つのリンクのいずれかを参照)では、LogicalOperationStackには、Dolongrunningworkが実行されているときに常に正確に2つのエントリが必要です(さまざまな手法を使用してDolongrunningworkを蹴る前にメインスレッドで論理操作を開始した場合)。したがって、「破損した」とは、論理オペレーションスタックが最終的に2つ以上のエントリを持っていることを意味します。

私が言うことができることから、これはおそらく、並行して並行しているためです。インヴォークは、メインスレッドを「ワーカー」スレッドの1つとして使用して、ドロンランニングワークアクションを実行するためです。

callcontext.logicalsetDataに保存されているスタックを使用して、LogicalOperationStackの動作を模倣します(callContext.setDataを介して保存されているLog4NetのLogicalThreadContext.Stackと同様)により、さらに悪い結果が得られます。コンテキストを維持するためにそのようなスタックを使用している場合、メインスレッドに「論理操作」があり、各反復で論理操作があるシナリオのほぼすべてのシナリオで破損します(つまり、予想されるエントリの数がありません) /Dolongrunningwork Delegateの実行。

ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top