Question

I am trying to perform a mapreduce job using the Python MRJob lib and am having some issues getting it to properly distribute across my Hadoop cluster. I believe I am simply missing a basic principle of mapreduce. My cluster is a small, one master one slave test cluster. The basic idea is that I'm just requesting a series of web pages with parameters, doing some analysis on them and returning back some properties on the web page.

The input to my map function is simply a list of URLs with parameters such as the following:

http://guelph.backpage.com/automotive/?layout=bla&keyword=towing
http://guelph.backpage.com/whatever/?p=blah
http://semanticreference.com/search.html?go=Search&q=red
http://copiahcounty.wlbt.com/h/events?ename=drupaleventsxmlapi&s=rrr
http://sweetrococo.livejournal.com/34076.html?mode=ffff

The key-value pairs for the initial input are therefore just key: None, value: URL.

The following is my map function:

def mapper(self, key, url):
    '''Yield domain as the key, and (url, query parameter) tuple as the value'''

    parsed_url = urlparse(url)
    domain = parsed_url.scheme + "://" + parsed_url.netloc + "/"

    if self.myclass.check_if_param(parsed_url):

        parsed_url_query = parsed_url.query
        url_q_dic = parse_qs(parsed_url_query)

        for query_param, query_val in url_q_dic.iteritems():

            #yielding a tuple in mrjob will yield a list
            yield domain, (url, query_param)
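
To make the emitted pairs concrete, here is a minimal standalone sketch of the same parsing logic outside MRJob. It is written for Python 3 (urllib.parse rather than the Python 2 urlparse module). The function name emit_pairs is hypothetical, and the call to check_if_param is replaced by a plain test for a non-empty query string, which is only an assumption about what that helper does.

```python
from urllib.parse import urlparse, parse_qs


def emit_pairs(url):
    """Yield (domain, (url, query_param)) for each query parameter in url.

    Hypothetical stand-in for the mapper body; not part of MRJob.
    """
    parsed_url = urlparse(url)
    domain = parsed_url.scheme + "://" + parsed_url.netloc + "/"

    # Assumption: check_if_param() roughly means "the URL has a query string".
    if not parsed_url.query:
        return

    # parse_qs returns a dict keyed by parameter name; only the names are needed.
    for query_param in parse_qs(parsed_url.query):
        yield domain, (url, query_param)


if __name__ == "__main__":
    sample = "http://guelph.backpage.com/automotive/?layout=bla&keyword=towing"
    for key, value in emit_pairs(sample):
        print(key, value)
```

Every parameter of a URL is emitted under the same domain key, so all pairs for one domain end up in a single reducer call.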

Pretty simple, I'm just checking to make sure the URL has a parameter and yielding the URL's domain as key and a tuple giving me the URL and the query parameter as value which MRJob kindly transforms into a list to pass to the reducer, which is the following:

def reducer(self, domain, url_query_params):

    final_list = []
    for url_query_param in url_query_params:

        url_to_list_props = url_query_param[0]
        param_to_list_props = url_query_param[1]

        #set our target that we will request and do some analysis on
        self.myclass.set_target(url_to_list_props, param_to_list_props)

        #perform a bunch of requests and do analysis on the URL requested
        props_list = self.myclass.get_props()

        for prop in props_list:

            final_list.append(prop)

    #index this stuff to a central db
    MapReduceIndexer(domain, final_list).add_prop_info()


    yield domain, final_list

My problem is that only one reducer task is run. I would expect the number of reducer tasks to be equal to the number of unique keys emitted by the mapper. The end result with the above code is that I have one reducer which runs on the master, but the slave sits idly and does nothing, which is obviously not ideal. I notice that in my output a few mapper tasks are started, but always only 1 reducer task. Other than that, the task runs smoothly and all works as expected.

My question is... what the heck am I doing wrong? Am I misunderstanding the reduce step or screwing up my key-value pairs somewhere? Why are there not multiple reducers running on this job?

Update: OK, so following the answer given, I increased mapred.reduce.tasks (it was at the default, which I now realize is 1). That was indeed why I was getting only one reducer. I now see 3 reduce tasks running simultaneously. I now have an import error on my slave that needs to be resolved, but at least I am getting somewhere...


Solution

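The number of reducers is unrelated to the form of your input data or to the number of distinct keys your mapper emits. Hadoop takes it from the job configuration (mapred.reduce.tasks, which defaults to 1), and each reduce task processes every key that is partitioned to it. For MRJob it looks like you need to pass that setting through its configuration options, for example --jobconf mapred.reduce.tasks=3 on the command line.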
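
As a rough illustration of why the reducer count does not follow from the number of keys, the sketch below assigns keys to a fixed number of reduce tasks by hashing, in the spirit of Hadoop's default hash partitioner. The function names partition and assign_keys are hypothetical, and zlib.crc32 stands in for the key's real hash only so that the result is stable between runs. This is not how MRJob or Hadoop is invoked, just a model of the idea.

```python
import zlib
from collections import defaultdict


def partition(key, num_reducers):
    """Pick a reduce task index for a key by hashing it.

    Assumption: crc32 replaces the hash Hadoop would actually compute.
    """
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def assign_keys(keys, num_reducers):
    """Group keys by the reduce task index that would receive them."""
    assignment = defaultdict(list)
    for key in keys:
        assignment[partition(key, num_reducers)].append(key)
    return dict(assignment)


if __name__ == "__main__":
    domains = [
        "http://guelph.backpage.com/",
        "http://semanticreference.com/",
        "http://copiahcounty.wlbt.com/",
        "http://sweetrococo.livejournal.com/",
    ]
    # With one reduce task every key lands in task 0, however many keys exist.
    print(assign_keys(domains, 1))
    # With three reduce tasks the same keys are spread over at most three tasks.
    print(assign_keys(domains, 3))
```

With a single reduce task all four domains are handled by the same task, which matches the behaviour described in the question; raising the task count is what lets the work spread across nodes.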

Licensed under: CC-BY-SA with attribution