Question

I read in a Flume book that if an interceptor's intercept method returns null, the event is dropped. So I created a custom interceptor that returns null when a condition is met:

public Event intercept(Event event) {
    Event finalEvent = event;
    // Decode the body once; it is used for both the regex check and the file write.
    String check = new String(event.getBody(), Charsets.UTF_8);

    if (check.matches("([0-9]-.+?-.+?-[0-9][0-9]+)")) {
        try {
            fileWriter.append(check + "\n");
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Returning null is supposed to drop the event.
        finalEvent = null;
    }
    System.out.println("Event is : " + finalEvent);
    return finalEvent;
}

The interceptor returns null, but the file channel still passes the event to the HDFS sink as an empty event. Why isn't the event dropped? I am using the Spooling Directory source.


Solution 2

In my interceptor class, the method intercept(Event event) contains the filtering logic shown in the question and returns null for any event that should be dropped. The method intercept(List<Event> events) then excludes those null events from the list it returns, which serves the purpose. Here is the intercept(List<Event> events) code:

public List<Event> intercept(List<Event> events) {
    List<Event> interceptedEvents = new ArrayList<Event>(events.size());
    for (Event event : events) {
        // Run the single-event logic and keep only events that were not dropped
        Event interceptedEvent = intercept(event);
        if (interceptedEvent != null) {
            interceptedEvents.add(interceptedEvent);
        }
    }
    return interceptedEvents;
}
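
To try the idea without a Flume installation, here is a minimal sketch of the same two-method pattern. SimpleEvent is a hypothetical stand-in for org.apache.flume.Event, and DroppingInterceptor does not implement the real Interceptor interface. Both names are assumptions made only so the example runs on its own; the regex is the one from the question.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical stand-in for org.apache.flume.Event (assumption, not Flume's type).
final class SimpleEvent {
    private final byte[] body;

    SimpleEvent(String text) {
        this.body = text.getBytes(StandardCharsets.UTF_8);
    }

    byte[] getBody() {
        return body;
    }
}

// Mirrors the two intercept methods above; not a real Flume Interceptor.
final class DroppingInterceptor {
    // Regex from the question, compiled once instead of on every call.
    private static final Pattern DROP =
            Pattern.compile("([0-9]-.+?-.+?-[0-9][0-9]+)");

    // Returns null when the whole body matches the pattern, else the event itself.
    SimpleEvent intercept(SimpleEvent event) {
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        return DROP.matcher(body).matches() ? null : event;
    }

    // Batch variant: null results are never added to the returned list.
    List<SimpleEvent> intercept(List<SimpleEvent> events) {
        List<SimpleEvent> kept = new ArrayList<>(events.size());
        for (SimpleEvent event : events) {
            SimpleEvent result = intercept(event);
            if (result != null) {
                kept.add(result);
            }
        }
        return kept;
    }
}

final class DroppingInterceptorDemo {
    public static void main(String[] args) {
        DroppingInterceptor interceptor = new DroppingInterceptor();
        List<SimpleEvent> out = interceptor.intercept(Arrays.asList(
                new SimpleEvent("1-foo-bar-42"),      // matches, so it is dropped
                new SimpleEvent("plain log line")));  // does not match, so it is kept
        System.out.println(out.size()); // prints 1
    }
}
```

The point of the design is that the batch method is the only place where nulls are filtered, so whatever consumes the returned list never sees a null element.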

OTHER TIPS

Let's see what happens. You use the Spooling Directory source, and that source calls processEventBatch(events). Inside processEventBatch():

events = interceptorChain.intercept(events); // runs your custom interceptor
...
eventQueue.add(event); // the event is queued even if it is null

If the Spooling Directory source used processEvent() instead, your interceptor would work as expected. Inside processEvent():

event = interceptorChain.intercept(event);
if (event == null) {
  // a null event is dropped here, so the interceptor works
  return;
}

So you would have to modify processEventBatch() so that it skips null events:

if (event == null) {
    // don't add it to eventQueue
}
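
As a rough illustration of that change, the sketch below models the batch loop with plain strings standing in for events. BatchPathSketch, queueAll, and queueNonNull are hypothetical names, not Flume code. It only contrasts the loop as described in this answer with a null-skipping variant.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the batch loop; strings stand in for Flume events.
final class BatchPathSketch {
    // Loop as described in this answer: every element is queued, null or not.
    static List<String> queueAll(List<String> intercepted) {
        List<String> queue = new ArrayList<>();
        for (String event : intercepted) {
            queue.add(event);
        }
        return queue;
    }

    // Suggested change: skip nulls before queueing.
    static List<String> queueNonNull(List<String> intercepted) {
        List<String> queue = new ArrayList<>();
        for (String event : intercepted) {
            if (event == null) {
                continue; // don't add it to the queue
            }
            queue.add(event);
        }
        return queue;
    }

    public static void main(String[] args) {
        List<String> intercepted = Arrays.asList("a", null, "b");
        System.out.println(queueAll(intercepted).size());     // prints 3
        System.out.println(queueNonNull(intercepted).size()); // prints 2
    }
}
```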
Licensed under: CC-BY-SA with attribution