Question

I am trying to do some processing on a SynchronizedQueue using a tail-recursive function. The function seems to work properly, but the more I think about concurrency, the more I believe I could have some race conditions when accessing this queue from different threads. Here is the function that I think could use some help:

val unsavedMessages = new SynchronizedQueue[CachedMessage]()

val MAX_BATCH = 256
val rowCount = new AtomicInteger()

  private def validateCacheSize() = if (unsavedMessages.length > MAX_BATCH) {
    implicit val batch = createBatch
    val counter = rowCount.getAndIncrement
    @tailrec
    def processQueue(queue: SynchronizedQueue[CachedMessage]): Unit = if (queue.nonEmpty) {
      val cm = queue.dequeue
      addToBatch(cm.request, cm.timestamp, cm.brokerId, counter)
      processQueue(queue)
    }
    processQueue(unsavedMessages)
    executeBatch
    resetQueue
  }

  def resetQueue = unsavedMessages.clear

Multiple threads call this function:

  def add(request: WebserviceRuleMatch, timestamp: Long, brokerId: String) = {
    validateCacheSize
    //log.info("enquing request "+ unsavedMessages.length)
    unsavedMessages.enqueue(CachedMessage(request, timestamp, brokerId))
  }

Does anyone have any pointers on how to improve this so there would likely not be a race condition?

Solution

There is a chance that the queue gets emptied by another thread between the `queue.nonEmpty` check and the `queue.dequeue` call.

  • Avoid calling multiple queue operations that would need to be synchronized in your own code. Use the power of `SynchronizedQueue` to do the work in single atomic, thread-safe operations. E.g. avoid calling `queue.nonEmpty` altogether (an alternative to the tail recursion):

    for (cm <- unsavedMessages.dequeueAll(_ => true)) 
      addToBatch(cm.request, cm.timestamp, cm.brokerId, counter)
    executeBatch
    //resetQueue   -- Don't do this!  Not thread-safe
    
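To make the "no separate emptiness check" idea concrete, here is a minimal, self-contained sketch. Since `SynchronizedQueue` is deprecated in later Scala versions (and removed in 2.13), it uses `java.util.concurrent.ConcurrentLinkedQueue` as a stand-in: its `poll()` atomically removes and returns the head, or `null` when empty, so there is no check/dequeue pair to race against. The `CachedMessage` fields here are hypothetical placeholders for the poster's types:

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.annotation.tailrec

// Hypothetical stand-in for the poster's CachedMessage, for illustration only.
case class CachedMessage(request: String, timestamp: Long, brokerId: String)

val unsavedMessages = new ConcurrentLinkedQueue[CachedMessage]()
unsavedMessages.add(CachedMessage("r1", 1L, "b1"))
unsavedMessages.add(CachedMessage("r2", 2L, "b2"))

// poll() atomically removes and returns the head, or null when the queue
// is empty, so a single call replaces the racy nonEmpty-then-dequeue pair.
@tailrec
def drain(acc: List[CachedMessage]): List[CachedMessage] =
  unsavedMessages.poll() match {
    case null => acc.reverse
    case cm   => drain(cm :: acc)
  }

val batch = drain(Nil)
println(batch.map(_.request))  // List(r1, r2)
```

Each recursive step here does exactly one queue operation, which is the same property the `dequeueAll` one-liner above relies on.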

I think messages could be added by a thread between `processQueue` and `resetQueue`.

  • There will always be a point at which your code has taken a 'snapshot' of the queue and emptied it. The previous point ensures that the snapshot and the emptying happen as a single atomic operation. If new entries are enqueued at any point after that atomic 'snapshot & empty', that's no problem: the operation has to occur somewhere, and newly enqueued items are a fact of life. Accept that items may be enqueued at any point after the 'snapshot & empty'; they will simply be processed on the next cycle. In other words, nothing extra is needed beyond the previous point.
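As a sketch of why enqueues after the 'snapshot & empty' are harmless, the following toy program (again using `ConcurrentLinkedQueue` as a stand-in, with made-up thread and item counts) has several producers enqueue concurrently while the main thread drains in a loop; every item is picked up on some cycle and none are lost:

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

val queue = new ConcurrentLinkedQueue[Int]()
val processed = new AtomicInteger(0)

// Four producers enqueue while we repeatedly "snapshot & empty" the queue.
val producers = (1 to 4).map(_ => new Thread(() => (1 to 1000).foreach(i => queue.add(i))))
producers.foreach(_.start())

// One drain pass: keep polling until the queue reports empty.
def drainOnce(): Unit = {
  var item = queue.poll()
  while (item != null) { processed.incrementAndGet(); item = queue.poll() }
}

while (producers.exists(_.isAlive)) drainOnce()
producers.foreach(_.join())
drainOnce()  // a final pass picks up anything enqueued after the previous drain

println(processed.get)  // 4000
```

Items enqueued after any given drain pass are not a correctness problem; they just wait for the next pass, exactly as the answer describes.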

Robin Green: (By the way, that method seems to have a very misleading name!)

  • Wot he said! :)

Other tips

The `add` function gets called from a future, so I feel as though there could be a chance that the queue gets emptied between `queue.nonEmpty` and `queue.dequeue`.

Yes, it could. You could use double-checked locking to make `validateCacheSize` single-threaded. (By the way, that method seems to have a very misleading name!)

Also, I think messages could be added by a thread between `processQueue` and `resetQueue`.

Yes, they could. But why do you need to call `unsavedMessages.clear` at all? `queue.dequeue` already removes elements from the queue, so the only messages left in the queue afterwards are ones that still remain to be processed.
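A tiny sketch (once more with `ConcurrentLinkedQueue` standing in for the deprecated `SynchronizedQueue`) of why the extra clear is not just redundant but harmful: anything another thread enqueues between the drain and the `clear()` is silently discarded without ever being processed:

```scala
import java.util.concurrent.ConcurrentLinkedQueue

val q = new ConcurrentLinkedQueue[String]()
q.add("m1")

// Drain everything that is currently queued.
var drained = List.empty[String]
var head = q.poll()
while (head != null) { drained = head :: drained; head = q.poll() }

// Another thread could enqueue right here, between the drain and the clear:
q.add("m2")

q.clear()  // "m2" is silently dropped without ever being processed

println(drained)   // List(m1)
println(q.isEmpty) // true -- m2 is gone
```

Dropping the `clear()` entirely (as the accepted answer suggests) means "m2" would instead be picked up on the next drain cycle.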

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow