
I am working with WSO2 CEP and trying out some scenarios that fit my requirements.

First I ran the KPI Analyzer sample, and it succeeded and showed the correct result. After that, I wanted to change the bucket in that sample slightly, from this

from phoneRetailStream[totalPrice>2500 and quantity>3]
insert into highPurchaseStream
buyer, brand, quantity, totalPrice; 

to this

from phoneRetailStream#window.length(5)
insert into highPurchaseStream
sum(quantity) as quantitySum, sum(totalPrice) as totalpriceSum
group by  brand;

and changed the tuple mappings accordingly. But this configuration always gives me the following error:

 [java] Wrongly formatted event sent for carbon.super
 [java] org.wso2.carbon.databridge.core.exception.EventConversionException: Error when converting of event bundle with events 4
 [java]     at org.wso2.carbon.databridge.receiver.thrift.converter.ThriftEventConverter.createEventList(
 [java]     at org.wso2.carbon.databridge.receiver.thrift.converter.ThriftEventConverter.toEventList(
 [java]     at
 [java]     at java.util.concurrent.Executors$
 [java]     at java.util.concurrent.FutureTask$Sync.innerRun(
 [java]     at
 [java]     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(
 [java]     at java.util.concurrent.ThreadPoolExecutor$
 [java]     at
 [java] Caused by: java.lang.NullPointerException
 [java]     at org.wso2.carbon.databridge.receiver.thrift.converter.ThriftEventConverter.toObjectArray(
 [java]     at org.wso2.carbon.databridge.receiver.thrift.converter.ThriftEventConverter.createEventList(
 [java]     ... 8 more

I could not use the aggregation functions (sum, avg, etc.) of the Siddhi language in any of these samples other than jmsbroker. What might be wrong with this scenario?


This is the latest version of my bucket XML file:

<bucket name="KPIAnalyzer" xmlns="">
Notifies when a user purchases more than 3 phones for the total price higher than $2500.
    <engineProviderConfiguration engineProvider="SiddhiCEPRuntime">
        <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
        <property name="siddhi.enable.distributed.processing">false</property>
    <input topic="" brokerName="localAgentBroker">
        <tupleMapping stream="phoneRetailStream" queryEventType="Tuple">
            <property name="brand" inputName="brand" inputDataType="payloadData"
            <property name="quantity" inputName="quantity" inputDataType="payloadData"
            <property name="totalPrice" inputName="total" inputDataType="payloadData"
            <property name="buyer" inputName="buyer" inputDataType="payloadData"
    <query name="KPIQuery">
from phoneRetailStream#window.length(5)
insert into highPurchaseStream
sum(quantity) as quantitySum, sum(totalPrice) as totalpriceSum
group by  brand;
        <output topic="" brokerName="externalAgentBroker">
                    <property name="quantity" valueOf="quantitySum" type="java.lang.Integer"/>
                    <property name="purchasePrice" valueOf="totalpriceSum" type="java.lang.Integer"/>

Thank you for your help.



I have checked your Siddhi query. It works properly with the Siddhi engine, and there is no issue with the query itself.
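To make clear what the query is expected to compute, here is a rough plain-Python simulation of a sliding length window of 5 with a sum grouped by brand. It is only a sketch and does not use Siddhi: it assumes one output per incoming event, covering the events of the same brand currently in the window, and it ignores any output Siddhi may produce for expired events. The function name `windowed_sums` and the sample events are made up for illustration.

```python
from collections import deque

def windowed_sums(events, length=5):
    """Yield (quantitySum, totalpriceSum) for each incoming event.

    events: iterable of (brand, quantity, totalPrice) tuples.
    Assumption: one output per arriving event, grouped by that event's brand.
    """
    window = deque(maxlen=length)  # the oldest event drops out automatically
    for brand, quantity, total_price in events:
        window.append((brand, quantity, total_price))
        same_brand = [e for e in window if e[0] == brand]
        yield (sum(e[1] for e in same_brand), sum(e[2] for e in same_brand))

# Hypothetical sample data, not taken from the KPI Analyzer sample
events = [
    ("Nokia", 2, 1000),
    ("Apple", 1, 900),
    ("Nokia", 4, 2000),
    ("Apple", 3, 2700),
    ("Nokia", 1, 500),
    ("Nokia", 3, 1500),  # sixth event: the first Nokia event leaves the window
]
results = list(windowed_sums(events))
print(results)
```

With the sixth event the window holds only the last five events, so the first Nokia purchase no longer contributes to the sums.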

I have also checked your query with the KPI Analyzer example, and it works there without any issue. Please make sure that your output tuple mappings are configured properly.

I should also mention that there are some issues with the editable bucket UI. Please verify that the bucket XML file in /repository/deployment/server/cepbuckets/ is correct (sometimes a property that you delete in the UI is not actually removed from the file).

For reference, these output property mappings use java.lang.Long instead of java.lang.Integer:


<property name="quantity" valueOf="quantitySum" type="java.lang.Long"/>
<property name="purchasePrice" valueOf="totalpriceSum" type="java.lang.Long"/>
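To double-check the deployed bucket file for stale or mistyped output properties, something like the following sketch could be used to list every `property` element that carries a `valueOf` attribute. It relies only on Python's standard `xml.etree.ElementTree`. The helper name `output_properties` and the inline sample are hypothetical, and the `<mapping>` wrapper element in the sample is an assumption, since the bucket file in the question is truncated. In practice you would read the real file from the cepbuckets directory instead.

```python
import xml.etree.ElementTree as ET

def output_properties(bucket_xml):
    """Return (name, valueOf, type) for every <property> that has a valueOf attribute."""
    root = ET.fromstring(bucket_xml)
    found = []
    for elem in root.iter():
        local_name = elem.tag.rsplit("}", 1)[-1]  # drop any "{namespace}" prefix
        if local_name == "property" and "valueOf" in elem.attrib:
            found.append((elem.get("name"), elem.get("valueOf"), elem.get("type")))
    return found

# Hypothetical, simplified bucket; the <mapping> wrapper is an assumption
sample = """
<bucket name="KPIAnalyzer">
    <output topic="" brokerName="externalAgentBroker">
        <mapping>
            <property name="quantity" valueOf="quantitySum" type="java.lang.Long"/>
            <property name="purchasePrice" valueOf="totalpriceSum" type="java.lang.Long"/>
        </mapping>
    </output>
</bucket>
"""
props = output_properties(sample)
for name, value_of, type_name in props:
    print(name, value_of, type_name)
```

Any property that shows up here but was deleted in the UI, or that refers to an attribute the query no longer produces, is worth fixing by hand in the file.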



Licensed under: CC-BY-SA with attribution