Question

When I print received messages to the Console, the displayed messages are all jumbled. Each message contains 5 string sub-messages, which are printed to the Console before control returns to the incoming message callback. I strongly suspect this is because the incoming message event is raised asynchronously in BookSleeve?

I refer to the following post, "How does PubSub work in BookSleeve/ Redis?", where the author, Marc Gravell, pointed out that in-order reception can be forced by setting CompletionMode to "PreserveOrder". I have done that, and tried setting it both before and after connecting the client. Neither seems to work.

Any ideas how I can receive messages and print them on the console in the exact order they were sent? I only have one single publisher in this case.

Thanks

Edit:

Below are some code snippets showing how I send messages, along with the BookSleeve wrapper I quickly wrote.

Here is the client (I have a similar Client2 that receives the messages and checks the order, but I omitted it as it seems trivial).

class Client1
{
    const string ClientId = "Client1";
    private static Messaging Client { get; set; }

    private static void Main(string[] args)
    {
        var settings = new MessagingSettings("127.0.0.1", 6379, -1, 60, 5000, 1000);
        Client = new Messaging(ClientId, settings, ReceiveMessage);
        Client.Connect();

        Console.WriteLine("Press key to start sending messages...");
        Console.ReadLine();

        for (int index = 1; index <= 100; index++)
        {
            //I turned this off because I want to preserve
            //the order even if messages are sent in rapid succession

            //Thread.Sleep(5); 

            var msg = new MessageEnvelope("Client1", "Client2", index.ToString());
            Client.SendOneWayMessage(msg);
        }

        Console.WriteLine("Press key to exit....");
        Console.ReadLine();

        Client.Disconnect();
    }

    private static void ReceiveMessage(MessageEnvelope msg)
    {
        Console.WriteLine("Message Received");
    }
}

Here are the relevant code snippets of the library:

public void Connect()
    {
        RequestForReplyMessageIds = new ConcurrentBag<string>();

        Connection = new RedisConnection(Settings.HostName, Settings.Port, Settings.IoTimeOut);
        Connection.Closed += OnConnectionClosed;
        Connection.CompletionMode = ResultCompletionMode.PreserveOrder;
        Connection.SetKeepAlive(Settings.PingAliveSeconds);

        try
        {
            if (Connection.Open().Wait(Settings.RequestTimeOutMilliseconds))
            {
                //Subscribe to own ClientId Channel ID
                SubscribeToChannel(ClientId);
            }
            else
            {
                throw new Exception("Could not connect Redis client to server");
            }
        }
        catch
        {
            throw new Exception("Could not connect Redis Client to Server");
        }
    }

public void SendOneWayMessage(MessageEnvelope message)
    {
        SendMessage(message);
    }

private void SendMessage(MessageEnvelope msg)
    {
        //Connection.Publish(msg.To, msg.GetByteArray());
        Connection.Publish(msg.To, msg.GetByteArray()).Wait();
    }

private void IncomingChannelSubscriptionMessage(string channel, byte[] body)
    {
        var msg = MessageEnvelope.GetMessageEnvelope(body);

        //forward received message
        ReceivedMessageCallback(msg);

        //release requestMessage if returned msgId matches
        string msgId = msg.MessageId;
        if (RequestForReplyMessageIds.Contains(msgId))
        {
            RequestForReplyMessageIds.TryTake(out msgId);
        }
    }

public void SubscribeToChannel(string channelName)
    {
        if (!ChannelSubscriptions.Contains(channelName))
        {
            var subscriberChannel = Connection.GetOpenSubscriberChannel();
            subscriberChannel.Subscribe(channelName, IncomingChannelSubscriptionMessage).Wait();
            ChannelSubscriptions.Add(channelName);
        }
    }

Solution

Without seeing exactly how you are checking for this, it is hard to comment, but what I can say is that any threading oddity is going to be hard to track down and fix, and is therefore very unlikely to be addressed in BookSleeve, given that it has been superseded. However! It will absolutely be checked in StackExchange.Redis. Here's a rig I've put together in SE.Redis (and, embarrassingly, it did highlight a slight bug, fixed in the next release, so .222 or later); output first:

Subscribing...

Sending (preserved order)...
Allowing time for delivery etc...
Checking...
Received: 500 in 2993ms
Out of order: 0

Sending (any order)...
Allowing time for delivery etc...
Checking...
Received: 500 in 341ms
Out of order: 306

(Keep in mind that 500 x 5ms is 2500ms, so we should not be amazed by the 2993ms number, or the 341ms. This is mainly the cost of the Thread.Sleep we have added to nudge the thread-pool into overlapping them; if we remove that, both loops take 0ms, which is awesome, but then we can't see the overlapping issue so convincingly.)

As you can see, the first run has the correct order output; the second run has mixed order, but it is ten times faster. And that is when doing trivial work; for real work it would be even more noticeable. As always, it is a trade-off.

Here's the test rig:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using StackExchange.Redis;

static class Program
{
    static void Main()
    {
        using (var conn = ConnectionMultiplexer.Connect("localhost"))
        {
            var sub = conn.GetSubscriber();
            var received = new List<int>();
            Console.WriteLine("Subscribing...");
            const int COUNT = 500;
            sub.Subscribe("foo", (channel, message) =>
            {
                lock (received)
                {
                    received.Add((int)message);
                    if (received.Count == COUNT)
                        Monitor.PulseAll(received); // wake the test rig
                }
                Thread.Sleep(5); // you kinda need to be slow, otherwise
                // the pool will end up doing everything on one thread
            });
            SendAndCheck(conn, received, COUNT, true);
            SendAndCheck(conn, received, COUNT, false);
        }

        Console.WriteLine("Press any key");
        Console.ReadLine();
    }
    static void SendAndCheck(ConnectionMultiplexer conn, List<int> received, int quantity, bool preserveAsyncOrder)
    {
        conn.PreserveAsyncOrder = preserveAsyncOrder;
        var sub = conn.GetSubscriber();
        Console.WriteLine();
        Console.WriteLine("Sending ({0})...", (preserveAsyncOrder ? "preserved order" : "any order"));
        lock (received)
        {
            received.Clear();
            // we'll also use received as a wait-detection mechanism; sneaky

            // note: this does not do any cheating;
            // it all goes to the server and back
            for (int i = 0; i < quantity; i++)
            {
                sub.Publish("foo", i);
            }

            Console.WriteLine("Allowing time for delivery etc...");
            var watch = Stopwatch.StartNew();
            if (!Monitor.Wait(received, 10000))
            {
                Console.WriteLine("Timed out; expect less data");
            }
            watch.Stop();
            Console.WriteLine("Checking...");
            lock (received)
            {
                Console.WriteLine("Received: {0} in {1}ms", received.Count, watch.ElapsedMilliseconds);
                int wrongOrder = 0;
                for (int i = 0; i < Math.Min(quantity, received.Count); i++)
                {
                    if (received[i] != i) wrongOrder++;
                }
                Console.WriteLine("Out of order: " + wrongOrder);
            }
        }
    }
}
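
The trade-off described above can also be seen without Redis at all. What follows is only a minimal sketch under stated assumptions, not BookSleeve or StackExchange.Redis code: the names OrderingSketch, DeliverInOrder, DeliverConcurrently and CountOutOfOrder are hypothetical and exist purely for illustration. It contrasts handing every message to a single consumer through a FIFO queue (order kept, handlers serialized) with running each handler on the thread pool (handlers may overlap, order not guaranteed).

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class OrderingSketch
{
    // Ordered delivery: one dedicated consumer drains a FIFO queue,
    // so handlers never overlap and arrival order is kept.
    public static List<int> DeliverInOrder(int count)
    {
        var received = new List<int>();
        using (var queue = new BlockingCollection<int>()) // FIFO by default
        {
            var consumer = Task.Run(() =>
            {
                foreach (int item in queue.GetConsumingEnumerable())
                {
                    Thread.Sleep(1); // simulate a slow handler
                    received.Add(item);
                }
            });
            for (int i = 0; i < count; i++) queue.Add(i);
            queue.CompleteAdding(); // lets the consumer loop finish
            consumer.Wait();
        }
        return received;
    }

    // Unordered delivery: every handler runs as its own pool task,
    // so handlers may overlap and finish in any order.
    public static List<int> DeliverConcurrently(int count)
    {
        var received = new List<int>();
        var tasks = new Task[count];
        for (int i = 0; i < count; i++)
        {
            int item = i; // capture a copy, not the loop variable
            tasks[i] = Task.Run(() =>
            {
                Thread.Sleep(1); // simulate a slow handler
                lock (received) received.Add(item);
            });
        }
        Task.WaitAll(tasks);
        return received;
    }

    // Same check as the test rig: count positions holding the wrong value.
    public static int CountOutOfOrder(List<int> received)
    {
        int wrongOrder = 0;
        for (int i = 0; i < received.Count; i++)
        {
            if (received[i] != i) wrongOrder++;
        }
        return wrongOrder;
    }

    public static void Main()
    {
        Console.WriteLine("Single consumer, out of order: "
            + CountOutOfOrder(DeliverInOrder(200)));
        Console.WriteLine("Pool tasks, out of order: "
            + CountOutOfOrder(DeliverConcurrently(200)));
    }
}
```

Only the single-consumer count is deterministic (it is always 0); the pool-task count varies from run to run and from machine to machine. If you need strict ordering in your own callback regardless of how the library dispatches it, a queue with one consumer like this is one possible approach, at the cost of the throughput shown in the timings above.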
Licensed under: CC-BY-SA with attribution