Java Jetty WebSocket Server: Handle broadcasts with asynchronously disconnecting clients

StackOverflow https://stackoverflow.com/questions/23137993

Question

Background

I'm working on a proof of concept that consists of a server-client system using websocket communication. I use a small Jetty setup (jetty-all-9.1.3.v20140225.jar together with servlet-api-3.1.jar). I have most of the basic functionality needed for a PoC in place.

I have 2 classes:

  • TestServer (With a main function creating a Server instance, see code)
  • ClientSocket (WebSocket-object instantiated for each client)

Problem

The problem I would like to discuss has to do with broadcasting client disconnects. The server keeps all ClientSocket instances in one list (an ArrayList); each ClientSocket adds itself to this list in its "onConnect" function.

The system will later on limit broadcasts to groups of clients, but for the PoC, all connected clients shall get broadcasts. If one client disconnects I want to send a notification to all other clients ("myClient has disconnected." or similar).

To do this I implemented a broadcast function in the server that loops through the list of clients and sends the information to all connected clients except the one that disconnected. The same function is used to inform all clients about other things, such as new connections, so the problem will most likely occur there as well if a client disconnects at the very moment someone else connects or broadcasts something.

The problem is easy to reproduce by connecting several (10+) clients (I do it in JavaScript) and then disconnecting them all at the same time. If I do this I always get concurrency errors. With very few clients (2-3) it sometimes works, depending on timing I guess.

Questions

How should I handle the task of broadcasting to all other clients, taking into account that any client can disconnect (asynchronously) at any time? Can I do this without generating exceptions? Since it's asynchronous, I can't see any other way than dealing with the exceptions as they occur. Any suggestions are greatly appreciated.

Code

TestServer.java

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;

import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.websocket.server.WebSocketHandler;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;

public class TestServer {
    private static final TestServer testServer = new TestServer();
    private ArrayList<ClientSocket> clients = new ArrayList<>();

    public static void main(String[] args) throws Exception {
        int port = 8080;
        Server server = new Server(port);
        WebSocketHandler wsHandler = new WebSocketHandler() {
            @Override
            public void configure(WebSocketServletFactory factory) {
                factory.register(ClientSocket.class);
            }
        };
        server.setHandler(wsHandler);
        System.out.println("Starting server on port " + port + ".");
        server.start();
        server.join();
    }

    public static TestServer getServer() {
        return testServer;
    }

    public void addClient(ClientSocket client) {
        this.clients.add(client);
    }

    public void removeClient(ClientSocket client) {
        this.clients.remove(client);
        this.broadcast("disconnect " + client.id, client);  
    }

    public void broadcast(String message, ClientSocket excludedClient) {
        log("Sending to all clients: " + message);
        for (ClientSocket cs : this.clients) {
            if (cs != null && !cs.equals(excludedClient) && cs.session.isOpen()) { // null check must come before any dereference
                try {
                    cs.session.getRemote().sendStringByFuture(message);
                } catch (Exception e) {
                    log("Error when broadcasting to " + cs.id + " (" + cs.address + "):");
                    log(e.getMessage());
                }
            }
        }
    }

Since looping through a list in this way generally doesn't work if you modify the list in the process, I also tried this broadcast function:

    public void broadcast(String message, ClientSocket excludedClient) {
        log("Sending to all clients: " + message);
        Iterator<ClientSocket> cs = this.clients.iterator();
        while (cs.hasNext()) {
            ClientSocket client = cs.next();
            if (client != null) {
                if (!client.equals(excludedClient) && client.session.isOpen()) {
                    try {
                        client.session.getRemote().sendStringByFuture(message);
                    } catch (Exception e) {
                        log("Error when broadcasting to " + client.id + " (" + client.address + "):");
                        log(e.getMessage());
                    }
                }
            }
        }
    }

It doesn't work any better though, since the problem is that the list can be modified asynchronously if another ClientSocket object disconnects while the first one is broadcasting its disconnection.
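To illustrate why the explicit Iterator changes nothing, here is a minimal sketch that is independent of Jetty. The class name FailFastDemo and the list of strings are hypothetical stand-ins for the server and its clients. The removal inside the loop simulates, in a single thread, what another thread's removeClient call does in the real server, so it shows the fail-fast check rather than the actual race.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;

class FailFastDemo {
    /** Returns true if modifying the list mid-iteration triggers the fail-fast check. */
    static boolean iterateWhileRemoving(boolean useExplicitIterator) {
        List<String> clients = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));
        try {
            if (useExplicitIterator) {
                Iterator<String> it = clients.iterator();
                while (it.hasNext()) {
                    String c = it.next();
                    if (c.equals("a")) {
                        clients.remove("d"); // stands in for a removeClient() from another thread
                    }
                }
            } else {
                // The for-each loop uses the same ArrayList iterator under the hood.
                for (String c : clients) {
                    if (c.equals("a")) {
                        clients.remove("d");
                    }
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("for-each: " + iterateWhileRemoving(false));
        System.out.println("iterator: " + iterateWhileRemoving(true));
    }
}
```

Both calls return true: the iterator notices on its next call to next() that the list was structurally modified behind its back, regardless of which loop syntax is used.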

ClientSocket.java

import java.io.IOException;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;

@WebSocket(maxIdleTime=0)
public class ClientSocket {
    TestServer server = TestServer.getServer();
    Session session;
    String id;
    String address;

    @OnWebSocketClose
    public void onClose(Session session, int statusCode, String reason) {
        server.log("Disconnected: " + this.id + "(" + this.address + ")" + " (statusCode=" + statusCode + ", reason=" + reason + ")");
        server.removeClient(this);
    }

    @OnWebSocketError
    public void onError(Session session, Throwable t) {
        server.log(this.id + "(" + this.address + ") error: " + t.getMessage());
    }

    @OnWebSocketConnect
    public void onConnect(Session session) {
        this.session = session;
        this.address = this.session.getRemoteAddress().getAddress().toString().substring(1);
        this.id = this.address; //Until user is registered their id is their IP
        server.log("New connection: " + this.address);
        server.addClient(this);
        try {
            session.getRemote().sendString("Hello client with address " + this.address + "!");
        } catch (IOException e) {
            server.log("Error in onConnect for " + this.id + "(" + this.address + "): " + e.getMessage());
        }
    }

    @OnWebSocketMessage
    public void onMessage(Session session, String message) {
        server.log("Received from " + this.id + "(" + this.address + "): " + message);
        String replyMessage;
        String[] commandList = message.split("\\s+");
        switch (commandList[0].toLowerCase()) {
            case "register":
                if (commandList.length > 1) {
                    this.id = commandList[1];
                    replyMessage = "Registered on server as " + this.id;
                    server.broadcast(this.id + " has connected", this);
                } else {
                    replyMessage = "Incorrect register message";
                }
                break;
            default:
                replyMessage = "echo " + message;
                break;
        }
        server.log("Sending to " + this.id + "(" + this.address + "): " + replyMessage);
        try {
            session.getRemote().sendString(replyMessage);
        } catch (IOException e) {
            server.log("Error during reply in onMessage for " + this.id + "(" + this.address + "): " + e.getMessage());
        }
    }
}

I pasted this whole class for completeness, even though I removed some of the cases in the switch in onMessage. The parts to take note of, however, are the onConnect and onClose functions, which add clients to and remove them from the client list in the server.

Error log

[2014-04-17 17:40:17.961] Sending to all clients: disconnect testclient4
2014-04-17 17:40:17.962:WARN:ClientSocket:qtp29398564-17: Unhandled Error (closing connection)
org.eclipse.jetty.websocket.api.WebSocketException: Cannot call method public void ClientSocket#onClose(org.eclipse.jetty.websocket.api.Session, int, java.lang.String) with args: [org.eclipse.jetty.websocket.common.WebSocketSession, java.lang.Integer, <null>]
        at org.eclipse.jetty.websocket.common.events.annotated.CallableMethod.call(CallableMethod.java:99)
        at org.eclipse.jetty.websocket.common.events.annotated.OptionalSessionCallableMethod.call(OptionalSessionCallableMethod.java:68)
        at org.eclipse.jetty.websocket.common.events.JettyAnnotatedEventDriver.onClose(JettyAnnotatedEventDriver.java:122)
        at org.eclipse.jetty.websocket.common.events.AbstractEventDriver.incomingFrame(AbstractEventDriver.java:125)
        at org.eclipse.jetty.websocket.common.WebSocketSession.incomingFrame(WebSocketSession.java:302)
        at org.eclipse.jetty.websocket.common.extensions.AbstractExtension.nextIncomingFrame(AbstractExtension.java:163)
        at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.nextIncomingFrame(PerMessageDeflateExtension.java:92)
        at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.incomingFrame(PerMessageDeflateExtension.java:66)
        at org.eclipse.jetty.websocket.common.extensions.ExtensionStack.incomingFrame(ExtensionStack.java:210)
        at org.eclipse.jetty.websocket.common.Parser.notifyFrame(Parser.java:219)
        at org.eclipse.jetty.websocket.common.Parser.parse(Parser.java:257)
        at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.read(AbstractWebSocketConnection.java:500)
        at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.onFillable(AbstractWebSocketConnection.java:409)
        at org.eclipse.jetty.io.AbstractConnection$1.run(AbstractConnection.java:505)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:607)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:536)
        at java.lang.Thread.run(Thread.java:744)
Caused by:
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.eclipse.jetty.websocket.common.events.annotated.CallableMethod.call(CallableMethod.java:71)
        at org.eclipse.jetty.websocket.common.events.annotated.OptionalSessionCallableMethod.call(OptionalSessionCallableMethod.java:68)
        at org.eclipse.jetty.websocket.common.events.JettyAnnotatedEventDriver.onClose(JettyAnnotatedEventDriver.java:122)
        at org.eclipse.jetty.websocket.common.events.AbstractEventDriver.incomingFrame(AbstractEventDriver.java:125)
        at org.eclipse.jetty.websocket.common.WebSocketSession.incomingFrame(WebSocketSession.java:302)
        at org.eclipse.jetty.websocket.common.extensions.AbstractExtension.nextIncomingFrame(AbstractExtension.java:163)
        at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.nextIncomingFrame(PerMessageDeflateExtension.java:92)
        at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.incomingFrame(PerMessageDeflateExtension.java:66)
        at org.eclipse.jetty.websocket.common.extensions.ExtensionStack.incomingFrame(ExtensionStack.java:210)
        at org.eclipse.jetty.websocket.common.Parser.notifyFrame(Parser.java:219)
        at org.eclipse.jetty.websocket.common.Parser.parse(Parser.java:257)
        at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.read(AbstractWebSocketConnection.java:500)
        at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.onFillable(AbstractWebSocketConnection.java:409)
        at org.eclipse.jetty.io.AbstractConnection$1.run(AbstractConnection.java:505)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:607)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:536)
        at java.lang.Thread.run(Thread.java:744)
Caused by:
java.util.ConcurrentModificationException
        at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:859)
        at java.util.ArrayList$Itr.next(ArrayList.java:831)
        at TestServer.broadcast(TestServer.java:61)
        at TestServer.removeClient(TestServer.java:45)
        at ClientSocket.onClose(ClientSocket.java:22)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.eclipse.jetty.websocket.common.events.annotated.CallableMethod.call(CallableMethod.java:71)
        at org.eclipse.jetty.websocket.common.events.annotated.OptionalSessionCallableMethod.call(OptionalSessionCallableMethod.java:68)
        at org.eclipse.jetty.websocket.common.events.JettyAnnotatedEventDriver.onClose(JettyAnnotatedEventDriver.java:122)
        at org.eclipse.jetty.websocket.common.events.AbstractEventDriver.incomingFrame(AbstractEventDriver.java:125)
        at org.eclipse.jetty.websocket.common.WebSocketSession.incomingFrame(WebSocketSession.java:302)
        at org.eclipse.jetty.websocket.common.extensions.AbstractExtension.nextIncomingFrame(AbstractExtension.java:163)
        at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.nextIncomingFrame(PerMessageDeflateExtension.java:92)
        at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.incomingFrame(PerMessageDeflateExtension.java:66)
        at org.eclipse.jetty.websocket.common.extensions.ExtensionStack.incomingFrame(ExtensionStack.java:210)
        at org.eclipse.jetty.websocket.common.Parser.notifyFrame(Parser.java:219)
        at org.eclipse.jetty.websocket.common.Parser.parse(Parser.java:257)
        at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.read(AbstractWebSocketConnection.java:500)
        at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.onFillable(AbstractWebSocketConnection.java:409)
        at org.eclipse.jetty.io.AbstractConnection$1.run(AbstractConnection.java:505)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:607)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:536)
        at java.lang.Thread.run(Thread.java:744)
[2014-04-17 17:40:17.97] testclient7(94.246.80.30) error: Cannot call method public void ClientSocket#onClose(org.eclipse.jetty.websocket.api.Session, int, java.lang.String) with args: [org.eclipse.jetty.websocket.common.WebSocketSession, java.lang.Integer, <null>]

This happens, sometimes for several clients, when disconnecting all the clients at the same time. I'm pretty sure that it has to do with ClientSocket objects disappearing at the same time as a broadcast is made.


Solution

Replace:

private ArrayList<ClientSocket> clients = new ArrayList<>();

With (this needs imports for java.util.List and java.util.concurrent.CopyOnWriteArrayList):

private List<ClientSocket> clients = new CopyOnWriteArrayList<>();

In your broadcast method just use the for each statement:

for( ClientSocket client : clients )
{
  if ( !client.equals(excludedClient) && client.session.isOpen() )
  {
    // Broadcast...
  }
}

This will give you thread-safe iteration. The ConcurrentModificationException occurs because the list is modified (here, by another client's onConnect or onClose handler) while broadcast is iterating over it. A CopyOnWriteArrayList copies its internal array on every write, so each iterator works on an unchanging snapshot and modifications do not interfere with iteration. This adds some overhead to mutating the list, of course, so you may want to think about another way of ensuring thread safety, but I suspect this will be sufficient for your needs. If reads are a lot more common than writes (as is usually the case), a copy-on-write data structure will do fine.
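As a rough illustration of the snapshot behaviour, here is a small sketch that does not depend on Jetty. The class name SnapshotIterationDemo and the string elements are hypothetical placeholders for the server and its ClientSocket objects, and the removal inside the loop stands in for a concurrent removeClient call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class SnapshotIterationDemo {
    /** Iterates over the list while removing from it; returns the elements the loop visited. */
    static List<String> visitWhileRemoving(List<String> clients) {
        List<String> visited = new ArrayList<>();
        for (String c : clients) {
            visited.add(c);
            if (c.equals("a")) {
                clients.remove("d"); // stands in for a concurrent removeClient()
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        List<String> clients = new CopyOnWriteArrayList<>(Arrays.asList("a", "b", "c", "d"));
        System.out.println("visited: " + visitWhileRemoving(clients));   // visited: [a, b, c, d]
        System.out.println("remaining: " + clients);                     // remaining: [a, b, c]
    }
}
```

No exception is thrown, but note that the loop still visits "d" even though it was removed during the iteration. In the broadcast method this means a client that has just disconnected may still be visited, which is why the session.isOpen() check and the try/catch around the send are still worth keeping.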

Licensed under: CC-BY-SA with attribution