Question

I would like to parallelize an application that processes multiple video clips frame by frame. The order of frames within each clip matters (obviously). I decided to go with TPL Dataflow since I believe this is a good fit for dataflow (movie frames being the data).

So I have one process that loads frames from the database (let's say in batches of 500, all interleaved)

Example sequence:    
|mid:1 fr:1|mid:1 fr:2|mid:2 fr:1|mid:3 fr:1|mid:1 fr:3|mid:2 fr:2|mid:2 fr:3|mid:1 fr:4|

and posts them to a BufferBlock. To this BufferBlock I have linked ActionBlocks with a filter, one ActionBlock per MovieID, so that I get some kind of data partitioning. Each ActionBlock is sequential, but ideally multiple ActionBlocks for different movies should run in parallel.

I have the network described above working, and it does run in parallel, but from my calculations only eight to ten ActionBlocks are executing simultaneously. I timed each ActionBlock's running time and it's around 100-200 ms. What steps can I take to at least double the concurrency?

I did try converting the action delegates to async methods and making the database access asynchronous within the ActionBlock's delegate, but it did not help.

EDIT: I implemented an extra level of data partitioning: frames for movies with odd IDs are processed on ServerA, frames for movies with even IDs on ServerB. Both instances of the application hit the same database. If my problem were DB I/O, then I would see no improvement in the total count of frames processed (or very little, under 20%). But I do see it doubling. This leads me to conclude that the thread pool is not spawning enough threads to process more frames in parallel (both servers are quad-cores, and the profiler shows about 25-30 threads per application).


Solution

Some assumptions:

  • From your example data, you are receiving movie frames (and possibly the frames in the movies) out of order

  • Your ActionBlock<T> instances are generic; they all call the same method for processing, and you create one per movie ID (you have the list of movie IDs beforehand), like so:

// The movie IDs
IEnumerable<int> movieIds = ...;

// The actions.
var actions = movieIds.Select(
    i => new { Id = i, Action = new ActionBlock<Frame>(MethodToProcessFrame) });

// The buffer block.
BufferBlock<Frame> bufferBlock = ...;

// Link everything up.
foreach (var action in actions) 
{
    // Not necessary in C# 5.0, but still, good practice.
    // The copy of the action.
    var actionCopy = action;

    // Link.
    bufferBlock.LinkTo(actionCopy.Action, f => f.MovieId == actionCopy.Id);
}

If this is the case, you're creating too many ActionBlock<T> instances that aren't being given work; because your frames (and possibly movies) arrive out of order, you aren't guaranteed that all of the ActionBlock<T> instances will have work to do.

Additionally, when you create an ActionBlock<T> instance, it is created with a MaxDegreeOfParallelism of 1; it's thread-safe because only one thread at a time can execute inside the block.

Additionally, the TPL Dataflow library ultimately relies on the Task class, which by default schedules on the thread pool. The thread pool is going to do a few things here:

  • Make sure that all processor cores are saturated. This is very different from making sure that your ActionBlock<T> instances are saturated, and it is the metric you should be concerned with

  • Make sure that, while the processor cores are saturated, the work is distributed evenly, and that not too many tasks execute concurrently (context switches are expensive).

It also looks like the method that processes your frames is generic, and it doesn't matter which frame from which movie is passed in (if it does matter, then you need to update your question with that, as it changes a lot of things). This would also mean that it's thread-safe.

Also, if it can be assumed that the processing of one frame doesn't rely on the processing of any previous frames (or if the frames of each movie arrive in order anyway), you can use a single ActionBlock<T> and raise its MaxDegreeOfParallelism, like so:

// The buffer block.
BufferBlock<Frame> bufferBlock = ...;

// Have *one* ActionBlock<T>
var action = new ActionBlock<Frame>(MethodToProcessFrame,
    // This is where you tweak the concurrency:
    new ExecutionDataflowBlockOptions {
        MaxDegreeOfParallelism = 4,
    }
);

// Link.  No filter needed.
bufferBlock.LinkTo(action);

Now your ActionBlock<T> will always be saturated. Granted, any responsible task scheduler (the thread pool, by default) will still limit the degree of concurrency, but it will do as much as it reasonably can at the same time.

To that end, if your action is truly thread safe, you can set the MaxDegreeOfParallelism to DataflowBlockOptions.Unbounded, like so:

// Have *one* ActionBlock<T>
var action = new ActionBlock<Frame>(MethodToProcessFrame,
    // This is where you tweak the concurrency:
    new ExecutionDataflowBlockOptions {
        // We're thread-safe, let the scheduler determine
        // how nuts we can go.
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    }
);

Of course, all of this assumes that everything else (I/O reads/writes, etc.) is already optimal.

OTHER TIPS

Odds are that's the optimal degree of parallelism. The thread pool is honestly pretty darn good at determining the optimal number of threads to keep active. My guess is that your hardware can support about that many processes actually working in parallel. If you added more, you wouldn't actually increase throughput; you'd just spend more time context-switching between threads and less time actually working.

If you notice that, over an extended period of time, your CPU load, memory bus, network connection, disk access, etc. are all working below capacity, then you might have a problem, and you'd want to check what is actually the bottleneck. Chances are, though, some resource somewhere is at its capacity, and the TPL has recognized that and is ensuring it doesn't oversaturate that resource.

I suspect you are I/O bound. The question is where: on the read or the write? Are you writing more data than you read? CPU may be under 50% because it cannot write out any faster.

I am not saying the ActionBlock approach is wrong, but I would consider a producer-consumer pattern with BlockingCollection, and optimize how you read and write data.

This is different, but I have an app where I read blocks of text, parse the text, and then write the words back to SQL. I read on a single thread, parse in parallel, and then write on a single thread. I write on a single thread so as not to fragment indexes. If you are I/O bound, you need to figure out which I/O is slowest and then optimize that process.

Tell me more about that I/O.
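The read-on-one-thread, parse-in-parallel, write-on-one-thread pipeline described above could be sketched with BlockingCollection<T> roughly like this (ReadLines and WriteToDb are hypothetical stand-ins for the real source and sink, and the parse step is a trivial Split):

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

class Pipeline
{
    static void Main()
    {
        // Bounded so a slow consumer applies back-pressure to the producer.
        var rawLines = new BlockingCollection<string>(boundedCapacity: 1000);
        var parsed = new BlockingCollection<string[]>(boundedCapacity: 1000);

        // Single-threaded reader.
        var reader = Task.Run(() =>
        {
            foreach (var line in ReadLines())       // hypothetical source
                rawLines.Add(line);
            rawLines.CompleteAdding();
        });

        // Parallel parse stage: several consumers draining rawLines.
        var parsers = Enumerable.Range(0, Environment.ProcessorCount)
            .Select(_ => Task.Run(() =>
            {
                foreach (var line in rawLines.GetConsumingEnumerable())
                    parsed.Add(line.Split(' '));    // hypothetical parse
            }))
            .ToArray();

        // Single-threaded writer, so inserts don't fight over the index.
        var writer = Task.Run(() =>
        {
            foreach (var words in parsed.GetConsumingEnumerable())
                WriteToDb(words);                   // hypothetical sink
        });

        Task.WaitAll(parsers);
        parsed.CompleteAdding();    // only after every parser has finished
        Task.WaitAll(reader, writer);
    }

    static IEnumerable<string> ReadLines() =>
        new[] { "the quick brown fox" };

    static void WriteToDb(string[] words) =>
        Console.WriteLine(string.Join(",", words));
}

The bounded capacity is what makes the slowest stage visible: whichever collection drains empty sits after the bottleneck.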

In the question you mention reading from the database as well. I would give BlockingCollection a try (the BlockingCollection<T> class). Set a size limit on each collection so you don't blow memory, but make it just big enough that it (almost) never goes empty. The collection after the slowest step will go empty; if you can process a step in parallel, do so.

What I have found is that parallel inserts into a table are not faster. Let one process take the lock, hold it, and keep that hose open. Look closely at how you insert: one row at a time is slow. I use a TVP and insert 10,000 rows at a time, but a lot of people like Dapper or bulk insert. If you drop indexes and triggers and insert sorted by the clustered index, it will be fastest; take a TABLOCK and hold it. I am getting inserts in the 10 ms range.

Right now the update is your slowest step. Look at that: are you doing just one row at a time? Consider taking a TABLOCK and updating one video clip at a time. Unless it is an ugly update, it should not take longer than an insert.
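As a rough sketch of batched inserts under a table lock (not the poster's actual code; the table and column layout are invented, and it uses System.Data.SqlClient's SqlBulkCopy rather than a TVP):

using System.Data;
using System.Data.SqlClient;

static class FrameWriter
{
    static void BulkInsertFrames(string connectionString, DataTable frames)
    {
        // frames has columns matching dbo.Frames (e.g. MovieId, FrameNo, Payload).
        using (var bulk = new SqlBulkCopy(
            connectionString,
            // TableLock takes a bulk-update lock for the duration of the copy --
            // the "take a TABLOCK and hold it" advice above.
            SqlBulkCopyOptions.TableLock))
        {
            bulk.DestinationTableName = "dbo.Frames";
            bulk.BatchSize = 10000;   // commit in batches of 10,000 rows
            bulk.WriteToServer(frames);
        }
    }
}

The same single-writer rule applies here: one process holding the lock and streaming batches will usually beat several processes inserting one row each.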

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow