Frage

Ich frage mich, ob es einen Weg zur Ausführung sehr einfache Aufgaben auf einem anderen Thread in scala, die nicht viel Aufwand hat?

Im Grunde würde Ich mag einen globalen ‚Vollstrecker‘ machen, das eine beliebige Anzahl von Aufgaben ausführen verarbeiten kann. Ich kann dann den Executor verwenden, um zusätzliche Konstrukte aufzubauen.

Außerdem wäre es schön, wenn Sperrung oder nicht-blockierende Überlegungen haben nicht von den Kunden berücksichtigt werden.

Ich weiß, dass die scala Schauspieler Bibliothek auf der Doug Lea FJ Sachen gebaut wird, und auch, dass sie in begrenztem Maß unterstützen, was ich zu erreichen versuchen. Doch von meinem Verständnis werde ich ein ‚Schauspieler Pool‘ haben vorbelegt zu erreichen.

Ich möchte mich für diese machen einen globalen Thread-Pool zu vermeiden, da von dem, was ich verstehe, es ist nicht alles so gut in feine Parallelität gekörnt.

Hier ist ein einfaches Beispiel:

import concurrent.SyncVar
object SimpleExecutor {
  import actors.Actor._
  def exec[A](task:  => A) : SyncVar[A] = {
    //what goes here?
    //This is what I currently have
    val x = new concurrent.SyncVar[A]
    //The overhead of making the actor appears to be a killer
    actor {
      x.set(task)
    }
    x
  }
  //Not really sure what to stick here
  def execBlocker[A](task: => A) : SyncVar[A] = exec(task)

}

und nun ein Beispiel für exec mit:

object Examples {
  //Benchmarks a task
  def benchmark(blk : => Unit) = {
    val start = System.nanoTime
    blk
    System.nanoTime - start
  }

  //Benchmarks and compares 2 tasks
  def cmp(a: => Any, b: => Any) = {
    val at = benchmark(a)
    val bt = benchmark(b)
    println(at + " " + bt + " " +at.toDouble / bt)
  }

  //Simple example for simple non blocking comparison
  import SimpleExecutor._
  def paraAdd(hi: Int) = (0 until hi) map (i=>exec(i+5)) foreach (_.get)
  def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)

  //Simple example for the blocking performance
  import Thread.sleep
  def paraSle(hi : Int) = (0 until hi) map (i=>exec(sleep(i))) foreach (_.get)
  def singSle(hi : Int) = (0 until hi) foreach (i=>sleep(i))
}

Schließlich die Beispiele laufen (vielleicht wollen sie ein paar Mal tun, so HotSpot aufwärmen):

import Examples._
cmp(paraAdd(10000), singAdd(10000))
cmp(paraSle(100), singSle(100))
War es hilfreich?

Lösung

Das ist, was Futures gemacht wurde. Nur import scala.actors.Futures._, Verwendung future zu neuen Futures zu schaffen, Methoden wie awaitAll auf die Ergebnisse für eine Weile, apply oder respond zu blockieren, zu warten, bis das Ergebnis empfangen wird, isSet zu sehen, ob es bereit ist oder nicht, usw.

Sie brauchen nicht einen Thread-Pool entweder zu erstellen. Oder zumindest, die normalerweise nicht tun Sie nicht. Warum denken Sie, was Sie tun?

Bearbeiten

Sie können nicht die Leistung Parallelisierung etwas so einfaches wie eine ganze Zahl zusätzlich gewinnen, denn das ist sogar schneller als ein Funktionsaufruf. Concurrency nur Leistung bringen, indem Zeit zu vermeiden verloren zu blockieren i / o und durch mehr CPU-Kerne mit Aufgaben parallel auszuführen. Im letzteren Fall muss die Aufgabe rechnerisch teuer genug sein, um die Kosten der Aufteilung der Arbeitsbelastung und der Zusammenführung der Ergebnisse zu kompensieren.

Ein weiterer Grund für die Parallelität zu gehen, ist die Ansprechempfindlichkeit die Anwendung zu verbessern. Das ist, so dass es nicht schneller, das ist was es schneller auf den Benutzer, und ein Weg, dies zu tun ist immer noch relativ schnell Operationen auf einem anderen Thread ausgelagert, so dass die Fäden der Handhabung, was der Benutzer sieht oder schneller macht sein. Aber ich schweife ab.

Es ist ein ernstes Problem mit Ihrem Code:

  def paraAdd(hi: Int) = (0 until hi) map (i=>exec(i+5)) foreach (_.get)
  def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)

Oder in Futures übersetzen,

  def paraAdd(hi: Int) = (0 until hi) map (i=>future(i+5)) foreach (_.apply)
  def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)

Man könnte denken, paraAdd die Aufgaben in paralallel tut, aber es ist nicht, weil Range eine nicht strikte Umsetzung der map hat (die 2,7 bis Scala liegt, beginnend mit Scala 2.8.0 ist Range streng). Sie können es auf andere Scala Fragen nachschlagen. Was geschieht, ist dies:

  1. Eine Reihe von 0 erstellt bis hi
  2. Ein Bereich Projektion von jedem Element erzeugt wird, i des Bereichs in eine Funktion, die zurückkehrt, wenn future(i+5) genannt.
  3. Für jedes Element des Bereichs Vorsprung (i => future(i+5)), das Element ausgewertet wird (foreach ist streng) und dann wird die Funktion apply sie aufgefordert hatte.

Also, weil future ist nicht genannt in Schritt 2, aber nur in Schritt 3 werden Sie für jede future warten abgeschlossen ist, bevor die nächste zu tun. Sie können das Problem beheben mit:

  def paraAdd(hi: Int) = (0 until hi).force map (i=>future(i+5)) foreach (_.apply)

Welche Ihnen eine bessere Leistung geben, aber nie so gut wie eine einfache sofortige Zugabe. Auf der anderen Seite an, dass Sie dies tun:

def repeat(n: Int, f: => Any) = (0 until n) foreach (_ => f)
def paraRepeat(n: Int, f: => Any) = 
  (0 until n).force map (_ => future(f)) foreach (_.apply)

Und dann vergleichen:

cmp(repeat(100, singAdd(100000)), paraRepeat(100, singAdd(100000)))

Sie beginnen können Gewinne zu sehen (es wird auf die Anzahl der Kerne abhängen und Prozessorgeschwindigkeit).

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top