Question

I am using anorm to access data in my DB. The DB is written to by another service, which is written in Java and persists data using Ebean.

I have the following Scala object:

import java.sql.Connection

import scala.concurrent.{ Future, blocking, future }
import scala.concurrent.ExecutionContext.Implicits.global

import anorm.{ SQL, SqlQuery, SqlRow, sqlToSimple, toParameterValue }
import play.api.Logger
import play.api.Play.current
import play.api.db.DB

object Queries {

  private val readDataSource: String = play.Configuration.root().getString("data.provider.api.source", "default")
  //better IO execution context

  import play.api.libs.concurrent.Execution.Implicits.defaultContext

  private val dataSetDescription: SqlQuery = SQL("SELECT DISTINCT platform, name FROM data_nugget")

  private val identityCreationTime: SqlQuery = SQL("SELECT i.creation_time FROM identity i WHERE platform = {pfm} AND userid = {uid};")

  private val identityData: SqlQuery = SQL("SELECT n.name, n.value FROM data_nugget n WHERE platform = {pfm} AND userid = {uid};")

  private val playerData: SqlQuery = SQL("SELECT n.platform, n.name, n.value, r.userid, r.registration_time FROM data_nugget n JOIN registration r ON n.platform=r.platform AND n.userid=r.userid  WHERE r.playerid = {pid} AND r.application = {app};")

  private def withAsyncAnormConnection(function: Connection => Stream[SqlRow]): Future[List[SqlRow]] = {
    future {
      blocking {
        DB.withConnection(readDataSource)(c => function(c)).toList
      }
    }
  }

  def fetchDistinctDataNames(): Future[List[SqlRow]] = {
    withAsyncAnormConnection(implicit c => dataSetDescription())
  }

  def fetchIdentityCreationTime(platform: String, userid: String): Future[List[SqlRow]] = {
    withAsyncAnormConnection(implicit c => identityCreationTime.on("pfm" -> platform, "uid" -> userid)())
  }

  def fetchIdentityData(platform: String, userid: String): Future[List[SqlRow]] = {
    withAsyncAnormConnection(implicit c => identityData.on("pfm" -> platform, "uid" -> userid)())
  }

  def fetchRegistrationData(game: String, playerid: String): Future[List[SqlRow]] = {
    withAsyncAnormConnection(implicit c => playerData.on("app" -> game, "pid" -> playerid)())
  }

}

I use it to wrap the execution of my SQL queries in futures.

Every time I run any of these queries, I get an error with the following stack trace:

(Error,com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1073)
com.mysql.jdbc.SQLError.createSQLException(SQLError.java:987)
com.mysql.jdbc.SQLError.createSQLException(SQLError.java:982)
com.mysql.jdbc.SQLError.createSQLException(SQLError.java:927)
com.mysql.jdbc.ResultSetImpl.checkClosed(ResultSetImpl.java:794)
com.mysql.jdbc.ResultSetImpl.next(ResultSetImpl.java:7139)
anorm.Sql$$anonfun$resultSetToStream$1.apply(Anorm.scala:527)
anorm.Sql$$anonfun$resultSetToStream$1.apply(Anorm.scala:527)
anorm.Useful$.unfold(Anorm.scala:315)
anorm.Useful$$anonfun$unfold$1.apply(Anorm.scala:317)
anorm.Useful$$anonfun$unfold$1.apply(Anorm.scala:317)
scala.collection.immutable.Stream$Cons.tail(Stream.scala:1078)
scala.collection.immutable.Stream$Cons.tail(Stream.scala:1070)
scala.collection.immutable.Stream.foreach(Stream.scala:548)
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:178)
scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
scala.collection.TraversableLike$class.to(TraversableLike.scala:629)
scala.collection.AbstractTraversable.to(Traversable.scala:105)
scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:243)
scala.collection.AbstractTraversable.toList(Traversable.scala:105)
controllers.dataprovider.data.Queries$$anonfun$withAsyncAnormConnection$1$$anonfun$apply$1.apply(Queries.scala:31)
controllers.dataprovider.data.Queries$$anonfun$withAsyncAnormConnection$1$$anonfun$apply$1.apply(Queries.scala:31)
scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory$$anon$2$$anon$3.block(ExecutionContextImpl.scala:44)
scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:2803)
scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory$$anon$2.blockOn(ExecutionContextImpl.scala:41)
scala.concurrent.package$.blocking(package.scala:50)
controllers.dataprovider.data.Queries$$anonfun$withAsyncAnormConnection$1.apply(Queries.scala:30)
controllers.dataprovider.data.Queries$$anonfun$withAsyncAnormConnection$1.apply(Queries.scala:30)
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
scala.concurrent.forkjoin.ForkJoinTask$AdaptedRunnableAction.exec(ForkJoinTask.java:1417)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:262)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104))

I have encountered this error before in Java services that used JDBC directly, but here I am not touching the ResultSet myself, and I even convert the Stream of rows I receive from the connection into a List as soon as possible.

What is happening? Where am I closing the ResultSet? What did I refactor wrong?

As a note, in the prototype of this service (when everything was in the controller), I had the SQL("...") call directly in the code, something like this:

future {
    blocking {
      DB.withConnection(implicit c => {
        SQL("SELECT DISTINCT platform, name FROM data_nugget")().map(row => (row[String]("platform"), row[String]("name"))).toArray
      })
    }
  }

and it worked just fine.

PS: Sorry for the long copy/paste of the stack trace and the code; I am trying to be detailed.

Solution

I solved it myself, and the difference is very subtle.

I changed this function:

  private def withAsyncAnormConnection(function: Connection => Stream[SqlRow]): Future[List[SqlRow]] = {
    future {
      blocking {
        DB.withConnection(readDataSource)(c => function(c)).toList
      }
    }
  }

to this:

  private def withAsyncAnormConnection(function: Connection => Stream[SqlRow]): Future[List[SqlRow]] = {
    future {
      blocking {
        DB.withConnection(readDataSource)(c => function(c).toList)
      }
    }
  }

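The trick is that withConnection follows the "loan pattern": the connection (and therefore the ResultSet behind the Stream) is only valid while the function passed to it is running. anorm returns a lazy Stream, so I need to iterate through it and collect all the rows before the connection is released. In the original version, toList was called on the result of withConnection, that is, after the connection had already been closed.
The connection is alive only inside these round brackets: (c => function(c).toList)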
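
To see the effect without a database, here is a minimal, self-contained sketch. FakeResultSet, withResultSet and rowsOf are hypothetical stand-ins invented for illustration (they are not anorm or Play APIs); they only mimic a resource that refuses reads after it is closed and a lazy Stream built on top of it.

```scala
import scala.util.Try

object LoanPatternDemo {

  // Hypothetical stand-in for a JDBC ResultSet: reading after close() fails.
  final class FakeResultSet(rows: List[String]) {
    private var remaining = rows
    private var closed = false

    def close(): Unit = closed = true

    def next(): Option[String] = {
      if (closed) throw new IllegalStateException("ResultSet is closed")
      remaining match {
        case head :: tail => remaining = tail; Some(head)
        case Nil          => None
      }
    }
  }

  // Loan pattern: the resource is only valid while `body` is running.
  def withResultSet[A](rows: List[String])(body: FakeResultSet => A): A = {
    val rs = new FakeResultSet(rows)
    try body(rs) finally rs.close()
  }

  // Lazily pulls rows: the head is read immediately, the tail only on demand.
  // (Stream matches the code in the question; Scala 2.13+ prefers LazyList.)
  def rowsOf(rs: FakeResultSet): Stream[String] =
    rs.next() match {
      case Some(row) => row #:: rowsOf(rs)
      case None      => Stream.empty
    }

  // toList runs AFTER the block returned, so the tail is read from a closed resource.
  def forcedOutside: Try[List[String]] =
    Try(withResultSet(List("a", "b", "c"))(rs => rowsOf(rs)).toList)

  // toList runs INSIDE the block, while the resource is still open.
  def forcedInside: List[String] =
    withResultSet(List("a", "b", "c"))(rs => rowsOf(rs).toList)

  def main(args: Array[String]): Unit = {
    println(forcedOutside.isFailure) // true
    println(forcedInside)            // List(a, b, c)
  }
}
```

Forcing the Stream inside the loaned block, as in forcedInside, is the same change as moving .toList inside the brackets of withConnection.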

OTHER TIPS

There's a difference between the code that works for you and the code that doesn't. In the working example, you call map on the lazy Stream of Row instances and then toArray, all inside the withConnection block. In the non-working example, you call toList on the value returned by withConnection, that is, after the block has finished.

Note that map on a Stream is itself lazy, so it is most likely the toArray call inside the block, not map, that forces the full processing of the underlying ResultSet. In the new code the Stream stays lazy until you are outside the withConnection block, by which point the underlying ResultSet is closed. Try forcing the Stream (for example with toList) inside the block and see if that fixes it.
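
Whether map forces a Stream can be checked in isolation. The sketch below is only an illustration with made-up names (StreamMapLaziness, numbers, demo) and no database; it counts how many elements have been computed after map and again after toList. With the standard library's Stream, map should compute only the head, and toList should compute the rest.

```scala
object StreamMapLaziness {

  // Returns (elements computed after map, elements computed after toList, final list).
  def demo(): (Int, Int, List[Int]) = {
    var evaluated = 0

    // Builds a lazy Stream and counts each element as it is actually computed.
    def numbers(from: Int, until: Int): Stream[Int] =
      if (from >= until) Stream.empty
      else { evaluated += 1; from #:: numbers(from + 1, until) }

    val mapped = numbers(0, 5).map(_ * 2) // map alone does not walk the tail
    val afterMap = evaluated              // 1: only the head has been computed

    val all = mapped.toList               // toList walks the whole Stream
    (afterMap, evaluated, all)            // evaluated is now 5
  }

  def main(args: Array[String]): Unit =
    println(demo())
}
```

If that holds, mapping each Row to itself would not be enough on its own; a strict conversion such as toList or toArray has to run inside the withConnection block.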

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow