Question

What is the recommended way to create a completely custom aggregator in mule 3.x? By completely custom, I mean according to my own logic, not using correlation IDs, message counts, etc.

The documentation on the mulesoft site is outdated, saying to use AbstractEventAggregator which does not exist in 3.x:

http://www.mulesoft.org/documentation/display/MULE3USER/Message+Splitting+and+Aggregatio

Digging deeper, it looks like this class has been renamed to AbstractAggregator in 3.x:

http://www.mulesoft.org/docs/site/3.2.0/apidocs/org/mule/routing/AbstractAggregator.html

However, there are no examples that show how to use this. The LoanBroker example described in the first link above actually uses a correlation aggregator (in the 2.x examples, which I assume is what the document is referring to).

At one point, there was an abstract class that had abstract methods shouldAggregate and doAggregate. This is the kind of class I would like to extend.

Was it helpful?

Solution

Look at TestAggregator below for an example of subclassing AbstractAggregator.

import org.mule.DefaultMuleEvent;
import org.mule.DefaultMuleMessage;
import org.mule.api.MuleContext;
import org.mule.api.MuleEvent;
import org.mule.api.store.ObjectStoreException;
import org.mule.api.transformer.TransformerException;
import org.mule.routing.AbstractAggregator;
import org.mule.routing.AggregationException;
import org.mule.routing.EventGroup;
import org.mule.routing.correlation.CollectionCorrelatorCallback;
import org.mule.routing.correlation.EventCorrelatorCallback;
import org.mule.util.concurrent.ThreadNameHelper;

import java.util.Iterator;

public class TestAggregator extends AbstractAggregator
{
    @Override
    protected EventCorrelatorCallback getCorrelatorCallback(MuleContext muleContext)
    {
        return new CollectionCorrelatorCallback(muleContext,false,storePrefix)
        {
            @Override
            public MuleEvent aggregateEvents(EventGroup events) throws AggregationException
            {
                StringBuffer buffer = new StringBuffer(128);

                try
                {
                    for (Iterator<MuleEvent> iterator = events.iterator(); iterator.hasNext();)
                    {
                        MuleEvent event = iterator.next();
                        try
                        {
                            buffer.append(event.transformMessageToString());
                        }
                        catch (TransformerException e)
                        {
                            throw new AggregationException(events, null, e);
                        }
                    }
                }
                catch (ObjectStoreException e)
                {
                    throw new AggregationException(events,null,e);
                }

                logger.debug("event payload is: " + buffer.toString());
                return new DefaultMuleEvent(new DefaultMuleMessage(buffer.toString(), muleContext), events.getMessageCollectionEvent());
            }
        };
    }
}
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top