Question

I would like to know whether I can use Tinkerpop inside Akka futures. So far, when I commit my changes to the graph, they are not persisted. I understand that Tinkerpop is a thread-local library, which means I would need to set my thread again within the future, using ODatabaseRecordThreadLocal.INSTANCE.set(thread).

I tried the following method without success:

def test[T](graphChanges: => T): T = {
    val thread = ODatabaseRecordThreadLocal.INSTANCE.get
    try graphChanges finally {
      ODatabaseRecordThreadLocal.INSTANCE.set(thread)
      GraphPool.get("partitioned").commit
    }
}

// collect tinkerpop frames
test {
  future {
  // add changes to my tinkerpop frames
  }
}

I would like to have one Tinkerpop thread per play.mvc.Http.Context.

Here is a sample project showing what I want to achieve: https://github.com/D-Roch/tinkerpop-play


Solution

The problem

The problem is that Tinkerpop is thread-local, so your changes only exist on, and can only be committed from, the thread that made them. When you create a Scala future, you let the environment choose the thread in which the future is executed. The environment knows nothing about Tinkerpop's thread affinity, so it may well pick a different thread from the one that holds your changes.

The problem is similar for Akka futures.
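To make the thread-local effect concrete, here is a minimal sketch that uses a plain java.lang.ThreadLocal as a stand-in for the database handle. The names ThreadLocalDemo and current are made up for this illustration and are not part of Tinkerpop or OrientDB. A value set on the calling thread is simply not visible inside a future that runs on a pool thread:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ThreadLocalDemo {
  // Hypothetical stand-in for a thread-bound database handle.
  val current = new ThreadLocal[String]

  // Returns what the calling thread sees and what a future on a pool thread sees.
  def run(): (String, Option[String]) = {
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      current.set("db-of-caller")
      // The pool thread never called set, so get() returns null there.
      val seenInFuture = Await.result(Future(Option(current.get())), 5.seconds)
      (current.get(), seenInFuture)
    } finally {
      current.remove()
      pool.shutdown()
    }
  }
}
```

Calling ThreadLocalDemo.run() yields the caller's value together with None for the future, which is the same kind of mismatch that makes the commit in the question miss the changes.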

In which thread does a future run?

When creating a future, you are creating it with two parameters:

  1. The block that should be executed
  2. The Execution Context that should execute the block

The second parameter is usually passed implicitly, but you can supply it explicitly to override the default.
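As a small sketch of passing the execution context explicitly: Future.apply takes the context in a second parameter list, so it can be handed over by name instead of being resolved implicitly. The object ExplicitContextDemo and the method threadNameOn are names invented for this example, and Future(...) is used instead of the older lowercase future so the snippet also compiles on newer Scala versions.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ExplicitContextDemo {
  // Returns the name of the thread that executed the future's block.
  def threadNameOn(ec: ExecutionContext): String =
    // The second parameter list is the (normally implicit) execution context.
    Await.result(Future(Thread.currentThread().getName)(ec), 5.seconds)

  // Runs two futures on one dedicated single-threaded context.
  def run(): (String, String) = {
    val pool = Executors.newSingleThreadExecutor()
    val dedicated = ExecutionContext.fromExecutor(pool)
    try (threadNameOn(dedicated), threadNameOn(dedicated))
    finally pool.shutdown()
  }
}
```

Both names returned by run() refer to the single pool thread, not to the thread that created the futures.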

Solution

When creating futures dealing with Tinkerpop, use an execution context that runs every block in the same thread.

Example:

import scala.concurrent.{ExecutionContext, future}
import java.util.concurrent.Executors

// A single-threaded executor: every block submitted to it runs on the same thread.
implicit val ec = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor)

future {
    println(Thread.currentThread)
    future {
        println(Thread.currentThread)
    }
}

This code prints the same thread twice on the console (tested with Java 7 and Scala 2.10.2).

Attention: Such a small thread pool can easily lead to deadlocks or starvation. Use it only for your Tinkerpop interaction.

You can either provide a special method, tinkerpopFuture, that takes a block as a parameter and returns a future that runs on the Tinkerpop thread, or create a dedicated actor that encapsulates all Tinkerpop interactions (and runs them with the special Tinkerpop execution context).
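One possible shape for the tinkerpopFuture helper is sketched below. The enclosing object TinkerpopThread and its shutdown method are assumptions made for this example, and the sketch contains no Tinkerpop calls at all; it only guarantees that every block runs on the same dedicated thread.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object TinkerpopThread {
  private val executor = Executors.newSingleThreadExecutor()
  // Dedicated context: every block submitted here runs on the same thread.
  private val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)

  // Hypothetical helper: runs `block` on the dedicated thread.
  def tinkerpopFuture[T](block: => T): Future[T] = Future(block)(ec)

  // The executor thread is not a daemon, so stop it when the application ends.
  def shutdown(): Unit = executor.shutdown()
}
```

Graph changes and the matching commit would both go through tinkerpopFuture, while all unrelated work stays on the default execution context, so the single thread is not starved.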


OTHER TIPS

This doesn't look specific to Tinkerpop; it looks like a common mistake made when using futures. Just consider this fragment:

try graphChanges finally { ... }

It looks fine by itself, but I can also see that graphChanges here is creating a future. So...

  • graphChanges initiates a Future, returning instantly
  • the try block completes and the finally block is executed
  • At some point immediately before this, or after, or maybe in parallel, but almost certainly on another thread, the Future is executed
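The sequence above can be reproduced without any graph library. In this sketch the names FinallyBeforeFuture and log are invented, and a CountDownLatch deliberately holds the future body back until the finally block has run. That stands in for slow graph work and makes the ordering repeatable; without the latch the two steps race and either order is possible.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FinallyBeforeFuture {
  def run(): List[String] = {
    val events = ListBuffer.empty[String]
    def log(e: String): Unit = events.synchronized { events += e }
    val finallyDone = new CountDownLatch(1)

    // Same shape as the question's `test`: the by-name block only starts a future.
    def test[T](graphChanges: => T): T =
      try graphChanges finally {
        log("finally (commit)")
        finallyDone.countDown()
      }

    val f = test {
      Future {
        // Stand-in for slow graph work: wait until `finally` has already run.
        finallyDone.await(5, TimeUnit.SECONDS)
        log("future body (graph changes)")
      }
    }
    Await.result(f, 10.seconds)
    events.synchronized(events.toList)
  }
}
```

Here run() returns the commit event first and the graph changes second: test never waits for the future, so the commit can happen before there is anything to commit.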

My advice would be to move the asynchronous logic inside test, so that you can be sure of the correct thread-affinity and ensure that any calls are correctly flagged as blocking. Like this:

def test[T](graphChanges: => T): Future[T] = future {
  blocking {
    val tlocal = ODatabaseRecordThreadLocal.INSTANCE
    val dbrecord = tlocal.get

    try graphChanges finally {
      tlocal.set(dbrecord)
      GraphPool.get("partitioned").commit
    }
  }
}

// collect tinkerpop frames
test {
  // add changes to my tinkerpop frames
}
Licensed under: CC-BY-SA with attribution