Question

The problem with the Cassandra Python driver is that the "future" objects it returns attach callbacks via side effect. This means the "future" is not composable in the same sense that a Future from JavaScript or Scala is composable. I am wondering if there is a pattern for transforming a non-composable future into a composable one (preferably without leaking memory), so that I could write something like:

   (my_query_object
      .insert(1, 2, 3, 'Fred Flintstone')
      .insert(1, 2, 3, 'Barney Rubble')
      .insert(5000, 2, 3, 'George Jetson')
      .insert(5000, 2, 3, 'Jane his wife'))
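
For context, the standard bridge for a callback-style API like this is to complete a regular future from inside the callbacks. A minimal sketch, assuming the driver's ResponseFuture.add_callbacks(callback, errback) API and Python's concurrent.futures (the futures backport on Python 2):

from concurrent.futures import Future

def to_composable(response_future):
    # complete a standard Future from the driver's callback hooks:
    # add_callbacks fires the first function with the result,
    # the second with the exception
    f = Future()
    response_future.add_callbacks(f.set_result, f.set_exception)
    return f

Even with that bridge, concurrent.futures.Future has no then/map combinators, so chaining dependent inserts still needs extra machinery, which is exactly what I'm after.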

Looking at the performance section of the DataStax Cassandra Python driver docs, I see an example of how they create a continuously chained series of insert queries; what they show is a slightly more complex version of this pattern:

import logging

sentinel = object()  # marker for "no previous result yet"
log = logging.getLogger(__name__)

# session and query are assumed to already exist
def insert_next(previous_result=sentinel):
    if previous_result is not sentinel:
        if isinstance(previous_result, BaseException):
            log.error("Error on insert: %r", previous_result)

    future = session.execute_async(query)
    # NOTE: registered as both callback and errback, so it also handles errors
    future.add_callbacks(insert_next, insert_next)

This works great as a toy example: the moment one query completes, an equivalent query is executed again. In their benchmark, this scheme reaches about 7k writes/s, while the version that does not "chain" callbacks caps out around 2k writes/s.
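
Throughput in this style typically comes from running several such chains at once, each keeping itself alive by re-registering as its own callback. A sketch (the chain count here is an assumption, not a number from the docs):

# start several independent insert chains; each re-arms itself via its callback
CHAIN_COUNT = 100  # hypothetical; tune for your cluster

for _ in range(CHAIN_COUNT):
    insert_next()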

I've been trying to get my head around building a mechanism that recaptures that chaining behavior in a composable form, but to no avail. Has anyone come up with something like it?


Solution

It took me a bit to think through how to preserve the future in some form or another:

import logging
from Queue import Queue  # the module is named "queue" in Python 3
from threading import Event  # yes, this is needed: it tracks whether the chain is running


insert_logger = logging.getLogger('async_insert')
insert_logger.setLevel(logging.INFO)

def handle_err(err):
  insert_logger.warning('Failed to insert due to %s', err)


# Designed to work in a high-write environment. Chained callbacks give the best
# performance, with fast fail/stop when an error is encountered; the next insert
# re-ups the writing. A failed write can be lost. There is some guarantee on
# preserving the order of writes.
class CappedQueueInserter(object):
  def __init__(self, session, max_count=0):
    self.__queue = Queue(max_count)  # a max_count of 0 means the queue is unbounded
    self.__session = session
    self.__started = Event()

  @property
  def started(self):
    return self.__started.is_set()

  def insert(self, bound_statement):
    if not self.started:
      self._begin(bound_statement)
    else:
      self._enqueue(bound_statement)

  def _begin(self, bound_statement):
    def errback(err):
      # on error, stop the chain; the next insert() call restarts it
      handle_err(err)
      self.__started.clear()

    def callback(_result):
      # NOTE: the driver invokes callbacks on its event-loop thread, so this
      # blocking get() stalls that thread until another item is enqueued
      try:
        bound = self.__queue.get(True)  # block until an item is added to the queue
        future = self.__session.execute_async(bound)
        future.add_callbacks(callback, errback)
      except Exception:
        self.__started.clear()

    self.__started.set()
    future = self.__session.execute_async(bound_statement)
    future.add_callbacks(callback, errback)

  def _enqueue(self, bound_statement):
    # blocks when the queue is full, providing backpressure to callers
    self.__queue.put(bound_statement, True)


# Separate insert-statement binding from the insertion loop
class InsertEnqueue(object):
  def __init__(self, prepared_query, insert, consistency_level=None):
    self.__statement = prepared_query
    self.__level = consistency_level
    self.__sink = insert

  def insert(self, *args):
    bound = self.bind(*args)
    self.__sink.insert(bound)

  @property
  def consistency_level(self):
    return self.__level or self.__statement.consistency_level

  @consistency_level.setter
  def consistency_level(self, value):
    # the setter must reuse the property's name, or attribute assignment bypasses it
    if value:
      self.__level = value

  def bind(self, *args):
    # PreparedStatement.bind expects a single sequence of values
    bound = self.__statement.bind(args)
    bound.consistency_level = self.consistency_level

    return bound

The combination of a Queue and an Event drives the whole thing. Assuming each write only needs to happen "eventually", this should work.
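
To tie this back to the question's API, the wiring might look like the following sketch (the contact point, keyspace, and table are assumptions; note that insert() would have to return self to get the fluent chaining shown in the question):

from cassandra.cluster import Cluster

cluster = Cluster(['127.0.0.1'])          # assumed contact point
session = cluster.connect('my_keyspace')  # assumed keyspace

prepared = session.prepare(
    "INSERT INTO people (a, b, c, name) VALUES (?, ?, ?, ?)")

inserter = CappedQueueInserter(session, max_count=1000)
my_query_object = InsertEnqueue(prepared, inserter)

my_query_object.insert(1, 2, 3, 'Fred Flintstone')
my_query_object.insert(1, 2, 3, 'Barney Rubble')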

Licensed under: CC-BY-SA with attribution