Question

I'm learning how to use Storm's Trident with Cassandra 2.0.5, Storm version 0.9.0.1. I'm also using com.hmsonline storm-cassandra 0.4.0-rc4 contrib.

My goal is simply to insert some text rows into a table with id (int), name (text) and sentence (text) columns. id and name together form the primary key.

partitionPersist requires a StateUpdater, and for that I'm using com.hmsonline.storm.cassandra.trident.CassandraUpdater<K, C, V>. As far as I can tell, it takes only one key as input, not two (I need both id and name). The tuple mapper (TridentTupleMapper) also uses a single key:

TridentTupleMapper<K, C, V> tupleMapper

Maybe I'm missing something, but how do I define multiple columns as keys?


Solution

Let me point you to the project that Brian and I have been working on, which uses Cassandra with Storm: https://github.com/hmsonline/storm-cassandra-cql

There are several examples you can look at to see how to develop a CqlTupleMapper that fits your key/column mapping. The code is still being developed, but there is a suitable backing map implementation for CQL3 that works both for persisting aggregations and for plain partitionPersist writes.

For your needs, you would want to define a Trident topology that groups your incoming data (sentences) with groupBy:

inputStream.groupBy(new Fields("sentences"))

You would then implement a CqlTupleMapper, specifically its map(K key, V value) method, which builds a custom CQL insert statement that maps the keys and the passed value to columns. Your mapping would be something like:

@Override
public Statement map(List<String> keys, String value) {
    // keys holds the primary-key fields in order: id, then name
    Insert statement = QueryBuilder.insertInto(KEYSPACE_NAME, TABLE_NAME);
    // id is an int column, so convert the string key before binding it
    statement.value("id", Integer.parseInt(keys.get(0)));
    statement.value("name", keys.get(1));
    statement.value("sentence", value);
    return statement;
}
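
If you want to check the key mapping on its own before wiring it into a topology, a small driver-free sketch like the one below may help. It is not part of storm-cassandra-cql: the class CompositeKeyInsert and its helpers cql and bindValues are hypothetical names made up for illustration, and the keyspace and table names are placeholders. It assumes the key list arrives with two entries (id first, then name), which in turn assumes the stream is grouped on both of those fields.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, driver-free illustration of mapping a two-part key
// (id, name) plus a value onto the three columns from the question.
public class CompositeKeyInsert {

    // Builds the parameterized CQL text for the (id, name, sentence) table.
    static String cql(String keyspace, String table) {
        return "INSERT INTO " + keyspace + "." + table
                + " (id, name, sentence) VALUES (?, ?, ?)";
    }

    // Converts the key list and the value into bind values in column order.
    // Assumption: keys.get(0) is the id as a string, keys.get(1) is the name.
    static List<Object> bindValues(List<String> keys, String sentence) {
        if (keys.size() != 2) {
            throw new IllegalArgumentException(
                    "expected 2 key parts (id, name) but got " + keys.size());
        }
        int id = Integer.parseInt(keys.get(0)); // id is an int column
        return Arrays.<Object>asList(id, keys.get(1), sentence);
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("42", "alice");
        System.out.println(cql("demo_ks", "sentences"));
        System.out.println(bindValues(keys, "hello storm"));
    }
}
```

The size check is there because a key list with a single entry (for example, when grouping on only one field) would otherwise fail at keys.get(1) with a less helpful error.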

I hope that helps.

Licensed under: CC-BY-SA with attribution