Question

I am new to Flume NG. I have to write a program that transfers a text file to another program (an agent). I know that I need the agent's details, i.e. host IP, port number, etc., and that a source, a sink, and a channel must be defined. I just want to transfer a log file to a server. My client code is as follows:

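import java.nio.charset.Charset;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class MyClient {

  private RpcClient client;
  private String hostname;
  private int port;

  // Open an RPC connection to the agent listening on hostname:port.
  public void init(String hostname, int port) {
    this.hostname = hostname;
    this.port = port;
    this.client = RpcClientFactory.getDefaultInstance(hostname, port);
  }

  // Wrap the string in a Flume event and send it to the agent.
  public void sendDataToFlume(String data) {
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
    try {
      client.append(event);
    } catch (EventDeliveryException e) {
      // Delivery failed: drop the broken connection and open a new one.
      // The event that failed is not resent.
      client.close();
      client = RpcClientFactory.getDefaultInstance(hostname, port);
    }
  }

  // Release the connection when done.
  public void cleanUp() {
    client.close();
  }
}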

The code above can only send String data to the specified process, but I have to send files. Also, do the source, channel, and sink have to be set up on the server? If so, how do I configure and write these three? Please help me with a small sample of a source, sink, and channel.


Solution

You just need Flume running as an agent on each node, plus a config file that describes the agent's behavior. For instance, if a node should read a file (picking up each new line and sending it as an event to the channel) and forward the file contents through an RPC socket, its configuration will look like this:

  # sources/sinks/channels list
  <Agent>.sources = <Name Source1>
  <Agent>.sinks = <Name Sink1>
  <Agent>.channels = <Name Channel1> 
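  # Channel attribution to a source (property "channels", plural)
  <Agent>.sources.<Name Source1>.channels = <Name Channel1>
  # Channel attribution to a sink (property "channel", singular: a sink reads from exactly one channel)
  <Agent>.sinks.<Name Sink1>.channel = <Name Channel1>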
  # Configuration (sources,channels and sinks)
  # Source properties : <Name Source1>
  <Agent>.sources.<Name Source1>.type = exec
  <Agent>.sources.<Name Source1>.command = tail -F test
  <Agent>.sources.<Name Source1>.channels = <Name Channel1>
  # Channel properties : <Name Channel1>
  <Agent>.channels.<Name Channel1>.type = memory
  <Agent>.channels.<Name Channel1>.capacity = 1000
  <Agent>.channels.<Name Channel1>.transactionCapacity = 1000
  # Sink properties : <Name Sink1>
  <Agent>.sinks.<Name Sink1>.type = avro
  <Agent>.sinks.<Name Sink1>.channel = <Name Channel1>
  <Agent>.sinks.<Name Sink1>.hostname = <HOST NAME or IP>
  <Agent>.sinks.<Name Sink1>.port = <PORT NUMBER>

Then you will have to set up a second agent on the server, with an Avro source listening on the same port, which processes the events and stores them the way you want. I hope it helps ;)
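
For the receiving side, a minimal sketch of such an agent could look like the following. The names (collector, avroIn, mem, toDisk), the port 4141 and the output directory are only assumptions for illustration. The port has to match the <PORT NUMBER> used by the avro sink above, and the file_roll sink is just one possible way to store the events.

```properties
  # Hypothetical agent "collector": one source, one channel, one sink
  collector.sources = avroIn
  collector.channels = mem
  collector.sinks = toDisk
  # Avro source: listens for events sent by the avro sink of the first agent
  collector.sources.avroIn.type = avro
  collector.sources.avroIn.bind = 0.0.0.0
  # Assumed port; must equal <PORT NUMBER> in the sender's config
  collector.sources.avroIn.port = 4141
  collector.sources.avroIn.channels = mem
  # Memory channel buffering events between source and sink
  collector.channels.mem.type = memory
  collector.channels.mem.capacity = 1000
  collector.channels.mem.transactionCapacity = 100
  # file_roll sink: writes the received events to files in a local directory
  collector.sinks.toDisk.type = file_roll
  # Assumed directory; it should exist and be writable
  collector.sinks.toDisk.sink.directory = /var/log/flume
  collector.sinks.toDisk.channel = mem
```

Assuming this is saved as collector.conf, the agent would be started with something like bin/flume-ng agent --conf conf --conf-file collector.conf --name collector. The default client returned by RpcClientFactory.getDefaultInstance also speaks Avro, so the Java client from the question should be able to send its events to this same source.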

Licensed under: CC-BY-SA with attribution