Question

Is there a good way to pass thread-local data into an ActionBlock, so that if you set MaxDegreeOfParallelism in its ExecutionDataflowBlockOptions to a value greater than 1, each task that executes the action has its own thread-local data?

Here is some of my code that will perhaps clarify what I want to do:

var options = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 12
};

ActionBlock<int> actionBlock = new ActionBlock<int>(PerformAction, options);

List<int> resultsList = new List<int>();

void PerformAction(int i)
{
    // do some work

    // add the results to resultsList

    // I want to make sure that each thread that executes this method has its
    // own copy of resultsList
}

I want the ActionBlock to call a thread-local init function that I supply. Something like this:

new ActionBlock<int>(PerformAction, options, () => new List<int>()); 

And have it pass my thread-local data into my action method:

void PerformAction(int i, List<int> localUserData) {...}

Solution

I still don't understand why you need a thread-local list in a dataflow block. You're right that TPL Dataflow (TDF) has no explicit support for thread-local values the way Parallel.ForEach() does. That doesn't mean you can't use them, though; you just have to do everything manually with ThreadLocal<T>. (I think [ThreadStatic] wouldn't work well here, because it gives you no way to enumerate all the thread-local instances afterwards.) For example:

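using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

internal static class Program
{
    private static ThreadLocal<List<int>> threadLocalList;

    private static void Main()
    {
        // trackAllValues: true is what makes the Values property usable below
        threadLocalList = new ThreadLocal<List<int>>(() => new List<int>(), trackAllValues: true);

        var block = new ActionBlock<int>(
            (Action<int>)PerformAction,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        for (int i = 0; i < 10; i++)
            block.Post(i);

        // signal that no more items will arrive, then wait for processing to finish
        block.Complete();
        block.Completion.Wait();

        // one list per thread that actually ran PerformAction
        foreach (var list in threadLocalList.Values)
            Console.WriteLine(string.Join(", ", list));

        threadLocalList.Dispose();
    }

    private static void PerformAction(int i)
    {
        // each thread appends to its own list, so no locking is needed here
        threadLocalList.Value.Add(i * i);
    }
}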
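
For comparison, here is a rough sketch of the built-in local-state support in Parallel.ForEach() that I mentioned. It is not a dataflow block, so it only fits if all your inputs are available up front. The LocalStateDemo class and its Run helper are names I made up for this illustration, and the squaring is just the same placeholder work as above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class LocalStateDemo
{
    // Hypothetical helper: squares 0..count-1 and returns one result list per worker task.
    public static List<List<int>> Run(int count, int maxDegreeOfParallelism)
    {
        var allLists = new List<List<int>>();
        var gate = new object();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };

        Parallel.ForEach(
            Enumerable.Range(0, count),
            options,
            // localInit: called once per worker task to create its private list
            () => new List<int>(),
            // body: receives the worker's private list and hands it on to the next iteration
            (i, loopState, local) =>
            {
                local.Add(i * i);
                return local;
            },
            // localFinally: called once per worker task; several may run at the same time,
            // so the shared collection is protected by a lock
            local =>
            {
                lock (gate) allLists.Add(local);
            });

        return allLists;
    }

    public static void Main()
    {
        foreach (var list in Run(10, 4))
            Console.WriteLine(string.Join(", ", list));
    }
}
```

How the squares are split across the lists varies from run to run, and the number of lists is not guaranteed to equal MaxDegreeOfParallelism. The same caveat applies to the ThreadLocal<T> version: you get one list per thread that actually ran the action, which can be fewer than the configured degree of parallelism.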
Licensed under: CC-BY-SA with attribution