Question

I am using Cascading 2 to create Hadoop jobs and am trying to create a flow that starts with a single source. After a couple of functions are applied to the data I need to split the flow so that this data is used to create two separate reports (in two separate sinks).

    //SOURCE
    Scheme sourceScheme = new TextLine( new Fields( "line" ) );
    Tap source = new Hfs( sourceScheme, input );

    //REPORT1 SINK
    Scheme report1SinkScheme = new TextDelimited( Fields.ALL, ",","\"" );
    Tap report1Sink = new Hfs( report1SinkScheme, output1, SinkMode.REPLACE );

    //REPORT2 SINK
    Scheme report2SinkScheme = new TextDelimited( Fields.ALL, ",","\"" );
    Tap report2Sink = new Hfs( report2SinkScheme, output2, SinkMode.REPLACE );

    //INITIAL FUNCTIONS
    Pipe firstPipe = new Pipe("firstPipe");
    firstPipe = new Each(firstPipe, new Fields("line"), functionA);
    firstPipe = new Each(firstPipe, functionB, Fields.ALL);

    //REPORT1 FUNCTION
    report1Pipe = new Each(firstPipe, Fields.ALL, function1, Fields.RESULTS);

    //REPORT2 FUNCTION
    report2Pipe = new Each(firstPipe, Fields.ALL, function2, Fields.RESULTS);

    //CONNECT FLOW PARTS
    FlowDef flowDef = new FlowDef()
    .setName("report-flow")
    .addSource(firstPipe, source)
    .addSink(report1Pipe, report1Sink)
    .addSink(report2Pipe, report2Sink);

    new HadoopFlowConnector( properties ).connect( flowDef ).complete();

Currently this is giving me the error "java.lang.IllegalArgumentException: cannot add duplicate sink: firstPipe" but even after messing around with it for a while I get a variety of other issues to do with the flow set up.

Is it possible for someone to explain how to construct a flow of this form (one source, two sinks)? Do I need to create a Cascade instead? Or do I need an intermediate sink to hold the data before I split?

Please help!

Was it helpful?

Solution

You can use the split pattern as mentioned in the Cascading documentation. Here's an example:

public static void main(String[] args) {
    // source and sink
    Scheme sourceScheme = new TextLine(new Fields("line"));
    Tap source = new FileTap(sourceScheme, args[0]);

    Fields sinkFields = new Fields("word", "count");
    Scheme sinkScheme = new TextLine(sinkFields, sinkFields);
    Tap sink_one = new FileTap(sinkScheme, "out-one.txt");
    Tap sink_two = new FileTap(sinkScheme, "out-two.txt");

    // the pipe assembly
    Pipe assembly = new Pipe("wordcount");

    String regex = "\\w+";
    Function function = new RegexGenerator(new Fields("word"), regex);
    assembly = new Each(assembly, new Fields("line"), function);

    Aggregator count = new Count(new Fields("count"));

    // ...split into two pipes
    Pipe countOne = new Pipe("count-one", assembly);
    countOne = new GroupBy(countOne, new Fields("word"));
    countOne = new Every(countOne, count);

    Pipe countTwo = new Pipe("count-two", assembly);
    countTwo = new GroupBy(countTwo, new Fields("word"));
    countTwo = new Every(countTwo, count);

    // create the flow
    final List<Pipe> pipes = new ArrayList<Pipe>(2);
    pipes.add(countOne);
    pipes.add(countTwo);

    final Map<String, Tap> sinks = new HashMap<String, Tap>();
    sinks.put("count-one", sink_one);
    sinks.put("count-two", sink_two);

    FlowConnector flowConnector = new LocalFlowConnector();
    Flow flow = flowConnector.connect(source, sinks, pipes);

    flow.complete();
}

OTHER TIPS

The split pattern is in the Cascading User Guide at: http://docs.cascading.org/cascading/2.1/userguide/htmlsingle/#N21362

Another (simpler) example is included in "Cascading for the Impatient", parts 5 & 6:

One point about the code shown above is that it seem to be missing the variable definitions for report1Pipe and report2Pipe. To use a split pattern, each branch requires a name and the names need to be different.

The exception gets thrown because there are two branches which have both inherited the same name from earlier in the pipe assembly. So, for example, those flowDef.addSink(..) calls are ambiguous to the flow planner.

So in "Impatient" part 5, look at how the "D", "DF", and "TF" branches get named within operations.

It may seem a bit counter-intuitive for Cascading to require this naming, but it becomes quite important in large, complex workflows when you're attaching failure traps, debug, etc., to specific branches.

Alternatively, the Cascalog DSL in Clojure is much more declarative, so this gets handled by the language directly -- branches are subqueries, and the traps, etc., get handled within the closure of a subquery.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top