Wie trenne und sende ich einen asynchronen Kontrollfluss mit Fortschritten?
-
20-09-2019 - |
Frage
Ich habe einen asynchronen Kontrollfluss wie folgt:
ActorA ! DoA(dataA, callback1, callbackOnErrorA)
def callback1() = {
...
ActorB ! DoB(dataB, callback2, callbackOnErrorB)
}
def callback2() = {
ActorC ! DoC(dataC, callback3, callbackOnErrorC)
}
...
Wie würde ich diesen Fluss in mehrere Teile (Kontinuationen) unterteilen und diese nacheinander an verschiedene Akteure (oder Fäden/Aufgaben) versenden, während der Gesamtzustand beibehalten wird?
Jeder Hinweis geschätzt, danke
Lösung
Dies ist sehr vereinfacht, zeigt jedoch, wie ein einzelner Kontrollfluss zwischen drei Akteuren aufgeteilt wird und den Zustand an jeden weitergibt:
package blevins.example
import scala.continuations._
import scala.continuations.ControlContext._
import scala.actors.Actor._
import scala.actors._
object App extends Application {
val actorA, actorB, actorC = actor {
receive {
case f: Function1[Unit,Unit] => { f() }
}
}
def handle(a: Actor) = shift { k: (Unit=>Unit) =>
a ! k
}
// Control flow to split up
reset {
// this is not handled by any actor
var x = 1
println("a: " + x)
handle(actorA) // actorA handles the below
x += 4
println("b: " + x)
handle(actorB) // then, actorB handles the rest
var y = 2
x += 2
println("c: " + x)
handle(actorC) // and so on...
y += 1
println("d: " + x + ":" + y)
}
}
Andere Tipps
Ich benutze gerne scalaz.concurrent.Promise
. Dieses Beispiel ist nicht genau wie das in Ihrer Frage, aber es gibt Ihnen die Idee.
object Async extends Application {
import scalaz._
import Scalaz._
import concurrent._
import concurrent.strategy._
import java.util.concurrent.{ExecutorService, Executors}
case class ResultA(resultb: ResultB, resulta: ResultC)
case class ResultB()
case class ResultC()
run
def run {
implicit val executor: ExecutorService = Executors.newFixedThreadPool(8)
import Executor.strategy
val promiseA = doA
println("waiting for results")
val a: ResultA = promiseA.get
println("got " + a)
executor.shutdown
}
def doA(implicit s: Strategy[Unit]): Promise[ResultA] = {
println("triggered A")
val b = doB
val c = doC
for {bb <- b; cc <- c} yield ResultA(bb, cc)
}
def doB(implicit s: Strategy[Unit]): Promise[ResultB] = {
println("triggered B")
promise { Thread.sleep(1000); println("returning B"); ResultB() }
}
def doC(implicit s: Strategy[Unit]): Promise[ResultC] = {
println("triggered C")
promise { Thread.sleep(1000); println("returning C"); ResultC() }
}
}
Ausgabe:
triggered A
triggered B
triggered C
waiting for results
returning B
returning C
got ResultA(ResultB(),ResultC())
Sie finden eine Einführung in Scalaz Parallelität in diesem Präsentation aus Runar.
Dieser Ansatz ist nicht so flexibel wie Schauspieler, komponiert aber besser und kann nicht zu Deadlock.
Sehen Akkas Zukunft und wie man sie komponiert oder Scalaz 'Versprechen, Sie sind fast gleich, es gibt nur geringfügige Unterschiede.