Frage

Ich habe einen asynchronen Kontrollfluss wie folgt:

ActorA ! DoA(dataA, callback1, callbackOnErrorA)

def callback1() = {
  ...
  ActorB ! DoB(dataB, callback2, callbackOnErrorB)
}

def callback2() = {
  ActorC ! DoC(dataC, callback3, callbackOnErrorC)
} 

...

Wie würde ich diesen Fluss in mehrere Teile (Kontinuationen) unterteilen und diese nacheinander an verschiedene Akteure (oder Fäden/Aufgaben) versenden, während der Gesamtzustand beibehalten wird?

Jeder Hinweis geschätzt, danke

War es hilfreich?

Lösung

Dies ist sehr vereinfacht, zeigt jedoch, wie ein einzelner Kontrollfluss zwischen drei Akteuren aufgeteilt wird und den Zustand an jeden weitergibt:

package blevins.example

import scala.continuations._
import scala.continuations.ControlContext._
import scala.actors.Actor._
import scala.actors._

object App extends Application {

  val actorA, actorB, actorC = actor {
    receive {
      case f: Function1[Unit,Unit] => { f() }
    }
  }

  def handle(a: Actor) = shift { k: (Unit=>Unit) =>
    a ! k
  }

  // Control flow to split up
  reset {
      // this is not handled by any actor
      var x = 1
      println("a: " + x)

      handle(actorA)  // actorA handles the below
      x += 4
      println("b: " + x)

      handle(actorB) // then, actorB handles the rest
      var y = 2
      x += 2
      println("c: " + x)

      handle(actorC) // and so on...
      y += 1
      println("d: " + x + ":" + y)
  }

}

Andere Tipps

Ich benutze gerne scalaz.concurrent.Promise. Dieses Beispiel ist nicht genau wie das in Ihrer Frage, aber es gibt Ihnen die Idee.

object Async extends Application {
  import scalaz._
  import Scalaz._
  import concurrent._
  import concurrent.strategy._
  import java.util.concurrent.{ExecutorService, Executors}

  case class ResultA(resultb: ResultB, resulta: ResultC)
  case class ResultB()
  case class ResultC()

  run

  def run {
    implicit val executor: ExecutorService = Executors.newFixedThreadPool(8)
    import Executor.strategy

    val promiseA = doA
    println("waiting for results")
    val a: ResultA = promiseA.get
    println("got " + a)
    executor.shutdown    
  }

  def doA(implicit s: Strategy[Unit]): Promise[ResultA] = {
    println("triggered A")
    val b = doB
    val c = doC
    for {bb <- b; cc <- c} yield ResultA(bb, cc)
  }

  def doB(implicit s: Strategy[Unit]): Promise[ResultB] = {
    println("triggered B")
    promise { Thread.sleep(1000); println("returning B"); ResultB() }
  }

  def doC(implicit s: Strategy[Unit]): Promise[ResultC] = {
    println("triggered C")
    promise { Thread.sleep(1000); println("returning C"); ResultC() }
  }
}

Ausgabe:

triggered A
triggered B
triggered C
waiting for results
returning B
returning C
got ResultA(ResultB(),ResultC())

Sie finden eine Einführung in Scalaz Parallelität in diesem Präsentation aus Runar.

Dieser Ansatz ist nicht so flexibel wie Schauspieler, komponiert aber besser und kann nicht zu Deadlock.

Sehen Akkas Zukunft und wie man sie komponiert oder Scalaz 'Versprechen, Sie sind fast gleich, es gibt nur geringfügige Unterschiede.

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top