Question

I have a distributed system whose nodes receive message objects through sockets. Received messages are written to a BlockingQueue and processed in another thread. I make sure that there is just one BlockingQueue instance per machine. The incoming rate is very high, roughly thousands of messages per second. The consumer works well at first, but after a certain period it blocks (no response at all). I have checked that the BlockingQueue is not empty, so it should not be blocked in BlockingQueue.take(). When I manually decrease the rate of incoming message objects, the consumer works perfectly well. This is quite confusing...

Could you help me identify the problem? Thanks a lot in advance.

Consumer code:

ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat(id+"-machine-worker")
            .setDaemon(false)
            .setPriority(Thread.MAX_PRIORITY)
            .build();
ExecutorService executor = Executors.newSingleThreadExecutor(threadFactory);
executor.submit(new Worker(machine));

public static class Worker implements Runnable {
    Machine machine;
    public Worker(Machine machine) {
        this.machine = machine;
    }
    @Override
    public void run() {
        while (true) {
            try {
                Message message = machine.queue.take();
                // Do my staff here...
            } catch (Exception e) {
                logger.error(e);
            }
        }
    }
}

Producer code:

// Below code submits the SocketListener runnable described below
ExecutorService worker;
Runnable runnable = socketHandlerFactory.getSocketHandlingRunnable(socket, queue);
worker.submit(runnable);

public SocketListener(Socket mySocket, Machine machine, LinkedBlockingQueue<Message> queue) {
    this.id = machine.id;
    this.socket = mySocket;
    this.machine = machine;
    this.queue = queue;

    try {
        BufferedInputStream bis = new BufferedInputStream(socket.getInputStream(), 8192*64);
        ois = new ObjectInputStream(bis);
    } catch (Exception e) {
        logger.error("Error in create SocketListener", e);
    }
}

@Override
public void run() {
    Message message;
    try {
        boolean socketConnectionIsAlive = true;
        while (socketConnectionIsAlive) {
            if (ois != null) {
                message = (Message) ois.readObject();
                queue.put(message);
            }
        }
    } catch (Exception e) {
        logger.warn(e);
    }
}

Solution

If you are using an unbounded queue, the whole system may be getting bogged down by memory pressure: nothing limits the producer's rate to what the consumer can handle, so unprocessed messages pile up on the heap. Use a bounded queue instead, so that a full queue holds the producer back.
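To illustrate the bounded-queue suggestion, here is a minimal sketch, not the asker's actual code. The class name `BoundedQueueSketch`, the helper `enqueue`, and the capacity of 1000 are assumptions made up for this example. It uses `offer` with a timeout so that a full queue is visible to the producer instead of growing without limit; calling `put` on the same queue would simply block until the consumer makes room.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedQueueSketch {
    // Hypothetical capacity; size it from the heap budget and the message size.
    static final int CAPACITY = 1000;

    /** Waits up to timeoutMillis for free space; returns false if the queue stayed full. */
    static <T> boolean enqueue(BlockingQueue<T> queue, T message, long timeoutMillis)
            throws InterruptedException {
        return queue.offer(message, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(CAPACITY);
        int rejected = 0;
        // No consumer is running here, so every message beyond CAPACITY is rejected.
        for (int i = 0; i < CAPACITY + 5; i++) {
            if (!enqueue(queue, "message-" + i, 10)) {
                rejected++;
            }
        }
        System.out.println("queued=" + queue.size() + " rejected=" + rejected);
    }
}
```

A `LinkedBlockingQueue` can be bounded the same way by passing a capacity to its constructor. With a blocking `put`, a slow consumer eventually stalls the socket reader as well, which pushes the backpressure to the sender rather than into the heap.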

Another piece of advice: take a full thread dump (the stack traces of all threads) when the blocking condition occurs, to find out for certain where the consumer is blocked. You may get a surprise there.
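The simplest way to get such a dump is from outside the process, with the JDK tools `jstack <pid>` or `jcmd <pid> Thread.print`. If that is inconvenient, a dump can also be produced from inside the JVM. The following is a rough sketch using the standard `ThreadMXBean`; the class name `ThreadDumpSketch` and the output layout are assumptions for this example, and how you trigger it (a timer, a signal, an admin command) is up to you.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class ThreadDumpSketch {
    /** Returns the name, state, awaited lock and full stack trace of every live thread. */
    static String dumpAllThreads() {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        // Ask for lock details only where the JVM supports them.
        ThreadInfo[] infos = bean.dumpAllThreads(
                bean.isObjectMonitorUsageSupported(),
                bean.isSynchronizerUsageSupported());
        StringBuilder sb = new StringBuilder();
        for (ThreadInfo info : infos) {
            sb.append('"').append(info.getThreadName()).append("\" ")
              .append(info.getThreadState());
            if (info.getLockName() != null) {
                sb.append(" waiting on ").append(info.getLockName());
            }
            sb.append('\n');
            // ThreadInfo.toString() shortens the stack, so print the frames explicitly.
            for (StackTraceElement frame : info.getStackTrace()) {
                sb.append("    at ").append(frame).append('\n');
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(dumpAllThreads());
    }
}
```

In the dump, look for the consumer thread (named `<id>-machine-worker` in the question) and check whether it is really parked in `take()`, blocked on a lock inside the processing code, or missing altogether.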

Other tips

You have several candidate problem areas:

  1. Which BlockingQueue implementation are you actually using? Did you hit the capacity limit of an ArrayBlockingQueue?

  2. How much memory did you allocate for your process? I.e., what is the max heap for this process? If you hit the upper limit of that heap space from your overload of incoming messages, it's entirely possible that you had an OutOfMemoryError.

  3. What actually happens during your message processing ("Do my staff here..." [sic])? Is it possible that you have a deadlock inside that code that is only exposed when you send many messages per second? Do you have an Exception eater somewhere down in that call stack that's hiding the real problem that you're experiencing?

  4. Where are your loggers logging to? Are you throwing away the indicative message because it's not logging to a location that you expect?
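Points 2 and 3 can combine in a way that is easy to miss with the consumer as posted: `catch (Exception e)` does not catch an `Error` such as `OutOfMemoryError`, and a task started with `executor.submit(...)` stores any `Throwable` in the returned `Future` instead of reporting it. If that `Future` is never inspected, the loop ends silently. The sketch below only demonstrates this behavior; the class name `SwallowedErrorSketch` is hypothetical, and an `AssertionError` stands in for a real `OutOfMemoryError`.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SwallowedErrorSketch {
    /** Runs a task that fails with an Error and returns the cause recovered from its Future. */
    static Throwable runAndRecoverFailure() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Runnable task = () -> {
                try {
                    // Stand-in for an OutOfMemoryError raised during message processing.
                    throw new AssertionError("simulated failure");
                } catch (Exception e) {
                    // Never reached: an Error is not an Exception.
                    System.err.println("caught " + e);
                }
            };
            Future<?> future = executor.submit(task);
            try {
                future.get(); // Without this call the failure is never reported anywhere.
                return null;
            } catch (ExecutionException e) {
                return e.getCause();
            }
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("recovered: " + runAndRecoverFailure());
    }
}
```

Keeping the `Future` and calling `get()` on it, or catching `Throwable` at the top of the worker loop and logging it, would make this kind of failure visible.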

License: CC-BY-SA with attribution
Not affiliated with StackOverflow