Pregunta

Como en mi propia respuesta a mi propia pregunta , tengo la situación en la que estoy procesando una gran cantidad de eventos que llegan a una cola. Cada evento se maneja exactamente de la misma manera e incluso se puede manejar independientemente de todos los demás eventos.

Mi programa aprovecha el marco de concurrencia de Scala y muchos de los procesos involucrados se modelan como Actor s. A medida que los Actores procesan sus mensajes secuencialmente, no se adaptan bien a este problema en particular (aunque mis otros actores están realizando acciones que son secuencial). Como quiero que Scala "controle" toda la creación de subprocesos (que supongo que es el punto de tener un sistema de concurrencia en primer lugar) parece que tengo 2 opciones:

  1. Enviar los eventos a un grupo de procesadores de eventos, que yo controlo
  2. obtener mi Actor para procesarlos simultáneamente por algún otro mecanismo

Pensé que # 1 niega el punto de usar el subsistema de actores: ¿cuántos procesadores debo crear? es una pregunta obvia. Estas cosas supuestamente están ocultas para mí y resueltas por el subsistema.

Mi respuesta fue hacer lo siguiente:

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //I want to be able to handle multiple events at the same time
        //create a new actor to handle it
        actor {
          //processing code here
          process(x)
        }
    }
  }
}

¿Hay un mejor enfoque? ¿Es esto incorrecto?

editar: Un enfoque posiblemente mejor es:

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //Pass processing to the underlying ForkJoin framework
        Scheduler.execute(process(e))
    }
  }
}
¿Fue útil?

Solución

Esto parece un duplicado de otra pregunta. Entonces duplicaré mi respuesta

Los actores procesan un mensaje a la vez. El patrón clásico para procesar múltiples mensajes es tener un coordinador de frente de actor para un grupo de actores de consumo. Si usa reaccionar, el grupo de consumidores puede ser grande pero solo usará una pequeña cantidad de subprocesos JVM. Aquí hay un ejemplo en el que creo un grupo de 10 consumidores y un coordinador para que los presente.

import scala.actors.Actor
import scala.actors.Actor._

case class Request(sender : Actor, payload : String)
case class Ready(sender : Actor)
case class Result(result : String)
case object Stop

def consumer(n : Int) = actor {
  loop {
    react {
      case Ready(sender) => 
        sender ! Ready(self)
      case Request(sender, payload) =>
        println("request to consumer " + n + " with " + payload)
        // some silly computation so the process takes awhile
        val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
        sender ! Result(result)
        println("consumer " + n + " is done processing " + result )
      case Stop => exit
    }
  }
}

// a pool of 10 consumers
val consumers = for (n <- 0 to 10) yield consumer(n)

val coordinator = actor {
  loop {
     react {
        case msg @ Request(sender, payload) =>
           consumers foreach {_ ! Ready(self)}
           react {
              // send the request to the first available consumer
              case Ready(consumer) => consumer ! msg
           }
         case Stop => 
           consumers foreach {_ ! Stop} 
           exit
     }
  }
}

// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
for (i <- 0 to 1000) coordinator ! Request(self, i.toString)

Este código prueba para ver qué consumidor está disponible y envía una solicitud a ese consumidor. Las alternativas son simplemente asignar aleatoriamente a los consumidores o utilizar un programador de turnos.

Dependiendo de lo que esté haciendo, podría ser mejor servido con Scala's Futures. Por ejemplo, si realmente no necesita actores, toda la maquinaria anterior podría escribirse como

import scala.actors.Futures._

def transform(payload : String) = {      
  val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
  println("transformed " + payload + " to " + result )
  result
}

val results = for (i <- 0 to 1000) yield future(transform(i.toString))

Otros consejos

Si todos los eventos se pueden manejar de forma independiente, ¿por qué están en una cola? Sin saber nada más sobre su diseño, esto parece un paso innecesario. Si pudiera componer la función process con lo que sea que active esos eventos, podría obviar la cola.

Un actor es esencialmente un efecto concurrente equipado con una cola. Si desea procesar varios mensajes simultáneamente, realmente no quiere un actor. Solo desea que una función (Any = > ()) se programe para su ejecución en algún momento conveniente.

Dicho esto, su enfoque es razonable si desea permanecer dentro de la biblioteca de actores y si la cola de eventos no está bajo su control.

Scalaz hace una distinción entre Actores y Efectos concurrentes. Si bien su Actor es muy liviano, scalaz.concurrent.Effect es aún más ligero. Aquí está su código traducido aproximadamente a la biblioteca Scalaz:

val eventProcessor = effect (x => process x)

Esto es con la última cabeza del tronco, aún no lanzada.

Esto suena como un simple problema de consumidor / productor. Usaría una cola con un grupo de consumidores. Probablemente podría escribir esto con unas pocas líneas de código usando java.util.concurrent.

El propósito de un actor (bueno, uno de ellos) es garantizar que solo se pueda acceder al estado dentro del actor mediante un solo hilo a la vez. Si el procesamiento de un mensaje no depende de ningún estado mutable dentro del actor, entonces probablemente sería más apropiado enviar una tarea a un planificador o un grupo de subprocesos para procesar. La abstracción adicional que proporciona el actor se está interponiendo en tu camino.

Existen métodos convenientes en scala.actors.Scheduler para esto, o puede usar un ejecutor de java.util.concurrent.

Los actores son mucho más livianos que los hilos y, como tal, otra opción es usar objetos de actor como objetos Runnable que estás acostumbrado a enviar a un grupo de subprocesos. La principal diferencia es que no necesita preocuparse por ThreadPool: el grupo de subprocesos lo gestiona el actor framework y es principalmente un problema de configuración.

def submit(e: MyEvent) = actor {
  // no loop - the actor exits immediately after processing the first message
  react {
    case MyEvent(x) =>
      process(x)
  }
} ! e // immediately send the new actor a message

Luego, para enviar un mensaje, diga esto:

submit(new MyEvent(x))

, que corresponde a

eventProcessor ! new MyEvent(x)

de tu pregunta.

Probó este patrón con éxito con 1 millón de mensajes enviados y recibidos en aproximadamente 10 segundos en una computadora portátil i7 de cuatro núcleos.

Espero que esto ayude.

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top