¿Cuáles son algunas oportunidades para mejorar el rendimiento/concurrencia en el siguiente código Scala + Akka?
-
14-11-2019 - |
Pregunta
Estoy buscando oportunidades para aumentar la concurrencia y el rendimiento en mi código Scala 2.9/Akka 2.0 RC2.Dado el siguiente código:
import akka.actor._
case class DataDelivery(data:Double)
class ComputeActor extends Actor {
var buffer = scala.collection.mutable.ArrayBuffer[Double]()
val functionsToCompute = List("f1","f2","f3","f4","f5")
var functionMap = scala.collection.mutable.LinkedHashMap[String,(Map[String,Any]) => Double]()
functionMap += {"f1" -> f1}
functionMap += {"f2" -> f2}
functionMap += {"f3" -> f3}
functionMap += {"f4" -> f4}
functionMap += {"f5" -> f5}
def updateData(data:Double):scala.collection.mutable.ArrayBuffer[Double] = {
buffer += data
buffer
}
def f1(map:Map[String,Any]):Double = {
// println("hello from f1")
0.0
}
def f2(map:Map[String,Any]):Double = {
// println("hello from f2")
0.0
}
def f3(map:Map[String,Any]):Double = {
// println("hello from f3")
0.0
}
def f4(map:Map[String,Any]):Double = {
// println("hello from f4")
0.0
}
def f5(map:Map[String,Any]):Double = {
// println("hello from f5")
0.0
}
def computeValues(immutableBuffer:IndexedSeq[Double]):Map[String,Double] = {
var map = Map[String,Double]()
try {
functionsToCompute.foreach(function => {
val value = functionMap(function)
function match {
case "f1" =>
var v = value(Map("lookback"->10,"buffer"->immutableBuffer,"parm1"->0.0))
map += {function -> v}
case "f2" =>
var v = value(Map("lookback"->20,"buffer"->immutableBuffer))
map += {function -> v}
case "f3" =>
var v = value(Map("lookback"->30,"buffer"->immutableBuffer,"parm1"->1.0,"parm2"->false))
map += {function -> v}
case "f4" =>
var v = value(Map("lookback"->40,"buffer"->immutableBuffer))
map += {function -> v}
case "f5" =>
var v = value(Map("buffer"->immutableBuffer))
map += {function -> v}
case _ =>
println(this.unhandled())
}
})
} catch {
case ex: Exception =>
ex.printStackTrace()
}
map
}
def receive = {
case DataDelivery(data) =>
val startTime = System.nanoTime()/1000
val answers = computeValues(updateData(data))
val endTime = System.nanoTime()/1000
val elapsedTime = endTime - startTime
println("elapsed time is " + elapsedTime)
// reply or forward
case msg =>
println("msg is " + msg)
}
}
object Test {
def main(args:Array[String]) {
val system = ActorSystem("actorSystem")
val computeActor = system.actorOf(Props(new ComputeActor),"computeActor")
var i = 0
while (i < 1000) {
computeActor ! DataDelivery(i.toDouble)
i += 1
}
}
}
Cuando ejecuto esto, la salida (convertida a microsegundos) es
elapsed time is 4898
elapsed time is 184
elapsed time is 144
.
.
.
elapsed time is 109
elapsed time is 103
Puede ver cómo se activa el compilador incremental de la JVM.
Pensé que una victoria rápida podría ser cambiar
functionsToCompute.foreach(function => {
a
functionsToCompute.par.foreach(function => {
pero esto da como resultado los siguientes tiempos transcurridos
elapsed time is 31689
elapsed time is 4874
elapsed time is 622
.
.
.
elapsed time is 698
elapsed time is 2171
Alguna información:
1) Estoy ejecutando esto en una Macbook Pro con 2 núcleos.
2) En la versión completa, las funciones son operaciones de larga duración que recorren partes del búfer compartido mutable.Esto no parece ser un problema ya que la recuperación de mensajes del buzón del actor controla el flujo, pero sospecho que podría ser un problema con una mayor concurrencia.Es por eso que me convertí a IndexedSeq.
3) En la versión completa, la lista funcionesToCompute puede variar, por lo que no todos los elementos en el mapa de funciones se llaman necesariamente (es decir, funciónMap.size puede ser mucho más grande que funcionesToCompute.size
4) Las funciones se pueden calcular en paralelo, pero el mapa resultante debe estar completo antes de regresar
Algunas preguntas:
1) ¿Qué puedo hacer para que la versión paralela se ejecute más rápido?
2) ¿Dónde tendría sentido agregar futuros bloqueantes y sin bloqueo?
3) ¿Dónde tendría sentido reenviar el cálculo a otro actor?
4) ¿Cuáles son algunas oportunidades para aumentar la inmutabilidad/seguridad?
Gracias Bruce
Solución
Proporcionando un ejemplo, según lo solicitado (perdón por el retraso...No tengo notificaciones activadas para SO).
Hay un gran ejemplo en la documentación de Akka. Sección sobre 'Composición de futuros' pero te daré algo un poco más adaptado a tu situación.
Ahora, después de leer esto, tómate un tiempo para leer los tutoriales y documentos en el sitio web de Akka.Le falta mucha información clave que esos documentos le proporcionarán.
import akka.dispatch.{Await, Future, ExecutionContext}
import akka.util.duration._
import java.util.concurrent.Executors
object Main {
// This just makes the example work. You probably have enough context
// set up already to not need these next two lines
val pool = Executors.newCachedThreadPool()
implicit val ec = ExecutionContext.fromExecutorService(pool)
// I'm simulating your function. It just has to return a tuple, I believe
// with a String and a Double
def theFunction(s: String, d: Double) = (s, d)
def main(args: Array[String]) {
// Here we run your functions - I'm just doing a thousand of them
// for fun. You do what yo need to do
val listOfFutures = (1 to 1000) map { i =>
// Run them in parallel in the future
Future {
theFunction(i.toString, i.toDouble)
}
}
// These lines can be composed better, but breaking them up should
// be more illustrative.
//
// Turn the list of Futures (i.e. Seq[Future[(String, Double)]]) into a
// Future with a sequence of results (i.e. Future[Seq[(String, Double)]])
val futureOfResults = Future.sequence(listOfFutures)
// Convert that future into another future that contains a map instead
// instead of a sequence
val intermediate = futureOfResults map { _.toList.toMap }
// Wait for it complete. Ideally you don't do this. Continue to
// transform the future into other forms or use pipeTo() to get it to go
// as a result to some other Actor. "Await" is really just evil... the
// only place you should really use it is in silly programs like this or
// some other special purpose app.
val resultingMap = Await.result(intermediate, 1 second)
println(resultingMap)
// Again, just to make the example work
pool.shutdown()
}
}
Todo lo que necesitas en tu classpath para que esto funcione es el akka-actor
frasco.El sitio web de Akka le dirá cómo configurar lo que necesita, pero es realmente muy sencillo.