Question

There are occasions that I need to group some of my messages together so that they can be processed in order by my worker processes.

But every single example that I find shows only how to do one message per conversation.

What I am looking for is an example that sends more than one message in a conversation AND a an example that shows how to process them too. (I may be sending them both, but I only seem to be able to get one back out.)

Was it helpful?

Solution

A conversation provides the boundary within which messages are processed in order - so you need to SEND all the messages in the group with the same ConversationId. The way I do this is with a utility table that stores the ConversationId when created so that each time a message is sent, it looks up the appropriate conversationid to send it on.

SELECT @conversationHandle = ConversationHandle FROM Qproc.SessionConversation
WHERE 
FromService = @fromService
AND ToService = @toService
AND OnContract = @onContract
AND Terminated IS NULL

IF @conversationHandle IS NULL
BEGIN
    BEGIN DIALOG CONVERSATION @conversationHandle
        FROM SERVICE @fromService
        TO SERVICE @toService
        ON CONTRACT @onContract
        WITH ENCRYPTION = OFF; --, LIFETIME = 60*60*24*100;

    -- Store the ongoing conversation for further use
    INSERT INTO QProc.SessionConversation ( FromService, ToService, OnContract,ConversationHandle)
    VALUES(     @fromService,   @toService, @onContract,    @conversationHandle )
END


    -- Create the dialog timer, timeout is seconds; this will notify the ClientQueue if nothing has happened on the conversation
    --in the timeout period
    BEGIN CONVERSATION TIMER (@conversationHandle) TIMEOUT = 60*8;
    SEND ON CONVERSATION @conversationHandle
    MESSAGE TYPE [http://COMPANYNAME/AsyncTriggerRequestMesssage]
        (@messageBody);

The reason you only see one message at the other end is to do with conversation group locking - you should read up on this to understand what is going on, but basically once a message processing procedure has seen a message, its view on the message queue is restricted to a single conversation. This won't be an issue once you reuse the same conversationID. Here is an example receive:

DECLARE @RecvReqDlgHandle UNIQUEIDENTIFIER;
DECLARE @RecvReqMsg VARCHAR(8000);
DECLARE @RecvReqMsgName sysname;


WHILE (1=1)
    BEGIN
        BEGIN TRANSACTION;

        WAITFOR
        ( RECEIVE TOP(1)
            @RecvReqDlgHandle = conversation_handle,
            @RecvReqMsg = message_body,
            @RecvReqMsgName = message_type_name
          FROM  QProc.AsyncTaskServiceQueue
        ), TIMEOUT 500;

        IF @@ROWCOUNT=0
        BEGIN
            ROLLBACK TRANSACTION;
            BREAK
        END


        IF @RecvReqMsgName = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' 
            BEGIN
            END CONVERSATION @RecvReqDlgHandle;
            END

        IF @RecvReqMsgName='http://COMPANYNAME/AsyncTriggerRequestMesssage'
            BEGIN
                DECLARE @BodyDoc XML;
                SET @BodyDoc=CONVERT(XML, @RecvReqMsg)  ;   
                EXEC QProc.AsyncTaskRunTask @BodyDoc;

            END 

        COMMIT TRANSACTION;

    END

FInally, you'll need to clean those conversations up once they're no longer used, something like this:

DECLARE @conversationHandle UNIQUEIDENTIFIER;
DECLARE @messageTypeName SYSNAME;

BEGIN TRANSACTION;

RECEIVE TOP(1) 
    @conversationHandle = conversation_handle,
    @messageTypeName = message_type_name
FROM QProc.AsyncTaskClientQueue;

IF @conversationHandle IS NOT NULL
BEGIN
    --If the DialogTimer message arrives, then there has been no activity on this conversation for a while (see timeout setting in [QProc].[DispatchAsyncTaskMessage])
    --so we terminate gracefully and go home.


    IF @messageTypeName = 'http://schemas.microsoft.com/SQL/ServiceBroker/DialogTimer'
    OR @messageTypeName = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
    BEGIN
        END CONVERSATION @conversationHandle;
        UPDATE Qproc.SessionConversation SET TERMINATED = getUtcDate() WHERE ConversationHandle = @conversationHandle;

    END
    END

COMMIT TRANSACTION;
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top