Question

I am working on Apache WSO2 CEP and I am trying to do some scenarios that fits my requirements.

First I execute the sample that is explained in this link KPI Analyzer and I succced and see the result correctly. After that, I wanted to change the bucket in that sample a little bit from this

from phoneRetailStream[totalPrice>2500 and quantity>3]
insert into highPurchaseStream
buyer, brand, quantity, totalPrice; 

to this

from phoneRetailStream#window.length(5)
insert into highPurchaseStream
sum(quantity) as quantitySum, sum(totalPrice) as totalpriceSum
group by  brand;

and changed the tuple mappings accordingly. But this configuration always gives me the error

 [java] Wrongly formatted event sent for carbon.super
 [java] org.wso2.carbon.databridge.core.exception.EventConversionException: Error when converting org.wso2.high.purchase.buyers.new:1.6.0 of event bundle with events 4
 [java]     at org.wso2.carbon.databridge.receiver.thrift.converter.ThriftEventConverter.createEventList(ThriftEventConverter.java:126)
 [java]     at org.wso2.carbon.databridge.receiver.thrift.converter.ThriftEventConverter.toEventList(ThriftEventConverter.java:88)
 [java]     at org.wso2.carbon.databridge.core.internal.queue.QueueWorker.run(QueueWorker.java:72)
 [java]     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
 [java]     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
 [java]     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
 [java]     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
 [java]     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
 [java]     at java.lang.Thread.run(Thread.java:662)
 [java] Caused by: java.lang.NullPointerException
 [java]     at org.wso2.carbon.databridge.receiver.thrift.converter.ThriftEventConverter.toObjectArray(ThriftEventConverter.java:49)
 [java]     at org.wso2.carbon.databridge.receiver.thrift.converter.ThriftEventConverter.createEventList(ThriftEventConverter.java:116)
 [java]     ... 8 more

I could not use aggration functions (sum, avg etc.) of siddhi language in these samples other than jmsbroker. What may be wrong with this scenario?

@Mohanadarshan

This is the last version of my bucket xml file

<bucket name="KPIAnalyzer" xmlns="http://wso2.org/carbon/cep">
    <description>
Notifies when a user purchases more then 3 phones for the total price higher than $2500.
    </description>
    <engineProviderConfiguration engineProvider="SiddhiCEPRuntime">
        <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
        <property name="siddhi.enable.distributed.processing">false</property>
    </engineProviderConfiguration>
    <input topic="org.wso2.phone.retail.store/1.2.0" brokerName="localAgentBroker">
        <tupleMapping stream="phoneRetailStream" queryEventType="Tuple">
            <property name="brand" inputName="brand" inputDataType="payloadData"
                      type="java.lang.String"/>
            <property name="quantity" inputName="quantity" inputDataType="payloadData"
                      type="java.lang.Integer"/>
            <property name="totalPrice" inputName="total" inputDataType="payloadData"
                      type="java.lang.Integer"/>
            <property name="buyer" inputName="buyer" inputDataType="payloadData"
                      type="java.lang.String"/>
        </tupleMapping>
    </input>
    <query name="KPIQuery">
        <expression>
from phoneRetailStream#window.length(5)
insert into highPurchaseStream
sum(quantity) as quantitySum, sum(totalPrice) as totalpriceSum
group by  brand;
        </expression>
        <output topic="org.wso2.high.purchase.buyers.new/1.6.0" brokerName="externalAgentBroker">
            <tupleMapping>
                <metaData>
                </metaData>
                <correlationData/>
                <payloadData>
                    <property name="quantity" valueOf="quantitySum" type="java.lang.Integer"/>
                    <property name="purchasePrice" valueOf="totalpriceSum" type="java.lang.Integer"/>
                </payloadData>
            </tupleMapping>
        </output>
    </query>
</bucket>

thank you for your help.

Was it helpful?

Solution

I have checked your siddhi query, it is working properly with siddhi engine and there is no any issue with the query...

I have checked with the KPI analyser example, it is also working without any issue for your query... Please make sure whether you have proper configurations for output tuple mappings..

Again I need to mention, there are some issue with the "bucket editable UI". Please verify the bucket xml file which in the /repository/deployment/server/cepbuckets/ is correct... (some times when deleting a property, it will not delete properly)

Change

<property name="quantity" valueOf="quantitySum" type="java.lang.Long"/>
<property name="purchasePrice" valueOf="totalpriceSum" type="java.lang.Long"/>

Thanks,

Mohan

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top