Question

I have a python program that currently uses a tcp/ip client module I wrote to receive data from a streaming server. The server outputs lines of data.

My TCP client class is fairly primitive and I want to refactor to use a twisted ReconnectingClientFactory.

The main program currently gets data from a readLines function in my TCP Client that 'yields' the lines as they are received.

The TCP client method is accessed by:

for msg_buffer in self.myTcpClient.readLines():
    do some stuff with the data in msg_buffer

In my TCP Client the readLines method in essence looks like:

while True:
    newLine = self.sock.recv(self.buffer_size)
    yield newLine

When I implement the twisted client I'll want some way for it to behave as an iterator and yield data. I assume I'd do something in the protocol dataReceived method.

I'm lost trying to figure out how this works. It appears to me that twisted deferred are meant for this sort of use but I can't figure out how to use a deferred for my purpose (if my assumption about deferred is correct).

In a perfect world the twisted client would yield the lines as received so a call similar to the present method would do the job. i.e.

class GetData(protocol):
    def dataReceived(self, data):
        yield data

But I think that's an oversimplification.

In summary, what I'm trying to do is implement a twisted reconnecting TCP client that behaves something like my readLines method and can be accessed more or less like:

for msg_buffer in self.twistedTcpClient.readLines():

Any pointers will be much appreciated

UPDATE: I just stumbled across 'Crochet' for twisted. At first glance Crochet appears to have been designed for exactly the kind of model that I need... I'll report back after some testing

Was it helpful?

Solution

The Twisted way of doing this would be to write a Protocol. Instead of doing:

for line in self.twistedTcpClient.readLines():
    process_line(line) ...

You would write your Protocol (maybe by subclassing a twisted.protocols.basic.LineReceiver):

class MyProtocol(LineReceiver):
    ...
    def lineReceived(self, line):
        process_line(line) ...

You want to refactor your code to use the lineReceived callback rather than have an iterated loop.

What you have written:

for line in self.twistedTcpClient.readLines():
    process_line(line) ...

is problematic as Twisted is asynchronous. There is no way for Twisted to get around to doing anything else while waiting for twistedTcpClient.readLines() method.

I suggest writing a protocol, but if you really insist on having this iterator pattern, then you may be able to do this:

@inlineCallbacks
def my_func():
    while True:
        try:
            line = yield self.twistedTcpClient.getNextLine()
        except StopIteration:
            break

        process_line(line) ...

Now, the tricky thing is to make twistedTcpClient return Deferreds for each call to getNextLine(). Maybe something like this:

class MyProtocol(LineReceiver):
    ...
    def getNextLine(self):
        self.defer_given_out = Deferred()

    def lineReceived(self, line):
        self.defer_given_out.callback(line)

    def connectionLost(self):
        self.defer_given_out.errback(StopIteration())

(this is just an example that illustrates the idea, you'd have to extend it to handle the details.)

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top