I am working on a dataflow that includes some aggregation steps in Pig and steps that store the results into Cassandra. I have been able to pass relatively simple data types such as integers, longs, or dates, but I can't find out how to pass some sort of list, set, or tuple from Pig to Cassandra using CqlStorage.

I use Pig 0.9.2, so I can't use the FLATTEN method.

Question

How do I fill a Cassandra table that has columns of complex types, such as sets or lists, from Pig 0.9.2?

Overview of my specific application:

  • I created the corresponding Cassandra table with the following definition:

    CREATE TABLE mycassandracf (
        my_id int,
        date timestamp,
        my_count bigint,
        grouped_ids list<bigint>,
        PRIMARY KEY (my_id, date)
    );
    
  • and a STORE instruction carrying the URL-encoded prepared statement (see the encoding sketch just after this list):

    STORE CassandraAggregate
    INTO 'cql://test/mycassandracf?output_query=UPDATE+test.mycassandracf+set+my_count+%3D+%3F%2C+grouped_ids+%3D+%3F'
    USING CqlStorage;
    
  • From a 'GROUP BY' relation, I GENERATE a relation in a CQL-friendly format (i.e. as tuples) that I want to store into Cassandra:

    CassandraAggregate = FOREACH GroupedRelation
        GENERATE TOTUPLE(TOTUPLE('my_id', $0.my_id),
        TOTUPLE('date', ISOToUnix($0.createdAt))),
        TOTUPLE(COUNT($1), $1.grouped_id);
    
    DUMP CassandraAggregate;
    
    (((my_id,30021),(date,1357084800000)),(2,{(60128490006325819),(62726281032786005)}))
    (((my_id,30165),(date,1357084800000)),(1,{(60128411174143024)}))
    (((my_id,30376),(date,1357084800000)),(4,{(60128411146211875),(63645100121476995),(60128411146211875),(63645100121476995)}))
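
For reference, the output_query parameter in the STORE instruction above is nothing more than the URL-encoded form of the prepared statement. A minimal Python 2 sketch (matching the Jython environment used for the UDF below) showing where the encoded string comes from:

# python
from urllib import quote_plus

query = "UPDATE test.mycassandracf set my_count = ?, grouped_ids = ?"
print(quote_plus(query))
# prints: UPDATE+test.mycassandracf+set+my_count+%3D+%3F%2C+grouped_ids+%3D+%3F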
    

Unsurprisingly, using the STORE instruction on this relation raises the exception:

java.lang.ClassCastException: org.apache.pig.data.DefaultDataBag cannot be cast to org.apache.pig.data.DataByteArray

I thus added a UDF written in Python to flatten the grouped_id bag:

@outputSchema("flat_bag:bag{}")
def flattenBag(bag):
    # flatten a bag of single-field tuples, e.g. {(a),(b)}, into one tuple (a, b)
    return tuple([long(item) for tup in bag for item in tup])

I use a tuple because using Python sets as well as Python lists ends up in casting errors.
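
As a quick sanity check outside Pig, the flattening logic can be exercised in plain Python 2 (a sketch; it assumes the bag arrives as a sequence of one-field tuples, which is how the UDF above iterates it):

# python
bag = [(60128490006325819,), (62726281032786005,)]  # a bag of 1-field tuples
print(tuple(long(item) for tup in bag for item in tup))
# prints: (60128490006325819L, 62726281032786005L)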

Adding it to my pipeline, I have:

CassandraAggregate = FOREACH GroupedRelation
    GENERATE TOTUPLE(TOTUPLE('my_id', $0.my_id),
    TOTUPLE('date', ISOToUnix($0.createdAt))),
    TOTUPLE(COUNT($1), py_f.flattenBag($1.grouped_id));

DUMP CassandraAggregate;

(((my_id,30021),(date,1357084800000)),(2,(60128490006325819,62726281032786005)))
(((my_id,31120),(date,1357084800000)),(1,(60128411174143024)))
(((my_id,31120),(date,1357084800000)),(1,(60128411146211875,63645100121476995,6012841114621187563645100121476995)))

Using the STORE instruction on this last relation raises an exception with the following error stack:

java.io.IOException: java.io.IOException: org.apache.thrift.transport.TTransportException
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.runPipeline(PigGenericMapReduce.java:465)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.processOnePackageOutput(PigGenericMapReduce.java:428)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:408)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:262)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:652)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:420)
at org.apache.hadoop.mapred.Child$4.run(Child.java:266)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
at org.apache.hadoop.mapred.Child.main(Child.java:260)
Caused by: java.io.IOException: org.apache.thrift.transport.TTransportException
at org.apache.cassandra.hadoop.cql3.CqlRecordWriter$RangeClient.run(CqlRecordWriter.java:248)
Caused by: org.apache.thrift.transport.TTransportException
at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
at org.apache.cassandra.thrift.Cassandra$Client.recv_execute_prepared_cql3_query(Cassandra.java:1724)
at org.apache.cassandra.thrift.Cassandra$Client.execute_prepared_cql3_query(Cassandra.java:1709)
at org.apache.cassandra.hadoop.cql3.CqlRecordWriter$RangeClient.run(CqlRecordWriter.java:232)

I tested the exact same workflow with simple data types and it works perfectly. What I am really looking for is a way to fill a Cassandra table with complex types such as sets or lists from Pig.

Many thanks

Solution

After further investigation, I found the solution here:

https://issues.apache.org/jira/browse/CASSANDRA-5867

Basically, CqlStorage does support complex types. For that, the value should be represented by a nested tuple, carrying the data type's name as a string as its first element. For a list, this is how one does it:

# python
@outputSchema("flat_bag:bag{}")
def flattenBag(bag):
    # prepend the CQL type name so CqlStorage encodes the tuple as a list
    return ('list',) + tuple([long(item) for tup in bag for item in tup])
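
Presumably the same convention extends to the other CQL collection types; the following set<bigint> variant is an untested sketch extrapolated from the ticket (the 'set' tag and the function name are my assumptions, not something I have verified):

# python -- untested extrapolation of the same convention
@outputSchema("flat_set:bag{}")
def flattenBagToSet(bag):
    # deduplicate, then tag the tuple with the CQL type name
    return ('set',) + tuple(set(long(item) for tup in bag for item in tup))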

Thus, in grunt:

-- pig
CassandraAggregate = FOREACH GroupedRelation
    GENERATE TOTUPLE(TOTUPLE('my_id', $0.my_id),
    TOTUPLE('date', ISOToUnix($0.createdAt))),
    TOTUPLE(COUNT($1), py_f.flattenBag($1.grouped_id));

DUMP CassandraAggregate;

(((my_id,30021),(date,1357084800000)),(2,(list, 60128490006325819,62726281032786005)))
(((my_id,31120),(date,1357084800000)),(1,(list, 60128411174143024)))
(((my_id,31120),(date,1357084800000)),(1,(list, 60128411146211875,63645100121476995,6012841114621187563645100121476995)))

This is then stored into Cassandra using the classic URL-encoded prepared statement.

Hope this will be of some help.
