Domanda

(see my edit at the bottom of this original post for the solution)

Setup

I have two stored procedures in my Microsoft SQL Server 2005 Express database:

  • WaitForMyMessage(@myName NVARCHAR(50), @myMessage NVARCHAR(MAX) OUTPUT)
  • ProvideMessage(@name NVARCHAR(50), @message NVARCHAR(MAX))

I want WaitForMyMessage() to block until someone calls ProvideMessage() with the corresponding name. If someone had already called ProvideMessage() with that name then WaitForMyMessage() would come back immediately with the value provided.

I originally considered implementing this with a simple table with FIFO queue behavior, but could not find a way to block on INSERT into this table. So WaitForMyMessage() would have to poll, and that was unacceptable for obvious reasons.

Q1:

Is there an efficient way to block until a certain record appears in a table? The WAITFOR statement would be fantastic, but SQL doesn't seem to support it for queries (only supports DELAY, TIME, or RECEIVE). But something like it would be great,

e.g.:

-- It would be great is SQL supported this, but as far as I can tell it doesn't.
DECLARE @t TABLE (ans NVARCHAR(MAX));
WAITFOR (
  WITH A AS (
    SELECT TOP (1) * 
    FROM ProviderMessage A 
    WHERE ProviderMessage.Name = @myName
    ORDER BY A.ID
  )
  DELETE FROM A 
  OUTPUT deleted.ID INTO @t
);
SET @myMessage = (SELECT ans FROM @t);

So it would sit idle until someone inserted a record with the appropriate Name into the ProviderMessage table, and as soon as that happened then that record would be deleted back out by the above while retrieving its Value field to be returned to the caller.

Other ideas

Alas, I could not find an answer to Q1 so I proceeded to implement this using actual message queuing as provided by Service Broker. This seemed overkill considering the power and reach of Service Broker, but without an answer to Q1, I had to give it a try. I defined my service and simple queue as follows:

CREATE QUEUE q1
CREATE SERVICE s1 ON QUEUE q1 ([DEFAULT])

And then WaitForMyMessage() became:

DECLARE @farHandle UNIQUEIDENTIFIER;
SET @farHandle = (
  SELECT FarHandle 
  FROM ProviderInfo 
  WHERE ProviderInfo.Name = @myName
);
WAITFOR (
  RECEIVE @myMessage = CONVERT(NVARCHAR(MAX), message_body) 
  FROM q1
  WHERE conversation_handle = @farHandle
);

and ProvideMessage() would send the message, like this:

DECLARE @nearHandle UNIQUEIDENTIFIER;
SET @nearHandle = (
  SELECT NearHandle 
  FROM ProviderInfo 
  WHERE ProviderInfo.Name = @name
);
SEND ON CONVERSATION @nearHandle (@message)

This all works perfectly well, except for one thing: it seems discovering both the near and far handles of a given conversation is unsupported by Service Broker. I must know both so I can populate the ProviderInfo table for both procedures to communicate privately.

Q2:

How can one get both the near conversation handle AND the far conversation handle of a new conversation? Right now I'm doing this by querying sys.conversation_endpoints like this:

-- Create the conversation
DECLARE @nearHandle UNIQUEIDENTIFIER; 
BEGIN DIALOG CONVERSATION @nearHandle 
FROM SERVICE s1 
TO SERVICE 's1' 
WITH ENCRYPTION = OFF; 

-- Queue an initialization message
SEND ON CONVERSATION @nearHandle ('');

-- Figure out the handle to the receiving side of this conversation
DECLARE @farHandle UNIQUEIDENTIFIER;
SET @farHandle = (
  SELECT conversation_handle 
  FROM sys.conversation_endpoints 
  WHERE conversation_id = (
    SELECT A.conversation_id 
    FROM sys.conversation_endpoints A 
    WHERE A.conversation_handle = @nearHandle 
  ) AND conversation_handle <> @nearHandle 
);

-- Get our initialization message out of the queue
DECLARE @unused TINYINT;
WAITFOR (
  RECEIVE @unused = status 
  FROM q1
  WHERE conversation_handle = @farHandle
);

-- Store both conversation handles, associated with this name
INSERT INTO ProviderInfo (Name, NearHandle, FarHandle)
 SELECT @name, @nearHandle, @farHandle

but with the Service Broker architecture designed to support much more complicated scenarios, including distributed services and such, with messages potentially being placed in sys.transmission_queue even locally, and with other implementation complexities, I'm not super-confident that my approach is robust enough for production.

So if the way I'm doing it is not robust, is there a "right" way? I considered trying to avoid this need by using conversation groups but couldn't work it out due to essentially the same problem (conversation group ids aren't communicated to the far side either), and these topics I found don't provide a solution either:

Conclusion

The hurdles to get this working make me worry it's not meant to be used this way, and so either won't work in certain production scenarios, or maybe won't be supported in the future. Can anyone provide documentation indicating this way is reliable, or provide an alternative solution (with or without Service Broker) that is reliable and still efficient?

Thanks!

EDIT: (solution)

The central question here was Q2 (How can one get both the near conversation handle AND the far conversation handle of a new conversation?).

Thanks to the good ideas of several contributors, the answer that came to light (and now seems obvious!) is to get the far handle simply by SELECTing from the queue itself right after SENDint the initialization message, because the SELECT statement lets you filter on any queue column!

So instead of doing this as my original post had:

-- Queue an initialization message
SEND ON CONVERSATION @nearHandle ('');

-- Figure out the handle to the receiving side of this conversation
DECLARE @farHandle UNIQUEIDENTIFIER;
SET @farHandle = (
  SELECT conversation_handle 
  FROM sys.conversation_endpoints 
  WHERE conversation_id = (
    SELECT A.conversation_id 
    FROM sys.conversation_endpoints A 
    WHERE A.conversation_handle = @nearHandle 
  ) AND conversation_handle <> @nearHandle 
);

-- Get our initialization message out of the queue
...

we can much more simply (and efficiently!) do this:

-- Queue an initialization message with a unique identifier
DECLARE @initbin VARBINARY(36);
SET @initbin = CONVERT(VARBINARY(32), @nearHandle);
SEND ON CONVERSATION @nearHandle (@initbin);

-- Figure out the handle to the receiving side of this conversation using the known unique identifier
DECLARE @farHandle UNIQUEIDENTIFIER;
SET @farHandle = (SELECT conversation_handle FROM q1 WHERE message_body = @initbin)

-- Get our initialization message out of the queue
...

Thanks all!

È stato utile?

Soluzione

If you are going to need to pass messages back and forth, Service Broker is going to work well for you. There is no builtin functionality to block until a row is inserted, however you can use a query subscription to notify you when the resultset of a query would change. This is built on top of the Service Broker architecture, and I think you need to setup the subscription from .NET, not from SQL.

Going back to Service Broker, if it's local you don't have to worry about a lot, like routing, etc., so your case is the easiest one. The transmission queue is just a holding area for messages that need to be sent, nothing to worry about.

I think the thing that is tripping you up is that you don't need to worry about getting the conversation handles, as you get them anyway.

(1) Have the receiver of messages block on the queue:

declare @status tinyint, @far_handle uniqueidentifier, @myMessage nvarchar(max);

waitfor (
    receive @status = status, 
            @far_handle = conversation_handle, 
            @myMessage = CONVERT(NVARCHAR(MAX), message_body) 
    from q1
)

(2) Start a conversation from your service back to itself:

declare @near_handle uniqueidentifier

begin dialog conversation @near_handle
from service s1 to service 's1'
with encryption = off

send on conversation @near_handle ('hello')

Now when you start the conversation in (2), you will get the conversation handle for it's side of the connection, which then you can do what you want with, i.e. insert into a table, etc..

On the blocking side, when a message arrives, you collect the status & message body, as well as the conversation handle, which is the handle for that side of the conversation. You can use this to reply, store it, update a table row with it, etc..

Now the thing is, because at first it was receiving without a conversation handle, because it didn't have one, when the conversation is established, it should put it's conversation_handle in the where clause for the receive.

receive @status = status, 
        @myMessage = CONVERT(NVARCHAR(MAX), message_body) 
from q1
where conversation_handle = @far_handle

Otherwise it will start receiving its own sent messages. This is because you have one service talking to itself on the same queue. You could get around this by using 2 services talking to each other. This is usually a cleaner approach.

This basically eliminates the need for going to sys.conversation_endpoints, which is really for admin of conversations.

Also, for conversations to end cleanly, you should end them from both sides. Never get yourself into the situation where you need to use end conversation with cleanup.

To handle multiple conversations handled concurrently, you could use a Service Broker feature called queue activation. If you didn't need them to be handled concurrently, you wouldn't need this. To use activation, its going to be best to use two services & queues.

Complete Example

(1) Do some setup

create queue srcq
create service src on queue srcq([DEFAULT])
GO

create queue destq
create service dest on queue destq([DEFAULT])
GO

(2) Create a procedure to handle messages received

create procedure messageHandler as 

declare @far_handle uniqueidentifier,
        @message xml,
        @message_type nvarchar(256),
        @name varchar(32),
        @payload nvarchar(max),
        @handler varchar(128)

waitfor (
    receive @far_handle = conversation_handle, @message_type = message_type_name, @message = cast(message_body as xml) 
    from destq
)

if (@message_type = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
    -- Deal with error
    exec dealWithError 
else if (@message_type = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog') 
begin
    -- End the Conversation
    end conversation @far_handle; 
end 
else  
begin
    set @name = @message.value('(/xml/name)[1]', 'varchar(32)');
    set @payload = @message.value('(/xml/payload)[1]', 'nvarchar(max)');

    if (select ReceiverHandle from ProviderInfo where Name = @name) is null
        update ProviderInfo
            set ReceiverHandle = @far_handle
        where Name = @name;

    -- Now Process @name however you want to
    -- This basically creates a string, say 'bobHandler', and then executes it as an sp, passing it the payload
    set @handler = @name + 'Handler';
    exec @handler @payload; 
end 
GO

(3) Create a Handler for messages associated with the name 'bob'

create procedure bobHandler (@payload nvarchar(max))
as
    print 'hello'
GO

(4) Set the destination queue to use activation

alter queue destq 
with activation (
    status = on,
    procedure_name = messageHandler,
    max_queue_readers = 10,
    execute as 'dbo'
)
GO

(5) On the Sender, start a conversation, store the sending handle, then send a message

declare @near_handle uniqueidentifier
begin dialog conversation @near_handle
from service src to service 'dest'
with encryption = off

-- Store this handle somewhere for future use
merge into ProviderInfo p
using (
    select 'bob' as Name, @near_handle as SenderHandle
) t on p.Name = t.Name
when matched then
    update set SenderHandle = t.SenderHandle, ReceiverHandle = null
when not matched then
    insert (Name, SenderHandle) values (t.Name, t.SenderHandle);

send on conversation @near_handle ('<xml><name>bob</name><payload>89237981273982173</payload></xml>')
GO

Sending the message will cause the message handler to wake up, and call the 'bobHandler' stored procedure. Setting max_queue_readers to 10 means that 10 messages can be handled concurrently.

If you didn't want to use activation, so one thread at the receiver processing all messages coming in, you could do this by simply turning it off on the destination queue, and changing the 'messageHandler' stored procedure to use wait for (receive), and running it's code in a loop.

If all of this is out, because you actually want a human person to call the receiving procedure, forget activation, and try this:

create procedure handleMessage (@name varchar(32))
as
    declare @far_handle uniqueidentifier,
        @message xml,
        @message_type nvarchar(256),
        @payload nvarchar(max),
        @handler varchar(128),
        @loop bit = 1

    while (@loop = 1)
    begin
        -- Wait for a handle with our name
        select @far_handle = conversation_handle
        from destq
        where cast(message_body as xml).value('(/xml/name)[1]', 'varchar(32)') = @name

        if (@far_handle is not null)
            set @loop = 0
        else
            waitfor delay '00:00:02'
    end

    set @loop = 1

    while (@loop = 1)
    begin
        waitfor (
            receive @message_type = message_type_name, @message = cast(message_body as xml) 
            from destq
            where conversation_handle = @far_handle
        )

        if (@message_type = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
            -- Deal with error
            exec dealWithError
        else if (@message_type = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
        begin
            -- End the Conversation
            end conversation @far_handle;

            --Exit
            set @loop = 0
        end
        else 
        begin
            set @payload = @message.value('(/xml/payload)[1]', 'nvarchar(max)');

            if (select ReceiverHandle from ProviderInfo where Name = @name) is null
                update ProviderInfo
                    set ReceiverHandle = @far_handle
                where Name = @name;

            -- Now Process @name however you want to
            -- This basically creates a string, say 'bobHandler', and then executes it as an sp, passing it the payload
            set @handler = @name + 'Handler';
            exec @handler @payload;
        end
    end
GO

Altri suggerimenti

I think, as if you are talking about message exchange in SQL Server environment Service Broker is the thing which provides you with solution for sure. This is not perfect solution for all tasks and all possible requirement but it definitely will work.

As usually, your requirements need to be specified in more details, but taking in mind some assumptions we can found two possible solutions for you. Suppose that SB configured for conversation between Sender (ProvideMessage), and Target (WaitForMyMessage)

  1. Suppose our Target use WaitFor_Any_Message procedure. When Target receives any message it invoces WaitFor_Any_Message which checks name and invoces WaitForMyMessage in turn for processing this message. The question is - do you need another messages to be processed? If no - no problem, drop them. If yes - invoce another procedure to process them.

  2. The second way is to configure separate queues for any name.

Anyways, SB is good for local message exchanges withing single database. This is working well for me in several production deployments.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top