Question

I have a JSON-RPC service which for one of the requests returns a continuous stream of JSON objects.

I.e. :

{id:'1'}
{id:'2'}
//30 minutes of no data
{id:'3'}
//...

Of course, there's no Content-Length because the stream is endless.

I'm using custom TStream descendant to receive and parse the data. But internally TIdHttp buffers the data and does not pass it to me until RecvBufferSize bytes are received.

This results in:

{id:'1'} //received
{id:'2'} //buffered by Indy but not received
//30 minutes of no data
{id:'3'} //this is where Indy commits {id:'2'} to me

Obviously this won't do because the message which mattered 30 minutes ago should have been delivered 30 minutes ago.

I'd like Indy to do just what sockets do: read up to RecvBufferSize or less if there's data available and return immediately.

I've found this discussion from 2005 where some poor soul tried to explain the problem to Indy developers but they didn't understand him. (Read it; it's a sad sight)

Anyway, he worked around this by writing custom IOHandler descendant, but that was back in 2005, maybe there are some ready solutions today?

Was it helpful?

Solution 3

While using TCP stream was an option, in the end I went with original solution of writing custom TIdIOHandlerStack descendant.

The motivation was that with TIdHTTP I know what doesn't work and only need to fix that, while switching to lower level TCP means new problems can arise.

Here's the code that I'm using, and I'm going to discuss the key points here.

New TIdStreamIoHandler has to inherit from TIdIOHandlerStack.

Two functions need to be rewritten: ReadBytes and ReadStream:

function TryReadBytes(var VBuffer: TIdBytes; AByteCount: Integer;
  AAppend: Boolean = True): integer; virtual;
procedure ReadStream(AStream: TStream; AByteCount: TIdStreamSize = -1;
  AReadUntilDisconnect: Boolean = False); override;

Both are modified Indy functions which can be found in IdIOHandler.TIdIOHandler. In ReadBytes the while clause has to be replaced with a singe ReadFromSource() request, so that TryReadBytes returns after reading up to AByteCount bytes in one go.

Based on this, ReadStream has to handle all combinations of AByteCount (>0, <0) and ReadUntilDisconnect (true, false) to cyclically read and then write to stream chunks of data arriving from the socket.

Note that ReadStream need not terminate prematurely even in this stream version if only part of the requested data is available in the socket. It just has to write that part to the stream instantly instead of caching it in FInputBuffer, then block and wait for the next part of data.

OTHER TIPS

Sounds to me like a WebSocket task, since your connection is not plain HTTP question/answer oriented any more, but a stream of content.

See WebSocket server implementations for Delphi for some code.

There is at least one based on Indy, from the author of AsmProfiler.

AFAIK there are two kind of stream in websockets: binary and text. I suspect your JSON stream is some text content, from the websocket point of view.

Another option is to use long-pooling or some older protocols, which are more rooter-friendly - when the connection switch to websockets mode, it is no standard HTTP any more, so some "sensible" packet-inspection tools (on a corporate network) may identify it as a security attack (e.g. DoS), so may stop the connection.

You do not need to write a IOHandler descendant, it is already possible with the TIdTCPClient class. It exposes a TIdIOHandler object, which has methods to read from the socket. These ReadXXX methods block until the requested data has been read or a timeout occurs. As long as the connection exists, ReadXXX can be executed in a loop and whenever it receives a new JSON object, pass it to the application logic.

Your example looks like all JSON objects only have one line. JSON objects however could be multi-lined, in this case the client code needs to know how they are separated.


Update: in a similar Stackoverflow question (for .Net) for a 'streaming' HTTP JSON web service, the most upvoted solution used a lower-level TCP client instead of a HTTP client: Reading data from an open HTTP stream

There is actually a length data right before the content of packet which transferred in chunked encoding transfer mode. Using this length data, IOhandler of idhttp read one packet by one packet to stream. The minimum meaningful unit is a packet, So there should be no need to read characters one by one from a packet and then no need to change the functions of IOHandler. The only problem is idhttp wouldn't stop an turn the stream data to next step because of the endless of stream data: there is no ending packet. So the solution is using idhttp onwork event to trigger a reading from stream and setting the stream position to zero in order to avoid overflow .like this:

    //add a event handler to idhttp    
    IdHTTP.OnWork :=  IdHTTPWork;


    procedure  TRatesStreamWorker.IdHTTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64);
    begin
         .....
         ResponseStringStream.Position :=0; 
         s:=ResponseStringStream.ReadString(ResponseStringStream.Size) ;//this is the packet conten
         ResponseStringStream.Clear;
         ... 
    end;

procedure TForm1.ButtonGetStreamPricesClick(Sender: TObject);
var
 begin
    .....    
    source := RatesWorker.RatesURL+'EUR_USD';  
    RatesWorker.IdHTTP.Get(source,RatesWorker.ResponseStringStream);  
 end;

Yet use a custom write() function of Tstream may be a better solution for this kind of requirement.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top