Question

I have issues with a list collection that is passed into a broadcast block. Here is what I have so far (pseudo code as the complete code base is too long):

// Pipeline sketch (pseudo code): a quote list is broadcast to two transform blocks,
// their outputs are batched in pairs, and each batch is joined with the broadcast
// list before being handed to the final transform block.

// Entry point of the pipeline; offers each posted quote list to every linked target.
private BroadcastBlock<List<Quote>> tempBCB;
// Two workers, each mapping a quote list to a dictionary of IParentOrder keyed by int.
private TransformBlock<List<Quote>, Dictionary<int, IParentOrder>> tfb1;
private TransformBlock<List<Quote>, Dictionary<int, IParentOrder>> tfb2;
// Collects the dictionaries produced by the workers into arrays.
private BatchBlock<Dictionary<int, IParentOrder>> batchBlock;
// Pairs a quote list (Target1) with an array of dictionaries (Target2).
private JoinBlock<List<Quote>, Dictionary<int, IParentOrder>[]> joinBlock;
// Final stage: consumes the joined tuple and produces a list of MySignal.
private TransformBlock<Tuple<List<Quote>, 
    Dictionary<int, IParentOrder>[]>,List<MySignal>> transformBlock;

// The cloning function returns the very same list instance, so every linked target
// receives a reference to one shared List<Quote>. The commented-out alternative
// would hand out a deep copy instead.
tempBCB = new BroadcastBlock<List<Quote>>(quoteList => {
    return quoteList;
    //return Cloning.CloneListCloneValues<Quote>(quoteList);
});

// Worker 1 (body elided in this pseudo code).
tfb1 = new TransformBlock<List<Quote>, Dictionary<int, IParentOrder>>(
    quotes => {//do something and return Dictionary<int, IParentOrder>});

// Worker 2 (body elided in this pseudo code).
tfb2 = new TransformBlock<List<Quote>, Dictionary<int, IParentOrder>>(
    quotes => {//do something and return Dictionary<int, IParentOrder>});

// Batch size is 2, matching the number of workers.
// NOTE(review): no GroupingDataflowBlockOptions is passed here, so this block uses its
// default grouping options, unlike joinBlock below — confirm that is intended.
batchBlock = new BatchBlock<Dictionary<int, IParentOrder>>(2);

// Explicitly configured as non-greedy.
joinBlock = new JoinBlock<List<Quote>, Dictionary<int, IParentOrder>[]>(
    new GroupingDataflowBlockOptions { Greedy = false });

// Final stage (body elided in this pseudo code).
transformBlock = new TransformBlock<Tuple<List<Quote>, 
    Dictionary<int, IParentOrder>[]>, List<MySignal>>(
    tuple => { //do something and return List<MySignal>;});

//Linking
// None of the LinkTo calls below passes DataflowLinkOptions.
// The broadcast block feeds three targets: both workers and the first join target.
tempBCB.LinkTo(tfb1);
tempBCB.LinkTo(tfb2);
// Both workers feed the same batch block.
tfb1.LinkTo(batchBlock);
tfb2.LinkTo(batchBlock);
tempBCB.LinkTo(joinBlock.Target1);
batchBlock.LinkTo(joinBlock.Target2);
joinBlock.LinkTo(transformBlock);

My problem is that with the current implementation of tempBCB I get strange results in the final TransformBlock<TInput, TOutput>.

For example, the Dictionary<int, IParentOrder> collections that are part of the tuple are NOT of equal size, even though the implementations of tfb1 and tfb2 are 100% identical.

The commented-out line in the tempBCB implementation does a deep copy of the broadcasted list, and that does seem to solve the problem, but the deep copy makes my code about 10 times slower, a slowdown of such magnitude that I need to find a different solution.

First of all, I am not absolutely sure whether the missing deep copy is the actual problem, or whether it's just the slowdown that causes the concurrent operations to perform as expected while a bug is still hiding in there.

Secondly, if the lack of a deep copy in the broadcast block causes those problems how can I make it faster?

Here is my deep copy code:

/// <summary>
/// Creates a new list whose elements are clones of the elements of
/// <paramref name="original"/>, in the same order.
/// </summary>
/// <typeparam name="TValue">Element type; each element is copied through <see cref="ICloneable.Clone"/>.</typeparam>
/// <param name="original">The list to copy. Must not be null.</param>
/// <returns>A new list with the same number of elements as <paramref name="original"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="original"/> is null.</exception>
public static List<TValue> CloneListCloneValues<TValue>(List<TValue> original) 
    where TValue : ICloneable
{
    if (original == null)
    {
        throw new ArgumentNullException("original");
    }

    // Presized so the copy never has to grow while it is being filled.
    List<TValue> ret = new List<TValue>(original.Count);

    foreach (TValue entry in original)
    {
        // A null element has nothing to clone; carry it over as null rather than
        // failing with a NullReferenceException halfway through the copy.
        ret.Add(entry == null ? default(TValue) : (TValue)entry.Clone());
    }

    return ret;
}

I could potentially feed a Quote[] instead of List<Quote> into the broadcast block, but I do not see how it would help to speed up performance of the deep copy.

My questions are:

  • Is the deep copy issue the real issue here (I doubt it, because the List<Quote> streamed into the broadcast block is never altered by any of the transform blocks)?
  • If yes why and how can I make the deep copy more efficient?
Was it helpful?

Solution

I am answering my own question because I eventually solved the problem. The issue, as svick cautioned, was not related to whether List<Quote> required a deep copy in the broadcastBlock (in fact, it did not require one). The problem was that the broadcastBlock was requested to complete (with completion propagation to the linked dataflow blocks set to true) before batchBlock, which also links to the joinBlock, had potentially streamed all of its items to joinBlock. I simply took out joinBlock because I rewrote the transform blocks: they now return their own transformed items as well as the original item, making the joinBlock obsolete.

Note on concurrency in the main transformBlock: setting MaxDegreeOfParallelism to > 1 already provides performance benefits even with this light workload, however, it really kicks in when throwing heavier workloads at it.

Here is the complete code, which compiles and works (I renamed some classes, but the structure remains as described):

/// <summary>
/// Demo pipeline: each posted list of <see cref="InputObject"/> is broadcast to two
/// <see cref="CoreLogic"/> transform blocks, their results are batched in pairs and
/// merged into a list of <see cref="FinalObject"/> by the final transform block.
/// </summary>
public class Test
{
    private Stopwatch watch;

    private readonly BroadcastBlock<List<InputObject>> tempBCB;
    private readonly BatchBlock<Tuple<List<InputObject>, Dictionary<int, IntermediateObject>>> batchBlock;
    private readonly TransformBlock<Tuple<List<InputObject>, Dictionary<int, IntermediateObject>>[], List<FinalObject>> transformBlock;
    private readonly ActionBlock<List<FinalObject>> justToFlushTransformBlock;

    private readonly CoreLogic core1;
    private readonly CoreLogic core2;

    /// <summary>
    /// Creates all dataflow blocks and links them together.
    /// </summary>
    public Test()
    {
        // The same list instance is handed to every target; no copy is made.
        tempBCB = new BroadcastBlock<List<InputObject>>(input => input);

        //here batch size = 2 (one result per CoreLogic instance); the block is
        //configured as non-greedy
        batchBlock = new BatchBlock<Tuple<List<InputObject>,Dictionary<int,IntermediateObject>>>(2, new GroupingDataflowBlockOptions { Greedy = false });

        transformBlock = new TransformBlock<Tuple<List<InputObject>,Dictionary<int,IntermediateObject>>[],List<FinalObject>>(array =>
        {
            // The input list is taken from the first tuple of the batch only.
            List<InputObject> inputObjects = array[0].Item1;
            List<FinalObject> ret = inputObjects.ConvertAll(x => new FinalObject(x));

            foreach (var tuple in array)
            {
                //iterate over each individual object; the dictionary key is used as
                //the index into the result list
                foreach (var dictionary in tuple.Item2)
                {
                    ret[dictionary.Key].outputList.Add(dictionary.Value);
                }
            }

            return ret;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        justToFlushTransformBlock = new ActionBlock<List<FinalObject>>(list =>
            {
                //just in order to accept items from the transformBlock output queue
            });

        //Generate 2 CoreLogic objects
        core1 = new CoreLogic();
        core2 = new CoreLogic();

        //linking
        tempBCB.LinkTo(core1.transformBlock, new DataflowLinkOptions { PropagateCompletion = true });
        tempBCB.LinkTo(core2.transformBlock, new DataflowLinkOptions { PropagateCompletion = true });

        // Completion is deliberately NOT propagated on these two links: batchBlock has
        // two sources and must only finish once both are done (handled in Start).
        core1.transformBlock.LinkTo(batchBlock);
        core2.transformBlock.LinkTo(batchBlock);

        batchBlock.LinkTo(transformBlock, new DataflowLinkOptions { PropagateCompletion = true });

        transformBlock.LinkTo(justToFlushTransformBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    /// <summary>
    /// Posts 30 input lists of growing size into the pipeline, blocks until the final
    /// transform block has completed, then prints the elapsed time and waits for a
    /// line on standard input.
    /// </summary>
    /// <exception cref="AggregateException">
    /// Thrown by the blocking wait when the final transform block ends up faulted,
    /// including faults raised in either <see cref="CoreLogic"/> block.
    /// </exception>
    public void Start()
    {
        const int numberChunks = 30;

        watch = Stopwatch.StartNew();

        for (int j = 1; j <= numberChunks; j++)
        {
            int collectionSize = 10000 * j;

            List<InputObject> collection = new List<InputObject>(collectionSize);
            for (int i = 0; i < collectionSize; i++)
            {
                collection.Add(new InputObject(i));
            }

            tempBCB.Post(collection);
        }

        tempBCB.Complete();

        Task.WhenAll(core1.transformBlock.Completion, core2.transformBlock.Completion).ContinueWith(upstream =>
            {
                if (upstream.IsFaulted)
                {
                    // Forward the failure instead of reporting a normal completion, so
                    // an exception in a CoreLogic block is not silently dropped.
                    ((IDataflowBlock)batchBlock).Fault(upstream.Exception);
                }
                else
                {
                    batchBlock.Complete();
                }
            });

        transformBlock.Completion.Wait();

        watch.Stop();

        Console.WriteLine("Elapsed time (in milliseconds): " + watch.ElapsedMilliseconds);
        Console.ReadLine();
    }
}

/// <summary>
/// Wraps a transform block that, for every incoming list of <see cref="InputObject"/>,
/// builds a dictionary of 10000 <see cref="IntermediateObject"/> instances and returns
/// it together with the unchanged input list.
/// </summary>
public class CoreLogic
{
    /// <summary>The block that performs the transformation; linked into the pipeline by the caller.</summary>
    public TransformBlock<List<InputObject>, Tuple<List<InputObject>, Dictionary<int, IntermediateObject>>> transformBlock;

    /// <summary>
    /// Creates the transform block.
    /// </summary>
    public CoreLogic()
    {
        const int numberIntermediateObjects = 10000;

        transformBlock = new TransformBlock<List<InputObject>, Tuple<List<InputObject>, Dictionary<int, IntermediateObject>>>(input =>
        {
            //please ignore the fact that `input` is not utilized here, the point is to generate a collection of IntermediateObject and return

            // The final size is known up front, so allocate it once instead of
            // letting the dictionary resize repeatedly while it is filled.
            var ret = new Dictionary<int, IntermediateObject>(numberIntermediateObjects);
            for (int i = 0; i < numberIntermediateObjects; i++)
            {
                ret.Add(i, new IntermediateObject(i));
            }

            // The original list is passed along with the result.
            return Tuple.Create(input, ret);
        });
    }
}

/// <summary>
/// Input item carrying a single integer value; supports shallow cloning.
/// </summary>
public class InputObject : ICloneable
{
    /// <summary>Creates an item holding <paramref name="value"/>.</summary>
    public InputObject(int value)
    {
        value1 = value;
    }

    /// <summary>The value supplied at construction time.</summary>
    public int value1 { get; private set; }

    /// <summary>Returns a member-wise copy of this instance, typed as <see cref="InputObject"/>.</summary>
    public InputObject Clone()
    {
        object copy = MemberwiseClone();
        return (InputObject)copy;
    }

    /// <summary>Untyped <see cref="ICloneable"/> entry point; forwards to the typed <see cref="Clone"/>.</summary>
    object ICloneable.Clone()
    {
        return this.Clone();
    }
}

/// <summary>
/// Intermediate result item carrying a single integer value.
/// </summary>
public class IntermediateObject
{
    /// <summary>Creates an item holding <paramref name="value"/>.</summary>
    public IntermediateObject(int value)
    {
        value1 = value;
    }

    /// <summary>The value supplied at construction time.</summary>
    public int value1 { get; private set; }
}

/// <summary>
/// Final result item: one <see cref="InputObject"/> together with the
/// <see cref="IntermediateObject"/> instances collected for it.
/// </summary>
public class FinalObject
{
    /// <summary>Intermediate objects attached to this item; starts out empty.</summary>
    public List<IntermediateObject> outputList = new List<IntermediateObject>();

    /// <summary>Creates a result item for <paramref name="input"/> with an empty output list.</summary>
    public FinalObject(InputObject input)
    {
        this.input = input;
    }

    /// <summary>The input object this result belongs to.</summary>
    public InputObject input { get; private set; }
}

/// <summary>
/// Helpers for copying collections.
/// </summary>
public static class Cloning
{
    /// <summary>
    /// Creates a new list whose elements are clones of the elements of
    /// <paramref name="original"/>, in the same order.
    /// </summary>
    /// <typeparam name="TValue">Element type; each element is copied through <see cref="ICloneable.Clone"/>.</typeparam>
    /// <param name="original">The list to copy. Must not be null.</param>
    /// <returns>A new list with the same number of elements as <paramref name="original"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="original"/> is null.</exception>
    public static List<TValue> CloneListCloneValues<TValue>(List<TValue> original) where TValue : ICloneable
    {
        if (original == null)
        {
            throw new ArgumentNullException("original");
        }

        // Presized so the copy never has to grow while it is being filled.
        List<TValue> ret = new List<TValue>(original.Count);

        foreach (TValue entry in original)
        {
            // A null element has nothing to clone; carry it over as null rather than
            // failing with a NullReferenceException halfway through the copy.
            ret.Add(entry == null ? default(TValue) : (TValue)entry.Clone());
        }

        return ret;
    }
}

I hope this helps others who may struggle with similar issues. I love TPL Dataflow and svick in particular really helped and motivated me to dig deeper. Thank you svick!!!

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top