Question

I'm using Riak 2.0.0b1 on Ubuntu 12.10 (up to date). This is a developer box, so I have only one Riak instance - no clusters, etc.

I've put about 100k JSON documents (about 300 bytes each) into a bucket and am now trying to mapreduce over it. The data is random, and I've also got a 2i index on one of the keys, which divides the dataset into 10 almost even parts of ~10k documents each.

This query works as expected:

curl -XPOST -d'{
  "inputs": {"bucket": "bucket", "index": "idx_bin", "key": "10"},
  "query": [
    {
      "map": {
        "language": "javascript",
        "source": "Riak.mapValuesJson"
      }
    }
  ]
}' http://localhost:8080/mapred -H 'Content-Type: application/json' | python -m json.tool | egrep '^ {4}\{' | wc -l

9974

Got ~10k results. Now if I want to do something in the reduce step, I get an answer which doesn't make sense:

curl -XPOST -d'{
  "inputs": {"bucket": "bucket", "index": "idx_bin", "key": "10"},
  "query": [
    {
      "map": {
        "language": "javascript",
        "source": "Riak.mapValuesJson"
      }
    },
    {
      "reduce": {
        "language": "javascript",
        "source": "function(o) { return [o.length] }"
      }
    }
  ]
}' http://localhost:8080/mapred -H 'Content-Type: application/json' | python -m json.tool

[
    15
]

I'd like to see either an error here, if I'm reaching some (un)documented limit, or a full list of objects, not 15. (This number differs between runs; sometimes there are a couple more.) I went to the configs and did this:

javascript.map_pool_size = 64
javascript.reduce_pool_size = 64
javascript.maximum_stack_size = 32MB
javascript.maximum_heap_size = 64MB

Didn't help at all.

What is going on and how to get all objects in the reduce phase?

Solution

The reduce function is called many times. The map function runs on about 1/3 of the vnodes in the cluster (22 vnodes in a cluster with ring_size 64), and the reduce function is called each time results are available from a map function. Its first argument is a list containing both the result of the previous reduce run and the new results from the map function. In your case, the values returned from the first vnode were counted, that count was then passed in as a single value alongside the second vnode's results, and so it was counted as just one value.
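To make this concrete, here is a minimal sketch that mimics the behaviour in plain JavaScript. The simulateReduce helper and the batch sizes are made up for illustration and are not part of Riak; they only imitate the way each reduce call receives the previous result plus a new batch of map output.

```javascript
// The reduce function from the question: counts the elements of its input list.
function naiveCount(o) {
  return [o.length];
}

// Hypothetical stand-in for re-reduce: every new batch of map results is
// reduced together with the output of the previous reduce call.
function simulateReduce(reduceFn, batches) {
  var acc = [];
  for (var i = 0; i < batches.length; i++) {
    acc = reduceFn(acc.concat(batches[i]));
  }
  return acc;
}

// Three made-up batches of 4, 3 and 5 documents: 12 documents in total.
var batches = [
  [{}, {}, {}, {}],
  [{}, {}, {}],
  [{}, {}, {}, {}, {}]
];

var naiveResult = simulateReduce(naiveCount, batches);
console.log(naiveResult); // [ 6 ]: the last batch (5) plus one previous result
```

The final number is the size of the last batch plus one, which is why the result is small and varies between runs.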

What you will need to do is have the reduce function return a value/object that is easily differentiated from the other values, such as

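function(o) {
  var prevCount = 0;    // sum of running totals from earlier reduce calls
  var countObjects = 0; // number of running-total markers in this input
  for (var i = 0; i < o.length; i++) {
    var e = o[i];
    // typeof null is 'object', so check for null before reading the property
    if (e !== null && typeof e === 'object' && typeof e.reduce_running_total === 'number') {
      prevCount += e.reduce_running_total;
      countObjects += 1;
    }
  }
  // Markers are not documents, so subtract them from the list length
  return [{"reduce_running_total": o.length + prevCount - countObjects}];
}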

Or, you could save some network bandwidth: instead of having the map phase return all of the objects, have the map function return a literal [1] for each key found. The reduce function then simply sums all the numbers in its input list and returns the total. This works under repeated reduce calls because a sum of partial sums is still the overall sum.
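A rough sketch of that second approach follows. The names mapOne and sumReduce are illustrative, and the loop at the bottom is only a local simulation with made-up batch sizes, not how Riak schedules the phases. In the query, the two functions would go into the "source" fields as anonymous functions.

```javascript
// Map phase: emit a single 1 per object instead of the object's contents.
// Riak passes (value, keyData, arg) to JavaScript map functions.
function mapOne(value, keyData, arg) {
  return [1];
}

// Reduce phase: add up every number in the input list. The input may mix
// fresh 1s from the map phase with totals from earlier reduce calls.
function sumReduce(values) {
  var total = 0;
  for (var i = 0; i < values.length; i++) {
    total += values[i];
  }
  return [total];
}

// Local simulation: 12 objects mapped, then reduced in batches of 5, 5 and 2.
var mapped = [];
for (var n = 0; n < 12; n++) {
  mapped = mapped.concat(mapOne({}, null, null));
}

var countResult = [];
for (var start = 0; start < mapped.length; start += 5) {
  var batch = mapped.slice(start, start + 5);
  countResult = sumReduce(countResult.concat(batch));
}
console.log(countResult); // [ 12 ]
```

Riak's bundled JavaScript helpers also include Riak.reduceSum, which should be usable as the reduce "source" in place of a hand-written sum.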

Licensed under: CC-BY-SA with attribution