Question

I haven't got much experience working with cassandra, so please excuse me if I have put in a wrong approach.

I am trying to do bulk load in cassandra with map reduce

Basically the word count example

Reference : http://henning.kropponline.de/2012/11/15/using-cassandra-hadoopbulkoutputformat/

I have put the simple Hadoop Wordcount Mapper Example and slightly modified the driver code and the reducer as per the above example.

I have successfully generated the output file as well. Now my doubt is how to perform the loading to cassandra part? Is there any difference in my approach ?

Please advice.

This is a part of the driver code

 Job job = new Job();
 job.setJobName(getClass().getName());
 job.setJarByClass(CassaWordCountJob.class);

 Configuration conf = job.getConfiguration();
 conf.set("cassandra.output.keyspace", "test");
 conf.set("cassandra.output.columnfamily", "words");
 conf.set("cassandra.output.partitioner.class", "org.apache.cassandra.dht.RandomPartitioner");
 conf.set("cassandra.output.thrift.port","9160");    // default
 conf.set("cassandra.output.thrift.address", "localhost");
 conf.set("mapreduce.output.bulkoutputformat.streamthrottlembits", "400");

 job.setMapperClass(CassaWordCountMapper.class);
 job.setMapOutputKeyClass(Text.class);
 job.setMapOutputValueClass(IntWritable.class);
 FileInputFormat.setInputPaths(job, new Path(args[0]));
 job.setReducerClass(CassaWordCountReducer.class);
 FileOutputFormat.setOutputPath(job, new Path("/home/user/Desktop/test/cassandra")); 
 MultipleOutputs.addNamedOutput(job, "reducer", BulkOutputFormat.class, ByteBuffer.class, List.class);
 return job.waitForCompletion(true) ? 0 : 1;

Mapper is the same as the normal wordcount mapper that just tokenizes and emits Word, 1

The reducer class is of the form

public class CassaWordCountReducer extends 
        Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        List<Mutation> columnsToAdd = new ArrayList<Mutation>();
        Integer wordCount = 0;
        for(IntWritable value : values) {
            wordCount += value.get();
        }
        Column countCol = new Column(ByteBuffer.wrap("count".getBytes()));
        countCol.setValue(ByteBuffer.wrap(wordCount.toString().getBytes()));
        countCol.setTimestamp(new Date().getTime());
        ColumnOrSuperColumn wordCosc = new ColumnOrSuperColumn();
        wordCosc.setColumn(countCol);
        Mutation countMut = new Mutation();
        countMut.column_or_supercolumn = wordCosc;
        columnsToAdd.add(countMut);
        context.write(ByteBuffer.wrap(key.toString().getBytes()), columnsToAdd);
    }
}
Was it helpful?

Solution

To do bulk loads into Cassandra, I would advise looking at this article from DataStax. Basically you need to do 2 things for bulk loading:

  • Your output data won't natively fit into Cassandra, you need to transform it to SSTables.
  • Once you have your SSTables, you need to be able to stream them into Cassandra. Of course you don't simply want to copy each SSTable to every node, you want to only copy the relevant part of the data to each node

In your case when using the BulkOutputFormat, it should do all that as it's using the sstableloader behind the scenes. I've never used it with MultipleOutputs, but it should work fine.

I think the error in your case is that you're not using MultipleOutputs correctly: you're still doing a context.write, when you should really be writing to your MultipleOutputs object. The way you're doing it right now, since you're writing to the regular Context, it will get picked up by the default output format of TextOutputFormat and not the one you defined in your MultipleOutputs. More information on how to use the MultipleOutputs in your reducer here.

Once you write to the correct output format of BulkOutputFormat like you defined, your SSTables should get created and streamed to Cassandra from each node in your cluster - you shouldn't need any extra step, the output format will take care of it for you.

Also I would advise looking at this post, where they also explain how to use BulkOutputFormat, but they're using a ConfigHelper which you might want to take a look at to more easily configure your Cassandra endpoint.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top