I'm currently learning Quarkus/Microservices, starting by making a simple REST-interface with a Postgres database. My database has two tables (Event, Tag) with an m:n relationship. To create a new Event I need to fetch all the tags first before persisting it, but I can't find a way to execute these two calls consecutively.
Since I can't return an Uni<Uni> or make the REST-endpoint blocking, I tried to solve this using reactive messaging with SmallRye.
So the POST-method looks like this:
@Channel("set-tags-request")
lateinit var emitter: Emitter<Event>
@POST
@Path("/create1")
@Produces(MediaType.APPLICATION_JSON)
fun createEvent(newEvent : Event): Uni<Response> {
val eventTags = newEvent.tags
newEvent.tags = mutableListOf()
return eventRepository.persistAndFlush(newEvent)
.invoke{ it ->
it.tags = eventTags
emitter.send(Message.of(it, Metadata.of(TracingMetadata.withPrevious(Context.current()))))
}
.map { persistedEvent ->
LOG.warn("Sending first Message")
Response.created(URI("/events/${persistedEvent.id}")).build()
}
}
A new Event without Tags is created. Then a SmallRye message is sent to execute the function for updating the entities Tags.
The message is received by this @Blocking
function:
@Channel("set-tags-response")
private lateinit var emitter: Emitter<Event>
@Incoming("set-tags-request")
@Blocking
fun setEventTags(newEvent: Event) {
LOG.warn("Received first message")
val eventTags: MutableList<Tag> = mutableListOf()
tagRepository.getAll()
.invoke{ allTags ->
LOG.warn("Got all tags")
for (newEventTag in newEvent.tags) {
val tagEntity = allTags.firstOrNull { it.name == newEventTag.name }
if (tagEntity != null) {
eventTags.add(tagEntity)
} else {
eventTags.add(newEventTag)
}
}
newEvent.tags = eventTags
LOG.warn("Sending second message")
// Call another @Blocking function to persist the updated Event
emitter.send(Message.of(newEvent, Metadata.of(TracingMetadata.withPrevious(Context.current()))))
}.emitOn(Infrastructure.getDefaultExecutor()).runSubscriptionOn(Infrastructure.getDefaultExecutor()).subscribe()
}
Until here everything works. The first message with topic set-tags-request
arrives. But the code after that doesn't get executed, so neither LOG.warn("Sending second message")
nor LOG.warn("Got all tags")
are triggered.
I also tried waiting for tagRepository.getAll()
using .await().indefinitely
, but then I receive an error message:
java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [281]: 'vert.x-eventloop-thread-2' current Thread [286]: 'vert.x-worker-thread-1'
I'd be very thankful if someone could help me, as I've been banging my head against the wall for the last few days.