Domanda

Come nella mia risposta alla mia domanda , ho la situazione in cui sto elaborando un gran numero di eventi che arrivano in coda. Ogni evento viene gestito esattamente allo stesso modo e ognuno può essere gestito indipendentemente da tutti gli altri eventi.

Il mio programma sfrutta il framework di concorrenza Scala e molti dei processi coinvolti sono modellati come Actor . Poiché Actor elabora i loro messaggi in sequenza, non sono adatti a questo particolare problema (anche se i miei altri attori stanno compiendo azioni che sono sequenziale). Come voglio che Scala controlli " control " tutta la creazione di thread (che presumo sia il punto in cui ha un sistema di concorrenza in primo luogo) sembra che io abbia 2 scelte:

  1. Invia gli eventi a un pool di processori di eventi, che controllo
  2. chiedi al mio attore di elaborarli contemporaneamente con qualche altro meccanismo

Avrei pensato che il n. 1 negasse il punto di usare il sottosistema attori: quanti attori processore avrei dovuto creare? essendo una domanda ovvia. Queste cose sono presumibilmente nascoste da me e risolte dal sottosistema.

La mia risposta è stata la seguente:

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //I want to be able to handle multiple events at the same time
        //create a new actor to handle it
        actor {
          //processing code here
          process(x)
        }
    }
  }
}

Esiste un approccio migliore? È errato?

modifica: un approccio forse migliore è:

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //Pass processing to the underlying ForkJoin framework
        Scheduler.execute(process(e))
    }
  }
}
È stato utile?

Soluzione

Sembra un duplicato di un'altra domanda. Quindi duplicherò la mia risposta

Gli attori elaborano un messaggio alla volta. Il modello classico per elaborare più messaggi è quello di avere un attore coordinatore davanti a un pool di attori consumatori. Se si utilizza la reazione, il pool di consumatori può essere grande, ma utilizzerà comunque solo un numero limitato di thread JVM. Ecco un esempio in cui creo un pool di 10 consumatori e un coordinatore per loro.

import scala.actors.Actor
import scala.actors.Actor._

case class Request(sender : Actor, payload : String)
case class Ready(sender : Actor)
case class Result(result : String)
case object Stop

def consumer(n : Int) = actor {
  loop {
    react {
      case Ready(sender) => 
        sender ! Ready(self)
      case Request(sender, payload) =>
        println("request to consumer " + n + " with " + payload)
        // some silly computation so the process takes awhile
        val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
        sender ! Result(result)
        println("consumer " + n + " is done processing " + result )
      case Stop => exit
    }
  }
}

// a pool of 10 consumers
val consumers = for (n <- 0 to 10) yield consumer(n)

val coordinator = actor {
  loop {
     react {
        case msg @ Request(sender, payload) =>
           consumers foreach {_ ! Ready(self)}
           react {
              // send the request to the first available consumer
              case Ready(consumer) => consumer ! msg
           }
         case Stop => 
           consumers foreach {_ ! Stop} 
           exit
     }
  }
}

// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
for (i <- 0 to 1000) coordinator ! Request(self, i.toString)

Questo codice verifica se il consumatore è disponibile e invia una richiesta a quel consumatore. Le alternative sono semplicemente assegnare casualmente ai consumatori o utilizzare un pianificatore round robin.

A seconda di ciò che stai facendo, potresti essere meglio servito con i Futures di Scala. Ad esempio, se non hai davvero bisogno di attori, tutte le macchine sopra potrebbero essere scritte come

import scala.actors.Futures._

def transform(payload : String) = {      
  val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
  println("transformed " + payload + " to " + result )
  result
}

val results = for (i <- 0 to 1000) yield future(transform(i.toString))

Altri suggerimenti

Se tutti gli eventi possono essere gestiti in modo indipendente, perché sono in coda? Non sapendo nient'altro sul tuo design, questo sembra un passaggio inutile. Se potessi comporre la funzione process con qualunque cosa stia attivando quegli eventi, potresti potenzialmente evitare la coda.

Un attore è essenzialmente un effetto concorrente dotato di una coda. Se vuoi elaborare più messaggi contemporaneamente, non vuoi davvero un attore. Vuoi solo che una funzione (Any = > ()) sia programmata per l'esecuzione in un momento opportuno.

Detto questo, il tuo approccio è ragionevole se vuoi rimanere nella biblioteca degli attori e se la coda degli eventi non è sotto il tuo controllo.

Scalaz distingue tra attori ed effetti concorrenti. Mentre il suo Actor è molto leggero, scalaz.concurrent.Effect è ancora più leggero. Ecco il tuo codice tradotto approssimativamente nella libreria Scalaz:

val eventProcessor = effect (x => process x)

Questo è con l'ultima testata del bagagliaio, non ancora rilasciata.

Sembra un semplice problema consumatore / produttore. Userei una coda con un pool di consumatori. Probabilmente potresti scriverlo con poche righe di codice usando java.util.concurrent.

Lo scopo di un attore (beh, uno di loro) è quello di garantire che lo stato all'interno dell'attore sia accessibile solo da un singolo thread alla volta. Se l'elaborazione di un messaggio non dipende da alcuno stato modificabile all'interno dell'attore, probabilmente sarebbe più appropriato inoltrare un'attività a uno scheduler o un pool di thread da elaborare. L'astrazione extra che l'attore fornisce ti sta effettivamente ostacolando.

Ci sono metodi convenienti in scala.actors.Scheduler per questo, oppure potresti usare un Executor da java.util.concurrent.

Gli attori sono molto più leggeri dei thread e, come tale, un'altra opzione è quella di utilizzare oggetti attore come gli oggetti Runnable che siete abituati a inviare a un pool di thread. La differenza principale è che non devi preoccuparti di ThreadPool: il pool di thread è gestito per te dal framework degli attori ed è principalmente un problema di configurazione.

def submit(e: MyEvent) = actor {
  // no loop - the actor exits immediately after processing the first message
  react {
    case MyEvent(x) =>
      process(x)
  }
} ! e // immediately send the new actor a message

Quindi, per inviare un messaggio, dì questo:

submit(new MyEvent(x))

, che corrisponde a

eventProcessor ! new MyEvent(x)

dalla tua domanda.

Testato questo schema con successo con 1 milione di messaggi inviati e ricevuti in circa 10 secondi su un laptop quad-core i7.

Spero che questo aiuti.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top