¿Cuáles son algunas oportunidades para mejorar el rendimiento/concurrencia en el siguiente código Scala + Akka?

StackOverflow https://stackoverflow.com/questions/9503658

Pregunta

Estoy buscando oportunidades para aumentar la concurrencia y el rendimiento en mi código Scala 2.9/Akka 2.0 RC2.Dado el siguiente código:

import akka.actor._

case class DataDelivery(data:Double)

class ComputeActor extends Actor {
    var buffer = scala.collection.mutable.ArrayBuffer[Double]()

    val functionsToCompute = List("f1","f2","f3","f4","f5")
    var functionMap = scala.collection.mutable.LinkedHashMap[String,(Map[String,Any]) => Double]()  
    functionMap += {"f1" -> f1}
    functionMap += {"f2" -> f2}
    functionMap += {"f3" -> f3}
    functionMap += {"f4" -> f4}
    functionMap += {"f5" -> f5}

    def updateData(data:Double):scala.collection.mutable.ArrayBuffer[Double] = {
        buffer += data
        buffer
    }

    def f1(map:Map[String,Any]):Double = {
//    println("hello from f1")
      0.0
    }

    def f2(map:Map[String,Any]):Double = {
//    println("hello from f2")
      0.0
    }

    def f3(map:Map[String,Any]):Double = {
//    println("hello from f3")
      0.0
    }

    def f4(map:Map[String,Any]):Double = {
//    println("hello from f4")
      0.0
    }

    def f5(map:Map[String,Any]):Double = {
//    println("hello from f5")
      0.0
    }

    def computeValues(immutableBuffer:IndexedSeq[Double]):Map[String,Double] = {
        var map = Map[String,Double]()
        try {
            functionsToCompute.foreach(function => {
                val value = functionMap(function)
                function match {
                    case "f1" =>
                        var v = value(Map("lookback"->10,"buffer"->immutableBuffer,"parm1"->0.0))
                        map += {function -> v}
                    case "f2" =>
                        var v = value(Map("lookback"->20,"buffer"->immutableBuffer))
                        map += {function -> v}
                    case "f3" =>
                        var v = value(Map("lookback"->30,"buffer"->immutableBuffer,"parm1"->1.0,"parm2"->false))
                        map += {function -> v}
                    case "f4" =>
                        var v = value(Map("lookback"->40,"buffer"->immutableBuffer))
                        map += {function -> v}
                    case "f5" =>
                        var v = value(Map("buffer"->immutableBuffer))
                        map += {function -> v}
                    case _ => 
                        println(this.unhandled())
                }
            })
        } catch {
            case ex: Exception =>
              ex.printStackTrace()
        }
        map
    }

    def receive = {
      case DataDelivery(data) =>
        val startTime = System.nanoTime()/1000
        val answers = computeValues(updateData(data))
        val endTime = System.nanoTime()/1000
        val elapsedTime = endTime - startTime
        println("elapsed time is " + elapsedTime)
        // reply or forward
      case msg =>
        println("msg is " + msg)
    }

}

object Test {
    def main(args:Array[String]) {
        val system = ActorSystem("actorSystem") 
        val computeActor = system.actorOf(Props(new ComputeActor),"computeActor")
        var i = 0
        while (i < 1000) {  
            computeActor ! DataDelivery(i.toDouble)
            i += 1
        }
    }
}

Cuando ejecuto esto, la salida (convertida a microsegundos) es

elapsed time is 4898
elapsed time is 184
elapsed time is 144
    .
    .
    .
elapsed time is 109
elapsed time is 103

Puede ver cómo se activa el compilador incremental de la JVM.

Pensé que una victoria rápida podría ser cambiar

    functionsToCompute.foreach(function => {

a

    functionsToCompute.par.foreach(function => {

pero esto da como resultado los siguientes tiempos transcurridos

elapsed time is 31689
elapsed time is 4874
elapsed time is 622
    .
    .
    .
elapsed time is 698
elapsed time is 2171

Alguna información:

1) Estoy ejecutando esto en una Macbook Pro con 2 núcleos.

2) En la versión completa, las funciones son operaciones de larga duración que recorren partes del búfer compartido mutable.Esto no parece ser un problema ya que la recuperación de mensajes del buzón del actor controla el flujo, pero sospecho que podría ser un problema con una mayor concurrencia.Es por eso que me convertí a IndexedSeq.

3) En la versión completa, la lista funcionesToCompute puede variar, por lo que no todos los elementos en el mapa de funciones se llaman necesariamente (es decir, funciónMap.size puede ser mucho más grande que funcionesToCompute.size

4) Las funciones se pueden calcular en paralelo, pero el mapa resultante debe estar completo antes de regresar

Algunas preguntas:

1) ¿Qué puedo hacer para que la versión paralela se ejecute más rápido?

2) ¿Dónde tendría sentido agregar futuros bloqueantes y sin bloqueo?

3) ¿Dónde tendría sentido reenviar el cálculo a otro actor?

4) ¿Cuáles son algunas oportunidades para aumentar la inmutabilidad/seguridad?

Gracias Bruce

¿Fue útil?

Solución

Proporcionando un ejemplo, según lo solicitado (perdón por el retraso...No tengo notificaciones activadas para SO).

Hay un gran ejemplo en la documentación de Akka. Sección sobre 'Composición de futuros' pero te daré algo un poco más adaptado a tu situación.

Ahora, después de leer esto, tómate un tiempo para leer los tutoriales y documentos en el sitio web de Akka.Le falta mucha información clave que esos documentos le proporcionarán.

import akka.dispatch.{Await, Future, ExecutionContext}
import akka.util.duration._
import java.util.concurrent.Executors

object Main {
  // This just makes the example work.  You probably have enough context
  // set up already to not need these next two lines
  val pool = Executors.newCachedThreadPool()
  implicit val ec = ExecutionContext.fromExecutorService(pool)

  // I'm simulating your function.  It just has to return a tuple, I believe
  // with a String and a Double
  def theFunction(s: String, d: Double) = (s, d)
  def main(args: Array[String]) {
    // Here we run your functions - I'm just doing a thousand of them
    // for fun.  You do what yo need to do
    val listOfFutures = (1 to 1000) map { i =>
      // Run them in parallel in the future
      Future {
        theFunction(i.toString, i.toDouble)
      }
    }
    // These lines can be composed better, but breaking them up should
    // be more illustrative.
    //
    // Turn the list of Futures (i.e. Seq[Future[(String, Double)]]) into a
    // Future with a sequence of results (i.e. Future[Seq[(String, Double)]])
    val futureOfResults = Future.sequence(listOfFutures)

    // Convert that future into another future that contains a map instead
    // instead of a sequence
    val intermediate = futureOfResults map { _.toList.toMap }

    // Wait for it complete.  Ideally you don't do this.  Continue to
    // transform the future into other forms or use pipeTo() to get it to go
    // as a result to some other Actor.  "Await" is really just evil... the
    // only place you should really use it is in silly programs like this or
    // some other special purpose app.
    val resultingMap = Await.result(intermediate, 1 second)
    println(resultingMap)

    // Again, just to make the example work
    pool.shutdown()
  }
}

Todo lo que necesitas en tu classpath para que esto funcione es el akka-actor frasco.El sitio web de Akka le dirá cómo configurar lo que necesita, pero es realmente muy sencillo.

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top