Yes, you pretty much need to do what you described, passing the additional data from every block to the next one.
But using a couple of helper methods, you can make this much simpler:

    public static IPropagatorBlock<TInput, Tuple<TOutput, TInput>>
        CreateExtendedSource<TInput, TOutput>(Func<TInput, TOutput> transform)
    {
        return new TransformBlock<TInput, Tuple<TOutput, TInput>>(
            input => Tuple.Create(transform(input), input));
    }

    public static IPropagatorBlock<Tuple<TInput, TExtension>, Tuple<TOutput, TExtension>>
        CreateExtendedTransform<TInput, TOutput, TExtension>(Func<TInput, TOutput> transform)
    {
        return new TransformBlock<Tuple<TInput, TExtension>, Tuple<TOutput, TExtension>>(
            tuple => Tuple.Create(transform(tuple.Item1), tuple.Item2));
    }

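The signatures look daunting, but they are actually not that bad: `CreateExtendedSource` pairs each transformed value with its original input, and `CreateExtendedTransform` transforms only the first item of the tuple while carrying the second item through unchanged.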
Also, you might want to add overloads that pass options to the created block, or overloads that take async delegates.
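
As a rough sketch of what such overloads could look like (the `ExtendedBlocks` class name is just a placeholder I made up, and you may need to reference the System.Threading.Tasks.Dataflow package depending on your target framework), here is one overload that forwards `ExecutionDataflowBlockOptions` and one that accepts an async delegate. Both rely on the corresponding `TransformBlock` constructors:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical container class; the name is an assumption, not part of the answer above.
public static class ExtendedBlocks
{
    // Overload that forwards block options (e.g. MaxDegreeOfParallelism, BoundedCapacity).
    public static IPropagatorBlock<Tuple<TInput, TExtension>, Tuple<TOutput, TExtension>>
        CreateExtendedTransform<TInput, TOutput, TExtension>(
            Func<TInput, TOutput> transform, ExecutionDataflowBlockOptions options)
    {
        return new TransformBlock<Tuple<TInput, TExtension>, Tuple<TOutput, TExtension>>(
            tuple => Tuple.Create(transform(tuple.Item1), tuple.Item2), options);
    }

    // Overload that takes an async delegate; the extension value is re-attached
    // once the awaited transform completes.
    public static IPropagatorBlock<Tuple<TInput, TExtension>, Tuple<TOutput, TExtension>>
        CreateExtendedTransform<TInput, TOutput, TExtension>(
            Func<TInput, Task<TOutput>> transform)
    {
        return new TransformBlock<Tuple<TInput, TExtension>, Tuple<TOutput, TExtension>>(
            async tuple => Tuple.Create(await transform(tuple.Item1), tuple.Item2));
    }
}
```

The same pattern should carry over to `CreateExtendedSource`. If you keep the synchronous and async overloads side by side, specifying the type arguments explicitly (as in the example that follows) helps the compiler pick the one you mean.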
For example, if you wanted to perform some operations on a number using separate blocks, while passing the original number along the way, you could do something like:

    var source = new BufferBlock<int>();
    var divided = CreateExtendedSource<int, double>(i => i / 2.0);
    var formatted = CreateExtendedTransform<double, string, int>(d => d.ToString("0.0"));
    var writer = new ActionBlock<Tuple<string, int>>(tuple => Console.WriteLine(tuple));

    source.LinkTo(divided);
    divided.LinkTo(formatted);
    formatted.LinkTo(writer);

    for (int i = 0; i < 10; i++)
        source.Post(i);

As you can see, your lambdas (except for the last one) deal only with the “current” value (`int`, `double` or `string`, depending on the stage of the pipeline); the “original” value (always `int`) is passed along automatically. At any point, you can use a block created using the normal constructor to access both values (like the final `ActionBlock` in the example).
(That `BufferBlock` isn't actually necessary, but I added it to more closely match your design.)
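
To illustrate those last two points, here is a minimal sketch of the same idea as a complete console program, posting directly to the first block instead of a `BufferBlock` and using a normally constructed `TransformBlock` in the middle of the pipeline to read both values. The `labeled` block, the `PropagateCompletion` links and the final `Completion.Wait()` are my additions for the sake of a self-contained example, not something your design requires:

```csharp
using System;
using System.Globalization;
using System.Threading.Tasks.Dataflow;

public static class Program
{
    // Helper from above: pairs the transformed value with the original input.
    public static IPropagatorBlock<TInput, Tuple<TOutput, TInput>>
        CreateExtendedSource<TInput, TOutput>(Func<TInput, TOutput> transform)
    {
        return new TransformBlock<TInput, Tuple<TOutput, TInput>>(
            input => Tuple.Create(transform(input), input));
    }

    public static void Main()
    {
        // Propagate completion along the links so the program can wait for the last block.
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        var divided = CreateExtendedSource<int, double>(i => i / 2.0);

        // Created with the normal constructor, so the lambda sees both the
        // current value (Item1) and the original value (Item2).
        var labeled = new TransformBlock<Tuple<double, int>, Tuple<string, int>>(
            tuple => Tuple.Create(
                string.Format(CultureInfo.InvariantCulture,
                    "{0} / 2 = {1:0.0}", tuple.Item2, tuple.Item1),
                tuple.Item2));

        var writer = new ActionBlock<Tuple<string, int>>(
            tuple => Console.WriteLine(tuple.Item1));

        divided.LinkTo(labeled, link);
        labeled.LinkTo(writer, link);

        // No BufferBlock: post straight to the first block of the pipeline.
        for (int i = 0; i < 10; i++)
            divided.Post(i);

        divided.Complete();
        writer.Completion.Wait();
    }
}
```

Waiting on `writer.Completion` is only there so that a console application does not exit before the pipeline has drained; in a longer-running application you would probably await it instead.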