Question

I'm currently trying to implement a Storm topology that integrates with the R language.

As a starting point, I took the following project (https://github.com/allenday/R-Storm), which works by extending the ShellBolt class to implement R integration and provides an R library to handle communication between the Java and R sides.

My problem is this: if I create a topology from regular (Java-only) bolts, I can chain them together without issue. However, when one of the bolts in the middle of the chain is an R Shell Bolt, the topology fails with:

5661 [Thread-18] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: Pipe to subprocess seems to be broken! No output read.
Shell Process Exception:


at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87) ~[storm-0.9.0-wip16.jar:na]
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58) ~[storm-0.9.0-wip16.jar:na]
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62) ~[storm-0.9.0-wip16.jar:na]
at backtype.storm.daemon.executor$fn__3557$fn__3569$fn__3616.invoke(executor.clj:715) ~[storm-0.9.0-wip16.jar:na]
at backtype.storm.util$async_loop$fn__436.invoke(util.clj:377) ~[storm-0.9.0-wip16.jar:na]
at clojure.lang.AFn.run(AFn.java:24) ~[clojure-1.4.0.jar:na]
at java.lang.Thread.run(Unknown Source) ~[na:1.7.0_25]

Caused by: java.lang.RuntimeException: java.lang.RuntimeException: Pipe to subprocess seems to be broken! No output read.

More concretely, the following topology works as expected:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout", new RandomSentenceSpout(), 1);
builder.setBolt("permutebolt", new PermuteBolt(), 1).shuffleGrouping("spout");

Here, PermuteBolt is an R Shell Bolt. The logs for this example show the expected output:

6246 [Thread-18] INFO  backtype.storm.daemon.task - Emitting: spout default [four score and seven years ago]
6246 [Thread-16] INFO  backtype.storm.daemon.executor - Processing received message source: spout:3, stream: default, id: {}, [four score and seven years ago]
6261 [Thread-23] INFO  backtype.storm.daemon.task - Emitting: permutebolt default ["PERMUTE seven years ago and four score"]

If, however, I add another bolt that gets its data from the first one, such as:

builder.setBolt("permutebolt", new PermuteBolt(), 1).shuffleGrouping("spout");
builder.setBolt("identity", new IdentityBolt(new Fields("identity")), 1).fieldsGrouping("permutebolt", new Fields("permutation"));

It fails with the trace printed above. Oddly, this failing second example is one that ships with the project.

Has anyone faced this issue before?

UPDATE: I noticed this only occurs when using R Shell Bolts. I have since tried launching bolts that use Python scripts and have been able to chain them normally.
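
For context on the error message: a ShellBolt talks to its subprocess over stdin/stdout using Storm's multilang protocol, in which each JSON message is followed by a line containing only "end". The "No output read" error appears when the Java side hits end-of-stream on that pipe before receiving anything, which usually means the subprocess (here, the R script) exited or crashed. The following is a minimal sketch of that framing logic, not Storm's or R-Storm's actual code; the class name MultilangFrameReader and the method readMessage are hypothetical names chosen for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

// Hypothetical helper for illustration only; not part of Storm or R-Storm.
class MultilangFrameReader {

    // Reads lines until a line equal to "end" and returns the payload before it.
    // Throws if the stream closes before a complete message has arrived.
    static String readMessage(BufferedReader in) throws IOException {
        StringBuilder payload = new StringBuilder();
        String line;
        while ((line = in.readLine()) != null) {
            if (line.equals("end")) {
                return payload.toString();
            }
            if (payload.length() > 0) {
                payload.append('\n');
            }
            payload.append(line);
        }
        // End of stream reached without seeing the "end" terminator.
        if (payload.length() == 0) {
            throw new IOException("Pipe to subprocess seems to be broken! No output read.");
        }
        throw new IOException("Pipe closed mid-message; partial output: " + payload);
    }

    public static void main(String[] args) throws IOException {
        // A well-formed emit message, as a healthy subprocess would write it.
        String healthy =
            "{\"command\":\"emit\",\"tuple\":[\"PERMUTE seven years ago and four score\"]}\nend\n";
        System.out.println(readMessage(new BufferedReader(new StringReader(healthy))));

        // An empty stream stands in for a subprocess that died before writing anything.
        try {
            readMessage(new BufferedReader(new StringReader("")));
        } catch (IOException e) {
            System.out.println("Failed: " + e.getMessage());
        }
    }
}
```

When debugging this kind of failure, running the R script by hand and feeding it a framed message on stdin is one way to see any R-side error that the pipe hides.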

Solution

@andrei, this is fixed in v1.01, uploaded to GitHub today: https://github.com/allenday/R-Storm/releases/tag/v1.01

It has been submitted to CRAN and will be available soon.

Thanks for reporting.

-Allen

Licensed under: CC-BY-SA with attribution