How to start a Flume agent from Java code
Question
I am using the Hadoop 1.2.1 stable release on CentOS 6.5 with Apache Flume 1.x. I run a Flume agent that collects tweets into HDFS. My flume.conf is:
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS
TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = ******
TwitterAgent.sources.Twitter.consumerSecret =*****
TwitterAgent.sources.Twitter.accessToken = *****
TwitterAgent.sources.Twitter.accessTokenSecret = ***
TwitterAgent.sources.Twitter.keywords = CrudeOilPrice,Crude Oil,platts oil, Oil & Gas Journal
TwitterAgent.sources.Twitter.keywords = big data,hadoop
TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://master:9000/user/flume/tweets/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100
To run this I used the command:
>bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n TwitterAgent
Now I am trying to run this from a Java program. Can anyone give me some idea how? I tried this code:
public class fl {
    public static void main(String[] args) throws IOException, InterruptedException
    {
        Process p;
        p = Runtime.getRuntime().exec("/home/dsri/flume/bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n TwitterAgent");
        p.waitFor();
        //p.exitValue();
        BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()));
        String line = "";
        while ((line = reader.readLine())!= null)
        {
            System.out.println(line);
        }
    }
}
But it is not working for me. Now I am trying this code in Java:
package dsri;
//package org.jai.flume.agent;
import java.util.HashMap;
import java.util.Map;
import org.apache.flume.agent.embedded.EmbeddedAgent;
public class FlumeAgentServiceImpl {
    private static EmbeddedAgent agent;
    private void createAgent() {
        final Map<String, String> properties = new HashMap<String, String>();
        properties.put("channel.type", "memory");
        properties.put("channel.capacity", "200");
        properties.put("sinks", "sink1 sink2");
        properties.put("sink1.type", "avro");
        properties.put("sink2.type", "avro");
        properties.put("sink1.hostname", "collector1.apache.org");
        properties.put("sink1.port", "5564");
        properties.put("sink2.hostname", "collector2.apache.org");
        properties.put("sink2.port", "5565");
        properties.put("processor.type", "load_balance");
        agent = new EmbeddedAgent("myagent");
        agent.configure(properties);
        agent.start();
    }
    public EmbeddedAgent getFlumeAgent() {
        if (agent == null) {
            createAgent();
        }
        return agent;
    }
    public static void main(String[] args) {
        FlumeAgentServiceImpl f= new FlumeAgentServiceImpl();
        System.out.println(f.getFlumeAgent());
    }
}
But I am getting this exception:
org.apache.flume.FlumeException: NettyAvroRpcClient { host: collector1.apache.org, port: 5564 }: RPC connection error
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:161)
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:115)
at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:590)
at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:88)
at org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:127)
at org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:209)
at org.apache.flume.sink.AbstractRpcSink.start(AbstractRpcSink.java:289)
at org.apache.flume.sink.AbstractSinkProcessor.start(AbstractSinkProcessor.java:41)
at org.apache.flume.sink.LoadBalancingSinkProcessor.start(LoadBalancingSinkProcessor.java:134)
at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
at org.apache.flume.agent.embedded.EmbeddedAgent.doStart(EmbeddedAgent.java:216)
at org.apache.flume.agent.embedded.EmbeddedAgent.start(EmbeddedAgent.java:114)
at dsri.FlumeAgentServiceImpl.createAgent(FlumeAgentServiceImpl.java:48)
at dsri.FlumeAgentServiceImpl.getFlumeAgent(FlumeAgentServiceImpl.java:53)
at dsri.FlumeAgentServiceImpl.main(FlumeAgentServiceImpl.java:61)
Caused by: java.io.IOException: Error connecting to collector1.apache.org/218.93.250.18:5564
at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261)
at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:203)
at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:152)
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:147)
... 14 more
Solution
You could also load the Flume configuration file rather than writing the configuration in Java code. The same file can then be used to start your standalone Flume agent.
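// requires: import org.apache.flume.node.Application;
public static void main(String[] args)
{
    // -n gives the agent name, -f the configuration file.
    // The array needs its own name; redeclaring "args" here would not compile.
    String[] flumeArgs = new String[] { "agent", "-nAgent", "-fflume.conf" };
    Application.main(flumeArgs);
}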
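Where "Agent" is the name of your Flume agent (TwitterAgent for the configuration in the question).
"flume.conf" is the configuration file, which this answer places in the resources folder of the Java project. Note that -f is read as an ordinary file path, so it has to resolve from the directory the program is started in.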
OTHER TIPS
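If you would rather keep launching the flume-ng script, as in the first attempt from the question, note that calling waitFor() before reading the output blocks for as long as the agent runs, so the read loop is never reached, and the relative conf paths depend on the working directory. Below is a minimal sketch using only the JDK. It assumes Flume is installed under /home/dsri/flume as in the question; the class and method names (FlumeProcessLauncher, buildCommand, launch) are made up for illustration.

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.List;

public class FlumeProcessLauncher {

    // Builds the command line used in the question, one argument per list element.
    public static List<String> buildCommand(String flumeHome, String agentName) {
        return Arrays.asList(
                flumeHome + "/bin/flume-ng", "agent",
                "--conf", "./conf/",
                "-f", "conf/flume.conf",
                "-Dflume.root.logger=DEBUG,console",
                "-n", agentName);
    }

    // Starts the command in workDir, echoes its output, and returns the exit code.
    public static int launch(List<String> command, File workDir)
            throws IOException, InterruptedException {
        ProcessBuilder builder = new ProcessBuilder(command);
        builder.directory(workDir);         // relative conf paths resolve against this directory
        builder.redirectErrorStream(true);  // merge stderr into stdout so one reader drains both
        Process process = builder.start();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
        // Wait only after the output has been read to end-of-file.
        return process.waitFor();
    }

    public static void main(String[] args) throws Exception {
        String flumeHome = "/home/dsri/flume"; // install location taken from the question
        int exitCode = launch(buildCommand(flumeHome, "TwitterAgent"), new File(flumeHome));
        System.out.println("flume-ng exited with code " + exitCode);
    }
}
```

Because the agent runs until it is stopped, launch() does not return while Flume is collecting tweets. Run it on its own thread if the rest of your program needs to continue.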
Instead of doing that, try using an embedded agent (a far more elegant and cleaner solution). You create a Map<String, String> with the configuration of the Flume agent you wish to run, then create an agent and configure it with that map.
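// Needs the flume-ng-embedded-agent artifact, plus imports for java.util.*,
// java.nio.charset.StandardCharsets, org.apache.flume.Event,
// org.apache.flume.event.EventBuilder and com.google.common.collect.Lists (Guava).
Map<String, String> properties = new HashMap<String, String>();
properties.put("channel.type", "memory");
properties.put("channel.capacity", "200");
properties.put("sinks", "sink1 sink2");
properties.put("sink1.type", "avro");
properties.put("sink2.type", "avro");
// collector1/collector2 are placeholder hosts; use Avro sources that are actually listening,
// otherwise start() fails with the RPC connection error shown in the question.
properties.put("sink1.hostname", "collector1.apache.org");
properties.put("sink1.port", "5564");
properties.put("sink2.hostname", "collector2.apache.org");
properties.put("sink2.port", "5565");
properties.put("processor.type", "load_balance");
EmbeddedAgent agent = new EmbeddedAgent("myagent");
agent.configure(properties);
agent.start();
// Build a sample event; the body text is only an illustration.
Event event = EventBuilder.withBody("sample tweet text", StandardCharsets.UTF_8);
List<Event> events = Lists.newArrayList();
events.add(event);
events.add(event);
events.add(event);
events.add(event);
agent.putAll(events);
...
agent.stop();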
You can find more information about it here.
I was stuck on the same problem. You can't use EmbeddedAgent here because it only supports the Avro sink, and the configuration in the question writes to an HDFS sink.
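To check up front whether a given flume.conf could run in an embedded agent at all, one option is to read the file with java.util.Properties and look at the declared sink types. The following is a rough sketch using only the JDK. The class and method names (SinkTypeCheck, sinkTypes, embeddedAgentCompatible) are hypothetical, and the check assumes the Avro-only restriction described above.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

public class SinkTypeCheck {

    // Returns sink name -> sink type for the named agent in a standard flume.conf.
    public static Map<String, String> sinkTypes(Reader conf, String agentName)
            throws IOException {
        Properties props = new Properties();
        props.load(conf);
        Map<String, String> types = new LinkedHashMap<String, String>();
        String sinks = props.getProperty(agentName + ".sinks", "").trim();
        if (sinks.isEmpty()) {
            return types; // the agent declares no sinks
        }
        for (String sink : sinks.split("\\s+")) {
            types.put(sink, props.getProperty(agentName + ".sinks." + sink + ".type"));
        }
        return types;
    }

    // True only if at least one sink is declared and every sink is of type "avro".
    public static boolean embeddedAgentCompatible(Map<String, String> types) {
        if (types.isEmpty()) {
            return false;
        }
        for (String type : types.values()) {
            if (!"avro".equals(type)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        // Two lines taken from the configuration in the question.
        String conf = "TwitterAgent.sinks = HDFS\n"
                    + "TwitterAgent.sinks.HDFS.type = hdfs\n";
        Map<String, String> types = sinkTypes(new StringReader(conf), "TwitterAgent");
        System.out.println(types + " -> embeddable: " + embeddedAgentCompatible(types));
    }
}
```

For the TwitterAgent configuration the only sink is of type hdfs, so the check reports that it is not embeddable. Pass a FileReader for your real flume.conf in place of the StringReader.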