I figured out at least one way to accomplish this. Your MRJob has a `stdin`
attribute that can be assigned to any iterator, and then you can run the job programmatically. This code, for example, should work over the key names
in `my-bucket`:
from mrjob.job import MRJob
from mrjob.emr import EMRJobRunner
class MRS3KeyProcessor(MRJob):
    # Placeholder job definition: the real job logic is elided in this
    # example and would be defined here.
    # Do some MRJob stuff.
    ...
def s3_name_generator(bucket):
    """Lazily yield the ``name`` of each S3 key found for ``bucket``.

    The keys are obtained through the filesystem object of a freshly
    created ``EMRJobRunner``; the runner is only constructed once iteration
    actually starts, because this is a generator.

    NOTE(review): names are yielded as-is, without a trailing newline.
    Confirm that the consumer of this iterator treats every item as a
    separate input line.
    """
    # Could also use raw boto here.
    runner = EMRJobRunner()
    for s3_key in runner.fs.get_s3_keys(bucket):
        yield s3_key.name
def main():
    """Run ``MRS3KeyProcessor`` inline, fed with the key names of 'my-bucket'.

    The job's ``stdin`` attribute is replaced with the key-name generator,
    the job is executed through its runner, and every output line is parsed
    into a ``(key, value)`` pair. The collected pairs are printed as a list.
    """
    # The '-' argument signifies that we use stdin.
    # The job class defined in this file is MRS3KeyProcessor; the previous
    # name (MRCountS3Objects) does not exist here and raised NameError.
    mr_job = MRS3KeyProcessor(['--runner', 'inline', '-'])
    mr_job.stdin = s3_name_generator('my-bucket')

    results = []
    # Output must be read while the runner context is still open.
    with mr_job.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            key, value = mr_job.parse_output_line(line)
            results.append((key, value))
    print(results)
# Only run the job when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()