How to add fixed delay for each Task in monix without building up delay


I'm consuming messages from Kafka using Monix.

I want to process each message 10 seconds after it is read from the topic.

This 10-second delay shouldn't block reading more messages.

I tested this behaviour with the code below, which uses Task.delayExecution to delay each message by 10 seconds.

I've also tried Observable.delayOnExecution and Observable.delayOnNext.

import monix.eval.Task
import monix.reactive.Observable

object TestApp extends App {
  import java.util.Date
  case class SomeClass(
                        value: Int, consumingDate: Date,
                        handlerExecutionDate: Date = null
                      )
  import scala.concurrent.duration._
  import scala.concurrent.Await
  import monix.execution.Scheduler.Implicits.global

  val res = Observable.fromIterable((1 to 100).map(i => SomeClass(value = i, consumingDate = new Date)))
    .mapEval(i =>
      Task.delay(
        SomeClass(
          value = i.value,
          consumingDate = i.consumingDate
        )
      ).delayExecution(10.seconds) // dot syntax avoids the postfix-operator warning/error
    )
    .foreachL{ x =>
      val r = SomeClass(
        x.value,
        x.consumingDate,
        new Date()
      )
      println(r)
    }.runToFuture
  Await.result(res, 100.seconds)
}

But the code above accumulates the delay across messages: the 1st message is delayed by 10 sec, the 2nd by 20 sec, the 3rd by 30 sec, and so on.
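
The accumulation follows from mapEval evaluating one task at a time: each message's delayExecution timer only starts after the previous message's task has finished, so the waits add up. A minimal sketch of one way to let the waits overlap, assuming Monix 3.x and using mapParallelOrdered in place of mapEval. The names Message, delayed, waits and maxInFlight are hypothetical, introduced only for this illustration:

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._

object OverlappingDelaySketch {
  // Hypothetical message type, standing in for SomeClass.
  final case class Message(value: Int, consumedAtMillis: Long)

  // Delays one message on the scheduler's timer; no thread blocks while it waits.
  def delayed(message: Message, delay: FiniteDuration): Task[Message] =
    Task.now(message).delayExecution(delay)

  // For each message, how long (ms) passed between reading it and handling it.
  // Assumption: maxInFlight bounds how many messages may be waiting at once.
  def waits(count: Int, delay: FiniteDuration, maxInFlight: Int): Task[List[Long]] =
    Observable
      .fromIterable(1 to count)
      .map(i => Message(i, System.currentTimeMillis()))
      .mapParallelOrdered(maxInFlight)(m => delayed(m, delay))
      .map(m => System.currentTimeMillis() - m.consumedAtMillis)
      .toListL

  def main(args: Array[String]): Unit = {
    val observed = Await.result(waits(100, 10.seconds, maxInFlight = 100).runToFuture, 30.seconds)
    observed.zipWithIndex.foreach { case (ms, i) =>
      println(s"message ${i + 1} handled after $ms ms")
    }
  }
}
```

The trade-off is that once maxInFlight messages are waiting, the source is back-pressured until one of them completes, so the bound has to cover the number of messages expected within any 10-second window.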

Is it possible to do this using Monix?

I'm also considering alternatives to a Monix-based solution, such as an in-memory queue. The consumer would keep pushing to the queue until its limit is reached and then.

Update:

I've found a solution using Task.eval(<current_time>).restartUntil(<condition>)

Adding the code for that below.

package com.agoda.cusco.campaign.data.collector.consumer

import monix.eval.Task
import monix.reactive.Observable

object TestApp extends App {
  import java.util.Date
  case class SomeClass(
                        value: Int, consumingDate: Date,
                        handlerExecutionDate: Date = null
                      )
  import scala.concurrent.duration._
  import scala.concurrent.Await
  import monix.execution.Scheduler.Implicits.global

  val res = Observable.fromIterable((1 to 100).map(i => SomeClass(value = i, consumingDate = new Date(System.currentTimeMillis + i*100))))
    .mapEval(message =>
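      // Compare epoch milliseconds: Date.getSeconds is deprecated and wraps around every minute
      Task.eval(new Date()).restartUntil(currentTime => (currentTime.getTime - message.consumingDate.getTime) > 10.seconds.toMillis).map(_ => message)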
    )
    .foreachL{ x =>
      val r = SomeClass(
        x.value,
        x.consumingDate,
        new Date()
      )
      println(r)
    }.runToFuture
  Await.result(res, 100.seconds)
}

I'm not sure this is the ideal solution, since it seems to busy-wait, using CPU to re-evaluate the current time over and over until the condition holds.

Are there any better alternatives?
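
One possible alternative to polling with restartUntil is to compute how much of the 10 seconds is still outstanding for a message and sleep once for exactly that long with Task.sleep, which waits on the scheduler's timer rather than looping. The sketch below assumes Monix 3.x and that messages arrive in consumption order, so a sequential mapEval is enough. The bounded asyncBoundary buffer of 1024 is an assumed size that lets reading run ahead of the waiting stage, and Message, remaining and awaitDue are hypothetical names:

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.{Observable, OverflowStrategy}
import scala.concurrent.Await
import scala.concurrent.duration._

object RemainingDelaySketch {
  // Hypothetical message type carrying the time it was read.
  final case class Message(value: Int, consumedAtMillis: Long)

  // Pure helper: the part of the delay that is still outstanding, never negative.
  def remaining(consumedAtMillis: Long, nowMillis: Long, delay: FiniteDuration): FiniteDuration =
    math.max(0L, consumedAtMillis + delay.toMillis - nowMillis).millis

  // Reads the clock once, then sleeps once for the outstanding time.
  def awaitDue(message: Message, delay: FiniteDuration): Task[Message] =
    Task.eval(System.currentTimeMillis()).flatMap { now =>
      Task.sleep(remaining(message.consumedAtMillis, now, delay)).map(_ => message)
    }

  def main(args: Array[String]): Unit = {
    val delay = 10.seconds
    val done = Observable
      .fromIterable(1 to 100)
      .map(i => Message(i, System.currentTimeMillis())) // timestamp at read time
      // Assumption: up to 1024 messages may be buffered while earlier ones wait.
      .asyncBoundary(OverflowStrategy.BackPressure(1024))
      .mapEval(m => awaitDue(m, delay))
      .foreachL { m =>
        val waited = System.currentTimeMillis() - m.consumedAtMillis
        println(s"message ${m.value} handled after $waited ms")
      }
      .runToFuture

    Await.result(done, 30.seconds)
  }
}
```

Because the timestamp is taken before the buffer, the first message sleeps for roughly the full delay, and later messages only sleep for whatever is left of their own 10 seconds. When the buffer fills up, the source is back-pressured, which is close to the bounded in-memory queue idea.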
