Question

I'm having trouble with CPU-intensive tasks inside an asyncio event loop. The trouble arises when maintaining a buffer of incoming data and building packets from it. I've tried using executors for the CPU-bound work, but then had trouble maintaining the ordering of the buffer when packets were removed from it.

I'm looking for a best-practice method to achieve the following functionality without having the CPU-bound tasks executed within the event loop.

import asyncio
import struct

class Reader(asyncio.Protocol):
    def __init__(self):
        self.extra = bytearray()

    def data_received(self, data):
        self.extra.extend(data)
        packet = get_packet(bytes(self.extra))
        if packet:
            del self.extra[:len(packet)]
            if verify_hash(packet):  # CPU intensive
                asyncio.ensure_future(distribute(packet))  # Some asyncio fan-out callback


def get_packet(data):  # CPU intensive
    if len(data) > HEADER_SIZE:
        payload_size, = struct.unpack_from(HEADER_FORMAT, data)
        if len(data) >= HEADER_SIZE + payload_size:
            return data[:HEADER_SIZE + payload_size]
    return None

loop = asyncio.get_event_loop()
loop.run_until_complete(loop.create_server(Reader, '0.0.0.0', 8000))
loop.run_forever()

Solution

You want to handle all the data coming into Reader as quickly as possible, but you also can't have multiple threads/processes process that data in parallel; that's how you ran into race conditions with executors before. Instead, start a single worker process that handles all the packet processing, one chunk at a time, using a multiprocessing.Queue to pass raw data from the parent to the worker. Then, when the worker has a packet built, verified, and ready to distribute, it uses another multiprocessing.Queue to send it back to a thread in the parent process, which can use the thread-safe call_soon_threadsafe method to schedule distribute on the event loop.

Here's an untested example that should give you an idea of how to do this:

import asyncio
import struct
import multiprocessing
import threading


def handle_result_packets():
    """ A function for handling packets to be distributed.

    This function runs in a worker thread in the main process.

    """
    while True:
        packet = result_queue.get()
        loop.call_soon_threadsafe(asyncio.ensure_future, distribute(packet))

def get_packet():  # CPU intensive
    """ Handles processing all incoming packet data.

    This function runs in a separate process.

    """
    extra = bytearray()
    while True:
        data = data_queue.get()
        extra.extend(data)
        # Drain every complete packet currently buffered, not just the first.
        while len(extra) > HEADER_SIZE:
            payload_size, = struct.unpack_from(HEADER_FORMAT, extra)
            if len(extra) < HEADER_SIZE + payload_size:
                break
            packet = bytes(extra[:HEADER_SIZE + payload_size])
            del extra[:len(packet)]
            if verify_hash(packet):
                result_queue.put(packet)


class Reader(asyncio.Protocol):
    def data_received(self, data):
        data_queue.put(data)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    data_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=get_packet)
    p.start()
    # Start the result-pump thread once, not once per connection.
    t = threading.Thread(target=handle_result_packets, daemon=True)
    t.start()
    loop.run_until_complete(loop.create_server(Reader, '0.0.0.0', 8000))
    loop.run_forever()
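The example above is untested, but the core pattern (one worker process fed by a queue, with results marshaled back onto the event loop from a thread via call_soon_threadsafe) can be exercised in isolation. Here's a minimal, self-contained sketch with a stand-in worker; names like pump_results and the sentinel-based shutdown are mine, not from the original:

```python
import asyncio
import multiprocessing
import threading


def worker(data_queue, result_queue):
    # Runs in a separate process: consume raw data, produce "packets".
    while True:
        data = data_queue.get()
        if data is None:  # sentinel: shut down
            break
        result_queue.put(data.upper())


def pump_results(loop, result_queue, results, done):
    # Runs in a thread in the parent process: hand results to the loop.
    while True:
        packet = result_queue.get()
        if packet is None:
            loop.call_soon_threadsafe(done.set)
            break
        loop.call_soon_threadsafe(results.append, packet)


async def main():
    loop = asyncio.get_running_loop()
    data_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    results, done = [], asyncio.Event()
    p = multiprocessing.Process(target=worker, args=(data_queue, result_queue))
    p.start()
    t = threading.Thread(target=pump_results,
                         args=(loop, result_queue, results, done), daemon=True)
    t.start()
    for chunk in (b"abc", b"def"):
        data_queue.put(chunk)
    data_queue.put(None)    # stop the worker...
    p.join()
    result_queue.put(None)  # ...then stop the pump thread
    await done.wait()
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because a single worker drains a single FIFO queue, result ordering matches arrival order, which is exactly the property the executor approach lost.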

OTHER TIPS

I would wrap up the whole packet-handling logic in a coroutine and split the heavy work into small pieces that yield control back to the event loop. Using an MD5 hash as an example:

async def verify_hash(packet):
    m = hashlib.md5()
    for i in range(0, len(packet), 4096):
        m.update(packet[i:i + 4096])
        await asyncio.sleep(0)  # yield control so other tasks can run
    return m.digest() == signature


async def handle_packet(packet):
    verified = await verify_hash(packet)
    if verified:
        await distribute(packet)


class Reader(asyncio.Protocol):
    def __init__(self):
        self.extra = bytearray()

    def data_received(self, data):
        self.extra.extend(data)
        packet = get_packet(bytes(self.extra))
        if packet:
            del self.extra[:len(packet)]
            asyncio.ensure_future(handle_packet(packet))

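One detail worth checking in the chunked loop: the slice stride must match the chunk size, i.e. the index has to step by 4096, or overlapping bytes get hashed. Incremental update() calls then produce the same digest as hashing the buffer in one go. A quick self-contained check (the 4096-byte chunk size is arbitrary):

```python
import hashlib


def md5_chunked(packet, chunk_size=4096):
    # Hash the packet in fixed-size slices; the digest matches
    # hashing the whole buffer at once.
    m = hashlib.md5()
    for i in range(0, len(packet), chunk_size):
        m.update(packet[i:i + chunk_size])
    return m.digest()


packet = bytes(range(256)) * 64  # 16 KiB of sample data
print(md5_chunked(packet) == hashlib.md5(packet).digest())  # True
```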
Note that packets can come in much faster than Reader can handle them, so make sure to monitor system load and pause receiving when needed. But that's another story :)
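To sketch what "pause receiving" could look like: asyncio transports expose pause_reading() and resume_reading(), so the protocol can apply backpressure once its buffer grows past a threshold. The watermark values and the consumed() helper below are illustrative assumptions, not part of the original code:

```python
import asyncio

HIGH_WATER = 64 * 1024  # thresholds are illustrative
LOW_WATER = 16 * 1024


class ThrottledReader(asyncio.Protocol):
    def __init__(self):
        self.extra = bytearray()
        self.paused = False

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self.extra.extend(data)
        if not self.paused and len(self.extra) >= HIGH_WATER:
            self.transport.pause_reading()  # stop the flood until we catch up
            self.paused = True

    def consumed(self, n):
        # Call after n bytes have been processed out of the buffer.
        del self.extra[:n]
        if self.paused and len(self.extra) <= LOW_WATER:
            self.transport.resume_reading()
            self.paused = False
```

Two watermarks avoid rapid pause/resume flapping when the buffer hovers around a single threshold.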

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow