Question

I use Netty for a multithreaded TCP server and a single client persistent connection. The client sends many binary messages (10000 in my use case) and is supposed to receive an answer for each message. I added an OrderedMemoryAwareThreadPoolExecutor to the pipeline to handle the execution of DB calls on multiple threads.

If I run a DB call in the method messageReceived() (or simulate it with Thread.currentThread().sleep(50)) then all events are handled by a single thread.

    5 count of {main}
    1 count of {New
10000 count of {pool-3-thread-4}

For a simple implementation of messageReceived() the server creates many executor threads as expected.

How should I configure the ExecutionHandler to get multiple threads executors for the business logic, please?

Here is my code:

public class MyServer {

      public void run() {
            OrderedMemoryAwareThreadPoolExecutor eventExecutor = new OrderedMemoryAwareThreadPoolExecutor(16, 1048576L, 1048576L, 1000, TimeUnit.MILLISECONDS, Executors.defaultThreadFactory());  
            ExecutionHandler executionHandler = new ExecutionHandler(eventExecutor);        
            bootstrap.setPipelineFactory(new ServerChannelPipelineFactory(executionHandler));
      }
    }  



    public class ServerChannelPipelineFactory implements ChannelPipelineFactory {

      public ChannelPipeline getPipeline() throws Exception {

        pipeline.addLast("encoder", new MyProtocolEncoder());
        pipeline.addLast("decoder", new MyProtocolDecoder());
        pipeline.addLast("executor", executionHandler);
        pipeline.addLast("myHandler", new MyServerHandler(dataSource));

      }
    }

    public class MyServerHandler extends SimpleChannelHandler {

      public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) throws DBException {


          // long running DB call simulation
          try {
            Thread.currentThread().sleep(50);
          } catch (InterruptedException ex) {

          }  

          // a simple message  
          final MyMessage answerMsg = new MyMessage();
          if (e.getChannel().isWritable()) {
            e.getChannel().write(answerMsg);
          }  
      }      
    }
Was it helpful?

Solution

OrderedMemoryAwareThreadPoolExecutor guarantees that events from a single channel are processed in order. You can think of it as binding a channel to a specific thread in the pool and then processing all events on that thread - although it's a bit more complex than that, so don't depend on a channel always being processed by the same thread.

If you start up a second client you'll see it (most likely) being processed on another thread from the pool. If you really can process a single client's requests in parallel then you probably want MemoryAwareThreadPoolExecutor but be aware that this offers no guarantees on the order of channel events.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top