Question

In order to avoid duplicates in my redis channel I'm checking if the message is already there by keeping an index in Redis set. Following is my implementation. However, it is giving an exception.

redis.clients.jedis.exceptions.JedisDataException: Please close pipeline or multi block before calling this method.
    at redis.clients.jedis.Response.get(Response.java:23)

Here is the implementation.

          Jedis jedis = pool.getResource();

          String id = message.getId();
          Transaction transaction = jedis.multi();
          redis.clients.jedis.Response<java.lang.Boolean> response = transaction.sismember(ID_SET_REDIS_KEY, id);
          if (response != null && !response.get().booleanValue()) {
                //add it to the 
                transaction.sadd(ID_SET_REDIS_KEY, id);
                transaction.publish(redisChannelName, message);
            }
            transaction.exec();
            pool.returnResource(jedis);

I need to do the get inside the transaction because there are multiple publishers who may publish the exact same message.

Était-ce utile?

La solution

You can't have the result of your get before you end the transaction.

If you are using Redis > 2.6.X, what you can do is use a Lua Script to create a function with you logic. See Redis Lua

This is exactly what I did to guarantee concurrency in my project.

Edit: Including a more complete example

You should create something like a PUBLISHNX script (not tested):

local shouldPublish = redis.call('SISMEMBER', KEYS[1], ARGV[1])

if shouldPublish == 0
    redis.call('SADD', KEYS[1], ARGV[1])
    redis.call('PUBLISH', ARGV[2], ARGV[3])
end

And you pass all the arguments necessary, channel, messageId, message, controlKey.

PS. Wei Li is right, you could accomplish the same result using WATCH and a loop for retrying in case of concurrency, but I still prefer using a Lua script.

Autres conseils

Based on @Axexandre's comment above I used the following piece of code to do perform the operation.

import redis.clients.jedis.Jedis;

public class RedisLuaDemo {

    public static void main(String args[])
    {
        Jedis jedis = new Jedis("localhost");
        jedis.sadd("a", "b");
        int numberOfKeys = 1 //we are using only one Redis set 'setvar' 
        jedis.eval("if redis.call('sismember', KEYS[1], ARGV[1]) == 1 then return ARGV[2] else redis.call('sadd', KEYS[1], ARGV[1]); redis.call('publish', 'channel.mychannel', ARGV[2])  end", numberOfKeys, "setvar", "joe", "message from joe!");

    }
}

Here is some more information about the script. It took some time to understand the syntax.

if redis.call('sismember', KEYS[1], ARGV[1]) == 1 eqavalent to SISMEMBER setvar joe

redis.call('sadd', KEYS[1], ARGV[1]);

For some reason if I don't have this jedis.sadd("a", "b"); line I get an exception (see below).

Exception in thread "main" java.lang.NullPointerException
    at redis.clients.jedis.Connection.setTimeoutInfinite(Connection.java:41)
    at redis.clients.jedis.Jedis.eval(Jedis.java:2763)
    at redis.RedisLuaDemo.main(RedisLuaDemo.java:13)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120) 
Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top