I'm writing a reactive Quarkus application using Kotlin. This is my REST resource, which receives a request and attempts to create an Operation from it.
@ApplicationScoped
@Path("/v1/operation")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
class OperationResource(
    val service: OperationService
) {
    @POST
    @Transactional
    suspend fun create(request: CreateOperationRequest, @Context uri: UriInfo): RestResponse<Void> {
        val id = service.create(request)
        return RestResponse.created(uri.absolutePathBuilder.path(id.toString()).build())
    }
}
This is the service which handles the business logic of the resource, persists the entity and emits a Kafka message.
@ApplicationScoped
class OperationService(
    @Channel("operation-out")
    val emitter: Emitter<Operation>,
    val repository: OperationRepository
) {
    suspend fun create(request: CreateOperationRequest): UUID {
        val operation = Operation(UUID.randomUUID(), request)
        repository.persist(operation)
        emitter.send(operation)
        return operation.id
    }
}
The exception I'm getting when incorporating the @Transactional annotation is:
io.quarkus.runtime.BlockingOperationNotAllowedException: Cannot start a JTA transaction from the IO thread
I've experimented with combinations of the @Transactional and @Blocking annotations, applying them at both the resource layer and the service layer, but I have not been able to create a transactional flow where the message is sent if and only if the entity is successfully persisted, and vice versa.
@Transactional is only meant for the database operation, and it’s not an XA transaction, so it does not cover the Kafka send. To solve the problem you have a couple of options:
- Send the message with isPersisted=false in the message header (either to the same or a new topic), and use a second consumer that consumes the message and saves or updates the entity from that topic.
- Use a @Scheduled job that periodically scans that table and resends those entities, or make an endpoint where you can trigger resending manually.
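The scan-and-resend option can be sketched in plain Kotlin roughly as follows. This is only an illustration under assumptions, not Quarkus code: OperationRow, its sent flag, InMemoryOperationTable, MessageSender and OperationOutbox are all hypothetical names, and the in-memory map stands in for the real table. In a Quarkus application, create would presumably be the transactional database write, and resendPending would be called from a scheduled method (for example one annotated with @Scheduled) or from a manual resend endpoint.

```kotlin
import java.util.UUID

// Hypothetical row: `sent` records whether the message for this entity went out.
data class OperationRow(val id: UUID, val payload: String, var sent: Boolean = false)

// In-memory stand-in for the database table (assumption, not a real repository).
class InMemoryOperationTable {
    private val rows = LinkedHashMap<UUID, OperationRow>()

    fun persist(row: OperationRow) {
        rows[row.id] = row
    }

    fun findUnsent(): List<OperationRow> = rows.values.filter { !it.sent }
}

// Stand-in for the Kafka emitter; may throw if the broker is unavailable.
fun interface MessageSender {
    fun send(row: OperationRow)
}

class OperationOutbox(
    private val table: InMemoryOperationTable,
    private val sender: MessageSender
) {
    // Step 1: only persist the entity. No message is sent here, so the
    // database transaction never has to coordinate with the broker.
    fun create(payload: String): UUID {
        val row = OperationRow(UUID.randomUUID(), payload)
        table.persist(row)
        return row.id
    }

    // Step 2: scan for rows not yet marked as sent and try to send each one.
    // A row is marked only after a successful send; failures are retried on
    // the next scan. Returns how many rows were sent in this pass.
    fun resendPending(): Int {
        var sentCount = 0
        for (row in table.findUnsent()) {
            try {
                sender.send(row)
                row.sent = true
                sentCount++
            } catch (e: Exception) {
                // Leave the row unsent so the next scan picks it up again.
            }
        }
        return sentCount
    }
}
```

With this shape, a message is only ever sent for an entity that was persisted, and a failed send is retried later. The trade-off is at-least-once delivery: if the send succeeds but marking the row fails, the message is sent again, so consumers should tolerate duplicates.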