Pregunta

This is a followup to my previous question.

Suppose I have a task, which executes an interruptible blocking call. I would like to run it as a Future and cancel it with failure method of Promise.

I would like the cancel to work as follows:

  • If one cancels the task before it finished I would like the task to finish "immediately", interrupting the blocking call if it has already started and I would like the Future to invoke onFailure.

  • If one cancels the task after the task finished I would like to get a status saying that the cancel failed since the task already finished.

Does it make sense? Is it possible to implement in Scala? Are there any examples of such implementations?

¿Fue útil?

Solución

scala.concurrent.Future is read-only, so one reader cannot mess things up for the other readers.

It seems like you should be able to implement what you want as follows:

def cancellableFuture[T](fun: Future[T] => T)(implicit ex: ExecutionContext): (Future[T], () => Boolean) = {
  val p = Promise[T]()
  val f = p.future
  p tryCompleteWith Future(fun(f))
  (f, () => p.tryFailure(new CancellationException))
}

val (f, cancel) = cancellableFuture( future => {
  while(!future.isCompleted) continueCalculation // isCompleted acts as our interrupted-flag

  result  // when we're done, return some result
})

val wasCancelled = cancel() // cancels the Future (sets its result to be a CancellationException conditionally)

Otros consejos

Here is the interruptable version of Victor's code per his comments (Victor, please correct me if I misinterpreted).

object CancellableFuture extends App {

  def interruptableFuture[T](fun: () => T)(implicit ex: ExecutionContext): (Future[T], () => Boolean) = {
    val p = Promise[T]()
    val f = p.future
    val aref = new AtomicReference[Thread](null)
    p tryCompleteWith Future {
      val thread = Thread.currentThread
      aref.synchronized { aref.set(thread) }
      try fun() finally {
        val wasInterrupted = (aref.synchronized { aref getAndSet null }) ne thread
        //Deal with interrupted flag of this thread in desired
      }
    }

    (f, () => {
      aref.synchronized { Option(aref getAndSet null) foreach { _.interrupt() } }
      p.tryFailure(new CancellationException)
    })
  }

  val (f, cancel) = interruptableFuture[Int] { () =>
    val latch = new CountDownLatch(1)

    latch.await(5, TimeUnit.SECONDS)    // Blocks for 5 sec, is interruptable
    println("latch timed out")

    42  // Completed
  }

  f.onFailure { case ex => println(ex.getClass) }
  f.onSuccess { case i => println(i) }

  Thread.sleep(6000)   // Set to less than 5000 to cancel

  val wasCancelled = cancel()

  println("wasCancelled: " + wasCancelled)
}

With Thread.sleep(6000) the output is:

latch timed out
42
wasCancelled: false

With Thread.sleep(1000) the output is:

wasCancelled: true
class java.util.concurrent.CancellationException

Twitter's futures implement cancellation. Have a look here:

https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Future.scala

Line 563 shows the abstract method responsible for this. Scala's futures currently do not support cancellation.

You can use Monix library instead of Future

https://monix.io

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top