Question

I am a Netty beginner and would like to run a server socket and a client socket at the same time using Netty. However, when I have tried it, I could run only one socket and the other one could not. Actually what I want is I am going to communicate with a websocket of Webbrowser and pass it to a client socket in the same app to send it to a TCP/IP socket server. Your help would be appreciated.

Here is my code:

public class WsServer {
    private int port;

    public WsServer(int port){
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new WsServerInitializer());

            Channel ch = b.bind(port).sync().channel();
            System.out.println("Web socket server started at port " + port + '.');
            System.out.println("Open your browser and navigate to http://localhost:" + port + '/');

            ch.closeFuture().sync();
        } finally {
            InternalSocket.close();
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

}



public class WsServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("codec-http", new HttpServerCodec());
        pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
        pipeline.addLast("handler", new WsSocketHandler());
    }
}



public class WsSocketHandler extends WebSocketServerHandler {

    private static final Logger logger = Logger.getLogger(WsSocketHandler.class);

    protected static final String WEBSOCKET_PATH = "/websocket";
    private WebSocketServerHandshaker handshaker;

    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        //super.channelRead0(ctx, msg);
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        super.exceptionCaught(ctx, cause);
    }

    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
        // Handle a bad request.
        if (!req.getDecoderResult().isSuccess()) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
            return;
        }

        // Allow only GET methods.
        if (req.getMethod() != GET) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
            return;
        }

        // Send the demo page and favicon.ico
        if ("/".equals(req.getUri())) {
            ByteBuf content = WebSocketServerIndexPage.getContent(getWebSocketLocation(req));
            FullHttpResponse res = new DefaultFullHttpResponse(HTTP_1_1, OK, content);

            res.headers().set(CONTENT_TYPE, "text/html; charset=UTF-8");
            setContentLength(res, content.readableBytes());

            sendHttpResponse(ctx, req, res);
            return;
        }
        if ("/favicon.ico".equals(req.getUri())) {
            FullHttpResponse res = new DefaultFullHttpResponse(HTTP_1_1, NOT_FOUND);
            sendHttpResponse(ctx, req, res);
            return;
        }

        // Handshake
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                getWebSocketLocation(req), null, false);
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), req);
        }
    }

    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {

        // Check for closing frame
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass()
                    .getName()));
        }

        // Send the uppercase string back.
        String receivedText = ((TextWebSocketFrame) frame).text();
        //System.out.println("========= receivedText:" + receivedText);
        handleWsSocketRequest(ctx, receivedText);

        //logger.debug(String.format("%s received %s", ctx.channel(), receivedText));
        //ctx.channel().write(new TextWebSocketFrame(receivedText.toUpperCase()));
    }

    private void handleWsSocketRequest(ChannelHandlerContext ctx, String receivedText){
        System.out.println("[WebSocketHandler]" + receivedText);
        if (StringUtils.isBlank(receivedText)) return;


        try {
            //InternalClient client = InternalClient.getInstance("localhost", 31200);
            //client.send(receivedText);
//      } catch (Exception e1) {
            //e1.printStackTrace();
        //}
    //}

    protected ChannelFuture send(ChannelHandlerContext ctx, String msg){
        System.out.println(String.format("%s sent: %s", ctx.channel(), msg));
        return ctx.channel().write(new TextWebSocketFrame(msg));
    }

    protected static void sendHttpResponse(
            ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
        // Generate an error page if response getStatus code is not OK (200).
        if (res.getStatus().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
            setContentLength(res, res.content().readableBytes());
        }

        // Send the response and close the connection if necessary.
        ChannelFuture f = ctx.channel().write(res);
        ctx.channel().flush();
        if (!isKeepAlive(req) || res.getStatus().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    protected static String getWebSocketLocation(FullHttpRequest req) {
        return "ws://" + req.headers().get(HOST) + WEBSOCKET_PATH;
    }

}


public class InternalClient {


    private String host;
    private int port;
    private InternalClientHandler handler;

    private InternalClient(String host, int port){
        this.host = host;
        this.port = port;
        handler = new InternalClientHandler();
    }

    public void run() throws Exception {
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new InternalClietnIntializer());

            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            System.out.println("[InternalClient] Connects to " + host + ":" + port + '.');

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    public void send(String msg){
        handler.send(msg);
    }
}


public class InternalClietnIntializer extends ChannelInitializer<SocketChannel> {

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        // On top of the SSL handler, add the text line codec.
        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast("handler", new InternalClientHandler());
    }

}

public class InternalClientHandler extends ChannelInboundHandlerAdapter {

    ChannelHandlerContext context;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.context = ctx;
        System.out.println("[InternalClientHandler]channelActive");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        System.out.println("channelRead ");

        String recvText = "" + msg;
        System.out.println("###[InternalClientHandler]Received###" + recvText);

        if (StringUtils.isBlank(recvText)){
            return;
        }
    }

    public void send(String msg) {
          if (context!=null){
               context.channel().writeAndFlush(msg + "\r\n");
           System.out.println("###[InternalClientHandler]Sent###" + msg + "\r\n");
          }
    }
}


public class ServerMain {

    public static void main(String[] args) throws Exception {

        int port = 31300;

        try {
                        WsServer server = new WsServer(port);
            server.run();

                        //Below does not run
                        InternalClient client = new InternalClient("localhost", port+1000);
                        client.run();
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }

    }

}
Was it helpful?

Solution

This is because you call channel.closeFuture().sync(). This will block until the Channel is closed again. You will need to remove the line

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top