Below I have a fairly simple .NET console app consuming an Azure Service Bus Queue.

As you'll see, I am using Task.Factory to fire up 25 receiver tasks that then call my APM-style BeginMessageReceive method. Then, at the end of EndMessageReceive, I call BeginMessageReceive again to keep the loop going.

My question: how could I achieve the same thing while switching from the APM-style BeginMessageReceive/EndMessageReceive to a TPL/TAP approach, using recursive Tasks and possibly C# 5.0 async/await?

using System;
using System.Configuration;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

namespace ServiceBusConsumer
{
    class Program
    {
        private static QueueClient _queueClient;

        private static void Main(string[] args)
        {
            var connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
            _queueClient = QueueClient.CreateFromConnectionString(connectionString, "MyQueue");

            // Fire up 25 receiver tasks, each starting its own receive loop.
            for (var i = 0; i < 25; i++ )
            {
                Task.Factory.StartNew(BeginMessageReceive);
            }

            Console.WriteLine("Waiting for messages...");
            Console.ReadLine(); // keep the process alive while the receivers run

        } //end private static void Main(string[] args)

        private static void BeginMessageReceive()
        {
            _queueClient.BeginReceive(TimeSpan.FromMinutes(5), EndMessageReceive, null);
        }

        private static void EndMessageReceive(IAsyncResult iar)
        {
            var message = _queueClient.EndReceive(iar);
            try
            {
                if (message != null)
                {
                    var msg = message.GetBody<string>();
                    Console.WriteLine("Message: " + msg);

                    if (_queueClient.Mode == ReceiveMode.PeekLock)
                    {
                        // Mark brokered message as completed at which point it's removed from the queue.
                        message.Complete();
                    }
                }
            }
            catch (Exception ex)
            {
                if (_queueClient.Mode == ReceiveMode.PeekLock)
                {
                    // unlock the message and make it available
                    message.Abandon();
                }

                Console.WriteLine("Exception was thrown: " + ex.GetBaseException().Message);
            }
            finally
            {
                if (message != null)
                {
                    message.Dispose();
                }
            }

            // Keep the loop going.
            BeginMessageReceive();
        }
    }
}

Update: modified code that loops and receives again if the MessageReceive timeout expires:

private static async Task MessageReceiveAsync()
{
    while (true)
    {
        using (var message = await _queueClient.ReceiveAsync(TimeSpan.FromMinutes(5)))
        {
            // A null message means the receive timed out; loop and receive again.
            if (message == null)
                continue;

            try
            {
                var msg = message.GetBody<string>();
                Console.WriteLine("Message: " + msg);

                // Mark brokered message as completed at which point it's removed from the queue.
                if (_queueClient.Mode == ReceiveMode.PeekLock)
                    await message.CompleteAsync();
            }
            catch (Exception ex)
            {
                Console.WriteLine("Exception was thrown: " + ex.GetBaseException().Message);
            }
        }
    }
}


It looks like the Azure client libs still haven't been updated with TAP APIs. Not sure what the holdup is there...

Anyway, you can create your own APM->TAP wrappers using TaskFactory.FromAsync, as such:

public static class MyAzureExtensions
{
  public static Task<BrokeredMessage> ReceiveAsync(this QueueClient @this,
      TimeSpan serverWaitTime)
  {
    return Task<BrokeredMessage>.Factory.FromAsync(
        @this.BeginReceive, @this.EndReceive, serverWaitTime, null);
  }

  public static Task CompleteAsync(this BrokeredMessage @this)
  {
    return Task.Factory.FromAsync(@this.BeginComplete, @this.EndComplete, null);
  }
}
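
To try the FromAsync wrapping pattern without a Service Bus namespace, here is a minimal self-contained sketch. FakeApmQueue and its "hello" payload are hypothetical stand-ins invented for illustration (they are not part of any Azure library); only Task<T>.Factory.FromAsync is the real API being shown.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical APM-only client, standing in for a type that exposes Begin/End pairs.
public class FakeApmQueue
{
    // Begin method: starts the operation and returns an IAsyncResult.
    // serverWaitTime is accepted to mirror the real signature but is ignored here.
    public IAsyncResult BeginReceive(TimeSpan serverWaitTime, AsyncCallback callback, object state)
    {
        var tcs = new TaskCompletionSource<string>(state);

        // Simulate a message arriving after a short delay.
        Task.Delay(TimeSpan.FromMilliseconds(50)).ContinueWith(_ =>
        {
            tcs.SetResult("hello");
            if (callback != null) callback(tcs.Task);
        });

        return tcs.Task; // Task<T> implements IAsyncResult
    }

    // End method: hands back the result of the operation.
    public string EndReceive(IAsyncResult asyncResult)
    {
        return ((Task<string>)asyncResult).Result;
    }
}

public static class FakeApmQueueExtensions
{
    // Same shape as the ReceiveAsync wrapper above: Begin, End, one argument, null state.
    public static Task<string> ReceiveAsync(this FakeApmQueue @this, TimeSpan serverWaitTime)
    {
        return Task<string>.Factory.FromAsync(
            @this.BeginReceive, @this.EndReceive, serverWaitTime, null);
    }
}

public static class FromAsyncDemo
{
    public static void Main()
    {
        var queue = new FakeApmQueue();
        // Blocking on Result is acceptable in a console Main; prefer await elsewhere.
        string body = queue.ReceiveAsync(TimeSpan.FromSeconds(1)).Result;
        Console.WriteLine("Message: " + body);
    }
}
```

The wrapper itself never mentions threads or callbacks; FromAsync supplies the callback and completes the returned Task when the End method runs.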

Once you've wrapped the Azure calls into a TAP-ready API, you can use them as such:

private static void Main(string[] args)
{
  var connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
  _queueClient = QueueClient.CreateFromConnectionString(connectionString, "MyQueue");

  for (var i = 0; i < 25; i++ )
    MyMessageReceiveAsync();

  Console.WriteLine("Waiting for messages...");
  Console.ReadLine(); // keep the process alive while the receive loops run
}

private static async Task MyMessageReceiveAsync()
{
  while (true)
  {
    using (var message = await _queueClient.ReceiveAsync(TimeSpan.FromMinutes(5)))
    {
      if (message == null)
        continue; // the receive timed out without a message; wait again
      try
      {
        var msg = message.GetBody<string>();
        Console.WriteLine("Message: " + msg);

        if (_queueClient.Mode == ReceiveMode.PeekLock)
        {
          // Mark brokered message as completed at which point it's removed from the queue.
          await message.CompleteAsync();
        }
      }
      catch (Exception ex)
      {
        if (_queueClient.Mode == ReceiveMode.PeekLock)
        {
          // unlock the message and make it available
          message.Abandon();
        }

        Console.WriteLine("Exception was thrown: " + ex.GetBaseException().Message);
      }
    }
  }
}

One benefit to using async like this is that you don't tie up thread pool threads unnecessarily. The original used 25 threads to listen; my sample will not use any threads to listen. The only time a thread pool thread is tied up in my sample is when a message is being abandoned (in the error handling branch).
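
If even the abandon call should not block a thread, one option is to record the failure inside the catch block and await the abandon afterwards, since C# 5.0 does not allow await inside catch. The sketch below is self-contained: FakeMessage and its AbandonAsync/CompleteAsync methods are hypothetical stand-ins. With the real client you would need an AbandonAsync wrapper built with FromAsync like CompleteAsync above, which assumes BrokeredMessage exposes a matching BeginAbandon/EndAbandon pair.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for a brokered message; not a Service Bus type.
public class FakeMessage
{
    public string Body { get; private set; }
    public bool Completed { get; private set; }
    public bool Abandoned { get; private set; }

    public FakeMessage(string body) { Body = body; }

    public Task CompleteAsync() { Completed = true; return Task.FromResult(0); }
    public Task AbandonAsync() { Abandoned = true; return Task.FromResult(0); }
}

public static class AbandonDemo
{
    public static async Task HandleAsync(FakeMessage message)
    {
        bool failed = false;
        try
        {
            if (message.Body == null)
                throw new InvalidOperationException("Empty body");

            Console.WriteLine("Message: " + message.Body);
            await message.CompleteAsync();
        }
        catch (Exception ex)
        {
            // await is not permitted here in C# 5.0, so only record the failure.
            failed = true;
            Console.WriteLine("Exception was thrown: " + ex.GetBaseException().Message);
        }

        // Outside the catch block the abandon can be awaited without blocking.
        if (failed)
            await message.AbandonAsync();
    }

    public static void Main()
    {
        var good = new FakeMessage("hi");
        var bad = new FakeMessage(null);

        HandleAsync(good).Wait();
        HandleAsync(bad).Wait();

        Console.WriteLine("good completed: " + good.Completed);
        Console.WriteLine("bad abandoned: " + bad.Abandoned);
    }
}
```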

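There is one major semantic difference from the original code: if the QueueClient's "receive" raises an exception, in the original code it would crash the process; in my example the exception is stored on the Task returned by MyMessageReceiveAsync, which nothing observes, so it is effectively ignored and that receive loop simply stops.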
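
To make that difference concrete, here is a small self-contained sketch of a fire-and-forget async method that throws outside any try/catch. ReceiveLoopAsync is a hypothetical stand-in for the receive loop, not the Service Bus code. The exception ends up on the returned Task, and a continuation is one way to at least log it.

```csharp
using System;
using System.Threading.Tasks;

public static class UnobservedFaultDemo
{
    // Stand-in for a receive loop whose "receive" call throws outside any try/catch.
    public static async Task ReceiveLoopAsync()
    {
        await Task.Yield();
        throw new InvalidOperationException("receive failed");
    }

    public static void Main()
    {
        // Fire-and-forget: the exception does not propagate to Main.
        Task loop = ReceiveLoopAsync();

        // Attach a continuation that runs only if the loop faults, so the failure is logged.
        Task logged = loop.ContinueWith(
            t => Console.WriteLine("Receive loop failed: " + t.Exception.GetBaseException().Message),
            TaskContinuationOptions.OnlyOnFaulted);

        logged.Wait();
        Console.WriteLine("Process is still running; loop.IsFaulted = " + loop.IsFaulted);
    }
}
```

Whether to log, restart the loop, or deliberately bring the process down is a design choice; the point is only that with async the failure must be observed explicitly.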

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow