Question

I am new to Netty. I have to use static pipelines (the project manager prefers them), which is somewhat difficult because I have to handle both the RTP and RTSP protocols on the same connection.

Although it almost works, there is a memory leak. I guess the fault is in my Splitter class. I also suspect the error is near the bypass method: to avoid an infinite loop, the Netty developers do not allow decode to leave the ByteBuf unchanged, which is why I had to create the bypass method.

If you have any idea, please help me! (Thanks in advance!)

Here is my code:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.ArrayList;
import java.util.List;

public class Splitter extends ByteToMessageDecoder {

    private ByteBuf bb = Unpooled.buffer();
    final RtspClientHandler rtspClientHandler;
    final RtpClientHandler rtpClientHandler;

    public Splitter(RtspClientHandler rtspClientHandler, RtpClientHandler rtpClientHandler) {
        this.rtspClientHandler = rtspClientHandler;
        this.rtpClientHandler = rtpClientHandler;
    }

    protected void bypass(ByteBuf in, MessageList<Object> out) {
        bb.writeBytes(in);
        in.discardReadBytes();
        bb.retain();
        out.add(bb);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception {
        if (rtspClientHandler.getRTSPstate() == RtspClientHandler.RTSP_CLIENT_STATE.READY) {
            if (in.getByte(0) == 0x24 && in.readableBytes() > 4) {
                int lengthToRead = in.getUnsignedShort(2);
                if (in.readableBytes() >= (lengthToRead + 4)) {
                    in.skipBytes(4);
                    if (in.getByte(16) == 0x67 || in.getByte(16) == 0x68) {
                        final byte bytes[] = new byte[lengthToRead];
                        in.readBytes(bytes);
                        in.discardReadBytes();
                        SPSPPSbuffer spspps = new SPSPPSbuffer();
                        spspps.setSPSPPS(bytes);
                        out.add(spspps);

                    } else {
                        final byte packetArray[] = new byte[lengthToRead];// copy packet.
                        in.readBytes(packetArray);
                        in.discardReadBytes();
                        out.add(packetArray);
                    }
                }
            } else {
                bypass(in, out);
            }
        } else {
            bypass(in, out);
        }
    }
}

Solution

It seems I managed to solve it.

The main point: I had to use a collector ByteBuf in which I collect all bytes coming from the network (and clear the input ByteBuf afterwards), because four cases are possible:

  • The number of bytes in the collector is less than the RTP chunk size.

  • The number of bytes in the collector equals the RTP chunk size.

  • The number of bytes in the collector is greater than the RTP chunk size.

  • More than one chunk is in the collector ByteBuf.
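The four cases above can be illustrated without Netty. The following is only a minimal sketch, assuming the framing used in the code here: a 0x24 marker byte, one channel byte, and a 2-byte big-endian payload length, followed by the payload. The class name InterleavedSplitterSketch and its methods feed and pendingBytes are hypothetical and are not part of Netty or of the original Splitter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Netty-free illustration of the "collector" idea.
public class InterleavedSplitterSketch {
    private static final int MAGIC = 0x24;       // '$' marks an interleaved frame
    private static final int HEADER_LENGTH = 4;  // marker, channel, 2-byte length

    // Bytes received so far that do not yet form a complete frame.
    private byte[] pending = new byte[0];

    // Appends a chunk from the network and returns every complete payload.
    public List<byte[]> feed(byte[] chunk) {
        byte[] data = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, data, pending.length, chunk.length);

        List<byte[]> frames = new ArrayList<>();
        int pos = 0;
        // Loop so that several frames in one chunk are all extracted.
        while (data.length - pos >= HEADER_LENGTH && (data[pos] & 0xFF) == MAGIC) {
            int payloadLength = ((data[pos + 2] & 0xFF) << 8) | (data[pos + 3] & 0xFF);
            if (data.length - pos < HEADER_LENGTH + payloadLength) {
                break; // frame is incomplete: wait for more bytes
            }
            int start = pos + HEADER_LENGTH;
            frames.add(Arrays.copyOfRange(data, start, start + payloadLength));
            pos = start + payloadLength;
        }
        // Keep the unconsumed tail. Bytes that do not start with the marker
        // are simply left here; this sketch does not parse RTSP text.
        pending = Arrays.copyOfRange(data, pos, data.length);
        return frames;
    }

    public int pendingBytes() {
        return pending.length;
    }
}
```

Feeding a chunk that ends in the middle of a frame returns an empty list and keeps the bytes pending; feeding the rest together with a second complete frame returns both payloads in order.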

Here is the code:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.ByteToMessageDecoder;

public class Splitter extends ByteToMessageDecoder {

    // Accumulates everything received so far; the input buffer is emptied on every call.
    private ByteBuf collector = Unpooled.buffer();
    final RtspClientHandler rtspClientHandler;
    final RtpClientHandler rtpClientHandler;

    public Splitter(RtspClientHandler rtspClientHandler, RtpClientHandler rtpClientHandler) {
        this.rtspClientHandler = rtspClientHandler;
        this.rtpClientHandler = rtpClientHandler;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception {
        // Move all incoming bytes into the collector and empty the input buffer.
        collector.writeBytes(in);
        in.discardReadBytes();
        in.clear();
        if (rtspClientHandler.getRTSPstate() != RtspClientHandler.RTSP_CLIENT_STATE.READY) {
            // RTSP handshake still running: pass the collected bytes on unchanged.
            System.out.println("RTSP communication in progress");
            collector.retain();
            out.add(collector);
            return;
        }
        if (collector.readableBytes() > 0 && collector.getByte(0) != 0x24) {
            // Not an interleaved frame (no leading '$'): drop everything collected so far.
            System.out.println("Clearing the Unpooled.buffer() (because it does not start with 0x24)");
            collector.readerIndex(collector.writerIndex());
            collector.discardReadBytes();
        }
        System.out.println("*****New bytes arrived");
        // Extract as many complete chunks as the collector currently holds.
        while (collector.readableBytes() > 0 && collector.getByte(0) == 0x24) {
            System.out.println("Length: " + collector.readableBytes());
            if (collector.readableBytes() > 4) {
                // Bytes 2-3 of the 4-byte header hold the chunk length.
                int lengthToRead = collector.getUnsignedShort(2);
                if (collector.readableBytes() >= (lengthToRead + 4)) {
                    collector.skipBytes(4);
                    // Absolute index 16 = 4-byte header + 12-byte RTP header; 0x67/0x68 mark SPS/PPS.
                    if (collector.getByte(16) == 0x67 || collector.getByte(16) == 0x68) {
                        final byte bytes[] = new byte[lengthToRead];
                        collector.readBytes(bytes);
                        collector.discardReadBytes();
                        SPSPPSbuffer spspps = new SPSPPSbuffer();
                        spspps.setSPSPPS(bytes);
                        out.add(spspps);

                    } else {
                        final byte packetArray[] = new byte[lengthToRead];// copy packet.
                        collector.readBytes(packetArray);
                        collector.discardReadBytes();
                        out.add(packetArray);
                    }
                } else {
                    // Chunk is incomplete: wait for the next call to deliver more bytes.
                    System.out.println("Not enough length, " + (lengthToRead + 4) + " byte should be required (together with 4 bytes header)");
                    return;
                }
            } else {
                System.out.println("Less than 5 bytes");
                return;
            }
        }
    }

}
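
The comparison against 0x67 and 0x68 in the code above matches only those two exact header bytes. In H.264 the NAL unit type is the low 5 bits of the NAL header byte (7 for SPS, 8 for PPS), so a check on those bits may be more tolerant. The following is just a sketch, assuming a 12-byte fixed RTP header with no CSRC entries or header extension and one NAL unit per packet; NalTypeSketch and its methods are hypothetical names, not part of the original code.

```java
// Hypothetical helper: inspects the NAL unit type of an RTP payload.
public class NalTypeSketch {
    // Assumption: fixed RTP header only (no CSRC list, no extension).
    static final int RTP_FIXED_HEADER_LENGTH = 12;
    static final int NAL_TYPE_SPS = 7;
    static final int NAL_TYPE_PPS = 8;

    // Returns the NAL unit type, or -1 if the packet has no payload byte.
    static int nalUnitType(byte[] rtpPacket) {
        if (rtpPacket.length <= RTP_FIXED_HEADER_LENGTH) {
            return -1;
        }
        // The low 5 bits of the first payload byte carry the NAL unit type.
        return rtpPacket[RTP_FIXED_HEADER_LENGTH] & 0x1F;
    }

    // True when the packet carries a sequence or picture parameter set.
    static boolean isParameterSet(byte[] rtpPacket) {
        int type = nalUnitType(rtpPacket);
        return type == NAL_TYPE_SPS || type == NAL_TYPE_PPS;
    }
}
```

Here rtpPacket is the chunk after the 4-byte interleaved header has been removed, so index 12 corresponds to absolute index 16 in the collector.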

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow