Domanda

I am trying to get the grips of using Iteratees in Play 2 to stream comet results. I have got the handle go creating an enumerator from a callback and an enumeratee from a map. My issue is with the Enumeratee.map, this takes a function that takes a pure input and returns a pure output (e.g. the String to Int conversion in the doc). What I would like to do is take a pure input and return a promise of a result. After all, an enumerator feeds an enumeratee with promises, an enumeratee converts one enumerator to the other, so there should be a way to make an enumeratee that maps to promises.

Now, let me make an example to make this a bit clearer. Let's say that I have an http request coming in with a list of IDs to query in my database. Let's say that these ids represent rows in a database table and the request does a set of (long) computations on these rows and then returns a set of json objects representing the computation. As I have long blocking stuff to do, it would be cool to stream that one ID at a time, so I would like to have an enumeratee pipeline that does:

  1. query a row in the database (returns a promise of the row)
  2. make a long computation on the row (takes a row and returns a promise of a computation)
  3. convert the long computation to JSON
  4. &> this out to the Comet enumeratee provided by Play 2

1 is kinda easy, I can construct an enumerator with a fromCallback that will return a promise of the query result. 3 is also kinda easy, as it's a simple Enumeratee.map

But I can't wrap my head around how to implement the applyOn of the enumeratee of step 2. I can of understood that I have bo build a new iteratee that get the promise from the "inner" iteratee, flatMap the long calculation and return the new promise. What I don't get is how to make this given the odd applyOn signature: def applyOn[A](it: Iteratee[To, A]): Iteratee[From, Iteratee[To, A]]

Can someone help me with that?

Thanks

È stato utile?

Soluzione

The signature of applyOn makes more sense when you think that the enumeratee combines with an iteratee on the right like in enumerator |>> (enumeratee &> iteratee). iteratee has type Iteratee[E, A] and enumerator expects a Iteratee[Promise[E], Iteratee[E, A] so that the inner iteratee can be extracted. So applyOn will have to a Iteratee[E, A] and return a Iteratee[Promise[E], Iteratee[E, A].

Here is the outline of an implementation. It defines a step function that takes an input and return the expected result. Then recursively steps through the promise elements.

import play.api.libs.concurrent._
import play.api.libs.iteratee._

def unpromise[E]: Enumeratee[Promise[E], E] = new Enumeratee[Promise[E], E] {
  def applyOn[A](inner: Iteratee[E, A]): Iteratee[Promise[E], Iteratee[E, A]] = {
    def step(input: Input[Promise[E]], i: Iteratee[E, A]): Iteratee[Promise[E], Iteratee[E, A]] = {
      input match {
        case Input.EOF => Done(i, Input.EOF)
        case Input.Empty => Cont(step(_, i))
        case Input.El(pe) =>
          val pe2 = pe.map(e => i.feed(Input.El(e))).flatMap(identity)
          val i2 = Iteratee.flatten(pe2)
          i2.pureFlatFold(
            (a, e2) => Done(i2, Input.Empty),
            k => Cont(step(_, i2)),
            (msg, e2) => Done(i2, Input.Empty))
      }
    }
    // should check that inner is not done or error - skipped for clarity
    Cont(step(_, inner))
  }
}

I'm discarding e2, so there is probably more code to ensure some input is not lost.

Altri suggerimenti

There is a method on master Enumeratee.mapM[E] that takes f: E => Promise[NE] and returns Enumeratee[E, NE]

https://github.com/playframework/Play20/blob/master/framework/src/play/src/main/scala/play/api/libs/iteratee/Enumeratee.scala#L150

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top