Question

I'm using the observer pattern and a BlockingQueue to add some instances. Now in another method I'm using the queue, but it seems take() is waiting forever, even though I'm doing it like this:

/** {@inheritDoc} */
@Override
public void diffListener(final EDiff paramDiff, final IStructuralItem paramNewNode,
    final IStructuralItem paramOldNode, final DiffDepth paramDepth) {
    final Diff diff =
        new Diff(paramDiff, paramNewNode.getNodeKey(), paramOldNode.getNodeKey(), paramDepth);
    mDiffs.add(diff);
    try {
        mDiffQueue.put(diff);
    } catch (final InterruptedException e) {
        LOGWRAPPER.error(e.getMessage(), e);
    }
    mEntries++;

    if (mEntries == AFTER_COUNT_DIFFS) {
        try {
            mRunner.run(new PopulateDatabase(mDiffDatabase, mDiffs));
        } catch (final Exception e) {
            LOGWRAPPER.error(e.getMessage(), e);
        }
        mEntries = 0;
        mDiffs = new LinkedList<>();
    }
}

/** {@inheritDoc} */
@Override
public void diffDone() {
    try {
        mRunner.run(new PopulateDatabase(mDiffDatabase, mDiffs));
    } catch (final Exception e) {
        LOGWRAPPER.error(e.getMessage(), e);
    }
    mDone = true;
}

whereas mDiffQueue is a LinkedBlockingQueue and I'm using it like this:

while (!(mDiffQueue.isEmpty() && mDone) || mDiffQueue.take().getDiff() == EDiff.INSERTED) {}

My guess is that the first expression is evaluated while mDone is still false; mDone may then be set to true (is an observer always multithreaded?), but by that time the loop is already blocked in mDiffQueue.take()? :-/

Edit: I really don't get it right now. I've recently changed it to:

synchronized (mDiffQueue) {
    while (!(mDiffQueue.isEmpty() && mDone)) {
        if (mDiffQueue.take().getDiff() != EDiff.INSERTED) {
            break;
        }
    }
}

If I pause in the debugger for a little while it works, but it should also work in "real time", since mDone is initialized to false and therefore the while-condition should be true and the body should be executed.

If the mDiffQueue is empty and mDone is true it should skip the body of the while-loop (which means the queue isn't filled anymore).

Edit: It seems this works:

synchronized (mDiffQueue) {
    while (!(mDiffQueue.isEmpty() && mDone)) {
         if (mDiffQueue.peek() != null) {
             if (mDiffQueue.take().getDiff() != EDiff.INSERTED) {
                 break;
             }
         }
    }
}

Even though I don't get why the peek() is mandatory.

Edit:

What I'm doing is iterating over a tree and I want to skip all INSERTED nodes:

for (final AbsAxis axis = new DescendantAxis(paramRtx, true); axis.hasNext(); axis.next()) {
    skipInserts();
    final IStructuralItem node = paramRtx.getStructuralNode();
    if (node.hasFirstChild()) {
        depth++;
        skipInserts();
        ...

Basically computing the maximum depth or level in the tree without considering nodes which have been deleted in another revision of the tree (for a comparison Sunburst visualization), but ok, that's maybe out of scope. Just to illustrate that I'm doing something with nodes which haven't been inserted, even if it's just adjusting the maximum depth.

regards,

Johannes


Solution

First advice: don't use synchronized (mDiffQueue). You would get a deadlock if LinkedBlockingQueue had any synchronized methods; that is not the case here, but it is a practice you should avoid. In any case, I don't see why you are synchronizing at that point.

You have to "wake up" periodically while waiting to check if mDone has been set:

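while (!(mDiffQueue.isEmpty() && mDone)) {
    // poll(long, TimeUnit) returns null if nothing is added to the queue within 100 ms.
    // The timeout is a long, so 0.1 seconds has to be written as 100 milliseconds.
    // poll can throw InterruptedException, which the enclosing method must catch or declare.
    final Diff diff = mDiffQueue.poll(100, TimeUnit.MILLISECONDS);
    if (diff != null) {
        process(diff);
    }
}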

This is about the same as using peek, except that peek() does not wait at all: it returns immediately, with null if the queue is empty. Looping on peek is called "busy waiting" (your thread runs the while loop non-stop), and using poll with a timeout is called "semi-busy waiting" (you let the thread sleep at intervals).

I guess in your case process(diff) would be to get out of the loop if diff is not of type EDiff.INSERTED. I'm not sure if that is what you are trying to accomplish. This seems odd since you are basically just stalling the consumer thread until you get a single element of the right type, and then you do nothing with it. And you cannot receive the future incoming elements since you are out of the while loop.
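For reference, here is a minimal, self-contained sketch of the timed-poll loop described in this answer. The class TimedPollConsumer, its String elements and the helper names produce, finish, drain and demo are all made up for illustration; they are not part of the asker's code. Two assumptions are worth flagging: the done flag is declared volatile because it is assumed to be written by a different thread than the one that reads it, and the loop reads the flag before calling isEmpty(), so that an element added just before the flag is set cannot be missed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the asker's consumer; Strings replace Diff objects.
final class TimedPollConsumer {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // volatile: the producer thread writes this flag, the consumer thread reads it.
    private volatile boolean done = false;

    void produce(final String item) throws InterruptedException {
        queue.put(item);
    }

    void finish() {
        done = true;
    }

    // Collects elements until the producer has finished AND the queue is empty.
    List<String> drain() throws InterruptedException {
        final List<String> seen = new ArrayList<>();
        // Read 'done' first: once it is true no more puts can happen,
        // so the isEmpty() result that follows is final.
        while (!(done && queue.isEmpty())) {
            // Wake up at least every 100 ms to re-check the flag.
            final String item = queue.poll(100, TimeUnit.MILLISECONDS);
            if (item != null) {
                seen.add(item);
            }
        }
        return seen;
    }

    // Runs one producer thread against the draining loop and returns what was consumed.
    static List<String> demo() {
        final TimedPollConsumer consumer = new TimedPollConsumer();
        final Thread producer = new Thread(() -> {
            try {
                consumer.produce("a");
                consumer.produce("b");
                consumer.produce("c");
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                consumer.finish();
            }
        });
        producer.start();
        try {
            final List<String> result = consumer.drain();
            producer.join();
            return result;
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(final String[] args) {
        System.out.println(demo()); // prints [a, b, c]
    }
}
```

The loop ends at most one timeout interval after finish() is called, which is the trade-off of this approach: a shorter timeout reacts faster but wakes the thread more often.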

OTHER TIPS

take() is a "blocking call". That means it will block (wait indefinitely) until something is on the queue, and then return what was added. Of course, if something is already on the queue, it will return immediately.
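A small sketch of that blocking behaviour, assuming a LinkedBlockingQueue of Strings; the class name TakeBlocksDemo, the 200 ms delay and the element value are arbitrary choices for illustration:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class: shows take() waiting for a producer.
final class TakeBlocksDemo {
    static String demo() {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        final Thread producer = new Thread(() -> {
            try {
                Thread.sleep(200);       // the queue stays empty for a while
                queue.put("late item");
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            // The queue is empty here, so take() blocks until the put above happens.
            final String item = queue.take();
            producer.join();
            return item;
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```

If the producer never put anything, take() would never return unless the waiting thread were interrupted, which matches the hang described in the question.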

You can use peek() to return what would be returned by take() - that is, peek() returns the next item without removing it from the queue, or returns null if there's nothing on the queue. Try using peek() instead in your test (but check for null too).
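To make the difference concrete, here is a short single-threaded sketch (the class name PeekDemo and the element "x" are illustrative only) that records what peek() and the non-blocking poll() return on an empty and a non-empty queue:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class: peek() inspects the head, poll() removes it.
final class PeekDemo {
    static String demo() {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        final StringBuilder log = new StringBuilder();

        log.append(queue.peek()).append(',');  // empty queue: peek() returns null at once
        queue.add("x");
        log.append(queue.peek()).append(',');  // head element, not removed
        log.append(queue.size()).append(',');  // still one element
        log.append(queue.poll()).append(',');  // poll() removes and returns the head
        log.append(queue.size());              // queue is empty again
        return log.toString();
    }

    public static void main(final String[] args) {
        System.out.println(demo()); // prints null,x,1,x,0
    }
}
```

Neither call ever blocks, so a loop built only on peek() has to spin until an element shows up.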

Licensed under: CC-BY-SA with attribution