Question

EDIT: As recommended by svick, I replaced the custom IPropagatorBlock with a simple TransformBlock. However, I still see a mismatch between the order of the input items and the order of the output items. Below are my TransformBlock instantiation and the Func that I pass in:

quoteBuffer = new TransformBlock<Tuple<Symbol, int>, List<Quote>>(
    syncExecution,
    new ExecutionDataflowBlockOptions
    {
        SingleProducerConstrained = true,
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });

//Function that performs Sync Processing
Func<Tuple<Symbol, int>, List<Quote>> syncExecution = new Func<Tuple<Symbol, int>, List<Quote>>(partitionTuple =>
{
    Symbol symbol = partitionTuple.Item1;
    int partitionIndex = partitionTuple.Item2;

    //Read Binary Data
    byte[] byteArray = binaryDataReaders[symbol].ReadBytes(partitionIndex);

    //Deserialize and return quote list
    List<Quote> quoteList = dataInterfaces[symbol].Deserialize(symbol, byteArray);

    return quoteList;
});

And this is how I post to the transform block:

quoteBuffer.SendAsync(new Tuple<Symbol, int>(symbol, counter));

ORIGINAL QUESTION:

Someone helped me with the following custom transform block. The idea is to Post/SendAsync a TInput and have it processed asynchronously, while the custom transform block preserves the order of the posted items when it returns the transformed items.

For example, if 1, 2, 3 are posted in that order and the transform function squares each input, the output must be 1, 4, 9, regardless of which of the three transform operations completes first.

However, I suspect there is an error in the code because the output order is not correct. Worse, the position at which the order breaks is random, which makes it harder to debug. Presumably this reflects the fact that the tasks that transform input elements into output elements complete in a different order on every run.

Can someone please take a look and possibly give some hints as to what I am missing here? Thanks.

public static IPropagatorBlock<TInput, TOutput> CreateConcurrentOrderedTransformBlock<TInput, TOutput>(Func<TInput, TOutput> transform)
{
    var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t);

    var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
        tuple => tuple.Item2(transform(tuple.Item1)),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    var enqueuer = new ActionBlock<TInput>(
        async item =>
        {
            var tcs = new TaskCompletionSource<TOutput>();
            await processor.SendAsync(
                new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult));
            await queue.SendAsync(tcs.Task);
        });

    enqueuer.Completion.ContinueWith(
        _ =>
        {
            queue.Complete();
            processor.Complete();
        });

    return DataflowBlock.Encapsulate(enqueuer, queue);
}

Solution

I am answering my own question because I found the bug that caused all this trouble. As can be seen from my lambda expression, I read byte arrays inside the dataflow block, which means that as soon as the degree of parallelism is greater than 1, byte arrays are read concurrently from the same file on a physical disk. This apparently corrupts the positions at which the bytes are read. I set the starting point of each read with br.BaseStream.Seek(...) and read the bytes with br.ReadBytes(numberBytes). Because several operations move the position within the file concurrently, the binary reader most likely reads bytes from the wrong offsets, which causes the mix-ups.
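
To make the race concrete: Seek and ReadBytes are two separate calls on one shared stream position, so another thread can move the position between them. The sketch below is only an illustration and not necessarily what the original code looked like. PartitionReader, its gate field, and the offset/count parameters are hypothetical names. It shows one possible way to make the two calls atomic with a lock, at the cost of serializing all reads on that reader.

```csharp
using System;
using System.IO;

// Hypothetical helper (not from the original post): serializes access
// to a single shared BinaryReader.
public sealed class PartitionReader
{
    private readonly BinaryReader reader;
    private readonly object gate = new object();

    public PartitionReader(Stream stream)
    {
        reader = new BinaryReader(stream);
    }

    // Seek and ReadBytes run as one unit under the lock. Without it,
    // another thread could move the stream position between the two calls.
    public byte[] ReadBytes(long offset, int count)
    {
        lock (gate)
        {
            reader.BaseStream.Seek(offset, SeekOrigin.Begin);
            return reader.ReadBytes(count);
        }
    }
}
```

With a guard like this, the reads themselves no longer run in parallel, so any speed-up has to come from the work done after the bytes are in memory.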

I solved the problem by pulling the binary reader out of the lambda expression: I now pass already-read byte arrays into the expression and use concurrency only for deserialization and for merging/sorting. And yes, TransformBlock preserves the order. Thanks, svick, for sharing your vast expertise on TPL Dataflow.
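
A minimal sketch of that restructuring, under stated assumptions: OrderedPipelineSketch, RunAsync, readPartition, and deserialize are hypothetical names standing in for the binary reader and deserializer from the question, and the project is assumed to reference System.Threading.Tasks.Dataflow. The bytes are read sequentially by the caller, and only the deserialization runs inside the TransformBlock.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class OrderedPipelineSketch
{
    // readPartition and deserialize are hypothetical stand-ins for the
    // binary reader and the deserializer in the original post.
    public static async Task<List<TOut>> RunAsync<TOut>(
        int partitionCount,
        Func<int, byte[]> readPartition,
        Func<byte[], TOut> deserialize)
    {
        // Only the deserialization step runs in parallel.
        var block = new TransformBlock<byte[], TOut>(
            deserialize,
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
            });

        // Reads happen one after another in this loop, so the shared
        // stream position is never moved by two threads at once.
        for (int i = 0; i < partitionCount; i++)
        {
            await block.SendAsync(readPartition(i));
        }
        block.Complete();

        // TransformBlock hands out results in input order by default.
        var results = new List<TOut>();
        while (await block.OutputAvailableAsync())
        {
            results.Add(await block.ReceiveAsync());
        }
        await block.Completion;
        return results;
    }
}
```

Calling RunAsync with a deserialize delegate that squares its input and finishes at different speeds should still yield the results in posting order, which matches the 1, 4, 9 expectation from the question.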

Licensed under: CC-BY-SA with attribution