Question

I took a look at the various ZMQ messaging patterns and I'm not sure which one would do for my project. All I want to do is be able to connect to a server and send a command (the client never receive anything). On the server side, I want to be able to check if there is a message, if there is one, process it, else continue to do other stuff without blocking. That way, the server could continue to work even if there is no client connected.

#client.py
while(True):
    select = raw_input()
    if select == "1":
       socket.send(msg1)
    elif select == "2":
       socket.send(msg2)
    ...

#server.py
while(True):
    msg = socket.recv() #should not block
    if msg == ...
        #do stuff
    #do other stuff

So which pattern should I use with ZMQ to do that? Example code would be appreciated.

Was it helpful?

Solution

First, since you want one-way communication with only one socket receiving messages, that generally means PUSH-PULL. Here is a version of the client:

import zmq

ctx = zmq.Context.instance()
s = ctx.socket(zmq.PUSH)
url = 'tcp://127.0.0.1:5555'
s.connect(url)

while True:
    msg = raw_input("msg > ")
    s.send(msg)
    if msg == 'quit':
        break

so a PUSH socket sends the messages we get from raw_input. It should be clear how to change that logic to generate the messages you want. A bit of bonus is that if you type 'quit', both the client and the server will quit.

There are a variety of ways to do the non-blocking server, depending on the complexity of your application. I'll show a few examples, from the most basic one to the most powerful / extensible.

All of these server examples assume this at the top, setting up the server's PULL socket:

import time
import zmq

ctx = zmq.Context.instance()
s = ctx.socket(zmq.PULL)
url = 'tcp://127.0.0.1:5555'
s.bind(url)

The first example is simple non-blocking recv, which raises a zmq.Again Exception if there are no messages ready to be received:

# server0.py

while True:
    try:
        msg = s.recv(zmq.NOBLOCK) # note NOBLOCK here
    except zmq.Again:
        # no message to recv, do other things
        time.sleep(1)
    else:
        print("received %r" % msg)
        if msg == 'quit':
            break

But that pattern is pretty hard to extend beyond very simple cases. The second example uses a Poller, to check for events on the socket:

# server1.py

poller = zmq.Poller()
poller.register(s)

while True:
    events = dict(poller.poll(0))
    if s in events:
        msg = s.recv()
        print("received %r" % msg)
        if msg == 'quit':
            break
    else:
        # no message to recv, do other things
        time.sleep(1)

In this toy example, this is very similar to the first. But, unlike the first, it is easy to extend to many sockets or events with further calls to poller.register, or passing a timeout other than zero to poller.poll.

The last example uses an eventloop, and actually registers a callback for when messages arrive. You can build very complex applications with this sort of pattern, and it is a fairly straightforward way to write code that only does work when there is work to be done.

# server2.py

from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream

def print_msg(msg):
    print("received %r" % ' '.join(msg))
    if msg[0] == 'quit':
        ioloop.IOLoop.instance().stop()

# register the print_msg callback to be fired
# whenever there is a message on our socket
stream = ZMQStream(s)
stream.on_recv(print_msg)

# do other things in the meantime
tic = time.time()
def do_other_things():
    print("%.3f" % (time.time() - tic))

pc = ioloop.PeriodicCallback(do_other_things, 1000)
pc.start()

# start the eventloop
ioloop.IOLoop.instance().start()

So that's a few basic ways to deal with zmq messages without blocking. You can grab these examples together as a gist.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top