Question

I'm running multiple Python processes like this:

find /path/to/logfiles/*.gz | xargs -n1 -P4 python logparser.py

and the output is occasionally scrambled.

The output stream is unbuffered and each write is smaller than the system-defined PIPE_BUF of 512 bytes (OS X 10.8.2, Python 2.7.2), so I believe the writes should be atomic, yet the output is occasionally scrambled. I must be missing something; any suggestions would be appreciated.

Thanks.

A simplified skeleton of the script is:

import argparse
import csv
import gzip
import sys


class UnbufferedWriter(object):
    """Unbuffered Writer from 
       http://mail.python.org/pipermail/tutor/2003-November/026645.html

    """

    def __init__(self, stream):
        self.stream = stream

    def write(self, data):
        self.stream.write(data)
        self.stream.flush()

    def __getattr__(self, attr):
        return getattr(self.stream, attr)


def parse_records(infile):
    if infile.name.endswith('.gz'):
        lines = gzip.GzipFile(fileobj=infile)
    else:
        lines = infile

    for line in lines:
        # match lines with regex and filter out on some conditions.
        yield line_as_dict

def main(infile, outfile):
    fields = ['remote_addr', 'time', 'request_time', 'request', 'status']
    writer = csv.DictWriter(outfile, fields, quoting=csv.QUOTE_ALL)

    for record in parse_records(infile):
        row_as_dict = dict(
            remote_addr=record.get('remote_addr', ''),
            time=record.get('time', ''),
            request_time=record.get('request_time', ''),
            request=record.get('request', ''),
            status=record.get('status', '')
        )
        writer.writerow(row_as_dict)

if __name__ == '__main__':

    parser = argparse.ArgumentParser()
    parser.add_argument('infile', nargs='?', type=argparse.FileType('r'), default=sys.stdin)
    parser.add_argument('outfile', nargs='?', type=argparse.FileType('w', 0), default=sys.stdout)

    pargs = parser.parse_args()
    pargs.outfile = UnbufferedWriter(pargs.outfile)

    main(pargs.infile, pargs.outfile)

Solution

You might want to consider using GNU Parallel. By default, the output is buffered until the instance has completed running:

When running jobs that output data, you often do not want the output of multiple jobs to run together. GNU parallel defaults to grouping the output of each job, so the output is printed when the job finishes. If you want the output to be printed while the job is running you can use -u.
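
To illustrate what grouping means, here is a minimal Python 3 sketch of the same idea. It is not how GNU Parallel is implemented, only an analogy: each job's output is captured in full and handed back once the job has finished, so lines from different jobs cannot interleave. The helper names `run_job` and `run_grouped` are hypothetical, and the commands are placeholders standing in for `python logparser.py <file>`.

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_job(cmd):
    """Run one command and return its complete stdout as bytes."""
    return subprocess.run(cmd, stdout=subprocess.PIPE, check=True).stdout


def run_grouped(commands, max_workers=4):
    """Run commands concurrently; return one whole output per job,
    in the order the jobs complete."""
    outputs = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(run_job, cmd) for cmd in commands]
        for future in as_completed(futures):
            outputs.append(future.result())
    return outputs


if __name__ == '__main__':
    # Placeholder jobs (assumption): each prints two lines.
    jobs = [
        [sys.executable, '-c',
         'print("job %d line 1"); print("job %d line 2")' % (i, i)]
        for i in range(4)
    ]
    for chunk in run_grouped(jobs):
        # Each chunk is written in one piece, after its job has exited.
        sys.stdout.buffer.write(chunk)
```

The trade-off is the same one the quoted passage describes: nothing appears until a job completes, and each job's output has to fit in memory.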

I believe the best way to run your script is via:

find /path/to/logfiles/*.gz | parallel python logparser.py

or

parallel python logparser.py ::: /path/to/logfiles/*.gz

You can specify the number of processes to run using the -j flag, e.g., -j4.

The nice thing about Parallel is that it supports Cartesian products of input arguments. For example, if you had some additional arguments that you wanted to iterate through for each file, you could use:

parallel python logparser.py ::: /path/to/logfiles/*.gz ::: 1 2 3

This will result in running the following across multiple processes:

python logparser.py /path/to/logfiles/A.gz 1
python logparser.py /path/to/logfiles/A.gz 2
python logparser.py /path/to/logfiles/A.gz 3
python logparser.py /path/to/logfiles/B.gz 1
python logparser.py /path/to/logfiles/B.gz 2
python logparser.py /path/to/logfiles/B.gz 3
...
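
For readers who think in Python, the set of command lines above is the Cartesian product of the two input lists. The sketch below only enumerates the combinations and does not run anything; `expand_jobs` is a hypothetical helper, and the file names are the placeholders used above.

```python
from itertools import product


def expand_jobs(files, extra_args):
    """Return one command line per (file, argument) pair,
    with the file varying slowest."""
    return ['python logparser.py %s %s' % (f, a)
            for f, a in product(files, extra_args)]


if __name__ == '__main__':
    files = ['/path/to/logfiles/A.gz', '/path/to/logfiles/B.gz']
    for cmd in expand_jobs(files, ['1', '2', '3']):
        print(cmd)
```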

Good luck!

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow