
I'm missing something obvious about Yelp's mrjob library. Setting up an MRJob class is almost trivially easy, and so is running it over a file or stdin. But how can I change the job's input from a file (local or in S3) to, say, the keys in an S3 bucket?

Something like this. Suppose I wanted to count all objects in my S3 bucket that start with the string 'foo':

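import re

from mrjob.job import MRJob

class MRCountS3Objects(MRJob):

    def mapper(self, _, botoS3Key):
        # botoS3Key is meant to be the S3 key name, not the object's content.
        if re.match('^foo', botoS3Key):
            yield 'foo', 1

    def reducer(self, name, occurrences):
        yield name, sum(occurrences)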

It's a highly contrived example, but you probably get my drift. How can I tell MRJob to operate over a stream of S3 objects, ignoring the content of the objects? I saw the S3Filesystem.get_s3_keys() method, which gets me exactly the stream I need, but I'm not sure where to go from there.


I figured out at least one way to accomplish this. An MRJob instance has a stdin attribute that can be assigned any iterator, and you can then run the job programmatically. This code, for example, should work over my-bucket's key names:

from mrjob.job import MRJob
from mrjob.emr import EMRJobRunner

class MRS3KeyProcessor(MRJob):
    # Do some MRJob stuff (define mapper/reducer here).
    pass

def s3_name_generator(bucket):
    """Generator that yields boto.s3.Key names."""
    # Could also use raw boto here.
    emr = EMRJobRunner()
    key_stream = emr.fs.get_s3_keys(bucket)
    for key in key_stream:
        # Each item is a boto Key object; only its name is fed to the job.
        yield key.name

def main():
    # The '-' argument signifies that we use stdin.
    mr_job = MRS3KeyProcessor(['--runner', 'inline', '-'])
    stdin = s3_name_generator('my-bucket')
    mr_job.stdin = stdin
    results = []
    with mr_job.make_runner() as runner:
        # The job has to run before its output can be streamed.
        runner.run()
        for line in runner.stream_output():
            key, value = mr_job.parse_output_line(line)
            results.append((key, value))

if __name__ == '__main__':
    main()
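
If you want to sanity-check the mapper/reducer logic before pointing it at a real bucket, here is a rough plain-Python sketch that imitates what the job does with the key names. It does not use mrjob or S3 at all: run_local and sample_keys are made-up names for illustration, and the key names are invented test data.

```python
import re
from collections import defaultdict

def mapper(_, key_name):
    # Same test as the question's mapper: key names starting with 'foo'.
    if re.match('^foo', key_name):
        yield 'foo', 1

def reducer(name, occurrences):
    yield name, sum(occurrences)

def run_local(key_names):
    """Hypothetical helper: group mapper output by key, then reduce."""
    grouped = defaultdict(list)
    for key_name in key_names:
        for k, v in mapper(None, key_name):
            grouped[k].append(v)
    results = []
    for k in sorted(grouped):
        results.extend(reducer(k, grouped[k]))
    return results

# Invented key names standing in for what the S3 generator would yield.
sample_keys = ['foo/a.txt', 'foobar.csv', 'bar/foo.txt', 'logs/2013.gz']
print(run_local(sample_keys))  # [('foo', 2)]
```

Only the first two names count, since re.match anchors at the start of the string. The real job should behave the same way once stdin is swapped for the S3 key-name generator.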

Licensed under: CC-BY-SA with attribution