Executing blocking calls with Mutiny and PanacheRepository

86 views Asked by At

I'm currently learning Quarkus/Microservices, starting by making a simple REST-interface with a Postgres database. My database has two tables (Event, Tag) with an m:n relationship. To create a new Event I need to fetch all the tags first before persisting it, but I can't find a way to execute these two calls consecutively.

Since I can't return an Uni<Uni> or make the REST-endpoint blocking, I tried to solve this using reactive messaging with SmallRye.

So the POST-method looks like this:

@Channel("set-tags-request")
lateinit var emitter: Emitter<Event>

@POST
@Path("/create1")
@Produces(MediaType.APPLICATION_JSON)
fun createEvent(newEvent : Event): Uni<Response> {
    val eventTags = newEvent.tags

    newEvent.tags = mutableListOf()
    return eventRepository.persistAndFlush(newEvent)
        .invoke{ it ->
            it.tags = eventTags
            emitter.send(Message.of(it, Metadata.of(TracingMetadata.withPrevious(Context.current()))))
        }
        .map { persistedEvent ->
        LOG.warn("Sending first Message")
        Response.created(URI("/events/${persistedEvent.id}")).build()
    }
}

A new Event without Tags is created. Then a SmallRye message is sent to execute the function for updating the entities Tags.

The message is received by this @Blocking function:

@Channel("set-tags-response")
private lateinit var emitter: Emitter<Event>

@Incoming("set-tags-request")
@Blocking
fun setEventTags(newEvent: Event) {
    LOG.warn("Received first message")
    val eventTags: MutableList<Tag> = mutableListOf()

    tagRepository.getAll()
        .invoke{ allTags ->
            LOG.warn("Got all tags")
            for (newEventTag in newEvent.tags) {
                val tagEntity = allTags.firstOrNull { it.name == newEventTag.name }
                if (tagEntity != null) {
                    eventTags.add(tagEntity)
                } else {
                    eventTags.add(newEventTag)
                }
            }
            newEvent.tags = eventTags

            LOG.warn("Sending second message")

            // Call another @Blocking function to persist the updated Event
            emitter.send(Message.of(newEvent, Metadata.of(TracingMetadata.withPrevious(Context.current()))))
            }.emitOn(Infrastructure.getDefaultExecutor()).runSubscriptionOn(Infrastructure.getDefaultExecutor()).subscribe()
    }

Until here everything works. The first message with topic set-tags-request arrives. But the code after that doesn't get executed, so neither LOG.warn("Sending second message") nor LOG.warn("Got all tags") are triggered.

I also tried waiting for tagRepository.getAll() using .await().indefinitely, but then I receive an error message: java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [281]: 'vert.x-eventloop-thread-2' current Thread [286]: 'vert.x-worker-thread-1'

I'd be very thankful if someone could help me, as I've been banging my head against the wall for the last few days.

0

There are 0 answers