While exploring distributed publish-subscribe in the new typed API (akka.actor.typed), we saw that it is implemented by representing each pub-sub topic with an actor, akka.actor.typed.pubsub.Topic. To try this out, I created a local Akka cluster with 3 seed nodes: one acts as the publisher and the other two as consumers. While consuming, both subscribers receive the same data, i.e. no deduplication happens. Here is the sample code for the publisher:

object PubApp {

  def pubsubImplementor() = {

    val pubsubGurdian = Behaviors.setup[Unit] { (context) =>
      val publisher = context.spawn(PublisherEnd(), "publisher")

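      // Crude wait for the subscribers to come up. Thread.sleep blocks the thread
      // running this setup block, so this is only suitable for a quick experiment.
      Thread.sleep(20000)
      (1 to 20).foreach { i =>
        Thread.sleep(1000)
        publisher ! Data(i)
      }
      // The guardian handles no messages itself. Behaviors.same is meant to be
      // returned from message handling, so Behaviors.empty is used here instead.
      Behaviors.empty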
    }

    implicit val config = ConfigFactory.parseString(
      s"""
         |akka.remote.artery.canonical.port = 2551
         |""".stripMargin
    ).withFallback(ConfigFactory.load("distributedpubsubtopic.conf"))
    ActorSystem(pubsubGurdian, "SamikCluster", config)
  }


  def main(args: Array[String]): Unit = {
    pubsubImplementor()
  }

}
object PublisherEnd {

  case class Data(i : Int) extends Command
  
  def apply():Behavior[Command] = Behaviors.setup{(context) =>
    
    val topic = context.spawn(Topic[Command]("pub-sub"), "publisherActor")

    Behaviors.receive{(context,message) =>
      message match {
       
        case m@Data(i) =>
          context.log.info(s"message: ${m.i} sent to Sink")
          topic ! Topic.Publish(m)
          Behaviors.same
       
      }

    }
  }
}

Please find the sample code for subscribers/consumers:

object SubApp1 {

  def subImplementer() = {
    val subGurdian = Behaviors.setup[Unit] { (context) =>

      val subscriber1 = context.spawn(SubsriberEnd(), "subscriber1")
      // The guardian handles no messages itself, so it starts as an empty behavior.
      Behaviors.empty
    }

    implicit val config = ConfigFactory.parseString(
      s"""
         |akka.remote.artery.canonical.port = 2552
         |""".stripMargin
    ).withFallback(ConfigFactory.load("distributedpubsubtopic.conf"))
    ActorSystem(subGurdian, "SamikCluster", config)
  }

  def main(args: Array[String]): Unit = {
    subImplementer()
  }
}

object SubApp2 {

  def subImplementer() = {
    val subGurdian = Behaviors.setup[Unit] { (context) =>

      val subscriber2 = context.spawn(SubsriberEnd(), "subscriber2")
      // The guardian handles no messages itself, so it starts as an empty behavior.
      Behaviors.empty
    }

    implicit val config = ConfigFactory.parseString(
      s"""
         |akka.remote.artery.canonical.port = 2553
         |""".stripMargin
    ).withFallback(ConfigFactory.load("distributedpubsubtopic.conf"))
    ActorSystem(subGurdian, "SamikCluster", config)
  }

  def main(args: Array[String]): Unit = {
    subImplementer()
  }
}
object SubsriberEnd {


  def apply(): Behavior[Command] = Behaviors.setup[Command] { (context) =>

    val topic = context.spawn(Topic[Command]("pub-sub"), "subscriberActor")
    topic ! Topic.Subscribe(context.self)


    Behaviors.receive { (context, message) =>
      message match {
        case m@Data(i) =>
          context.log.info(s"[${context.self.path}] received message: ${m.i} in Sink")
          Behaviors.same
      }

    }
  }

}

The output that I am getting from publisher:

09:40:07.566 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 1 sent to Sink
09:40:08.567 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 2 sent to Sink
09:40:09.570 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 3 sent to Sink
09:40:10.570 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 4 sent to Sink
09:40:11.572 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 5 sent to Sink
09:40:12.576 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 6 sent to Sink
09:40:13.581 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 7 sent to Sink
09:40:14.581 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 8 sent to Sink
09:40:15.582 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 9 sent to Sink
09:40:16.582 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 10 sent to Sink
09:40:17.583 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 11 sent to Sink
09:40:18.585 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 12 sent to Sink
09:40:19.586 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 13 sent to Sink
09:40:20.590 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 14 sent to Sink
09:40:21.591 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 15 sent to Sink
09:40:22.591 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 16 sent to Sink
09:40:23.596 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 17 sent to Sink
09:40:24.596 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 18 sent to Sink
09:40:25.599 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 19 sent to Sink
09:40:26.602 [SamikCluster-akka.actor.default-dispatcher-17] INFO com.target.firefly.distributedpubsubtopic.PublisherEnd$ - message: 20 sent to Sink

Below is the consumers' output. Consumer 1 (SubApp1):

09:40:07.734 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 1 in Sink
09:40:08.570 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 2 in Sink
09:40:09.572 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 3 in Sink
09:40:10.572 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 4 in Sink
09:40:11.574 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 5 in Sink
09:40:12.578 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 6 in Sink
09:40:13.582 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 7 in Sink
09:40:14.584 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 8 in Sink
09:40:15.585 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 9 in Sink
09:40:16.583 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 10 in Sink
09:40:17.584 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 11 in Sink
09:40:18.587 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 12 in Sink
09:40:19.588 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 13 in Sink
09:40:20.592 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 14 in Sink
09:40:21.593 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 15 in Sink
09:40:22.593 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 16 in Sink
09:40:23.598 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 17 in Sink
09:40:24.598 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 18 in Sink
09:40:25.600 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 19 in Sink
09:40:26.604 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber1] received message: 20 in Sink

Consumer 2 (SubApp2):

09:40:07.734 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 1 in Sink
09:40:08.570 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 2 in Sink
09:40:09.572 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 3 in Sink
09:40:10.572 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 4 in Sink
09:40:11.574 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 5 in Sink
09:40:12.578 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 6 in Sink
09:40:13.582 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 7 in Sink
09:40:14.584 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 8 in Sink
09:40:15.585 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 9 in Sink
09:40:16.585 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 10 in Sink
09:40:17.584 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 11 in Sink
09:40:18.587 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 12 in Sink
09:40:19.588 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 13 in Sink
09:40:20.592 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 14 in Sink
09:40:21.593 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 15 in Sink
09:40:22.594 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 16 in Sink
09:40:23.598 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 17 in Sink
09:40:24.599 [SamikCluster-akka.actor.default-dispatcher-5] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 18 in Sink
09:40:25.600 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 19 in Sink
09:40:26.604 [SamikCluster-akka.actor.default-dispatcher-3] INFO com.target.firefly.distributedpubsubtopic.SubsriberEnd$ - [akka://SamikCluster/user/subscriber2] received message: 20 in Sink

The same message is delivered to both subscribers (acting like a broadcast), which we do not want. How can we achieve deduplication, so that each message is delivered to only one of the subscribers?

Answer by Levi Ramsey:

Distributed Pub Sub is designed for the one-to-many use case: it will attempt to deliver the messages published to a topic to every subscriber to that topic at the time of publishing (approximately). It is not designed to deduplicate.
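
To make the one-to-many behaviour concrete, here is a toy in-memory model in plain Scala. It is not Akka's implementation; `ToyTopic` and `ToyTopicDemo` are hypothetical names used only for illustration. It shows why both subscribers in the question log all 20 messages: every publish produces one delivery per current subscriber.

```scala
// Toy model of one-to-many delivery (an illustration, not Akka's implementation).
final case class ToyTopic[A](subscribers: Vector[String]) {
  // Publishing yields one delivery per current subscriber.
  def publish(msg: A): Vector[(String, A)] =
    subscribers.map(subscriber => subscriber -> msg)
}

object ToyTopicDemo {
  def main(args: Array[String]): Unit = {
    val topic = ToyTopic[Int](Vector("subscriber1", "subscriber2"))
    // Publish three messages; each one fans out to both subscribers.
    val deliveries = (1 to 3).flatMap(i => topic.publish(i))
    deliveries.foreach { case (who, msg) => println(s"$who received $msg") }
  }
}
```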

The closest you could get with Distributed Pub Sub would be for a logical topic to have multiple subtopics in addition to the main topic. The publisher publishes to the main topic, and an actor (probably a cluster singleton) subscribes to that topic and republishes every message to exactly one of the subtopics (alternatively, the producer can publish directly to one of the subtopics).
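
The subtopic idea could be sketched roughly as follows, assuming Akka 2.6+ with akka-actor-typed on the classpath. `SubtopicRouter`, `subtopicName` and the round-robin choice of subtopic are hypothetical and chosen for illustration; `Command` and `Data` mirror the protocol from the question. The cluster singleton wiring is left out.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.pubsub.Topic
import akka.actor.typed.scaladsl.Behaviors

// Message protocol mirroring the question (assumed definition of Command).
sealed trait Command
final case class Data(i: Int) extends Command

// Hypothetical router: subscribes to the main topic and republishes
// each message to exactly one subtopic, chosen round-robin.
object SubtopicRouter {

  // Pure helper: name of the subtopic that receives the n-th message.
  def subtopicName(mainTopic: String, n: Long, subtopics: Int): String =
    s"$mainTopic-${n % subtopics}"

  def apply(mainTopic: String, subtopics: Int): Behavior[Command] =
    Behaviors.setup { context =>
      // Local actor representing the main topic; the router subscribes to it.
      val main = context.spawn(Topic[Command](mainTopic), "main-topic")
      main ! Topic.Subscribe(context.self)

      // One local Topic actor per subtopic.
      val subs: Vector[ActorRef[Topic.Command[Command]]] =
        Vector.tabulate(subtopics) { k =>
          context.spawn(
            Topic[Command](subtopicName(mainTopic, k, subtopics)),
            s"subtopic-$k")
        }

      // n counts the messages republished so far.
      def routing(n: Long): Behavior[Command] =
        Behaviors.receiveMessage { msg =>
          subs((n % subtopics).toInt) ! Topic.Publish(msg)
          routing(n + 1)
        }

      routing(0L)
    }
}
```

Each consumer would then subscribe to a single subtopic, for example by spawning `Topic[Command](SubtopicRouter.subtopicName("pub-sub", 0, 2))` and sending it `Topic.Subscribe(context.self)`. A subtopic with more than one subscriber still delivers to all of them, so this only narrows delivery when subtopics and consumers are paired one to one.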

Note that Distributed Pub Sub does not make any useful guarantee about how often a message sent through it will be processed: it's at-most-once per subscriber, but every message may be delivered and processed by arbitrarily many subscribers, so it's an at-least-zero-times guarantee.