Quelles sont les opportunités d'améliorer les performances/la concurrence dans le code Scala + Akka suivant ?

StackOverflow https://stackoverflow.com/questions/9503658

Question

Je recherche des opportunités pour augmenter la concurrence et les performances dans mon code Scala 2.9 / Akka 2.0 RC2.Étant donné le code suivant :

import akka.actor._

case class DataDelivery(data:Double)

class ComputeActor extends Actor {
    var buffer = scala.collection.mutable.ArrayBuffer[Double]()

    val functionsToCompute = List("f1","f2","f3","f4","f5")
    var functionMap = scala.collection.mutable.LinkedHashMap[String,(Map[String,Any]) => Double]()  
    functionMap += {"f1" -> f1}
    functionMap += {"f2" -> f2}
    functionMap += {"f3" -> f3}
    functionMap += {"f4" -> f4}
    functionMap += {"f5" -> f5}

    def updateData(data:Double):scala.collection.mutable.ArrayBuffer[Double] = {
        buffer += data
        buffer
    }

    def f1(map:Map[String,Any]):Double = {
//    println("hello from f1")
      0.0
    }

    def f2(map:Map[String,Any]):Double = {
//    println("hello from f2")
      0.0
    }

    def f3(map:Map[String,Any]):Double = {
//    println("hello from f3")
      0.0
    }

    def f4(map:Map[String,Any]):Double = {
//    println("hello from f4")
      0.0
    }

    def f5(map:Map[String,Any]):Double = {
//    println("hello from f5")
      0.0
    }

    def computeValues(immutableBuffer:IndexedSeq[Double]):Map[String,Double] = {
        var map = Map[String,Double]()
        try {
            functionsToCompute.foreach(function => {
                val value = functionMap(function)
                function match {
                    case "f1" =>
                        var v = value(Map("lookback"->10,"buffer"->immutableBuffer,"parm1"->0.0))
                        map += {function -> v}
                    case "f2" =>
                        var v = value(Map("lookback"->20,"buffer"->immutableBuffer))
                        map += {function -> v}
                    case "f3" =>
                        var v = value(Map("lookback"->30,"buffer"->immutableBuffer,"parm1"->1.0,"parm2"->false))
                        map += {function -> v}
                    case "f4" =>
                        var v = value(Map("lookback"->40,"buffer"->immutableBuffer))
                        map += {function -> v}
                    case "f5" =>
                        var v = value(Map("buffer"->immutableBuffer))
                        map += {function -> v}
                    case _ => 
                        println(this.unhandled())
                }
            })
        } catch {
            case ex: Exception =>
              ex.printStackTrace()
        }
        map
    }

    def receive = {
      case DataDelivery(data) =>
        val startTime = System.nanoTime()/1000
        val answers = computeValues(updateData(data))
        val endTime = System.nanoTime()/1000
        val elapsedTime = endTime - startTime
        println("elapsed time is " + elapsedTime)
        // reply or forward
      case msg =>
        println("msg is " + msg)
    }

}

object Test {
    def main(args:Array[String]) {
        val system = ActorSystem("actorSystem") 
        val computeActor = system.actorOf(Props(new ComputeActor),"computeActor")
        var i = 0
        while (i < 1000) {  
            computeActor ! DataDelivery(i.toDouble)
            i += 1
        }
    }
}

Lorsque je l'exécute, la sortie (convertie en microsecondes) est

elapsed time is 4898
elapsed time is 184
elapsed time is 144
    .
    .
    .
elapsed time is 109
elapsed time is 103

Vous pouvez voir le compilateur incrémentiel de la JVM démarrer.

Je pensais qu'une victoire rapide pourrait être de changer

    functionsToCompute.foreach(function => {

à

    functionsToCompute.par.foreach(function => {

mais cela donne les temps écoulés suivants

elapsed time is 31689
elapsed time is 4874
elapsed time is 622
    .
    .
    .
elapsed time is 698
elapsed time is 2171

Quelques informations:

1) Je l'utilise sur un Macbook Pro avec 2 cœurs.

2) Dans la version complète, les fonctions sont des opérations de longue durée qui bouclent sur des parties du tampon partagé mutable.Cela ne semble pas être un problème puisque la récupération des messages de la boîte aux lettres de l'acteur contrôle le flux, mais je soupçonne que cela pourrait être un problème avec une concurrence accrue.C'est pourquoi j'ai converti en IndexedSeq.

3) Dans la version complète, la liste FunctionsToCompute peut varier, de sorte que tous les éléments de FunctionMap ne sont pas nécessairement appelés (c'est-à-dire que FunctionMap.size peut être beaucoup plus grand que FunctionsToCompute.size

4) Les fonctions peuvent être calculées en parallèle, mais la carte résultante doit être complète avant de revenir

Quelques questions:

1) Que puis-je faire pour que la version parallèle s'exécute plus rapidement ?

2) Où serait-il judicieux d’ajouter des contrats à terme non bloquants et bloquants ?

3) Où serait-il judicieux de transmettre les calculs à un autre acteur ?

4) Quelles sont les opportunités pour accroître l'immuabilité/la sécurité ?

Merci, Bruce

Était-ce utile?

La solution

Fournir un exemple, comme demandé (désolé pour le retard...Je n'ai pas de notifications pour SO).

Il y a un excellent exemple dans la documentation Akka Section sur « Composer des futurs » mais je vais vous donner quelque chose d'un peu plus adapté à votre situation.

Maintenant, après avoir lu ceci, prenez le temps de lire les tutoriels et les documents sur le site Web d'Akka.Il vous manque de nombreuses informations clés que ces documents vous fourniront.

import akka.dispatch.{Await, Future, ExecutionContext}
import akka.util.duration._
import java.util.concurrent.Executors

object Main {
  // This just makes the example work.  You probably have enough context
  // set up already to not need these next two lines
  val pool = Executors.newCachedThreadPool()
  implicit val ec = ExecutionContext.fromExecutorService(pool)

  // I'm simulating your function.  It just has to return a tuple, I believe
  // with a String and a Double
  def theFunction(s: String, d: Double) = (s, d)
  def main(args: Array[String]) {
    // Here we run your functions - I'm just doing a thousand of them
    // for fun.  You do what yo need to do
    val listOfFutures = (1 to 1000) map { i =>
      // Run them in parallel in the future
      Future {
        theFunction(i.toString, i.toDouble)
      }
    }
    // These lines can be composed better, but breaking them up should
    // be more illustrative.
    //
    // Turn the list of Futures (i.e. Seq[Future[(String, Double)]]) into a
    // Future with a sequence of results (i.e. Future[Seq[(String, Double)]])
    val futureOfResults = Future.sequence(listOfFutures)

    // Convert that future into another future that contains a map instead
    // instead of a sequence
    val intermediate = futureOfResults map { _.toList.toMap }

    // Wait for it complete.  Ideally you don't do this.  Continue to
    // transform the future into other forms or use pipeTo() to get it to go
    // as a result to some other Actor.  "Await" is really just evil... the
    // only place you should really use it is in silly programs like this or
    // some other special purpose app.
    val resultingMap = Await.result(intermediate, 1 second)
    println(resultingMap)

    // Again, just to make the example work
    pool.shutdown()
  }
}

Tout ce dont vous avez besoin dans votre chemin de classe pour que cela fonctionne est le akka-actor pot.Le site Web d'Akka vous expliquera comment configurer ce dont vous avez besoin, mais c'est très simple.

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top