Error trying to make an optimistic transaction to ElastiCache using quarkus

89 views Asked by At

I trying to make an optimistic transaction using quarkus, quarkus-redis-client. sometimes the transaction failed. it is unclear what is the reason the transaction failed.

when it is failed the WithTransaction function throw this error:

java.util.concurrent.CompletionException: ERR EXEC without MULTI\n\tat io.smallrye.mutiny.operators.uni.UniBlockingAwait.await(UniBlockingAwait.java:79)\n\tat io.smallrye.mutiny.groups.UniAwait.atMost(UniAwait.java:65)\n\tat com.deliveries.sanity.SanityCheckLambda.handleRequest(SanityCheckLambda.java:59)\n\tat com.deliveries.sanity.SanityCheckLambda.handleRequest(SanityCheckLambda.java:24)\n\tat io.quarkus.amazon.lambda.runtime.AmazonLambdaRecorder$1.processRequest(AmazonLambdaRecorder.java:167)\n\tat io.quarkus.amazon.lambda.runtime.AbstractLambdaPollLoop$1.run(AbstractLambdaPollLoop.java:137)\n\tat [email protected]/java.lang.Thread.run(Thread.java:833)\n\tat org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.PlatformThreads.threadStartRoutine(PlatformThreads.java:775)\n\tat org.graalvm.nativeimage.builder/com.oracle.svm.core.posix.thread.PosixPlatformThreads.pthreadStartRoutine(PosixPlatformThreads.java:203)\nCaused by: ERR EXEC without MULTI\n



java.lang.UnsupportedOperationException: This type doesn't hold a Bulk type
    at io.vertx.redis.client.Response.toBuffer(Response.java:211)
    at io.vertx.mutiny.redis.client.Response.toBuffer(Response.java:180)
    at io.quarkus.redis.runtime.datasource.ReactiveJsonCommandsImpl.lambda$jsonGet$3(ReactiveJsonCommandsImpl.java:95)
    at io.smallrye.context.impl.wrappers.SlowContextualFunction.apply(SlowContextualFunction.java:21)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransform$UniOnItemTransformProcessor.onItem(UniOnItemTransform.java:36)
    at io.smallrye.mutiny.vertx.AsyncResultUni.lambda$subscribe$1(AsyncResultUni.java:35)
    at io.smallrye.mutiny.vertx.DelegatingHandler.handle(DelegatingHandler.java:25)
    at io.quarkus.redis.runtime.client.ObservableRedis.lambda$send$1(ObservableRedis.java:54)
    at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
    at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
    at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
    at io.vertx.core.impl.future.Composition$1.onSuccess(Composition.java:62)
    at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
    at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
    at io.vertx.core.impl.future.Eventually$1.onSuccess(Eventually.java:44)
    at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
    at io.vertx.core.impl.future.FutureImpl.addListener(FutureImpl.java:196)
    at io.vertx.core.impl.future.Eventually.onSuccess(Eventually.java:41)
    at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
    at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
    at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
    at io.vertx.core.impl.future.PromiseImpl.onSuccess(PromiseImpl.java:49)
    at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:41)
    at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:23)
    at io.vertx.redis.client.impl.RedisClusterConnection.lambda$send$7(RedisClusterConnection.java:330)
    at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
    at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
    at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
    at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
    at io.vertx.redis.client.impl.RedisStandaloneConnection.handle(RedisStandaloneConnection.java:409)
    at io.vertx.redis.client.impl.RESPParser.handleResponse(RESPParser.java:316)
    at io.vertx.redis.client.impl.RESPParser.handleSimpleString(RESPParser.java:231)
    at io.vertx.redis.client.impl.RESPParser.handle(RESPParser.java:82)
    at io.vertx.redis.client.impl.RESPParser.handle(RESPParser.java:24)
    at io.vertx.core.net.impl.NetSocketImpl.lambda$new$1(NetSocketImpl.java:100)
    at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:239)
    at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:129)
    at io.vertx.core.net.impl.NetSocketImpl$DataMessageHandler.handle(NetSocketImpl.java:414)
    at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:55)
    at io.vertx.core.impl.ContextBase.emit(ContextBase.java:239)
    at io.vertx.core.net.impl.NetSocketImpl.handleMessage(NetSocketImpl.java:390)
    at io.vertx.core.net.impl.ConnectionBase.read(ConnectionBase.java:157)
    at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:153)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:689)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:652)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at [email protected]/java.lang.Thread.run(Thread.java:833)
    at org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.PlatformThreads.threadStartRoutine(PlatformThreads.java:775)
    at org.graalvm.nativeimage.builder/com.oracle.svm.core.posix.thread.PosixPlatformThreads.pthreadStartRoutine(PosixPlatformThreads.java:203)

this is the function i use to make the optimistic locking sometimes it is failed and sometimes succeed.

 public Uni<OptimisticLockingTransactionResult<Boolean>> setWithTransactions(String id, FullDelivery fullDelivery, long ttl) throws ExecutionException, InterruptedException {
    Function<ReactiveRedisDataSource, Uni<Boolean>> conditionFunction = (ds ->
            ds.json().jsonGetObject(this.fullDeliveryKeyName(id))
                    .onItemOrFailure().transform((result,error) -> {
                        if (Objects.nonNull(error)) {
                            return false;
                        }
                        if (result == null) {
                            return true;
                        }
                        return fullDelivery.getTimes().getEntryTime() >=
                                result.mapTo(FullDelivery.class)
                                        .getTimes()
                                        .getEntryTime();
                    })
    );

    BiFunction<Boolean, ReactiveTransactionalRedisDataSource, Uni<Void>> setFunction = ((greaterEntryTime, tx) -> {
        ReactiveTransactionalJsonCommands<String> json = tx.json(String.class);
        ReactiveTransactionalKeyCommands<String> key = tx.key(String.class);

        if (Boolean.TRUE.equals(greaterEntryTime)) {
            return Uni.join().all(
                            json.jsonSet(this.fullDeliveryKeyName(id), "$", fullDelivery),
                            key.expire(this.fullDeliveryKeyName(id), ttl)
                    )
                    .andCollectFailures()
                    .replaceWithVoid();
        } else {
            logger.warn("a more recent update was found in cache for full delivery #" + id);
            return tx.discard();
        }
    });

    return this.redisDataSource
            .withTransaction(conditionFunction, setFunction, this.fullDeliveryKeyName(id));
}                     


                                                                                      
                                                                                                                  
0

There are 0 answers