Question

I have a multithreaded application written in Java using jeromq 0.3.2. I'm putting it into the Tanuki service wrapper so that it runs as a Windows service, and I'm having trouble stopping the threads cleanly. The run() method of the proxy is a variation on the Simple Pirate pattern from the guide, and looks like this:

public void run()
{
    ZContext context = new ZContext();
    Socket frontend = context.createSocket(ZMQ.ROUTER);
    Socket backend = context.createSocket(ZMQ.ROUTER);
    frontend.bind("tcp://*:5555");
    backend.bind("inproc://workers");

    while (!Thread.currentThread().isInterrupted())
    {
        ZMQ.Poller poller = new ZMQ.Poller(2);
        poller.register(frontend, ZMQ.Poller.POLLIN);
        poller.register(backend, ZMQ.Poller.POLLIN);
        poller.poll();
        if (poller.pollin(0))
        {
            processFrontend(frontend, backend, context);
        }
        if (poller.pollin(1))
        {
            processBackend(frontend, backend);
        }
    }

    // additional code to close worker threads
}

How can I cleanly exit from this loop when the controlling wrapper wants to stop the application?

If no clients are currently connected, the loop is blocked in the call to poller.poll(), so an interrupt() from the wrapper is ignored. If clients are currently sending messages, a call to interrupt() makes poller.poll() throw a zmq.ZError$IOException: java.nio.channels.ClosedByInterruptException.

I've also tried to use:

PollItem[] items = {
    new PollItem(frontend, Poller.POLLIN),
    new PollItem(backend, Poller.POLLIN)
};
int rc = ZMQ.poll(items, 2, -1);

if (rc == -1)
    break;

if (items[0].isReadable())
{
    processFrontend(frontend, backend, context);
}

if (items[1].isReadable())
{
    processBackend(frontend, backend);
}

but the call to ZMQ.poll exhibits the same behaviour. Two possible alternatives are:

  • set a timeout on ZMQ.poll and wrap the content of the run() method in a try/catch for the IOException.
  • add a method to my Runnable that will connect to the frontend and send a special message that will be read in processFrontend and cause the code to break out of the loop.

The first seems a bit dirty, and the second feels a bit fragile. Are there any other approaches I should consider? Is there some other polling method I can use that reacts more cleanly to a thread interrupt?


Solution

It's better to run the loop in a separate thread and open an additional inproc PULL socket, so that the loop polls three sockets: frontend, backend and control. When the main thread receives an interrupt signal, it should send a STOP command to the proxy loop over the control socket.

Exactly this pattern is implemented in my project: https://github.com/thriftzmq/thriftzmq-java/blob/master/thriftzmq/src/main/java/org/thriftzmq/ProxyLoop.java
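
A minimal sketch of such a loop, assuming the classic ZMQ.Context API (context.socket, context.poller) rather than the ZContext used in the question. The class name ControlledProxy, the endpoint "inproc://control", the "STOP" string and the discard() helper are hypothetical names chosen for illustration; they are not taken from the linked project.

```java
import java.nio.charset.StandardCharsets;
import org.zeromq.ZMQ;

public class ControlledProxy implements Runnable {
    // Hypothetical values, chosen for this sketch only.
    static final String CONTROL_ENDPOINT = "inproc://control";
    static final String STOP_MESSAGE = "STOP";

    private final ZMQ.Context context;
    private final ZMQ.Socket frontend;
    private final ZMQ.Socket backend;
    private final ZMQ.Socket control;

    public ControlledProxy(ZMQ.Context context, String frontendEndpoint, String backendEndpoint) {
        this.context = context;
        frontend = context.socket(ZMQ.ROUTER);
        backend = context.socket(ZMQ.ROUTER);
        control = context.socket(ZMQ.PULL);
        frontend.bind(frontendEndpoint);
        backend.bind(backendEndpoint);
        // Bind here, before the loop thread starts, so stop() can always connect.
        control.bind(CONTROL_ENDPOINT);
    }

    @Override
    public void run() {
        ZMQ.Poller poller = context.poller(3);
        int frontendIdx = poller.register(frontend, ZMQ.Poller.POLLIN);
        int backendIdx = poller.register(backend, ZMQ.Poller.POLLIN);
        int controlIdx = poller.register(control, ZMQ.Poller.POLLIN);

        boolean running = true;
        while (running) {
            poller.poll(); // blocks until one of the three sockets is readable
            if (poller.pollin(controlIdx)) {
                String command = new String(control.recv(0), StandardCharsets.UTF_8);
                running = !STOP_MESSAGE.equals(command);
            }
            if (running && poller.pollin(frontendIdx)) {
                discard(frontend); // placeholder for processFrontend(...)
            }
            if (running && poller.pollin(backendIdx)) {
                discard(backend); // placeholder for processBackend(...)
            }
        }
        // Sockets are closed on the thread that used them last.
        frontend.close();
        backend.close();
        control.close();
    }

    // Reads and drops one multipart message so the sketch does not spin.
    private static void discard(ZMQ.Socket socket) {
        do {
            socket.recv(0);
        } while (socket.hasReceiveMore());
    }

    // Callable from any thread: it uses its own short-lived PUSH socket.
    public static void stop(ZMQ.Context context) {
        ZMQ.Socket peer = context.socket(ZMQ.PUSH);
        peer.connect(CONTROL_ENDPOINT);
        peer.send(STOP_MESSAGE.getBytes(StandardCharsets.UTF_8), 0);
        peer.close();
    }
}
```

A caller would start it with new Thread(new ControlledProxy(context, "tcp://*:5555", "inproc://workers")).start() and later call ControlledProxy.stop(context) from whichever thread handles shutdown. Because the loop only ever wakes up on socket activity, no thread interrupt is needed.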

On the main thread, you can add a shutdown hook and send the STOP command to the loop from within it.

Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
        try {
            // Connect to the control socket and send the STOP command
            ZMQ.Socket peer = context.socket(ZMQ.PUSH);
            peer.connect(CONTROL_ENDPOINT);
            peer.send(STOP_MESSAGE, 0);
            peer.close();
        } catch (Exception ex) {
            logger.error("Failed to stop service", ex);
            // System.exit() blocks indefinitely when called from a
            // shutdown hook, so force termination with halt() instead
            Runtime.getRuntime().halt(1);
        }
    }
});

P.S. Using the service lifecycle primitives from the Guava library makes this a lot easier.
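
For comparison, the timeout-based alternative mentioned in the question could be sketched roughly as follows. This sketch replaces the thread interrupt with a volatile stop flag, which avoids the ClosedByInterruptException altogether; that substitution is an assumption of the sketch, not something the question specifies. The class name TimedPollProxy, the 250 ms timeout and the discard() helper are hypothetical.

```java
import org.zeromq.ZMQ;

public class TimedPollProxy implements Runnable {
    // Hypothetical value: the worst-case delay before a stop request is noticed.
    private static final long POLL_TIMEOUT_MS = 250;

    private final ZMQ.Context context;
    private final ZMQ.Socket frontend;
    private final ZMQ.Socket backend;
    private volatile boolean stopRequested = false;

    public TimedPollProxy(ZMQ.Context context, String frontendEndpoint, String backendEndpoint) {
        this.context = context;
        frontend = context.socket(ZMQ.ROUTER);
        backend = context.socket(ZMQ.ROUTER);
        frontend.bind(frontendEndpoint);
        backend.bind(backendEndpoint);
    }

    // Called by the controlling thread instead of Thread.interrupt().
    public void requestStop() {
        stopRequested = true;
    }

    @Override
    public void run() {
        ZMQ.Poller poller = context.poller(2);
        int frontendIdx = poller.register(frontend, ZMQ.Poller.POLLIN);
        int backendIdx = poller.register(backend, ZMQ.Poller.POLLIN);

        while (!stopRequested) {
            // Returns after at most POLL_TIMEOUT_MS, so the flag is rechecked regularly.
            poller.poll(POLL_TIMEOUT_MS);
            if (poller.pollin(frontendIdx)) {
                discard(frontend); // placeholder for processFrontend(...)
            }
            if (poller.pollin(backendIdx)) {
                discard(backend); // placeholder for processBackend(...)
            }
        }
        frontend.close();
        backend.close();
    }

    // Reads and drops one multipart message so the sketch does not spin.
    private static void discard(ZMQ.Socket socket) {
        do {
            socket.recv(0);
        } while (socket.hasReceiveMore());
    }
}
```

The trade-off against the control socket is that the loop wakes up periodically even when idle, and shutdown can lag by up to one timeout interval.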

Licensed under: CC-BY-SA with attribution