Instrument a Source.queue

67 views Asked by At

I would like to have a Source.queue (or something analogous that lets me push items into a materialized graph) that is instrumented to report the queue's current saturation level.

I'd like to do that without (re-)implementing the functionality provided by the QueueSource graph stage.

One possible solution I came up with is the following:

/** Factory for a `Source.queue` whose materialized queue also exposes a gauge reporting
  * how full the buffer approximately is.
  */
object InstrumentedSource {

  /** A [[SourceQueueWithComplete]] that forwards every call to `delegate` and keeps an
    * approximate count of the elements that have been offered but have not yet passed the
    * dequeue hook installed by [[InstrumentedSource.queue]].
    *
    * The count is only eventually consistent with the real queue content: it is adjusted
    * from the caller's thread, from future callbacks and from the running stream.
    *
    * NOTE(review): with overflow strategies that evict already-buffered elements, the evicted
    * elements presumably never reach the dequeue hook, so the counter may drift upwards —
    * confirm against the behaviour of the underlying queue stage before relying on it.
    *
    * @param delegate         the real queue that elements are offered to
    * @param bufferSize       denominator of the saturation ratio
    * @param executionContext runs the callback that inspects the outcome of each offer
    */
  final class InstrumentedSourceQueueWithComplete[T](
      delegate: SourceQueueWithComplete[T],
      bufferSize: Int,
  )(implicit executionContext: ExecutionContext)
      extends SourceQueueWithComplete[T] {

    /** Number of elements currently counted as buffered (or about to be buffered). */
    private val buffered = new AtomicLong(0)

    override def complete(): Unit = delegate.complete()

    override def fail(ex: Throwable): Unit = delegate.fail(ex)

    override def watchCompletion(): Future[Done] = delegate.watchCompletion()

    /** Called by the instrumented stream each time an element travels downstream. */
    private[InstrumentedSource] def onDequeue(): Unit = {
      val _ = buffered.decrementAndGet()
    }

    /** Gauge whose ratio is `buffered / bufferSize`. */
    object BufferSaturationRatioGauge extends RatioGauge {
      override def getRatio: RatioGauge.Ratio = RatioGauge.Ratio.of(buffered.get(), bufferSize)
    }

    /** The saturation gauge, typed as a plain [[RatioGauge]] for registration. */
    lazy val bufferSaturationGauge: RatioGauge = BufferSaturationRatioGauge

    /** Offers `elem` to the underlying queue and keeps the buffered counter in sync.
      *
      * The element is counted ''before'' it is handed to the delegate and un-counted again if
      * the offer does not end in `Enqueued` (any other result, or a failed future). Counting
      * only after the future completes would let the stream's dequeue hook run first, which
      * would drive the counter (and the reported ratio) below zero.
      *
      * @param elem the element to enqueue
      * @return the delegate's future, unchanged
      */
    override def offer(elem: T): Future[QueueOfferResult] = {
      // Optimistic increment: guarantees the matching onDequeue() can never precede it.
      val _ = buffered.incrementAndGet()
      val result = delegate.offer(elem)
      result.onComplete {
        case scala.util.Success(QueueOfferResult.Enqueued) =>
          () // the element really entered the buffer; keep it counted
        case _ =>
          // Dropped, queue closed, stream failure or failed future: roll the count back.
          val _ = buffered.decrementAndGet()
      }
      result
    }
  }

  /** Creates a queue-backed source whose materialized value is an
    * [[InstrumentedSourceQueueWithComplete]].
    *
    * The underlying `Source.queue` is pre-materialized when this method is called, so the
    * returned source is bound to that single, already-running queue.
    *
    * @param bufferSize       capacity passed to `Source.queue`, also the gauge denominator
    * @param overflowStrategy strategy passed to `Source.queue`
    * @param executionContext runs the bookkeeping callbacks of the instrumented queue
    * @param materializer     used to pre-materialize the underlying queue
    * @return a source emitting the offered elements, materializing the instrumented queue
    */
  def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy)(
      implicit executionContext: ExecutionContext,
      materializer: Materializer,
  ): Source[T, InstrumentedSourceQueueWithComplete[T]] = {
    val (queue, source) = Source.queue[T](bufferSize, overflowStrategy).preMaterialize()
    val instrumentedQueue = new InstrumentedSourceQueueWithComplete[T](queue, bufferSize)
    source.mapMaterializedValue(_ => instrumentedQueue).map { item =>
      // Every element that flows past this point has left the instrumented buffer.
      instrumentedQueue.onDequeue()
      item
    }
  }

}

This mostly appears to work from some manual testing (apart from the fact that buffered is at best eventually consistent with the actual number of items in the queue, which should be fine in my case), but I was wondering whether there are solutions that make better use of built-in functionality that I might have missed.

0

There are 0 answers