Question

I am evaluating Cassandra, using the DataStax driver and CQL.

I would like to store some data with the following internal structure, where the names are different for each update.

+-------+-------+-------+-------+-------+-------+
|       | name1 | name2 | name3 | ...   | nameN |
| time  +-------+-------+-------+-------+-------+
|       | val1  | val2  | val3  | ...   | valN  |
+-------+-------+-------+-------+-------+-------+

So time should be the row key, and name should be the column key. The CQL statement I use to create this table is:

CREATE TABLE IF NOT EXISTS test.wide (
  time varchar,
  name varchar,
  value varchar,
  PRIMARY KEY (time,name))
  WITH COMPACT STORAGE

I want the schema to be this way for ease of querying. I also have to occasionally store updates with more than 65000 rows. So using the cassandra list/set/map data types is not an option.

I have to be able to handle at least 1000 wide row inserts per second, with a varying but large (~1000) number of name/value pairs.

The problem is the following: I have written a simple benchmark that does 1000 wide row inserts of 10000 name/value pairs each. I am getting very slow performance with CQL and the datastax driver, whereas the version that does not use CQL (using astyanax) has good performance on the same test cluster.

I have read this related question, whose accepted answer suggests that you should be able to atomically and quickly create a new wide row by using batch prepared statements, which are available in Cassandra 2.

So I tried using those, but I still get slow performance (two inserts per second for a small three-node cluster running on localhost). Am I missing something obvious, or do I have to use the lower level thrift API? I have implemented the same insert with a ColumnListMutation in astyanax, and I get about 30 inserts per second.

If I have to use the lower level thrift API:

  • is it actually deprecated, or is it just inconvenient to use because it is lower level?

  • will I be able to query a table created with the thrift api with CQL?

Below is a self-contained code example in scala. It simply creates a batch statement for inserting a wide row with 10000 columns and times the insertion performance repeatedly.

I played with the options of BatchStatement and with the consistency level, but nothing could get me better performance.

The only explanation I have is that despite the batch consisting of prepared statements, the entries are added to the row one by one.


package cassandra

import com.datastax.driver.core._

object CassandraTestMinimized extends App {

  val keyspace = "test"
  val table = "wide"
  val tableName = s"$keyspace.$table"

  def createKeyspace = s"""
CREATE KEYSPACE IF NOT EXISTS ${keyspace}
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
"""

  def createWideTable = s"""
CREATE TABLE IF NOT EXISTS ${tableName} (
time varchar,
name varchar,
value varchar,
PRIMARY KEY (time,name))
WITH COMPACT STORAGE
"""

  def writeTimeNameValue(time: String) = s"""
INSERT INTO ${tableName} (time, name, value)
VALUES ('$time', ?, ?)
"""

  val cluster = Cluster.builder.addContactPoints("127.0.0.1").build
  val session = cluster.connect()

  session.execute(createKeyspace)
  session.execute(createWideTable)

  for(i<-0 until 1000) {
    val entries =
      for {
        i <- 0 until 10000
        name = i.toString
        value = name
      } yield name -> value
    val batchPreparedStatement = writeMap(i, entries)
    val t0 = System.nanoTime()
    session.execute(batchPreparedStatement)
    val dt = System.nanoTime() - t0
    println(i + " " + (dt/1.0e9))
  }

  def writeMap(time: Long, update: Seq[(String, String)]) : BatchStatement = {
    val template = session
      .prepare(writeTimeNameValue(time.toString))
      .setConsistencyLevel(ConsistencyLevel.ONE)
    val batch = new BatchStatement(BatchStatement.Type.UNLOGGED)
    for ((k, v) <- update)
      batch.add(template.bind(k, v))
    batch
  }
}

Here is the Astyanax code (modified from an Astyanax example) that does essentially the same thing 15 times faster. Note that this also does not use asynchronous calls, so it is a fair comparison. It requires the column family to already exist, since I have not yet figured out how to create it using Astyanax and the example did not include any code for creating the column family.

package cassandra;

import java.util.Iterator;

import com.netflix.astyanax.ColumnListMutation;
import com.netflix.astyanax.serializers.AsciiSerializer;
import com.netflix.astyanax.serializers.LongSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.thrift.ThriftFamilyFactory;

public class AstClient {
    private static final Logger logger = LoggerFactory.getLogger(AstClient.class);

    private AstyanaxContext<Keyspace> context;
    private Keyspace keyspace;
    private ColumnFamily<Long, String> EMP_CF;
    private static final String EMP_CF_NAME = "employees2";

    public void init() {
        logger.debug("init()");

        context = new AstyanaxContext.Builder()
                .forCluster("Test Cluster")
                .forKeyspace("test1")
                .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
                        .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE)
                )
                .withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("MyConnectionPool")
                        .setPort(9160)
                        .setMaxConnsPerHost(1)
                        .setSeeds("127.0.0.1:9160")
                )
                .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
                        .setCqlVersion("3.0.0")
                        .setTargetCassandraVersion("2.0.5"))
                .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
                .buildKeyspace(ThriftFamilyFactory.getInstance());

        context.start();
        keyspace = context.getClient();

        EMP_CF = ColumnFamily.newColumnFamily(
                EMP_CF_NAME,
                LongSerializer.get(),
                AsciiSerializer.get());
    }

    public void insert(long time) {
        MutationBatch m = keyspace.prepareMutationBatch();

        ColumnListMutation<String> x =
                m.withRow(EMP_CF, time);
        for(int i=0;i<10000;i++)
            x.putColumn(Integer.toString(i), Integer.toString(i));

        try {
            @SuppressWarnings("unused")
            Object result = m.execute();
        } catch (ConnectionException e) {
            logger.error("failed to write data to C*", e);
            throw new RuntimeException("failed to write data to C*", e);
        }
        logger.debug("insert ok");
    }

    public void createCF() {
    }

    public void read(long time) {
        OperationResult<ColumnList<String>> result;
        try {
            result = keyspace.prepareQuery(EMP_CF)
                    .getKey(time)
                    .execute();

            ColumnList<String> cols = result.getResult();
            // process data

            // a) iterate over columns
            for (Iterator<Column<String>> i = cols.iterator(); i.hasNext(); ) {
                Column<String> c = i.next();
                String v = c.getStringValue();
                System.out.println(c.getName() + " " + v);
            }

        } catch (ConnectionException e) {
            logger.error("failed to read from C*", e);
            throw new RuntimeException("failed to read from C*", e);
        }
    }

    public static void main(String[] args) {
        AstClient c = new AstClient();
        c.init();
        long t00 = System.nanoTime();
        for(int i=0;i<1000;i++) {
            long t0 = System.nanoTime();
            c.insert(i);
            long dt = System.nanoTime() - t0;
            System.out.println((1.0e9/dt) + " " + i);
        }
        long dtt = System.nanoTime() - t00;

        c.read(0);
        System.out.println(dtt / 1e9);
    }

}

Update: I found this thread on the cassandra-user mailing list. It seems that there is a performance problem with CQL when doing large wide row inserts. There is a ticket CASSANDRA-6737 to track this issue.

Update2: I have tried out the patch that is attached to CASSANDRA-6737, and I can confirm that this patch completely fixes the issue. Thanks to Sylvain Lebresne from DataStax for fixing this so quickly!


Solution

You are not the only person to experience this. I wrote a blog post a while ago that focused more on conversion between CQL and Thrift, but it links to mailing list threads from people seeing the same thing (the performance issue with wide-row inserts was my initial motivation for investigating): http://thelastpickle.com/blog/2013/09/13/CQL3-to-Astyanax-Compatibility.html

In sum, CQL is great for removing the burdens of dealing with typing and understanding the data model for folks new to Cassandra. The DataStax driver is well written and contains lots of useful features.

However, the Thrift API is more than slightly faster for wide row inserts. The Netflix blog does not go into this specific use case much. Further, the Thrift API is not legacy so long as people are using it (many folks are). It's an ASF project and as such is not run by any single vendor.

In general, with any Cassandra-based application, if you find a way of doing something that meets (or often exceeds) the performance requirements of your workload, stick with it.

Other answers

You have a mistake in your code that I think explains a lot of the performance problems you're seeing: for each batch you prepare the statement again. Preparing a statement isn't super expensive, but doing it as you do adds a lot of latency. The time you spend waiting for that statement to be prepared is time you don't build the batch, and time Cassandra doesn't spend processing that batch. A prepared statement only needs to be prepared once and should be re-used.
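As a rough illustration of "prepare once, reuse everywhere", here is a minimal Java sketch. It does not use the real driver: PreparedCache and PrepareOnceDemo are hypothetical names, and the prepare function is a stand-in for a driver call such as session.prepare(cql). It also assumes that time is turned into a bind marker, so the query text is identical for every batch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Caches one prepared handle per distinct query string (hypothetical helper). */
final class PreparedCache<P> {
    private final Map<String, P> cache = new ConcurrentHashMap<>();
    private final Function<String, P> prepare;

    // `prepare` is a stand-in for a driver call such as session.prepare(cql).
    PreparedCache(Function<String, P> prepare) {
        this.prepare = prepare;
    }

    /** Returns the cached handle, preparing the query only on first use. */
    P get(String cql) {
        return cache.computeIfAbsent(cql, prepare);
    }
}

final class PrepareOnceDemo {
    // All three values are bind markers, so the query text never changes.
    static final String INSERT =
        "INSERT INTO test.wide (time, name, value) VALUES (?, ?, ?)";

    /** Simulates `batches` batches and returns how often the query was prepared. */
    static int countPrepares(int batches) {
        AtomicInteger prepares = new AtomicInteger();
        PreparedCache<String> cache = new PreparedCache<>(cql -> {
            prepares.incrementAndGet();   // one simulated round trip per prepare
            return "prepared:" + cql;
        });
        for (int time = 0; time < batches; time++) {
            cache.get(INSERT);            // the same handle is reused for every batch
        }
        return prepares.get();
    }
}
```

In the Scala code from the question, the equivalent change would presumably be to call session.prepare once outside writeMap and bind time together with the name and value for each entry.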

I think much of the bad performance can be explained by latency problems. The bottleneck is most likely your application code, not Cassandra. Even if you only prepare that statement once, you still spend most of the time either being CPU bound in the application (building a big batch) or not doing anything (waiting for the network and Cassandra).

There are two things you can do. First, use the async API of the CQL driver and build the next batch while the network and Cassandra are busy with the one you just sent. Second, try running multiple threads doing the same thing. You will have to experiment to find the right number of threads; it depends on the number of cores you have and on whether you're running one or three nodes on the same machine.
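The overlap described above could be sketched roughly as follows, using only the JDK. Pipeline, PipelineDemo and the sendAsync parameter are hypothetical names; sendAsync stands in for an asynchronous driver call such as session.executeAsync, whose future type is not CompletableFuture, so a real version would need an adapter. The sketch keeps a bounded number of requests in flight and still checks every response.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class Pipeline {
    /**
     * Sends each request through `sendAsync` without waiting for the previous
     * one to finish, keeping at most `maxInFlight` requests outstanding.
     * Returns the results in request order.
     */
    static <Q, R> List<R> runAll(List<Q> requests,
                                 Function<Q, CompletableFuture<R>> sendAsync,
                                 int maxInFlight) {
        Semaphore permits = new Semaphore(maxInFlight);
        List<CompletableFuture<R>> pending = new ArrayList<>();
        for (Q request : requests) {
            permits.acquireUninterruptibly();   // blocks only when the window is full
            CompletableFuture<R> future = sendAsync.apply(request);
            future.whenComplete((result, error) -> permits.release());
            pending.add(future);
        }
        List<R> results = new ArrayList<>();
        for (CompletableFuture<R> future : pending) {
            results.add(future.join());         // failures surface here, nothing is dropped
        }
        return results;
    }
}

final class PipelineDemo {
    /** Runs fake requests on a thread pool and returns the peak number in flight. */
    static int peakInFlight(int requestCount, int maxInFlight) {
        ExecutorService fakeServer = Executors.newFixedThreadPool(8);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            List<Integer> requests = new ArrayList<>();
            for (int i = 0; i < requestCount; i++) {
                requests.add(i);
            }
            List<Integer> echoed = Pipeline.runAll(requests, req -> {
                peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                return CompletableFuture.supplyAsync(() -> {
                    inFlight.decrementAndGet();
                    return req;                 // the fake server just echoes the request
                }, fakeServer);
            }, maxInFlight);
            return echoed.size() == requestCount ? peak.get() : -1;
        } finally {
            fakeServer.shutdown();
        }
    }
}
```

The window size plays the same role as the thread count mentioned above: it is a value to tune by experiment, not something this sketch can pick for you.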

Running a three node cluster on the same machine makes the cluster slower than running a single node, while running on different machines makes it faster. Also running the application on the same machine doesn't exactly help. If you want to test performance, either run only one node or run a real cluster on separate machines.

Batches can give you extra performance, but not always. They can lead to the kind of problem you're seeing in your test code: buffer bloat. Once batches get too big your application spends too much time building them, then too much time pushing them out on the network, and too much time waiting for Cassandra to process them. You need to experiment with batch sizes and see what works best (but do that with a real cluster, otherwise you won't see the effects of the network, which will be a big factor when your batches get bigger).
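To experiment with batch sizes, one simple approach is to split the name/value pairs of a wide row into smaller chunks and send each chunk as its own batch. The helper below is a minimal JDK-only sketch; BatchChunker is a hypothetical name, and any chunk size you pass in (500, say) is an arbitrary starting point, not a recommendation.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchChunker {
    /** Splits `entries` into consecutive sublists of at most `maxSize` elements. */
    static <T> List<List<T>> chunk(List<T> entries, int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < entries.size(); from += maxSize) {
            int to = Math.min(from + maxSize, entries.size());
            // Copy the view so each chunk is independent of the source list.
            chunks.add(new ArrayList<>(entries.subList(from, to)));
        }
        return chunks;
    }
}
```

Each chunk would then be turned into its own batch statement. Note that splitting one wide row over several batches gives up whatever atomicity a single batch provided, so this only fits if partial rows are acceptable for a short time.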

And if you use batches, use compression. Compression makes no difference in most request loads (responses are another matter), but when you send huge batches it can make a big difference.

There's nothing special about wide row writes in Cassandra. With some exceptions the schema doesn't change the time it takes to process a write. I run applications that do tens of thousands of non-batched mixed wide-row and non-wide-row writes per second. The clusters aren't big, just three or four m1.xlarge EC2 nodes each. The trick is never to wait for a request to return before sending the next (that doesn't mean fire and forget, just handle the responses in the same asynchronous manner). Latency is a performance killer.

Some things you can try... In your cassandra.yaml (this is Cassandra 1.2.x, maybe the params are called somewhat differently in 2.x):

  • Disable the row cache (row_cache_size_in_mb: 0)
  • Increase the memory limit before in-memory rows spill over to disk (in_memory_compaction_limit_in_mb); only do this if you see log output that says spilling does happen
  • Make sure num_tokens / initial_token values are configured properly so rows get distributed across your nodes

Other things you can try:

  • Provide all node IPs in your cluster to the client, not just one
  • Provide more RAM to each Cassandra node
  • Try to run your test multi-threaded
  • Make sure you have JNA installed and in use if you run Cassandra on Linux

Things to clarify:

  • Have you confirmed via nodetool that the 3 nodes have found each other?
  • What does nodetool say about the load distribution of your 3 nodes?
  • What does the physical host of your virtual cluster say about CPU and I/O usage? Maybe it has simply maxed out?
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow