I'm trying to implement an optimistic transaction using Quarkus and quarkus-redis-client. The transaction fails intermittently, and it is unclear why.
When it fails, the withTransaction function throws the following errors:
java.util.concurrent.CompletionException: ERR EXEC without MULTI
at io.smallrye.mutiny.operators.uni.UniBlockingAwait.await(UniBlockingAwait.java:79)
at io.smallrye.mutiny.groups.UniAwait.atMost(UniAwait.java:65)
at com.deliveries.sanity.SanityCheckLambda.handleRequest(SanityCheckLambda.java:59)
at com.deliveries.sanity.SanityCheckLambda.handleRequest(SanityCheckLambda.java:24)
at io.quarkus.amazon.lambda.runtime.AmazonLambdaRecorder$1.processRequest(AmazonLambdaRecorder.java:167)
at io.quarkus.amazon.lambda.runtime.AbstractLambdaPollLoop$1.run(AbstractLambdaPollLoop.java:137)
at [email protected]/java.lang.Thread.run(Thread.java:833)
at org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.PlatformThreads.threadStartRoutine(PlatformThreads.java:775)
at org.graalvm.nativeimage.builder/com.oracle.svm.core.posix.thread.PosixPlatformThreads.pthreadStartRoutine(PosixPlatformThreads.java:203)
Caused by: ERR EXEC without MULTI
java.lang.UnsupportedOperationException: This type doesn't hold a Bulk type
at io.vertx.redis.client.Response.toBuffer(Response.java:211)
at io.vertx.mutiny.redis.client.Response.toBuffer(Response.java:180)
at io.quarkus.redis.runtime.datasource.ReactiveJsonCommandsImpl.lambda$jsonGet$3(ReactiveJsonCommandsImpl.java:95)
at io.smallrye.context.impl.wrappers.SlowContextualFunction.apply(SlowContextualFunction.java:21)
at io.smallrye.mutiny.operators.uni.UniOnItemTransform$UniOnItemTransformProcessor.onItem(UniOnItemTransform.java:36)
at io.smallrye.mutiny.vertx.AsyncResultUni.lambda$subscribe$1(AsyncResultUni.java:35)
at io.smallrye.mutiny.vertx.DelegatingHandler.handle(DelegatingHandler.java:25)
at io.quarkus.redis.runtime.client.ObservableRedis.lambda$send$1(ObservableRedis.java:54)
at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
at io.vertx.core.impl.future.Composition$1.onSuccess(Composition.java:62)
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
at io.vertx.core.impl.future.Eventually$1.onSuccess(Eventually.java:44)
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
at io.vertx.core.impl.future.FutureImpl.addListener(FutureImpl.java:196)
at io.vertx.core.impl.future.Eventually.onSuccess(Eventually.java:41)
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
at io.vertx.core.impl.future.PromiseImpl.onSuccess(PromiseImpl.java:49)
at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:41)
at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:23)
at io.vertx.redis.client.impl.RedisClusterConnection.lambda$send$7(RedisClusterConnection.java:330)
at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
at io.vertx.redis.client.impl.RedisStandaloneConnection.handle(RedisStandaloneConnection.java:409)
at io.vertx.redis.client.impl.RESPParser.handleResponse(RESPParser.java:316)
at io.vertx.redis.client.impl.RESPParser.handleSimpleString(RESPParser.java:231)
at io.vertx.redis.client.impl.RESPParser.handle(RESPParser.java:82)
at io.vertx.redis.client.impl.RESPParser.handle(RESPParser.java:24)
at io.vertx.core.net.impl.NetSocketImpl.lambda$new$1(NetSocketImpl.java:100)
at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:239)
at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:129)
at io.vertx.core.net.impl.NetSocketImpl$DataMessageHandler.handle(NetSocketImpl.java:414)
at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:55)
at io.vertx.core.impl.ContextBase.emit(ContextBase.java:239)
at io.vertx.core.net.impl.NetSocketImpl.handleMessage(NetSocketImpl.java:390)
at io.vertx.core.net.impl.ConnectionBase.read(ConnectionBase.java:157)
at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:153)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:689)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:652)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at [email protected]/java.lang.Thread.run(Thread.java:833)
at org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.PlatformThreads.threadStartRoutine(PlatformThreads.java:775)
at org.graalvm.nativeimage.builder/com.oracle.svm.core.posix.thread.PosixPlatformThreads.pthreadStartRoutine(PosixPlatformThreads.java:203)
This is the function I use for the optimistic locking. Sometimes it fails and sometimes it succeeds.
    public Uni<OptimisticLockingTransactionResult<Boolean>> setWithTransactions(String id, FullDelivery fullDelivery, long ttl) throws ExecutionException, InterruptedException {
        // Pre-transaction block: read the current value and decide whether the new one may overwrite it.
        Function<ReactiveRedisDataSource, Uni<Boolean>> conditionFunction = (ds ->
                ds.json().jsonGetObject(this.fullDeliveryKeyName(id))
                        .onItemOrFailure().transform((result, error) -> {
                            // Any read failure is treated as "do not write".
                            if (Objects.nonNull(error)) {
                                return false;
                            }
                            // No cached value yet, so the write is allowed.
                            if (result == null) {
                                return true;
                            }
                            // Write only if the new entry time is at least as recent as the cached one.
                            return fullDelivery.getTimes().getEntryTime() >=
                                    result.mapTo(FullDelivery.class)
                                            .getTimes()
                                            .getEntryTime();
                        })
        );
        // Transaction block: queue the JSON.SET and EXPIRE commands, or discard the transaction.
        BiFunction<Boolean, ReactiveTransactionalRedisDataSource, Uni<Void>> setFunction = ((greaterEntryTime, tx) -> {
            ReactiveTransactionalJsonCommands<String> json = tx.json(String.class);
            ReactiveTransactionalKeyCommands<String> key = tx.key(String.class);
            if (Boolean.TRUE.equals(greaterEntryTime)) {
                return Uni.join().all(
                                json.jsonSet(this.fullDeliveryKeyName(id), "$", fullDelivery),
                                key.expire(this.fullDeliveryKeyName(id), ttl)
                        )
                        .andCollectFailures()
                        .replaceWithVoid();
            } else {
                logger.warn("a more recent update was found in cache for full delivery #" + id);
                return tx.discard();
            }
        });
        // The last argument is the key to watch while the two blocks above run.
        return this.redisDataSource
                .withTransaction(conditionFunction, setFunction, this.fullDeliveryKeyName(id));
    }
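
To make the intended behavior explicit, here is a minimal sketch of the check-then-write logic without Redis. It assumes an in-memory map as a stand-in for the watched key. VersionedStore, OptimisticSetSketch, and Outcome are hypothetical names used only for illustration and are not part of quarkus-redis-client. The sketch only models the semantics I expect (write when absent or not older, skip when stale, abort when the key changed in between). It does not reproduce the error above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the Redis key space (assumption, not a real client API).
final class VersionedStore {
    // One stored value. Every write installs a fresh instance, so identity detects changes.
    static final class Entry {
        final long entryTime;

        Entry(long entryTime) {
            this.entryTime = entryTime;
        }
    }

    private final Map<String, Entry> data = new ConcurrentHashMap<>();

    // Plays the role of WATCH plus the read: returns the current snapshot, or null if absent.
    Entry read(String key) {
        return data.get(key);
    }

    // Plays the role of MULTI/EXEC: applies the write only if the key still holds the watched snapshot.
    boolean commitIfUnchanged(String key, Entry watched, long newEntryTime) {
        Entry next = new Entry(newEntryTime);
        if (watched == null) {
            return data.putIfAbsent(key, next) == null;
        }
        // Entry does not override equals, so this compares by identity.
        return data.replace(key, watched, next);
    }
}

final class OptimisticSetSketch {
    enum Outcome { WRITTEN, SKIPPED_STALE, ABORTED_CONFLICT }

    // Mirrors the condition/set split of the function above.
    static Outcome setIfNewer(VersionedStore store, String key, long entryTime) {
        VersionedStore.Entry watched = store.read(key);
        if (watched != null && entryTime < watched.entryTime) {
            // Corresponds to the tx.discard() branch.
            return Outcome.SKIPPED_STALE;
        }
        return store.commitIfUnchanged(key, watched, entryTime)
                ? Outcome.WRITTEN
                : Outcome.ABORTED_CONFLICT;
    }

    public static void main(String[] args) {
        VersionedStore store = new VersionedStore();
        System.out.println(setIfNewer(store, "delivery:1", 100L)); // key absent
        System.out.println(setIfNewer(store, "delivery:1", 50L));  // older entry time
        System.out.println(setIfNewer(store, "delivery:1", 100L)); // equal entry time
    }
}
```

In this model a conflicting concurrent write yields ABORTED_CONFLICT rather than an exception, which is the outcome I would expect from the real transaction as well.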