Question

Below I have a fairly simple .NET console app consuming an Azure Service Bus Queue.

As you'll see, I am using Task.Factory to fire up 25 receiver tasks that then call my APM-style BeginMessageReceive method. Then, at the end of EndMessageReceive, I call BeginMessageReceive again to keep the loop going.

My question is how could I achieve the same sort of thing but switching from the APM-style BeginMessageReceive/EndMessageReceive to a TPL/TAP approach using recursive Tasks and possibly utilizing C# 5.0 async/await?

using System;
using System.Configuration;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;


namespace ServiceBusConsumer
{
    class Program
    {
        private static QueueClient _queueClient;

        private static void Main(string[] args)
        {    
            var connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
            _queueClient = QueueClient.CreateFromConnectionString(connectionString, "MyQueue");

            for (var i = 0; i < 25; i++ )
            {
                Task.Factory.StartNew(BeginMessageReceive);
            }

            Console.WriteLine("Waiting for messages...");
            Console.ReadKey();

            _queueClient.Close();

        } //end private static void Main(string[] args)

        private static void BeginMessageReceive()
        {
            _queueClient.BeginReceive(TimeSpan.FromMinutes(5), EndMessageReceive, null);
        }

        private static void EndMessageReceive(IAsyncResult iar)
        {
            var message = _queueClient.EndReceive(iar);
            try
            {
                if (message != null)
                {
                    var msg = message.GetBody<string>();
                    Console.WriteLine("Message: " + msg);

                    if (_queueClient.Mode == ReceiveMode.PeekLock)
                    {
                        // Mark brokered message as completed at which point it's removed from the queue.
                        message.Complete();
                    }
                }
            }
            catch (Exception ex)
            {
                if (_queueClient.Mode == ReceiveMode.PeekLock)
                {
                    // unlock the message and make it available 
                    message.Abandon();
                }

                Console.WriteLine("Exception was thrown: " + ex.GetBaseException().Message);
            }
            finally
            {
                if (message != null)
                {
                    message.Dispose();
                }
            }
            BeginMessageReceive();
        }

    }
}

Updated code that loops back and receives again when the receive timeout expires:

private static async Task MessageReceiveAsync()
{
    while (true)
    {
        using (var message = await _queueClient.ReceiveAsync(TimeSpan.FromMinutes(5)))
        {
            if (message != null)
            {               
                try
                {

                    var msg = message.GetBody<string>();
                    Console.WriteLine("Message: " + msg);

                    if (_queueClient.Mode == ReceiveMode.PeekLock)
                    {
                        // Mark brokered message as completed at which point it's removed from the queue.
                        await message.CompleteAsync();
                    }
                }
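                catch (Exception ex)
                {
                    if (_queueClient.Mode == ReceiveMode.PeekLock)
                    {
                        // Unlock the message synchronously: C# 5.0 cannot await inside a catch block.
                        message.Abandon();
                    }

                    Console.WriteLine("Exception was thrown: " + ex.GetBaseException().Message);
                }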
            }
        }
    }
}

Solution

It looks like the Azure client libs still haven't been updated with TAP APIs. Not sure what the holdup is there...

Anyway, you can create your own APM->TAP wrappers using TaskFactory.FromAsync, as such:

public static class MyAzureExtensions
{
  public static Task<BrokeredMessage> ReceiveAsync(this QueueClient @this,
      TimeSpan serverWaitTime)
  {
    return Task<BrokeredMessage>.Factory.FromAsync(
        @this.BeginReceive, @this.EndReceive, serverWaitTime, null);
  }

  public static Task CompleteAsync(this BrokeredMessage @this)
  {
    return Task.Factory.FromAsync(@this.BeginComplete, @this.EndComplete, null);
  }
}
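
The same FromAsync pattern can be tried without any Azure dependency. The following is a minimal sketch that wraps the framework's own Stream.BeginRead/EndRead pair; the names FromAsyncDemo and ReadViaFromAsync are made up for illustration and are not part of any library.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

public static class FromAsyncDemo
{
    // Wraps the APM pair Stream.BeginRead/EndRead in a Task<int>, the same way
    // the extension methods above wrap BeginReceive/EndReceive.
    public static Task<int> ReadViaFromAsync(Stream stream, byte[] buffer)
    {
        return Task<int>.Factory.FromAsync(
            stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, null);
    }

    public static void Main()
    {
        var data = Encoding.UTF8.GetBytes("hello");
        using (var stream = new MemoryStream(data))
        {
            var buffer = new byte[16];
            // Blocking on Result is tolerable here only because this is a console demo.
            int read = ReadViaFromAsync(stream, buffer).Result;
            Console.WriteLine(Encoding.UTF8.GetString(buffer, 0, read));
        }
    }
}
```

The arguments that precede the callback in the Begin method (here buffer, offset, count) are passed to FromAsync after the two method groups, and the final null is the APM state object.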

Once you've wrapped the Azure calls into a TAP-ready API, you can use them as such:

private static void Main(string[] args)
{    
  var connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
  _queueClient = QueueClient.CreateFromConnectionString(connectionString, "MyQueue");

  for (var i = 0; i < 25; i++ )
    MyMessageReceiveAsync();

  Console.WriteLine("Waiting for messages...");
  Console.ReadKey();

  _queueClient.Close();
}

private static async Task MyMessageReceiveAsync()
{
  while (true)
  {
    using (var message = await _queueClient.ReceiveAsync(TimeSpan.FromMinutes(5)))
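    {
      // ReceiveAsync returns null when the wait time elapses with no message; receive again.
      if (message == null)
        continue;
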
      try
      {
        var msg = message.GetBody<string>();
        Console.WriteLine("Message: " + msg);

        if (_queueClient.Mode == ReceiveMode.PeekLock)
        {
          // Mark brokered message as completed at which point it's removed from the queue.
          await message.CompleteAsync();
        }
      }
      catch (Exception ex)
      {
        if (_queueClient.Mode == ReceiveMode.PeekLock)
        {
          // unlock the message and make it available 
          message.Abandon();
        }

        Console.WriteLine("Exception was thrown: " + ex.GetBaseException().Message);
      }
    }
  }
}

One benefit to using async like this is that you don't tie up thread pool threads unnecessarily. The original used 25 threads to listen; my sample will not use any threads to listen. The only time a thread pool thread is tied up in my sample is when a message is being abandoned (in the error handling branch).
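
The blocking Abandon() call is there because C# 5.0 does not allow await inside a catch block. If that bothers you, one possible workaround is to remember the exception in the catch and await after it. The sketch below shows only the control flow: FakeMessage is a hypothetical stand-in for BrokeredMessage so that the code runs without the Azure SDK, and its AbandonAsync is an assumption (with the real SDK you would presumably build it with FromAsync over a BeginAbandon/EndAbandon pair, if your version exposes one).

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for BrokeredMessage, so the sketch runs without the Azure SDK.
public class FakeMessage
{
    public bool Abandoned { get; private set; }
    public bool Completed { get; private set; }

    public Task CompleteAsync() { Completed = true; return Task.FromResult(0); }
    public Task AbandonAsync() { Abandoned = true; return Task.FromResult(0); }
}

public static class AbandonDemo
{
    // Processes one message; 'handler' stands in for GetBody plus the business logic.
    public static async Task ProcessAsync(FakeMessage message, Action handler)
    {
        Exception failure = null;
        try
        {
            handler();
            await message.CompleteAsync();
        }
        catch (Exception ex)
        {
            // C# 5.0 cannot await here, so only remember the failure.
            failure = ex;
        }

        if (failure != null)
        {
            // Outside the catch block, awaiting is allowed again.
            await message.AbandonAsync();
            Console.WriteLine("Exception was thrown: " + failure.GetBaseException().Message);
        }
    }

    public static void Main()
    {
        var message = new FakeMessage();
        ProcessAsync(message, () => { throw new InvalidOperationException("boom"); }).Wait();
        Console.WriteLine("Abandoned: " + message.Abandoned);
    }
}
```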

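There is one major semantic difference from the original code: if the QueueClient's "receive" raises an exception, the original code would crash the process, whereas in my example the exception is stored on the Task returned by MyMessageReceiveAsync. Since Main discards that Task, the exception is ignored and that particular receive loop silently stops.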
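
If you would rather know when a receive loop dies, one option is to keep the returned Task and attach a continuation that runs only on failure. This is a rough sketch, not a drop-in replacement: FakeReceiveLoopAsync is a hypothetical stand-in for MyMessageReceiveAsync that simply fails after a short delay, and Observe is an illustrative helper name.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class ObserveFaultsDemo
{
    // Hypothetical stand-in for MyMessageReceiveAsync: fails after a short delay.
    public static async Task FakeReceiveLoopAsync(int id)
    {
        await Task.Delay(10);
        throw new InvalidOperationException("receiver " + id + " failed");
    }

    // Attaches a continuation that runs only if the loop's task faults.
    public static Task Observe(Task loop)
    {
        return loop.ContinueWith(
            t => Console.WriteLine("Receive loop stopped: " + t.Exception.GetBaseException().Message),
            TaskContinuationOptions.OnlyOnFaulted);
    }

    public static void Main()
    {
        var observers = Enumerable.Range(0, 3)
            .Select(i => Observe(FakeReceiveLoopAsync(i)))
            .ToArray();

        // Every stand-in loop faults, so every continuation runs to completion.
        Task.WaitAll(observers);
    }
}
```

In the real program you would call Observe(MyMessageReceiveAsync()) inside the for loop in Main and decide in the continuation whether to log, restart the loop, or tear the process down.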

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow