Quali sono alcune opportunità migliorano le prestazioni / concorrenza nel seguente codice Scala + Akka?

StackOverflow https://stackoverflow.com/questions/9503658

Domanda

Sto cercando opportunità per aumentare la concorrenza e le prestazioni nel mio codice RC2 Scala 2.9 / Akka 2.0. Dato il seguente codice:

import akka.actor._

case class DataDelivery(data:Double)

class ComputeActor extends Actor {
    var buffer = scala.collection.mutable.ArrayBuffer[Double]()

    val functionsToCompute = List("f1","f2","f3","f4","f5")
    var functionMap = scala.collection.mutable.LinkedHashMap[String,(Map[String,Any]) => Double]()  
    functionMap += {"f1" -> f1}
    functionMap += {"f2" -> f2}
    functionMap += {"f3" -> f3}
    functionMap += {"f4" -> f4}
    functionMap += {"f5" -> f5}

    def updateData(data:Double):scala.collection.mutable.ArrayBuffer[Double] = {
        buffer += data
        buffer
    }

    def f1(map:Map[String,Any]):Double = {
//    println("hello from f1")
      0.0
    }

    def f2(map:Map[String,Any]):Double = {
//    println("hello from f2")
      0.0
    }

    def f3(map:Map[String,Any]):Double = {
//    println("hello from f3")
      0.0
    }

    def f4(map:Map[String,Any]):Double = {
//    println("hello from f4")
      0.0
    }

    def f5(map:Map[String,Any]):Double = {
//    println("hello from f5")
      0.0
    }

    def computeValues(immutableBuffer:IndexedSeq[Double]):Map[String,Double] = {
        var map = Map[String,Double]()
        try {
            functionsToCompute.foreach(function => {
                val value = functionMap(function)
                function match {
                    case "f1" =>
                        var v = value(Map("lookback"->10,"buffer"->immutableBuffer,"parm1"->0.0))
                        map += {function -> v}
                    case "f2" =>
                        var v = value(Map("lookback"->20,"buffer"->immutableBuffer))
                        map += {function -> v}
                    case "f3" =>
                        var v = value(Map("lookback"->30,"buffer"->immutableBuffer,"parm1"->1.0,"parm2"->false))
                        map += {function -> v}
                    case "f4" =>
                        var v = value(Map("lookback"->40,"buffer"->immutableBuffer))
                        map += {function -> v}
                    case "f5" =>
                        var v = value(Map("buffer"->immutableBuffer))
                        map += {function -> v}
                    case _ => 
                        println(this.unhandled())
                }
            })
        } catch {
            case ex: Exception =>
              ex.printStackTrace()
        }
        map
    }

    def receive = {
      case DataDelivery(data) =>
        val startTime = System.nanoTime()/1000
        val answers = computeValues(updateData(data))
        val endTime = System.nanoTime()/1000
        val elapsedTime = endTime - startTime
        println("elapsed time is " + elapsedTime)
        // reply or forward
      case msg =>
        println("msg is " + msg)
    }

}

object Test {
    def main(args:Array[String]) {
        val system = ActorSystem("actorSystem") 
        val computeActor = system.actorOf(Props(new ComputeActor),"computeActor")
        var i = 0
        while (i < 1000) {  
            computeActor ! DataDelivery(i.toDouble)
            i += 1
        }
    }
}
.

Quando eseguo questo l'uscita (convertita in microsecondi) è

elapsed time is 4898
elapsed time is 184
elapsed time is 144
    .
    .
    .
elapsed time is 109
elapsed time is 103
.

Puoi vedere il compilatore incrementale di JVM a calci.

Pensavo che una vittoria veloce potrebbe essere quella di cambiare

    functionsToCompute.foreach(function => {
.

a

    functionsToCompute.par.foreach(function => {
.

Ma questo risulta nei seguenti tempi trascorsi

elapsed time is 31689
elapsed time is 4874
elapsed time is 622
    .
    .
    .
elapsed time is 698
elapsed time is 2171
.

Alcune informazioni:

1) Lo sto eseguendo su un MacBook Pro con 2 core.

2) Nella versione completa, le funzioni sono le operazioni di funzionamento lunghe che loop su porzioni del buffer condiviso mutabile. Questo non sembra essere un problema poiché il recupero dei messaggi dalla cassetta postale dell'attore sta controllando il flusso, ma sospetto che potrebbe essere un problema con una maggiore concorrenza. Questo è il motivo per cui ho convertito in un indicizzatoQ.

3) Nella versione completa, l'elenco FunctionStocompute può variare, in modo che non tutti gli elementi nella funzione PARTIONMap siano necessariamente chiamati (I.E.) FunzioneMap.Size potrebbe essere molto più grande di funzionaleStocompute.size

4) Le funzioni possono essere calcolate in parallelo, ma la mappa risultante deve essere completa prima di restituire

Alcune domande:

1) Cosa posso fare per far funzionare la versione parallela più veloce?

2) Dove avrebbe senso aggiungere futures non bloccanti e bloccando?

3) Dove avrebbe senso inoltrare il calcolo a un altro attore?

4) Quali sono alcune opportunità per aumentare l'immutabilità / sicurezza?

Grazie, Bruce

È stato utile?

Soluzione

Fornendo un esempio, come richiesto (mi dispiace per il ritardo ... non ho notifiche per così).

C'è un ottimo esempio nella documentazione Akka Sezione su" Composizione dei futures " Ma ti darò qualcosa un po 'più personalizzato per la tua situazione.

Ora, dopo aver letto questo, prenditi un po 'di tempo per leggere i tutorial e i documenti sul sito web di Akka.Ti mancano molte informazioni chiave che quei documenti ti forniranno.

import akka.dispatch.{Await, Future, ExecutionContext}
import akka.util.duration._
import java.util.concurrent.Executors

object Main {
  // This just makes the example work.  You probably have enough context
  // set up already to not need these next two lines
  val pool = Executors.newCachedThreadPool()
  implicit val ec = ExecutionContext.fromExecutorService(pool)

  // I'm simulating your function.  It just has to return a tuple, I believe
  // with a String and a Double
  def theFunction(s: String, d: Double) = (s, d)
  def main(args: Array[String]) {
    // Here we run your functions - I'm just doing a thousand of them
    // for fun.  You do what yo need to do
    val listOfFutures = (1 to 1000) map { i =>
      // Run them in parallel in the future
      Future {
        theFunction(i.toString, i.toDouble)
      }
    }
    // These lines can be composed better, but breaking them up should
    // be more illustrative.
    //
    // Turn the list of Futures (i.e. Seq[Future[(String, Double)]]) into a
    // Future with a sequence of results (i.e. Future[Seq[(String, Double)]])
    val futureOfResults = Future.sequence(listOfFutures)

    // Convert that future into another future that contains a map instead
    // instead of a sequence
    val intermediate = futureOfResults map { _.toList.toMap }

    // Wait for it complete.  Ideally you don't do this.  Continue to
    // transform the future into other forms or use pipeTo() to get it to go
    // as a result to some other Actor.  "Await" is really just evil... the
    // only place you should really use it is in silly programs like this or
    // some other special purpose app.
    val resultingMap = Await.result(intermediate, 1 second)
    println(resultingMap)

    // Again, just to make the example work
    pool.shutdown()
  }
}
.

Tutto ciò di cui hai bisogno nel tuo classpath per ottenere questo funzionamento è il barattolo akka-actor.Il sito Web Akka ti dirà come impostare ciò di cui hai bisogno, ma è davvero morto semplice.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top