Welche Möglichkeiten gibt es, die Leistung/Parallelität im folgenden Scala + Akka-Code zu verbessern?

StackOverflow https://stackoverflow.com/questions/9503658

Frage

Ich suche nach Möglichkeiten, die Parallelität und Leistung in meinem Scala 2.9 / Akka 2.0 RC2-Code zu steigern.Angesichts des folgenden Codes:

import akka.actor._

case class DataDelivery(data:Double)

class ComputeActor extends Actor {
    var buffer = scala.collection.mutable.ArrayBuffer[Double]()

    val functionsToCompute = List("f1","f2","f3","f4","f5")
    var functionMap = scala.collection.mutable.LinkedHashMap[String,(Map[String,Any]) => Double]()  
    functionMap += {"f1" -> f1}
    functionMap += {"f2" -> f2}
    functionMap += {"f3" -> f3}
    functionMap += {"f4" -> f4}
    functionMap += {"f5" -> f5}

    def updateData(data:Double):scala.collection.mutable.ArrayBuffer[Double] = {
        buffer += data
        buffer
    }

    def f1(map:Map[String,Any]):Double = {
//    println("hello from f1")
      0.0
    }

    def f2(map:Map[String,Any]):Double = {
//    println("hello from f2")
      0.0
    }

    def f3(map:Map[String,Any]):Double = {
//    println("hello from f3")
      0.0
    }

    def f4(map:Map[String,Any]):Double = {
//    println("hello from f4")
      0.0
    }

    def f5(map:Map[String,Any]):Double = {
//    println("hello from f5")
      0.0
    }

    def computeValues(immutableBuffer:IndexedSeq[Double]):Map[String,Double] = {
        var map = Map[String,Double]()
        try {
            functionsToCompute.foreach(function => {
                val value = functionMap(function)
                function match {
                    case "f1" =>
                        var v = value(Map("lookback"->10,"buffer"->immutableBuffer,"parm1"->0.0))
                        map += {function -> v}
                    case "f2" =>
                        var v = value(Map("lookback"->20,"buffer"->immutableBuffer))
                        map += {function -> v}
                    case "f3" =>
                        var v = value(Map("lookback"->30,"buffer"->immutableBuffer,"parm1"->1.0,"parm2"->false))
                        map += {function -> v}
                    case "f4" =>
                        var v = value(Map("lookback"->40,"buffer"->immutableBuffer))
                        map += {function -> v}
                    case "f5" =>
                        var v = value(Map("buffer"->immutableBuffer))
                        map += {function -> v}
                    case _ => 
                        println(this.unhandled())
                }
            })
        } catch {
            case ex: Exception =>
              ex.printStackTrace()
        }
        map
    }

    def receive = {
      case DataDelivery(data) =>
        val startTime = System.nanoTime()/1000
        val answers = computeValues(updateData(data))
        val endTime = System.nanoTime()/1000
        val elapsedTime = endTime - startTime
        println("elapsed time is " + elapsedTime)
        // reply or forward
      case msg =>
        println("msg is " + msg)
    }

}

object Test {
    def main(args:Array[String]) {
        val system = ActorSystem("actorSystem") 
        val computeActor = system.actorOf(Props(new ComputeActor),"computeActor")
        var i = 0
        while (i < 1000) {  
            computeActor ! DataDelivery(i.toDouble)
            i += 1
        }
    }
}

Wenn ich dies ausführe, ist die Ausgabe (in Mikrosekunden konvertiert).

elapsed time is 4898
elapsed time is 184
elapsed time is 144
    .
    .
    .
elapsed time is 109
elapsed time is 103

Sie können sehen, wie der inkrementelle Compiler der JVM einsetzt.

Ich dachte, dass ein schneller Erfolg darin bestehen könnte, sich zu ändern

    functionsToCompute.foreach(function => {

Zu

    functionsToCompute.par.foreach(function => {

Dies führt jedoch zu den folgenden verstrichenen Zeiten

elapsed time is 31689
elapsed time is 4874
elapsed time is 622
    .
    .
    .
elapsed time is 698
elapsed time is 2171

Einige Infos:

1) Ich verwende dies auf einem Macbook Pro mit 2 Kernen.

2) In der Vollversion handelt es sich bei den Funktionen um lang laufende Vorgänge, die Teile des veränderlichen gemeinsam genutzten Puffers durchlaufen.Dies scheint kein Problem zu sein, da das Abrufen von Nachrichten aus dem Postfach des Akteurs den Nachrichtenfluss steuert, aber ich vermute, dass es sich bei erhöhter Parallelität um ein Problem handeln könnte.Aus diesem Grund habe ich auf eine IndexedSeq umgestellt.

3) In der Vollversion kann die Liste „functionsToCompute“ variieren, sodass nicht unbedingt alle Elemente in der „functionMap“ aufgerufen werden (d. h. „functionMap.size“ kann viel größer sein als „functionsToCompute.size“)

4) Die Funktionen können parallel berechnet werden, aber die resultierende Karte muss vor der Rückgabe vollständig sein

Einige Fragen:

1) Was kann ich tun, damit die Parallelversion schneller läuft?

2) Wo wäre es sinnvoll, nicht-blockierende und blockierende Futures hinzuzufügen?

3) Wo wäre es sinnvoll, die Berechnung an einen anderen Akteur weiterzuleiten?

4) Welche Möglichkeiten gibt es zur Erhöhung der Unveränderlichkeit/Sicherheit?

Danke, Bruce

War es hilfreich?

Lösung

Geben Sie wie gewünscht ein Beispiel an (entschuldigen Sie die Verzögerung ...Ich habe keine Benachrichtigungen für SO aktiviert.

Es gibt ein großartiges Beispiel in der Akka-Dokumentation Abschnitt zum Thema „Zusammenstellen von Futures“ aber ich gebe Ihnen etwas, das etwas besser auf Ihre Situation zugeschnitten ist.

Nachdem Sie dies gelesen haben, nehmen Sie sich bitte etwas Zeit, um die Tutorials und Dokumente auf der Website von Akka durchzulesen.Ihnen fehlen viele wichtige Informationen, die Ihnen diese Dokumente liefern werden.

import akka.dispatch.{Await, Future, ExecutionContext}
import akka.util.duration._
import java.util.concurrent.Executors

object Main {
  // This just makes the example work.  You probably have enough context
  // set up already to not need these next two lines
  val pool = Executors.newCachedThreadPool()
  implicit val ec = ExecutionContext.fromExecutorService(pool)

  // I'm simulating your function.  It just has to return a tuple, I believe
  // with a String and a Double
  def theFunction(s: String, d: Double) = (s, d)
  def main(args: Array[String]) {
    // Here we run your functions - I'm just doing a thousand of them
    // for fun.  You do what yo need to do
    val listOfFutures = (1 to 1000) map { i =>
      // Run them in parallel in the future
      Future {
        theFunction(i.toString, i.toDouble)
      }
    }
    // These lines can be composed better, but breaking them up should
    // be more illustrative.
    //
    // Turn the list of Futures (i.e. Seq[Future[(String, Double)]]) into a
    // Future with a sequence of results (i.e. Future[Seq[(String, Double)]])
    val futureOfResults = Future.sequence(listOfFutures)

    // Convert that future into another future that contains a map instead
    // instead of a sequence
    val intermediate = futureOfResults map { _.toList.toMap }

    // Wait for it complete.  Ideally you don't do this.  Continue to
    // transform the future into other forms or use pipeTo() to get it to go
    // as a result to some other Actor.  "Await" is really just evil... the
    // only place you should really use it is in silly programs like this or
    // some other special purpose app.
    val resultingMap = Await.result(intermediate, 1 second)
    println(resultingMap)

    // Again, just to make the example work
    pool.shutdown()
  }
}

Alles, was Sie in Ihrem Klassenpfad benötigen, um dies zum Laufen zu bringen, ist das akka-actor Krug.Auf der Akka-Website erfahren Sie, wie Sie das einrichten, was Sie benötigen, aber es ist wirklich kinderleicht.

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top