I have a piece of code which most of the time skips packets when many packets arrive at almost the same time from the same source. The packets are built so that, a packet size field is attached at the beginning which has the size of the packet after it, in bytes.

The TCP client thread is ran with 10ms interval.

Thread Library

unit AgThread11;

interface

uses
  SysUtils, Classes, Windows, Rtti;

type

  TAgThreadMethod1 = procedure of object;
  TAgThreadMethod2 = procedure;

  TAgThread = class ( TThread )

  private
    fInterval       : Cardinal;
    fTerminateEvent : THandle;
    fRun            : Boolean;
    fOnRun1         : TAgThreadMethod1;
    fOnRun2         : TAgThreadMethod2;

  protected
    procedure Execute; override;

  public
    constructor Create
                ( const AOnRun: TAgThreadMethod1; const AInterval: Cardinal; 
                  const ARun: Boolean = True ); overload;
    constructor Create
                ( const AOnRun: TAgThreadMethod2; const AInterval: Cardinal; 
                  const ARun: Boolean = True ); overload;

    destructor  Destroy; override;
    procedure Signal;

    property  Run      : Boolean          read fRun       write fRun;
    property  Interval : Cardinal         read fInterval  write fInterval;
    property  OnRun1   : TAgThreadMethod1 read fOnRun1    write fOnRun1;
    property  OnRun2   : TAgThreadMethod2 read fOnRun2    write fOnRun2;

  end;

implementation

constructor TAgThread.Create 
            ( const AOnRun: TAgThreadMethod1; const AInterval: Cardinal; 
              const ARun: Boolean = True );
begin

  fTerminateEvent := CreateEvent ( nil, TRUE, FALSE, nil );
  fInterval := AInterval;
  fRun      := ARun;
  fOnRun1   := AOnRun;
  inherited Create ( False );

end;

constructor TAgThread.Create 
            ( const AOnRun: TAgThreadMethod2; const AInterval: Cardinal;  
              const ARun: Boolean = True );
begin

  fTerminateEvent := CreateEvent ( nil, TRUE, FALSE, nil );
  fInterval := AInterval;
  fRun      := ARun;
  fOnRun2   := AOnRun;
  inherited Create ( False );

end;

destructor  TAgThread.Destroy;
begin

  Terminate;
  Signal;
  WaitFor;
  inherited;

end;

procedure TAgThread.Signal;
begin
  SetEvent ( FTerminateEvent );
end;

procedure TAgThread.Execute;
begin

  while not Terminated do
  begin

    if fRun then
      if      Assigned ( fOnRun1 ) then fOnRun1
      else if Assigned ( fOnRun2 ) then fOnRun2;
    WaitForSingleObject ( FTerminateEvent, fInterval );

  end;

end;

end.

TCP Thread Code

procedure TForm1.THEX_TCP;
var
  Buffer  : TBytes;
  MsgSize : Integer;
begin

  if TCPClient.IOHandler.CheckForDataOnSource then
  begin

    while TCPClient.IOHandler.InputBuffer.Size >= 4 do
    begin

      fRXCount := fRXCount + 1;
      TCPClient.IOHandler.InputBuffer.ExtractToBytes ( Buffer, 4 );
      Move ( Buffer [0], MsgSize, 4 );
      TCPClient.IOHandler.InputBuffer.ExtractToBytes ( Buffer, MsgSize, False );
      NAT.RecievedNATData ( Buffer ); // Packet Processor

    end;

  end;

end;

What should I do to ensure zero packet loss?

有帮助吗?

解决方案

There are two major problems with your TCP reading code:

  1. You are not ensuring that the InputBuffer actually has MsgSize number of bytes available before calling ExtractToBytes() the second time. If you try to extract more bytes than are actually in the buffer, ExtractToBytes() raises an exception.

  2. more importantly, you are not resizing your Buffer variable back to 0 before each call to ExtractToBytes(). After the first call in the first loop iteration, the length of the Buffer is 4 bytes. If that message is less than 4 bytes in size, you are leaving behind random bytes at the end of your Buffer that are being passed on to your parser and likely corrupts its logic. But worse, if there is another message size in the buffer, your next loop iteration makes a 3rd call to ExtractToBytes() and appends those 4 bytes to the end of the existing Buffer content, not replaces the content like you are assuming (the AAppend parameter of ExtractToBytes() is True by default). Thus, you end up copying 4 bytes from the previous message data into your MsgSize variable instead of the new 4 bytes you just extracted, so you are using a corrupted MsgSize value on the next ExtractToBytes() call.

Because your packets are length-prefixed, you do not need to use CheckForDataOnSource() or access the InputBuffer directly at all. Use the following code and let Indy do the work for you:

procedure TForm1.THEX_TCP;
var
  Buffer  : TBytes;
  MsgSize : Integer;
begin
  MsgSize := TCPClient.IOHandler.ReadLongInt;
  TCPClient.IOHandler.ReadBytes(Buffer, MsgSize);
  Inc(fRXCount);
  NAT.RecievedNATData(Buffer);
end;

By default, that will block the caller until data is available to read. If THEX_TCP needs to exit when there is no data ready to be read, use something like this instead:

procedure TForm1.THEX_TCP;
var
  Buffer  : TBytes;
  MsgSize : Integer;
begin
  if TCPClient.IOHandler.InputBufferIsEmpty then
  begin
    TCPClient.IOHandler.CheckForDataOnSource;
    TCPClient.IOHandler.CheckForDisconnect;
    if TCPClient.IOHandler.InputBufferIsEmpty then Exit;
  end;

  repeat
    MsgSize := TCPClient.IOHandler.ReadLongInt;
    TCPClient.IOHandler.ReadBytes(Buffer, MsgSize);
    Inc(fRXCount);
    NAT.RecievedNATData(Buffer);
    SetLength(Buffer, 0);
  until TCPClient.IOHandler.InputBufferIsEmpty;
end;

The only gotcha with this approach is that ReadLongInt() and ReadBytes() might read more bytes into the InputBuffer, so your loop could potentially running for a long time if a lot of data is being sent in a short amount of time. If you absolutely must read only one buffer at a time and only process complete messages then use something like this:

procedure TForm1.THEX_TCP;
var
  MsgSizeBuffer: array[0..3] of Byte;
  MsgSize, I : Integer;
  Buffer : TBytes;
begin
  TCPClient.IOHandler.CheckForDataOnSource;
  TCPClient.IOHandler.CheckForDisconnect;

  while TCPClient.IOHandler.InputBuffer.Size >= 4 do
  begin
    // unfortunately, TIdBuffer does not have a way to peek
    // multiple bytes at a time without removing them
    for I := 0 to 3 do
      MsgSizeBuffer[I] := TCPClient.IOHandler.InputBuffer.Peek(I);
    Move(MsgSizeBuffer[0], MsgSize, 4);
    MsgSize := LongInt(GStack.NetworkToHost(LongWord(MsgSize)));

    if TCPClient.IOHandler.InputBuffer.Size < (4+MsgSize) then
      Break;

    TCPClient.IOHandler.InputBuffer.Remove(4);
    TCPClient.IOHandler.InputBuffer.ExtractToBytes(Buffer, MsgSize);
    Inc(fRXCount);
    NAT.RecievedNATData(Buffer);
    SetLength(Buffer, 0);
  end;
end;
许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top