Frage

Wie in meiner eigenen Antwort auf meine eigene Frage , ich habe die Situation, in der ich eine große Anzahl von Ereignissen bin die Verarbeitung, die in einer Warteschlange ankommen. Jedes Ereignis wird in genau der gleichen Weise behandelt, und jeder kann auch unabhängig von allen anderen Veranstaltungen behandelt werden.

Mein Programm nutzt die Scala Concurrency-Framework und viele der beteiligten Prozesse werden als Actors modelliert. Als Actors ihre Nachrichten nacheinander verarbeiten, sie sind nicht gut geeignet für dieses spezielle Problem (obwohl meine andere Akteure der Durchführung von Aktionen, die sind sequentiell). Wie ich Scala zu „kontrollieren“ dem ganzen Schöpfung Thread will (was ich davon ausgehen, ist der Punkt, der es eine Gleichzeitigkeit System an erster Stelle hat) es scheint, ich habe zwei Möglichkeiten:

  1. Senden Sie die Ereignisse zu einem Pool von Event-Prozessoren, die ich steuern
  2. bekommen meine Actor sie durch einen anderen Mechanismus gleichzeitig zu verarbeiten

Ich habe gedacht, dass # 1 den Punkt mit dem Schauspieler Subsystem negiert: wie viele Prozessor Akteure soll ich schaffen eine offensichtliche Frage zu sein. Diese Dinge sind angeblich von mir verborgen und gelöst durch das Subsystem.

Meine Antwort war folgendes zu tun:

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //I want to be able to handle multiple events at the same time
        //create a new actor to handle it
        actor {
          //processing code here
          process(x)
        }
    }
  }
}

Gibt es einen besseren Ansatz? Ist das falsch?

edit: Ein möglicherweise besserer Ansatz ist:

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //Pass processing to the underlying ForkJoin framework
        Scheduler.execute(process(e))
    }
  }
}
War es hilfreich?

Lösung

Dies scheint ein Duplikat einer anderen Frage. Also werde ich meine Antwort duplizieren

Schauspieler verarbeiten eine Nachricht zu einer Zeit. Das klassische Muster mehr Nachrichten zu verarbeiten, ist einen Koordinator Schauspieler Front für einen Pool von Verbrauchern Akteure zu haben. Wenn Sie reagieren verwenden dann kann der Verbraucher Pool groß sein, sondern nur noch eine geringe Anzahl von JVM-Threads verwenden. Hier ist ein Beispiel, wo ich einen Pool von 10 Verbrauchern und einem Koordinator nach vorne für sie erstellen.

import scala.actors.Actor
import scala.actors.Actor._

case class Request(sender : Actor, payload : String)
case class Ready(sender : Actor)
case class Result(result : String)
case object Stop

def consumer(n : Int) = actor {
  loop {
    react {
      case Ready(sender) => 
        sender ! Ready(self)
      case Request(sender, payload) =>
        println("request to consumer " + n + " with " + payload)
        // some silly computation so the process takes awhile
        val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
        sender ! Result(result)
        println("consumer " + n + " is done processing " + result )
      case Stop => exit
    }
  }
}

// a pool of 10 consumers
val consumers = for (n <- 0 to 10) yield consumer(n)

val coordinator = actor {
  loop {
     react {
        case msg @ Request(sender, payload) =>
           consumers foreach {_ ! Ready(self)}
           react {
              // send the request to the first available consumer
              case Ready(consumer) => consumer ! msg
           }
         case Stop => 
           consumers foreach {_ ! Stop} 
           exit
     }
  }
}

// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
for (i <- 0 to 1000) coordinator ! Request(self, i.toString)

Dieser Code prüft, um zu sehen, welche Verbraucher verfügbar ist und sendet eine Anfrage an diesen Verbraucher. Alternativen sind nur zufällig an den Verbraucher zuweisen oder einen Round-Robin-Scheduler zu verwenden.

Je nachdem, was Sie tun, werden Sie vielleicht besser mit Scala Futures serviert. Wenn Sie zum Beispiel nicht wirklich Akteure müssen dann alle der oben genannten Maschinen könnten als

geschrieben
import scala.actors.Futures._

def transform(payload : String) = {      
  val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
  println("transformed " + payload + " to " + result )
  result
}

val results = for (i <- 0 to 1000) yield future(transform(i.toString))

Andere Tipps

Wenn die Ereignisse alle unabhängig gehandhabt werden können, warum sind sie in einer Warteschlange? Zu wissen, nichts anderes über Ihr Design, scheint dies wie ein unnötiger Schritt. Wenn Sie die Funktion process mit komponieren könnte, was diese Ereignisse feuert, könnten Sie möglicherweise die Warteschlange umgehen.

Ein Akteur im Wesentlichen eine gleichzeitige Wirkung mit einer Warteschlange ausgestattet. Wenn Sie mehrere Nachrichten gleichzeitig verarbeiten möchten, wollen Sie nicht wirklich ein Schauspieler. Sie wollen einfach nur eine Funktion (Alle => ()) zur Ausführung an einem geeigneten Zeitpunkt eingeplant werden.

Having said that, Ihr Ansatz ist vernünftig , wenn Sie innerhalb des Schauspielers Bibliothek bleiben und wenn die Ereigniswarteschlange ist nicht in Ihrer Kontrolle.

Scalaz macht einen Unterschied zwischen Schauspielern und gleichzeitigen Effekten. Während seines Actor sehr leicht ist, ist scalaz.concurrent.Effect noch leichter. Hier ist der Code in etwa die Scalaz Bibliothek übersetzt:

val eventProcessor = effect (x => process x)

Das mit dem neuesten Stamm Kopf ist noch nicht freigegeben.

Das klingt wie ein einfaches Verbraucher / Produzenten Problem. Ich würde eine Warteschlange mit einem Pool von Verbrauchern nutzen. Sie könnten wahrscheinlich mit java.util.concurrent dies mit ein paar Zeilen Code schreiben.

Der Zweck eines Darsteller (gut, einer von ihnen) ist, um sicherzustellen, dass der Zustand innerhalb des Schauspielers kann nur von einem einzigen Faden zu einer Zeit zugegriffen werden. Wenn die Verarbeitung einer Nachricht nicht innerhalb der Schauspieler auf jeden wandelbar Staat abhängt, dann wäre es wahrscheinlich besser geeignet sein, nur eine Aufgabe zu einem Scheduler oder einen Threadpool einreichen zu verarbeiten. Die zusätzliche Abstraktion, die der Schauspieler sieht tatsächlich im Weg.

Es gibt praktische Möglichkeiten in scala.actors.Scheduler für diese, oder man könnte einen Executor von java.util.concurrent verwenden.

Schauspieler sind viel leichter als Fäden, und als solche eine andere Option ist, sind Sie Schauspieler Objekte wie Runnable Objekte zu verwenden, verwendet, um einen Thread-Pool zu senden. Der wesentliche Unterschied ist, dass Sie nicht brauchen, um über den Thread Sorge -. Der Thread-Pool ist für Sie von dem Schauspieler Rahmen verwaltet und ist vor allem einer Konfiguration betreffen

def submit(e: MyEvent) = actor {
  // no loop - the actor exits immediately after processing the first message
  react {
    case MyEvent(x) =>
      process(x)
  }
} ! e // immediately send the new actor a message

Dann eine Nachricht zu übermitteln, sagen diese:

submit(new MyEvent(x))

, die dem entspricht,

eventProcessor ! new MyEvent(x)

aus Ihrer Frage.

Getestet dieses Muster erfolgreich mit 1 Million Nachrichten in etwa 10 Sekunden auf einem Quad-Core-i7 Laptop gesendet und empfangen werden.

Hope, das hilft.

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top