I'm consuming messages from Kafka using Monix.
I want to process each message 10 seconds after it is read from the topic.
This 10-second delay shouldn't block reading more messages.
I've tried to test this behaviour with the following code, using Task.delayExecution to delay by 10 seconds.
I've also tried Observable.delayOnExecution
and Observable.delayOnNext.
    import monix.eval.Task
    import monix.reactive.Observable

    object TestApp extends App {
      import java.util.Date

      case class SomeClass(
        value: Int,
        consumingDate: Date,
        handlerExecutionDate: Date = null
      )

      import scala.concurrent.duration._
      import scala.concurrent.Await
      import monix.execution.Scheduler.Implicits.global

      val res = Observable
        .fromIterable((1 to 100).map(i => SomeClass(value = i, consumingDate = new Date)))
        .mapEval(i =>
          // Delay each element by 10 seconds before passing it downstream.
          Task.delay(
            SomeClass(
              value = i.value,
              consumingDate = i.consumingDate
            )
          ).delayExecution(10.seconds)
        )
        .foreachL { x =>
          // Record the time at which the handler actually ran.
          val r = SomeClass(
            x.value,
            x.consumingDate,
            new Date()
          )
          println(r)
        }
        .runToFuture

      Await.result(res, 100.seconds)
    }
But the above code accumulates the delay across messages: the 1st message is delayed by 10 seconds, the 2nd by 20 seconds, the 3rd by 30 seconds, and so on.
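To make the difference concrete, here is a rough model in plain Scala (no Monix) of the two timings. It assumes elements are handled strictly one at a time, which is how mapEval behaves. The names DelayModel, fixedDelay and delayFromConsume are made up for illustration, and all times are in milliseconds.

```scala
object DelayModel {
  // What the code above does: each element starts its own full delay only
  // after the previous element has been emitted, so the delays add up.
  def fixedDelay(consumedAtMs: Seq[Long], delayMs: Long): Seq[Long] =
    consumedAtMs
      .scanLeft(0L)((prevDone, consumed) => math.max(prevDone, consumed) + delayMs)
      .tail

  // What I actually want: each element is handled at consumeTime + delay,
  // or right after the previous element if that deadline has already passed.
  def delayFromConsume(consumedAtMs: Seq[Long], delayMs: Long): Seq[Long] =
    consumedAtMs
      .scanLeft(0L)((prevDone, consumed) => math.max(prevDone, consumed + delayMs))
      .tail

  def main(args: Array[String]): Unit = {
    val consumed = Seq(0L, 0L, 0L) // three messages read at (almost) the same time
    println(fixedDelay(consumed, 10000L).mkString(", "))       // 10000, 20000, 30000
    println(delayFromConsume(consumed, 10000L).mkString(", ")) // 10000, 10000, 10000
  }
}
```

The first function reproduces the 10 s, 20 s, 30 s pattern; the second is the behaviour I'm after.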
Is it possible to do something like this using Monix?
I'm also considering alternatives to a Monix-based solution, such as an in-memory queue. The consumer will keep pushing to the queue until its limit is reached and then.
Update:
I've found a solution using Task.eval(<current_time>).restartUntil(<condition>).
The code for that is below.
    package com.agoda.cusco.campaign.data.collector.consumer

    import monix.eval.Task
    import monix.reactive.Observable

    object TestApp extends App {
      import java.util.Date

      case class SomeClass(
        value: Int,
        consumingDate: Date,
        handlerExecutionDate: Date = null
      )

      import scala.concurrent.duration._
      import scala.concurrent.Await
      import monix.execution.Scheduler.Implicits.global

      val res = Observable
        .fromIterable((1 to 100).map(i => SomeClass(value = i, consumingDate = new Date(System.currentTimeMillis + i * 100))))
        .mapEval(message =>
          // Re-evaluate the current time until 10 seconds have passed since consumingDate.
          // Compares epoch millis via getTime: getSeconds only returns the seconds
          // field (0-59), so a difference of getSeconds values wraps every minute.
          Task.eval(new Date())
            .restartUntil(currentTime => currentTime.getTime - message.consumingDate.getTime > 10000)
            .map(_ => message)
        )
        .foreachL { x =>
          // Record the time at which the handler actually ran.
          val r = SomeClass(
            x.value,
            x.consumingDate,
            new Date()
          )
          println(r)
        }
        .runToFuture

      Await.result(res, 100.seconds)
    }
I'm not sure this is the ideal solution, as it seems to rely on actively spinning the CPU while it waits.
I'd like to know if there are any better alternatives.
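One alternative I can think of, as a rough sketch rather than a verified solution: instead of restarting the task in a loop, compute how much of the 10 seconds is still left for each message and hand that to Task.sleep, which schedules a wake-up on the Scheduler instead of polling. The asyncBoundary with a buffer of 1024 is an assumption on my part, meant to let upstream keep reading while the handler is sleeping. Message, Delays.remaining and DelayedHandling are made-up names, and the Observable.fromIterable source just stands in for the real Kafka consumer.

```scala
import java.time.Instant
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.{Observable, OverflowStrategy}
import scala.concurrent.Await
import scala.concurrent.duration._

final case class Message(value: Int, consumedAt: Instant)

object Delays {
  // Time still left until consumedAt + delay; zero if that deadline has passed.
  def remaining(consumedAt: Instant, now: Instant, delay: FiniteDuration): FiniteDuration = {
    val leftMs = consumedAt.toEpochMilli + delay.toMillis - now.toEpochMilli
    math.max(leftMs, 0L).millis
  }
}

object DelayedHandling extends App {
  val handlingDelay = 10.seconds

  val done = Observable
    .fromIterable(1 to 100)
    // Stamp each message at the moment it is "consumed".
    .map(i => Message(i, Instant.now()))
    // Assumed buffer size: lets up to about 1024 messages queue up while the
    // downstream stage is sleeping; beyond that, upstream is back-pressured.
    .asyncBoundary(OverflowStrategy.BackPressure(1024))
    .mapEval { msg =>
      // Sleep only for what is left of this message's own 10 second window.
      Task.eval(Instant.now())
        .flatMap(now => Task.sleep(Delays.remaining(msg.consumedAt, now, handlingDelay)))
        .map(_ => msg)
    }
    .foreachL(msg => println(s"${msg.value} consumed=${msg.consumedAt} handled=${Instant.now()}"))
    .runToFuture

  Await.result(done, 100.seconds)
}
```

Because messages arrive in order, only the first message in a burst should sleep for close to the full 10 seconds; the ones right behind it have little or nothing left to wait. I haven't checked how this behaves with a real Kafka source, where a bounded buffer also means reading pauses once the buffer is full.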