Question

I have three commands that would otherwise be easily chained together on the command-line like so:

$ echo foo | firstCommand - | secondCommand - | thirdCommand - > finalOutput

In other words, firstCommand processes foo from standard input and pipes the result to secondCommand, which processes that input and pipes its output to thirdCommand, which in turn processes it and redirects its output to the file finalOutput.

I have been trying to reproduce this in a Python script, using threading. I'd like to use Python so that I can manipulate the output from firstCommand before passing it to secondCommand, and again between secondCommand and thirdCommand.

Here's an excerpt of code that does not seem to work:

first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout)

first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin))
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin))
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin))

first_thread.start()
second_thread.start()
third_thread.start()

first_thread.join()
second_thread.join()
third_thread.join()

first_process.communicate()
second_process.communicate()
third_process.communicate()

# read 1K chunks from standard input
def consumeOutputFromStdin(from_stream, to_stream):
    chunk = from_stream.read(1024)
    while chunk:
        to_stream.write(chunk)
        to_stream.flush()
        chunk = from_stream.read(1024)

def consumeOutputFromFirstCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            break
        processed_line = some_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

def consumeOutputFromSecondCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            break
        processed_line = a_different_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

When I run this, the script hangs:

$ echo foo | ./myConversionScript.py
** hangs here... **

If I hit Ctrl-C to terminate the script, the traceback shows that it was blocked on the line third_thread.join():

  C-c C-c
Traceback (most recent call last):
  File "./myConversionScript.py", line 786, in <module>
    sys.exit(main(*sys.argv))
  File "./myConversionScript.py", line 556, in main
    third_thread.join()
  File "/home/foo/proj/tools/lib/python2.7/threading.py", line 949, in join
    self.__block.wait()
  File "/home/foo/proj/tools/lib/python2.7/threading.py", line 339, in wait
    waiter.acquire()
KeyboardInterrupt

If I don't use a third_process and third_thread, instead only passing data from the output of the first thread to the input of the second thread, there is no hang.

Something about the third thread seems to cause things to break, but I don't know why.

I thought the point of communicate() is that it will handle I/O for the three processes, so I'm not sure why there is an I/O hang.

How do I get three or more commands/processes working together, where one thread consumes the output of another thread/process?

UPDATE

Okay, I made some changes that seem to help, based on some comments here and on other sites. The processes are made to wait() for completion, and within the thread methods, I close() the pipes once the thread has processed all the data that it can. My concern is that memory usage will be very high for large datasets, but at least things are working:

first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout)

first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin))
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin))
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin))

first_thread.start()
second_thread.start()
third_thread.start()

first_thread.join()
second_thread.join()
third_thread.join()

first_process.wait()
second_process.wait()
third_process.wait()

# read 1K chunks from standard input
def consumeOutputFromStdin(from_stream, to_stream):
    chunk = from_stream.read(1024)
    while chunk:
        to_stream.write(chunk)
        to_stream.flush()
        chunk = from_stream.read(1024)

def consumeOutputFromFirstCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            from_stream.close()
            to_stream.close()
            break
        processed_line = some_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

def consumeOutputFromSecondCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            from_stream.close()
            to_stream.close()
            break
        processed_line = a_different_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()
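
Why the close() calls matter can be shown in isolation: a child that reads its standard input to end-of-file keeps running until every write end of that pipe is closed. The sketch below is an illustration under assumptions, not the asker's setup: it is written for Python 3, and READ_ALL is a hypothetical stand-in for firstCommand built from the Python interpreter itself. Note also that consumeOutputFromStdin above never closes to_stream, so a firstCommand that reads until end-of-file would presumably need the same close() there. On Python 2.7, where close_fds defaults to False, later children can also inherit the write ends of earlier pipes, which may be one reason the hang only appeared with three processes.

```python
import sys
from subprocess import Popen, PIPE

# Hypothetical stand-in for firstCommand: copies all of stdin to stdout,
# which means it cannot finish until it sees end-of-file on stdin.
READ_ALL = [sys.executable, "-c",
            "import sys; sys.stdout.buffer.write(sys.stdin.buffer.read())"]

child = Popen(READ_ALL, stdin=PIPE, stdout=PIPE)
child.stdin.write(b"foo\n")
child.stdin.flush()

# The data has been sent, but the child is still waiting for more input.
running_before_close = child.poll() is None

# Closing the write end is what delivers end-of-file to the child.
child.stdin.close()
output = child.stdout.read()
child.stdout.close()
exit_code = child.wait()

print(running_before_close, output, exit_code)
```

On the memory concern: the threads forward one line at a time and operating-system pipes have a bounded capacity, so a writer blocks when its reader falls behind. Memory use should therefore stay roughly constant rather than grow with the size of the dataset.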

Solution

To emulate:

echo foo |
firstCommand - | somePythonRoutine - |
secondCommand - | anotherPythonRoutine - |
thirdCommand - > finalOutput

your current approach with threads works:

from subprocess import Popen, PIPE

first = Popen(["firstCommand", "-"], stdin=PIPE, stdout=PIPE, bufsize=1)
second = Popen(["secondCommand", "-"], stdin=PIPE, stdout=PIPE, bufsize=1)
bind(first.stdout, second.stdin, somePythonRoutine)
with open("finalOutput", "wb") as file:
    third = Popen(["thirdCommand", "-"], stdin=PIPE, stdout=file, bufsize=1)
bind(second.stdout, third.stdin, anotherPythonRoutine)

# provide input for the pipeline
first.stdin.write(b"foo")
first.stdin.close()

# wait for it to complete
pipestatus = [p.wait() for p in [first, second, third]]

where each bind() starts a new thread:

from threading import Thread

def bind(input_pipe, output_pipe, line_filter):
    def f():
        try:
            for line in iter(input_pipe.readline, b''):
                line = line_filter(line)
                if line:
                    output_pipe.write(line) # no flush unless newline present
        finally:
            try:
                output_pipe.close()
            finally:
                input_pipe.close()
    t = Thread(target=f)
    t.daemon = True # die if the program exits
    t.start()

and somePythonRoutine, anotherPythonRoutine accept a single line and return it (possibly modified).
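
To try the approach end to end without firstCommand, secondCommand and thirdCommand, here is a minimal runnable sketch for Python 3. The names ECHO, add_prefix, to_upper and run_pipeline are hypothetical and exist only for this illustration. ECHO is a stand-in command that copies standard input to standard output. Unlike the code above, this version flushes after every line instead of relying on bufsize=1, because on Python 3 line buffering applies only to text-mode pipes, and it captures the final output in memory rather than writing it to finalOutput.

```python
import sys
from subprocess import Popen, PIPE
from threading import Thread

# Hypothetical stand-in for each external command: copies stdin to stdout.
ECHO = [sys.executable, "-c",
        "import sys; sys.stdout.buffer.write(sys.stdin.buffer.read())"]

def bind(input_pipe, output_pipe, line_filter):
    """Forward lines from input_pipe to output_pipe through line_filter."""
    def pump():
        try:
            for line in iter(input_pipe.readline, b""):
                line = line_filter(line)
                if line:
                    output_pipe.write(line)
                    output_pipe.flush()  # explicit flush (see lead-in)
        finally:
            # Closing the output delivers end-of-file to the next command.
            try:
                output_pipe.close()
            finally:
                input_pipe.close()
    thread = Thread(target=pump)
    thread.daemon = True  # do not keep the program alive on exit
    thread.start()
    return thread

def add_prefix(line):  # plays the role of somePythonRoutine
    return b"> " + line

def to_upper(line):  # plays the role of anotherPythonRoutine
    return line.upper()

def run_pipeline(data):
    first = Popen(ECHO, stdin=PIPE, stdout=PIPE)
    second = Popen(ECHO, stdin=PIPE, stdout=PIPE)
    third = Popen(ECHO, stdin=PIPE, stdout=PIPE)
    bind(first.stdout, second.stdin, add_prefix)
    bind(second.stdout, third.stdin, to_upper)
    # Provide input for the pipeline, then signal end-of-file.
    first.stdin.write(data)
    first.stdin.close()
    # Read the last command's output until it closes its stdout.
    output = third.stdout.read()
    third.stdout.close()
    statuses = [p.wait() for p in (first, second, third)]
    return output, statuses

result, statuses = run_pipeline(b"foo\nbar\n")
print(result, statuses)
```

Writing the whole input before reading any output is fine for a few bytes. For large inputs, feeding first.stdin from its own thread, as the question does, avoids blocking the main thread when a pipe fills up.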

OTHER TIPS

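The point of communicate() is that it sends any input to the process, reads its output until end-of-file, waits for the process to exit, and returns the output. This collides with your pipe setup, where your threads are already reading from and writing to those same pipes.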

You should call it only once, on the third process; the other processes are connected via pipes and pass data along on their own, so no other manual intervention is necessary.
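
This advice fits the case where the processes are chained directly, with each command's stdout handed to the next one as its stdin and no Python code in between. A minimal sketch of that arrangement for Python 3 follows. UPPER, REPLACE and LABEL are hypothetical stand-ins for the three commands, built from the Python interpreter so the example is self-contained. It does not cover the question's requirement of modifying the data between stages, which still needs threads or similar.

```python
import sys
from subprocess import Popen, PIPE

def stage(expression):
    """Build a hypothetical command that transforms stdin bytes `data`."""
    code = ("import sys; data = sys.stdin.buffer.read(); "
            "sys.stdout.buffer.write(" + expression + ")")
    return [sys.executable, "-c", code]

UPPER = stage("data.upper()")                    # stand-in for firstCommand
REPLACE = stage("data.replace(b'FOO', b'BAR')")  # stand-in for secondCommand
LABEL = stage("b'out: ' + data")                 # stand-in for thirdCommand

first = Popen(UPPER, stdin=PIPE, stdout=PIPE)
# Hand each stdout directly to the next process as its stdin.
second = Popen(REPLACE, stdin=first.stdout, stdout=PIPE)
first.stdout.close()   # the parent no longer needs its copy of this pipe
third = Popen(LABEL, stdin=second.stdout, stdout=PIPE)
second.stdout.close()

# Feed the head of the pipeline and signal end-of-file.
first.stdin.write(b"foo\n")
first.stdin.close()

# Only the last process needs communicate(): it reads the final output
# until end-of-file and waits for that process to exit.
output, _ = third.communicate()
statuses = [first.wait(), second.wait(), third.returncode]

print(output, statuses)
```

Closing the parent's copies of first.stdout and second.stdout follows the pattern in the subprocess documentation, so that an upstream command can notice when the command after it exits early.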

Licensed under: CC-BY-SA with attribution