I need to output the results of an MR job to multiple CQL3 column families.

In my reducer, I specify the CF using MultipleOutputs, but all the results are written to the one CF defined in the job's OutputCQL statement.

Job definition:

...
job.setOutputFormatClass(CqlOutputFormat.class);
ConfigHelper.setOutputKeyspace(job.getConfiguration(), "keyspace1");
MultipleOutputs.addNamedOutput(job, "CF1", CqlOutputFormat.class, Map.class, List.class);
MultipleOutputs.addNamedOutput(job, "CF2", CqlOutputFormat.class, Map.class, List.class);
CqlConfigHelper.setOutputCql(job.getConfiguration(), "UPDATE keyspace1.CF1 SET value = ? ");
...

Reducer class setup:

mos = new MultipleOutputs(context);

Reduce method (pseudocode):

Map<String, ByteBuffer> keys = new LinkedHashMap<>();
keys.put("key", ByteBufferUtil.bytes("rowKey"));
keys.put("name", ByteBufferUtil.bytes("columnName"));

List<ByteBuffer> variables = new ArrayList<>();
variables.add(ByteBufferUtil.bytes("columnValue"));

mos.write("CF2", keys, variables);

The problem is that my reducer ignores the CF I specify in mos.write() and apparently just runs the outputCQL instead. So in the example above, everything is written to CF1.

I've tried using a prepared statement to inject the CF into the outputCQL, along the lines of "UPDATE keyspace1.? SET value = ?", but I don't think it's possible to use a placeholder for the CF like this.

Is there any way I can overwrite the outputCQL inside the reducer class?


Solution

The simple answer is that you cannot output the results of an MR job to multiple CFs. However, needing to do this points to a flaw in the approach rather than a missing feature in Hadoop.

Instead of processing a bunch of records and trying to produce two different result sets in one pass, a better approach is to arrive at the desired result sets iteratively. Basically, this means having multiple jobs, each iterating over the results of the previous ones, until the desired results are achieved.
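One way to read this advice is to run one job per target CF, each with its own output statement. A CQL bind marker can stand for a value but not for a table name, so the statement text has to be built per job at configuration time. The following is only a minimal sketch in plain Java: the class OutputPlan and its methods are hypothetical names, and the Hadoop and Cassandra calls are left as comments because how each job is wired up is an assumption here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OutputPlan {

    // Hypothetical helper: builds the statement text for one column family.
    // The CF name is concatenated in because it cannot be a bind marker.
    public static String updateFor(String keyspace, String columnFamily) {
        return "UPDATE " + keyspace + "." + columnFamily + " SET value = ?";
    }

    // Hypothetical helper: one output statement per CF, in the given order.
    public static List<String> statementsFor(String keyspace, List<String> columnFamilies) {
        List<String> statements = new ArrayList<>();
        for (String cf : columnFamilies) {
            statements.add(updateFor(keyspace, cf));
        }
        return statements;
    }

    public static void main(String[] args) {
        List<String> cfs = Arrays.asList("CF1", "CF2");
        for (String cql : statementsFor("keyspace1", cfs)) {
            // Assumption: a real driver would create a separate Job here,
            // pass this text to CqlConfigHelper.setOutputCql(job.getConfiguration(), cql),
            // and wait for that job to finish before starting the next one.
            System.out.println(cql);
        }
    }
}
```

With this arrangement each job writes to exactly one CF, so MultipleOutputs is no longer needed in the reducer.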

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow