Question

I am trying to run a mrjob on Amazon's EMR. I've tested the job locally using the inline runner, but it fails when running on Amazon. I've narrowed the failure down to my dependence on an external data file zip_codes.txt. If I run without that dependency using hardcoded zip code data it works just fine.

I've tried to include the necessary data file using the upload file argument. When I look on S3, the file did make it there, but clearly something is going wrong so that I cannot access it locally.

enter image description here

Here is my mrjob.conf file:

runners:
  emr:
    aws_access_key_id: FOOBARBAZQUX
    aws_secret_access_key: IAMASECRETKEY
    aws_region: us-east-1
    ec2_key_pair: mapreduce
    ec2_key_pair_file: $ENV/keys/mapreduce.pem
    ssh_tunnel_to_job_tracker: true
    ssh_tunnel_is_open: true
    cleanup_on_failure: ALL
    cmdenv:
      TZ: America/Los_Angeles 

This is my MR_zip.py file.

from mrjob.job import MRJob
import mrjob
import csv

def distance(p1, p2):
    # d = ...    
    return d

class MR_zip(MRJob):
    OUTPUT_PROTOCOL = mrjob.protocol.JSONProtocol
    zip_codes = {int(zip_code): (float(latitude), float(longitude)) for zip_code, latitude, longitude in csv.reader(open("zip_codes.txt", "r"))}

    def mapper(self, _, line):
        zip_code_1, poi = line.split(",")
        zip_code_1 = int(zip_code_1)
        lat1, lon1 = self.zip_codes[zip_code_1]
        for zip_code_2, (lat2, lon2) in self.zip_codes.items():
            d = distance((lat1, lon1), (lat2, lon2))
            yield zip_code_2, (zip_code_1, poi, d)

    def reducer(self, zip_code_1, ds):
        result = {}
        for zip_code_2, poi, d in ds:
            if poi not in result:
                result[poi] = (zip_code_2, d)
            elif result[poi][1] > d:
                result[poi] = (zip_code_2, d)
        yield zip_code_1, result

if __name__ == '__main__':
    MR_zip.run()

And finally, I run it with the following command:

python MR_zip.py -r emr --conf mrjob.conf --file zip_codes.txt < poi.txt

Where zip_codes.txt looks like:

...
62323,39.817702,-90.66923
62324,39.988988,-90.94976
62325,40.034398,-91.16278
62326,40.421857,-90.80333
...

And poi.txt looks like:

...
210,skate park
501,theatre
29001,theatre
8001,knitting club
20101,food bank
...
Was it helpful?

Solution 2

Overview

There were two errors in my code:

  1. The initialization code for a step should be in the step's initializer
  2. By default EMR uses Python 2.6 which precludes dictionary comprehensions among other things

Step Initialization

Every step has a corresponding initializer method. For example, mapper has mapper_init which can be used to initialize data used in the mapper. The functions reducer and combiner have similar initialization methods. If you use the steps function to define your own steps then you can also define which initialization function you use. Read more about initializers here.

Beware the Python Version

As of today, EMR uses Python version 2.6.6 by default. So any dependencies on later versions may run locally but have problems on EMR.

The Fix

To repair the code above, it is necessary to remove the line defining zip_codes in MR_zip.py

zip_codes = {int(zip_code): (float(latitude), float(longitude)) for zip_code, latitude, longitude in csv.reader(open("zip_codes.txt", "r"))}

and instead define it inside of mapper_init without using dictionary comprehensions.

def mapper_init(self):
    self.zip_codes = {}
    for zip_code, latitude, longitude in csv.reader(open("zip_codes.txt", "r")):
        self.zip_codes[int(zip_code)] = (float(latitude), float(longitude))

The other files and the command-line stay the same.

OTHER TIPS

Also, you might find useful MRJob.add_file_option routine. For example, specifying

self.add_file_option('--config-file', dest='config_file', 
    default=None, help='file with labels', action="append")

you can reference uploaded files via self.options.config_file paths list.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top