Frage

Ich bin mit ZeroMQ experimentieren und versuchen, bekommen etwas arbeiten. Mein erster Gedanke war ein REP / REQ mit dem inproc Transport einzurichten, um zu sehen, ob ich Nachrichten zwischen zwei Threads senden könnte. Die meisten der folgenden Code wird aus den clzmq Beispiele genommen, aber es scheint nicht zu arbeiten.

Sowohl der Server als auch der Client auf den Transport gebunden ist, aber wenn der Client eine Send blockiert es zu tun versucht, und nur dort sitzt. Ich habe keine Erfahrung ZeroMQ so dass ich nicht sicher, wo ich schauen zuerst, wäre jede Hilfe sehr geschätzt. Hier ist der säumige (offensive) Code:

using System;
using System.Diagnostics;
using System.Threading;
using NUnit.Framework;
using ZMQ;

namespace PostBox
{
    [TestFixture]
    public class Class1
    {

        private const string Address = "inproc://test";
        private const uint MessageSize = 10;
        private const int RoundtripCount = 100;

        [Test]
        public void Should()
        {
            var clientThread = new Thread(StartClient);
            clientThread.Start();

            var serverThread = new Thread(StartServer);
            serverThread.Start();

            clientThread.Join();
            serverThread.Join();

            Console.WriteLine("Done with life");
        }

        private void StartServer()
        {


            //  Initialise 0MQ infrastructure
            using (var ctx = new Context(1))
            {
                using (var skt = ctx.Socket(SocketType.REP))
                {
                    skt.Bind(Address);

                    Console.WriteLine("Server has bound");

                    //  Bounce the messages.
                    for (var i = 0; i < RoundtripCount; i++)
                    {
                        var msg = skt.Recv();
                        Debug.Assert(msg.Length == MessageSize);
                        skt.Send(msg);
                    }
                    Thread.Sleep(1000);
                }
            }

            Console.WriteLine("Done with server");
        }

        private void StartClient()
        {
            Thread.Sleep(2000);

            //  Initialise 0MQ infrastructure
            using (var ctx = new Context(1))
            {
                using (var skt = ctx.Socket(SocketType.REQ))
                {
                    skt.Bind(Address);

                    Console.WriteLine("Client has bound");

                    //  Create a message to send.
                    var msg = new byte[MessageSize];

                    //  Start measuring the time.
                    var watch = new Stopwatch();
                    watch.Start();

                    //  Start sending messages.
                    for (var i = 0; i < RoundtripCount; i++)
                    {
                        skt.Send(msg);
                        msg = skt.Recv();
                        Debug.Assert(msg.Length == MessageSize);

                        Console.Write(".");
                    }

                    //  Stop measuring the time.
                    watch.Stop();
                    var elapsedTime = watch.ElapsedTicks;

                    //  Print out the test parameters.
                    Console.WriteLine("message size: " + MessageSize + " [B]");
                    Console.WriteLine("roundtrip count: " + RoundtripCount);

                    //  Compute and print out the latency.
                    var latency = (double)(elapsedTime) / RoundtripCount / 2 *
                        1000000 / Stopwatch.Frequency;
                    Console.WriteLine("Your average latency is {0} [us]",
                        latency.ToString("f2"));
                }
            }

            Console.WriteLine("Done with client");
        }

    }
}

Edit:

Ich habe diese Arbeit mit Hilfe der unten Antwort, aber es mußte mich auch eine Bind zu einem Connect ändern, was Sinn macht, wenn man darüber nachdenkt, wie wir einen Server der Bindung an einem lokalen Transport und ein Client-Anschluss haben zu einem Remote-Transport. Hier ist der aktualisierte Code:

using System;
using System.Diagnostics;
using System.Threading;
using NUnit.Framework;
using ZMQ;

namespace PostBox
{
    [TestFixture]
    public class Class1
    {

        private const string Address = "inproc://test";
        private const uint MessageSize = 10;
        private const int RoundtripCount = 100;

        private static Context ctx;

        [Test]
        public void Should()
        {
            using (ctx = new Context(1))
            {
                var clientThread = new Thread(StartClient);
                clientThread.Start();

                var serverThread = new Thread(StartServer);
                serverThread.Start();

                clientThread.Join();
                serverThread.Join();

                Console.WriteLine("Done with life");
            }
        }

        private void StartServer()
        {
            try
            {
                using (var skt = ctx.Socket(SocketType.REP))
                {
                    skt.Bind(Address);

                    Console.WriteLine("Server has bound");

                    //  Bounce the messages.
                    for (var i = 0; i < RoundtripCount; i++)
                    {
                        var msg = skt.Recv();
                        Debug.Assert(msg.Length == MessageSize);
                        skt.Send(msg);
                    }
                    Thread.Sleep(1000);
                }

                Console.WriteLine("Done with server");
            }
            catch (System.Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }

        private void StartClient()
        {
            Thread.Sleep(2000);

            try
            {
                //  Initialise 0MQ infrastructure
                using (var skt = ctx.Socket(SocketType.REQ))
                {
                    skt.Connect(Address);

                    Console.WriteLine("Client has bound");

                    //  Create a message to send.
                    var msg = new byte[MessageSize];

                    //  Start measuring the time.
                    var watch = new Stopwatch();
                    watch.Start();

                    //  Start sending messages.
                    for (var i = 0; i < RoundtripCount; i++)
                    {
                        skt.Send(msg);
                        msg = skt.Recv();
                        Debug.Assert(msg.Length == MessageSize);

                        Console.Write(".");
                    }

                    //  Stop measuring the time.
                    watch.Stop();
                    var elapsedTime = watch.ElapsedTicks;

                    //  Print out the test parameters.
                    Console.WriteLine("message size: " + MessageSize + " [B]");
                    Console.WriteLine("roundtrip count: " + RoundtripCount);

                    //  Compute and print out the latency.
                    var latency = (double)(elapsedTime) / RoundtripCount / 2 *
                                  1000000 / Stopwatch.Frequency;
                    Console.WriteLine("Your average latency is {0} [us]",
                                      latency.ToString("f2"));
                }

                Console.WriteLine("Done with client");
            }
            catch (System.Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }

    }
}
War es hilfreich?

Lösung

Ich glaube, müssen beide Threads den gleichen Kontext verwenden. Zeromq Führung empfiehlt nicht mehr als einen Kontext in einem Prozess zu verwenden. Erstellen Sie einen Kontext, teilen diese Zusammenhang zwischen den beiden Themen. Dies sollte funktionieren.

http://zguide.zeromq.org/chapter:all

Sie müssen ein ‚Kontext‘ Objekt für Ihren Prozess erstellen, und übergeben Sie das zu Alle Themen. Der Kontext sammelt omq des Staates. So erstellen Sie eine Verbindung über den inproc: Transport, sowohl Server als auch Client-Thread muss Teile das gleiche Kontextobjekt.

Andere Tipps

Nur ein Ende der andere muss Connect binden können, können Sie mehrere Verbindungen haben.

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top