Question

Motivation for this question

I am running a large product on very expensive hardware. Shutting it down for testing is not possible, nor is deploying a bad jar to the production environment. I need to be as sure as possible that I don't break production.

I need the code below reviewed for obvious issues before I run it on the staging setup (which is just as expensive).

Problem

I have a socket-based application. Sometimes the clients don't send a CloseConnection request explicitly, and sometimes no IOException occurs either, thereby holding up threads on the blocking readObject call.

I need to free this thread by closing the connection after a timeout. If I get a new request from the server, the timeout is refreshed.

So you will see three parts below:

  • initializing
  • the readObject call in a while(true) loop, and the scheduled service reset
  • the actual closing of the instream

Code

I have been advised to use ScheduledExecutorService instead of Timer/TimerTask.

class StreamManager {
    ....
    private ScheduledExecutorService activityTimeOut = Executors
            .newSingleThreadScheduledExecutor();
    private CloseConnectionOnTimeOut closeOnTimeOut = new CloseConnectionOnTimeOut();
    ....

    public void initialize(Socket newClientSocket, ObjectInputStream newInputStream,
            ObjectOutputStream newOutputStream, ThreadMonitor newThreadMonitor) {
        ....
        closeOnTimeOut.setInputStream(myInputStream);
        activityTimeOut.scheduleAtFixedRate(closeOnTimeOut, 0, Globals.INACTIVITY_TIME_OUT,
                TimeUnit.MILLISECONDS);
    }

    public void run() {
    ....
    while (true) {
            try {
                AMessageStrategy incomingCommand = (AMessageStrategy) myInputStream
                        .readObject();
                activityTimeOut.shutdown();
                activityTimeOut.scheduleAtFixedRate(closeOnTimeOut, 0,
                        Globals.INACTIVITY_TIME_OUT, TimeUnit.MILLISECONDS);
        ....
     }
    ....
 }
 class CloseConnectionOnTimeOut implements Runnable {
        private ObjectInputStream myInputStream;

        public CloseConnectionOnTimeOut() {

        }

        public void setInputStream(ObjectInputStream myInputStream) {
            this.myInputStream = myInputStream;
        }

        public void run() {
            try {
                myInputStream.close();
                myOutputStream.close();
                clientSocket.close();
                log.info("Time out occured for client, closed connection forcefully.") ;
            } catch (IOException e) {
                e.printStackTrace();
                log.fatal("Time out has occured, yet unable to clean up client connection. Keep a watch out on \"Size of clientStreamQ\"");
            }
        }
    }

Edit: I just tested this in a smaller application, and it seems to work. I still need your feedback.

Edit again:

I have modified the code below as per advice.

Initializing

private ScheduledExecutorService activityTimeOut = Executors
        .newSingleThreadScheduledExecutor();
private Future<Void> timeoutTask;
private CloseConnectionOnTimeOut closeOnTimeOut = new CloseConnectionOnTimeOut();

Removed this code

closeOnTimeOut.setInputStream(myInputStream);
activityTimeOut.scheduleAtFixedRate(closeOnTimeOut, 0, Globals.INACTIVITY_TIME_OUT,
                TimeUnit.MILLISECONDS);

Replaced before and after readObject

timeoutTask = (Future<Void>) activityTimeOut.scheduleAtFixedRate(
                        closeOnTimeOut.setInputStream(myInputStream), 0,
                        Globals.INACTIVITY_TIME_OUT, TimeUnit.MILLISECONDS);
AMessageStrategy incomingCommand = (AMessageStrategy) myInputStream
                        .readObject();
timeoutTask.cancel(true) ;  

On Cleanup

activityTimeOut.shutdown() ;

Solution

You cannot submit tasks to an ExecutorService that has already been shut down. If you want to stop a task from executing, cancel it. Besides that, your closing task is scheduled as soon as the StreamManager is initialized, and with an initial delay of 0 its first run is due immediately, so the streams may already be closed by the time run() starts reading. I would suggest creating and scheduling a new task right before attempting to read from the socket, and cancelling it after the read has succeeded:

while (true) {
   ...
   // schedule(Runnable, ...) returns ScheduledFuture<?>, which cannot be assigned to Future<Void>
   Future<?> timeoutTask = activityTimeOut.schedule(new CloseConnection(/*init with streams*/), Globals.INACTIVITY_TIME_OUT, TimeUnit.MILLISECONDS);
   try {
      AMessageStrategy incomingCommand = (AMessageStrategy) myInputStream.readObject();
   } finally {
      timeoutTask.cancel(false);
   }
   ...
}
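
The CloseConnection task in the snippet above is not spelled out. A minimal sketch of what it could look like follows, assuming it only needs to close whatever streams and socket it is handed. The Closeable varargs constructor, the TimeoutDemo class, and the Thread.sleep stand-in for readObject() are illustrative assumptions, not part of the original code.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical one-shot task: closes everything it was given when the timeout fires.
class CloseConnection implements Runnable {
    private final Closeable[] resources;

    CloseConnection(Closeable... resources) {
        this.resources = resources;
    }

    @Override
    public void run() {
        for (Closeable resource : resources) {
            try {
                resource.close();
            } catch (IOException e) {
                // Keep going so one failing close does not leak the others.
                System.err.println("Could not close resource: " + e);
            }
        }
    }
}

class TimeoutDemo {
    // Returns true if the timeout task closed the (fake) stream.
    // workMillis simulates how long the blocking read takes.
    static boolean closedByTimeout(long timeoutMillis, long workMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicBoolean closed = new AtomicBoolean(false);
        Closeable fakeStream = () -> closed.set(true);
        try {
            // Arm a fresh one-shot task right before the blocking call.
            ScheduledFuture<?> timeoutTask = scheduler.schedule(
                    new CloseConnection(fakeStream), timeoutMillis, TimeUnit.MILLISECONDS);
            try {
                Thread.sleep(workMillis); // stand-in for readObject()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Disarm it once the call has returned, successfully or not.
                timeoutTask.cancel(false);
            }
            return closed.get();
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

In the real StreamManager the arguments would be the input stream, output stream and client socket. Closing each resource in its own try block means a failure on one does not prevent the others from being closed.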

In a cleanup method of the StreamManager, or at the end of run(), you should shut down the ScheduledExecutorService.
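
The difference between cancelling a task and shutting down the executor can be checked in isolation. The following is a small sketch; the class and method names are made up for illustration, and the no-op tasks stand in for the real closing task.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ShutdownVersusCancel {
    // Cancelling a task leaves the executor usable for the next schedule() call.
    static boolean canRescheduleAfterCancel() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            ScheduledFuture<?> first = scheduler.schedule(() -> { }, 1, TimeUnit.HOURS);
            first.cancel(false);
            ScheduledFuture<?> second = scheduler.schedule(() -> { }, 1, TimeUnit.HOURS);
            return first.isCancelled() && !second.isCancelled();
        } finally {
            scheduler.shutdownNow();
        }
    }

    // After shutdown(), any further schedule() call is rejected.
    static boolean rejectsAfterShutdown() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.shutdown();
        try {
            scheduler.schedule(() -> { }, 1, TimeUnit.HOURS);
            return false;
        } catch (RejectedExecutionException expected) {
            return true;
        }
    }
}
```

This is why shutdown() belongs only in the cleanup path: calling it inside the read loop, as in the first version of the question's code, makes the next scheduling attempt throw.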

If your software is mission-critical, I would thoroughly test it locally. Write unit tests and perhaps small integration tests to verify that the cancelling works. But I'm afraid that this solution is rather brittle: multi-threading and IO add a lot of uncertainty.
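
A local check of this kind could look roughly like the sketch below, which connects a client to a server socket over loopback and observes whether the blocked server-side read is released. It is a simplification under stated assumptions: it reads a raw byte instead of calling readObject() (constructing an ObjectInputStream already blocks on the stream header), and the class name, method name and parameters are invented for the example.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class InactivityTimeoutCheck {
    // Returns true if the blocked server-side read was released because the
    // timeout task closed the socket, false if the read completed normally.
    static boolean readReleasedByTimeout(long timeoutMillis, boolean clientSends) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket accepted = server.accept()) {
            if (clientSends) {
                client.getOutputStream().write(42);
                client.getOutputStream().flush();
            }
            InputStream in = accepted.getInputStream();
            // Arm the one-shot timeout right before the blocking read.
            ScheduledFuture<?> timeoutTask = scheduler.schedule(() -> {
                try {
                    accepted.close(); // a thread blocked in read() gets a SocketException
                } catch (IOException e) {
                    System.err.println("Could not close socket: " + e);
                }
            }, timeoutMillis, TimeUnit.MILLISECONDS);
            try {
                in.read(); // blocks until data arrives or the socket is closed
                return false;
            } catch (IOException e) {
                return true;
            } finally {
                timeoutTask.cancel(false);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

Calling readReleasedByTimeout(200, false) models a silent client that should be disconnected, and readReleasedByTimeout(5000, true) models an active client whose read should complete before the timeout. The timings are arbitrary and would need tuning to stay reliable on a slow build machine.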

Licensed under: CC-BY-SA with attribution