Question

As part of a coursework project, i am trying to implement a layer of reliability over a basic UDP protocol in java using Selective Repeat : http://en.wikipedia.org/wiki/Selective_Repeat_ARQ. Basically, each packet - when sent- keeps track of its own timer in a separate thread. If any specific timer runs out, the packet is resent.

For relatively large timeout settings (e.g 500ms) this code executes fine, and the large file is completely sent to the receiver. However, with a lower timeout set (e.g 20ms) I get the following error spamming the terminal:

java.nio.channels.ClosedChannelException
at sun.nio.ch.DatagramChannelImpl.ensureOpen(DatagramChannelImpl.java:132)
at sun.nio.ch.DatagramChannelImpl.send(DatagramChannelImpl.java:241)
at Sender4.sendPak(Sender4.java:118)
at Sender4.access$000(Sender4.java:8)
at Sender4$packetTimer.run(Sender4.java:135)

From what i can see however, the channel is not closed. The documentation for this exception states:

Checked exception thrown when an attempt is made to invoke or complete an I/O operation upon channel that is closed, or at least closed to that operation. That this exception is thrown does not necessarily imply that the channel is completely closed. A socket channel whose write half has been shut down, for example, may still be open for reading.

Which makes me think it is perhaps closed because it is unavailable in some way. Since it only occurs at smaller timeout values, maybe this is because two threads are attempting to resend simultaneously? However, the method for sending (sendPak) is synchronized...so this should not be possible.

What is causing this problem? Or what is a fix i can use to avoid running into this problem? Here is the code for the Sender part of my program, i am fairly sure the Receiver is fine:

/* Craig Innes 0929508 */
import java.io.*;
import java.net.*;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.nio.channels.*;

public class Sender4 {
    short base = 0;
    short nextSeqNum = 0;
    byte[][] packets;
    ByteBuffer bb;
    String endSys;
    int portNum;
    String fileName;
    int retryTime;
    int windowSize;
    DatagramSocket clientSocket;
    InetAddress IPAddress;
    InetSocketAddress destination;
    boolean timedOut = false;
    int resends = 0;
    HashMap<Short, packetTimer> timers = new HashMap<Short, packetTimer>();
    DatagramChannel clientChannel;

    public Sender4(String endSys, int portNum, String fileName, int retryTime, int windowSize){
        this.endSys = endSys;
        this.portNum = portNum;
        this.fileName = fileName;
        this.retryTime = retryTime;
        this.windowSize = windowSize;
    }

    public static void main(String args[]) throws Exception{
        //Check for current arguments and assign them
        if(args.length != 5){
            System.out.println("Invalid number of arguments. Please specify: <endSystem> <portNumber> <fileName> <retryTimeout><windowSize>");
            System.exit(1);
        }

        Sender4 sendy = new Sender4(args[0], Integer.parseInt(args[1]), args[2], Integer.parseInt(args[3]), Integer.parseInt(args[4]));

        sendy.go();
    }

    private void go() throws Exception{

        clientChannel = DatagramChannel.open();
        clientChannel.configureBlocking(false);


        bb = ByteBuffer.allocate(2);
        byte[] picData = new byte[1021];
        byte[] sendData = new byte[1024];
        byte[] seqBytes = new byte[2];
        byte EOFFlag = 0;
        boolean acknowledged = false;
        int resends = 0;
        IPAddress = InetAddress.getByName(endSys);
        destination = new InetSocketAddress(IPAddress, portNum);

        FileInputStream imReader = new FileInputStream(new File(fileName));
        double fileSizeKb = imReader.available() / 1021.0; //We add 3 bytes to every packet, so dividing by 1021 will give us total kb sent. 
        int packetsNeeded = (int) Math.ceil(fileSizeKb);
        packets = new byte[packetsNeeded][];
        long startTime = System.currentTimeMillis();
        long endTime;
        double throughput;

        //Create array of packets to send
        for(int i = 0; i < packets.length; i++){
            if(i == packets.length - 1){
                EOFFlag = 1;
                picData = new byte[imReader.available()];
                sendData = new byte[picData.length + 3];
            }
            imReader.read(picData);
            bb.putShort((short)i);
            bb.flip();
            seqBytes = bb.array();
            bb.clear();
            System.arraycopy(seqBytes, 0, sendData, 0, seqBytes.length);
            sendData[2] = EOFFlag;
            System.arraycopy(picData, 0, sendData, 3, picData.length);
            packets[i] = (byte[])sendData.clone();
        }

        //System.out.println("timeout is: " + timedOut + " base is: " + base + " packet length is: " + packets.length + " nextSeqNum: " + nextSeqNum);

        while(base != packets.length || !timers.isEmpty()){

            while(nextSeqNum - base < windowSize && nextSeqNum < packets.length){
                System.out.println("sending packet with seqNum: " + nextSeqNum);
                sendPak(nextSeqNum);
                timers.put(nextSeqNum, new packetTimer(nextSeqNum));
                timers.get(nextSeqNum).start();
                System.out.println("nextSeq: " + nextSeqNum + "base " + base + "windowSize " + windowSize + "timer size" + timers.size());
                nextSeqNum++;
            }           

            //Done all the sending we can, have a check for any ACKs we have received...
            getACK();

        }

        endTime = System.currentTimeMillis();
        throughput = 1000 * fileSizeKb / (endTime - startTime);
        clientChannel.close();
        imReader.close();
        System.out.println("Number of retransmissions: " + resends);
        System.out.println("Average throughput is: " + throughput + "Kb/s");

    }

    private synchronized void sendPak(short resNum) throws IOException{
        //System.out.println("Timed out waiting for acknowledgement, resending all unACKed packets in window");
            ByteBuffer sendBuff = ByteBuffer.wrap(packets[resNum]);
            clientChannel.send(sendBuff, destination);
            sendBuff.clear();
    }

    private class packetTimer extends Thread{
        short sendingNum;
        boolean timeToStop = false;
        boolean fileACKed = false;
        public packetTimer(short seqNum){
            sendingNum = seqNum;
        }
        public void run() {
            //If packet times out - resend. If thread interrupted, we have received the corresponding ack
                while(waitForACK()){
                    System.out.println("Packet timed out. Resending packet: " + sendingNum);

                    try{
                        sendPak(sendingNum);
                    }catch(IOException ex){
                        System.out.println("I think this is causing the problems");
                        ex.printStackTrace();
                    }
                }

            System.out.println("Thread" + sendingNum + "has reached completion");
        }

        private boolean waitForACK(){
            if(this.interrupted()){
                return false;
            }
            try{
                Thread.sleep(retryTime);
            }catch(InterruptedException ex){
                return false;
            }
            return true;
        }
    }


    private synchronized void getACK() throws Exception{
        //Listen out for ACKs and update pointers accordingly
        ByteBuffer ackBuff;
        byte[] ackData = new byte[2];
        ackBuff = ByteBuffer.wrap(ackData);
        SocketAddress recked = clientChannel.receive(ackBuff);
        if(recked != null){ //Only if it actually receives anything, check for nullity
            //System.out.println("ACK buff size: " + ackBuff.capacity() + "Current position: " + ackBuff.position() + "remaining: " + ackBuff.remaining());
            ackBuff.flip();
            short ack = ackBuff.getShort();
            System.out.println("ack received: " + ack);
            ackBuff.clear();

            if(timers.containsKey(ack)){ //Stop Timer
                System.out.println("Interrupting timer: " + ack);
                timers.get(ack).interrupt();
                timers.get(ack).fileACKed = true;
            }

            if(base == ack){ //If you receive ack for the base, remove all the consecutively stopped timers
                while(timers.containsKey(base) && timers.get(base).fileACKed){
                    System.out.println("Removing: " + base);
                    timers.remove(base);
                    base++;
                }
            }
            //System.out.println("acknowledgement for base num: " + base + "ack num:" + ack);
        }

        System.out.println("Waiting for base: " + base + "packets length is " + packets.length + "timers size is: " + timers.size() + "but is it empty? " + timers.isEmpty());
        Thread.yield();
    }
}

No correct solution

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top