Question

I'm trying to run a mapReduce query on Riak 1.4 that queries by secondary index, sorts the records by date, and then limits the results to the first record.

I've got the secondary index query working. The sort doesn't seem to do anything: there are no errors, but the results come back unsorted. Limiting the number of records returned yields a 'bad_json' error from the server.

Here's what I have. It is supposed to query the "cars" bucket for the most recent car owned by "john_doe". (Some names have been changed to protect the innocent ;) :

JSSourceFunction dateSortFunction = new JSSourceFunction(
    "function(v) {" +
        "return v.sort(function(a, b) {" +
            "return a.issueDate - b.issueDate ;" +
        "}" +
    ");" +
"}");

IndexQuery iq = new BinValueQuery(BinIndex.named("person"), "cars", "john_doe");

MapReduceResult response = session.mapReduce(iq)
   .addMapPhase(NamedErlangFunction.MAP_OBJECT_VALUE)
   .addReducePhase(dateSortFunction)
   .addReducePhase(new NamedJSFunction("Riak.reduceLimit"), 1)
   .execute();

I've seen a number of posts on sorting and am hoping to figure it out eventually. However, I haven't seen any help on how the LIMIT function might work.

Thanks in advance!

Update: Thanks to Joe for putting me on the right track. Here's what ended up working for me. My date format is ISO 8601 (e.g. 2011-05-18T17:00:00-07:00), so I can compare the dates lexically to get the correct sort order. I also found JavaScript's array-shortening technique (assigning to length) and updated the code to return up to the first 5 objects.

JSSourceFunction sortLimitFunction = new JSSourceFunction(
    "function(v) {" +
        "v.sort(function(a, b) {" +
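            // A sort comparator should return a negative, zero, or positive number (newest first here).
            "return a.issueDate < b.issueDate ? 1 : (a.issueDate > b.issueDate ? -1 : 0);" +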
        "}" +
    ");" +
    "if (v.length > " + "5" + ") { " +
        "v.length = " + "5" + ";" +
    "}" +
    "return v;" +
"}");

IndexQuery iq = new BinValueQuery(BinIndex.named("person"), "cars", "john_doe");

MapReduceResult response = session.mapReduce(iq)
    .addMapPhase(new NamedJSFunction("Riak.mapValuesJson"))
    .addReducePhase(sortLimitFunction)
    .execute();
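
To illustrate why the lexical comparison works, here is a minimal plain-Java sketch of the same sort-then-truncate idea, run outside Riak. The class and method names (IsoSortSketch, newestFirst) are made up for illustration, and it assumes every timestamp carries the same UTC offset; with mixed offsets, lexical order and chronological order can disagree.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class IsoSortSketch {
    // Hypothetical helper: sort ISO 8601 strings newest-first by plain string
    // comparison, then keep at most `limit` entries.
    // Assumption: all timestamps share the same UTC offset.
    static List<String> newestFirst(List<String> issueDates, int limit) {
        List<String> sorted = new ArrayList<>(issueDates);
        sorted.sort(Comparator.reverseOrder());
        int end = Math.min(limit, sorted.size());
        return new ArrayList<>(sorted.subList(0, end));
    }

    public static void main(String[] args) {
        List<String> dates = Arrays.asList(
            "2011-05-18T17:00:00-07:00",
            "2012-01-02T09:30:00-07:00",
            "2010-11-30T08:15:00-07:00");
        // prints [2012-01-02T09:30:00-07:00, 2011-05-18T17:00:00-07:00]
        System.out.println(newestFirst(dates, 2));
    }
}
```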

Solution

For the sorting, there is a mailing list post that covers this topic. The main difference I see between that implementation and yours is the use of the JavaScript Riak.mapValuesJson function in the map phase.

For the limiting, if you want just the first item from the sorted list, try having your sort function return only the first element. While the reduce function can be (and probably is) called multiple times as partial result sets arrive from the various vnodes, the first element in the consolidated list must also be the first element in the partial list where it originated, so this should give you what you are looking for:

JSSourceFunction dateSortFunction = new JSSourceFunction(
    "function(v) {" +
        "var arr = v.sort(function(a, b) {" +
            "return a.issueDate - b.issueDate ;" +
            "}" +
        ");" +
        "if (arr.length == 0) { " +
           "return [];" +
        "} else {" +
           // A reduce phase is expected to return a list, so wrap the single element.
           "return [arr[0]];" +
        "}" +
    "}"
);
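
The re-reduce argument above can be checked with a small plain-Java simulation. This is only a sketch: the class and method names (ReReduceSketch, reduceFirst) are hypothetical, the integers stand in for sortable date values, and the two batches merely imitate partial result sets from different vnodes. It reduces each batch on its own, re-reduces the combined partial outputs, and compares that with reducing everything at once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ReReduceSketch {
    // Hypothetical stand-in for the reduce phase: sort ascending and keep
    // only the first element, still returned as a list.
    static List<Integer> reduceFirst(List<Integer> values) {
        if (values.isEmpty()) {
            return new ArrayList<>();
        }
        List<Integer> sorted = new ArrayList<>(values);
        Collections.sort(sorted);
        return new ArrayList<>(sorted.subList(0, 1));
    }

    public static void main(String[] args) {
        // Two partial batches, as might arrive from different vnodes.
        List<Integer> batchA = Arrays.asList(20110518, 20090101);
        List<Integer> batchB = Arrays.asList(20120102, 20081231);

        // Reduce each batch separately, then re-reduce the partial outputs.
        List<Integer> partials = new ArrayList<>(reduceFirst(batchA));
        partials.addAll(reduceFirst(batchB));
        List<Integer> staged = reduceFirst(partials);

        // Reduce the whole input in a single call.
        List<Integer> all = new ArrayList<>(batchA);
        all.addAll(batchB);
        List<Integer> direct = reduceFirst(all);

        System.out.println(staged.equals(direct)); // prints "true"
    }
}
```

Because the function accepts its own output as input and always returns a list, it gives the same answer however the inputs are batched.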
Licensed under: CC-BY-SA with attribution