First, since you want one-way communication with only one socket receiving messages, that generally means PUSH-PULL. Here is a version of the client:
import zmq
ctx = zmq.Context.instance()
s = ctx.socket(zmq.PUSH)
url = 'tcp://127.0.0.1:5555'
s.connect(url)
while True:
msg = raw_input("msg > ")
s.send(msg)
if msg == 'quit':
break
so a PUSH socket sends the messages we get from raw_input. It should be clear how to change that logic to generate the messages you want. A bit of bonus is that if you type 'quit', both the client and the server will quit.
There are a variety of ways to do the non-blocking server, depending on the complexity of your application. I'll show a few examples, from the most basic one to the most powerful / extensible.
All of these server examples assume this at the top, setting up the server's PULL socket:
import time
import zmq
ctx = zmq.Context.instance()
s = ctx.socket(zmq.PULL)
url = 'tcp://127.0.0.1:5555'
s.bind(url)
The first example is simple non-blocking recv, which raises a zmq.Again
Exception if there are no messages ready to be received:
# server0.py
while True:
try:
msg = s.recv(zmq.NOBLOCK) # note NOBLOCK here
except zmq.Again:
# no message to recv, do other things
time.sleep(1)
else:
print("received %r" % msg)
if msg == 'quit':
break
But that pattern is pretty hard to extend beyond very simple cases. The second example uses a Poller, to check for events on the socket:
# server1.py
poller = zmq.Poller()
poller.register(s)
while True:
events = dict(poller.poll(0))
if s in events:
msg = s.recv()
print("received %r" % msg)
if msg == 'quit':
break
else:
# no message to recv, do other things
time.sleep(1)
In this toy example, this is very similar to the first. But, unlike the first, it is easy to extend to many sockets or events with further calls to poller.register
, or passing a timeout other than zero to poller.poll
.
The last example uses an eventloop, and actually registers a callback for when messages arrive. You can build very complex applications with this sort of pattern, and it is a fairly straightforward way to write code that only does work when there is work to be done.
# server2.py
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
def print_msg(msg):
print("received %r" % ' '.join(msg))
if msg[0] == 'quit':
ioloop.IOLoop.instance().stop()
# register the print_msg callback to be fired
# whenever there is a message on our socket
stream = ZMQStream(s)
stream.on_recv(print_msg)
# do other things in the meantime
tic = time.time()
def do_other_things():
print("%.3f" % (time.time() - tic))
pc = ioloop.PeriodicCallback(do_other_things, 1000)
pc.start()
# start the eventloop
ioloop.IOLoop.instance().start()
So that's a few basic ways to deal with zmq messages without blocking. You can grab these examples together as a gist.