Question

This answer shows how to convert a java.util.concurrent.Future into a scala.concurrent.Future while controlling where the blocking occurs:

import java.util.concurrent.{Future => JFuture}
import scala.concurrent.{Promise, Future => SFuture}
import scala.util.Try

val jfuture: JFuture[T] = ???
val promise = Promise[T]()
// A dedicated thread performs the blocking get and completes the promise
new Thread(
  new Runnable {
    def run(): Unit = { promise.complete(Try{ jfuture.get }) }
  }
).start()
val future = promise.future

My question is the same as one asked in the comments:

What's wrong with future { jfuture.get }? Why did you use an extra thread combined with a Promise?

It was answered as follows:

It'll block a thread in your thread pool. If you have an ExecutionContext configured for such futures, that's fine, but the default ExecutionContext contains only as many threads as you have processors.

I'm not sure I understand the explanation. To reiterate:

What's wrong with future { jfuture.get }? Isn't blocking inside a future the same as manually creating a new Thread and blocking there? If not, how is it different?

Solution

There is almost no difference between future { jfuture.get } and future { future { jfuture.get }}.

The default thread pool has as many threads as you have processors.

Each jfuture.get blocks one of those threads until the Java future completes.

Let's assume you have 8 processors. Also let's suppose each jfuture.get takes 10 seconds. Now create 8 future { jfuture.get }.

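import java.util.Date
import scala.concurrent._
import ExecutionContext.Implicits.global

// `future { ... }` is the Scala 2.10 spelling, deprecated since 2.11 in favor of `Future { ... }`
val format = new java.text.SimpleDateFormat("HH:mm:ss").format(_: Date)

val startTime = new Date
// Occupy all 8 threads of the default pool with a 10-second blocking call
(1 to 8) map {_ => future{ Thread.sleep(10000) }}
// This trivial future cannot start until one of those threads is free again
future{
  2+2
  println(s"2+2 done. Start time: ${format(startTime)}, end time: ${format(new Date)}")
}

// 2+2 done. Start time: 20:48:18, end time: 20:48:28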

Ten seconds is far too long for evaluating 2+2: the ninth future could not start until one of the 8 sleeping threads became free.

All other futures and all actors on the same execution context are stalled for those 10 seconds as well.

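With an additional execution context:

import java.util.concurrent.ForkJoinPool
import scala.concurrent.ExecutionContext

// A separate pool reserved for blocking calls, with a parallelism of 20
object BlockingExecution {
  val executor = ExecutionContext.fromExecutor(new ForkJoinPool(20))
}

// Runs f on the blocking pool instead of the default execution context
def blockingFuture[T](f: => T) = {
  future( f )(BlockingExecution.executor)
}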

val startTime = new Date
(1 to 8) map {_ => blockingFuture{ Thread.sleep(10000) }}
future{
  2+2
  println(s"2+2 done. Start time: ${format(startTime)}, end time: ${format(new Date)}")
}

// 2+2 done. Start time: 21:26:18, end time: 21:26:18

You could implement blockingFuture using new Thread(new Runnable {..., but an additional execution context lets you limit the number of threads.
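Applied to the original question, the same idea could be sketched as a conversion helper. This is only a minimal sketch, assuming Scala 2.11 or later (hence Future.apply instead of future { ... }); the object name JavaFutureConversion, the helper toScala, and the fixed pool of 20 threads are hypothetical choices, not something from the answer above.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors, Future => JFuture}
import scala.concurrent.{Await, ExecutionContext, Future => SFuture}
import scala.concurrent.duration._

object JavaFutureConversion {
  // Assumption: 20 threads, mirroring the pool size used above; tune for your workload.
  private val blockingPool: ExecutorService = Executors.newFixedThreadPool(20)
  private val blockingContext: ExecutionContext =
    ExecutionContext.fromExecutorService(blockingPool)

  // Hypothetical helper: the blocking get runs on blockingContext,
  // so threads of the default execution context stay free.
  def toScala[T](jfuture: JFuture[T]): SFuture[T] =
    SFuture(jfuture.get())(blockingContext)

  // Lets the JVM exit once no more conversions are needed.
  def shutdown(): Unit = blockingPool.shutdown()

  def main(args: Array[String]): Unit = {
    val javaExecutor = Executors.newSingleThreadExecutor()
    val jfuture: JFuture[Int] =
      javaExecutor.submit(new Callable[Int] { def call(): Int = 21 * 2 })
    println(Await.result(toScala(jfuture), 5.seconds))
    javaExecutor.shutdown()
    shutdown()
  }
}
```

At most 20 conversions can be waiting at the same time here; further ones queue up in the dedicated pool instead of starving the default context.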

Other suggestions

It's actually quite simple. scala.concurrent.Promise is the writable side of a Future: you complete it once, from wherever the result becomes available, and the Future it exposes (promise.future) is resolved asynchronously.

If you instead convert by calling jfuture.get directly, you run a blocking computation first and only then produce a scala.concurrent.Future that is already resolved.

The calling thread blocks until the computation inside jfuture is complete, because the get method is blocking.

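Blocking means nothing else can happen on that thread until the computation is complete. You are essentially monopolising the thread. Conceptually it behaves like a while loop that keeps checking for the result (actual implementations typically park the thread rather than spin, but either way the thread is unavailable for other work).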

// conceptual pseudocode, not how get is actually implemented
while (!isDone() && !timeout) {
   // check if the computation is complete
}
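To observe the blocking directly, one could time a call to get. The following is a small sketch; GetBlocksDemo and timedGet are hypothetical names introduced only for this illustration, and the measured duration will vary from run to run.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

object GetBlocksDemo {
  // Hypothetical helper: submits a task that takes taskMillis, then times a call to get.
  def timedGet(taskMillis: Long): (String, Long) = {
    val pool = Executors.newSingleThreadExecutor()
    try {
      val jfuture = pool.submit(new Callable[String] {
        def call(): String = { Thread.sleep(taskMillis); "ready" }
      })
      val started = System.nanoTime()
      val value = jfuture.get() // the calling thread can do nothing else until this returns
      val waitedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - started)
      (value, waitedMillis)
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    val (value, waitedMillis) = timedGet(500)
    println(s"get returned '$value' after roughly $waitedMillis ms")
  }
}
```

The thread that calls timedGet is unavailable for roughly as long as the task runs, which is exactly what happens to a pool thread inside future { jfuture.get }.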

Specifically:

val jfuture: JFuture[T] = ??? // some blocking task

When blocking cannot be avoided, a common practice is to start a separate Thread with a Runnable or Callable, so that the blocking call monopolizes that thread rather than one from a shared pool.

In the example @senia gave:

new Thread(new Runnable { def run(): Unit = {
  promise.complete(Try{ jfuture.get })
}}).start()

How is this different from future { jfuture.get }? It doesn't tie up a thread of the default ExecutionContext provided by Scala, which has only as many threads as the machine has processors.

With future { jfuture.get }, once every thread of that context is blocked in get, all other futures in your code have to wait until one of those calls completes.
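The thread-based conversion from the example could be packaged as a reusable function. This is a minimal sketch under stated assumptions: ThreadPerFuture and toScala are hypothetical names, and marking the waiting thread as a daemon is a design choice made here, not part of the original answer.

```scala
import java.util.concurrent.{Callable, FutureTask, Future => JFuture}
import scala.concurrent.{Await, Promise, Future => SFuture}
import scala.concurrent.duration._
import scala.util.Try

object ThreadPerFuture {
  // Hypothetical helper: one dedicated thread per Java future performs the blocking get.
  def toScala[T](jfuture: JFuture[T]): SFuture[T] = {
    val promise = Promise[T]()
    val waiter = new Thread(new Runnable {
      def run(): Unit = { promise.complete(Try(jfuture.get())); () }
    })
    waiter.setDaemon(true) // assumption: the JVM should not stay alive just to wait
    waiter.start()
    promise.future
  }

  def main(args: Array[String]): Unit = {
    val task = new FutureTask[String](new Callable[String] { def call(): String = "done" })
    val converted = toScala(task)
    task.run() // completes the Java future on the main thread
    println(Await.result(converted, 5.seconds))
  }
}
```

Each call costs one thread, so this variant puts no upper bound on the number of waiting threads, whereas a dedicated execution context does.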

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow