Abhängigkeit zwischen Operationen in scala Akteure
Frage
Ich versuche, einen Code mit scala Schauspieler parallelisieren. Das ist mein erster richtiger Code mit Schauspielern, aber ich habe einige Erfahrung mit Java Mulithreading und MPI in C. Aber ich völlig verloren bin.
Der Workflow I realize will, ist eine Ringleitung und kann wie folgt beschrieben werden:
- Jeder Arbeiter Schauspieler einen Verweis auf eine andere hat, so bilden einen Kreis
- Es ist ein Koordinator Schauspieler, die eine Berechnung durch das Senden einer Nachricht
StartWork()
auslösen können - Wenn ein Arbeiter eine
StartWork()
Nachricht empfängt, verarbeitet es einige Sachen lokal und sendetDoWork(...)
Nachricht an seine Nachbarn im Kreis. - Die Nachbarn tun einige andere Sachen und sendet eine
DoWork(...)
Nachricht an seinen eigenen Nachbarn machen. - Dies setzt sich fort, bis der erste Arbeiter eine
DoWork()
Nachricht empfängt. - Der Koordinator eine
GetResult()
Nachricht an den ursprünglichen Arbeiter und wartet auf eine Antwort senden.
Der Punkt ist, dass der Koordinator nur ein Ergebnis erhalten soll, wenn Daten bereit ist. Wie kann ein Arbeiter warten, dass der Auftrag an sie zurückgegeben, bevor die GetResult()
Nachricht zu beantworten?
Berechnung zu beschleunigen, jeder Arbeitnehmer einen StartWork()
jederzeit empfangen können.
Hier ist meine erster Versuch pseudo-Implementierung des Arbeitnehmers:
class Worker( neighbor: Worker, numWorkers: Int ) {
var ready = Foo()
def act() {
case StartWork() => {
val someData = doStuff()
neighbor ! DoWork( someData, numWorkers-1 )
}
case DoWork( resultData, remaining ) => if( remaining == 0 ) {
ready = resultData
} else {
val someOtherData = doOtherStuff( resultData )
neighbor ! DoWork( someOtherData, remaining-1 )
}
case GetResult() => reply( ready )
}
}
Auf dem Koordinator Seite:
worker ! StartWork()
val result = worker !? GetResult() // should wait
Lösung
Zuerst Sie eindeutig eine Kennung haben müssen, was ein einzelnes Stück Arbeit bildet, so dass die GetResult
das richtige Ergebnis bekommen. Ich denke, die offensichtliche Lösung, um Ihre Schauspieler ist ein Map
der Ergebnisse halten haben und eine Map
jeder Warte Getter :
class Worker( neighbor: Worker, numWorkers: Int ) {
var res: Map[Long, Result] = Map.empty
var gets: Map[Long, OutputChannel[Any]] = Map.empty
def act() {
...
case DoWork( id, resultData, remaining ) if remaining == 0 =>
res += (id -> resultData)
gets.get(id).foreach(_ ! res(id)) //reply to getters when result is ready
gets -= id //clear out getter map now?
case GetResult(id) if res.isDefinedAt(d) => //result is ready
reply (res(id))
case GetResult(id) => //no result ready
gets += (id -> sender)
}
}
Hinweis: die Verwendung von if
in der Anpassungsbedingung Nachricht macht die Verarbeitung etwas klarer
Andere Tipps
Eine Alternative wäre dies:
class Worker( neighbor: Worker, numWorkers: Int ) {
var ready = Foo()
def act() {
case StartWork() => {
val someData = doStuff()
neighbor ! DoWork( someData, numWorkers-1 )
}
case DoWork( resultData, remaining ) => if( remaining == 0 ) {
ready = resultData
react {
case GetResult() => reply( ready )
}
} else {
val someOtherData = doOtherStuff( resultData )
neighbor ! DoWork( someOtherData, remaining-1 )
}
}
}
Nachdem die Arbeit beendet hat, wird diese Arbeiter stecken, bis er die GetResult
Nachricht empfängt. Auf der anderen Seite kann der Koordinator sofort die GetResult
senden, wie es in der Mailbox bleiben, bis der Arbeitgeber dem Arbeitnehmer erhält.