User/Actor not receiving message from system/TestActor Akka TestKit

367 views Asked by At

I have a simple Scala class, CartService, which creates an actor and sends it a message; the actor sends a response back to the service via `sender ! result`. Actor code:

    package actors

import java.util.UUID

import akka.persistence.PersistentActor
import entites._




/**
  * Created by spineor on 12/12/16.
  */

// Commands: requests sent to a CartActor. Each one names the target cart by its UUID.

/** Asks the cart to add the SKU `sku_id` at the given `price`. */
case class AddSkuToCartCmd(cart_id: UUID, sku_id: String, price: Double)

/** Asks the cart to drop the SKU `sku_id`. */
case class RemoveSkufromCmd(cart_id: UUID, sku_id: String)

/** Asks the cart to set the quantity of `sku_id` to `new_quantity`. */
case class ChangeSkuQuantityCmd(cart_id: UUID, sku_id: String, new_quantity: Int)

/** Asks the cart how many distinct SKUs it currently holds. */
case class GetCartUniqueSkuCountCmd(cart_id: UUID)

/** Asks the cart to check out. */
case class CheckoutCartCmd(cart_id: UUID)

// Events: facts written to the journal and replayed to rebuild a cart's state.

/** Records that `sku_id` was added to the cart at `price`. */
case class AddedSkutoCartEvent(cart_id: UUID, sku_id: String, price: Double)

/** Records that `sku_id` was removed from the cart. */
case class RemovedSkuFromCartEvent(cart_id: UUID, sku_id: String)

/** Records that the quantity of `sku_id` was set to `new_quantity`. */
case class ChangeSkuQuantityEvent(cart_id: UUID, sku_id: String, new_quantity: Int)


/**
  * Persistent actor holding the state of one shopping cart.
  *
  * The actor's name doubles as both the cart id (it is parsed as a UUID in
  * `preStart`) and the `persistenceId`. State-changing commands are turned
  * into events, persisted, and then applied to `cart`; the same events are
  * applied again in `receiveRecover` when the journal is replayed.
  */
class CartActor extends PersistentActor {
  import kafka.producerConsumer._

  // Assigned in preStart; null until the actor has started.
  var cart: Cart = _

  /**
    * Creates an empty cart whose id is taken from the actor name.
    * `UUID.fromString` throws IllegalArgumentException if the name is not a UUID.
    */
  override def preStart(): Unit = {
    val actorName = self.path.name
    println("hi2")
    cart = new Cart(UUID.fromString(actorName), Map[String, CartEntry]())
  }

  /** Re-applies journaled events to rebuild the cart. */
  override def receiveRecover: Receive = {

    case AddedSkutoCartEvent(cart_id, sku_id, price) =>
      addSkutoCart(cart_id, sku_id, price)

    case RemovedSkuFromCartEvent(cart_id, sku_id) =>
      removeSkufromCart(cart_id, sku_id)

    case ChangeSkuQuantityEvent(cart_id, sku_id, new_quantity) =>
      changeSkuQuantity(cart_id, sku_id, new_quantity)

    // receiveCommand never persists these two commands, so they are not
    // expected to come back from the journal; the cases are kept so the
    // existing handling (no-op / debug print) is unchanged.
    case CheckoutCartCmd(cart_id) =>

    case GetCartUniqueSkuCountCmd(cart_id) =>
      println("in recover")
  }

  /** Handles live commands: persist-then-apply for mutations, direct reply for queries. */
  override def receiveCommand: Receive = {

    case AddSkuToCartCmd(cart_id, sku_id, price) =>
      println("hi")
      persist(AddedSkutoCartEvent(cart_id, sku_id, price)) { evt =>
        addSkutoCart(evt.cart_id, evt.sku_id, evt.price)
      }

    case RemoveSkufromCmd(cart_id, sku_id) =>
      persist(RemovedSkuFromCartEvent(cart_id, sku_id)) { evt =>
        removeSkufromCart(evt.cart_id, evt.sku_id)
      }

    case ChangeSkuQuantityCmd(cart_id, sku_id, new_quantity) =>
      persist(ChangeSkuQuantityEvent(cart_id, sku_id, new_quantity)) { evt =>
        changeSkuQuantity(evt.cart_id, evt.sku_id, evt.new_quantity)
      }

    case GetCartUniqueSkuCountCmd(cart_id) =>
      sender() ! getCartUniqueSkuCount(cart_id)

    case CheckoutCartCmd(cart_id) =>
      checkoutCart(cart_id)
  }

  /** The journal key for this cart: the actor's own name. */
  override def persistenceId: String = self.path.name

  /**
    * Puts `sku_id` into the cart with quantity 1 at `price`.
    *
    * An existing entry for the same SKU is replaced, so its quantity goes
    * back to 1. NOTE(review): confirm that resetting the quantity is intended.
    *
    * @param cart_id unused; the actor always operates on its own cart
    * @return the id of this actor's cart
    */
  def addSkutoCart(cart_id: UUID, sku_id: String, price: Double): UUID = {
    cart.cart_entries = cart.cart_entries + (sku_id -> CartEntry(sku_id, 1, price))
    cart.cart_id
  }

  /**
    * Sets the quantity of an SKU already in the cart, keeping its price.
    *
    * The updated map is now written back to `cart.cart_entries`; previously
    * it was built and discarded, so the change was silently lost.
    * If the SKU is not in the cart nothing happens.
    *
    * @param cart_id   unused; the actor always operates on its own cart
    * @param new_quant the quantity to store for `sku_id`
    */
  def changeSkuQuantity(cart_id: UUID, sku_id: String, new_quant: Int): Unit =
    cart.cart_entries.get(sku_id) match {
      case Some(existing) =>
        val updated = CartEntry(sku_id, new_quant, existing.price)
        cart.cart_entries = cart.cart_entries + (sku_id -> updated)
      case None =>
        // TODO: signal an error, the item to be updated is not present in the cart
        ()
    }

  /**
    * Removes `sku_id` from the cart; a no-op if it is not present.
    *
    * @param cart_id unused; the actor always operates on its own cart
    */
  def removeSkufromCart(cart_id: UUID, sku_id: String) = {
    cart.cart_entries = cart.cart_entries - sku_id
  }

  /**
    * @param cart_id unused; the actor always operates on its own cart
    * @return the number of distinct SKUs currently in the cart
    */
  def getCartUniqueSkuCount(cart_id: UUID): Int =
    cart.cart_entries.size

  /**
    * Publishes the current cart to the "Cart-Update-Topic" topic.
    *
    * NOTE(review): a new Producer is created on every checkout and never
    * closed here; whether that leaks resources depends on the Producer
    * implementation, so confirm.
    *
    * @param cart_id unused; the actor always operates on its own cart
    */
  def checkoutCart(cart_id: UUID) = {
    val cartProducer = Producer[Cart]("Cart-Update-Topic")
    cartProducer.send(cart)
  }

}

and Service code

package services

import java.util.UUID

import actors._
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}
import scala.concurrent.duration.Duration



/**
  * Created by spineor on 9/12/16.
  */
/**
  * Front door for cart operations: finds (or creates) the CartActor for a
  * cart id and forwards the matching command to it.
  *
  * The cart actor's name and its persistence id are both the cart's UUID.
  */
class CartService {

  // Left as a `var` so callers that reassign the system keep compiling.
  var system = ActorSystem("System")

  /**
    * Adds an SKU to a cart, creating the cart when `cart_id` is null.
    *
    * @return the name of the cart actor, i.e. the cart's UUID as a string
    */
  def addSkutoCart(cart_id: UUID, sku_id: String, price: Double): String = {
    val cart_actor = routeMessagesToCart(cart_id)
    // The actor name is always a UUID string (see routeMessagesToCart), so it
    // can stand in for a null cart_id instead of sending null in the command.
    val effective_id = Option(cart_id).getOrElse(UUID.fromString(cart_actor.path.name))
    cart_actor ! AddSkuToCartCmd(effective_id, sku_id, price)
    cart_actor.path.name
  }

  /** Sends a fire-and-forget request to remove `sku_id` from the cart. */
  def removeSkuFromCart(cart_id: UUID, sku_id: String): Unit = {
    val cart_actor = routeMessagesToCart(cart_id)
    cart_actor ! RemoveSkufromCmd(cart_id, sku_id)
  }

  /** Sends a fire-and-forget request to set the quantity of `sku_id`. */
  def changeSkuQuantity(cart_id: UUID, sku_id: String, new_quantity: Int): Unit = {
    val cart_actor = routeMessagesToCart(cart_id)
    cart_actor ! ChangeSkuQuantityCmd(cart_id, sku_id, new_quantity)
  }

  /** Sends a fire-and-forget checkout request to the cart. */
  def checkoutCart(cart_id: UUID): Unit = {
    val cart_actor = routeMessagesToCart(cart_id)
    cart_actor ! CheckoutCartCmd(cart_id)
  }

  /**
    * Central lookup for cart actors on this node.
    *
    * - `cart_id == null`: a new cart id is generated and a fresh actor is
    *   created under that name.
    * - otherwise: the actor at `/user/<cart_id>` is resolved; if it cannot be
    *   found it is (re)created under the same name, so that its journaled
    *   events are replayed before new messages are applied.
    *
    * The method blocks until the lookup has finished. The earlier version
    * slept for a fixed 500 ms and could return null when the lookup (5 s
    * timeout) had not completed yet.
    *
    * Failures other than "actor not found" (for example an actor-name clash
    * raised by `actorOf`) are rethrown to the caller.
    */
  def routeMessagesToCart(cart_id: UUID): ActorRef =
    Option(cart_id) match {
      case None =>
        // No cart id in the request: start a brand-new cart.
        system.actorOf(Props[CartActor], createNewCartId().toString)

      case Some(id) =>
        implicit val timeout: Timeout = Timeout(5.seconds)
        val lookup = system
          .actorSelection("/user/" + id.toString)
          .resolveOne()
          .recover {
            // The cart actor does not exist (any more) and must be recreated.
            case _: akka.actor.ActorNotFound =>
              system.actorOf(Props[CartActor], id.toString)
          }
        // resolveOne completes within `timeout`; allow a little slack on top.
        Await.result(lookup, timeout.duration + 1.second)
    }

  /** Creates a new random UUID used to identify a cart and its actor. */
  def createNewCartId(): UUID = UUID.randomUUID()

  /**
    * Asks the cart actor for its number of distinct SKUs and blocks for the
    * reply (up to 15 seconds).
    *
    * @return the actor's reply, untyped as delivered by `ask`
    * @throws java.util.concurrent.TimeoutException if no reply arrives in time
    */
  def getCartUniqueSkuCount(cart_id: UUID): Any = {
    val cart_actor = routeMessagesToCart(cart_id)
    implicit val timeout: Timeout = Timeout(15.seconds)
    Await.result(cart_actor ? GetCartUniqueSkuCountCmd(cart_id), timeout.duration)
  }

}

Now I am writing a test case for the service, where I create a service object and call the service method that sends the message to the actor:

// A null cart id makes the service create a new cart actor; the returned
// actor name is parsed back into that cart's UUID.
val service = new CartService()
val new_cart_id = UUID.fromString(service.addSkutoCart(null, "12", 12))

// Blocks until the cart actor replies or the ask times out.
val count = service.getCartUniqueSkuCount(new_cart_id)

But this time the Await times out and I receive this message:

[INFO] [12/28/2016 16:07:10.441] [System-akka.actor.default-dispatcher-7] [akka://System/user/fd38f1b1-6f03-4997-8625-0ce7e0ef2626] Message [actors.GetCartUniqueSkuCountCmd] from Actor[akka://System/temp/$a] to Actor[akka://System/user/fd38f1b1-6f03-4997-8625-0ce7e0ef2626#279696660] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

From some googling I understand that it has something to do with the sender being closed over, but I am not able to fix this. Any input is appreciated.

Here is the stacktrace :

java.util.concurrent.TimeoutException: Futures timed out after [15 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) at

scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:190) at services.CartService.getCartUniqueSkuCount(CartService.scala:103) at CartActorSpec$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(CartActorSpec.scala:48) at CartActorSpec$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(CartActorSpec.scala:44) at CartActorSpec$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(CartActorSpec.scala:44) at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:953) at org.scalatest.Suite$class.withFixture(Suite.scala:1122) at CartActorSpec.withFixture(CartActorSpec.scala:19) at org.scalatest.WordSpecLike$class.invokeWithFixture$1(WordSpecLike.scala:950) at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala:962) at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala:962) at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) at org.scalatest.WordSpecLike$class.runTest(WordSpecLike.scala:962) at CartActorSpec.runTest(CartActorSpec.scala:19) at org.scalatest.WordSpecLike$$anonfun$runTests$1.apply(WordSpecLike.scala:1021) at org.scalatest.WordSpecLike$$anonfun$runTests$1.apply(WordSpecLike.scala:1021) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401) at scala.collection.immutable.List.foreach(List.scala:381) at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:390) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:427) at 
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401) at scala.collection.immutable.List.foreach(List.scala:381) at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396) at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483) at org.scalatest.WordSpecLike$class.runTests(WordSpecLike.scala:1021) at CartActorSpec.runTests(CartActorSpec.scala:19) at org.scalatest.Suite$class.run(Suite.scala:1424) at CartActorSpec.org$scalatest$WordSpecLike$$super$run(CartActorSpec.scala:19) at org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1067) at org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1067) at org.scalatest.SuperEngine.runImpl(Engine.scala:545) at org.scalatest.WordSpecLike$class.run(WordSpecLike.scala:1067) at CartActorSpec.org$scalatest$BeforeAndAfterAll$$super$run(CartActorSpec.scala:19) at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257) at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256) at CartActorSpec.run(CartActorSpec.scala:19)

PS: I do have the ImplicitSender trait in my test class:

class CartActorSpec extends TestKit(ActorSystem("CartActorSpec"))
  with WordSpecLike
  with Matchers
  with BeforeAndAfterAll
  with ImplicitSender  {...}
0

There are 0 answers