Question

I have a very large MySQL table (billions of rows, with dozens of columns) I would like to convert into a ColumnFamily in Cassandra. I'm using Hector.

I first create my schema as such :

    String clusterName = "Test Cluster";
    String host = "cassandra.lanhost.com:9160";
    String newKeyspaceName = "KeyspaceName";
    String newColumnFamilyName = "CFName";

    ThriftCluster cassandraCluster;
    CassandraHostConfigurator cassandraHostConfigurator;

    cassandraHostConfigurator = new CassandraHostConfigurator(host);
    cassandraCluster = new ThriftCluster(clusterName, cassandraHostConfigurator);

    BasicColumnFamilyDefinition columnFamilyDefinition = new BasicColumnFamilyDefinition();
    columnFamilyDefinition.setKeyspaceName(newKeyspaceName);
    columnFamilyDefinition.setName(newColumnFamilyName);    
    columnFamilyDefinition.setDefaultValidationClass("UTF8Type");
    columnFamilyDefinition.setKeyValidationClass(ComparatorType.UTF8TYPE.getClassName());
    columnFamilyDefinition.setComparatorType(ComparatorType.UTF8TYPE);

    BasicColumnDefinition columnDefinition = new BasicColumnDefinition();
    columnDefinition.setName(StringSerializer.get().toByteBuffer("id"));
    columnDefinition.setIndexType(ColumnIndexType.KEYS);
    columnDefinition.setValidationClass(ComparatorType.INTEGERTYPE.getClassName());
    columnDefinition.setIndexName("id_index");
    columnFamilyDefinition.addColumnDefinition(columnDefinition);

    columnDefinition = new BasicColumnDefinition();
    columnDefinition.setName(StringSerializer.get().toByteBuffer("status"));
    columnDefinition.setIndexType(ColumnIndexType.KEYS);
    columnDefinition.setValidationClass(ComparatorType.ASCIITYPE.getClassName());
    columnDefinition.setIndexName("status_index");
    columnFamilyDefinition.addColumnDefinition(columnDefinition);

        .......

    ColumnFamilyDefinition cfDef = new ThriftCfDef(columnFamilyDefinition);

    KeyspaceDefinition keyspaceDefinition = 
        HFactory.createKeyspaceDefinition(newKeyspaceName, "org.apache.cassandra.locator.SimpleStrategy", 1, Arrays.asList(cfDef));

    cassandraCluster.addKeyspace(keyspaceDefinition);

Once that done, I load my data, stored in a List, since I'm fetching the MySQL data with a namedParametersJdbcTemplate, as such :

String clusterName = "Test Cluster";
String host = "cassandra.lanhost.com:9160";
String KeyspaceName = "KeyspaceName";
String ColumnFamilyName = "CFName";
final StringSerializer serializer = StringSerializer.get();

public void insert(List<SqlParameterSource> dataToInsert) throws ExceptionParserInterrupted {

    Keyspace workingKeyspace = null;
    Cluster cassandraCluster = HFactory.getOrCreateCluster(clusterName, host);
    workingKeyspace = HFactory.createKeyspace(KeyspaceName, cassandraCluster);
    Mutator<String> mutator = HFactory.createMutator(workingKeyspace, serializer);

    ColumnFamilyTemplate<String, String> template = new ThriftColumnFamilyTemplate<String, String>(workingKeyspace, ColumnFamilyName, serializer, serializer);

    long t1 = System.currentTimeMillis();

    for (SqlParameterSource data : dataToInsert) {

        String keyId = "id" + (Integer) data.getValue("id");

    mutator.addInsertion(keyId, ColumnFamilyName, HFactory.createColumn("id", (Integer) data.getValue("id"), StringSerializer.get(), IntegerSerializer.get()));
    mutator.addInsertion(keyId,ColumnFamilyName, HFactory.createStringColumn("status", data.getValue("status").toString()));

          ...............

    }

    mutator.execute();

    System.out.println(t1 - System.currentTimeMillis());

I'm inserting 100 000 lines in approximatively 1 hour, which is really slow. I heard about multi-threading my inserts, but in this particular case I don't know what to do. Should I use BatchMutate?

Was it helpful?

Solution

Yes, you should run your insertion code from multiple threads. Take a look at the following stress testing code for an example of how to do this efficiently with hector: https://github.com/zznate/cassandra-stress

An additional source of your insert performance issue may be the number of secondary indexes you are applying on the column family (each secondary index creates an additional column family 'under the hood').

Correctly designed data models should not really need a large number of secondary indexes. The following article provides a good overview of data modeling in Cassandra: http://www.datastax.com/docs/1.0/ddl/index

OTHER TIPS

There is one alternate way of achieving this. You can try exploring https://github.com/impetus-opensource/Kundera. You would love it.

Kundera is a JPA 2.0 compliant Object-Datastore Mapping Library for NoSQL Datastores and currently supports Cassandra, HBase, MongoDB and all relational datastores (Kundera internally uses Hibernate for all relational datastores).

In your case you can use your existing objects along with JPA annotations to store them in Cassandra. Since Kundera supports polyglot persistence you also use a MySQL + Cassandra combination where you can use MySQL for most of your data and Cassandra for transactional data.And since all you need to care about is objects and JPA annotations, your job would be much easier.

For performance you can have a look at https://github.com/impetus-opensource/Kundera/wiki/Kundera-Performance

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top