Question

Given rowParser of type RowParser[Photo], this is how you would parse a list of rows coming from the photo table, according to the code samples I have seen so far:

def getPhotos(album: Album): List[Photo] = DB.withConnection { implicit c =>
  SQL("select * from photo where album = {album}").on(
    'album -> album.id
  ).as(rowParser *)
}

Here, the * operator creates a parser of type ResultSetParser[List[Photo]]. Now, I was wondering whether it is equally possible to get a parser that yields a Stream (thinking that being lazier is always better), but I only came up with this:

def getPhotos(album: Album): Stream[Photo] = DB.withConnection { implicit c =>
  SQL("select * from photo where album = {album}").on(
    'album -> album.id
  )() collect (rowParser(_) match { case Success(photo) => photo })
}

It works, but it seems overly complicated. I could of course just call toStream on the List returned by the first function, but my goal was to apply rowParser only to the rows that are actually read. Is there an easier way to achieve this?

EDIT: I know that LIMIT should be used in the query if the number of rows of interest is known beforehand. I am also aware that in many cases you are going to use the whole result anyway, so being lazy will not improve performance. But there might be cases where you save a few cycles, e.g. if for some reason you have search criteria that you cannot or do not want to express in SQL. So I thought it was odd that, although Anorm provides a way to obtain a Stream of SqlRow, I couldn't find a straightforward way to apply a RowParser to it.
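To illustrate the difference without a database, here is a minimal sketch in plain Scala (no Anorm involved). The hypothetical parse function plays the role of rowParser and counts how often it is called, and interesting stands in for a search criterion that is not expressed in SQL. Parsing everything into a List and then calling toStream parses every row, whereas mapping over the Stream parses only the rows that the filter/take pipeline actually pulls.

```scala
// Stand-ins, not Anorm: a raw "row" is a string and parse plays the role of rowParser
final case class Photo(id: Int, title: String)

object LazyParseDemo {
  var parseCalls = 0 // how many rows have been parsed so far

  // Hypothetical row parser: "3:photo-3" => Photo(3, "photo-3")
  def parse(raw: String): Photo = {
    parseCalls += 1
    val parts = raw.split(":", 2)
    Photo(parts(0).toInt, parts(1))
  }

  // Ten raw rows, standing in for what the database returns
  val rawRows: Stream[String] = Stream.tabulate(10)(i => s"$i:photo-$i")

  // A search criterion we pretend cannot be expressed in SQL
  def interesting(p: Photo): Boolean = p.id % 3 == 0

  // Parse every row into a List first, then switch to a Stream
  def eager(): List[Photo] = {
    parseCalls = 0
    rawRows.toList.map(parse).toStream.filter(interesting).take(2).toList
  }

  // Parse rows only as the filter/take pipeline pulls them
  def lazily(): List[Photo] = {
    parseCalls = 0
    rawRows.map(parse).filter(interesting).take(2).toList
  }
}
```

Both functions return the same two photos; eager() calls parse on all ten rows, while lazily() stops parsing as soon as the second match has been found.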


Solution

I ended up creating my own stream method which corresponds to the list method:

def stream[A](p: RowParser[A]) = new ResultSetParser[Stream[A]] {
  def apply(rows: SqlParser.ResultSet): SqlResult[Stream[A]] = rows.headOption.map(p(_)) match {
    case None => Success(Stream.empty[A])
    case Some(Success(a)) =>
      // The remaining rows are parsed lazily; rows that fail to parse are dropped.
      val s: Stream[A] = a #:: rows.tail.flatMap(r => p(r) match {
        case Success(parsed) => Some(parsed)
        case _ => None
      })
      Success(s)
    case Some(Error(msg)) => Error(msg)
  }
}

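Note that Anorm's SqlResult for the whole result set can only be either a Success or an Error, while each individual row can also yield a Success or an Error. I handle this for the first row only and assume the rest will behave the same; later rows that fail to parse are silently skipped. This may or may not work for you.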
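The behaviour described above can be tried out without a database. The sketch below re-creates just enough of the shapes involved to run: SqlResult, Success, Error, Row and RowParser here are simplified stand-ins defined locally, not Anorm's actual classes, and idParser is a hypothetical parser for an integer id column. The stream function follows the same logic as the answer: only the first row decides between Success and Error.

```scala
object StreamParserSketch {
  // Simplified stand-ins for Anorm's result types (NOT the real Anorm classes)
  sealed trait SqlResult[+A]
  final case class Success[A](value: A) extends SqlResult[A]
  final case class Error(msg: String) extends SqlResult[Nothing]

  type Row = Map[String, Any]
  type RowParser[A] = Row => SqlResult[A]

  // Same logic as the answer's stream parser: the first row decides Success vs Error
  def stream[A](p: RowParser[A])(rows: Stream[Row]): SqlResult[Stream[A]] =
    rows.headOption.map(p) match {
      case None => Success(Stream.empty[A])
      case Some(Success(a)) =>
        // later rows are parsed lazily; failures are silently dropped
        Success(a #:: rows.tail.flatMap(r => p(r) match {
          case Success(parsed) => Stream(parsed)
          case Error(_)        => Stream.empty[A]
        }))
      case Some(Error(msg)) => Error(msg)
    }

  // Hypothetical parser for an integer "id" column
  val idParser: RowParser[Int] = row => row.get("id") match {
    case Some(i: Int) => Success(i)
    case other        => Error(s"bad id: $other")
  }
}
```

With rows whose ids are 1, "oops" and 3, the result is a Success containing 1 and 3; if the first row is the bad one, the whole result is an Error, even though later rows would have parsed.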

OTHER TIPS

You're better off making smaller (paged) queries using LIMIT and OFFSET.

Anorm would need some modification if you're going to keep your (large) result in memory and stream it from there. Then the other concern would be the new memory requirements for your JVM. And how would you deal with caching at the service level? With paging you could easily cache something like photos?page=1&size=10, but with a stream you just have photos, and the caching layer would have no idea what to do with it.

Even worse would be doing this at the JDBC level: wrapping a Stream around LIMIT/OFFSET-ed statements and making multiple calls to the database behind the scenes. This sounds like it would need a fair bit of work to port the Stream code that Scala generates to Java land (so that it also works with Groovy, JRuby, etc.), and then to get it approved for the JDBC 5 or 6 roadmap. This idea will probably be shunned as too complicated, which it is.

You could wrap Stream around your entire DAO (where the limit and offset trickery would happen), but this almost sounds like more trouble than it's worth :-)
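A rough sketch of that DAO-level idea, independent of Anorm and JDBC: fetchPage(offset, size) is a hypothetical stand-in for a query with LIMIT and OFFSET, and the next page is only requested once the consumer reads past the end of the current one.

```scala
object PagedStream {
  // Lazily concatenates pages; the next page is only fetched when it is needed.
  // fetchPage(offset, size) is a hypothetical stand-in for a LIMIT/OFFSET query.
  def paged[A](pageSize: Int)(fetchPage: (Int, Int) => List[A]): Stream[A] = {
    def from(offset: Int): Stream[A] = {
      val page = fetchPage(offset, pageSize)
      if (page.isEmpty) Stream.empty
      else if (page.size < pageSize) page.toStream     // last, partial page
      else page.toStream #::: from(offset + pageSize)  // suffix evaluated lazily
    }
    from(0)
  }
}
```

Note that the first page is fetched as soon as the stream is created, and that every page is a separate query, so rows inserted or deleted between queries can shift the offsets.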

I ran into a similar situation, but got a StackOverflowError when Anorm's built-in function for converting the result to a Stream attempted to parse the result set.

To get around this, I chose to abandon Anorm's ResultSetParser paradigm and fall back to the java.sql.ResultSet object.

I wanted to use Anorm's internal classes for parsing result-set rows, but since version 2.4 all of the pertinent classes and methods have been made private to their package, and several other methods that would have been more straightforward to use have been deprecated.

I used a combination of Promises and Futures to work around the ManagedResource that Anorm now returns, and I avoided all deprecated functions.

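import anorm._
import java.sql.ResultSet
import scala.concurrent._

// Assumes a Play `db: play.api.db.Database` is in scope (e.g. injected).
def SqlStream[T](sql:SqlQuery)(parse:ResultSet => T)(implicit ec:ExecutionContext):Future[Stream[T]] = {
  val conn = db.getConnection()
  val mr = sql.preparedStatement(conn, false)
  val p = Promise[Unit]()       // completed once the stream has been read to the end
  val p2 = Promise[ResultSet]() // completed with the open ResultSet
  Future {
    // Keep the statement (and the connection) open until the stream is exhausted.
    mr.map({ stmt =>
      p2.success(stmt.executeQuery)
      Await.ready(p.future, duration.Duration.Inf)
    }).acquireAndGet(identity).andThen { case _ => conn.close() }
  }
  // Advances the cursor lazily: each row is parsed only when the stream is forced.
  def _stream(rs:ResultSet):Stream[T] = {
    if (rs.next()) parse(rs) #:: _stream(rs)
    else {
      p.success(())
      Stream.empty
    }
  }
  // A freshly executed ResultSet is already positioned before the first row.
  // JDBC specifies that rs.beforeFirst() throws on a TYPE_FORWARD_ONLY result set.
  p2.future.map(_stream)
}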

A rather trivial usage of this function would be something like this:

def getText(implicit ec:ExecutionContext):Future[Stream[String]] = {
  SqlStream(SQL("select FIELD from TABLE")) { rs => rs.getString("FIELD") }
}

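There are, of course, drawbacks to this approach: for example, the connection stays open, and one thread of the ExecutionContext stays blocked in Await.ready, until the stream has been read to the end, so a caller that stops early never releases them. However, this got around my problem and did not require including any other libraries.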
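The core trick in SqlStream (a background Future keeps the resource open until a Promise signals that the stream has been read to the end) can be modelled without JDBC. In this sketch, the hypothetical FakeCursor stands in for the statement and its ResultSet, and GatedStream.open mirrors the shape of SqlStream. It also makes one drawback visible: if the consumer stops early, the close never happens.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent._
import scala.concurrent.duration._

// Hypothetical stand-in for a JDBC statement plus its ResultSet
final class FakeCursor(rows: Vector[String]) {
  private var pos = -1
  val closed = new AtomicBoolean(false)
  def next(): Boolean = { pos += 1; pos < rows.length }
  def get: String = rows(pos)
  def close(): Unit = closed.set(true)
}

object GatedStream {
  // Same shape as SqlStream: a background Future holds the cursor open
  // until a Promise reports that the stream has been read to the end.
  def open[T](cursor: FakeCursor)(parse: FakeCursor => T)(implicit ec: ExecutionContext): Future[Stream[T]] = {
    val done = Promise[Unit]()        // completed after the last row has been read
    val ready = Promise[FakeCursor]() // hands the open cursor to the consumer
    Future {
      ready.success(cursor)
      Await.ready(done.future, Duration.Inf) // one pool thread waits here
      cursor.close()
    }
    // Advances the cursor only when the next element of the stream is forced
    def loop(c: FakeCursor): Stream[T] =
      if (c.next()) parse(c) #:: loop(c)
      else { done.success(()); Stream.empty }
    ready.future.map(loop)
  }
}
```

Reading the whole stream eventually closes the cursor; taking only the first two elements leaves it open and leaves one thread parked in Await.ready.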

Licensed under: CC-BY-SA with attribution