ZeroMQ mutliple publishers and subscribers using XPUB/XSUB - is this a correct implementation?

StackOverflow https://stackoverflow.com/questions/21768823

  •  11-10-2022
  •  | 
  •  

Question

I am attempting to build a multiple publishers / multiple subscriber topology using ZMQ. I have created an example using the espresso.py sample by doing some slight modifications to it. I wanted to make sure what I am doing is right as I am fairly new to zeromq. Please feel free to critique and comment.

I have basically taken a few lessons to heart.

  • A zmq socket can bind to one port only across multiple processes to a single network card (aka regular sockets)

  • Binding does not mean listen i.e. you can issue a connect() after a bind (very confusing for a socket developer but hey this is not sockets)

  • The Proxy and XPUB/XSUB is meant to be used a s pattern when subscribers should not have to figure out and connect to all the publishers.

What I really dont like about the code below is that Each subscriber binds to a separate socket. While this is a necessary evil, Somehow I kept thinking this does not look right.

So here is my sample code.

# Espresso Pattern
# This shows how to capture data using a pub-sub proxy
#

import time

from random import randint
from string import uppercase
from threading import Thread

import zmq
from zmq.devices import monitored_queue

from zhelpers import zpipe

# The subscriber thread requests messages starting with
# A and B, then reads and counts incoming messages.


def subscriber_thread():
    ctx = zmq.Context.instance()

    # Subscribe to "A" and "B"
    subscriber = ctx.socket(zmq.SUB)
    subscriber.connect("tcp://localhost:6001")
    subscriber.setsockopt(zmq.SUBSCRIBE, b"A")
    subscriber.setsockopt(zmq.SUBSCRIBE, b"B")

    count = 0
    while True:
        try:
            msg = subscriber.recv_multipart()
        except zmq.ZMQError as e:
            if e.errno == zmq.ETERM:
                break           # Interrupted
            else:
                raise
        count += 1

    print ("Subscriber received %d messages" % count)


# .split publisher thread
# The publisher sends random messages starting with A-J:

def publisher_thread(port, char):
    ctx = zmq.Context.instance()

    publisher = ctx.socket(zmq.PUB)
    publisher.bind("tcp://*:"+str(port))

    while True:
        string = "%s-%05d" % (char, randint(port, port+500))
        try:
            publisher.send(string)
        except zmq.ZMQError as e:
            if e.errno == zmq.ETERM:
                break           # Interrupted
            else:
                raise
        time.sleep(0.1)         # Wait for 1/10th second

# .split listener thread
# The listener receives all messages flowing through the proxy, on its
# pipe. Here, the pipe is a pair of ZMQ_PAIR sockets that connects
# attached child threads via inproc. In other languages your mileage may vary:

def listener_thread(pipe):

    # Print everything that arrives on pipe
    while True:
        try:
            print (pipe.recv_multipart())
        except zmq.ZMQError as e:
            if e.errno == zmq.ETERM:
                break           # Interrupted


# .split main thread
# The main task starts the subscriber and publisher, and then sets
# itself up as a listening proxy. The listener runs as a child thread:

def main():

    # Start child threads
    ctx = zmq.Context.instance()
    p_thread1 = Thread(target=publisher_thread, args=(6000,'A'))
    p_thread2 = Thread(target=publisher_thread, args=(7000,'B'))
    s_thread = Thread(target=subscriber_thread)
    p_thread1.start()
    p_thread2.start()
    s_thread.start()

    pipe = zpipe(ctx)

    subscriber = ctx.socket(zmq.XSUB)
    subscriber.connect("tcp://localhost:6000")
    subscriber.connect("tcp://localhost:7000")

    publisher = ctx.socket(zmq.XPUB)
    publisher.bind("tcp://*:6001")

    l_thread = Thread(target=listener_thread, args=(pipe[1],))
    l_thread.start()

    try:
        monitored_queue(subscriber, publisher, pipe[0], 'pub', 'sub')
    except KeyboardInterrupt:
        print ("Interrupted")

    del subscriber, publisher, pipe
    ctx.term()

if __name__ == '__main__':
    main()
Was it helpful?

Solution

I raised an issue on ZeroMQ github page and got a response. It is a known bug in ZeroMQ that is caused due to the fact that publish and subscribe are happening in different threads that are raising subscription requests before the receivers of the subscription messages are fully ready. More details can be found here.

https://github.com/zeromq/libzmq/issues/897

I tried to simulate the issue here

https://gist.github.com/vivekfantain/9021979

Sharing all this for anybody else who stumbles on the same issue.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top