Question

I want to create a parallel scanLeft function (one that computes prefix sums for an associative operator) for Hadoop (Scalding in particular; see below for how this is done).

Given a sequence of numbers in an HDFS file (one per line), I want to calculate a new sequence with the sums of consecutive even/odd pairs. For example:

input sequence:

0,1,2,3,4,5,6,7,8,9,10

output sequence:

0+1, 2+3, 4+5, 6+7, 8+9, 10

i.e.

1,5,9,13,17,10
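
On an in-memory collection, the pairing step above can be sketched as follows. This is only a local illustration, not Hadoop code; the name pairSums is made up for this example, and a trailing unpaired element is assumed to pass through unchanged, as in the example output.

```scala
// Sum consecutive (even index, odd index) pairs.
// grouped(2) yields chunks of two elements; the last chunk may hold just one.
def pairSums(xs: Seq[Int]): Vector[Int] =
  xs.grouped(2).map(_.sum).toVector

val collapsedExample = pairSums(0 to 10)
```

For the input 0 to 10 this gives 1, 5, 9, 13, 17, 10, matching the sequence listed above.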

I think that in order to do this I need to write InputFormat and InputSplit classes for Hadoop, but I don't know how to do this.

See this section 3.3 here. Below is an example algorithm in Scala:

// For simplicity, assume the input length is a power of 2.
def scanadd(input: IndexedSeq[Int]): IndexedSeq[Int] =
  if (input.length == 1)
    input
  else {
    // Calculate a new collapsed sequence which is the sum of sequential even/odd pairs.
    val collapsed = IndexedSeq.tabulate(input.length / 2)(i => input(2 * i) + input(2 * i + 1))

    // Recursively scan the collapsed values.
    val scancollapse = scanadd(collapsed)

    // Now we can use the scan of the collapsed seq to calculate the full sequence.
    // With 0-based indices, scancollapse(k) is the sum of input(0) through input(2 * k + 1).
    IndexedSeq.tabulate(input.length) { i =>
      // If an index is odd, we can just look into the collapsed scan and get the value.
      if (i % 2 == 1) scancollapse(i / 2)
      // The first element has nothing before it.
      else if (i == 0) input(0)
      // Otherwise look at the pair just before it and add the value at the current index.
      else scancollapse(i / 2 - 1) + input(i)
    }
  }
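
For checking results, a sequential reference can be handy. The sketch below assumes an inclusive scan is wanted (element i of the result is the sum of input(0) through input(i)); the name prefixSums is made up for this example. It uses only the standard library's scanLeft.

```scala
// Sequential reference: inclusive prefix sums via the standard scanLeft.
// scanLeft(0) emits the initial 0 first, so tail drops it.
def prefixSums(input: IndexedSeq[Int]): IndexedSeq[Int] =
  input.scanLeft(0)(_ + _).tail

val reference = prefixSums(Vector(1, 2, 3, 4))
```

For Vector(1, 2, 3, 4) the reference result is 1, 3, 6, 10, which is what a correct recursive version should also return.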

I understand that this might need a fair bit of optimization to work nicely with Hadoop. I think translating it directly would lead to pretty inefficient Hadoop code. For example, in Hadoop you obviously can't use an IndexedSeq. I would appreciate hearing about any specific problems you see. I think it can probably be made to work well, though.

Solution 2

This was the best tutorial I found for writing an InputFormat and RecordReader. I ended up reading the whole split as one ArrayWritable record.
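
Once each split is available as a single record, one possible way to use it is a block-wise scan: sum each split, scan the per-split totals, then scan inside each split starting from its offset. The sketch below models that in plain Scala and is an assumption about how the records might be used, not the code from this answer; blockScan is a made-up name, each inner Vector stands in for one split, and no Hadoop or Scalding API is involved.

```scala
// Plain-Scala model of a block-wise inclusive scan.
// Each inner Vector stands in for one split read as a single record (hypothetical).
def blockScan(splits: Vector[Vector[Int]]): Vector[Vector[Int]] = {
  // Pass 1: total of each split; independent per split, so it could run in parallel.
  val totals = splits.map(_.sum)
  // Offset of split k = sum of all earlier splits (exclusive scan of the totals).
  val offsets = totals.scanLeft(0)(_ + _).init
  // Pass 2: inclusive scan inside each split, seeded with that split's offset.
  splits.zip(offsets).map { case (chunk, offset) =>
    chunk.scanLeft(offset)(_ + _).tail
  }
}

val scanned = blockScan(Vector(Vector(1, 2), Vector(3, 4), Vector(5)))
```

Flattening the result for this input gives 1, 3, 6, 10, 15. Only the small list of per-split totals has to be combined in one place; the two passes over the data are independent per split.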

OTHER TIPS

That seems superfluous. Did you mean this code?

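// Group the numbers into consecutive pairs.
val vv = (0 to 1000000).grouped(2).toVector
// Alternate pair sums between two running totals; the Boolean flag flips on every pair.
// Note: foldLeft processes elements in order, so .par adds no parallelism here.
vv.par.foldLeft((0L, 0L, false))((a, v) => 
    if (a._3) (a._1, a._2 + v.sum, !a._3) else (a._1 + v.sum, a._2, !a._3))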
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow