Question

I am trying to design a TPL Dataflow pipeline that makes optimal use of system resources. My project is an HTML parser that writes the parsed values to a SQL Server database. I already have all the methods for the pipeline; my question is how best to distribute them across Dataflow blocks, and how many blocks I should use. Some of the methods are CPU-bound and some are I/O-bound (loading pages from the Internet, querying the SQL Server database). For now, I think that placing each I/O operation in a separate block is the right approach, as in this diagram (image: TPL Dataflow pipeline).

What are the basic rules of designing pipelines in that case?


Solution

One way to choose how to divide the blocks is to decide which parts you want to scale independently of the others. A good starting point is to divide the CPU-bound portions from the I/O-bound portions. I'd consider combining the last two blocks, since they are both I/O-bound (presumably to the same database).
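As a rough illustration of that split, here is a minimal sketch with one block per kind of work, each with its own degree of parallelism. It assumes the System.Threading.Tasks.Dataflow package; `DownloadAsync`, `Parse` and `SaveAsync` are hypothetical stand-ins for the asker's methods, and the parallelism and capacity numbers are placeholders to tune, not recommendations.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ParserPipeline
{
    // Hypothetical I/O-bound step: stands in for loading a page.
    static async Task<string> DownloadAsync(string url)
    {
        await Task.Delay(10);                       // simulated network latency
        return "<html>" + url + "</html>";
    }

    // Hypothetical CPU-bound step: stands in for parsing the HTML.
    static int Parse(string html) => html.Length;

    // Hypothetical I/O-bound step: stands in for the database insert.
    static async Task SaveAsync(int value, List<int> sink)
    {
        await Task.Delay(5);                        // simulated database round trip
        lock (sink) sink.Add(value);
    }

    public static async Task<List<int>> RunAsync(IEnumerable<string> urls)
    {
        var saved = new List<int>();

        // I/O-bound block: parallelism can exceed the core count because
        // the work is mostly waiting.
        var download = new TransformBlock<string, string>(
            url => DownloadAsync(url),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8, BoundedCapacity = 32 });

        // CPU-bound block: parallelism tied to the number of cores.
        var parse = new TransformBlock<string, int>(
            html => Parse(html),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount, BoundedCapacity = 32 });

        // A single I/O-bound block for all database work.
        var save = new ActionBlock<int>(
            value => SaveAsync(value, saved),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, BoundedCapacity = 32 });

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        download.LinkTo(parse, link);
        parse.LinkTo(save, link);

        // SendAsync waits when the first block is full, so a slow stage
        // applies backpressure instead of letting queues grow unbounded.
        foreach (var url in urls)
            await download.SendAsync(url);

        download.Complete();
        await save.Completion;
        return saved;
    }
}
```

Because each block carries its own `MaxDegreeOfParallelism`, the download, parse and save stages can be tuned separately, which is the point of splitting them. The order of items in the returned list is not guaranteed, since the save block runs several inserts concurrently.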

Other tips

Here is a sample general-purpose pipeline from Concurrent Programming on Windows. A good pipeline is a balanced pipeline, meaning that no single stage becomes a bottleneck for the rest. Based on the sample code, you can assign as many threads as needed to execute each stage.

Source code:

using System;
using System.Collections;
using System.Collections.Generic;

public class Pipeline<TSource, TDest> : IPipeline
{
  private readonly IPipelineStage[] _stages;

  // Creates a pipeline with a single stage.
  public Pipeline(Func<TSource, TDest> transform, int degree) : 
     this (new IPipelineStage[0], new PipelineStage<TSource, TDest>(transform, degree)) {}

  // Copies the existing stages and appends the new last stage.
  internal Pipeline(IPipelineStage[] toCopy, IPipelineStage lastStage) 
  {
     _stages = new IPipelineStage[toCopy.Length + 1];
     Array.Copy(toCopy, _stages, toCopy.Length);
     _stages[_stages.Length - 1] = lastStage;
  }

  // Returns a new pipeline whose final stage maps TDest to TNew.
  public Pipeline<TSource, TNew> AddStage<TNew>(Func<TDest, TNew> transform, int degree) 
  {
     return new Pipeline<TSource, TNew>(_stages, new PipelineStage<TDest, TNew>(transform, degree));
  }

  public IEnumerator<TDest> GetEnumerator(IEnumerable<TSource> arg)
  {
     // Chain the stages: each stage consumes the previous stage's output.
     IEnumerable er = arg;

     for (int i = 0; i < _stages.Length; i++)
       er = _stages[i].Start(er);

     foreach (TDest elem in er)
       yield return elem;
  }
}

class PipelineStage<TInput, TOutput> : IPipelineStage
{
   private readonly Func<TInput, TOutput> _transform;
   private readonly int _degree;

   internal PipelineStage(Func<TInput, TOutput> transform, int degree)
   {
      _transform = transform;
      _degree = degree;
   }

   // Must be public to implement IPipelineStage.Start.
   public IEnumerable Start(IEnumerable src)
   {
       //...
   }
}

interface IPipelineStage 
{
   IEnumerable Start(IEnumerable Src);
}
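
To choose the `degree` for each stage so that the pipeline stays balanced, one rough rule is to give each stage enough workers to sustain the throughput you want, based on how long it takes per item. The helper below is a hypothetical sketch of that arithmetic and is not part of the book's code; the per-item timings would have to come from your own measurements.

```csharp
using System;

public static class PipelineBalance
{
    // Smallest number of workers at which a stage that needs
    // `millisPerItem` per item can sustain `itemsPerSecond`
    // (rounded up, never less than 1).
    public static int DegreeFor(int millisPerItem, int itemsPerSecond)
    {
        int needed = (millisPerItem * itemsPerSecond + 999) / 1000;
        return Math.Max(1, needed);
    }
}
```

For example, with assumed timings of 200 ms per download, 20 ms per parse and 50 ms per insert, a target of 40 items per second gives degrees of 8, 1 and 2 respectively. For CPU-bound stages, going beyond the number of cores is unlikely to help.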
Licensed under: CC-BY-SA with attribution