Welche Möglichkeiten gibt es, die Leistung/Parallelität im folgenden Scala + Akka-Code zu verbessern?
-
14-11-2019 - |
Frage
Ich suche nach Möglichkeiten, die Parallelität und Leistung in meinem Scala 2.9 / Akka 2.0 RC2-Code zu steigern.Angesichts des folgenden Codes:
import akka.actor._
case class DataDelivery(data:Double)
class ComputeActor extends Actor {
var buffer = scala.collection.mutable.ArrayBuffer[Double]()
val functionsToCompute = List("f1","f2","f3","f4","f5")
var functionMap = scala.collection.mutable.LinkedHashMap[String,(Map[String,Any]) => Double]()
functionMap += {"f1" -> f1}
functionMap += {"f2" -> f2}
functionMap += {"f3" -> f3}
functionMap += {"f4" -> f4}
functionMap += {"f5" -> f5}
def updateData(data:Double):scala.collection.mutable.ArrayBuffer[Double] = {
buffer += data
buffer
}
def f1(map:Map[String,Any]):Double = {
// println("hello from f1")
0.0
}
def f2(map:Map[String,Any]):Double = {
// println("hello from f2")
0.0
}
def f3(map:Map[String,Any]):Double = {
// println("hello from f3")
0.0
}
def f4(map:Map[String,Any]):Double = {
// println("hello from f4")
0.0
}
def f5(map:Map[String,Any]):Double = {
// println("hello from f5")
0.0
}
def computeValues(immutableBuffer:IndexedSeq[Double]):Map[String,Double] = {
var map = Map[String,Double]()
try {
functionsToCompute.foreach(function => {
val value = functionMap(function)
function match {
case "f1" =>
var v = value(Map("lookback"->10,"buffer"->immutableBuffer,"parm1"->0.0))
map += {function -> v}
case "f2" =>
var v = value(Map("lookback"->20,"buffer"->immutableBuffer))
map += {function -> v}
case "f3" =>
var v = value(Map("lookback"->30,"buffer"->immutableBuffer,"parm1"->1.0,"parm2"->false))
map += {function -> v}
case "f4" =>
var v = value(Map("lookback"->40,"buffer"->immutableBuffer))
map += {function -> v}
case "f5" =>
var v = value(Map("buffer"->immutableBuffer))
map += {function -> v}
case _ =>
println(this.unhandled())
}
})
} catch {
case ex: Exception =>
ex.printStackTrace()
}
map
}
def receive = {
case DataDelivery(data) =>
val startTime = System.nanoTime()/1000
val answers = computeValues(updateData(data))
val endTime = System.nanoTime()/1000
val elapsedTime = endTime - startTime
println("elapsed time is " + elapsedTime)
// reply or forward
case msg =>
println("msg is " + msg)
}
}
object Test {
def main(args:Array[String]) {
val system = ActorSystem("actorSystem")
val computeActor = system.actorOf(Props(new ComputeActor),"computeActor")
var i = 0
while (i < 1000) {
computeActor ! DataDelivery(i.toDouble)
i += 1
}
}
}
Wenn ich dies ausführe, ist die Ausgabe (in Mikrosekunden konvertiert).
elapsed time is 4898
elapsed time is 184
elapsed time is 144
.
.
.
elapsed time is 109
elapsed time is 103
Sie können sehen, wie der inkrementelle Compiler der JVM einsetzt.
Ich dachte, dass ein schneller Erfolg darin bestehen könnte, sich zu ändern
functionsToCompute.foreach(function => {
Zu
functionsToCompute.par.foreach(function => {
Dies führt jedoch zu den folgenden verstrichenen Zeiten
elapsed time is 31689
elapsed time is 4874
elapsed time is 622
.
.
.
elapsed time is 698
elapsed time is 2171
Einige Infos:
1) Ich verwende dies auf einem Macbook Pro mit 2 Kernen.
2) In der Vollversion handelt es sich bei den Funktionen um lang laufende Vorgänge, die Teile des veränderlichen gemeinsam genutzten Puffers durchlaufen.Dies scheint kein Problem zu sein, da das Abrufen von Nachrichten aus dem Postfach des Akteurs den Nachrichtenfluss steuert, aber ich vermute, dass es sich bei erhöhter Parallelität um ein Problem handeln könnte.Aus diesem Grund habe ich auf eine IndexedSeq umgestellt.
3) In der Vollversion kann die Liste „functionsToCompute“ variieren, sodass nicht unbedingt alle Elemente in der „functionMap“ aufgerufen werden (d. h. „functionMap.size“ kann viel größer sein als „functionsToCompute.size“)
4) Die Funktionen können parallel berechnet werden, aber die resultierende Karte muss vor der Rückgabe vollständig sein
Einige Fragen:
1) Was kann ich tun, damit die Parallelversion schneller läuft?
2) Wo wäre es sinnvoll, nicht-blockierende und blockierende Futures hinzuzufügen?
3) Wo wäre es sinnvoll, die Berechnung an einen anderen Akteur weiterzuleiten?
4) Welche Möglichkeiten gibt es zur Erhöhung der Unveränderlichkeit/Sicherheit?
Danke, Bruce
Lösung
Geben Sie wie gewünscht ein Beispiel an (entschuldigen Sie die Verzögerung ...Ich habe keine Benachrichtigungen für SO aktiviert.
Es gibt ein großartiges Beispiel in der Akka-Dokumentation Abschnitt zum Thema „Zusammenstellen von Futures“ aber ich gebe Ihnen etwas, das etwas besser auf Ihre Situation zugeschnitten ist.
Nachdem Sie dies gelesen haben, nehmen Sie sich bitte etwas Zeit, um die Tutorials und Dokumente auf der Website von Akka durchzulesen.Ihnen fehlen viele wichtige Informationen, die Ihnen diese Dokumente liefern werden.
import akka.dispatch.{Await, Future, ExecutionContext}
import akka.util.duration._
import java.util.concurrent.Executors
object Main {
// This just makes the example work. You probably have enough context
// set up already to not need these next two lines
val pool = Executors.newCachedThreadPool()
implicit val ec = ExecutionContext.fromExecutorService(pool)
// I'm simulating your function. It just has to return a tuple, I believe
// with a String and a Double
def theFunction(s: String, d: Double) = (s, d)
def main(args: Array[String]) {
// Here we run your functions - I'm just doing a thousand of them
// for fun. You do what yo need to do
val listOfFutures = (1 to 1000) map { i =>
// Run them in parallel in the future
Future {
theFunction(i.toString, i.toDouble)
}
}
// These lines can be composed better, but breaking them up should
// be more illustrative.
//
// Turn the list of Futures (i.e. Seq[Future[(String, Double)]]) into a
// Future with a sequence of results (i.e. Future[Seq[(String, Double)]])
val futureOfResults = Future.sequence(listOfFutures)
// Convert that future into another future that contains a map instead
// instead of a sequence
val intermediate = futureOfResults map { _.toList.toMap }
// Wait for it complete. Ideally you don't do this. Continue to
// transform the future into other forms or use pipeTo() to get it to go
// as a result to some other Actor. "Await" is really just evil... the
// only place you should really use it is in silly programs like this or
// some other special purpose app.
val resultingMap = Await.result(intermediate, 1 second)
println(resultingMap)
// Again, just to make the example work
pool.shutdown()
}
}
Alles, was Sie in Ihrem Klassenpfad benötigen, um dies zum Laufen zu bringen, ist das akka-actor
Krug.Auf der Akka-Website erfahren Sie, wie Sie das einrichten, was Sie benötigen, aber es ist wirklich kinderleicht.