Question

I have some data keyed by ids in the range of 0 to 200-something million, and I need to split it into buckets for ranges like 0-5 million, 5-10 million, etc.

I'm attempting to use a custom Hadoop partitioner for this final step, so the last part of my Pig script looks something like this:

Conns = FOREACH ConnsGrouped GENERATE group as memberId, $1.companyId as companyIds;
ConnsPartitioned = DISTINCT Conns PARTITION BY com.mypackage.SearchNodePartitioner PARALLEL 50;

rmf $connections_file

Store ConnsPartitioned INTO 'test' using AvroStorage(...);

My partitioner looks like this:

public class SearchNodePartitioner<Long, V> implements Partitioner<Long, V>
{
    @Override
    public void configure(JobConf conf) 
    {
        // Nothing
    }

    @Override
    public int getPartition(Long key, V value, int numPartitions) 
    {
       return new Double(Math.floor(key / (5.0 * Math.pow(10, 6)))).intValue() % numPartitions;
    }

}

But it doesn't seem to be called at all. Even when I replace the return line with "return 1;", the data across files still seems to be hash-distributed, as with the default behavior.


Solution

The answer to DISTINCT + custom partitioner is: you can't do that anymore (as I just found out). DISTINCT now uses its own optimized, special-purpose partitioner.

See:

http://mail-archives.apache.org/mod_mbox/pig-user/201307.mbox/%3C14FE3AC3-DBA5-4898-AF94-0C34819A0D8B%40hortonworks.com%3E

https://issues.apache.org/jira/browse/PIG-3385

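A workaround is to partition with GROUP instead of DISTINCT, store the result, and flatten the groups when loading it back: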

A = //some tuple...;

B = GROUP A BY field PARTITION BY custom;

STORE B INTO 'foo' USING ....;

Later:

B = LOAD 'foo' USING ...;

A = FOREACH B GENERATE FLATTEN($1);

OTHER TIPS

One way you can do this is:

A = LOAD ............
SPLIT A INTO B IF <your range condition>, C IF <your range condition>;
STORE B ...
STORE C ...

Or else you can try this:

 B = FILTER A BY $1 >= <lower_Range> AND $1 <= <upper_Range>;
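One caveat with the FILTER form above: both comparisons are inclusive, so if adjacent ranges share a boundary value (for example 5,000,000 as the upper bound of one and the lower bound of the next), an id equal to it would match both filters. A minimal sketch of computing half-open bounds for each range, assuming the 5-million width from the question; the class and method names here are hypothetical, not part of Pig or Hadoop:

```java
public final class BucketBounds {
    // Width of each id range, taken from the question (assumption: fixed-width buckets).
    static final long BUCKET_WIDTH = 5_000_000L;

    // Inclusive lower bound of the given bucket index.
    static long lower(int bucket) {
        return bucket * BUCKET_WIDTH;
    }

    // Exclusive upper bound of the given bucket index.
    static long upperExclusive(int bucket) {
        return (bucket + 1) * BUCKET_WIDTH;
    }

    public static void main(String[] args) {
        // Print the first three ranges as half-open intervals.
        for (int b = 0; b < 3; b++) {
            System.out.println(b + ": [" + lower(b) + ", " + upperExclusive(b) + ")");
        }
    }
}
```

With bounds like these, each filter condition would use >= for the lower bound and < for the upper bound, so every id falls into exactly one range.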

Also, since you've already written a custom partitioner, implementing this would be easy with MapReduce.

Your map class would just emit key/value pairs, and your custom partitioner would send each range of keys to its own reducer. However, I'm not sure what exactly you want to do once you partition your input data, so I can't comment on what the reducer has to do.

You can set your custom partitioner class on the Job instance in your main (driver) method:

job.setPartitionerClass(<your custom partitioner class>.class);
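As a sketch of the range logic that such a partitioner's getPartition method could delegate to, the following plain-Java class maps an id to a partition index using integer division instead of the floating-point Math.floor/Math.pow expression from the question. It assumes non-negative ids and the 5-million bucket width from the question; the class and method names are hypothetical, and it deliberately has no Hadoop dependency so it can be run on its own.

```java
public final class RangeBuckets {
    // Width of each id range, taken from the question (5 million ids per bucket).
    static final long BUCKET_WIDTH = 5_000_000L;

    // Maps an id to a partition index in [0, numPartitions).
    // Assumption: ids are non-negative, as described in the question.
    static int bucketFor(long id, int numPartitions) {
        if (id < 0 || numPartitions <= 0) {
            throw new IllegalArgumentException("id must be >= 0 and numPartitions > 0");
        }
        // Integer division floors for non-negative values; the modulo wraps
        // bucket numbers that exceed the number of reducers.
        return (int) ((id / BUCKET_WIDTH) % numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(bucketFor(4_999_999L, 50));   // prints 0
        System.out.println(bucketFor(5_000_000L, 50));   // prints 1
        System.out.println(bucketFor(203_000_000L, 50)); // prints 40
    }
}
```

Two things to keep in mind when wiring this into a real job. Job.setPartitionerClass belongs to the newer org.apache.hadoop.mapreduce API, where Partitioner is an abstract class to extend, whereas the class in the question implements the older org.apache.hadoop.mapred interface. Also, the question's class declares a type parameter named Long, which shadows java.lang.Long, so the key there is an unbounded type variable rather than a number.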
Licensed under: CC-BY-SA with attribution