Problem

I have a large job running on AWS's Elastic MapReduce (EMR) cluster. By large, I mean over 800,000 files to process, each with 25,000+ records. In my test runs I have been using 100 m1.medium spot instances for the processing.

The job seemed to be running correctly; however, I noticed that the output files (part-00000, part-00001, etc.) contain records with the same key spread across multiple files. Aren't these supposed to be reduced down by EMR?

Any insight would be appreciated.

Solution

I am running into the same issue. I am using EMR to build an "inverted index" with the streaming API:

-input s3n://mybucket/html2 -output s3n://mybucket/results -mapper s3n://mybucket/mapper.py -reducer s3n://mybucket/reduce.py

Where s3n://mybucket/html2 contains a few HTML files, and

mapper.py:

import sys

def main(args):
    for line in sys.stdin:
        line = line.strip()
        words = line.split()            
        for word in words:
            #do some preprocessing
            if word.startswith("http://"):
                #output the URL with a count of 1
                print "%s,%s" % (word, 1)
            else:           
                #cleanup HTML tags

                url = get_url() #irrelevant
                print "%s,%s" % (word, url)

if __name__ == "__main__":
    main(sys.argv)
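
For illustration (the page http://example.com/page.html and the words below are hypothetical examples, not from my data), the mapper emits comma-separated lines of two kinds: a URL paired with a count of 1, and a word paired with the URL it came from:

http://example.com/page.html,1
hadoop,http://example.com/page.html
streaming,http://example.com/page.html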

reduce.py is:

import sys

def main(args):
    current_word = None
    current_count = 0
    current_url_list = []
    key = None

    for line in sys.stdin:
        line = line.strip()
        (key, val) = line.split(',', 1)

        # If key is a URL - act as word count reducer
        if key.startswith("http:"):
            # convert count (currently a string) to int
            try:
                count = int(val)
            except ValueError:
                # count was not a number, so silently
                # ignore/discard this line
                continue

            # this IF-switch only works because Hadoop sorts map output
            # by key (here: word) before it is passed to the reducer
            if current_word == key:
                current_count += count
            else:
                if current_word:
                    # Check whether the previous key was a URL or a regular word
                    if current_word.startswith('http:'):
                        print '%s,%s' % (current_word, current_count)
                    else:
                        # previous word was a regular word
                        print '%s,%s' % (current_word, ','.join(current_url_list))
                current_count = count
                current_word = key
        else:
            #If key is a word - act as a URL-list-appending reducer
            if current_word == key:
                if val not in current_url_list:
                    current_url_list.append(val)
            else: #Got to a new key
                if current_word:
                    #Check if previous word was a URL
                    if current_word.startswith("http:"):
                        print '%s,%s' % (current_word, current_count)
                    else:
                        # previous word was a regular word
                        print '%s,%s' % (current_word, ','.join(current_url_list))
                current_url_list = []
                current_url_list.append(val)
                current_word = key

    # the loop above never emits the final key, so flush it once the input ends
    if current_word:
        if current_word.startswith("http:"):
            print '%s,%s' % (current_word, current_count)
        else:
            print '%s,%s' % (current_word, ','.join(current_url_list))

if __name__ == "__main__":
    main(sys.argv)

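As a rough sanity check outside EMR (assuming local copies of the scripts and a small sample input file, here called sample.html), the same map/sort/reduce flow can be simulated with a shell pipeline, since Hadoop streaming simply pipes lines through the scripts via stdin/stdout and sorts by key in between:

# mimic the map -> shuffle/sort -> reduce flow on one small file
cat sample.html | python mapper.py | sort | python reduce.py
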
I am starting this flow using the AWS console wizard ("Create new job flow"), and apart from setting the input, output, mapper, and reducer scripts (and the log path) I am leaving everything at its defaults.

In the output I am getting a few part files, and the same key appears in more than one of them (each time with different values).

Maybe this can shed more light on the issue and help resolve it.

License: CC-BY-SA with attribution
Not affiliated with Stack Overflow