Question

I am currently struggling to get something up and running on an nServiceBus hosted application. I have an azure ServiceBus queue that a 3rd party is posting messages to and I want my application (which is hosted locally at the moment) to receive these messages.

I have googled for answers on how to configure the endpoint but I have had no luck in a valid config. Has anyone ever done this as I can find examples of how to connect to Azure storage queues but NOT servicebus queue. (I need azure servicebus queues for other reasons)

The config I have is as below

public void Init()
    {
        Configure.With()
           .DefaultBuilder()
           .XmlSerializer()
           .UnicastBus()
           .AzureServiceBusMessageQueue()
           .IsTransactional(true)
           .MessageForwardingInCaseOfFault()
           .UseInMemoryTimeoutPersister()
           .InMemorySubscriptionStorage();
    }

. Message=Exception when starting endpoint, error has been logged. Reason: Input queue [mytimeoutmanager@sb://[*].servicebus.windows.net/] must be on the same machine as this Source=NServiceBus.Host

.

<configuration>
  <configSections>
    <section name="MessageForwardingInCaseOfFaultConfig" type="NServiceBus.Config.MessageForwardingInCaseOfFaultConfig, NServiceBus.Core" />
    <section name="UnicastBusConfig" type="NServiceBus.Config.UnicastBusConfig, NServiceBus.Core" />
    <section name="AzureServiceBusQueueConfig" type="NServiceBus.Config.AzureServiceBusQueueConfig, NServiceBus.Azure" />
    <section name="AzureTimeoutPersisterConfig" type="NServiceBus.Timeout.Hosting.Azure.AzureTimeoutPersisterConfig, NServiceBus.Timeout.Hosting.Azure" />
  </configSections>
  <AzureServiceBusQueueConfig IssuerName="owner" QueueName="testqueue" IssuerKey="[KEY]" ServiceNamespace="[NS]" />
  <MessageForwardingInCaseOfFaultConfig ErrorQueue="error" />
  <!-- Use the following line to explicitly set the Timeout manager address -->
  <UnicastBusConfig TimeoutManagerAddress="MyTimeoutManager" />
  <!-- Use the following line to explicity set the Timeout persisters connectionstring -->
  <AzureTimeoutPersisterConfig ConnectionString="UseDevelopmentStorage=true" />
  <startup useLegacyV2RuntimeActivationPolicy="true">
    <supportedruntime version="v4.0" />
    <requiredruntime version="v4.0.20506" />
    <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.0" />
  </startup>
</configuration>
Was it helpful?

Solution

Try moving UnicastBus() to the end of your call, like this:

    Configure.With()
       .DefaultBuilder()
       .XmlSerializer()
       .AzureServiceBusMessageQueue()
       .IsTransactional(true)
       .MessageForwardingInCaseOfFault()
       .UseInMemoryTimeoutPersister()
       .InMemorySubscriptionStorage()
       .UnicastBus(); // <- Here

And about those third parties posting messages to the queue. Keep in mind that they need to respect how NServiceBus handles serialization/deserialization. Here is how this is done in NServiceBus (the most important part is that the BrokeredMessage is initialized with a raw message, the result of a serialziation using the BinaryFormatter):

    private void Send(Byte[] rawMessage, QueueClient sender)
    {
        var numRetries = 0;
        var sent = false;

        while(!sent)
        {
            try
            {
                var brokeredMessage = new BrokeredMessage(rawMessage);

                sender.Send(brokeredMessage);

                sent = true;
            }
                // back off when we're being throttled
            catch (ServerBusyException)
            {
                numRetries++;

                if (numRetries >= MaxDeliveryCount) throw;

                Thread.Sleep(TimeSpan.FromSeconds(numRetries * DefaultBackoffTimeInSeconds));
            }
        }

    }

    private static byte[] SerializeMessage(TransportMessage message)
    {
        if (message.Headers == null)
            message.Headers = new Dictionary<string, string>();

        if (!message.Headers.ContainsKey(Idforcorrelation))
            message.Headers.Add(Idforcorrelation, null);

        if (String.IsNullOrEmpty(message.Headers[Idforcorrelation]))
            message.Headers[Idforcorrelation] = message.IdForCorrelation;

        using (var stream = new MemoryStream())
        {
            var formatter = new BinaryFormatter();
            formatter.Serialize(stream, message);
            return stream.ToArray();
        }
    }

If you want NServiceBus to correctly deserialize the message, make sure your thierd parties serialize it correctly.

OTHER TIPS

I now had exactly the same problem and spent several hours to figure out how to solve it. Basically Azure timeout persister is only supported for Azure hosted endpoints that use NServiceBus.Hosting.Azure. If you use NServiceBus.Host process to host your endpoints, it uses NServiceBus.Timeout.Hosting.Windows namespace classes. It initialized a TransactionalTransport with MSMQ and there you get this message.

I used two methods to avoid it:

  1. If you must use As_Server endpoint configuration, you can use .DisableTimeoutManager() in your initialization, it will skip the TimeoutDispatcher initialization completely
  2. Use As_Client endpoint configuration, it doesn't use transactional mode for the transport and timeout dispatcher is not inialized

There could be a way to inject Azure timeout manager somehow but I have not found it yet and I actually need As_Client thingy, so it works fine for me.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top