Question

For example, I have these batches of user impression tuples, with a batch size of 5:

Batch 1:
[UUID1, clientId1]
[UUID2, clientId1]
[UUID2, clientId1]
[UUID2, clientId1]
[UUID3, clientId2]

Batch 2:
[UUID4, clientId1]
[UUID5, clientId1]
[UUID5, clientId1]
[UUID6, clientId2]
[UUID6, clientId2]

Here is how I save the count state:

TridentState ClientState = impressionStream
    .groupBy(new Fields("clientId"))
    .persistentAggregate(getCassandraStateFactory("users", "DataComputation",
        "UserImpressionCounter"), new Count(), new Fields("count"));

Stream ClientStream = ClientState.newValuesStream();

I start with an empty database and run my topology. After grouping the stream by clientId, I save the state with persistentAggregate and the Count aggregator. For the first batch, newValuesStream() emits [clientId1, 4], [clientId2, 1]; for the second batch it emits [clientId1, 7], [clientId2, 3], as expected.
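
To make these numbers concrete, here is a plain-Java model of what groupBy plus persistentAggregate with Count does for each batch. It is only a sketch: it uses no Storm classes, the names RunningCountDemo and processBatch are hypothetical, and a map stands in for the Cassandra-backed state.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of groupBy("clientId") + persistentAggregate(..., new Count(), ...).
// Hypothetical names; no Storm classes. The "store" map stands in for the Cassandra state.
class RunningCountDemo {

    // The two example batches from the question: [impressionId, clientId].
    static final List<String[]> BATCH_1 = Arrays.asList(
        new String[]{"UUID1", "clientId1"}, new String[]{"UUID2", "clientId1"},
        new String[]{"UUID2", "clientId1"}, new String[]{"UUID2", "clientId1"},
        new String[]{"UUID3", "clientId2"});
    static final List<String[]> BATCH_2 = Arrays.asList(
        new String[]{"UUID4", "clientId1"}, new String[]{"UUID5", "clientId1"},
        new String[]{"UUID5", "clientId1"}, new String[]{"UUID6", "clientId2"},
        new String[]{"UUID6", "clientId2"});

    // Applies one batch and returns the updated totals for the clientIds in it,
    // i.e. what newValuesStream() would emit for that batch.
    static Map<String, Long> processBatch(Map<String, Long> store, List<String[]> batch) {
        // 1. Count this batch's tuples per clientId.
        Map<String, Long> batchCounts = new LinkedHashMap<>();
        for (String[] tuple : batch) {
            batchCounts.merge(tuple[1], 1L, Long::sum);
        }
        // 2. Add each batch count to the stored total and write the result back.
        Map<String, Long> newValues = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : batchCounts.entrySet()) {
            long total = store.getOrDefault(e.getKey(), 0L) + e.getValue();
            store.put(e.getKey(), total);
            newValues.put(e.getKey(), total);
        }
        return newValues;
    }

    public static void main(String[] args) {
        Map<String, Long> store = new LinkedHashMap<>(); // empty database
        System.out.println(processBatch(store, BATCH_1)); // {clientId1=4, clientId2=1}
        System.out.println(processBatch(store, BATCH_2)); // {clientId1=7, clientId2=3}
    }
}
```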

ClientStream is used in a couple of branches. In one of them I need the count for each tuple, which would mean processing with a batch size of 1. A batch size of 1 is obviously a bad idea, so instead I have to find out the previous value of the counter before it is updated and emit it together with the updated counter, e.g. [clientId1, 7, 4] for the second batch.

Does anybody have an idea how to do that?


Solution

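I solved this by adding a second aggregator that counts the impressions per clientId within the current batch, and joining its output with the new values of the persistent aggregate: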

TridentState ClientState = impressionStream
    .groupBy(new Fields("clientId"))
    .persistentAggregate(getCassandraStateFactory("users", "DataComputation",
        "UserImpressionCounter"), new Count(), new Fields("count"));

Stream ClientBatchAggregationStream = impressionStream
    .groupBy(new Fields("clientId"))
    .aggregate(new SumCountAggregator(), new Fields("batchCount"));

Stream GroupingPeriodCounterStateStream = topology
    .join(ClientState.newValuesStream(), new Fields("clientId"),
        ClientBatchAggregationStream, new Fields("clientId"), 
        new Fields("clientId", "count", "batchCount"));
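
Since the joined stream carries both the new total (count) and this batch's own count (batchCount), the counter value before the update is count - batchCount. For the second batch, clientId1 joins as [clientId1, 7, 3], which gives 7 - 3 = 4, i.e. the [clientId1, 7, 4] asked for in the question. The sketch below models the join and that subtraction in plain Java; PreviousCountDemo, joinWithPrevious and previousCount are hypothetical names, and in the actual topology the subtraction would presumably go into a Trident Function applied with each(...) on GroupingPeriodCounterStateStream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the join above (hypothetical names, no Storm classes).
// newValues:   clientId -> "count"      (totals emitted by ClientState.newValuesStream())
// batchCounts: clientId -> "batchCount" (per-batch counts from SumCountAggregator)
class PreviousCountDemo {

    // Inner join on clientId, emitting [clientId, count, previousCount]
    // where previousCount = count - batchCount (the total before this batch).
    static List<Object[]> joinWithPrevious(Map<String, Long> newValues,
                                           Map<String, Long> batchCounts) {
        List<Object[]> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : newValues.entrySet()) {
            Long batchCount = batchCounts.get(e.getKey());
            if (batchCount == null) {
                continue; // no matching tuple on the other side of the join
            }
            long count = e.getValue();
            out.add(new Object[]{e.getKey(), count, count - batchCount});
        }
        return out;
    }

    public static void main(String[] args) {
        // Second batch from the question.
        Map<String, Long> newValues = new LinkedHashMap<>();
        newValues.put("clientId1", 7L);
        newValues.put("clientId2", 3L);
        Map<String, Long> batchCounts = new LinkedHashMap<>();
        batchCounts.put("clientId1", 3L);
        batchCounts.put("clientId2", 2L);
        for (Object[] tuple : joinWithPrevious(newValues, batchCounts)) {
            System.out.println(Arrays.toString(tuple)); // [clientId1, 7, 4] then [clientId2, 3, 1]
        }
    }
}
```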

SumCountAggregator:

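// Storm 1.0+ package names; older releases (0.9.x) use storm.trident.* and backtype.storm.tuple.Values.
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

// Counts the tuples of each clientId group within the current batch only:
// init() creates a fresh CountState for every group in every batch.
public class SumCountAggregator extends BaseAggregator<SumCountAggregator.CountState> {

    static class CountState {
        long count = 0;
    }

    @Override
    public CountState init(Object batchId, TridentCollector collector) {
        return new CountState();
    }

    @Override
    public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
        state.count += 1;
    }

    @Override
    public void complete(CountState state, TridentCollector collector) {
        // Emitted as the "batchCount" field.
        collector.emit(new Values(state.count));
    }

}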
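
The reason batchCount is a per-batch count rather than a running total is the aggregator lifecycle: in a grouped aggregate, Trident calls init once per group at the start of each batch, aggregate once per tuple, and complete once at the end, so SumCountAggregator's state starts from zero in every batch. The plain-Java mimic below walks the second batch through that lifecycle; MiniAggregator, MiniSumCount and AggregatorLifecycleDemo are hypothetical stand-ins, not Storm's API, and complete returns the value instead of emitting it through a collector.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the aggregator contract (not Storm's API):
// init() per group per batch, aggregate() per tuple, complete() at the end of the batch.
interface MiniAggregator<S> {
    S init(Object batchId);
    void aggregate(S state, String[] tuple);
    long complete(S state);
}

// Same counting logic as SumCountAggregator.
class MiniSumCount implements MiniAggregator<MiniSumCount.CountState> {
    static class CountState {
        long count = 0;
    }

    public CountState init(Object batchId) { return new CountState(); }
    public void aggregate(CountState state, String[] tuple) { state.count += 1; }
    public long complete(CountState state) { return state.count; }
}

class AggregatorLifecycleDemo {

    // Runs one batch through a grouped aggregate: each clientId gets a fresh state.
    static <S> Map<String, Long> runGroupedBatch(MiniAggregator<S> agg, Object batchId,
                                                 List<String[]> batch) {
        Map<String, S> states = new LinkedHashMap<>();
        for (String[] tuple : batch) {
            S state = states.computeIfAbsent(tuple[1], clientId -> agg.init(batchId));
            agg.aggregate(state, tuple);
        }
        Map<String, Long> emitted = new LinkedHashMap<>();
        for (Map.Entry<String, S> e : states.entrySet()) {
            emitted.put(e.getKey(), agg.complete(e.getValue()));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String[]> batch2 = Arrays.asList(
            new String[]{"UUID4", "clientId1"}, new String[]{"UUID5", "clientId1"},
            new String[]{"UUID5", "clientId1"}, new String[]{"UUID6", "clientId2"},
            new String[]{"UUID6", "clientId2"});
        // Counts restart at zero, so this is 3 and 2, not the running totals 7 and 3.
        System.out.println(runGroupedBatch(new MiniSumCount(), 2L, batch2)); // {clientId1=3, clientId2=2}
    }
}
```
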
Licensed under: CC-BY-SA with attribution