Question

I have a few hundred files that I need to upload to Azure Blob Storage.
I want to use the Task Parallel Library.
But instead of starting a hundred threads at once in a foreach over the list of files, how can I limit the maximum number of threads it uses while still finishing the job in parallel? Or does it balance things automatically?

Solution

You should not be using threads for this at all. There's a Task-based API for this, which is naturally asynchronous: CloudBlockBlob.UploadFromFileAsync. Use it with async/await and SemaphoreSlim to throttle the number of parallel uploads.

Example (untested):

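// Needs System.Collections.Generic, System.IO, System.Linq, System.Threading,
// System.Threading.Tasks and the Azure Storage blob client namespace.
const int MAX_PARALLEL_UPLOADS = 5;

// "container" is assumed to be an already-initialized CloudBlobContainer.
async Task UploadFiles(CloudBlobContainer container)
{
    var files = new List<string>();
    // ... add files to the list

    // upload files asynchronously, at most MAX_PARALLEL_UPLOADS at a time
    using (var semaphore = new SemaphoreSlim(MAX_PARALLEL_UPLOADS))
    {
        var tasks = files.Select(async filename =>
        {
            await semaphore.WaitAsync();
            try
            {
                // One blob per file. CloudBlockBlob is not IDisposable, so it needs no using block.
                var blob = container.GetBlockBlobReference(Path.GetFileName(filename));
                // FileMode.Open reads the existing local file.
                // Some SDK versions omit the FileMode parameter; drop it if your overload does not take one.
                await blob.UploadFromFileAsync(filename, FileMode.Open);
            }
            finally
            {
                semaphore.Release();
            }
        }).ToArray();

        await Task.WhenAll(tasks);
    }
}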
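
To check that the semaphore really caps the number of uploads in flight, here is a minimal self-contained sketch that needs no Azure account. FakeUploadAsync is a hypothetical stand-in that only waits, and the file names and the limit of 5 are made up for illustration. It assumes C# 7.1 or later for the async Main.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class ThrottleDemo
{
    const int MaxParallel = 5;
    static readonly object Gate = new object();
    static int running, peak;

    // Hypothetical stand-in for an upload call; it only waits 100 ms.
    static Task FakeUploadAsync(string name) => Task.Delay(100);

    static async Task Main()
    {
        var files = Enumerable.Range(0, 20).Select(i => $"file{i}.txt").ToList();

        using (var semaphore = new SemaphoreSlim(MaxParallel))
        {
            var tasks = files.Select(async name =>
            {
                await semaphore.WaitAsync();
                try
                {
                    // Track how many fake uploads are in flight and the highest count seen.
                    lock (Gate) { running++; peak = Math.Max(peak, running); }
                    await FakeUploadAsync(name);
                }
                finally
                {
                    lock (Gate) { running--; }
                    semaphore.Release();
                }
            }).ToArray();

            await Task.WhenAll(tasks);
        }

        // peak should never exceed MaxParallel.
        Console.WriteLine($"peak concurrency: {peak} (limit {MaxParallel})");
    }
}
```

The counter is incremented only after WaitAsync returns and decremented before Release, so the reported peak is bounded by the semaphore's initial count.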

OTHER TIPS

Did you try using MaxDegreeOfParallelism? Like this:

System.Threading.Tasks.Parallel.Invoke(
    new System.Threading.Tasks.ParallelOptions { MaxDegreeOfParallelism = 5 }, actionsArray);

Essentially you're going to want to create an Action or Task for each file to upload, put them in a List, and then process that list, limiting the number that can be processed in parallel.

My blog post shows how to do this both with Tasks and with Actions, and provides a sample project you can download and run to see both in action.

With Actions

If using Actions, you can use the built-in .NET Parallel.Invoke function. Here we limit it to running at most 5 threads in parallel.

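var listOfActions = new List<Action>();
foreach (var file in files)
{
    var localFile = file;
    // Note that we only create the Action here; Parallel.Invoke runs it later.
    // Wait() blocks until the upload finishes, so each Action holds one of the 5 slots for the whole upload.
    // blobBlock should reference the target blob for this particular file.
    listOfActions.Add(() => blobBlock.UploadFromFileAsync(localFile, FileMode.Open).Wait());
}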

var options = new ParallelOptions {MaxDegreeOfParallelism = 5};
Parallel.Invoke(options, listOfActions.ToArray());

This option doesn't make use of the async nature of UploadFromFileAsync though, so you might want to use the Task example below.

With Tasks

With Tasks there is no built-in function. However, you can use the one that I provide on my blog.

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
    {
        await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
    }

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel.
    /// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para>
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
    {
        // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
        var tasks = tasksToRun.ToList();

        using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
        {
            var postTaskTasks = new List<Task>();

            // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
            tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));

            // Start running each task.
            foreach (var task in tasks)
            {
                // Increment the number of tasks currently running and wait if too many are running.
                await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken);

                cancellationToken.ThrowIfCancellationRequested();
                task.Start();
            }

            // Wait for all of the provided tasks to complete.
            // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
            await Task.WhenAll(postTaskTasks.ToArray());
        }
    }

Then create your list of Tasks and call the function to run them, here with a maximum of 5 running at a time:

var listOfTasks = new List<Task>();
foreach (var file in files)
{
    var localFile = file;
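    // Note that we create the Task here, but do not start it.
    // The delegate blocks on the upload on purpose: new Task(async () => ...) would produce an
    // async void delegate, so the Task would complete at the first await and release the throttle too early.
    listOfTasks.Add(new Task(() => blobBlock.UploadFromFileAsync(localFile, FileMode.Open).Wait()));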
}
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 5);

You can find out whether it balances things automatically by running this:

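using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

class Program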
{
    static void Main(string[] args)
    {
        var list = new List<int>();

        for (int i = 0; i < 100; i++)
        {
            list.Add(i);
        }

        var runningIndex = 0;

        Task.Factory.StartNew(() => Action(ref runningIndex));

        Parallel.ForEach(list, i =>
        {
            // Several threads update the counter at once, so increment it atomically.
            Interlocked.Increment(ref runningIndex);
            Console.WriteLine(i);
            Thread.Sleep(3000);
        });

        Console.ReadKey();
    }

    private static void Action(ref int number)
    {
        while (true)
        {
            Console.WriteLine("worked through {0}", number);
            Thread.Sleep(2900);
        }
    }
}

As you can see, the degree of parallelism is low at the start, increases, and then drops again towards the end. Parallel.ForEach runs on the thread pool, which adds and removes worker threads as the load changes, so there definitely is some sort of automatic balancing going on.
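
If you would rather set a hard cap than rely on that automatic behaviour, Parallel.ForEach also accepts a ParallelOptions argument. Below is a minimal sketch under stated assumptions: the 40 items, the 100 ms sleep and the limit of 5 are arbitrary, and the sleep stands in for a blocking call such as a synchronous upload.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class ForEachLimitDemo
{
    static readonly object Gate = new object();
    static int running, peak;

    static void Main()
    {
        var items = Enumerable.Range(0, 40).ToList();
        var options = new ParallelOptions { MaxDegreeOfParallelism = 5 };

        Parallel.ForEach(items, options, i =>
        {
            // Track how many iterations run at once and the highest count seen.
            lock (Gate) { running++; peak = Math.Max(peak, running); }
            Thread.Sleep(100); // simulated blocking work
            lock (Gate) { running--; }
        });

        // peak should never exceed MaxDegreeOfParallelism.
        Console.WriteLine($"peak concurrency: {peak}");
    }
}
```

Note that MaxDegreeOfParallelism is an upper bound only; the scheduler may still use fewer threads than the limit allows.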

Licensed under: CC-BY-SA with attribution