Frage

ich mit dem folgenden Problem jetzt für eine Woche habe Schwierigkeiten und brauche ein paar Ratschläge.

def query(title: String): List[Search]   // query("Terminator") => ["Terminator I", "Terminator II", "Terminator 1984", etc...]

def searchIMDB(s: Search): List[SearchResult]
def searchTMDB(s: Search): List[SearchResult]

def filterRedundantSearchResults(sr: SearchResult): Option[SearchResult]

def fetchIMDB(sr: SearchResult): List[MetaInfo]
def fetchTMDB(sr: SearchResult): List[MetaInfo]

def consolidate(infos: List[MetaInfo]): List[List[MetaInfo]]

Ich will eine Pipeline wie konstruieren:

query("Terminator")
-> [askIMDB, askTMDB, ...]
-> filterRedundantSearchResults (already-searched-state per query)
-> [fetchIMDB, fetchTMDB, ...]
-> consolidate                  (collected-meta-infos-state per query)
   => List[  TerminatorI-List[MetaInfo],  TerminatorII-List[MetaInfo],  ...]

Bisher habe ich jedes Pipeline-Segment als Schauspieler umgesetzt. Ich brauche für jede Abfrage engagierte Schauspieler-Instanzen zu erstellen, da einige dieser Akteure wie filterXXX und konsolidieren Notwendigkeit Zustand pro Abfrage zu erhalten.

Funktionen wie askIMDB produzieren mehrere Ergebnisse, die ich möchte gleichzeitig (jeweils mit einem separaten Schauspieler) verarbeiten. So habe ich keine Möglichkeit gefunden, um die gesamte grafische Darstellung der Akteure vor Ausführen die query () und weder eine elegante Art und Weise zu ändern, es zur Laufzeit.

Pre-Konstrukt

Mein erster Versuch war eine Kette von Akteuren und vorbei sth wie Transaction-IDs in den Nachrichten, so dass jeder Schauspieler eine Karte hatte [TransactionID-> State], aber diese ziemlich hässlich fühlte. Der zweite Versuch war eine Art-of-Pipeline Abstrahieren den Digraph von Akteuren in einen Fluss zu schaffen, aber ich nicht so weit.

Dies ist mein erster Beitrag, sorry, wenn ich etwas vergessen oder die Frage ist zu allgemein / pseudo-codiert. Jede Beratung sehr geschätzt. Dank!

War es hilfreich?

Lösung

Ich schlage vor, Sie nehmen einen Blick auf ScalaQuery , die die gleiche Sache tut über. Und es kann so tun, denn dies ist ein Monade Problem. In der Tat, einige Haskell Lösungen wie Pfeile, die von der Scalaz Bibliothek , scheint sein ziemlich nahe.

Das wäre die beste Lösung sein, da die richtige Abstraktion wird Änderungen in Zukunft einfacher.

Als Hack, ich Figur so etwas wie folgt aus:

abstract class QueryModifiers
case object Consolidate extends QueryModifiers
// create others as appropriate

class Query(title: String) {
  self =>

  // Create actors
  def createActor(qm: QueryModifiers): Actor = {
    val actor = qm match {
      case Consolidate => // create a consolidator actor
      case //... as needed
    }
    actor.start
    actor
  }

  // The pipeline
  val pipe: List[List[QueryModifiers]] = Nil

  // Build the pipeline
  def ->(qms: List[QueryModifiers]) = new Query(title) {
    override val pipe = qms :: self.pipe
  }
  def ->(qm: QueryModifiers) = new Query(title) {
    override val pipe = List(qm) :: self.pipe
  }
  def ->(c: Consolidate.type) = {
    // Define the full pipeline
    // Because the way pipe is built, the last layer comes first, and the first comes last
    val pipeline = Consolidate :: pipe

    // Create an actor for every QueryModifier, using an unspecified createActor function
    val actors = pipeline map (_ map (createActor(_))

    // We have a list of lists of actors now, where the first element of the list
    // was the last QueryModifiers we received; so, group the layers by two, and for each
    // pair, make the second element send the result to the first.
    // Since each layer can contain many actors, make each member of the second
    // layer send the results to each member of the first layer.
    // The actors should be expecting to receive message SendResultsTo at any time.
    for {
      List(nextLayer, previousLayer) <- actors.iterator sliding 2
      nextActor <- nextLayer
      previousActor <- previousLayer
    } previousActor ! SendResultsTo(nextActor)

    // Send the query to the first layer
    for ( firstActor <- actors.last ) firstActor ! Query(title)

    // Get the result from the last layer, which is the consolidator
    val results = actors.head.head !? Results

    // Return the results
    results
  }
}

Bearbeiten

Sie können garantieren Bestellung auch mit einem kleinen Trick. Ich versuche Scala 2.8 hier zu vermeiden, obwohl es so viel einfacher, mit dem Namen und Standardparameter machen.

sealed abstract class QueryModifiers
case class QMSearcher(/*...*/) extends QueryModifiers
case class QMFilter(/*...*/) extends QueryModifiers
case class QMFetcher(/*...*/) extends QueryModifiers
case object Consolidate extends QueryModifiers

class Query[NextQM] private (title: String, searchers: List[QMSeacher], filters: List[QMFilter], fetchers: List[QMFetcher]) {

// Build the pipeline
  def ->[T <: NextQM](qms: List[NextQM])(implicit m: Manifest[T]) = m.toString match {
    case "QMSearch" => new Query[QMFilter](title, qms, Nil, Nil)
    case "QMFilter" => new Query[QMFetcher](title, seachers, qms, Nil)
    case "QMFetcher" => new Query[Consolidate.type](title, searches, filters, qms)
    case _ /* "Consolidate$", actually */ => error("List of consolidate unexpected")
  }
  // Do similarly for qm: NextQM

  // Consolidation
  def ->(qm: Consolidate.type) = {
     // Create Searchers actors
     // Send them the Filters
     // Send them Fetchers
     // Create the Consolidator actor
     // Send it to Searchers actors
     // Send Searchers the query
     // Ask Consolidator for answer
  }
}

object Query {
  def apply(title: String) = new Query[QMSearcher](title, Nil, Nil, Nil)
}

Nun halten Searchers Akteure eine Liste von Filtern, eine Liste der Bringer, und den Verweis auf die Consolidator. Sie hören Nachrichten sie diese Dinge, und für die Abfrage informiert. Für jedes Ergebnis, sie einen Filter Schauspieler für jeden Filter in der Liste erstellen, jede von ihnen die Liste des Bringer und das Consolidator senden, und dann das Ergebnis senden.

Filter Akteure halten eine Liste der Bringer und einen Verweis auf die Consolidator. Sie hören Nachrichten sie diese Dinge, und für das Ergebnis des Suchenden zu informieren. Sie schicken ihre Leistung, wenn überhaupt, zu neu Abholer Akteure geschaffen, die zunächst von dem Consolidator informiert.

Fetchers halten einen Verweis auf die Konsolidierer. Sie hören eine Nachricht sie von dieser Referenz und das Ergebnis aus dem Filter zu informieren. Sie schicken ihr Ergebnis wiederum an den Consolidator.

Der Consolidator hören zwei Nachrichten. Eine Nachricht, von Abholer Schauspieler kommen, informieren sie über Ergebnisse, die sie sich ansammeln. Eine weitere Nachricht, von der Abfrage kommt, fragt zu diesem Ergebnis, das es gibt.

Das einzige, was links ist eine Art und Weise der Ausarbeitung des Consolidator wissen zu lassen, dass alle Ergebnisse verarbeitet wurden. Eine Möglichkeit wäre, die folgende:

  1. In der Abfrage informiert die Consolidator Schauspieler jedes Searcher, die erstellt wurde. Der Consolidator führt eine Liste von ihnen, mit einem Kennzeichen, das anzeigt, ob sie fertig sind oder nicht.
  2. hält Jeder Sucher eine Liste der Filter es erstellt wurde, und wartet auf eine „done“ Nachricht von ihnen. Wenn ein Sucher hat keine Verarbeitung mehr zu tun und erhielt „done“ von allen Filtern, sendet er eine Nachricht an den Consolidator es darüber informiert, dass es beendet ist.
  3. Jeder Filter, der wiederum führt eine Liste der Bringer es geschaffen hat, und ebenso wartet auf „done“ Nachrichten von ihnen. Wenn es fertig Verarbeitung hat, und erhielt „done“ von allen Fetcher, den Sucher informiert, dass es getan hat.
  4. Es Abholer eine „done“ Nachricht an den Filter sendet, die er geschaffen hat, wenn seine Arbeit abgeschlossen ist und an den Consolidator gesendet.
  5. Die Konsolidierer nur auf die Nachricht hört Abfrage das Ergebnis, nachdem es eine „done“ von allen Suchenden erhalten hat.
Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top