Question

The below code does streaming back to client, in, what I gather is a more idiomatic way than using Java's IO Streams. It, however, has an issue: connection is kept open after stream is done.

def getImage() = Action { request =>
  val imageUrl = "http://hereandthere.com/someimageurl.png"
  Ok.stream({ content: Iteratee[Array[Byte], Unit] => 
    WS.url(imageUrl).withHeaders("Accept"->"image/png").get { response => content }
    return
  }).withHeaders("Content-Type"->"image/png")
}

this is intended for streaming large (>1 mb) files from internal API to requester.

The question is, why does it keep the connection open? Is there something it expects from upstream server? I tested the upstream server using curl, and the connection does close - it just doesn't close when passed through this proxy.

Was it helpful?

Solution

The reason that the stream doesn't finish is because an EOF isn't sent to the iteratee that comes back from WS.get() call. Without this explicit EOF, the connection stays open - as it's in chunked mode, and potentially a long-running, comet-like connection.

Here's the fixed code:

Ok.stream({ content: Iteratee[Array[Byte], Unit] => 
  WS.url(imageUrl)
    .withHeaders("Accept"->"image/png")
    .get { response => content }
    .onRedeem { ii =>
       ii.feed(Input.EOF)
    }
}).withHeaders("Content-Type"->"image/png")

OTHER TIPS

Here is a modified version for play 2.1.0. See https://groups.google.com/forum/#!msg/play-framework/HwoRR-nipCc/gUKs9NexCx4J

Thanks Anatoly G for sharing.

def proxy = Action {

   val url = "..."

   Async {
     val iterateePromise = Promise[Iteratee[Array[Byte], Unit]]
     val resultPromise = Promise[ChunkedResult[Array[Byte]]]

     WS.url(url).get { responseHeaders =>
       resultPromise.success {
         new Status(responseHeaders.status).stream({ content: Iteratee[Array[Byte], Unit] =>
           iterateePromise.success(content)
         }).withHeaders(
           "Content-Type" -> responseHeaders.headers.getOrElse("Content-Type", Seq("application/octet-stream")).head,
           "Connection" -> "Close")
       }
       Iteratee.flatten(iterateePromise.future)
     }.onComplete {
       case Success(ii) => ii.feed(Input.EOF)
       case Failure(t) => resultPromise.failure(t)
     }

     resultPromise.future
   }

}

Update for play 2.2.x:

def proxy = Action.async {
  val url = "http://localhost:9000"

  def enumerator(chunks: Iteratee[Array[Byte], Unit] => _) = {
    new Enumerator[Array[Byte]] {
      def apply[C](i: Iteratee[Array[Byte], C]): Future[Iteratee[Array[Byte], C]] = {
        val doneIteratee = Promise[Iteratee[Array[Byte], C]]()
        chunks(i.map {
          done =>
            doneIteratee.success(Done[Array[Byte], C](done)).asInstanceOf[Unit]
        })
        doneIteratee.future
      }
    }
  }

  val iterateePromise = Promise[Iteratee[Array[Byte], Unit]]()
  val resultPromise = Promise[SimpleResult]()

  WS.url(url).get {
    responseHeaders =>

      resultPromise.success(new Status(responseHeaders.status).chunked(
        enumerator({
          content: Iteratee[Array[Byte], Unit] => iterateePromise.success(content)
        }
        )).withHeaders(
        "Content-Type" -> responseHeaders.headers.getOrElse("Content-Type", Seq("application/octet-stream")).head,
        "Connection" -> "Close"))

      Iteratee.flatten(iterateePromise.future)
  }.onComplete {
    case Success(ii) => ii.feed(Input.EOF)
    case Failure(t) => throw t
  }

  resultPromise.future
}

if anyone has a better solution, it interests me greatly!

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top