I solved this by adding a new per-batch aggregator and joining its output with the new-values stream of the persistentAggregate state:
// Running total per client, persisted to Cassandra.
TridentState ClientState = impressionStream
    .groupBy(new Fields("clientId"))
    .persistentAggregate(getCassandraStateFactory("users", "DataComputation",
        "UserImpressionCounter"), new Count(), new Fields("count"));

// Count per client within the current batch only.
Stream ClientBatchAggregationStream = impressionStream
    .groupBy(new Fields("clientId"))
    .aggregate(new SumCountAggregator(), new Fields("batchCount"));

// Join the updated totals with the per-batch counts on clientId.
Stream GroupingPeriodCounterStateStream = topology
    .join(ClientState.newValuesStream(), new Fields("clientId"),
        ClientBatchAggregationStream, new Fields("clientId"),
        new Fields("clientId", "count", "batchCount"));

SumCountAggregator:
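// Imports for Storm 1.0+; older releases use storm.trident.* and backtype.storm.tuple.Values.
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

// Counts the tuples in each group within a single batch.
public class SumCountAggregator extends BaseAggregator<SumCountAggregator.CountState> {
    static class CountState {
        long count = 0;
    }

    // Creates fresh state for each group in each batch.
    @Override
    public CountState init(Object batchId, TridentCollector collector) {
        return new CountState();
    }

    @Override
    public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
        state.count += 1;
    }

    // Emits the per-batch count as the single output field ("batchCount").
    @Override
    public void complete(CountState state, TridentCollector collector) {
        collector.emit(new Values(state.count));
    }
}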
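
To make the difference between "count" and "batchCount" concrete, here is a rough plain-Java sketch of what I would expect the joined stream to carry from batch to batch. It does not use Storm at all: the class name BatchCountJoinSketch, the Row type, and the in-memory map standing in for the Cassandra state are all made up for illustration, and it assumes newValuesStream() emits the updated total only for keys touched in the current batch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the topology above; no Storm classes involved.
public class BatchCountJoinSketch {

    // One joined tuple: ("clientId", "count", "batchCount").
    public static final class Row {
        public final String clientId;
        public final long count;      // running total (persistentAggregate + Count)
        public final long batchCount; // tuples seen in this batch (SumCountAggregator)

        Row(String clientId, long count, long batchCount) {
            this.clientId = clientId;
            this.count = count;
            this.batchCount = batchCount;
        }

        @Override
        public String toString() {
            return "clientId=" + clientId + ", count=" + count + ", batchCount=" + batchCount;
        }
    }

    // In-memory map playing the role of the persisted state (assumption).
    private final Map<String, Long> persistedCounts = new LinkedHashMap<>();

    // Processes one batch of clientIds and returns the joined rows for it.
    public List<Row> processBatch(List<String> clientIds) {
        // Per-batch count per group: fresh state for every batch.
        Map<String, Long> batchCounts = new LinkedHashMap<>();
        for (String clientId : clientIds) {
            batchCounts.merge(clientId, 1L, Long::sum);
        }

        // Update the running totals and pair each one with its batch count.
        List<Row> joined = new ArrayList<>();
        for (Map.Entry<String, Long> entry : batchCounts.entrySet()) {
            long total = persistedCounts.merge(entry.getKey(), entry.getValue(), Long::sum);
            joined.add(new Row(entry.getKey(), total, entry.getValue()));
        }
        return joined;
    }

    public static void main(String[] args) {
        BatchCountJoinSketch sketch = new BatchCountJoinSketch();
        for (Row row : sketch.processBatch(Arrays.asList("a", "a", "b"))) {
            System.out.println("batch 1: " + row);
        }
        for (Row row : sketch.processBatch(Arrays.asList("a", "c"))) {
            System.out.println("batch 2: " + row);
        }
    }
}
```

In the second batch, client "a" ends up with count 3 but batchCount 1, which is the pairing the join is meant to expose: the persisted total alongside how much of it came from the current batch.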