Question

Comme dans ma propre réponse à ma propre question , je suis dans le cas où je traite un grand nombre d’événements qui arrivent en file d’attente. Chaque événement est traité exactement de la même manière et chaque événement peut être traité indépendamment de tous les autres événements.

Mon programme tire parti de la structure de concurrence de Scala et bon nombre des processus impliqués sont modélisés sous la forme de Acteurs . Comme les Acteurs traitent leurs messages de manière séquentielle, ils ne sont pas adaptés à ce problème particulier (même si mes autres acteurs exécutent des actions qui sont séquentiel). Comme je veux que Scala "contrôle" toute la création de threads (que je suppose est le point d'avoir un système d'accès concurrentiel en premier lieu), il semble que j'ai 2 choix:

  1. Envoyez les événements à un pool de processeurs d'événements que je contrôle
  2. demander à mon acteur de les traiter simultanément par un autre mécanisme

J'aurais pensé que n ° 1 nierait l'intérêt d'utiliser le sous-système des acteurs: combien d'acteurs de processeur dois-je créer? étant une question évidente. Ces choses sont censées être cachées et résolues par le sous-système.

Ma réponse a été la suivante:

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //I want to be able to handle multiple events at the same time
        //create a new actor to handle it
        actor {
          //processing code here
          process(x)
        }
    }
  }
}

Existe-t-il une meilleure approche? Est-ce incorrect?

edit: Une meilleure approche est peut-être:

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //Pass processing to the underlying ForkJoin framework
        Scheduler.execute(process(e))
    }
  }
}
Était-ce utile?

La solution

Cela semble être une copie d'une autre question. Donc, je vais dupliquer ma réponse

Les acteurs traitent un message à la fois. Le modèle classique pour traiter plusieurs messages consiste à avoir un coordonnateur avant acteur pour un groupe d’acteurs consommateurs. Si vous utilisez react, le pool de consommateurs peut être volumineux, mais n'utilisera toujours qu'un petit nombre de threads JVM. Voici un exemple où je crée un pool de 10 consommateurs et un coordinateur à leur tête.

import scala.actors.Actor
import scala.actors.Actor._

case class Request(sender : Actor, payload : String)
case class Ready(sender : Actor)
case class Result(result : String)
case object Stop

def consumer(n : Int) = actor {
  loop {
    react {
      case Ready(sender) => 
        sender ! Ready(self)
      case Request(sender, payload) =>
        println("request to consumer " + n + " with " + payload)
        // some silly computation so the process takes awhile
        val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
        sender ! Result(result)
        println("consumer " + n + " is done processing " + result )
      case Stop => exit
    }
  }
}

// a pool of 10 consumers
val consumers = for (n <- 0 to 10) yield consumer(n)

val coordinator = actor {
  loop {
     react {
        case msg @ Request(sender, payload) =>
           consumers foreach {_ ! Ready(self)}
           react {
              // send the request to the first available consumer
              case Ready(consumer) => consumer ! msg
           }
         case Stop => 
           consumers foreach {_ ! Stop} 
           exit
     }
  }
}

// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
for (i <- 0 to 1000) coordinator ! Request(self, i.toString)

Ce code teste le consommateur disponible et envoie une demande à ce consommateur. Les solutions de rechange consistent simplement à attribuer aux consommateurs de manière aléatoire ou à utiliser un planificateur à tour de rôle.

Selon ce que vous faites, vous serez peut-être mieux servi avec les Futures de Scala. Par exemple, si vous n'avez pas vraiment besoin d'acteurs, tous les mécanismes ci-dessus peuvent être écrits ainsi:

import scala.actors.Futures._

def transform(payload : String) = {      
  val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
  println("transformed " + payload + " to " + result )
  result
}

val results = for (i <- 0 to 1000) yield future(transform(i.toString))

Autres conseils

Si les événements peuvent tous être gérés indépendamment, pourquoi sont-ils en file d'attente? Ne sachant rien d'autre sur votre conception, cela semble être une étape inutile. Si vous pouviez composer la fonction process avec tout ce qui déclenche ces événements, vous pourriez potentiellement éviter la file d'attente.

Un acteur est essentiellement un effet simultané doté d’une file d’attente. Si vous souhaitez traiter plusieurs messages simultanément, vous ne voulez pas vraiment un acteur. Vous souhaitez simplement qu'une fonction (Any = > ()) soit planifiée pour être exécutée à un moment opportun.

Cela dit, votre approche est raisonnable si vous souhaitez rester dans la bibliothèque des acteurs et si la file d'attente des événements n'est pas sous votre contrôle.

Scalaz fait la distinction entre les acteurs et les effets concurrents. Bien que son acteur soit très léger, scalaz.concurrent.Effect est encore plus clair. Voici votre code traduit grossièrement dans la bibliothèque Scalaz:

val eventProcessor = effect (x => process x)

Il s’agit de la dernière tête de coffre, non encore publiée.

Cela ressemble à un simple problème de consommateur / producteur. J'utiliserais une file d'attente avec un groupe de consommateurs. Vous pourriez probablement écrire ceci avec quelques lignes de code en utilisant java.util.concurrent.

Le rôle d'un acteur (l'un d'entre eux) est de faire en sorte que l'état au sein de l'acteur ne soit accessible qu'à un seul thread à la fois. Si le traitement d'un message ne dépend d'aucun état mutable au sein de l'acteur, il serait probablement plus approprié de simplement soumettre une tâche à un planificateur ou à un pool de threads à traiter. L'abstraction supplémentaire fournie par l'acteur vous gêne réellement.

Il existe pour cela des méthodes pratiques dans scala.actors.Scheduler, ou vous pouvez utiliser un exécuteur à partir de java.util.concurrent.

Les acteurs sont beaucoup plus légers que les threads et une autre option consiste à utiliser des objets acteur tels que les objets Runnable que vous avez l'habitude de soumettre à un pool de threads. La principale différence est que vous n’avez pas à vous soucier de ThreadPool - le pool de threads est géré pour vous par le cadre de l’acteur et constitue principalement un problème de configuration.

def submit(e: MyEvent) = actor {
  // no loop - the actor exits immediately after processing the first message
  react {
    case MyEvent(x) =>
      process(x)
  }
} ! e // immediately send the new actor a message

Ensuite, pour envoyer un message, dites ceci:

submit(new MyEvent(x))

, ce qui correspond à

eventProcessor ! new MyEvent(x)

de votre question.

Ce modèle a été testé avec succès avec 1 million de messages envoyés et reçus en environ 10 secondes sur un ordinateur portable quad-core i7.

J'espère que cela vous aidera.

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top