Question

I am attempting to implement a class that enforces concurrency control in my Java application by blocking asynchronous attempts to modify a given instance of an entity (identified by a key) using ReentrantLock. The goal is to block/queue multiple concurrent attempts to modify a given instance of an object until previous threads have completed. The class implements this in a generic way, allowing any block of code to obtain a lock and release it once done (identical to the ReentrantLock semantics), with the added utility of only blocking threads attempting to modify the same instance of the object (as identified by a key) rather than blocking all threads entering the block of code.

This class provides a simple construct to allow synchronizing a block of code just for one instance of an entity. For example, I want a block of code to be synchronized for all threads coming from the user with id 33, but I don't want threads from any other user to be blocked by the thread servicing user 33.

The class is implemented as follows:

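import java.util.ConcurrentModificationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class EntitySynchronizer {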
  private static final int DEFAULT_MAXIMUM_LOCK_DURATION_SECONDS = 300; // 5 minutes
  private Object mutex = new Object();
  private ConcurrentHashMap<Object, ReentrantLock> locks = new ConcurrentHashMap<Object, ReentrantLock>();
  private static ThreadLocal<Object> keyThreadLocal = new ThreadLocal<Object>();
  private int maximumLockDurationSeconds;
  public EntitySynchronizer() {
    this(DEFAULT_MAXIMUM_LOCK_DURATION_SECONDS);
  }
  public EntitySynchronizer(int maximumLockDurationSeconds) {
    this.maximumLockDurationSeconds = maximumLockDurationSeconds;
  }
  /**
   * Initiate a lock for all threads with this key value
   * @param key the instance identifier for concurrency synchronization
   */
  public void lock(Object key) {
    if (key == null) {
      return;
    }
    /*
     * returns the existing lock for specified key, or null if there was no existing lock for the
     * key
     */
    ReentrantLock lock;
    synchronized (mutex) {
      lock = locks.putIfAbsent(key, new ReentrantLock(true));
      if (lock == null) {
        lock = locks.get(key);
      }
    }
    /*
     * Acquires the lock and returns immediately with the value true if it is not held by another
     * thread within the given waiting time and the current thread has not been interrupted. If this
     * lock has been set to use a fair ordering policy then an available lock will NOT be acquired
     * if any other threads are waiting for the lock. If the current thread already holds this lock
     * then the hold count is incremented by one and the method returns true. If the lock is held by
     * another thread then the current thread becomes disabled for thread scheduling purposes and
     * lies dormant until one of three things happens: - The lock is acquired by the current thread;
     * or - Some other thread interrupts the current thread; or - The specified waiting time elapses
     */
    try {
      /*
       * Using tryLock(timeout) instead of lock() to prevent a deadlock situation in case an
       * acquired lock is never released. normalRelease will be false if the waiting time elapsed
       * before the lock could be acquired.
       */
      boolean normalRelease = lock.tryLock(maximumLockDurationSeconds, TimeUnit.SECONDS);
      /*
       * The lock was not acquired because the timeout expired. We do not want to proceed, we
       * should throw a concurrency exception for the waiting thread.
       */
      if (!normalRelease) {
        throw new ConcurrentModificationException(
            "Entity synchronization concurrency lock timeout expired for item key: " + key);
      }
    } catch (InterruptedException e) {
      throw new IllegalStateException("Entity synchronization interrupted exception for item key: "
          + key);
    }
    keyThreadLocal.set(key);
  }
  /**
   * Unlock this thread's lock. This takes care of preserving the lock for any waiting threads with
   * the same entity key
   */
  public void unlock() {
    Object key = keyThreadLocal.get();
    keyThreadLocal.remove();
    if (key != null) {
      ReentrantLock lock = locks.get(key);
      if (lock != null) {
        try {
          synchronized (mutex) {
            if (!lock.hasQueuedThreads()) {
              locks.remove(key);
            }
          }
        } finally {
          lock.unlock();
        }
      } else {
        synchronized (mutex) {
          locks.remove(key);
        }
      }
    }
  }
}

This class is used as follows:

private EntitySynchronizer entitySynchronizer = new EntitySynchronizer();
entitySynchronizer.lock(userId);  // the user is the entity by which I want to differentiate my synchronization
try {
  //execute my code here ...
} finally {
  entitySynchronizer.unlock();
}

The problem is that it doesn't work perfectly. At very high concurrency loads, there are still some cases wherein multiple threads with the same key are not being synchronized. I've been through and tested fairly thoroughly and cannot figure out why/where this can happen.

Any concurrency experts out there?


Solution

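Your solution suffers from a lack of atomicity: looking up the lock in the map and acquiring it are two separate steps, so another thread can remove the lock from the map in between. Consider the following scenario: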

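  • Thread A enters lock() and obtains the existing lock from the map.
  • Thread B enters unlock() for the same key, unlocks, and removes the lock from the map (since thread A hasn't called tryLock() yet, hasQueuedThreads() returns false).
  • Thread A successfully calls tryLock() on a lock that is no longer in the map, so the next thread that calls lock() for this key creates a new lock and is not blocked by thread A.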
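
To make the scenario concrete, here is a minimal sketch that replays the same interleaving step by step on a single thread, using only the map operations from the question. The class name StaleLockDemo and the extra "thread C" step (a later caller for the same key) are my own additions for illustration; they are not part of the original code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

class StaleLockDemo {
    /** Replays the interleaving; returns true if A and C end up with different lock objects. */
    static boolean replay() {
        ConcurrentHashMap<Object, ReentrantLock> locks = new ConcurrentHashMap<>();
        Object key = 33;

        // Thread B is currently working on the key, so its lock is in the map.
        ReentrantLock lockOfB = new ReentrantLock(true);
        locks.put(key, lockOfB);

        // Step 1: thread A looks up the existing lock but has not called tryLock() yet.
        ReentrantLock lockSeenByA = locks.putIfAbsent(key, new ReentrantLock(true));

        // Step 2: thread B unlocks; nobody is queued on the lock, so the entry is removed.
        if (!lockOfB.hasQueuedThreads()) {
            locks.remove(key);
        }

        // Step 3 (hypothetical thread C): finds no entry and installs a fresh lock.
        ReentrantLock fresh = new ReentrantLock(true);
        ReentrantLock existing = locks.putIfAbsent(key, fresh);
        ReentrantLock lockSeenByC = (existing != null) ? existing : fresh;

        // A will acquire lockOfB, C will acquire the fresh lock: no mutual exclusion.
        return lockSeenByA != lockSeenByC;
    }

    public static void main(String[] args) {
        System.out.println("A and C hold different locks: " + replay());
    }
}
```

Since A and C lock two different ReentrantLock objects for the same key, both can be inside the protected block at the same time.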

One possible option is to keep track of the number of locks "checked out" from the map, and only remove a lock once nobody holds a reference to it:

public class EntitySynchronizer {
    private Map<Object, Token> tokens = new HashMap<Object, Token>();
    private ThreadLocal<Token> currentToken = new ThreadLocal<Token>();
    private Object mutex = new Object();

    ...

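    public void lock(Object key) throws InterruptedException {
        Token token = checkOut(key);
        boolean locked = false;
        try {
            locked = token.lock.tryLock(maximumLockDurationSeconds, TimeUnit.SECONDS);
            if (locked) currentToken.set(token);
        } finally {
            // Return the token if the lock was not acquired (timeout or interrupt)
            if (!locked) checkIn(token);
        }
        // As in the question, fail loudly on timeout instead of proceeding without the lock
        if (!locked) {
            throw new ConcurrentModificationException(
                "Entity synchronization concurrency lock timeout expired for item key: " + key);
        }
    }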

    public void unlock() {
        Token token = currentToken.get();
        if (token != null) {
            token.lock.unlock();
            checkIn(token);
            currentToken.remove();
        }
    }

    private Token checkOut(Object key) {
        synchronized (mutex) {
            Token token = tokens.get(key);
            if (token != null) {
                token.checkedOutCount++;
            } else {
                token = new Token(key); 
                tokens.put(key, token);
            }
            return token;
        }
    }

    private void checkIn(Token token) {
        synchronized (mutex) {
            token.checkedOutCount--;
            if (token.checkedOutCount == 0)
                tokens.remove(token.key);
        }
    }

    private class Token {
        public final Object key;
        public final ReentrantLock lock = new ReentrantLock();
        public int checkedOutCount = 1;

        public Token(Object key) {
            this.key = key;
        }
    }
}

Note that tokens doesn't have to be a ConcurrentHashMap, since its methods are only called inside synchronized blocks anyway.
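
If you want to check an approach like this under load, a stress harness along these lines may help. It is a simplified sketch, not the class above: KeyedLockSketch, withLock, liveTokens and stress are hypothetical names, it takes a callback instead of using lock()/unlock() with a ThreadLocal, it assumes Java 8 or later, and it uses a plain lock() with no timeout to keep the example short.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

class KeyedLockSketch {
    private static final class Token {
        final ReentrantLock lock = new ReentrantLock();
        int checkedOut; // guarded by synchronized (tokens)
    }

    private final Map<Object, Token> tokens = new HashMap<>();

    /** Runs the action while holding the lock that belongs to the given key. */
    void withLock(Object key, Runnable action) {
        Token token;
        synchronized (tokens) {
            // Look up (or create) the token and count this thread as a user in one step.
            token = tokens.computeIfAbsent(key, k -> new Token());
            token.checkedOut++;
        }
        token.lock.lock();
        try {
            action.run();
        } finally {
            token.lock.unlock();
            synchronized (tokens) {
                // Only the last user of a token removes it from the map.
                if (--token.checkedOut == 0) tokens.remove(key);
            }
        }
    }

    int liveTokens() {
        synchronized (tokens) { return tokens.size(); }
    }

    /** Hammers a single key from several threads; returns how many overlaps were observed. */
    static int stress(KeyedLockSketch sync, int threads, int iterations) {
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger violations = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < iterations; n++) {
                    sync.withLock("user-33", () -> {
                        // More than one thread inside at once means the lock failed.
                        if (inside.incrementAndGet() != 1) violations.incrementAndGet();
                        inside.decrementAndGet();
                    });
                }
            });
            workers[i].start();
        }
        try {
            for (Thread t : workers) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("stress run interrupted", e);
        }
        return violations.get();
    }
}
```

A call such as KeyedLockSketch.stress(new KeyedLockSketch(), 8, 2000) should return 0, and liveTokens() should be 0 afterwards because every check-out is matched by a check-in. A clean run is only evidence, not proof, but the same harness pointed at the original class is a reasonable way to try to reproduce the failure.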

OTHER TIPS

One of the things you should fix is this:

ReentrantLock lock;
synchronized (mutex) {
  lock = locks.putIfAbsent(key, new ReentrantLock(true));
  if (lock == null) {
    lock = locks.get(key);
  }
}

This misses the whole point of a concurrent map. Why didn't you write it like this:

ReentrantLock lock = new ReentrantLock(true);
final ReentrantLock oldLock = locks.putIfAbsent(key, lock);
lock = oldLock != null? oldLock : lock;
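
Assuming Java 8 or later, the same get-or-create step can also be written with computeIfAbsent, which avoids allocating a ReentrantLock on every call. The sketch below is only an illustration: LockRegistry and lockFor are hypothetical names, and the map here never removes entries, so it sidesteps the removal race rather than solving it (at the cost of keeping one lock per key forever).

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

class LockRegistry {
    private final ConcurrentHashMap<Object, ReentrantLock> locks = new ConcurrentHashMap<>();

    /** Returns the one lock associated with the key, creating it atomically if needed. */
    ReentrantLock lockFor(Object key) {
        // The mapping function runs at most once per absent key.
        return locks.computeIfAbsent(key, k -> new ReentrantLock(true));
    }
}
```

Every caller that passes an equal key gets the same lock object back, and no external synchronized block is needed around the lookup.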

I'm assuming you're not really using your class as follows:

private EntitySynchronizer entitySynchronizer = new EntitySynchronizer();
entitySynchronizer.lock(userId);  // the user is the entity by which I want to differentiate my synchronization
try {
  //execute my code here ...
} finally {
  entitySynchronizer.unlock();
}

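But that you have a single shared instance of EntitySynchronizer? If every caller creates its own instance, each one has its own map of locks and nothing is synchronized between them, so that would be your problem right there.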

Licensed under: CC-BY-SA with attribution