Question

I am new to scala, so I am quite prepared to accept that I am doing something wrong!

I am playing around with Akka, and have a test using scalatest and the akka-testkit. Here is my build.sbt config

name := """EventHub"""

version := "1.0"

scalaVersion := "2.10.3"

libraryDependencies ++= Seq(
  "com.typesafe.akka" % "akka-actor_2.10" % "2.2.3",
  "com.typesafe.akka" % "akka-testKit_2.10" % "2.2.3" % "test",
  "org.scalatest" % "scalatest_2.10.0-M4" % "1.9-2.10.0-M4-B2" % "test",
  "com.ning" % "async-http-client" % "1.8.1"
)

When I compile, I get a message that I don't understand. I have google for this and have found related scala compiler issues and bugs. I have no idea if that is what I am seeing or if I am making a basic mistake somewhere. Here is a summary of the output (I have removed alot of "noise" for brevity; can add more detail if required!):

scalac: 
     while compiling: /Users/robert/Documents/Programming/Scala/Projects/EventHub/src/test/scala/Hub/Subscription/SubscriberSpec.scala
        during phase: typer
     library version: version 2.10.3
    compiler version: version 2.10.3
...
...
== Expanded type of tree ==
TypeRef(
  TypeSymbol(
    class SubscriberSpec extends TestKit with WordSpec with BeforeAndAfterAll with ImplicitSender

  )
)
uncaught exception during compilation: scala.reflect.internal.FatalError

And:

scalac: Error: package api does not have a member materializeWeakTypeTag
scala.reflect.internal.FatalError: package api does not have a member materializeWeakTypeTag
    at scala.reflect.internal.Definitions$DefinitionsClass.scala$reflect$internal$Definitions$DefinitionsClass$$fatalMissingSymbol(Definitions.scala:1037)
    at scala.reflect.internal.Definitions$DefinitionsClass.getMember(Definitions.scala:1055)
    at scala.reflect.internal.Definitions$DefinitionsClass.getMemberMethod(Definitions.scala:1090)
    at scala.reflect.internal.Definitions$DefinitionsClass.materializeWeakTypeTag(Definitions.scala:518)
    at scala.tools.reflect.FastTrack$class.fastTrack(FastTrack.scala:34)
    at scala.tools.nsc.Global$$anon$1.fastTrack$lzycompute(Global.scala:493)
    at scala.tools.nsc.Global$$anon$1.fastTrack(Global.scala:493)
    at scala.tools.nsc.typechecker.Namers$Namer.methodSig(Namers.scala:1144)
    at scala.tools.nsc.typechecker.Namers$Namer.getSig$1(Namers.scala:1454)
    at scala.tools.nsc.typechecker.Namers$Namer.typeSig(Namers.scala:1466)
    at scala.tools.nsc.typechecker.Namers$Namer$$anonfun$monoTypeCompleter$1$$anonfun$apply$1.apply$mcV$sp(Namers.scala:731)
    at scala.tools.nsc.typechecker.Namers$Namer$$anonfun$monoTypeCompleter$1$$anonfun$apply$1.apply(Namers.scala:730)
    at scala.tools.nsc.typechecker.Namers$Namer$$anonfun$monoTypeCompleter$1$$anonfun$apply$1.apply(Namers.scala:730)
    at scala.tools.nsc.typechecker.Namers$Namer.scala$tools$nsc$typechecker$Namers$Namer$$logAndValidate(Namers.scala:1499)
    at scala.tools.nsc.typechecker.Namers$Namer$$anonfun$monoTypeCompleter$1.apply(Namers.scala:730)
    at scala.tools.nsc.typechecker.Namers$Namer$$anonfun$monoTypeCompleter$1.apply(Namers.scala:729)
    at scala.tools.nsc.typechecker.Namers$$anon$1.completeImpl(Namers.scala:1614)
...
...

I am using IntelliJ as the ide. There are a couple scala files; one contains an actor, the other a webclient:

package Hub.Subscription

import scala.concurrent.{Promise, Future}
import com.ning.http.client.{AsyncCompletionHandler, AsyncHttpClient, Response}


trait WebClient {
  def postUpdate(url: String, payload: Any, topic: String): Future[Int]
  def postUnSubscribe(url: String, topic: String): Future[Int]
}

case class PostUpdateFailed(status: Int) extends RuntimeException

object AsyncWebClient extends WebClient{

  private val client = new AsyncHttpClient

  override def postUpdate(url: String, payload: Any, topic: String): Future[Int] = {
    val request = client.preparePost(url).build()
    val result = Promise[Int]()
    client.executeRequest(request, new AsyncCompletionHandler[Response]() {
      override def onCompleted(response: Response) = {
        if (response.getStatusCode / 100 < 4)
          result.success(response.getStatusCode)
        else
          result.failure(PostUpdateFailed(response.getStatusCode))
        response
      }

      override def onThrowable(t: Throwable) {
        result.failure(t)
      }
    })
    result.future
  }

  override def postUnSubscribe(url: String, topic: String): Future[Int] = {
    val request = client.preparePost(url).build()
    val result = Promise[Int]
    client.executeRequest(request, new AsyncCompletionHandler[Response] {
      override def onCompleted(response: Response) = {
        if (response.getStatusCode / 100 < 4)
          result.success(response.getStatusCode)
        else
          result.failure(PostUpdateFailed(response.getStatusCode))
        response
      }

      override def onThrowable(t: Throwable) {
        result.failure(t)
      }
    })
    result.future
  }

  def shutdown(): Unit = client.close()
}

And my actor:

package Hub.Subscription

import akka.actor.Actor
import Hub.Subscription.Subscriber.{Failed, Update, UnSubscribe}
import scala.concurrent.ExecutionContext
import java.util.concurrent.Executor

object Subscriber {
  object UnSubscribe
  case class Update(payload: Any)
  case class Failed(callbackUrl: String)
}

class Subscriber(callbackUrl: String, unSubscribeUrl: String, topic: String) extends Actor{

  implicit val executor = context.dispatcher.asInstanceOf[Executor with ExecutionContext]
  def client: WebClient = AsyncWebClient

  def receive = {
    case Update(payload) => doUpdate(payload)
    case UnSubscribe => doUnSubscribe
    case Failed(clientUrl) => //log?
  }

  def doUpdate(payload: Any): Unit = {
    val future = client.postUpdate(callbackUrl, payload, topic)
    future onFailure  {
      case err: Throwable => sender ! Failed(callbackUrl)
    }
  }

  def doUnSubscribe: Unit = {
    //tell the client that they have been un-subscribed
    val future = client.postUnSubscribe(unSubscribeUrl, topic)
    future onFailure {
      case err: Throwable => //log
    }
  }
}

And finally my test spec:

package Hub.Subscription

import akka.testkit.{ImplicitSender, TestKit}
import akka.actor.{ActorRef, Props, ActorSystem}
import org.scalatest.{WordSpec, BeforeAndAfterAll}
import scala.concurrent.Future
import scala.concurrent.duration._

object SubscriberSpec {

  def buildTestSubscriber(url: String, unSubscribeUrl: String, topic: String, webClient: WebClient): Props =
    Props(new Subscriber(url, unSubscribeUrl, topic) {
      override def client = webClient
    })

  object FakeWebClient extends WebClient {
    override def postUpdate(url: String, payload: Any, topic: String): Future[Int] = Future.successful(201)
    override def postUnSubscribe(url: String, topic: String): Future[Int] = Future.failed(PostUpdateFailed(500))
  }
}

class SubscriberSpec extends TestKit(ActorSystem("SubscriberSpec"))
  with WordSpec
  with BeforeAndAfterAll
  with ImplicitSender {

  import SubscriberSpec._

  "A subscriber" must {

    "forward the update to the callback url" in {
      val fakeClient = FakeWebClient
      val callbackUrl = "http://localhost:9000/UserEvents"
      val subscriber: ActorRef = system.actorOf(buildTestSubscriber(callbackUrl, "unSubscribeUrl", "aTopic", fakeClient))

      subscriber ! Subscriber.Update(Nil)

      within(200 millis) {
        expectNoMsg
      }
    }
  }

  override def afterAll(): Unit = {
    system.shutdown()
  }
}

Thanks in advance for any help / pointers!

Update: I should have noted that if I do not include the test spec, then all is well. But when I add the test spec, I get the errors above.

Was it helpful?

Solution

Btw I just realized that you're using scalatest compiled for 2.10.0-M4.

Scala's final releases aren't supposed to be binary compatible with corresponding milestone releases, so weird things might happen, including crashes.

If you change scalatest's version to "org.scalatest" %% "scalatest" % "1.9.1", everything is going to work just fine.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top