Question

I am using the asynchronous I/O library of the Play framework, which uses Iteratees and Enumerators. I now have an Iteratee[T] as a data sink (for simplicity, say it's an Iteratee[Byte] which stores its content in a file). This Iteratee[Byte] is passed to the function which handles the writing.

But before writing, I want to add some statistical information at the beginning of the file (for simplicity, say it's one Byte), so I transform the iteratee in the following way before passing it to the write function:

def write[A](value: Byte, output: Iteratee[Byte, A]): Iteratee[Byte, A] =
    Iteratee.flatten(output.feed(Input.El(value)))

When I later read the stored file from disk, I get an Enumerator[Byte] for it. First I want to read and remove the additional data, and then I want to pass the rest of the Enumerator[Byte] to a function which handles the reading. So I also need to transform the enumerator:

def read(input: Enumerator[Byte]): (Byte, Enumerator[Byte]) = {
   val firstEnumeratorEntry = ...
   val remainingEnumerator = ...
   (firstEnumeratorEntry, remainingEnumerator)
}

But I have no idea how to do this. How can I read some bytes from an Enumerator and get the remaining Enumerator?

If I replaced Iteratee[Byte] with OutputStream and Enumerator[Byte] with InputStream, this would be very easy:

def write(value: Byte, output: OutputStream) = {
    output.write(value)
    output
}
def read(input: InputStream) = (input.read,input)

But I need the asynchronous I/O of the play framework.


Solution

Here is one way to achieve this: fold within the Iteratee, carrying an appropriate state accumulator (a tuple here).

As an example, I read the routes file: the first byte is read as a Char, and the remaining bytes are decoded as UTF-8 and appended to a String.

  import java.nio.charset.Charset
  import play.api.Play
  import play.api.Play.current // implicit Application required by Play.getFile
  import play.api.libs.iteratee._

  def index = Action {
    /* do everything asynchronously */
    Async {
      /* a for comprehension keeps the chaining readable */
      for (
        i <- read; /* apply the file Enumerator to the folding Iteratee */
        (r: (Option[Char], String)) <- i.run /* run the resulting Iteratee to get a Promise of its result */
      ) yield Ok("first : " + r._1.get + "\n" + "rest : " + r._2) /* map the promised result to the action's Result */
    }
  }


  def read = {
    // get the routes file as an Enumerator of byte chunks
    val file: Enumerator[Array[Byte]] = Enumerator.fromFile(Play.getFile("/conf/routes"))

    // apply the enumerator to an Iteratee that folds the chunks into (first char, rest)
    file(Iteratee.fold((None, ""): (Option[Char], String)) { (acc, b) =>
       acc._1 match {
         // first chunk: keep byte 0 as the Char and decode the tail
         case None => (Some(b(0).toChar), acc._2 + new String(b.tail, Charset.forName("utf-8")))
         // later chunks: decode the whole chunk
         case x => (x, acc._2 + new String(b, Charset.forName("utf-8")))
       }
    })

  }
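The folding idea itself does not depend on Play. Below is a minimal sketch of the same (Option[Char], String) accumulator over a plain List of byte chunks; the chunk contents and the splitFirst name are made up for illustration and merely stand in for what Enumerator.fromFile would deliver.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Made-up chunks standing in for what Enumerator.fromFile might deliver (illustration only).
val chunks: List[Array[Byte]] = List("#rou".getBytes(UTF_8), "tes file".getBytes(UTF_8))

// Same accumulator shape as above: (first byte as a Char, once seen; decoded rest).
def splitFirst(cs: List[Array[Byte]]): (Option[Char], String) =
  cs.foldLeft((Option.empty[Char], "")) { case ((head, acc), chunk) =>
    head match {
      // First non-empty chunk: peel off byte 0 and decode the tail.
      case None if chunk.nonEmpty => (Some(chunk(0).toChar), acc + new String(chunk.tail, UTF_8))
      // Empty chunk before any data: nothing to record yet.
      case None => (None, acc)
      // Later chunks: decode and append the whole chunk.
      case seen => (seen, acc + new String(chunk, UTF_8))
    }
  }

val (first, rest) = splitFirst(chunks)
println("first : " + first.getOrElse('?'))
println("rest : " + rest)
```

One caveat that applies to the Play version too: decoding each chunk separately with new String can garble a multi-byte UTF-8 character that straddles a chunk boundary, so for non-ASCII content it may be safer to accumulate the bytes and decode once at the end.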

EDIT

I found yet another way using Enumeratee, but it needs to create 2 Enumerators (one short-lived). However, it is a bit more elegant. We use a kind of Enumeratee, but the Traversable one, which works at a finer level than Enumeratee (inside the chunks rather than at the chunk level). We use take(1), which takes only 1 byte and then closes the stream. On the other Enumerator, we use drop(1), which simply drops the first byte (we need the Traversable variants because we're using an Enumerator[Array[Byte]], whose elements are whole chunks).

Furthermore, read2 now has a signature much closer to what you wanted, because it returns 2 enumerators (not so far from (Promise, Enumerator)).

def index = Action {
  Async {
    val (first, rest) = read2
    // decode each byte chunk as a UTF-8 String
    val enee = Enumeratee.map[Array[Byte]] { bs => new String(bs, Charset.forName("utf-8")) }

    // transform the chunks, concatenate them, and run the Iteratee to get a Promise of the String
    def useEnee(enumor: Enumerator[Array[Byte]]) =
      Iteratee.flatten(enumor &> enee |>> Iteratee.consume[String]()).run.asInstanceOf[Promise[String]]

    for {
      f <- useEnee(first);
      r <- useEnee(rest)
    } yield Ok("first : " + f + "\n" + "rest : " + r)
  }
}

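def read2 = {
  // each call opens a fresh Enumerator over the same file
  def create = Enumerator.fromFile(Play.getFile("/conf/routes"))

  val file: Enumerator[Array[Byte]] = create
  val file2: Enumerator[Array[Byte]] = create

  // Traversable.take/drop count the bytes inside the chunks, not whole chunks
  (file &> Traversable.take[Array[Byte]](1), file2 &> Traversable.drop[Array[Byte]](1))
}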
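To see why the Traversable variants matter, here is a small Play-free sketch contrasting chunk-level take with byte-level take and drop across chunk boundaries. takeBytes, dropBytes, bytes and the fixed data returned by create() are hypothetical stand-ins for Traversable.take, Traversable.drop and Enumerator.fromFile, not Play code.

```scala
// Hypothetical stand-in for `def create`: every call yields a fresh list of chunks.
def create(): List[Array[Byte]] = List(Array[Byte](1, 2), Array[Byte](3, 4, 5))

// Byte-level take: keep the first n bytes, cutting a chunk if necessary.
def takeBytes(n: Int, chunks: List[Array[Byte]]): List[Array[Byte]] = chunks match {
  case _ if n <= 0 => Nil
  case Nil         => Nil
  case c :: cs     =>
    val kept = c.take(n)
    kept :: takeBytes(n - kept.length, cs)
}

// Byte-level drop: skip the first n bytes, then pass the remaining chunks through untouched.
def dropBytes(n: Int, chunks: List[Array[Byte]]): List[Array[Byte]] = chunks match {
  case _ if n <= 0              => chunks
  case Nil                      => Nil
  case c :: cs if c.length <= n => dropBytes(n - c.length, cs)
  case c :: cs                  => c.drop(n) :: cs
}

// Flattens chunks so the results are easy to compare and print.
def bytes(chunks: List[Array[Byte]]): List[Byte] = chunks.flatMap(_.toList)

println(bytes(create().take(1)))       // chunk-level take: the whole first chunk, List(1, 2)
println(bytes(takeBytes(1, create()))) // byte-level take: only the first byte, List(1)
println(bytes(dropBytes(1, create()))) // byte-level drop: everything after it, List(2, 3, 4, 5)
```

As in read2, the two halves come from two separate calls to create(), so in the Play version the file is opened and read twice.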

OTHER TIPS

I wonder if you can tackle your goal from another angle.

The function that would use the remaining enumerator (let's call that enumerator remaining) presumably applies it to an iteratee to process the remainder: remaining |>> iteratee, yielding another iteratee. Let's call that resulting iteratee iteratee2. Can you check whether you can get a reference to iteratee2? If so, you can read and process the first byte with a first iteratee, head, and then combine head and iteratee2 through flatMap:

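val head = Enumeratee.take[Byte](1) &>> Iteratee.foreach[Byte](println)
val processing = for { h <- head; i <- iteratee2 } yield (h, i)
// feed the single original Enumerator[Byte] (`input` in the question) to the combined iteratee
Iteratee.flatten(input |>> processing).run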

If you cannot get hold of iteratee2 (which would be the case if your enumerator is combined with an enumeratee that you did not implement), then this approach won't work.
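The head-then-rest composition through flatMap can be illustrated with a deliberately tiny, synchronous iteratee model. Everything below (It, Done, Cont, El, EOF, head, consumeAll, run) is a hypothetical toy and not Play's API: it has no error state, no leftover input and no asynchrony, and a plain List plays the role of the single Enumerator. It only mirrors the flatMap idea.

```scala
// Hypothetical toy model, NOT Play's API: synchronous, no errors, no leftover input.
sealed trait Input[+E]
final case class El[+E](e: E) extends Input[E]
case object EOF extends Input[Nothing]

sealed trait It[E, A] {
  // Sequential composition: once this iteratee is done, continue with f(result).
  def flatMap[B](f: A => It[E, B]): It[E, B] = this match {
    case Done(a) => f(a)
    case Cont(k) => Cont[E, B](in => k(in).flatMap(f))
  }
  def map[B](f: A => B): It[E, B] = flatMap(a => Done[E, B](f(a)))
}
final case class Done[E, A](a: A) extends It[E, A]
final case class Cont[E, A](k: Input[E] => It[E, A]) extends It[E, A]

// Reads exactly one element (None if it sees EOF first).
def head[E]: It[E, Option[E]] = Cont[E, Option[E]] {
  case El(e) => Done[E, Option[E]](Some(e))
  case EOF   => Done[E, Option[E]](None)
}

// Collects every remaining element until EOF.
def consumeAll[E](acc: Vector[E] = Vector.empty[E]): It[E, List[E]] = Cont[E, List[E]] {
  case El(e) => consumeAll[E](acc :+ e)
  case EOF   => Done[E, List[E]](acc.toList)
}

// Plays the role of the single Enumerator: feeds each element, then EOF.
def run[E, A](it: It[E, A], xs: List[E]): Option[A] = {
  val fed = xs.foldLeft(it) { (acc, x) =>
    acc match {
      case Cont(k) => k(El(x))
      case done    => done // already finished: extra elements are ignored here
    }
  }
  fed match {
    case Done(a) => Some(a)
    case Cont(k) => k(EOF) match {
      case Done(a) => Some(a)
      case Cont(_) => None
    }
  }
}

// head and the rest, combined through flatMap (via the for comprehension).
val headThenRest: It[Byte, (Option[Byte], List[Byte])] =
  for { h <- head[Byte]; r <- consumeAll[Byte]() } yield (h, r)
```

Because this toy drops leftovers, an EOF consumed by head is never seen by the next stage; real iteratees avoid that by handing unconsumed input on.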

Actually, we like Iteratees because they compose. So instead of creating multiple Enumerators from your original one, you should rather compose the two Iteratees sequentially (read-first and read-rest) and feed the result with your single Enumerator.

For this you need a sequential composition method, which I call andThen here. Here is a rough implementation. Note that returning the unconsumed input of the first Iteratee is a bit crude; the behavior could perhaps be customized with a typeclass based on the Input type. Also, it doesn't pass the leftover input from the first Iteratee to the second one (left as an exercise :).

object Iteratees {
  // Runs `a` to completion, then `b`, and yields both results as a pair.
  def andThen[E, A, B](a: Iteratee[E, A], b: Iteratee[E, B]): Iteratee[E, (A,B)] = new Iteratee[E, (A,B)] {
    def fold[C](
        done: ((A, B), Input[E]) => Promise[C],
        cont: ((Input[E]) => Iteratee[E, (A, B)]) => Promise[C],
        error: (String, Input[E]) => Promise[C]): Promise[C] = {

      a.fold(
        // `a` is done: now inspect the state of `b`
        (ra, aleft) => b.fold(
          (rb, bleft) => done((ra, rb), aleft /* could be magicop(aleft, bleft)*/),
          // `b` wants input: keep feeding it and pair its result with `ra` (`aleft` is dropped here)
          (bcont) => cont(e => bcont(e) map (rb => (ra, rb))),
          (s, err) => error(s, err)
        ),
        // `a` wants input: feed it, then try the composition again
        (acont) => cont(e => andThen[E, A, B](acont(e), b)),
        (s, err) => error(s, err)
      )
    }
  }
}
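As a sketch of the exercise mentioned above, here is andThen in a small synchronous toy model where Done carries its unconsumed input, which is then forwarded to the second iteratee. All names (It, Done, Cont, El, Empty, EOF, feed, run, takeOne, digits, consumeAll) are hypothetical; this is not Play's Iteratee API and it has no error state. It only illustrates the leftover-forwarding idea.

```scala
// Hypothetical toy model, NOT Play's API: synchronous, no error state.
sealed trait Input[+E]
final case class El[+E](e: E) extends Input[E]
case object Empty extends Input[Nothing]
case object EOF extends Input[Nothing]

sealed trait It[E, A] {
  def map[B](f: A => B): It[E, B] = this match {
    case Done(a, left) => Done[E, B](f(a), left)
    case Cont(k)       => Cont[E, B](in => k(in).map(f))
  }
}
// Done carries the input it did not consume, so it can be handed on.
final case class Done[E, A](a: A, left: Input[E]) extends It[E, A]
final case class Cont[E, A](k: Input[E] => It[E, A]) extends It[E, A]

// Feeds one input; a finished iteratee ignores further input in this sketch.
def feed[E, A](it: It[E, A], in: Input[E]): It[E, A] = in match {
  case Empty => it
  case _ => it match {
    case Cont(k) => k(in)
    case done    => done
  }
}

// Sequential composition: run `a`, forward its leftover to `b`, pair the results.
def andThen[E, A, B](a: It[E, A], b: It[E, B]): It[E, (A, B)] = a match {
  case Done(ra, left) => feed(b, left).map(rb => (ra, rb))
  case Cont(k)        => Cont[E, (A, B)](in => andThen(k(in), b))
}

// Plays the role of the single Enumerator: each element, then EOF.
def run[E, A](it: It[E, A], xs: List[E]): Option[A] =
  feed(xs.foldLeft(it)((acc, x) => feed(acc, El(x))), EOF) match {
    case Done(a, _) => Some(a)
    case Cont(_)    => None
  }

// Takes one element, like takeOne in the answer (None instead of an error on EOF).
def takeOne[E]: It[E, Option[E]] = Cont[E, Option[E]] {
  case El(e) => Done[E, Option[E]](Some(e), Empty)
  case Empty => takeOne[E]
  case EOF   => Done[E, Option[E]](None, EOF)
}

// Stops at the first non-digit string and returns it as leftover input.
def digits(acc: String = ""): It[String, String] = Cont[String, String] {
  case El(s) if s.forall(_.isDigit) => digits(acc + s)
  case Empty                        => digits(acc)
  case other                        => Done[String, String](acc, other)
}

// Concatenates everything up to EOF.
def consumeAll(acc: String = ""): It[String, String] = Cont[String, String] {
  case El(s) => consumeAll(acc + s)
  case Empty => consumeAll(acc)
  case EOF   => Done[String, String](acc, EOF)
}
```

With digits as the first stage, the element "x" that ends it is not consumed; without forwarding, the second stage would never see it.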

Now you can just use the following:

object Application extends Controller {

  def index = Action { Async {

    val strings: Enumerator[String] = Enumerator("1","2","3","4")
    val takeOne = Cont[String, String](e => e match {
      case Input.El(e) => Done(e, Input.Empty)
      case x => Error("not enough", x)
    })
    val takeRest = Iteratee.consume[String]()
    val firstAndRest = Iteratees.andThen(takeOne, takeRest)

    val futureRes = strings(firstAndRest) flatMap (_.run)

    futureRes.map(x => Ok(x.toString)) // responds with (1,234)
  } }

}
Licensed under: CC-BY-SA with attribution