Checking to see if there is more data to read from a file descriptor using Python's select module

StackOverflow https://stackoverflow.com/questions/23677526

Question

I have a program that creates a subprocess within a thread, so that the thread can be constantly checking for specific output conditions (from either stdout or stderr), and call the appropriate callbacks, while the rest of the program continues. Here is a pared-down version of that code:

import select
import subprocess
import threading

def run_task():
    command = ['python', 'a-script-that-outputs-lines.py']
    proc = subprocess.Popen(command, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
    while True:

        ready, _, _ = select.select((proc.stdout, proc.stderr), (), (), .1)

        if proc.stdout in ready:
            next_line_to_process = proc.stdout.readline()
            # process the output

        if proc.stderr in ready:
            next_line_to_process = proc.stderr.readline()
            # process the output

        if not ready and proc.poll() is not None:
            break

thread = threading.Thread(target = run_task)
thread.run()

It works reasonably well, but I would like the thread to exit once two conditions are met: the running child process has finished, and all of the data in stdout and stderr has been processed.

The difficulty I have is that if my last condition is as it is above (if not ready and proc.poll() is not None), then the thread never exits, because once stdout and stderr's file descriptors are marked as ready, they never become unready (even after all of the data has been read from them, and read() would hang or readline() would return an empty string).

If I change that condition to just if proc.poll() is not None, then the loop exists when the program exits, and I can't guarantee that it's seen all of the data that needs to be processed.

Is this just the wrong approach, or is there a way to reliably determine when you've read all of the data that will ever be written to a file descriptor? Or is this an issue specific to trying to read from the stderr/stdout of a subprocess?

I have been trying this on Python 2.5 (running on OS X) and also tried select.poll() and select.epoll()-based variants on Python 2.6 (running on Debian with a 2.6 kernel).

Was it helpful?

Solution

select module is appropriate if you want to find out whether you can read from a pipe without blocking.

To make sure that you've read all data, use a simpler condition if proc.poll() is not None: break and call rest = [pipe.read() for pipe in [p.stdout, p.stderr]] after the loop.

It is unlikely that a subprocess closes its stdout/stderr before its shutdown therefore you could skip the logic that handles EOF for simplicity.


Don't call Thread.run() directly, use Thread.start() instead. You probably don't need the separate thread here at all.

Don't call p.stdout.readline() after the select(), it may block, use os.read(p.stdout.fileno(), limit) instead. Empty bytestring indicates EOF for the corresponding pipe.


As an alternative or in addition to you could make the pipes non-blocking using fcntl module:

import os
from fcntl import fcntl, F_GETFL, F_SETFL

def make_nonblocking(fd):
    return fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | os.O_NONBLOCK)

and handle io/os errors while reading.

OTHER TIPS

My eventual solution, as I mentioned above, was the following, in case this is helpful to anyone. I think it is the right approach, since I'm now 97.2% sure you can't do this with just select()/poll() and read():

import select
import subprocess
import threading

def run_task():
    command = ['python', 'a-script-that-outputs-lines.py']
    proc = subprocess.Popen(command, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
    while True:

        ready, _, _ = select.select((proc.stdout, proc.stderr), (), (), .1)

        if proc.stdout in ready:
            next_line_to_process = proc.stdout.readline()
            if next_line_to_process:
                # process the output
            elif proc.returncode is not None:
                # The program has exited, and we have read everything written to stdout
                ready = filter(lambda x: x is not proc.stdout, ready)

        if proc.stderr in ready:
            next_line_to_process = proc.stderr.readline()
            if next_line_to_process:
                # process the output
            elif proc.returncode is not None:
                # The program has exited, and we have read everything written to stderr
                ready = filter(lambda x: x is not proc.stderr, ready)

        if proc.poll() is not None and not ready:
            break

thread = threading.Thread(target = run_task)
thread.run()

You could do a raw os.read(fd, size) on the pipe's file descriptor instead of using readline(). This is a non-blocking operation which can also detect EOF (in that case it returns an empty string or byte object). You'd have to implement the line splitting and buffering yourself. Use something like this:

class NonblockingReader():
  def __init__(self, pipe):
    self.fd = pipe.fileno()
    self.buffer = ""

  def readlines(self):
    data = os.read(self.fd, 2048)
    if not data:
      return None

    self.buffer += data
    if os.linesep in self.buffer:
      lines = self.buffer.split(os.linesep)
      self.buffer = lines[-1]
      return lines[:-1]
    else:
      return []
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top