How to transactionally write a Kafka record and persist it using JDBC in Quarkus/Kotlin

74 views Asked by At

I'm writing a reactive Quarkus application using Kotlin. This is the my REST resource which receives a requests and attempts to create an Operation from it.

@ApplicationScoped
@Path("/v1/operation")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
class OperationResource(
    val service: OperationService
) {

    @POST
    @Transactional
    suspend fun create(request: CreateOperationRequest, @Context uri: UriInfo): RestResponse<Void> {
        val id = service.create(request)
        return RestResponse.created(uri.absolutePathBuilder.path(id.toString()).build())
    }
}

This is the service which handles the business logic of the resource, persists the entity and emits a Kafka message.

@ApplicationScoped
class OperationService(
    @Channel("operation-out")
    val emitter: Emitter<Operation>,
    val repository: OperationRepository
) {

    suspend fun create(request: CreateOperationRequest): UUID {
        val operation = Operation(UUID.randomUUID(), request)
        repository.persist(operation)
        emitter.send(operation)
        return operation.id
    }
}

The exception I'm getting when incorporating the @Transactional annotation is:

io.quarkus.runtime.BlockingOperationNotAllowedException: Cannot start a JTA transaction from the IO thread.

I've experimented with the combination of @Transactional and the @Blocking annotations, incorporating them both at the resource layer and in the service layer without the ability to create a transactional flow where the message would be sent if and only if the entity is successfully persisted and vice versa.

1

There are 1 answers

2
Serkan On

@Transactional is only meant for the database operation and it’s not a XA transaction.

To solve the problem you have a couple of options:

  • save the entity first and use Debezium/Kafka-Connect to stream the entity to a Kafka topic (aka outbox pattern),
  • send the message first, maybe with isPersisted=false in the message header (either to the same or a new topic) and use a 2nd consumer which will consume & saveOrUpdate the entity from that topic,
  • not a real solution but rather a workaround: Use @Fallback on your current method which only fallbacks in the case sending the message fails. On the fallback method save the entity into a new (retry) table and either add a @Scheduler that periodically scans that table for resending those entities. Or make an endpoint where you can trigger resending manually.