Overview
There were two errors in my code:
- The initialization code for a step should be in the step's initializer
- By default, EMR uses Python 2.6, which lacks dictionary comprehensions (added in Python 2.7), among other things
Step Initialization
Every step has a corresponding initializer method. For example, mapper has mapper_init, which can be used to initialize data used in the mapper. The reducer and combiner functions have similar initialization methods (reducer_init and combiner_init). If you use the steps function to define your own steps, then you can also specify which initialization function each step uses. The documentation on initializers has more detail.
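The call order described above can be illustrated with a small stand-in written in plain Python. This is only a sketch of the pattern, not the framework itself: the class ZipLookupSketch, the helper run_mapper_task, and the sample rows are all hypothetical, and an in-memory buffer stands in for zip_codes.txt.

```python
import csv
import io


class ZipLookupSketch(object):
    """Hypothetical stand-in for a job class; not the real framework."""

    def __init__(self, side_data):
        # side_data is a file-like object standing in for zip_codes.txt.
        self.side_data = side_data
        self.init_calls = 0

    def mapper_init(self):
        # Called once, before the first input line reaches mapper().
        self.init_calls += 1
        self.zip_codes = {}
        for zip_code, latitude, longitude in csv.reader(self.side_data):
            self.zip_codes[int(zip_code)] = (float(latitude), float(longitude))

    def mapper(self, _, line):
        # Called once per input line; reuses the table built in mapper_init.
        zip_code = int(line.strip())
        yield zip_code, self.zip_codes.get(zip_code)


def run_mapper_task(job, lines):
    """Mimic the order of calls for one mapper task: init, then each line."""
    job.mapper_init()
    results = []
    for line in lines:
        results.extend(job.mapper(None, line))
    return results


# Made-up sample rows: zip code, latitude, longitude.
sample = io.StringIO("10001,40.75,-73.99\n94103,37.77,-122.41\n")
job = ZipLookupSketch(sample)
output = run_mapper_task(job, ["10001", "94103", "10001"])
```

The point of the sketch is that the lookup table is built once per mapper task and then reused for every input line, rather than being built when the module is imported.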
Beware the Python Version
As of today, EMR uses Python version 2.6.6 by default, so code that depends on features from later versions may run locally but fail on EMR.
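If you would rather keep a one-expression style than write an explicit loop, passing a generator expression to dict() is one alternative that also works on Python 2.6. The snippet below is a minimal sketch; the sample rows and the helper supports_dict_comprehensions are made up for illustration.

```python
import sys

# Made-up sample rows: zip code, latitude, longitude.
rows = [("10001", "40.75", "-73.99"), ("94103", "37.77", "-122.41")]

# A generator expression passed to dict() is valid on Python 2.4 and later,
# so it avoids the dictionary comprehension syntax that 2.6 rejects.
zip_codes = dict(
    (int(zip_code), (float(latitude), float(longitude)))
    for zip_code, latitude, longitude in rows
)


def supports_dict_comprehensions(version_info=sys.version_info):
    """Return True if the interpreter version is 2.7 or later."""
    return tuple(version_info[:2]) >= (2, 7)
```

Calling supports_dict_comprehensions((2, 6, 6)) returns False, which matches the failure seen on EMR's default interpreter.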
The Fix
To repair the code above, it is necessary to remove the line defining zip_codes in MR_zip.py:
    zip_codes = {int(zip_code): (float(latitude), float(longitude)) for zip_code, latitude, longitude in csv.reader(open("zip_codes.txt", "r"))}
and instead define it inside of mapper_init without using dictionary comprehensions:
    def mapper_init(self):
        self.zip_codes = {}
        for zip_code, latitude, longitude in csv.reader(open("zip_codes.txt", "r")):
            self.zip_codes[int(zip_code)] = (float(latitude), float(longitude))
The other files and the command-line stay the same.