다음 Scala + Akka 코드에서 성능/동시성을 향상시킬 수 있는 기회는 무엇입니까?

StackOverflow https://stackoverflow.com/questions/9503658

문제

Scala 2.9/Akka 2.0 RC2 코드에서 동시성과 성능을 향상시킬 수 있는 기회를 찾고 있습니다.다음 코드가 주어지면:

import akka.actor._

case class DataDelivery(data:Double)

class ComputeActor extends Actor {
    var buffer = scala.collection.mutable.ArrayBuffer[Double]()

    val functionsToCompute = List("f1","f2","f3","f4","f5")
    var functionMap = scala.collection.mutable.LinkedHashMap[String,(Map[String,Any]) => Double]()  
    functionMap += {"f1" -> f1}
    functionMap += {"f2" -> f2}
    functionMap += {"f3" -> f3}
    functionMap += {"f4" -> f4}
    functionMap += {"f5" -> f5}

    def updateData(data:Double):scala.collection.mutable.ArrayBuffer[Double] = {
        buffer += data
        buffer
    }

    def f1(map:Map[String,Any]):Double = {
//    println("hello from f1")
      0.0
    }

    def f2(map:Map[String,Any]):Double = {
//    println("hello from f2")
      0.0
    }

    def f3(map:Map[String,Any]):Double = {
//    println("hello from f3")
      0.0
    }

    def f4(map:Map[String,Any]):Double = {
//    println("hello from f4")
      0.0
    }

    def f5(map:Map[String,Any]):Double = {
//    println("hello from f5")
      0.0
    }

    def computeValues(immutableBuffer:IndexedSeq[Double]):Map[String,Double] = {
        var map = Map[String,Double]()
        try {
            functionsToCompute.foreach(function => {
                val value = functionMap(function)
                function match {
                    case "f1" =>
                        var v = value(Map("lookback"->10,"buffer"->immutableBuffer,"parm1"->0.0))
                        map += {function -> v}
                    case "f2" =>
                        var v = value(Map("lookback"->20,"buffer"->immutableBuffer))
                        map += {function -> v}
                    case "f3" =>
                        var v = value(Map("lookback"->30,"buffer"->immutableBuffer,"parm1"->1.0,"parm2"->false))
                        map += {function -> v}
                    case "f4" =>
                        var v = value(Map("lookback"->40,"buffer"->immutableBuffer))
                        map += {function -> v}
                    case "f5" =>
                        var v = value(Map("buffer"->immutableBuffer))
                        map += {function -> v}
                    case _ => 
                        println(this.unhandled())
                }
            })
        } catch {
            case ex: Exception =>
              ex.printStackTrace()
        }
        map
    }

    def receive = {
      case DataDelivery(data) =>
        val startTime = System.nanoTime()/1000
        val answers = computeValues(updateData(data))
        val endTime = System.nanoTime()/1000
        val elapsedTime = endTime - startTime
        println("elapsed time is " + elapsedTime)
        // reply or forward
      case msg =>
        println("msg is " + msg)
    }

}

object Test {
    def main(args:Array[String]) {
        val system = ActorSystem("actorSystem") 
        val computeActor = system.actorOf(Props(new ComputeActor),"computeActor")
        var i = 0
        while (i < 1000) {  
            computeActor ! DataDelivery(i.toDouble)
            i += 1
        }
    }
}

이것을 실행하면 출력(마이크로초로 변환)은 다음과 같습니다.

elapsed time is 4898
elapsed time is 184
elapsed time is 144
    .
    .
    .
elapsed time is 109
elapsed time is 103

JVM의 증분 컴파일러가 시작되는 것을 볼 수 있습니다.

한 번의 빠른 승리가 변화를 가져올 수도 있다고 생각했어요

    functionsToCompute.foreach(function => {

에게

    functionsToCompute.par.foreach(function => {

하지만 이로 인해 다음과 같은 경과 시간이 발생합니다.

elapsed time is 31689
elapsed time is 4874
elapsed time is 622
    .
    .
    .
elapsed time is 698
elapsed time is 2171

일부 정보:

1) 저는 이것을 2개의 코어가 있는 Macbook Pro에서 실행하고 있습니다.

2) 정식 버전에서 함수는 변경 가능한 공유 버퍼의 일부를 반복하는 장기 실행 작업입니다.배우의 메일함에서 메시지를 검색하는 것이 흐름을 제어하므로 이는 문제가 되지 않는 것으로 보이지만 동시성이 증가하면 문제가 될 수 있다고 생각됩니다.이것이 내가 IndexedSeq로 변환한 이유입니다.

3) 정식 버전에서는 functionToCompute 목록이 다를 수 있으므로 functionMap의 모든 항목이 반드시 호출되는 것은 아닙니다. 즉, functionMap.size가 functionToCompute.size보다 훨씬 클 수 있습니다.

4) 함수는 병렬로 계산될 수 있지만 결과 맵은 반환하기 전에 완료되어야 합니다.

몇 가지 질문:

1) 병렬 버전을 더 빠르게 실행하려면 어떻게 해야 합니까?

2) 비차단 퓨처와 블로킹 퓨처를 어디에 추가하는 것이 합리적일까요?

3) 계산을 다른 행위자에게 전달하는 것이 어디에서 의미가 있습니까?

4) 불변성/안전성을 높일 수 있는 기회는 무엇입니까?

고마워요, 브루스

도움이 되었습니까?

해결책

요청한 것처럼 예제를 제공합니다 (지연에 대해 죄송합니다 ... 나는 그렇게 알림이 없습니다.

AKKA 문서에서 그러나 당신에게 당신의 상황에 조금 더 적합한 것을 줄 것입니다.

이제이 글을 읽은 후에는 AKKA의 웹 사이트에서 자습서와 문서를 읽을 때까지 시간을내어주십시오.그 문서가 제공 할 수있는 많은 주요 정보가 없습니다.

import akka.dispatch.{Await, Future, ExecutionContext}
import akka.util.duration._
import java.util.concurrent.Executors

object Main {
  // This just makes the example work.  You probably have enough context
  // set up already to not need these next two lines
  val pool = Executors.newCachedThreadPool()
  implicit val ec = ExecutionContext.fromExecutorService(pool)

  // I'm simulating your function.  It just has to return a tuple, I believe
  // with a String and a Double
  def theFunction(s: String, d: Double) = (s, d)
  def main(args: Array[String]) {
    // Here we run your functions - I'm just doing a thousand of them
    // for fun.  You do what yo need to do
    val listOfFutures = (1 to 1000) map { i =>
      // Run them in parallel in the future
      Future {
        theFunction(i.toString, i.toDouble)
      }
    }
    // These lines can be composed better, but breaking them up should
    // be more illustrative.
    //
    // Turn the list of Futures (i.e. Seq[Future[(String, Double)]]) into a
    // Future with a sequence of results (i.e. Future[Seq[(String, Double)]])
    val futureOfResults = Future.sequence(listOfFutures)

    // Convert that future into another future that contains a map instead
    // instead of a sequence
    val intermediate = futureOfResults map { _.toList.toMap }

    // Wait for it complete.  Ideally you don't do this.  Continue to
    // transform the future into other forms or use pipeTo() to get it to go
    // as a result to some other Actor.  "Await" is really just evil... the
    // only place you should really use it is in silly programs like this or
    // some other special purpose app.
    val resultingMap = Await.result(intermediate, 1 second)
    println(resultingMap)

    // Again, just to make the example work
    pool.shutdown()
  }
}
.

CLASSPATH 에서이 실행중인 모든 것은 akka-actor JAR입니다.AKKA 웹 사이트는 필요한 것을 설정하는 방법을 알려줍니다. 그러나 정말로 간단합니다.

라이센스 : CC-BY-SA ~와 함께 속성
제휴하지 않습니다 StackOverflow
scroll top