there could be a chance that the queue gets emptied between queue.nonempty and queue.dequeue
Avoid calling multiple queue operations that must be synchronized within your code. Use the power of SynchronizedQueue to do atomic thread-safe operations. E.g. avoid calling queue.nonempty altogether (alternative to tail-recursion):

    for (cm <- unsavedMessages.dequeueAll(_ => true))
      addToBatch(cm.request, cm.timestamp, cm.brokerId, counter)
    executeBatch
    // resetQueue -- Don't do this! Not thread-safe
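To make the "one atomic operation" idea concrete, here is a minimal sketch. It does not use SynchronizedQueue itself (as far as I know it was deprecated in Scala 2.11 and is gone in 2.13); instead it assumes a hypothetical `DrainableQueue` wrapper whose `drainAll()` plays the role of `dequeueAll(_ => true)`. The names `DrainableQueue`, `drainAll` and `DrainDemo` are made up for illustration.

```scala
import scala.collection.mutable

// Hypothetical stand-in for SynchronizedQueue: every method takes the same lock.
final class DrainableQueue[A] {
  private val underlying = new mutable.Queue[A]

  def enqueue(a: A): Unit = synchronized { underlying.enqueue(a) }

  // Snapshot and empty inside one critical section, so no producer
  // can slip an element in between the copy and the clear.
  def drainAll(): List[A] = synchronized {
    val snapshot = underlying.toList
    underlying.clear()
    snapshot
  }
}

object DrainDemo {
  // Four producers enqueue 1000 distinct Ints each while the caller keeps draining.
  def run(): List[Int] = {
    val queue = new DrainableQueue[Int]
    val producers = (0 until 4).map { p =>
      new Thread(new Runnable {
        def run(): Unit = (0 until 1000).foreach(i => queue.enqueue(p * 1000 + i))
      })
    }
    producers.foreach(_.start())

    val seen = mutable.ListBuffer.empty[Int]
    // No nonEmpty check anywhere: an empty drain is just an empty List.
    while (producers.exists(_.isAlive)) seen ++= queue.drainAll()
    producers.foreach(_.join())
    seen ++= queue.drainAll() // whatever arrived after the last drain
    seen.toList
  }
}
```

Every element should come out exactly once, with nothing lost and nothing duplicated, because the consumer never makes a decision based on a separate emptiness check.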
I think messages could be added by a thread between processQueue and resetQueue
- There will always be a point at which your code has taken a 'snapshot' of the queue and emptied it. My previous point ensures that the 'snapshot' and the emptying are a single atomic operation. If new entries are enqueued at any point after that atomic 'snapshot & empty' operation, that is no problem: the 'snapshot & empty' must occur somewhere, and newly enqueued items are a fact of life. Make the decision to allow new items to be enqueued at any point after the 'snapshot & empty'; they'll be processed on the next cycle. I.e. nothing extra is needed beyond the point above.
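A small sketch of that cycle behaviour, under assumptions: it swaps in an `AtomicReference` holding an immutable `List` rather than a SynchronizedQueue, so that 'snapshot & empty' is literally one `getAndSet` call. `CycleBuffer`, `cycle` and `CycleDemo` are hypothetical names, not anything from the question's code.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.collection.mutable

// Hypothetical buffer; the newest message sits at the head of the list.
final class CycleBuffer {
  private val pending = new AtomicReference[List[String]](Nil)

  // Retry the compare-and-set until our update wins.
  @scala.annotation.tailrec
  def enqueue(msg: String): Unit = {
    val current = pending.get()
    if (!pending.compareAndSet(current, msg :: current)) enqueue(msg)
  }

  // One cycle: atomically swap in an empty list, then work on the snapshot.
  // Anything enqueued after the swap is simply left for the next cycle.
  def cycle(process: String => Unit): Int = {
    val snapshot = pending.getAndSet(Nil).reverse // restore arrival order
    snapshot.foreach(process)
    snapshot.size
  }
}

object CycleDemo {
  def run(): (Int, Int, List[String]) = {
    val buffer = new CycleBuffer
    val handled = mutable.ListBuffer.empty[String]
    buffer.enqueue("a")
    buffer.enqueue("b")

    // "late" arrives while cycle 1 is still processing its snapshot.
    val first = buffer.cycle { m =>
      handled += m
      if (m == "a") buffer.enqueue("late")
    }
    val second = buffer.cycle { m => handled += m; () }
    (first, second, handled.toList)
  }
}
```

Here the first cycle handles "a" and "b", and "late" is picked up by the second cycle without any reset step in between.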
Robin Green: (By the way, that method seems to have a very misleading name!)
- Wot he said! :)