Question

I am trying to use Amazon Elastic Map Reduce to run a series of simulations of several million cases. This is an Rscript streaming job with no reducer. I am using the Identity Reducer in my EMR call --reducer org.apache.hadoop.mapred.lib.IdentityReducer.

The script file works fine when tested and run locally from the command line on a Linux box when passing one line of string manually echo "1,2443,2442,1,5" | ./mapper.R and I get the one line of results that I am expecting. However, when I tested my simulation using about 10,000 cases (lines) from the input file on EMR, I only got output for a dozen lines or so out of 10k input lines. I've tried several times and I cannot figure out why. The Hadoop job runs fine without any errors. It seems like input lines are being skipped, or perhaps something is happening with the Identity reducer. The results are correct for the cases where there is output.

My input file is a csv with the following data format, a series of five integers separated by commas:

1,2443,2442,1,5
2,2743,4712,99,8
3,2443,861,282,3177
etc...

Here is my R script for mapper.R

#! /usr/bin/env Rscript

# Define Functions
trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)
splitIntoWords <- function(line) unlist(strsplit(line, "[[:space:]]+"))
# function to read in the relevant data from needed data files
get.data <- function(casename) {
    list <- lapply(casename, function(x) {
        read.csv(file = paste("./inputdata/",x, ".csv", sep = ""),
                 header = TRUE,
        stringsAsFactors = FALSE)})
    return(data.frame(list))
}

con <- file("stdin")            
line <- readLines(con, n = 1, warn = FALSE) 
line <- trimWhiteSpace(line)
values <- unlist(strsplit(line, ","))
lv <- length(values)
cases <- as.numeric(values[2:lv])
simid <- paste("sim", values[1], ":", sep = "")
l <- length(cases)                      # for indexing

## create a vector for the case names
names.vector <- paste("case", cases, sep = ".")

## read in metadata and necessary data columns using get.data function
metadata <- read.csv(file = "./inputdata/metadata.csv", header = TRUE,
                     stringsAsFactors = FALSE)
d <- cbind(metadata[,1:3], get.data(names.vector))

## Calculations that use df d and produce a string called 'output' 
## in the form of "id: value1 value2 value3 ..." to be used at a 
## later time for agregation.

cat(output, "\n")
close(con)

The (generalized) EMR call for this simulation is:

ruby elastic-mapreduce --create --stream --input s3n://bucket/project/input.txt --output s3n://bucket/project/output --mapper s3n://bucket/project/mapper.R --reducer org.apache.hadoop.mapred.lib.IdentityReducer --cache-archive s3n://bucket/project/inputdata.tar.gz#inputdata --name Simulation --num-instances 2

If anyone has any insights as to why I might be experiencing these issues, I am open to suggestions, as well as any changes/optimization to the R script.

My other option is to turn the script into a function and run a parallelized apply using R multicore packages, but I haven't tried it yet. I'd like to get this working on EMR. I used JD Long's and Pete Skomoroch's R/EMR examples as a basis for creating the script.

Was it helpful?

Solution

Nothing obvious jumps out. However, can you run the job using a simple input file of only 10 lines? Make sure these 10 lines are scenarios which did not run in your big test case. Try this to eliminate the possibility that your inputs are causing the R script to not produce an answer.

Debugging EMR jobs is a skill of its own.

EDIT:

This is a total fishing expedition, but fire up a EMR interactive pig session using the AWS GUI. "Interactive pig" sessions stay up and running so you can ssh into them. You could also do this from the command line tools, but it's a little easier from the GUI since, hopefully, you only need to do this once. Then ssh into the cluster, transfer over your test case infile your cachefiles and your mapper and then run this:

cat infile.txt | yourMapper.R > outfile.txt

This is just to test if your mapper can parse the infile in the EMR environment with no Hadoop bits in the way.

EDIT 2:

I'm leaving the above text there for posterity but the real issue is your script never goes back to stdin to pick up more data. Thus you get one run for each mapper then it ends. If you run the above one liner you will only get out one result, not a result for each line in infile.txt. If you had run the cat test even on your local machine the error should pop out!

So let's look at Pete's word count in R example:

#! /usr/bin/env Rscript

trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)
splitIntoWords <- function(line) unlist(strsplit(line, "[[:space:]]+"))

## **** could wo with a single readLines or in blocks
con <- file("stdin", open = "r")
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
    line <- trimWhiteSpace(line)
    words <- splitIntoWords(line)
    ## **** can be done as cat(paste(words, "\t1\n", sep=""), sep="")
    for (w in words)
        cat(w, "\t1\n", sep="")
}
close(con)

The piece your script is missing is this bit:

 while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
        #do your dance
        #do your dance quick
        #come on everybody tell me what's the word
        #word up
    }

you should, naturally, replace the lyrics of Cameo's Word Up! with your actual logic.

Keep in mind that proper debugging music makes the process less painful:

http://www.youtube.com/watch?v=MZjAantupsA

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top