Question

I am using mrjob to run Hadoop Streaming jobs over our HBase instance. For the life of me I cannot figure out how to pass a parameter to my reducer. I have two parameters that I want to pass to my reducer when I run the job: startDate and endDate. Here's what my current reducer looks like:

def reducer(self, groupId, meterList):
    """
    Print bucket.
    """
    sys.stderr.write("Working on group = " + str(groupId) + "\n")
    #print "Opening connection..."
    conn = open_connection(hostname)
    #print "Getting table..."
    table = get_table(conn, tableName)

    compositeDf = DataFrame()

    for meterId in meterList:
        sys.stderr.write("Querying: " + str(meterId) + "\n")
        df = extract_meter_data(table, meterId, startDate, endDate)

I cannot seem to pass startDate and endDate as parameters to my reducer. The only way I can get the job to pick them up is through global variables at the top of the module:

startDate = datetime.datetime(2012, 6, 10)
endDate = datetime.datetime(2012, 6, 11)

class MRDataQuality(MRJob):
    """
    MapReduce job that does a data quality check on the meter data in HBase.
    """

But that is dirty. I want to pass the values in from the code that calls the job. I've tried many approaches: setting them as instance variables, setting them as static class variables, writing an overloaded constructor for MRDataQuality... nothing seems to work. I am calling the job programmatically from my top-level script like so:

if args.hadoop:
    mrdq_job = MRDataQuality(args=['-r', 'hadoop', '--conf-path', 'mrjob.conf', '--jobconf', 'mapred.reduce.tasks=42', meterFile])
else:
    mrdq_job = MRDataQuality(args=[meterFile])

with mrdq_job.make_runner() as runner:
    runner.run()

No matter what I do to the mrdq_job instance, runner.run() seems to use a fresh instance of the class, which doesn't have the instance or static variables defined. How can I pass my parameters to the reducer? In regular Hadoop Streaming I can do it by passing a string: "--reducer reducer.py arg1 arg2". Is there an equivalent for mrjob?


Solution

How about passing your parameters through the job config and then reading them with get_jobconf_value?

Something like this:

from mrjob.compat import get_jobconf_value

class MRDataQuality(MRJob):

    def reducer(self, groupId, meterList):
        ...
        startDate = get_jobconf_value("my.job.settings.startdate")
        endDate = get_jobconf_value("my.job.settings.enddate")

        for meterId in meterList:
            sys.stderr.write("Querying: " + str(meterId) + "\n")
            df = extract_meter_data(table, meterId, startDate, endDate)

Then set the parameters when you construct the job, just as you did above. Hadoop Streaming exposes jobconf properties to the tasks as environment variables, which is how get_jobconf_value reads them:

mrdq_job = MRDataQuality(args=[
    '-r', 'hadoop', '--conf-path', 'mrjob.conf',
    '--jobconf', 'mapred.reduce.tasks=42',
    '--jobconf', 'my.job.settings.startdate=2012-06-10',
    '--jobconf', 'my.job.settings.enddate=2012-06-11',
    meterFile])
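
One detail to watch: jobconf values come back as plain strings, while the original globals were datetime objects. If extract_meter_data expects datetimes, parse the strings first. A minimal sketch, assuming the YYYY-MM-DD format used above (_parse_jobconf_date is a hypothetical helper, not part of mrjob):

import datetime

from mrjob.compat import get_jobconf_value


def _parse_jobconf_date(name):
    """Hypothetical helper: read a YYYY-MM-DD jobconf value as a datetime."""
    return datetime.datetime.strptime(get_jobconf_value(name), "%Y-%m-%d")

# Inside the reducer:
#   startDate = _parse_jobconf_date("my.job.settings.startdate")
#   endDate = _parse_jobconf_date("my.job.settings.enddate")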

OTHER TIPS

How about passing your parameters through the job config and reading them with get_jobconf_value inside reducer_init? That way you read the parameters only once per task, rather than on every call to the reducer.

Something like this:

from mrjob.compat import get_jobconf_value

class MRDataQuality(MRJob):

    def reducer_init(self):
        ...
        self.startDate = get_jobconf_value("my.job.settings.startdate")
        self.endDate = get_jobconf_value("my.job.settings.enddate")

    def reducer(self, groupId, meterList):
        for meterId in meterList:
            sys.stderr.write("Querying: " + str(meterId) + "\n")
            df = extract_meter_data(table, meterId, self.startDate, self.endDate)

Then set the parameters when you construct the job, as before:

mrdq_job = MRDataQuality(args=[
    '-r', 'hadoop', '--conf-path', 'mrjob.conf',
    '--jobconf', 'mapred.reduce.tasks=42',
    '--jobconf', 'my.job.settings.startdate=2012-06-10',
    '--jobconf', 'my.job.settings.enddate=2012-06-11',
    meterFile])
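
For reference, here is a self-contained sketch of the reducer_init pattern with the date parsing folded in. The HBase lookup is stubbed out with a comment and the count output is just a placeholder, so this is an illustration rather than the asker's full job. Note that newer mrjob releases renamed this helper: mrjob.compat.jobconf_from_env replaces get_jobconf_value.

import datetime
import sys

from mrjob.compat import get_jobconf_value  # jobconf_from_env in newer mrjob
from mrjob.job import MRJob


class MRDataQuality(MRJob):
    """Sketch: read date parameters from jobconf once per reducer task."""

    def reducer_init(self):
        # Jobconf values arrive as strings; parse them once per task.
        self.startDate = datetime.datetime.strptime(
            get_jobconf_value("my.job.settings.startdate"), "%Y-%m-%d")
        self.endDate = datetime.datetime.strptime(
            get_jobconf_value("my.job.settings.enddate"), "%Y-%m-%d")

    def reducer(self, groupId, meterList):
        count = 0
        for meterId in meterList:
            sys.stderr.write("Querying: " + str(meterId) + "\n")
            # df = extract_meter_data(table, meterId,
            #                         self.startDate, self.endDate)
            count += 1
        yield groupId, count


if __name__ == '__main__':
    MRDataQuality.run()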
Licensed under: CC-BY-SA with attribution