It looks like the Azure client libs still haven't been updated with TAP APIs. Not sure what the holdup is there...
Anyway, you can create your own APM->TAP wrappers using TaskFactory.FromAsync, like this:
    public static class MyAzureExtensions
    {
        public static Task<BrokeredMessage> ReceiveAsync(this QueueClient @this,
            TimeSpan serverWaitTime)
        {
            return Task<BrokeredMessage>.Factory.FromAsync(
                @this.BeginReceive, @this.EndReceive, serverWaitTime, null);
        }

        public static Task CompleteAsync(this BrokeredMessage @this)
        {
            return Task.Factory.FromAsync(@this.BeginComplete, @this.EndComplete, null);
        }
    }
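If you want to try the FromAsync pattern without the Azure SDK, here is a minimal sketch that wraps Stream.BeginRead/EndRead the same way. The names StreamApmExtensions, ReadViaApmAsync and FromAsyncDemo are made up for illustration; only the FromAsync call shape matters:

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

public static class StreamApmExtensions
{
    // Hypothetical wrapper: same shape as the Azure wrappers, but over
    // Stream's Begin/End pair, which takes three arguments before the callback.
    public static Task<int> ReadViaApmAsync(this Stream @this,
        byte[] buffer, int offset, int count)
    {
        return Task<int>.Factory.FromAsync(
            @this.BeginRead, @this.EndRead, buffer, offset, count, null);
    }
}

public static class FromAsyncDemo
{
    // Reads a small in-memory payload through the APM->TAP wrapper.
    public static async Task<string> ReadAllTextAsync()
    {
        var source = Encoding.UTF8.GetBytes("hello");
        using (var stream = new MemoryStream(source))
        {
            var buffer = new byte[16];
            var read = await stream.ReadViaApmAsync(buffer, 0, buffer.Length);
            return Encoding.UTF8.GetString(buffer, 0, read);
        }
    }
}
```

FromAsync has overloads for Begin methods taking up to three arguments before the callback and state, so most Begin/End pairs can be wrapped in a single call like this.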
Once you've wrapped the Azure calls into a TAP-ready API, you can use them as such:
    private static QueueClient _queueClient;

    private static void Main(string[] args)
    {
        var connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
        _queueClient = QueueClient.CreateFromConnectionString(connectionString, "MyQueue");

        // Start 25 concurrent receive loops; the returned tasks are deliberately not awaited.
        for (var i = 0; i < 25; i++)
            MyMessageReceiveAsync();

        Console.WriteLine("Waiting for messages...");
        Console.ReadKey();
        _queueClient.Close();
    }

    private static async Task MyMessageReceiveAsync()
    {
        while (true)
        {
            using (var message = await _queueClient.ReceiveAsync(TimeSpan.FromMinutes(5)))
            {
                // Receive yields null if no message arrives within the wait time; just try again.
                if (message == null)
                    continue;

                try
                {
                    var msg = message.GetBody<string>();
                    Console.WriteLine("Message: " + msg);

                    if (_queueClient.Mode == ReceiveMode.PeekLock)
                    {
                        // Mark brokered message as completed at which point it's removed from the queue.
                        await message.CompleteAsync();
                    }
                }
                catch (Exception ex)
                {
                    if (_queueClient.Mode == ReceiveMode.PeekLock)
                    {
                        // Unlock the message and make it available to other receivers.
                        message.Abandon();
                    }

                    Console.WriteLine("Exception was thrown: " + ex.GetBaseException().Message);
                }
            }
        }
    }
One benefit to using async like this is that you don't tie up thread pool threads unnecessarily. The original used 25 threads to listen; my sample will not use any threads to listen. The only time a thread pool thread is tied up in my sample is when a message is being abandoned (in the error handling branch).
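There is one major semantic difference from the original code: if the QueueClient's "receive" raises an exception, in the original code it would crash the process; in my example the exception ends up on the Task returned by MyMessageReceiveAsync, and since Main never looks at that Task, the exception is ignored and that receive loop just stops.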
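If you'd rather notice a failed receive loop, one option is to keep the returned tasks and watch them with Task.WhenAny. The sketch below is only an illustration under assumptions: FakeReceiveLoopAsync is a made-up stand-in for MyMessageReceiveAsync (loop 2 fails, the others wait forever), and ObservedLoops is a hypothetical name:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ObservedLoops
{
    // Stand-in for a receive loop: loop 2 fails, the others never complete.
    private static async Task FakeReceiveLoopAsync(int id)
    {
        if (id != 2)
            await Task.Delay(Timeout.Infinite);

        await Task.Yield();
        throw new InvalidOperationException("receive failed in loop " + id);
    }

    public static async Task<string> RunAsync()
    {
        // Keep the tasks instead of discarding them.
        var loops = Enumerable.Range(1, 3)
            .Select(id => FakeReceiveLoopAsync(id))
            .ToArray();

        // An endless loop only completes by failing, so the first task
        // to finish is the one carrying the exception.
        var finished = await Task.WhenAny(loops);
        try
        {
            await finished; // rethrows the loop's exception here
            return "no failure";
        }
        catch (InvalidOperationException ex)
        {
            return ex.Message;
        }
    }
}
```

What you do once the exception surfaces (log it, restart the loop, or let the process crash like the original) is up to you; the point is only that the Task has to be observed somewhere.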