Problem

To avoid duplicates in my Redis channel, I check whether a message has already been published by keeping an index of message IDs in a Redis set. My implementation is below, but it throws an exception:

redis.clients.jedis.exceptions.JedisDataException: Please close pipeline or multi block before calling this method.
    at redis.clients.jedis.Response.get(Response.java:23)

Here is the implementation.

    Jedis jedis = pool.getResource();

    String id = message.getId();
    Transaction transaction = jedis.multi();
    redis.clients.jedis.Response<java.lang.Boolean> response = transaction.sismember(ID_SET_REDIS_KEY, id);
    // response.get() is where the JedisDataException is thrown: exec() has not run yet
    if (response != null && !response.get().booleanValue()) {
        //add it to the
        transaction.sadd(ID_SET_REDIS_KEY, id);
        transaction.publish(redisChannelName, message);
    }
    transaction.exec();
    pool.returnResource(jedis);

I need to do the get inside the transaction because there are multiple publishers who may publish the exact same message.


Solution

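You can't read the result of a command before the transaction ends. Between MULTI and EXEC, commands are only queued, so response.get() has nothing to return until transaction.exec() has run.

If you are using Redis 2.6 or later, you can use a Lua script to implement your logic as a single function that runs atomically on the server. See Redis Lua

This is exactly what I did to handle concurrent writers safely in my project.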

Edit: Including a more complete example

You should create something like a PUBLISHNX script (not tested):

-- SISMEMBER returns 1 if the ID is already in the set, 0 otherwise
local alreadyPublished = redis.call('SISMEMBER', KEYS[1], ARGV[1])

if alreadyPublished == 0 then
    redis.call('SADD', KEYS[1], ARGV[1])
    redis.call('PUBLISH', ARGV[2], ARGV[3])
end

Pass the script all the arguments it needs: the control key (the set of IDs) as KEYS[1], the message ID as ARGV[1], the channel as ARGV[2], and the message as ARGV[3].
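The argument order is easy to get wrong, so here is a minimal sketch of how the script text and its parameters could be assembled in Java. The class name PublishNx and its two helper methods are hypothetical, and the return 1 / return 0 lines are an addition so the caller can tell whether anything was published. The Jedis call appears only in a comment, since running it needs a live Redis server.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: holds the script text and puts its parameters in order.
final class PublishNx {
    // Check-then-publish logic; returns 1 if published, 0 if the ID was a duplicate.
    // The two return statements are an addition for illustration.
    static final String SCRIPT =
          "if redis.call('SISMEMBER', KEYS[1], ARGV[1]) == 0 then\n"
        + "    redis.call('SADD', KEYS[1], ARGV[1])\n"
        + "    redis.call('PUBLISH', ARGV[2], ARGV[3])\n"
        + "    return 1\n"
        + "end\n"
        + "return 0\n";

    // KEYS[1]: the set that indexes the IDs of published messages.
    static List<String> keys(String controlKey) {
        return Collections.singletonList(controlKey);
    }

    // ARGV[1] = message ID, ARGV[2] = channel, ARGV[3] = message body.
    static List<String> args(String messageId, String channel, String message) {
        return Arrays.asList(messageId, channel, message);
    }

    public static void main(String[] unused) {
        List<String> keys = keys("published-ids");
        List<String> args = args("42", "news", "hello");
        System.out.println("KEYS=" + keys + " ARGV=" + args);
        // With an open Jedis connection, the script would be sent as:
        //   Object published = jedis.eval(PublishNx.SCRIPT, keys, args);
    }
}
```

Passing the set name through KEYS rather than hard-coding it in the script keeps the script reusable for other sets and channels.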

PS. Wei Li is right: you could get the same result using WATCH and a loop that retries when a concurrent modification aborts the transaction, but I still prefer using a Lua script.
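For comparison, the WATCH alternative can be sketched as a retry loop. This is only an outline under stated assumptions: DedupStore is a hypothetical interface standing in for the Redis steps (with Jedis these would roughly be watch, sismember, unwatch, and multi followed by exec), and FakeStore is an in-memory stand-in that aborts a configurable number of attempts to mimic a concurrent writer. How an aborted EXEC is reported depends on the client, so that detail is hidden behind a boolean here.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical abstraction over the Redis steps; this is not a Jedis API.
interface DedupStore {
    void watch(String key);                   // WATCH key
    boolean isMember(String key, String id);  // SISMEMBER key id
    void unwatch();                           // UNWATCH
    // MULTI; SADD key id; PUBLISH channel message; EXEC.
    // Returns false when EXEC aborts because the watched key changed.
    boolean addAndPublish(String key, String id, String channel, String message);
}

final class WatchRetry {
    // Returns true if the message was published, false if it was a duplicate.
    static boolean publishOnce(DedupStore store, String key, String id,
                               String channel, String message, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            store.watch(key);              // must be repeated: EXEC clears all watches
            if (store.isMember(key, id)) {
                store.unwatch();           // nothing to do, release the watch
                return false;
            }
            if (store.addAndPublish(key, id, channel, message)) {
                return true;
            }
            // EXEC was aborted: the set changed after WATCH, so check again.
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts");
    }

    public static void main(String[] unused) {
        FakeStore store = new FakeStore(1); // first EXEC is aborted
        System.out.println(publishOnce(store, "ids", "42", "news", "hello", 3));
        System.out.println(publishOnce(store, "ids", "42", "news", "hello", 3));
    }
}

// In-memory stand-in used only to exercise the loop.
final class FakeStore implements DedupStore {
    final Set<String> ids = new HashSet<>();
    final List<String> published = new ArrayList<>();
    private int abortsLeft;

    FakeStore(int aborts) { this.abortsLeft = aborts; }

    public void watch(String key) { }
    public void unwatch() { }
    public boolean isMember(String key, String id) { return ids.contains(id); }

    public boolean addAndPublish(String key, String id, String channel, String message) {
        if (abortsLeft > 0) {              // simulate a concurrent modification
            abortsLeft--;
            return false;
        }
        ids.add(id);
        published.add(channel + ":" + message);
        return true;
    }
}
```

Compared with the Lua script, this takes several round trips per message and needs a retry limit, which is one reason to prefer the script.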

Other tips

Based on @Axexandre's comment above, I used the following piece of code to perform the operation.

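import redis.clients.jedis.Jedis;

public class RedisLuaDemo {

    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost");
        // A command issued before eval; see the note on the exception below.
        jedis.sadd("a", "b");
        int numberOfKeys = 1; // only one key is passed: the Redis set 'setvar'
        // If the ID is already in the set, return the message without publishing;
        // otherwise record the ID and publish the message.
        String script =
            "if redis.call('sismember', KEYS[1], ARGV[1]) == 1 then "
            + "return ARGV[2] "
            + "else "
            + "redis.call('sadd', KEYS[1], ARGV[1]); "
            + "redis.call('publish', 'channel.mychannel', ARGV[2]) "
            + "end";
        jedis.eval(script, numberOfKeys, "setvar", "joe", "message from joe!");
    }
}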

Here is some more information about the script. It took some time to understand the syntax.

if redis.call('sismember', KEYS[1], ARGV[1]) == 1 is equivalent to checking whether SISMEMBER setvar joe returns 1.

redis.call('sadd', KEYS[1], ARGV[1]); is equivalent to SADD setvar joe.
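The mapping from the eval parameters to KEYS and ARGV follows from the key count: the first numberOfKeys parameters become KEYS and the rest become ARGV. The sketch below mirrors that split in plain Java so it runs without a server; EvalParams is a hypothetical helper, not part of Jedis.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that mirrors how EVAL divides its parameters.
final class EvalParams {
    final List<String> keys; // what the script sees as KEYS
    final List<String> argv; // what the script sees as ARGV

    EvalParams(int keyCount, String... params) {
        List<String> all = Arrays.asList(params);
        this.keys = all.subList(0, keyCount);
        this.argv = all.subList(keyCount, all.size());
    }

    public static void main(String[] unused) {
        // Same parameters as the eval call in RedisLuaDemo.
        EvalParams p = new EvalParams(1, "setvar", "joe", "message from joe!");
        // Lua tables are 1-based, so KEYS[1] corresponds to keys.get(0).
        System.out.println("KEYS[1] = " + p.keys.get(0));
        System.out.println("ARGV[1] = " + p.argv.get(0));
        System.out.println("ARGV[2] = " + p.argv.get(1));
    }
}
```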

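For some reason, if I leave out the jedis.sadd("a", "b"); line, I get an exception (see below). Judging by the stack trace, eval seems to touch the connection before it has been opened, and issuing any other command first opens it.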

Exception in thread "main" java.lang.NullPointerException
    at redis.clients.jedis.Connection.setTimeoutInfinite(Connection.java:41)
    at redis.clients.jedis.Jedis.eval(Jedis.java:2763)
    at redis.RedisLuaDemo.main(RedisLuaDemo.java:13)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120) 
License: CC-BY-SA with attribution
Not affiliated with StackOverflow