Question

I have a problem with a topology. Let me explain the workflow: I have a source that emits ~500k tuples every 2 minutes. These tuples must be read by a spout and processed exactly once as a single object (I think that is a batch in Trident). After that, a bolt or function (or something else?) must append a timestamp and save the tuples into Redis.

I tried to implement a Trident topology with a Function that saves all the tuples into Redis using a Jedis object (Jedis is a Redis library for Java) held inside the Function class, but when I deploy it I receive a NotSerializableException on this object.

My question is: how can I implement a Function that writes this batch of tuples to Redis? Searching the web, I could not find any example that writes from a Function to Redis, or any example that uses a State object in Trident (which is probably what I have to use...).

My simple topology:

TridentTopology topology = new TridentTopology();
topology.newStream("myStream", new mySpout()).each(new Fields("field1", "field2"), new myFunction("redis_ip", "6379"));

Thanks in advance


Solution

(Replying about state in general, since the specific issue related to Redis seems to be solved in other comments.)
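On the NotSerializableException from the question: Storm serializes topology components with Java serialization when the topology is submitted, so a field holding a live connection (such as a Jedis object) will typically trigger that exception. A common fix is to keep only the host and port as fields, mark the client transient, and create it in the function's prepare step on the worker. The sketch below reproduces the effect with plain Java serialization only; FakeRedisClient, EagerFunction, LazyFunction and SerializationCheck are hypothetical names, and no Storm or Jedis API is used.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for a client such as Jedis: it is NOT Serializable. */
final class FakeRedisClient {
    final String host;
    final int port;

    FakeRedisClient(String host, int port) {
        this.host = host;
        this.port = port;
    }
}

/** Creates the client in the constructor, so the client becomes part of the serialized state. */
final class EagerFunction implements Serializable {
    private final FakeRedisClient client;

    EagerFunction(String host, int port) {
        this.client = new FakeRedisClient(host, port);
    }
}

/** Keeps only serializable settings; the client is transient and created later. */
final class LazyFunction implements Serializable {
    private final String host;
    private final int port;
    private transient FakeRedisClient client; // skipped by Java serialization

    LazyFunction(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /** Assumed to play the role of the prepare step that runs on the worker. */
    void prepare() {
        client = new FakeRedisClient(host, port);
    }

    boolean isConnected() {
        return client != null;
    }
}

final class SerializationCheck {
    /** Returns true if the object survives Java serialization. */
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) { // NotSerializableException is a subclass of IOException
            return false;
        }
    }
}
```

Here SerializationCheck.canSerialize fails for EagerFunction and succeeds for LazyFunction, both before and after prepare() has been called.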

The concept of DB updates in Storm becomes clearer when we keep in mind that Storm reads from distributed (or "partitioned") data sources (through Storm "spouts"), processes streams of data on many nodes in parallel, optionally performs calculations on those streams (called "aggregations") and saves the results to distributed data stores (called "states"). Aggregation is a very broad term that just means "computing stuff": for example, computing the minimum value over a stream is seen in Storm as an aggregation of the previously known minimum value with the new values currently being processed on some node of the cluster.

With the concepts of aggregation and partitioning in mind, we can have a look at the two main primitives in Storm that allow us to save something in a state: partitionPersist and persistentAggregate. The first one runs at the level of each cluster node without coordination with the other partitions, and feels a bit like talking to the DB through a DAO. The second one involves "repartitioning" the tuples (i.e. redistributing them across the cluster, typically along some groupBy logic) and doing some calculation (an "aggregate") before reading from or saving to the DB. It feels a bit like talking to a HashMap rather than a DB (Storm calls the DB a "MapState" in that case, or a "Snapshot" if there is only one key in the map).

One more thing to keep in mind is that the exactly-once semantics of Storm are not achieved by processing each tuple exactly once. That would be too brittle: there are potentially several read/write operations per tuple defined in our topology, we want to avoid 2-phase commits for scalability reasons, and at large scale network partitions become more likely. Rather, Storm will typically keep replaying the tuples until it is sure they have been successfully and completely processed at least once. The important relationship of this to state updates is that Storm gives us a primitive (OpaqueMap) that allows idempotent state updates, so that those replays do not corrupt previously stored data. For example, if we are summing up the numbers [1,2,3,4,5], the result saved in the DB will always be 15, even if they are replayed and processed in the "sum" operation several times due to some transient failure. OpaqueMap has a slight impact on the format used to save data in the DB. Note that this replay and opaque logic is only present if we tell Storm to act like that, but we usually do.
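To make the replay-safe update more concrete, here is a minimal plain-Java sketch of the idea behind an opaque state, assuming each batch carries a transaction id and that a replayed batch reuses the same id. The class names OpaqueSum and OpaqueSumStore are hypothetical and the HashMap stands in for the DB; this illustrates the principle and is not Storm's actual OpaqueMap code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Stored per key: the batch id that last wrote it, and the value before and after that batch. */
final class OpaqueSum {
    final long txid;
    final long prev;
    final long curr;

    OpaqueSum(long txid, long prev, long curr) {
        this.txid = txid;
        this.prev = prev;
        this.curr = curr;
    }
}

/** In-memory stand-in for the DB; keeps one running sum per key. */
final class OpaqueSumStore {
    private final Map<String, OpaqueSum> db = new HashMap<>();

    /** Adds the batch to the running sum; applying the same txid again does not double count. */
    void applyBatch(String key, long txid, List<Long> batch) {
        long delta = 0;
        for (long value : batch) {
            delta += value;
        }
        OpaqueSum stored = db.get(key);
        long base;
        if (stored == null) {
            base = 0;            // first write for this key
        } else if (stored.txid == txid) {
            base = stored.prev;  // replay: restart from the value before this batch
        } else {
            base = stored.curr;  // new batch: build on the latest value
        }
        db.put(key, new OpaqueSum(txid, base, base + delta));
    }

    long get(String key) {
        OpaqueSum stored = db.get(key);
        return stored == null ? 0 : stored.curr;
    }
}
```

Applying the batch [1,2,3,4,5] twice under the same transaction id leaves the stored sum at 15, which is the behaviour described above. The extra txid and prev fields are the "slight impact on the format" of the stored data.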

If you're interested in reading more, I posted two blog articles on the subject:

http://svendvanderveken.wordpress.com/2013/07/30/scalable-real-time-state-update-with-storm/

http://svendvanderveken.wordpress.com/2014/02/05/error-handling-in-storm-trident-topologies/

One last thing: as hinted by the replay stuff above, Storm is a very asynchronous mechanism by nature. We typically have some data producer that posts events to a queueing system (e.g. Kafka or 0MQ), and Storm reads from there. As a result, assigning a timestamp from within Storm as suggested in the question may or may not have the desired effect: this timestamp will reflect the "latest successful processing time", not the data ingestion time, and of course it will not be identical across replays of the same tuple.
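A small sketch of the difference, assuming the producer can attach its own timestamp to each event before queueing it. StampedEvent and TimestampDemo are hypothetical names, and the processing clock is passed in as a parameter so that a replay can be simulated.

```java
/** Hypothetical event whose timestamp is assigned once, by the producer, before queueing. */
final class StampedEvent {
    final String payload;
    final long ingestedAtMillis;

    StampedEvent(String payload, long ingestedAtMillis) {
        this.payload = payload;
        this.ingestedAtMillis = ingestedAtMillis;
    }
}

final class TimestampDemo {
    /** Stamped with processing time: the result changes whenever the tuple is replayed. */
    static String withProcessingTime(StampedEvent e, long processingClockMillis) {
        return processingClockMillis + ":" + e.payload;
    }

    /** Stamped with ingestion time: the result is identical on every replay. */
    static String withIngestionTime(StampedEvent e) {
        return e.ingestedAtMillis + ":" + e.payload;
    }
}
```

If the ingestion time is what matters, carrying it inside the tuple keeps the stored value stable across replays; if the processing time is acceptable, stamping inside Storm is fine.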

OTHER TIPS

Have you tried a Trident state for Redis? There is code on GitHub that does this already: https://github.com/kstyrc/trident-redis

Let me know if this answers your question or not.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow