Frage

Ich versuche, einen Code mit scala Schauspieler parallelisieren. Das ist mein erster richtiger Code mit Schauspielern, aber ich habe einige Erfahrung mit Java Mulithreading und MPI in C. Aber ich völlig verloren bin.

Der Workflow I realize will, ist eine Ringleitung und kann wie folgt beschrieben werden:

  • Jeder Arbeiter Schauspieler einen Verweis auf eine andere hat, so bilden einen Kreis
  • Es ist ein Koordinator Schauspieler, die eine Berechnung durch das Senden einer Nachricht StartWork() auslösen können
  • Wenn ein Arbeiter eine StartWork() Nachricht empfängt, verarbeitet es einige Sachen lokal und sendet DoWork(...) Nachricht an seine Nachbarn im Kreis.
  • Die Nachbarn tun einige andere Sachen und sendet eine DoWork(...) Nachricht an seinen eigenen Nachbarn machen.
  • Dies setzt sich fort, bis der erste Arbeiter eine DoWork() Nachricht empfängt.
  • Der Koordinator eine GetResult() Nachricht an den ursprünglichen Arbeiter und wartet auf eine Antwort senden.

Der Punkt ist, dass der Koordinator nur ein Ergebnis erhalten soll, wenn Daten bereit ist. Wie kann ein Arbeiter warten, dass der Auftrag an sie zurückgegeben, bevor die GetResult() Nachricht zu beantworten?

Berechnung zu beschleunigen, jeder Arbeitnehmer einen StartWork() jederzeit empfangen können.

Hier ist meine erster Versuch pseudo-Implementierung des Arbeitnehmers:

class Worker( neighbor: Worker, numWorkers: Int ) {
   var ready = Foo()
   def act() {
     case StartWork() => { 
       val someData = doStuff()
       neighbor ! DoWork( someData, numWorkers-1 ) 
       }
     case DoWork( resultData, remaining ) => if( remaining == 0 ) {
         ready = resultData
       } else {
         val someOtherData = doOtherStuff( resultData )
         neighbor ! DoWork( someOtherData, remaining-1 )
      }
    case GetResult() => reply( ready )
  }
}

Auf dem Koordinator Seite:

worker ! StartWork()
val result = worker !? GetResult() // should wait
War es hilfreich?

Lösung

Zuerst Sie eindeutig eine Kennung haben müssen, was ein einzelnes Stück Arbeit bildet, so dass die GetResult das richtige Ergebnis bekommen. Ich denke, die offensichtliche Lösung, um Ihre Schauspieler ist ein Map der Ergebnisse halten haben und eine Map jeder Warte Getter :

class Worker( neighbor: Worker, numWorkers: Int ) {
   var res: Map[Long, Result] = Map.empty
   var gets: Map[Long, OutputChannel[Any]] = Map.empty   
   def act() {
     ...
     case DoWork( id, resultData, remaining ) if remaining == 0 =>
       res += (id -> resultData)
       gets.get(id).foreach(_ ! res(id)) //reply to getters when result is ready
       gets -= id //clear out getter map now?
     case GetResult(id) if res.isDefinedAt(d) => //result is ready
       reply (res(id))
     case GetResult(id) => //no result ready 
       gets += (id -> sender)
   }
}

Hinweis: die Verwendung von if in der Anpassungsbedingung Nachricht macht die Verarbeitung etwas klarer

Andere Tipps

Eine Alternative wäre dies:

class Worker( neighbor: Worker, numWorkers: Int ) {
   var ready = Foo()
   def act() {
     case StartWork() => { 
       val someData = doStuff()
       neighbor ! DoWork( someData, numWorkers-1 ) 
       }
     case DoWork( resultData, remaining ) => if( remaining == 0 ) {
         ready = resultData
         react {
           case GetResult() => reply( ready )
         }
       } else {
         val someOtherData = doOtherStuff( resultData )
         neighbor ! DoWork( someOtherData, remaining-1 )
      }
  }
}

Nachdem die Arbeit beendet hat, wird diese Arbeiter stecken, bis er die GetResult Nachricht empfängt. Auf der anderen Seite kann der Koordinator sofort die GetResult senden, wie es in der Mailbox bleiben, bis der Arbeitgeber dem Arbeitnehmer erhält.

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top