Question

I have the following situation:

A new channel connection is opened in this way:

    ClientBootstrap bootstrap = new ClientBootstrap(
             new OioClientSocketChannelFactory(Executors.newCachedThreadPool()));

    icapClientChannelPipeline = new ICAPClientChannelPipeline();           
    bootstrap.setPipelineFactory(icapClientChannelPipeline);
    ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
    channel = future.awaitUninterruptibly().getChannel();

This is working as expected.

Stuff is written to the channel in the following way:

    channel.write(chunk);

This also works as expected when the connection to the server is still alive. But if the server goes down (machine goes offline), the call hangs and doesn't return.

I confirmed this by adding log statements before and after the channel.write(chunk). When the connection is broken, only the log statement before is displayed.

  1. What is causing this? I thought these calls were all asynchronous and returned immediately. I also tried NioClientSocketChannelFactory and saw the same behavior.

  2. I tried to use channel.getCloseFuture(), but the listener never gets called. I also tried checking the channel before writing with channel.isOpen(), channel.isConnected() and channel.isWritable(), and they are always true.

  3. How can I work around this? No exception is thrown and nothing really happens. Some questions like this one and this one indicate that it isn't possible to detect a channel disconnect without a heartbeat. But I can't implement a heartbeat because I can't change the server side.

Environment: Netty 3, JDK 1.7

Solution

OK, I solved this one on my own last week, so I'll add the answer for completeness.

I was wrong in point 3, because I thought I would have to change both the client and the server side to add a heartbeat. As described in this question, you can use Netty's IdleStateHandler together with an IdleStateAwareChannelHandler for this purpose. I implemented it like this:

The IdleStateAwareHandler:

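    import java.util.logging.Level;
    import java.util.logging.Logger;

    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.handler.timeout.IdleState;
    import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
    import org.jboss.netty.handler.timeout.IdleStateEvent;

    public class IdleStateAwareHandler extends IdleStateAwareChannelHandler {

        @Override
        public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
            if (e.getState() == IdleState.READER_IDLE) {
                e.getChannel().write("heartbeat-reader_idle");
            } else if (e.getState() == IdleState.WRITER_IDLE) {
                // No write has completed within the idle period, so treat the
                // connection as dead. A write after close() would only fail,
                // so nothing is sent here.
                Logger.getLogger(IdleStateAwareHandler.class.getName()).log(
                        Level.WARNING, "WriteIdle detected, closing channel");
                e.getChannel().close();
            } else if (e.getState() == IdleState.ALL_IDLE) {
                e.getChannel().write("heartbeat-all_idle");
            }
        }
    }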

The PipeLine:

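    import static org.jboss.netty.channel.Channels.pipeline;

    import java.util.concurrent.TimeUnit;

    import org.jboss.netty.channel.ChannelPipeline;
    import org.jboss.netty.handler.timeout.IdleStateHandler;
    import org.jboss.netty.util.HashedWheelTimer;

    // ICAPClientPipeline, ICAPClientHandler and the Icap* codec classes are
    // ICAP-specific types; their imports are not shown here.
    public class ICAPClientChannelPipeline implements ICAPClientPipeline {

        ICAPClientHandler icapClientHandler;
        ChannelPipeline pipeline;

        public ICAPClientChannelPipeline() {
            icapClientHandler = new ICAPClientHandler();
            pipeline = pipeline();
            // Reader, writer and all-idle timeouts are 5 seconds each;
            // the timer ticks every 10 ms.
            pipeline.addLast("idleStateHandler", new IdleStateHandler(
                    new HashedWheelTimer(10, TimeUnit.MILLISECONDS), 5, 5, 5));
            pipeline.addLast("idleStateAwareHandler", new IdleStateAwareHandler());
            pipeline.addLast("encoder", new IcapRequestEncoder());
            pipeline.addLast("chunkSeparator", new IcapChunkSeparator(1024 * 4));
            pipeline.addLast("decoder", new IcapResponseDecoder());
            pipeline.addLast("chunkAggregator", new IcapChunkAggregator(1024 * 4));
            pipeline.addLast("handler", icapClientHandler);
        }

        @Override
        public ChannelPipeline getPipeline() throws Exception {
            // The same pipeline instance is returned on every call, so this
            // factory is only suitable for a single connection.
            return pipeline;
        }
    }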

This detects any read or write idle state on the channel after 5 seconds. As you can see, it is a little bit ICAP-specific, but that doesn't matter for the question.
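To illustrate why a write-idle timeout catches the hung write: as far as I can tell, the idle handler only counts a write once it has actually completed, so a write stuck on a dead connection never resets the timer. Below is a minimal plain-Java sketch of that bookkeeping. It is not Netty's implementation; WriteIdleTracker and its methods are hypothetical names, and timestamps are passed in explicitly so the behaviour is easy to follow.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not a Netty class): models write-idle detection. */
final class WriteIdleTracker {
    private final long idleTimeoutNanos;
    private long lastWriteNanos;

    /** nowNanos is the creation time, which counts as the last activity. */
    WriteIdleTracker(long timeout, TimeUnit unit, long nowNanos) {
        this.idleTimeoutNanos = unit.toNanos(timeout);
        this.lastWriteNanos = nowNanos;
    }

    /** Call only when a write has actually completed. */
    void writeCompleted(long nowNanos) {
        lastWriteNanos = nowNanos;
    }

    /** True when no write has completed for at least the timeout. */
    boolean isWriteIdle(long nowNanos) {
        return nowNanos - lastWriteNanos >= idleTimeoutNanos;
    }

    public static void main(String[] args) {
        long second = TimeUnit.SECONDS.toNanos(1);
        WriteIdleTracker tracker = new WriteIdleTracker(5, TimeUnit.SECONDS, 0L);

        // A write completes at t = 2 s, then the next write hangs forever.
        tracker.writeCompleted(2 * second);
        System.out.println("idle at 6 s: " + tracker.isWriteIdle(6 * second));
        System.out.println("idle at 7 s: " + tracker.isWriteIdle(7 * second));
    }
}
```

In this model the hung write never calls writeCompleted, so the tracker reports idle 5 seconds after the last completed write. That is the point at which the real handler above closes the channel.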

The handler closes the channel when it detects a write idle state, so to react to that I register the following listener on the channel's close future:

    channel.getCloseFuture().addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            doSomething();
        }
    });

Licensed under: CC-BY-SA with attribution