Question

I have some questions about writing a custom OutputAttributeProcessor. I use WSO2 CEP 2.1.0 and Siddhi 1.1.0.

I want to create a custom OutputAttributeProcessor, so I created two Java classes: TestFactory, which implements OutputAttributeProcessorFactory, and Test, which implements OutputAttributeProcessor. Both classes are in the package org.wso2.siddhi.extention.

TestFactory must override createAggregator and getProcessorType, and Test must override createNewInstance, getType, processInEventAttribute, and processRemoveEventAttribute.

My first question is about these methods.

What should be written in getProcessorType?

Also, what is the difference between processInEventAttribute and processRemoveEventAttribute?

I have one more question. I will create a jar file consisting of the two Java classes. I will add the jar file to the class path at /repository/components/lib, and add the fully qualified class name of TestFactory to the siddhi.extension file located at /repository/conf/siddhi.

What should the content of siddhi.extension be?

Is it the following single line?

org.wso2.siddhi.extention.TestFactory

If there is a sample program for a custom OutputAttributeProcessor, please point me to it.

Thank you in advance.

Solution

What should be written in getProcessorType?

Depending on the use case, you can return one of the AGGREGATOR or CONVERTER types here:

@Override
public ProcessorType getProcessorType() {
    return OutputAttributeProcessorFactory.ProcessorType.AGGREGATOR;
}

What exactly is your use case? As the name implies, use the AGGREGATOR type if the processor performs an aggregation (e.g. calculating an average or taking the min() of several events).

And also, what is the difference between processInEventAttribute and processRemoveEventAttribute?

These are the two methods used to add events to and remove events from your OutputAttributeProcessor. For example, an average is calculated over a specific set of events (such as a sliding window, usually not all events received so far), and that set changes dynamically. When you receive an event through processInEventAttribute(), you update the average to include that event. Similarly, when processRemoveEventAttribute() is called, you update the average to exclude that event. The code sample below calculates the average as a double value.

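// Running sum and count of the events currently in the window.
private double value = 0.0;
private long count = 0;

@Override
public Attribute.Type getType() {
    return Attribute.Type.DOUBLE;
}

@Override
public Object processInEventAttribute(Object obj) {
    // An event entered the window: include it in the average.
    count++;
    value += (Double) obj;
    return value / count;
}

@Override
public Object processRemoveEventAttribute(Object obj) {
    // An event left the window: exclude it from the average.
    count--;
    value -= (Double) obj;
    if (count == 0) {
        // Empty window: return a Double (not an Integer) to match getType().
        return 0.0;
    }
    return value / count;
}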
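
To see how the two callbacks interact, here is a minimal standalone sketch that does not depend on Siddhi. The class names RunningAverage and SlidingAverageDemo are hypothetical, and the order of the calls in main is only an assumption made for illustration; the actual order in which the engine invokes the two methods is not confirmed here.

```java
// Hypothetical stand-in for a Siddhi OutputAttributeProcessor: it only mirrors
// the two callback methods so the add/remove logic can be run without Siddhi.
class RunningAverage {
    private double value = 0.0; // sum of the events currently in the window
    private long count = 0;     // number of events currently in the window

    Object processInEventAttribute(Object obj) {
        count++;
        value += (Double) obj;
        return value / count;
    }

    Object processRemoveEventAttribute(Object obj) {
        count--;
        value -= (Double) obj;
        if (count == 0) {
            return 0.0; // empty window
        }
        return value / count;
    }
}

class SlidingAverageDemo {
    public static void main(String[] args) {
        RunningAverage avg = new RunningAverage();
        // Assumed call sequence: three events arrive, then the oldest expires.
        System.out.println(avg.processInEventAttribute(2.0));     // 2.0
        System.out.println(avg.processInEventAttribute(4.0));     // 3.0
        System.out.println(avg.processInEventAttribute(6.0));     // 4.0
        System.out.println(avg.processRemoveEventAttribute(2.0)); // 5.0
    }
}
```

Each call returns the average of whatever is in the window at that moment, which is why the processor keeps a running sum and count instead of recomputing from all past events.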

What is the content of siddhi.extension?

It is just one line, as you mentioned: the fully qualified class name.

org.wso2.siddhi.extention.TestFactory
Licensed under: CC-BY-SA with attribution