Question

I want to create a parallel pipeline in C#. I have declared an interface named IOperation:

public interface IOperation<Tin, Tout>
{
    BlockingCollection<Tout> BlockingCollection(IEnumerable<Tin> input);
}

Now I want to write a class that executes several of these operations in parallel. I began with this:

public class Pipeline : IPipeline
{
    private List<IOperation<Object, Object>> operations = new List<IOperation<Object, Object>>();
    private List<BlockingCollection<Object>> buffers = new List<BlockingCollection<Object>>();
    public void Register(IOperation<Object, Object> operation)
    {
        operations.Add(operation);
    }

    public void Execute()
    {

    }
}

But I can't find a way to store the operations, and the buffers between them, because they all have different generic types. Does anyone have an idea?


Solution

Have you considered using Parallel.ForEach from the TPL?
The Task Parallel Library (TPL) is a set of public types and APIs in the System.Threading and System.Threading.Tasks namespaces, introduced in .NET Framework 4.
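A minimal sketch of Parallel.ForEach, assuming the work is an independent per-item operation (squaring numbers here, chosen purely for illustration; the class and method names are hypothetical). Note that it parallelizes one operation over a collection; it does not by itself chain differently typed stages together.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class ParallelForEachExample
{
    // Squares every input item in parallel. Parallel.ForEach partitions the
    // input across thread-pool threads, so items finish in no fixed order;
    // results go into a thread-safe ConcurrentBag and are sorted afterwards.
    public static int[] SquareAll(int[] input)
    {
        var results = new ConcurrentBag<int>();
        Parallel.ForEach(input, item => results.Add(item * item));
        return results.OrderBy(x => x).ToArray();
    }

    public static void Main()
    {
        int[] squares = SquareAll(Enumerable.Range(1, 5).ToArray());
        Console.WriteLine(string.Join(", ", squares)); // 1, 4, 9, 16, 25
    }
}
```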

OTHER TIPS

It's not very clear how your Pipeline is meant to work. Why are you passing around BlockingCollections? Why are you using generics but then using object as the type argument?

Consider instead having a Pipeline that you load with delegates of type Action, and then using the Task Parallel Library to create Tasks that execute those actions in parallel.

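private readonly List<Action> operations = new List<Action>();

public void Register(Action operation)
{
    operations.Add(operation);
}

public void Execute()
{
    // Start every registered action as its own Task (System.Threading.Tasks),
    // then block until all of them have finished.
    var tasks = new List<Task>();
    foreach (var action in operations)
        tasks.Add(Task.Factory.StartNew(action));
    Task.WaitAll(tasks.ToArray());
}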

But that's not really a 'pipeline', it's just a bundle of operations that execute in parallel.

A pipeline would more normally consist of pipeline steps, each with an input type and an output type. You could handle this by creating something like PipelineStep<T,U> and constructing each pipeline step with a Func<T,U> operation. Internally, each pipeline step could consume an input IEnumerable<T> and produce an output IEnumerable<U>, either using Tasks or, more simply, using a parallel foreach loop.
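A minimal sketch of the PipelineStep<T,U> idea, assuming each step is a pure function that can safely run on many items at once. Only the name PipelineStep<T,U> comes from the paragraph above; the Process method and the parse/square transforms are hypothetical. This is the simpler "parallel loop" variant: items within a step are processed in parallel, but each step finishes its whole input before the next one starts. Because every step keeps its own T and U, chaining a string-to-int step into an int-to-long step is checked at compile time, with no object casts.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical PipelineStep<T, U>: wraps a Func<T, U> and applies it
// to every input item in parallel.
public class PipelineStep<T, U>
{
    private readonly Func<T, U> operation;

    public PipelineStep(Func<T, U> operation)
    {
        this.operation = operation;
    }

    // Consumes an input sequence and produces an output sequence.
    // Parallel.For writes each result into its own array slot,
    // so the output keeps the input order.
    public IEnumerable<U> Process(IEnumerable<T> input)
    {
        T[] items = input.ToArray();
        var results = new U[items.Length];
        Parallel.For(0, items.Length, i => results[i] = operation(items[i]));
        return results;
    }
}

public static class PipelineStepDemo
{
    public static void Main()
    {
        var parse = new PipelineStep<string, int>(s => int.Parse(s));
        var square = new PipelineStep<int, long>(x => (long)x * x);

        // The output of one step is the input of the next.
        IEnumerable<long> output = square.Process(parse.Process(new[] { "1", "2", "3" }));
        Console.WriteLine(string.Join(", ", output)); // 1, 4, 9
    }
}
```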

Alternatively, you could use the TPL's Task.ContinueWith method to chain the Tasks together from input to output.
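A sketch of chaining with Task.ContinueWith, assuming each stage works on the whole data set at once (the three stages are arbitrary examples and the names are hypothetical). Each continuation runs after its antecedent task completes and reads the antecedent's Result, so the result type flows from Task<int[]> to Task<int> to Task<string>.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class ContinueWithExample
{
    // Chains three stages. Each ContinueWith callback receives the
    // previous task and reads its Result as this stage's input.
    public static Task<string> RunPipeline(int[] input)
    {
        return Task.Factory.StartNew(() => input.Select(x => x * 2).ToArray())
            .ContinueWith(doubled => doubled.Result.Sum())
            .ContinueWith(sum => "Sum of doubled values: " + sum.Result);
    }

    public static void Main()
    {
        // Blocking on Result here only to print the final value.
        Console.WriteLine(RunPipeline(new[] { 1, 2, 3 }).Result);
        // Sum of doubled values: 12
    }
}
```

This chains whole stages rather than streaming individual items between them, so on its own it gives ordering between stages, not overlap.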

Microsoft has something exactly like this: TPL Dataflow lets you define blocks in a pipeline, with fine-grained control over how they are buffered and parallelized.

Unlike your solution, it uses a fully asynchronous push design rather than a BlockingCollection (a blocking pull design), and it can be significantly faster as a result, especially in a deep pipeline.
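A small TPL Dataflow sketch of a two-step pipeline, assuming the System.Threading.Tasks.Dataflow library is referenced (it is distributed as a NuGet package of that name). The transforms, the class name and the degree of parallelism (4) are arbitrary choices for illustration. Messages are pushed from block to block through LinkTo, and PropagateCompletion lets a single Complete() call on the first block flow down the chain.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks.Dataflow;

public static class DataflowExample
{
    public static int[] Run(string[] input)
    {
        var parallel = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        // Each block has its own input/output types; up to 4 items run concurrently.
        // TransformBlock still emits results in input order by default.
        var parse = new TransformBlock<string, int>(s => int.Parse(s), parallel);
        var square = new TransformBlock<int, int>(x => x * x, parallel);

        // Final block collects results (single-threaded by default).
        var results = new ConcurrentQueue<int>();
        var collect = new ActionBlock<int>(x => results.Enqueue(x));

        parse.LinkTo(square, link);
        square.LinkTo(collect, link);

        foreach (var s in input)
            parse.Post(s);
        parse.Complete();          // no more input; completion propagates downstream
        collect.Completion.Wait(); // wait until the last block has drained

        return results.ToArray();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run(new[] { "1", "2", "3" }))); // 1, 4, 9
    }
}
```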

There's a good article at http://msdn.microsoft.com/en-us/library/ff963548.aspx about parallel pipelines with BlockingCollection.

Basically, each step should have an output queue of type BlockingCollection<T>. It takes items from the previous step's output queue and, once it has processed them, adds the results to its own output queue.
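The queue-per-step design can be sketched as follows. This is a hedged illustration, not the code from the linked article: the Stage helper, the class name and the parse/square transforms are hypothetical. Because Stage<TIn, TOut> is generic and returns a strongly typed queue, each link in the chain keeps its own element type, which sidesteps the question's problem of storing differently typed operations and buffers in a single List<IOperation<Object, Object>>.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class BlockingCollectionPipeline
{
    // One stage: reads items from the previous stage's output queue,
    // transforms them, and adds the results to its own output queue,
    // which it returns so the next stage can consume it.
    public static BlockingCollection<TOut> Stage<TIn, TOut>(
        BlockingCollection<TIn> input, Func<TIn, TOut> transform, List<Task> tasks)
    {
        var output = new BlockingCollection<TOut>();
        tasks.Add(Task.Factory.StartNew(() =>
        {
            try
            {
                // Blocks while the queue is empty; ends once the input
                // has been marked complete and fully drained.
                foreach (TIn item in input.GetConsumingEnumerable())
                    output.Add(transform(item));
            }
            finally
            {
                // Tell the next stage that no more items will arrive.
                output.CompleteAdding();
            }
        }, TaskCreationOptions.LongRunning));
        return output;
    }

    public static List<long> Run(IEnumerable<string> lines)
    {
        var tasks = new List<Task>();
        var source = new BlockingCollection<string>();

        // Each call returns a typed queue; the compiler infers TIn and TOut.
        BlockingCollection<int> parsed = Stage(source, s => int.Parse(s), tasks);
        BlockingCollection<long> squared = Stage(parsed, x => (long)x * x, tasks);

        // Producer: fill the first queue, then mark it complete.
        tasks.Add(Task.Factory.StartNew(() =>
        {
            try { foreach (var line in lines) source.Add(line); }
            finally { source.CompleteAdding(); }
        }));

        // Consumer: drain the last queue on the calling thread.
        List<long> results = squared.GetConsumingEnumerable().ToList();
        Task.WaitAll(tasks.ToArray()); // rethrows any stage failure
        return results;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run(new[] { "1", "2", "3" }))); // 1, 4, 9
    }
}
```

All stages run at the same time, each on its own thread, so item 2 can be parsed while item 1 is being squared. Passing a capacity to the BlockingCollection<T>(int boundedCapacity) constructor would make Add block when a queue is full, so a fast stage cannot run arbitrarily far ahead of a slow one; the sketch leaves the queues unbounded to keep it short.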

Licensed under: CC-BY-SA with attribution