How to communicate between non-blocking client and non-blocking server through only one SocketChannel [duplicate]

StackOverflow https://stackoverflow.com/questions/9971314

  •  28-05-2021

Question

I'm trying to write a non-blocking client and a non-blocking server with these requirements:

  • The server just listens to clients and sends back to them whatever it has received.
  • The client can send a message to the server at any time, and can do so many times, but over only one SocketChannel.

I read this tutorial: http://rox-xmlrpc.sourceforge.net/niotut/index.html, and it implements the server in these steps:

  1. Initialize a Selector
  2. Wait for an acceptable SelectionKey from a client
  3. Then wait for a readable SelectionKey, and read data from it (when socketChannel.read returns -1, close the socketChannel)
  4. After that, send the data back to the client through a writable SelectionKey
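
The four steps above can be sketched as a single select loop. This is a minimal illustration rather than the tutorial's code: the class name FourStepEchoServer, the 64-byte buffer, the loopback address and the demo helper are all assumptions made here. It asks for OP_WRITE only while bytes are still waiting to be echoed.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Hypothetical illustration class; the names are not taken from the tutorial.
final class FourStepEchoServer implements Runnable {
    private final Selector selector;
    private final ServerSocketChannel server;

    FourStepEchoServer() throws IOException {
        selector = Selector.open();                                  // step 1
        server = ServerSocketChannel.open();
        server.configureBlocking(false);
        // Port 0 lets the OS pick a free port (a choice made for this demo).
        server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
        server.register(selector, SelectionKey.OP_ACCEPT);
    }

    int port() { return server.socket().getLocalPort(); }

    @Override
    public void run() {
        try {
            while (true) {
                selector.select();
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (!key.isValid()) continue;
                    if (key.isAcceptable()) accept();                // step 2
                    else echo(key);                                  // steps 3 and 4
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void accept() throws IOException {
        SocketChannel client = server.accept();
        if (client == null) return;
        client.configureBlocking(false);
        // The attached buffer holds bytes that were read but not yet echoed.
        client.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(64));
    }

    private void echo(SelectionKey key) throws IOException {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buf = (ByteBuffer) key.attachment();
        try {
            if (key.isReadable() && client.read(buf) == -1) {        // step 3
                client.close(); // closing the channel also cancels its key
                return;
            }
            if (key.isWritable()) {                                  // step 4
                buf.flip();
                client.write(buf);
                buf.compact();
            }
            int ops = 0;
            if (buf.hasRemaining()) ops |= SelectionKey.OP_READ;     // room to read more
            if (buf.position() > 0) ops |= SelectionKey.OP_WRITE;    // bytes still to echo
            key.interestOps(ops);
        } catch (IOException e) {
            client.close(); // drop only this connection
        }
    }

    // Demo helper: starts a server, then sends every message over ONE blocking socket.
    static String demo(String... messages) throws IOException {
        FourStepEchoServer echoServer = new FourStepEchoServer();
        Thread thread = new Thread(echoServer);
        thread.setDaemon(true);
        thread.start();
        StringBuilder echoed = new StringBuilder();
        try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), echoServer.port())) {
            DataInputStream in = new DataInputStream(socket.getInputStream());
            for (String message : messages) {
                byte[] bytes = message.getBytes(StandardCharsets.US_ASCII);
                socket.getOutputStream().write(bytes);
                in.readFully(bytes); // wait for the same number of echoed bytes
                echoed.append(new String(bytes, StandardCharsets.US_ASCII)).append('\n');
            }
        }
        return echoed.toString();
    }

    public static void main(String[] args) throws IOException {
        System.out.print(demo("hehe|||", "2 hehe|||"));
    }
}
```

Because this sketch never closes a connection after replying, the same connection can carry any number of messages, which is what the requirements ask for.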

I also took a look at this tutorial: https://forums.oracle.com/forums/thread.jspa?threadID=1145909&tstart=2040, but it's too hard for me to understand.

So I rewrote my own code based on ROX's tutorial. Here is my code: http://www.mediafire.com?o30yvtp5kqpya8b (it's almost entirely based on the code from ROX's tutorial). Because it's hard to post all the code here, I uploaded my project to MediaFire so that you can download it and import it into Eclipse to view the code easily.

You can run MyServer.java in package server, then MyClient.java in package client_test (ignore package client).

After running the server and client, you will see that the server receives only the first message from the client, although it should receive 2 messages. I know there's something wrong with my implementation, but I don't know what it is or how to fix it.

Any suggestions for fixing my code, or for another solution that meets my requirements, would be appreciated. Thank you all.

OK, I will post the relevant parts of the code here:

My Client:

MyClient.java

public class MyClient implements Runnable {
    // The host:port combination to connect to
    private InetAddress hostAddress;
    private int port;

    // The selector we'll be monitoring
    private Selector selector;

    // The buffer into which we'll read data when it's available
    private ByteBuffer readBuffer = ByteBuffer.allocate(8);

    // A list of PendingChange instances
    private List pendingChanges = new LinkedList();

    // Maps a SocketChannel to a list of ByteBuffer instances
    private Map pendingData = new HashMap();

    // Maps a SocketChannel to a RspHandler
    private Map rspHandlers = Collections.synchronizedMap(new HashMap());


    private SocketChannel socket;
    private static MyResponseHandler handler;   

    public MyClient(InetAddress hostAddress, int port) throws IOException {
        this.hostAddress = hostAddress;
        this.port = port;
        this.selector = this.initSelector();

        handler = new MyResponseHandler();      
    }

    public void send(byte[] data, MyResponseHandler handler) throws IOException {
        System.out.println("------------ send() ---- BEGIN");

        // Register the response handler
        this.rspHandlers.put(socket, handler);  

        // And queue the data we want written
        synchronized (this.pendingData) {
            List queue = (List) this.pendingData.get(socket);
            if (queue == null) {
                queue = new ArrayList();
                this.pendingData.put(socket, queue);
            }
            queue.add(ByteBuffer.wrap(data));
        }

        // Finally, wake up our selecting thread so it can make the required changes
        this.selector.wakeup();
        System.out.println("------------ send() ---- END");
    }

    public void run() {
        while (true) {
            System.out.println("------------ while in run() ---- BEGIN");
            try {
                // Process any pending changes
                synchronized (this.pendingChanges) {
                    Iterator changes = this.pendingChanges.iterator();
                    while (changes.hasNext()) {
                        System.out.println("CHANGE!!!!!!!!!!!!!!!!!");
                        ChangeRequest change = (ChangeRequest) changes.next();
                        switch (change.type) {
                        case ChangeRequest.CHANGEOPS:
                            SelectionKey key = change.socket.keyFor(this.selector);
                            key.interestOps(change.ops);
                            break;
                        case ChangeRequest.REGISTER:
                            change.socket.register(this.selector, change.ops);
                            break;
                        }
                    }
                    this.pendingChanges.clear();
                }

                // Wait for an event one of the registered channels
                this.selector.select();
                System.out.println("^^^^^^^^^^^^^^^^^^^^^^^^");
                // Iterate over the set of keys for which events are available
                Iterator selectedKeys = this.selector.selectedKeys().iterator();
                while (selectedKeys.hasNext()) {
                    System.out.println("There's something in this while loop");
                    SelectionKey key = (SelectionKey) selectedKeys.next();
                    selectedKeys.remove();

                    if (!key.isValid()) {
                        System.out.println("key is invalid");
                        continue;
                    }

                    // Check what event is available and deal with it
                    if (key.isConnectable()) {
                        this.finishConnection(key);
                    } else if (key.isReadable()) {
                        this.read(key);
                    } else if (key.isWritable()) {
                        this.write(key);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

            System.out.println("------------ while in run() ---- END");
        }
    }

    private void read(SelectionKey key) throws IOException {
        System.out.println("------------ read() ---- BEGIN");
        SocketChannel socketChannel = (SocketChannel) key.channel();

        // Clear out our read buffer so it's ready for new data
        this.readBuffer.clear();

        // Attempt to read off the channel
        int numRead;
        try {
            numRead = socketChannel.read(this.readBuffer);
        } catch (IOException e) {
            // The remote forcibly closed the connection, cancel
            // the selection key and close the channel.
            key.cancel();
            socketChannel.close();
            return;
        }

        if (numRead == -1) {
            // Remote entity shut the socket down cleanly. Do the
            // same from our end and cancel the channel.
            key.channel().close();
            key.cancel();       
            return;
        }

        // Handle the response
        this.handleResponse(socketChannel, this.readBuffer.array(), numRead);
        System.out.println("------------ read() ---- END");
    }

    private void handleResponse(SocketChannel socketChannel, byte[] data, int numRead) throws IOException {
        System.out.println("------------ handleResponse() ---- BEGIN");
        // Make a correctly sized copy of the data before handing it
        // to the client
        byte[] rspData = new byte[numRead];
        System.arraycopy(data, 0, rspData, 0, numRead);

        // Look up the handler for this channel
        MyResponseHandler handler = (MyResponseHandler) this.rspHandlers.get(socketChannel);

        // And pass the response to it
        if (handler.handleResponse(rspData)) {
            // The handler has seen enough, close the connection
            socketChannel.close();
            socketChannel.keyFor(this.selector).cancel();
        }
        System.out.println("------------ handleResponse() ---- END");
    }

    private void write(SelectionKey key) throws IOException {
        System.out.println("------------ write() ---- BEGIN");
        SocketChannel socketChannel = (SocketChannel) key.channel();

        synchronized (this.pendingData) {
            List queue = (List) this.pendingData.get(socketChannel);

            // Write until there's not more data ...
            while (!queue.isEmpty()) {
                ByteBuffer buf = (ByteBuffer) queue.get(0);
                socketChannel.write(buf);
                if (buf.remaining() > 0) {
                    // ... or the socket's buffer fills up
                    break;
                }
                queue.remove(0);
            }

            if (queue.isEmpty()) {
                // We wrote away all data, so we're no longer interested
                // in writing on this socket. Switch back to waiting for
                // data.
                key.interestOps(SelectionKey.OP_READ);
            }
        }
        System.out.println("------------ write() ---- END");
    }

    private void finishConnection(SelectionKey key) throws IOException {
        System.out.println("------------ finishConnection() ---- BEGIN");
        SocketChannel socketChannel = (SocketChannel) key.channel();

        // Finish the connection. If the connection operation failed
        // this will raise an IOException.
        try {
            socketChannel.finishConnect();
        } catch (IOException e) {
            // Cancel the channel's registration with our selector
            System.out.println(e);
            key.cancel();
            return;
        }

        // Register an interest in writing on this channel
        key.interestOps(SelectionKey.OP_WRITE);
        System.out.println("------------ finishConnection() ---- END");
    }

    private void initiateConnection() throws IOException {
        System.out.println("------------ initiateConnection() ---- BEGIN");
        // Create a non-blocking socket channel
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);

        // Kick off connection establishment
        socketChannel.connect(new InetSocketAddress(this.hostAddress, this.port));

        // Queue a channel registration since the caller is not the 
        // selecting thread. As part of the registration we'll register
        // an interest in connection events. These are raised when a channel
        // is ready to complete connection establishment.
        synchronized(this.pendingChanges) {
            this.pendingChanges.add(new ChangeRequest(socketChannel, ChangeRequest.REGISTER, SelectionKey.OP_CONNECT));
        }

        System.out.println("------------ initiateConnection() ---- END");
        socket = socketChannel;
    }

    private Selector initSelector() throws IOException {
        // Create a new selector
        return SelectorProvider.provider().openSelector();
    }

    public static void main(String[] args) {
        try {
            MyClient client = new MyClient(InetAddress.getByName("127.0.0.1"),
                    9090);
            Thread t = new Thread(client);
            t.setDaemon(true);
            t.start();

            // Start a new connection
            client.initiateConnection();            

            // 1st
            client.send("hehe|||".getBytes(), handler);
            System.out.println("SEND: " + "hehe|||");
            handler.waitForResponse();

            System.out.println("==========================================================");

            // 2nd
            client.send(("2 hehe|||").getBytes(), handler);
            System.out.println("SEND: " + "2 hehe|||");
            handler.waitForResponse();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

MyResponseHandler.java

public class MyResponseHandler {
    private byte[] rsp = null;

    public synchronized boolean handleResponse(byte[] rsp) {
        this.rsp = rsp;
        this.notify();
        return true;
    }

    public synchronized void waitForResponse() {
        while(this.rsp == null) {
            try {
                System.out.println("--waiting...");
                this.wait();
                System.out.println("--done!!!");
            } catch (InterruptedException e) {}
        }

        System.out.println("RECEIVE: " + new String(this.rsp));

        /**
         *  Set @rsp = null to let the block inside the above while loop
         *  will be run again 
         */
        rsp = null;
    }
}
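
One possible reading of the code above, offered as an observation and not a confirmed diagnosis: handleResponse always returns true, and MyClient.handleResponse closes the SocketChannel whenever the handler returns true, so the channel is already closed before the second send(). A minimal sketch of a handler that keeps the channel open follows; the class name KeepOpenResponseHandler and the timeout parameter are assumptions for illustration.

```java
// Hypothetical variant of MyResponseHandler; the name and the timeout are assumptions.
final class KeepOpenResponseHandler {
    private byte[] rsp = null;

    // Returns false so that a caller following the MyClient convention
    // (close the channel when the handler returns true) keeps the channel open.
    public synchronized boolean handleResponse(byte[] rsp) {
        this.rsp = rsp;
        this.notifyAll();
        return false;
    }

    // Waits up to timeoutMs for a response; returns null if none arrived in time.
    public synchronized byte[] waitForResponse(long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (this.rsp == null) {
            long left = deadline - System.currentTimeMillis();
            if (left <= 0) {
                return null;
            }
            this.wait(left);
        }
        byte[] result = this.rsp;
        this.rsp = null; // reset so the next call waits for the next response
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        KeepOpenResponseHandler handler = new KeepOpenResponseHandler();
        boolean close = handler.handleResponse("hehe|||".getBytes());
        System.out.println("close requested: " + close);
        System.out.println("RECEIVE: " + new String(handler.waitForResponse(100)));
    }
}
```

Note also that MyClient.send() only queues the data and wakes the selector, while write() has already switched the key back to OP_READ. Queuing a ChangeRequest.CHANGEOPS for OP_WRITE in send(), as MyServer.send() does, may be needed as well before the second message is actually written.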

MyServer.java

public class MyServer implements Runnable {
    // CONSTANT
    private final static int BUFFER_SIZE = 8;

    // The host:port combination to listen on
    private InetAddress hostAddress;
    private int port;

    // The channel on which we'll accept connections
    private ServerSocketChannel serverChannel;

    // The selector we'll be monitoring
    private Selector selector;

    // The buffer into which we'll read data when it's available
    private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);

    private RequestCollector requestCollector;

    // A list of PendingChange instances
    private List<ChangeRequest> pendingChanges = 
            new LinkedList<ChangeRequest>();

    // Maps a SocketChannel to a list of ByteBuffer instances
    private Map<SocketChannel, List<ByteBuffer>> pendingData = 
            new HashMap<SocketChannel, List<ByteBuffer>>();

    public MyServer(InetAddress hostAddress, int port, 
            RequestCollector requestCollector) throws IOException {
        this.hostAddress = hostAddress;
        this.port = port;
        this.selector = this.initSelector();
        this.requestCollector = requestCollector;
    }

    public void send(SocketChannel socket, byte[] data) {
        System.out.println("------------ send() ---- BEGIN");
        synchronized (this.pendingChanges) {
            // Indicate we want the interest ops set changed
            this.pendingChanges.add(new ChangeRequest(socket,
                    ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE));

            // And queue the data we want written
            synchronized (this.pendingData) {
                List<ByteBuffer> queue = 
                        (List<ByteBuffer>) this.pendingData.get(socket);
                if (queue == null) {
                    queue = new ArrayList<ByteBuffer>();
                    this.pendingData.put(socket, queue);
                }
                queue.add(ByteBuffer.wrap(data));
            }
        }

        // Finally, wake up our selecting thread so it can make the required 
        // changes
        this.selector.wakeup();
        System.out.println("------------ send() ---- END");
    }

    public void run() {
        while (true) {
            System.out.println("------------ while in run() ---- BEGIN");
            try {
                // Process any pending changes
                synchronized (this.pendingChanges) {
                    Iterator<ChangeRequest> changes = 
                            this.pendingChanges.iterator();
                    while (changes.hasNext()) {
                        System.out.println("CHANGE!!!!!!!!!!!!!!!!!");
                        ChangeRequest change = (ChangeRequest) changes.next();
                        switch (change.type) {
                        case ChangeRequest.CHANGEOPS:
                            SelectionKey key = 
                                change.socket.keyFor(this.selector);
                            key.interestOps(change.ops);
                        }
                    }
                    this.pendingChanges.clear();
                }

                // Wait for an event one of the registered channels
                this.selector.select();
                System.out.println("^^^^^^^^^^^^^^^^^^^^^^^^");

                // Iterate over the set of keys for which events are available
                Iterator<SelectionKey> selectedKeys = 
                        this.selector.selectedKeys().iterator();
                while (selectedKeys.hasNext()) {
                    System.out.println("There's something in this while loop");
                    SelectionKey key = (SelectionKey) selectedKeys.next();
                    selectedKeys.remove();

                    if (!key.isValid()) {
                        System.out.println("key is invalid");
                        continue;
                    }

                    // Check what event is available and deal with it
                    if (key.isAcceptable()) {
                        this.accept(key);
                    } else if (key.isReadable()) {
                        this.read(key);
                    } else if (key.isWritable()) {
                        this.write(key);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("------------ while in run() ---- END");
        }
    }

    private void accept(SelectionKey key) throws IOException {
        System.out.println("------------ accept() ---- BEGIN");
        // For an accept to be pending the channel must be a server socket 
        // channel.
        ServerSocketChannel serverSocketChannel = 
                (ServerSocketChannel) key.channel();

        // Accept the connection and make it non-blocking
        SocketChannel socketChannel = serverSocketChannel.accept();
        //Socket socket = socketChannel.socket();
        socketChannel.configureBlocking(false);

        // Register the new SocketChannel with our Selector, indicating
        // we'd like to be notified when there's data waiting to be read
        SelectionKey readKey = 
                socketChannel.register(this.selector, SelectionKey.OP_READ);

        // Attach a StringBuilder to this SocketChannel
        readKey.attach( new StringBuilder() );

        // DEBUG    
        System.out.println(socketChannel.socket().getInetAddress() + " - "
                + socketChannel.socket().getPort());
        System.out.println("------------ accept() ---- END");
    }

    private void read(SelectionKey key) throws IOException {
        System.out.println("------------ read() ---- BEGIN");
        // Get socket channel
        SocketChannel socketChannel = (SocketChannel) key.channel();

        // Get attached StringBuilder
        StringBuilder currentMessage = (StringBuilder) key.attachment();

        // Clear out our read buffer so it's ready for new data
        this.readBuffer.clear();

        // Attempt to read off the channel
        int numRead;
        try {
            numRead = socketChannel.read(this.readBuffer);
        } catch (IOException e) {
            // The remote forcibly closed the connection, cancel
            // the selection key and close the channel.
            key.cancel();
            socketChannel.close();
            return;
        }

        if (numRead == -1) {
            // Remote entity shut the socket down cleanly. Do the
            // same from our end and cancel the channel.
            key.cancel();
            return;
        }

        // Hand the data off to our requestCollector thread
        this.requestCollector.processData(this, socketChannel, 
                this.readBuffer.array(), numRead, currentMessage);
        System.out.println("------------ read() ---- END");
    }

    private void write(SelectionKey key) throws IOException {
        System.out.println("------------ write() ---- BEGIN");
        SocketChannel socketChannel = (SocketChannel) key.channel();

        synchronized (this.pendingData) {
            List<ByteBuffer> queue = 
                    (List<ByteBuffer>) this.pendingData.get(socketChannel);

            // Write until there's not more data ...
            while (!queue.isEmpty()) {
                ByteBuffer buf = (ByteBuffer) queue.get(0);
                socketChannel.write(buf);
                if (buf.remaining() > 0) {
                    // ... or the socket's buffer fills up
                    break;
                }
                queue.remove(0);
            }

            if (queue.isEmpty()) {
                // We wrote away all data, so we're no longer interested
                // in writing on this socket. Switch back to waiting for
                // data.
                key.interestOps(SelectionKey.OP_READ);
            }
        }
        System.out.println("------------ write() ---- END");
    }

    private Selector initSelector() throws IOException {
        System.out.println("------------ initSelector() ---- BEGIN");
        // Create a new selector
        Selector socketSelector = SelectorProvider.provider().openSelector();

        // Create a new non-blocking server socket channel
        this.serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);

        // Bind the server socket to the specified address and port
        InetSocketAddress isa = new InetSocketAddress(this.hostAddress, 
                this.port);
        serverChannel.socket().bind(isa);

        // Register the server socket channel, indicating an interest in 
        // accepting new connections
        serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);

        System.out.println("------------ initSelector() ---- END");
        return socketSelector;
    }

    public static void main(String[] args) {
        try {
            RequestCollector requestCollector = new RequestCollector();
            new Thread(requestCollector).start();
            new Thread(new MyServer(null, 9090, requestCollector)).start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
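
MyServer reads at most 8 bytes at a time and attaches a StringBuilder to each key, so a message such as "2 hehe|||" (9 bytes) can arrive in more than one chunk. RequestCollector is not shown here; the sketch below is one way such a collector could reassemble messages, assuming that "|||" marks the end of a message (an assumption based only on the test strings). The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; not the RequestCollector from the project.
final class DelimiterFraming {
    // Assumed end-of-message marker, inferred from the test strings only.
    static final String DELIMITER = "|||";

    // Appends a chunk to the per-connection buffer and removes every
    // complete message from its front, returning them in arrival order.
    static List<String> feed(StringBuilder pending, String chunk) {
        pending.append(chunk);
        List<String> complete = new ArrayList<>();
        int end;
        while ((end = pending.indexOf(DELIMITER)) >= 0) {
            complete.add(pending.substring(0, end));
            pending.delete(0, end + DELIMITER.length());
        }
        return complete;
    }

    public static void main(String[] args) {
        StringBuilder pending = new StringBuilder();
        // "2 hehe|||" split the way an 8-byte read buffer would split it
        System.out.println(feed(pending, "2 hehe||")); // delimiter not complete yet
        System.out.println(feed(pending, "|"));        // now the message is complete
    }
}
```

With this kind of framing the server can reply once per complete message instead of once per read.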

If you want to see more code, you can download the zip file. When I run the server and client, the debugging output is:

Server

------------ initSelector() ---- BEGIN
------------ initSelector() ---- END
------------ while in run() ---- BEGIN
^^^^^^^^^^^^^^^^^^^^^^^^
There's something in this while loop
------------ accept() ---- BEGIN
/127.0.0.1 - 46553
------------ accept() ---- END
------------ while in run() ---- END
------------ while in run() ---- BEGIN
^^^^^^^^^^^^^^^^^^^^^^^^
There's something in this while loop
------------ read() ---- BEGIN
------------ read() ---- END
------------ while in run() ---- END
------------ while in run() ---- BEGIN
RECEIVE: hehe|||
------------ send() ---- BEGIN
------------ send() ---- END
^^^^^^^^^^^^^^^^^^^^^^^^
------------ while in run() ---- END
------------ while in run() ---- BEGIN
SEND: hehe|||
CHANGE!!!!!!!!!!!!!!!!!
^^^^^^^^^^^^^^^^^^^^^^^^
There's something in this while loop
------------ write() ---- BEGIN
------------ write() ---- END
------------ while in run() ---- END
------------ while in run() ---- BEGIN
^^^^^^^^^^^^^^^^^^^^^^^^
There's something in this while loop
------------ read() ---- BEGIN
------------ while in run() ---- END
------------ while in run() ---- BEGIN

Client

------------ initiateConnection() ---- BEGIN
------------ initiateConnection() ---- END
------------ send() ---- BEGIN
------------ send() ---- END
SEND: hehe|||
--waiting...
------------ while in run() ---- BEGIN
CHANGE!!!!!!!!!!!!!!!!!
^^^^^^^^^^^^^^^^^^^^^^^^
There's something in this while loop
------------ finishConnection() ---- BEGIN
------------ finishConnection() ---- END
------------ while in run() ---- END
------------ while in run() ---- BEGIN
^^^^^^^^^^^^^^^^^^^^^^^^
There's something in this while loop
------------ write() ---- BEGIN
------------ write() ---- END
------------ while in run() ---- END
------------ while in run() ---- BEGIN
^^^^^^^^^^^^^^^^^^^^^^^^
There's something in this while loop
------------ read() ---- BEGIN
------------ handleResponse() ---- BEGIN
--done!!!
RECEIVE: hehe|||
==========================================================
------------ send() ---- BEGIN
------------ send() ---- END
SEND: 2 hehe|||
--waiting...
------------ handleResponse() ---- END
------------ read() ---- END
------------ while in run() ---- END
------------ while in run() ---- BEGIN
^^^^^^^^^^^^^^^^^^^^^^^^
------------ while in run() ---- END
------------ while in run() ---- BEGIN

Solution

I've just commented on most of this code in another recent post. As for your own code, response handlers should not loop and sleep; they should either do a blocking read with a timeout or loop calling select() with a timeout.
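
A minimal sketch of the second option, looping on select() with a timeout over a single non-blocking SocketChannel. The class SelectTimeoutClient, the fixed-length echo framing and the throwaway blocking echo server are assumptions made for this illustration, not code from the question.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class; not part of the posted project.
final class SelectTimeoutClient {
    private final Selector selector;
    private final SocketChannel channel;
    private final SelectionKey key;

    SelectTimeoutClient(InetSocketAddress server, long timeoutMs) throws IOException {
        selector = Selector.open();
        channel = SocketChannel.open();
        channel.configureBlocking(false);
        key = channel.register(selector, SelectionKey.OP_CONNECT);
        channel.connect(server);
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!channel.finishConnect()) {
            waitForEvent(deadline);
        }
        key.interestOps(SelectionKey.OP_READ);
    }

    // Blocks in select() until an event is ready or the deadline has passed.
    private void waitForEvent(long deadline) throws IOException {
        long left = deadline - System.currentTimeMillis();
        if (left <= 0) throw new IOException("timed out");
        selector.select(left);
        selector.selectedKeys().clear();
    }

    // Sends msg and waits for an echo of the same length (an assumed framing rule).
    String exchange(String msg, long timeoutMs) throws IOException {
        ByteBuffer out = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
        ByteBuffer in = ByteBuffer.allocate(out.remaining());
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (in.hasRemaining()) {
            if (out.hasRemaining()) channel.write(out);
            // Ask for OP_WRITE only while unsent bytes remain.
            key.interestOps(out.hasRemaining()
                    ? SelectionKey.OP_READ | SelectionKey.OP_WRITE
                    : SelectionKey.OP_READ);
            if (channel.read(in) == -1) throw new IOException("server closed the connection");
            if (in.hasRemaining()) waitForEvent(deadline);
        }
        return new String(in.array(), StandardCharsets.UTF_8);
    }

    void close() throws IOException {
        channel.close();
        selector.close();
    }

    // Throwaway blocking echo server for one connection, used only by the demo.
    static int startEchoServer() throws IOException {
        ServerSocket ss = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
        Thread t = new Thread(() -> {
            try (ServerSocket s = ss; Socket c = s.accept()) {
                InputStream in = c.getInputStream();
                OutputStream out = c.getOutputStream();
                byte[] buf = new byte[64];
                for (int n; (n = in.read(buf)) != -1; ) out.write(buf, 0, n);
            } catch (IOException ignored) {
                // the demo server simply stops on any I/O error
            }
        });
        t.setDaemon(true);
        t.start();
        return ss.getLocalPort();
    }

    // Demo: two messages over the same channel; returns both echoes joined by a newline.
    static String demo() throws IOException {
        int port = startEchoServer();
        SelectTimeoutClient client = new SelectTimeoutClient(
                new InetSocketAddress(InetAddress.getLoopbackAddress(), port), 2000);
        try {
            return client.exchange("hehe|||", 2000) + "\n" + client.exchange("2 hehe|||", 2000);
        } finally {
            client.close();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo());
    }
}
```

The channel is closed only when the caller decides to close it, so each call to exchange() reuses the same connection, and a missing reply surfaces as an IOException after the timeout instead of an indefinite wait.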

OTHER TIPS

import java.nio.*;
import java.nio.channels.*;
import java.net.*;
import java.util.*;
import java.io.IOException;

public class EchoServer
{
    public static int DEFAULT_PORT=7;

    public static void main(String [] args)
    {

        ServerSocketChannel serverChannel;
        Selector selector;
        try
        {
            serverChannel = ServerSocketChannel.open();
            ServerSocket ss = serverChannel.socket();
            InetSocketAddress address = new InetSocketAddress(DEFAULT_PORT);
            ss.bind(address);
            serverChannel.configureBlocking(false);
            selector=Selector.open();
            serverChannel.register(selector,SelectionKey.OP_ACCEPT);
        } catch(IOException ex) {ex.printStackTrace(); return;}


        while(true)
        {
            int selectednum=0;
            try{
                selectednum=selector.select();  //blocks
            }catch (IOException ex) {ex.printStackTrace(); break;}
            if (selectednum>0) {
            Set<SelectionKey> readyKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = readyKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key=iterator.next();
                iterator.remove();
                try{

                    if (key.isValid()==false) {key.cancel(); key.channel().close(); continue; }

                    if (key.isAcceptable()){
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        SocketChannel client = server.accept();
                        System.out.println("Accepted from "+client);
                        client.configureBlocking(false);
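                        // Note: a socket is writable almost all the time, so keeping
                        // OP_WRITE registered permanently makes select() return at once
                        // and the loop spin; enabling OP_WRITE only while data is
                        // pending avoids that.
                        SelectionKey clientKey=client.register(
                            selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ);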
                        ByteBuffer buffer = ByteBuffer.allocate(100);
                        clientKey.attach(buffer);
                    }
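                    if (key.isReadable()){
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer output = (ByteBuffer) key.attachment();
                        System.out.println("Reading.."+key.channel());
                        // read() returns -1 once the peer has closed its end
                        if (client.read(output) == -1) {
                            key.cancel();
                            client.close();
                            continue;
                        }
                    }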
                    if (key.isWritable()){
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer output = (ByteBuffer) key.attachment();
                        output.flip();
                        System.out.println("Writing..");
                        client.write(output);
                        output.compact();
                    }
                } catch (IOException ex) {
                    key.cancel(); 
                    try { key.channel().close();} 
                    catch (IOException cex) {}; 
                }
            }
        }
        }
    }
}

--client--

import java.net.*;
import java.nio.*;
import java.io.*;
import java.util.*;


public class EchoClient
{

    public static void main(String [] args)
    {
        byte ch='a';
        try{
        Socket socket = new Socket("localhost",7);
        OutputStream out = socket.getOutputStream();
        InputStream in = socket.getInputStream();
        int closein=5;

        while(true){
            Thread.sleep(1000);
            out.write((byte) ch++);
            System.out.println((char) in.read());
            if (--closein<=0) { socket.close(); break; } // stop looping once the socket is closed
        }
        }
        catch (InterruptedException ex) {}
        catch (IOException ex) {}
        catch (RuntimeException ex) {}
    }

}
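
The client above blocks indefinitely in in.read() if no echo arrives. A minimal sketch of a blocking read with a timeout, using Socket.setSoTimeout, follows; the class name, the TIMED_OUT sentinel and the silent test server are assumptions for illustration.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Hypothetical illustration class.
final class BlockingReadWithTimeout {
    // Sentinel chosen for this sketch; InputStream.read() itself never returns -2.
    static final int TIMED_OUT = -2;

    // Writes one byte, then waits up to timeoutMs for a one-byte reply.
    // Returns the reply, -1 if the peer closed the connection, or TIMED_OUT.
    static int request(Socket socket, int b, int timeoutMs) throws IOException {
        socket.setSoTimeout(timeoutMs);
        socket.getOutputStream().write(b);
        try {
            return socket.getInputStream().read();
        } catch (SocketTimeoutException e) {
            return TIMED_OUT;
        }
    }

    // Demo: connects to a listening socket that never accepts or replies,
    // so the read gives up after the timeout instead of blocking forever.
    static int demo() throws IOException {
        try (ServerSocket silent = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket socket = new Socket(InetAddress.getLoopbackAddress(), silent.getLocalPort())) {
            return request(socket, 'a', 200);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo() == TIMED_OUT ? "timed out" : "got a reply");
    }
}
```

After a SocketTimeoutException the socket remains usable, so the caller can retry the read or send again on the same connection.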
Licensed under: CC-BY-SA with attribution