Quais são algumas das oportunidades de melhorar o desempenho / simultaneidade na seguinte Scala + Akka código?

StackOverflow https://stackoverflow.com/questions/9503658

Pergunta

Eu estou procurando oportunidades para aumentar a concorrência e o desempenho no meu Scala De 2,9 / Akka 2.0 RC2 código.Dado o seguinte código:

import akka.actor._

case class DataDelivery(data:Double)

class ComputeActor extends Actor {
    var buffer = scala.collection.mutable.ArrayBuffer[Double]()

    val functionsToCompute = List("f1","f2","f3","f4","f5")
    var functionMap = scala.collection.mutable.LinkedHashMap[String,(Map[String,Any]) => Double]()  
    functionMap += {"f1" -> f1}
    functionMap += {"f2" -> f2}
    functionMap += {"f3" -> f3}
    functionMap += {"f4" -> f4}
    functionMap += {"f5" -> f5}

    def updateData(data:Double):scala.collection.mutable.ArrayBuffer[Double] = {
        buffer += data
        buffer
    }

    def f1(map:Map[String,Any]):Double = {
//    println("hello from f1")
      0.0
    }

    def f2(map:Map[String,Any]):Double = {
//    println("hello from f2")
      0.0
    }

    def f3(map:Map[String,Any]):Double = {
//    println("hello from f3")
      0.0
    }

    def f4(map:Map[String,Any]):Double = {
//    println("hello from f4")
      0.0
    }

    def f5(map:Map[String,Any]):Double = {
//    println("hello from f5")
      0.0
    }

    def computeValues(immutableBuffer:IndexedSeq[Double]):Map[String,Double] = {
        var map = Map[String,Double]()
        try {
            functionsToCompute.foreach(function => {
                val value = functionMap(function)
                function match {
                    case "f1" =>
                        var v = value(Map("lookback"->10,"buffer"->immutableBuffer,"parm1"->0.0))
                        map += {function -> v}
                    case "f2" =>
                        var v = value(Map("lookback"->20,"buffer"->immutableBuffer))
                        map += {function -> v}
                    case "f3" =>
                        var v = value(Map("lookback"->30,"buffer"->immutableBuffer,"parm1"->1.0,"parm2"->false))
                        map += {function -> v}
                    case "f4" =>
                        var v = value(Map("lookback"->40,"buffer"->immutableBuffer))
                        map += {function -> v}
                    case "f5" =>
                        var v = value(Map("buffer"->immutableBuffer))
                        map += {function -> v}
                    case _ => 
                        println(this.unhandled())
                }
            })
        } catch {
            case ex: Exception =>
              ex.printStackTrace()
        }
        map
    }

    def receive = {
      case DataDelivery(data) =>
        val startTime = System.nanoTime()/1000
        val answers = computeValues(updateData(data))
        val endTime = System.nanoTime()/1000
        val elapsedTime = endTime - startTime
        println("elapsed time is " + elapsedTime)
        // reply or forward
      case msg =>
        println("msg is " + msg)
    }

}

object Test {
    def main(args:Array[String]) {
        val system = ActorSystem("actorSystem") 
        val computeActor = system.actorOf(Props(new ComputeActor),"computeActor")
        var i = 0
        while (i < 1000) {  
            computeActor ! DataDelivery(i.toDouble)
            i += 1
        }
    }
}

Quando eu executar esse saída (convertido em microssegundos) é

elapsed time is 4898
elapsed time is 184
elapsed time is 144
    .
    .
    .
elapsed time is 109
elapsed time is 103

Você pode ver a JVM incremental de compilador no chute.

Eu pensei que um ganho rápido, pode ser para mudar

    functionsToCompute.foreach(function => {

para

    functionsToCompute.par.foreach(function => {

mas isto resulta na seguinte decorrido vezes

elapsed time is 31689
elapsed time is 4874
elapsed time is 622
    .
    .
    .
elapsed time is 698
elapsed time is 2171

Algumas informações:

1) eu estou usando este em um Macbook Pro com 2 núcleos.

2) Na versão completa, as funções são de longa operações de execução do loop através de partes do mutável buffer compartilhado.Este não parece ser um problema, já que a obtenção de mensagens de ator da caixa de correio é controlar o fluxo, mas eu suspeito que poderia ser um problema com o aumento da concorrência.É por isso que eu tenha convertido em uma IndexedSeq.

3) Na versão completa, o functionsToCompute lista pode variar, de modo que nem todos os itens no functionMap são, necessariamente, chamado (i.e.) functionMap.o tamanho pode ser muito maior do que functionsToCompute.tamanho

4) As funções podem ser calculados em paralelo, mas a resultante do mapa deve ser concluída antes de retornar

Algumas perguntas:

1) o Que posso fazer para tornar a versão paralela correr mais rápido?

2) Onde faria sentido para adicionar sem bloqueio e bloqueio de futuros?

3) Onde faria sentido para a frente de computação para outro ator?

4) Quais são algumas oportunidades para aumentar a imutabilidade/segurança?

Obrigado, Bruce

Foi útil?

Solução

Dando um exemplo, conforme solicitado (desculpe o atraso...Eu não tenho notificações para ISSO).

Há um grande exemplo no Akka documentação Secção de 'Compor Futuros' mas eu vou dar-lhe algo um pouco mais adaptado à sua situação.

Agora, depois de ler isto, por favor, tome algum tempo para ler os tutoriais e documentos de Akka do site.Está faltando um monte de informações importantes que aqueles docs irá fornecer para você.

import akka.dispatch.{Await, Future, ExecutionContext}
import akka.util.duration._
import java.util.concurrent.Executors

object Main {
  // This just makes the example work.  You probably have enough context
  // set up already to not need these next two lines
  val pool = Executors.newCachedThreadPool()
  implicit val ec = ExecutionContext.fromExecutorService(pool)

  // I'm simulating your function.  It just has to return a tuple, I believe
  // with a String and a Double
  def theFunction(s: String, d: Double) = (s, d)
  def main(args: Array[String]) {
    // Here we run your functions - I'm just doing a thousand of them
    // for fun.  You do what yo need to do
    val listOfFutures = (1 to 1000) map { i =>
      // Run them in parallel in the future
      Future {
        theFunction(i.toString, i.toDouble)
      }
    }
    // These lines can be composed better, but breaking them up should
    // be more illustrative.
    //
    // Turn the list of Futures (i.e. Seq[Future[(String, Double)]]) into a
    // Future with a sequence of results (i.e. Future[Seq[(String, Double)]])
    val futureOfResults = Future.sequence(listOfFutures)

    // Convert that future into another future that contains a map instead
    // instead of a sequence
    val intermediate = futureOfResults map { _.toList.toMap }

    // Wait for it complete.  Ideally you don't do this.  Continue to
    // transform the future into other forms or use pipeTo() to get it to go
    // as a result to some other Actor.  "Await" is really just evil... the
    // only place you should really use it is in silly programs like this or
    // some other special purpose app.
    val resultingMap = Await.result(intermediate, 1 second)
    println(resultingMap)

    // Again, just to make the example work
    pool.shutdown()
  }
}

Tudo o que você precisa no seu classpath para obter esta executando é o akka-actor jar.O Akka site irá dizer-lhe como configurar o que você precisa, mas é realmente muito simples.

Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top