ما هي بعض الفرص لتحسين الأداء/التزامن في كود Scala + Akka التالي؟

StackOverflow https://stackoverflow.com/questions/9503658

سؤال

أنا أبحث عن فرص لزيادة التزامن والأداء في كود Scala 2.9 / Akka 2.0 RC2 الخاص بي.نظرا للكود التالي:

import akka.actor._

case class DataDelivery(data:Double)

class ComputeActor extends Actor {
    var buffer = scala.collection.mutable.ArrayBuffer[Double]()

    val functionsToCompute = List("f1","f2","f3","f4","f5")
    var functionMap = scala.collection.mutable.LinkedHashMap[String,(Map[String,Any]) => Double]()  
    functionMap += {"f1" -> f1}
    functionMap += {"f2" -> f2}
    functionMap += {"f3" -> f3}
    functionMap += {"f4" -> f4}
    functionMap += {"f5" -> f5}

    def updateData(data:Double):scala.collection.mutable.ArrayBuffer[Double] = {
        buffer += data
        buffer
    }

    def f1(map:Map[String,Any]):Double = {
//    println("hello from f1")
      0.0
    }

    def f2(map:Map[String,Any]):Double = {
//    println("hello from f2")
      0.0
    }

    def f3(map:Map[String,Any]):Double = {
//    println("hello from f3")
      0.0
    }

    def f4(map:Map[String,Any]):Double = {
//    println("hello from f4")
      0.0
    }

    def f5(map:Map[String,Any]):Double = {
//    println("hello from f5")
      0.0
    }

    def computeValues(immutableBuffer:IndexedSeq[Double]):Map[String,Double] = {
        var map = Map[String,Double]()
        try {
            functionsToCompute.foreach(function => {
                val value = functionMap(function)
                function match {
                    case "f1" =>
                        var v = value(Map("lookback"->10,"buffer"->immutableBuffer,"parm1"->0.0))
                        map += {function -> v}
                    case "f2" =>
                        var v = value(Map("lookback"->20,"buffer"->immutableBuffer))
                        map += {function -> v}
                    case "f3" =>
                        var v = value(Map("lookback"->30,"buffer"->immutableBuffer,"parm1"->1.0,"parm2"->false))
                        map += {function -> v}
                    case "f4" =>
                        var v = value(Map("lookback"->40,"buffer"->immutableBuffer))
                        map += {function -> v}
                    case "f5" =>
                        var v = value(Map("buffer"->immutableBuffer))
                        map += {function -> v}
                    case _ => 
                        println(this.unhandled())
                }
            })
        } catch {
            case ex: Exception =>
              ex.printStackTrace()
        }
        map
    }

    def receive = {
      case DataDelivery(data) =>
        val startTime = System.nanoTime()/1000
        val answers = computeValues(updateData(data))
        val endTime = System.nanoTime()/1000
        val elapsedTime = endTime - startTime
        println("elapsed time is " + elapsedTime)
        // reply or forward
      case msg =>
        println("msg is " + msg)
    }

}

object Test {
    def main(args:Array[String]) {
        val system = ActorSystem("actorSystem") 
        val computeActor = system.actorOf(Props(new ComputeActor),"computeActor")
        var i = 0
        while (i < 1000) {  
            computeActor ! DataDelivery(i.toDouble)
            i += 1
        }
    }
}

عندما أقوم بتشغيل هذا الإخراج (تم تحويله إلى ميكروثانية).

elapsed time is 4898
elapsed time is 184
elapsed time is 144
    .
    .
    .
elapsed time is 109
elapsed time is 103

يمكنك رؤية المترجم التزايدي لـ JVM وهو يبدأ العمل.

اعتقدت أن الفوز السريع قد يكون التغيير

    functionsToCompute.foreach(function => {

ل

    functionsToCompute.par.foreach(function => {

ولكن هذا يؤدي إلى الأوقات المنقضية التالية

elapsed time is 31689
elapsed time is 4874
elapsed time is 622
    .
    .
    .
elapsed time is 698
elapsed time is 2171

بعض المعلومات:

1) أقوم بتشغيل هذا على جهاز Macbook Pro مع مركزين.

2) في النسخة الكاملة، الوظائف عبارة عن عمليات طويلة الأمد تتكرر عبر أجزاء من المخزن المؤقت المشترك القابل للتغيير.لا يبدو أن هذا يمثل مشكلة نظرًا لأن استرداد الرسائل من صندوق بريد الممثل يتحكم في التدفق، لكنني أظن أنها قد تكون مشكلة تتعلق بزيادة التزامن.هذا هو السبب في أنني قمت بالتحويل إلى IndexedSeq.

3) في النسخة الكاملة، قد تختلف قائمة functionToCompute، بحيث لا يتم بالضرورة استدعاء جميع العناصر الموجودة في functionMap (على سبيل المثال) functionMap.size قد تكون أكبر بكثير من functionToCompute.size

4) يمكن حساب الوظائف بالتوازي، ولكن يجب أن تكون الخريطة الناتجة كاملة قبل العودة

بعض الأسئلة:

1) ما الذي يمكنني فعله لجعل الإصدار الموازي يعمل بشكل أسرع؟

2) أين سيكون من المنطقي إضافة العقود الآجلة غير المحظورة والمحظورة؟

3) أين يكون من المنطقي إعادة توجيه العمليات الحسابية إلى جهة فاعلة أخرى؟

4) ما هي بعض الفرص لزيادة الثبات/السلامة؟

شكرا يا بروس

هل كانت مفيدة؟

المحلول

تقديم مثال، كما هو مطلوب (آسف على التأخير...ليس لدي إشعارات بشأن SO).

هناك مثال رائع في وثائق Akka قسم "تأليف العقود الآجلة" لكنني سأعطيك شيئًا أكثر ملاءمة لموقفك.

الآن، بعد قراءة هذا، يرجى تخصيص بعض الوقت لقراءة البرامج التعليمية والمستندات الموجودة على موقع Akka الإلكتروني.أنت تفتقد الكثير من المعلومات الأساسية التي ستوفرها لك هذه المستندات.

import akka.dispatch.{Await, Future, ExecutionContext}
import akka.util.duration._
import java.util.concurrent.Executors

object Main {
  // This just makes the example work.  You probably have enough context
  // set up already to not need these next two lines
  val pool = Executors.newCachedThreadPool()
  implicit val ec = ExecutionContext.fromExecutorService(pool)

  // I'm simulating your function.  It just has to return a tuple, I believe
  // with a String and a Double
  def theFunction(s: String, d: Double) = (s, d)
  def main(args: Array[String]) {
    // Here we run your functions - I'm just doing a thousand of them
    // for fun.  You do what yo need to do
    val listOfFutures = (1 to 1000) map { i =>
      // Run them in parallel in the future
      Future {
        theFunction(i.toString, i.toDouble)
      }
    }
    // These lines can be composed better, but breaking them up should
    // be more illustrative.
    //
    // Turn the list of Futures (i.e. Seq[Future[(String, Double)]]) into a
    // Future with a sequence of results (i.e. Future[Seq[(String, Double)]])
    val futureOfResults = Future.sequence(listOfFutures)

    // Convert that future into another future that contains a map instead
    // instead of a sequence
    val intermediate = futureOfResults map { _.toList.toMap }

    // Wait for it complete.  Ideally you don't do this.  Continue to
    // transform the future into other forms or use pipeTo() to get it to go
    // as a result to some other Actor.  "Await" is really just evil... the
    // only place you should really use it is in silly programs like this or
    // some other special purpose app.
    val resultingMap = Await.result(intermediate, 1 second)
    println(resultingMap)

    // Again, just to make the example work
    pool.shutdown()
  }
}

كل ما تحتاجه في مسار الفصل الخاص بك لتشغيل هذا هو akka-actor إناء.سيخبرك موقع Akka بكيفية إعداد ما تحتاج إليه، لكن الأمر في غاية البساطة.

مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top