Hadoop Cascading - create flow with one source, two sinks
Question
I am using Cascading 2 to create Hadoop jobs and am trying to create a flow that starts with a single source. After a couple of functions are applied to the data I need to split the flow so that this data is used to create two separate reports (in two separate sinks).
//SOURCE
Scheme sourceScheme = new TextLine( new Fields( "line" ) );
Tap source = new Hfs( sourceScheme, input );
//REPORT1 SINK
Scheme report1SinkScheme = new TextDelimited( Fields.ALL, ",","\"" );
Tap report1Sink = new Hfs( report1SinkScheme, output1, SinkMode.REPLACE );
//REPORT2 SINK
Scheme report2SinkScheme = new TextDelimited( Fields.ALL, ",","\"" );
Tap report2Sink = new Hfs( report2SinkScheme, output2, SinkMode.REPLACE );
//INITIAL FUNCTIONS
Pipe firstPipe = new Pipe("firstPipe");
firstPipe = new Each(firstPipe, new Fields("line"), functionA);
firstPipe = new Each(firstPipe, functionB, Fields.ALL);
//REPORT1 FUNCTION
report1Pipe = new Each(firstPipe, Fields.ALL, function1, Fields.RESULTS);
//REPORT2 FUNCTION
report2Pipe = new Each(firstPipe, Fields.ALL, function2, Fields.RESULTS);
//CONNECT FLOW PARTS
FlowDef flowDef = new FlowDef()
.setName("report-flow")
.addSource(firstPipe, source)
.addSink(report1Pipe, report1Sink)
.addSink(report2Pipe, report2Sink);
new HadoopFlowConnector( properties ).connect( flowDef ).complete();
Currently this is giving me the error "java.lang.IllegalArgumentException: cannot add duplicate sink: firstPipe" but even after messing around with it for a while I get a variety of other issues to do with the flow set up.
Is it possible for someone to explain how to construct a flow of this form (one source, two sinks)? Do I need to create a Cascade instead? Or do I need an intermediate sink to hold the data before I split?
Please help!
Solution
You can use the split pattern as mentioned in the Cascading documentation. Here's an example:
public static void main(String[] args) {
// source and sink
Scheme sourceScheme = new TextLine(new Fields("line"));
Tap source = new FileTap(sourceScheme, args[0]);
Fields sinkFields = new Fields("word", "count");
Scheme sinkScheme = new TextLine(sinkFields, sinkFields);
Tap sink_one = new FileTap(sinkScheme, "out-one.txt");
Tap sink_two = new FileTap(sinkScheme, "out-two.txt");
// the pipe assembly
Pipe assembly = new Pipe("wordcount");
String regex = "\\w+";
Function function = new RegexGenerator(new Fields("word"), regex);
assembly = new Each(assembly, new Fields("line"), function);
Aggregator count = new Count(new Fields("count"));
// ...split into two pipes
Pipe countOne = new Pipe("count-one", assembly);
countOne = new GroupBy(countOne, new Fields("word"));
countOne = new Every(countOne, count);
Pipe countTwo = new Pipe("count-two", assembly);
countTwo = new GroupBy(countTwo, new Fields("word"));
countTwo = new Every(countTwo, count);
// create the flow
final List<Pipe> pipes = new ArrayList<Pipe>(2);
pipes.add(countOne);
pipes.add(countTwo);
final Map<String, Tap> sinks = new HashMap<String, Tap>();
sinks.put("count-one", sink_one);
sinks.put("count-two", sink_two);
FlowConnector flowConnector = new LocalFlowConnector();
Flow flow = flowConnector.connect(source, sinks, pipes);
flow.complete();
}
OTHER TIPS
The split pattern is in the Cascading User Guide at: http://docs.cascading.org/cascading/2.1/userguide/htmlsingle/#N21362
Another (simpler) example is included in "Cascading for the Impatient", parts 5 & 6:
- https://github.com/Cascading/Impatient/wiki/Part-5
- https://github.com/Cascading/Impatient/wiki/Part-6
One point about the code shown above is that it seem to be missing the variable definitions for report1Pipe
and report2Pipe
. To use a split pattern, each branch requires a name and the names need to be different.
The exception gets thrown because there are two branches which have both inherited the same name from earlier in the pipe assembly. So, for example, those flowDef.addSink(..)
calls are ambiguous to the flow planner.
So in "Impatient" part 5, look at how the "D", "DF", and "TF" branches get named within operations.
It may seem a bit counter-intuitive for Cascading to require this naming, but it becomes quite important in large, complex workflows when you're attaching failure traps, debug, etc., to specific branches.
Alternatively, the Cascalog DSL in Clojure is much more declarative, so this gets handled by the language directly -- branches are subqueries, and the traps, etc., get handled within the closure of a subquery.