任务并行库中的任务如何影响Activation ID?
-
30-09-2019 - |
题
在使用“任务并行库”之前,我经常使用corseLationManager.ActivityId来跟踪与多个线程的跟踪/错误报告。
ActiveID存储在线程本地存储中,因此每个线程都会获得自己的副本。这个想法是,当您启动线程(活动)时,您会分配一个新的Activation ID。 ActivityID将使用任何其他跟踪信息写入日志,从而可以将痕量信息列出单个“活动”。这对于WCF确实很有用,因为可以将ActivationID传递到服务组件。
这是我在说什么的示例:
static void Main(string[] args)
{
ThreadPool.QueueUserWorkItem(new WaitCallback((o) =>
{
DoWork();
}));
}
static void DoWork()
{
try
{
Trace.CorrelationManager.ActivityId = Guid.NewGuid();
//The functions below contain tracing which logs the ActivityID.
CallFunction1();
CallFunction2();
CallFunction3();
}
catch (Exception ex)
{
Trace.Write(Trace.CorrelationManager.ActivityId + " " + ex.ToString());
}
}
现在,有了TPL,我的理解是多个任务共享线程。这是否意味着ActiveID容易被重新定义(通过另一个任务)重新定义中任务?是否有新的机制可以处理活动追踪?
解决方案
我进行了一些实验,事实证明我的问题中的假设是不正确的 - 使用TPL创建的多个任务不会同时在同一线程上运行。
threacalstorage可以安全地与.NET 4.0中的tpl一起使用,因为一个任务只能由一个任务使用。
任务可以同时共享线程的假设是基于我听说的采访 C#5.0 在 点netrocks (对不起,我不记得是哪个表现) - 所以我的问题可能(或可能不)很快变得有意义。
我的实验启动了许多任务,并记录了运行了多少任务,他们花了多长时间以及消耗了多少个线程。如果有人愿意重复该代码,则该代码在下面。
class Program
{
static void Main(string[] args)
{
int totalThreads = 100;
TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
Task task = null;
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
Task[] allTasks = new Task[totalThreads];
for (int i = 0; i < totalThreads; i++)
{
task = Task.Factory.StartNew(() =>
{
DoLongRunningWork();
}, taskCreationOpt);
allTasks[i] = task;
}
Task.WaitAll(allTasks);
stopwatch.Stop();
Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
Console.ReadKey();
}
private static List<int> threadIds = new List<int>();
private static object locker = new object();
private static void DoLongRunningWork()
{
lock (locker)
{
//Keep a record of the managed thread used.
if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
threadIds.Add(Thread.CurrentThread.ManagedThreadId);
}
Guid g1 = Guid.NewGuid();
Trace.CorrelationManager.ActivityId = g1;
Thread.Sleep(3000);
Guid g2 = Trace.CorrelationManager.ActivityId;
Debug.Assert(g1.Equals(g2));
}
}
输出(当然这取决于机器)是:
Completed 100 tasks in 23097 milliseconds
Used 23 threads
将taskcreationopt更改为taskereationoptions.longrunning给出了不同的结果:
Completed 100 tasks in 3458 milliseconds
Used 100 threads
其他提示
请原谅我的帖子作为答案,因为它并不是真正回答您的问题,但是,它与您的问题有关,因为它涉及相关manager的行为和线程/任务/任务/等。我一直在考虑使用CorrelationManager的 LogicalOperationStack
(和 StartLogicalOperation/StopLogicalOperation
方法)在多线程方案中提供其他上下文。
我以您的榜样进行了稍作修改,以添加并行使用并行执行工作的能力。另外,我使用 StartLogicalOperation/StopLogicalOperation
到支架(内部) DoLongRunningWork
. 。从概念上, DoLongRunningWork
每次执行时都这样做:
DoLongRunningWork
StartLogicalOperation
Thread.Sleep(3000)
StopLogicalOperation
我发现,如果我将这些逻辑操作添加到您的代码中(或多或少),所有逻辑操作蛋白都保持同步(始终是堆栈上的预期操作数,并且堆栈上的操作值始终为预期的)。
在我自己的一些测试中,我发现并非总是如此。逻辑操作堆栈正在“损坏”。我能想到的最好的解释是,当“子”线程退出导致“旧的”儿童线程上下文信息(逻辑操作)为“是”时,CallContext信息的“合并”回到“父”线程上下文中继承了“另一个兄弟姐妹的子线。
该问题也可能与并行的事实有关。显然使用主线程(至少在示例代码中,如书面)作为“工作线程”之一(或在并行域中应调用的任何内容)。每当执行Dolongrunningwork时,就开始(开头)开始了一个新的逻辑操作并停止(在末尾)(即将其推到LogicalOperationStack上并弹出)。如果主线程已经具有生效的逻辑操作,并且在主线程上执行了Dolongrunnunningwork,则启动了一个新的逻辑操作,因此主线程的LogicalOperationStack现在具有两个操作。任何后续执行Dolongrunningwork(只要Dolongrunningwork的“迭代”在主线程上执行)将(显然)继承主线程的LogicalOperationStack(现在它具有两个操作,而不仅仅是一个预期的操作)。
我花了很长时间才弄清楚逻辑播放插图的行为与我的示例的修改版本中不同。最终,我在代码中看到了我在逻辑操作中对整个程序进行了括起来的,而在我的测试程序的修改版本中,我没有。这意味着在我的测试程序中,每当我的“工作”进行(类似于Dolongrunningwork)时,就已经有一个合理的操作。在我的测试程序的修改版本中,我没有在逻辑操作中对整个程序进行整理。
因此,当我修改您的测试程序以在逻辑操作中括起整个程序,如果我并行使用。因为,我遇到了完全相同的问题。
使用上面的概念模型,这将成功运行:
Parallel.For
DoLongRunningWork
StartLogicalOperation
Sleep(3000)
StopLogicalOperation
虽然这最终会因为显然是不同步的LogicalOperationStack而断言:
StartLogicalOperation
Parallel.For
DoLongRunningWork
StartLogicalOperation
Sleep(3000)
StopLogicalOperation
StopLogicalOperation
这是我的示例程序。它与您的类似,因为它具有一种操纵ActivationId和LogicalOperationStack的Dolongrunningwork方法。我也有两种口味的多龙舞。一种风味使用一个并行使用的任务。每个风味也可以执行,以使整个并行操作都包含在逻辑操作中。因此,总共有4种执行并行操作的方法。要尝试每个,只需输入所需的“使用...”方法,重新编译和运行。 UseTasks
, UseTasks(true)
, , 和 UseParallelFor
应该全部完成。 UseParallelFor(true)
会在某个时候断言,因为逻辑播放脚步没有预期的条目数。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace CorrelationManagerParallelTest
{
class Program
{
static void Main(string[] args)
{
//UseParallelFor(true) will assert because LogicalOperationStack will not have expected
//number of entries, all others will run to completion.
UseTasks(); //Equivalent to original test program with only the parallelized
//operation bracketed in logical operation.
////UseTasks(true); //Bracket entire UseTasks method in logical operation
////UseParallelFor(); //Equivalent to original test program, but use Parallel.For
//rather than Tasks. Bracket only the parallelized
//operation in logical operation.
////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation
}
private static List<int> threadIds = new List<int>();
private static object locker = new object();
private static int mainThreadId = Thread.CurrentThread.ManagedThreadId;
private static int mainThreadUsedInDelegate = 0;
// baseCount is the expected number of entries in the LogicalOperationStack
// at the time that DoLongRunningWork starts. If the entire operation is bracketed
// externally by Start/StopLogicalOperation, then baseCount will be 1. Otherwise,
// it will be 0.
private static void DoLongRunningWork(int baseCount)
{
lock (locker)
{
//Keep a record of the managed thread used.
if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
threadIds.Add(Thread.CurrentThread.ManagedThreadId);
if (Thread.CurrentThread.ManagedThreadId == mainThreadId)
{
mainThreadUsedInDelegate++;
}
}
Guid lo1 = Guid.NewGuid();
Trace.CorrelationManager.StartLogicalOperation(lo1);
Guid g1 = Guid.NewGuid();
Trace.CorrelationManager.ActivityId = g1;
Thread.Sleep(3000);
Guid g2 = Trace.CorrelationManager.ActivityId;
Debug.Assert(g1.Equals(g2));
//This assert, LogicalOperation.Count, will eventually fail if there is a logical operation
//in effect when the Parallel.For operation was started.
Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1));
Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1));
Trace.CorrelationManager.StopLogicalOperation();
}
private static void UseTasks(bool encloseInLogicalOperation = false)
{
int totalThreads = 100;
TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
Task task = null;
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StartLogicalOperation();
}
Task[] allTasks = new Task[totalThreads];
for (int i = 0; i < totalThreads; i++)
{
task = Task.Factory.StartNew(() =>
{
DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
}, taskCreationOpt);
allTasks[i] = task;
}
Task.WaitAll(allTasks);
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StopLogicalOperation();
}
stopwatch.Stop();
Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));
Console.ReadKey();
}
private static void UseParallelFor(bool encloseInLogicalOperation = false)
{
int totalThreads = 100;
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StartLogicalOperation();
}
Parallel.For(0, totalThreads, i =>
{
DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
});
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StopLogicalOperation();
}
stopwatch.Stop();
Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));
Console.ReadKey();
}
}
}
如果逻辑上的脚步可以与并行使用(和/或其他线程/任务构造),或者如何使用它的整个问题可能值得自己的问题。也许我会发布一个问题。同时,我想知道您是否对此有任何想法(或者,我想知道您是否考虑过使用LogicalOperationStack,因为ActiveID似乎是安全的)。
编辑
看到我的答案 这个问题 有关使用LogicalOperationStack和/或CallContext.logicalSetData的更多信息,请使用各种线程/线程池/任务/并行contstructs中的某些信息。
另请参阅我在此处的问题,关于逻辑播放框架和并行扩展:IS cor corseLationManager.LogicalOperationStack与并行兼容。
最后,另请参阅Microsoft的Parallel Extensions论坛上的我的问题:http://social.msdn.microsoft.com/forums/en-us/parallelextensions/thread/7c5c3c3051-133b-4814-9db0-fc0039b4f9d9
在我的测试中,看起来像trace.correlationmanager.logicaloperationstack在使用并行时可能会损坏。在我的测试中(请参阅上面的两个链接中的任何一个),逻辑poperationstack在执行Dolongrunningwork时始终应完全有2个条目(如果我在使用各种技术启动Dolongrunningwork之前在主线程中启动逻辑操作)。因此,通过“损坏”,我的意思是逻辑上的脚步最终将有2个以上的条目。
据我所知,这可能是因为并行并并行。注册使用主线程作为“工人”线程之一执行Dolongrunningwork操作。
使用存储在CallContext.LogicalSetData中的堆栈来模仿LogicalOperationStack的行为(类似于通过CallContext.setData存储的Log4net的Log4net的LogicalThreadContext.stacks产生的结果甚至更糟糕。如果我使用这样的堆栈来维护上下文,那么它在几乎所有的场景中都会损坏(即没有预期的条目) /执行Dolongrunningwork代表。