Question

I would like to access Service Bus Queues and Topics from Workflows with some specific activities.

I couldn't find anything that fits this scenario; this MSDN article and this article by Roman Kiss are the nearest ones.

I would like to design a custom activity that uses the QueueClient to receive brokered messages asynchronously, wrapping the BeginReceive method in the async/await pattern (please see my question about it).

First of all, I would like to ask whether there are any reasons why I should prefer the suggested approach (adapted WCF) over my desired one (using the QueueClient).

Then, I would appreciate help designing it in a persistence-friendly way.

Update:

This is what I tried so far:

public class AsyncReceiveBrokeredMessage : AsyncCodeActivity<BrokeredMessage>
{
    [RequiredArgument]
    public InArgument<string> ConnectionString { get; set; }

    [RequiredArgument]
    public InArgument<string> Path { get; set; }

    protected sealed override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
    {
        var connectionString = this.ConnectionString.Get(context);
        var path = this.Path.Get(context);
        var queueClient = QueueClient.CreateFromConnectionString(connectionString, path);
        var cts = new CancellationTokenSource();
        context.UserState = new ReceiveState
                                {
                                    CancellationTokenSource = cts,
                                    QueueClient = queueClient
                                };
        var task = ExecuteAsync(context, cts.Token);
        var tcs = new TaskCompletionSource<BrokeredMessage>(state);
        task.ContinueWith(
            t =>
                {
                    if (t.IsFaulted)
                    {
                        tcs.TrySetException(t.Exception.InnerExceptions);
                    }
                    else if (t.IsCanceled)
                    {
                        tcs.TrySetCanceled();
                    }
                    else
                    {
                        tcs.TrySetResult(t.Result);
                    }

                    if (callback != null)
                    {
                        callback(tcs.Task);
                    }
                });

        return tcs.Task;
    }

    protected sealed override BrokeredMessage EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
    {
        var task = (Task<BrokeredMessage>)result;
        try
        {
            return task.Result;
        }
        catch (OperationCanceledException)
        {
            if (context.IsCancellationRequested)
            {
                context.MarkCanceled();
            }
            else
            {
                throw;
            }

            return null; // or throw?
        }
        catch (AggregateException exception)
        {
            if (exception.InnerException is OperationCanceledException)
            {
                if (context.IsCancellationRequested)
                {
                    context.MarkCanceled();
                }
                else
                {
                    throw;
                }

                return null; // or throw?
            }

            ExceptionDispatchInfo.Capture(exception.InnerException).Throw();
            throw;
        }
    }

    protected override void Cancel(AsyncCodeActivityContext context)
    {
        var state = (ReceiveState)context.UserState;
        state.CancellationTokenSource.Cancel();
    }

    private async Task<BrokeredMessage> ExecuteAsync(
        AsyncCodeActivityContext context, CancellationToken cancellationToken)
    {
        var receiveState = context.UserState as ReceiveState;
        var receiveTask = Task<BrokeredMessage>.Factory.FromAsync(
            receiveState.QueueClient.BeginReceive, receiveState.QueueClient.EndReceive, null);
        var completionTask = receiveTask.ContinueWith(
             t =>
                 {
                     BrokeredMessage result;
                     if (t.IsCanceled)
                     {
                         context.MarkCanceled();
                         result = null;
                     }
                     else if (t.IsFaulted)
                     {
                         result = null;
                     }
                     else
                     {

                         t.Result.Complete();
                         result = t.Result;
                     }

                     receiveState.QueueClient.Close();
                     return result;
                 },
             cancellationToken);
        return await completionTask;
    }

    private class ReceiveState
    {
        public CancellationTokenSource CancellationTokenSource { get; set; }

        public QueueClient QueueClient { get; set; }
    }
}

And tested this way (using local Windows Server Service Bus):

var connectionString = new Variable<string>
{
    Default = connectionStringValue
};
var path = new Variable<string>
{
    Default = pathValue
};
var test = new While
{
    Body = new Pick
    {
        Branches =
        {
            new PickBranch
            {
                Trigger = new AsyncReceiveBrokeredMessage
                {
                    ConnectionString = new InArgument<string>(connectionString),
                    Path = new InArgument<string>(path)
                },
                Action = new WriteLine
                {
                    Text = "Received message"
                }
            },
            new PickBranch
            {
                Trigger = new Delay
                {
                    Duration = TimeSpan.FromSeconds(10)
                },
                Action = new WriteLine
                {
                    Text = "Timeout!"
                }
            }
        }
    },
    Condition = true,
    Variables = { connectionString, path }
};
WorkflowInvoker.Invoke(test);

I receive messages as expected if I send them continuously. The problems start after the first timeout: from then on, I no longer receive any messages. Any clarification is appreciated.

No correct solution

OTHER TIPS

First, you need to know some important things: 1) Workflows are long-running processes meant to be pausable and restorable later. 2) Workflows get woken up and restored through Bookmarks. 3) People usually want their workflows to be persistable while paused as well. (If you don't care about persistence, why are you using WF anyway? Just for the visual design tooling?)

Logical problem:

If all your workflows and their activities are persisted and suspended, then none of your activity code is even loaded, so who is doing the listening? Answer: something else, not an Activity, has to be the thing that is listening on the ServiceBus queue and taking responsibility for resuming bookmarks to wake up your workflows.

That something is the workflow 'Host', or some extension of it. Here are a couple of blog posts about how you can customize a host to listen for messages [from a GUI button] and wake up a workflow activity.

http://blogs.msdn.com/b/tilovell/archive/2011/02/26/wf4-workflow-4-0-hosting-extensions-redux.aspx

http://blogs.msdn.com/b/tilovell/archive/2010/06/08/wf4-workflow-4-0-hosting-extensions.aspx

What you could do is take this code and adapt it to listen on a ServiceBus queue instead of a GUI button, and wake up your own ReceiveFromServiceBus activity, which is analogous to PageActivity - note you have to be writing a NativeActivity in order to work with bookmarks properly.
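As a rough illustration of the bookmark approach described above, here is a minimal sketch. The names WaitForQueueMessage, BookmarkName and HostSketch are hypothetical, not taken from the linked posts. The host simulates the queue listener with a plain string; in a real host that value would come from whatever code reads the Service Bus queue (for example, the body of a received message), and it should be something serializable if the workflow is to be persisted.

```csharp
using System;
using System.Activities;
using System.Threading;

// Hypothetical activity: creates a bookmark and goes idle until the host resumes it.
public sealed class WaitForQueueMessage<T> : NativeActivity<T>
{
    [RequiredArgument]
    public InArgument<string> BookmarkName { get; set; }

    // Lets the runtime know this activity can leave the workflow idle.
    protected override bool CanInduceIdle
    {
        get { return true; }
    }

    protected override void Execute(NativeActivityContext context)
    {
        // No listening happens here; the activity only registers a resumption point.
        context.CreateBookmark(this.BookmarkName.Get(context), this.OnResumed);
    }

    private void OnResumed(NativeActivityContext context, Bookmark bookmark, object value)
    {
        // The value is whatever the host passed to ResumeBookmark.
        this.Result.Set(context, (T)value);
    }
}

public static class HostSketch
{
    public static void Main()
    {
        var idle = new AutoResetEvent(false);
        var done = new AutoResetEvent(false);

        var app = new WorkflowApplication(
            new WaitForQueueMessage<string> { BookmarkName = "msg" });
        app.Idle = e => idle.Set();
        app.Completed = e =>
        {
            Console.WriteLine(e.Outputs["Result"]);
            done.Set();
        };

        app.Run();
        idle.WaitOne(); // the workflow is now waiting on the bookmark

        // Assumption: in a real host, this payload comes from the queue listener.
        app.ResumeBookmark("msg", "payload from the queue listener");
        done.WaitOne();
    }
}
```

The point of the design is that the activity holds no connection and no pending receive while the workflow is idle, so the instance can be persisted and unloaded; only the host keeps listening.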

All rather cumbersome... but I believe this is the 'right' way to do it with WF.

Maybe the problem is in the DefaultMessageTimeToLive or TimeToLive properties.

NamespaceManager.CreateSubscription(
        new SubscriptionDescription(TopicName, SubscriptionName)
            {
                LockDuration = TimeSpan.FromMinutes(5),
                DefaultMessageTimeToLive = TimeSpan.FromDays(7),
                EnableDeadLetteringOnMessageExpiration = true
            });

Queue entities provide the following capabilities: "The ability to specify a time at which the message will be added to the queue."

Could it be that, after some timeout, you stop receiving messages because of this rule?

My resolution is:

Detection of inbound message duplicates, allowing clients to send the same message multiple times without adverse consequences.

Licensed under: CC-BY-SA with attribution