Question

First, I'll explain the situation and the logic that I'm trying to implement:

  • I have multiple threads, each put result of it work, some object called Result into queue QueueToSend

  • My NettyClient runs in thread and takes Result from QueueToSend every 1 milisecond and should connect to server and send a message, that is created from Result. I also need this connections to be asynch. So I need the Result list to be known by NettyHandler to send right message and process right result and then again send response.

So I initialize NettyClient bootstrap

bootstrap = new ClientBootstrap(
                new NioClientSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool()));

and sets pipeline once when app starts. Then, every milisecond I take Result object from QueueToSend and connect to server

ChannelFuture future = bootstrap.connect(new InetSocketAddress(host,port);
ResultConcurrentHashMap.put(future.getChannel().getId(), result);

I decided to use static ConcurrentHashMap to save every result object taken from QueueToSend assosiated with channel.

The first problem takes place in NettyHandler in method channelConnected, when I am trying to take Result object assosiated with channel from ResultConcurrentHashMap.

@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
       Channel channel = ctx.getPipeline.getChannel();
       Result result = ResultConcurrentHashMap.get(channel.getId());
}

But sometimes result is null (1 of 50), even thought it should be in ResultConcurrentHashMap. I think it happens cause that channelConnected event happens before NettyClient runs this code:

ResultConcurrentHashMap.put(future.getChannel().getId(), result);

May be it will not appear if I run NettyServer and NettyClient not on localhost both, but remotely, it will take moretime to estabilish the connection. But I need a solution for this issue.

Another issue is that I am sending messages every 1 milisecond asynchromously and I suppose that messages are may be mixed and server can not read them properly. If I run them one by one it will be ok :

future.getChannel().getCloseFuture().awaitUninterruptibly();

But I need asynchromus sending, and process right results, assosiated with channel and send responses. What should I implement?

No correct solution

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top