Question

I am using the stable Hadoop 1.2.1 release on CentOS 6.5, together with Apache Flume 1.x. I run a Flume agent that collects tweets into HDFS. My flume.conf is:

# Flume agent "TwitterAgent": Twitter source -> memory channel (MemChannel) -> HDFS sink.
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS
TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = ******
TwitterAgent.sources.Twitter.consumerSecret =*****
TwitterAgent.sources.Twitter.accessToken = *****
TwitterAgent.sources.Twitter.accessTokenSecret = ***
# NOTE(review): "keywords" is assigned twice (this line and the next). Flume reads this file as
# Java properties, where the later assignment replaces the earlier one, so only
# "big data,hadoop" is used. Confirm which keyword list is intended and keep only that line.
TwitterAgent.sources.Twitter.keywords = CrudeOilPrice,Crude Oil,platts oil, Oil & Gas Journal 
TwitterAgent.sources.Twitter.keywords = big data,hadoop
TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://master:9000/user/flume/tweets/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
# NOTE(review): batchSize (1000) is larger than MemChannel.transactionCapacity (100) below.
# A sink transaction that tries to take more events than the channel's transaction capacity
# can fail. Confirm; usually batchSize <= transactionCapacity.
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

To run it, I used this command:

>bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n TwitterAgent

Now I am trying to start the same agent from a Java program. Can anyone give me some ideas? I tried this code:

/**
 * Launches a Flume agent ({@code flume-ng agent}) as a child process and echoes its console
 * output to this program's standard output.
 *
 * <p>The child's output is drained <em>before</em> waiting for it to exit. A Flume agent keeps
 * running until it is stopped, so calling {@code waitFor()} first blocks forever and never prints
 * anything. An unread output pipe can also fill up and stall the agent.
 */
public class fl {

    /** Flume installation directory; used as the child's working directory so relative paths resolve. */
    private static final String FLUME_HOME = "/home/dsri/flume";

    public static void main(String[] args) throws IOException, InterruptedException
    {
        // Pass each argument separately instead of letting Runtime.exec split one command string.
        ProcessBuilder builder = new ProcessBuilder(
                FLUME_HOME + "/bin/flume-ng", "agent",
                "--conf", "./conf/",
                "-f", "conf/flume.conf",
                "-Dflume.root.logger=DEBUG,console",
                "-n", "TwitterAgent");
        // "./conf/" and "conf/flume.conf" are relative, so run from the Flume home directory,
        // the same place the command was run from by hand (bin/flume-ng ...).
        builder.directory(new java.io.File(FLUME_HOME));
        // Merge stderr into stdout so both streams are read by the loop below.
        builder.redirectErrorStream(true);

        Process p = builder.start();

        BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()));
        try {
            String line;
            // readLine() returns null only once the agent has closed its output (i.e. exited).
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        } finally {
            reader.close();
        }

        int exitCode = p.waitFor();
        System.out.println("flume-ng exited with code " + exitCode);
    }
}

But it is not working for me. Now I am trying this Java code instead:

package dsri;
//package org.jai.flume.agent;

import java.util.HashMap;
import java.util.Map;
import org.apache.flume.agent.embedded.EmbeddedAgent;


public class FlumeAgentServiceImpl {

    /**
     * Lazily created embedded agent shared by all instances of this class.
     * It is {@code null} until {@link #createAgent()} has completed successfully.
     */
    private static EmbeddedAgent agent;

    /**
     * Builds, configures and starts the embedded agent, then stores it in {@link #agent}.
     *
     * <p>The field is assigned only after {@code start()} returns. If {@code configure()} or
     * {@code start()} throws, the exception reaches the caller and {@link #agent} stays
     * {@code null}, so a later {@link #getFlumeAgent()} call tries again instead of returning
     * a half-initialised agent.
     */
    private void createAgent() {
        final Map<String, String> properties = new HashMap<String, String>();

        properties.put("channel.type", "memory");
        properties.put("channel.capacity", "200");
        // Two Avro sinks behind a load-balancing sink processor.
        // NOTE(review): these host/port pairs look like documentation placeholders. Each must point
        // at a running Flume Avro source; otherwise the sink reports
        // "NettyAvroRpcClient ...: RPC connection error" when it tries to connect.
        properties.put("sinks", "sink1 sink2");
        properties.put("sink1.type", "avro");
        properties.put("sink2.type", "avro");
        properties.put("sink1.hostname", "collector1.apache.org");
        properties.put("sink1.port", "5564");
        properties.put("sink2.hostname", "collector2.apache.org");
        properties.put("sink2.port",  "5565");
        properties.put("processor.type", "load_balance");

        final EmbeddedAgent newAgent = new EmbeddedAgent("myagent");
        newAgent.configure(properties);
        newAgent.start();
        // Publish only a fully configured and started agent.
        agent = newAgent;
    }

    /**
     * Returns the shared embedded agent, creating and starting it on first use.
     *
     * <p>Not synchronized: concurrent first calls may each create an agent.
     *
     * @return the started embedded agent
     */
    public EmbeddedAgent getFlumeAgent() {
        if (agent == null) {
            createAgent();
        }
        return agent;
    }

    public static void main(String[] args) {
        FlumeAgentServiceImpl f = new FlumeAgentServiceImpl();
        System.out.println(f.getFlumeAgent());
    }

}

But I am getting an exception...

org.apache.flume.FlumeException: NettyAvroRpcClient { host: collector1.apache.org, port: 5564 }: RPC connection error
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:161)
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:115)
at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:590)
at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:88)
at org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:127)
at org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:209)
at org.apache.flume.sink.AbstractRpcSink.start(AbstractRpcSink.java:289)
at org.apache.flume.sink.AbstractSinkProcessor.start(AbstractSinkProcessor.java:41)
at org.apache.flume.sink.LoadBalancingSinkProcessor.start(LoadBalancingSinkProcessor.java:134)
at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
at org.apache.flume.agent.embedded.EmbeddedAgent.doStart(EmbeddedAgent.java:216)
at org.apache.flume.agent.embedded.EmbeddedAgent.start(EmbeddedAgent.java:114)
at dsri.FlumeAgentServiceImpl.createAgent(FlumeAgentServiceImpl.java:48)
at dsri.FlumeAgentServiceImpl.getFlumeAgent(FlumeAgentServiceImpl.java:53)
at dsri.FlumeAgentServiceImpl.main(FlumeAgentServiceImpl.java:61)
Caused by: java.io.IOException: Error connecting to collector1.apache.org/218.93.250.18:5564
at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261)
at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:203)
at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:152)
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:147)
... 14 more
Was it helpful?

Solution

You could also load the Flume configuration file rather than writing the configuration in your Java code. The same file can then be used when you start your standalone Flume agent.

/**
 * Starts a standalone Flume agent inside this JVM through Flume's {@code Application} entry
 * point (presumably {@code org.apache.flume.node.Application}; confirm the import). It uses the
 * same configuration file a {@code flume-ng agent} run would use.
 *
 * @param args ignored; the Flume launcher arguments are built below
 */
public static void main(String[] args)
{
    // Kept in a separate local: redeclaring "args" would clash with the parameter and not compile.
    // -n <agent name as used in the config>  -f <path to the configuration file>
    String[] flumeArgs = new String[] { "agent", "-n", "Agent",
            "-f", "flume.conf" };

    Application.main(flumeArgs);

}

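Here "Agent" is the name of your Flume agent (the prefix used in the configuration file), and "flume.conf" is the path to the configuration file. Flume opens this path as a file on disk, relative to the program's working directory, not as a classpath resource. If you keep flume.conf in your project's resources folder, pass a path that resolves from the directory the program runs in.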

OTHER TIPS

Instead of doing that, try using an embedded agent (a far more elegant and cleaner solution). You create a Map<String, String> with the configuration of the Flume agent you wish to run, and then create an agent and configure it.

// Embedded agent configuration: a memory channel feeding two load-balanced Avro sinks.
// NOTE(review): the collector host/port values are placeholders; each must be a running
// Flume Avro source, otherwise the sinks cannot connect.
Map<String, String> properties = new HashMap<String, String>();
properties.put("channel.type", "memory");
properties.put("channel.capacity", "200");
properties.put("sinks", "sink1 sink2");
properties.put("sink1.type", "avro");
properties.put("sink2.type", "avro");
properties.put("sink1.hostname", "collector1.apache.org");
properties.put("sink1.port", "5564");
properties.put("sink2.hostname", "collector2.apache.org");
properties.put("sink2.port",  "5565");
properties.put("processor.type", "load_balance");

EmbeddedAgent agent = new EmbeddedAgent("myagent");

agent.configure(properties);
agent.start();
try {
    // putAll needs concrete events; build one with a UTF-8 text body (any payload works).
    Event event = org.apache.flume.event.EventBuilder.withBody(
            "sample event", java.nio.charset.StandardCharsets.UTF_8);

    List<Event> events = Lists.newArrayList();

    events.add(event);
    events.add(event);
    events.add(event);
    events.add(event);

    agent.putAll(events);

    // ... application-specific work ...
} finally {
    // Stop the agent even if putAll (or the work above) throws.
    agent.stop();
}

You can find more information about it here.

I was stuck on the same problem. You can't use EmbeddedAgent here, because it supports only Avro sinks (so it cannot write to HDFS directly).

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top