Question

I have a Java application that uses Redis publish/subscribe, via the Jedis client, to transfer messages between clients. I want to subscribe to channels at runtime when the user types a command. However, subscribe is a blocking operation: it listens on the thread that calls it, so I'm not sure how to subscribe to further channels later from that original thread.

Example:

private PubSubListener psl = new PubSubListener();

public void onCommand(String[] args) {
    subscribe(args[0]);
}

public void subscribe(String channel) {
    Jedis jedis = jedisPool.getResource();

    jedis.subscribe(psl, channel);
}

This would work, except that the thread that dispatches the command would then be used to poll Redis, and I wouldn't be able to subscribe to any more channels with that thread.


Solution

I ran into the same problem: the subscribing thread blocks once you subscribe. To address this, I implemented an optimized pub/sub client using Netty and incorporated it into a Jedis fork here. It's not a comprehensive solution, and I have not had time to finish it, but it works for basic channel and pattern subscriptions. The basics are:

Acquire a pubsub instance using:

public static OptimizedPubSub getInstance(String host, int port, String auth, long timeout)

Issue/Cancel pattern subscriptions using:

public ChannelFuture psubscribe(String... patterns)
public ChannelFuture punsubscribe(String... patterns)

You can ignore the returned ChannelFuture unless you want to be certain that your request went through (the call is asynchronous).

Issue/Cancel channel subscriptions using:

public ChannelFuture subscribe(String... channels)
public ChannelFuture unsubscribe(String... channels)

Then implement SubListener instances:

public interface SubListener {
    /**
     * Callback when a message is published on a subscribed channel
     * @param channel The channel the message was received on
     * @param message The received message
     */
    public void onChannelMessage(String channel, String message);

    /**
     * Callback when a message is published on a subscribed channel matching a subscribed pattern
     * @param pattern The pattern that the channel matched
     * @param channel The channel the message was received on
     * @param message The received message
     */
    public void onPatternMessage(String pattern, String channel, String message);
}

and register/unregister the listeners using:

public void registerListener(SubListener listener)
public void unregisterListener(SubListener listener)

OptimizedPubSub never blocks and events are delivered to the registered SubListeners asynchronously.
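To give a rough idea of what asynchronous delivery to registered listeners can look like, here is a minimal sketch using only the JDK. It is not the fork's actual code: ListenerRegistry, dispatchChannelMessage and shutdownAndWait are hypothetical names made up for illustration, and the SubListener interface is repeated only so the snippet compiles on its own.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Same shape as the SubListener interface shown above.
interface SubListener {
    void onChannelMessage(String channel, String message);
    void onPatternMessage(String pattern, String channel, String message);
}

// Hypothetical helper (an assumption, not part of the fork): hands each message
// to a worker thread so the thread reading from Redis never waits on a callback.
final class ListenerRegistry {
    // CopyOnWriteArrayList allows register/unregister while a dispatch is iterating.
    private final List<SubListener> listeners = new CopyOnWriteArrayList<>();
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    void registerListener(SubListener listener) { listeners.add(listener); }

    void unregisterListener(SubListener listener) { listeners.remove(listener); }

    // Returns immediately; the callbacks run later on the worker thread.
    void dispatchChannelMessage(String channel, String message) {
        worker.execute(() -> {
            for (SubListener l : listeners) {
                l.onChannelMessage(channel, message);
            }
        });
    }

    // Stops accepting new work and waits for queued callbacks to finish.
    // Returns true if everything completed within the timeout.
    boolean shutdownAndWait(long millis) {
        worker.shutdown();
        try {
            return worker.awaitTermination(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A single worker thread keeps messages in the order they were dispatched. A slow listener delays later callbacks but does not block the caller of dispatchChannelMessage.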

The fork is a bit old now, so it may not be useful to you in its current form, but you can easily pull the source in that package and build it stand-alone. The dependencies are Jedis and Netty.

Sorry I did not have a more comprehensive solution.

Licensed under: CC-BY-SA with attribution