Corda: CollectSignaturesFlow produces UnsupportedOperationException


I found that CollectSignaturesFlow produces an UnsupportedOperationException because at some point Kryo is unable to deserialize one of the collections used in the flow. The point-to-point CollectSignatureFlow, which it uses internally, works fine; nothing breaks. I also implemented signature gathering in a simpler way (with fewer checks, but it is only for testing purposes), and it does the job.

The code:

    @InitiatingFlow
    @StartableByRPC
    class CheckoutClientFlow(
            private val person: Person = ClientAccountFlows.defaultPerson,
            private val partiesAndAmounts: Map<CordaX500Name, Long>
    ) : FlowLogic<SignedTransaction>() {
        override val progressTracker: ProgressTracker = tracker()
        private var selfIncluded = false
        @Suspendable
        override fun call(): SignedTransaction {
            progressTracker.currentStep = NotaryInfo
            val notary: Party = getPreferredNotary(serviceHub)
            progressTracker.currentStep = LocateOtherParties
            val parties = locateParties(partiesAndAmounts.keys).toMutableSet()
            if (parties.contains(ourIdentity)) {
                selfIncluded = true
                parties.remove(ourIdentity)
            }
            val inputs = subFlow(GatherStateAndRefFlow(person, parties))
            val otherPartiesSessions = parties.map { initiateFlow(it) }
            progressTracker.currentStep = CreateTransaction
            val inputsWithParties = inputs.mapKeys { entry -> locateParty(entry.key) }
            val txBuilder = createTxBuilder(notary, inputsWithParties)
            var onceSignedTransaction = serviceHub.signInitialTransaction(txBuilder)
            progressTracker.currentStep = SigningByOtherParty
            val signedTx = subFlow(CollectSignaturesFlow(onceSignedTransaction, otherPartiesSessions))
//            otherPartiesSessions.forEach {
//                onceSignedTransaction += subFlow(CollectSignatureFlow(onceSignedTransaction, it))
//            }
            progressTracker.currentStep = Finalize
            // Finalize the fully signed transaction returned by CollectSignaturesFlow
            return subFlow(FinalityFlow(transaction = signedTx, sessions = otherPartiesSessions))
        }
    }
}

The test:

    @Test
    fun `subtract if both parties are present other`() {
        val person = defaultPerson
        val partiesAndAmounts = mapOf(Pair(aNodeName, defaultMovableAmount), Pair(bNodeName, defaultMovableAmount))
        val resultFuture = a.startFlow(ExchangeValueMultipleParties.CheckoutClientFlow(
                person = defaultPerson,
                partiesAndAmounts = partiesAndAmounts
        ))
        network.runNetwork()
        val res1 = resultFuture.get()
        assertNotNull(res1)
        assertNotEquals(0, res1.inputs.size)
        assertEquals(partiesAndAmounts.size, res1.tx.outputs.size)
        assertEquals(partiesAndAmounts.size, res1.sigs.size)
        val stateA = a.services.vaultService.queryBy<ClientAccountState>(accountQueryCriteria).states.first { it.state.data.owner == person }
        assertNotNull(stateA)
//        assertEquals(defaultAmount - defaultMovableAmount, getTrueAmount(stateA.state.data.balance))
        val stateB = b.services.vaultService.queryBy<ClientAccountState>(accountQueryCriteria).states.first { it.state.data.owner == person }
        assertNotNull(stateB)
//        assertEquals(defaultAmount - defaultMovableAmount, getTrueAmount(stateB.state.data.balance))
    }

The stack trace:

[ERROR] 19:03:34+0000 [FiberDeserializationChecker] interceptors.FiberDeserializationChecker. - Encountered unrestorable checkpoint! [errorCode=aixjez, moreInformationAt=https://errors.corda.net/OS/4.4/aixjez]
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
dataObject (co.paralleluniverse.fibers.Stack)
stack (net.corda.node.services.statemachine.FlowStateMachineImpl)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
    at com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer.read(CompatibleFieldSerializer.java:145)
    at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:782)
    at co.paralleluniverse.io.serialization.kryo.ReplaceableObjectKryo.readObjectOrNull(ReplaceableObjectKryo.java:107)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:132)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
    at co.paralleluniverse.fibers.Fiber$FiberSerializer.read(Fiber.java:2156)
    at co.paralleluniverse.fibers.Fiber$FiberSerializer.read(Fiber.java:2086)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
    at co.paralleluniverse.io.serialization.kryo.ReplaceableObjectKryo.readClassAndObject(ReplaceableObjectKryo.java:112)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer$deserialize$1$1.invoke(KryoCheckpointSerializer.kt:83)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer$deserialize$1$1.invoke(KryoCheckpointSerializer.kt:33)
    at net.corda.node.serialization.kryo.KryoStreams.kryoInput(KryoStreams.kt:20)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer$deserialize$1.invoke(KryoCheckpointSerializer.kt:72)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer$deserialize$1.invoke(KryoCheckpointSerializer.kt:33)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer$kryo$1.execute(KryoCheckpointSerializer.kt:61)
    at com.esotericsoftware.kryo.pool.KryoPoolQueueImpl.run(KryoPoolQueueImpl.java:58)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer.kryo(KryoCheckpointSerializer.kt:57)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer.deserialize(KryoCheckpointSerializer.kt:71)
    at net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker$start$2.invoke(FiberDeserializationCheckingInterceptor.kt:101)
    at net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker$start$2.invoke(FiberDeserializationCheckingInterceptor.kt:51)
    at kotlin.concurrent.ThreadsKt$thread$thread$1.run(Thread.kt:30)
Caused by: java.lang.UnsupportedOperationException
    at java.util.AbstractCollection.add(AbstractCollection.java:262)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    at co.paralleluniverse.io.serialization.kryo.ReplaceableObjectKryo.readObject(ReplaceableObjectKryo.java:92)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:391)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    at co.paralleluniverse.io.serialization.kryo.ReplaceableObjectKryo.readObject(ReplaceableObjectKryo.java:92)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    ... 21 more

It seems that Kryo, in order to reconstruct some collection used in the flow, is calling add(), which AbstractCollection does not support, even though the method exists: its default implementation simply throws.

Here is the implementation in AbstractCollection:

    public boolean add(E var1) {
        throw new UnsupportedOperationException();
    }
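To make the failure mode concrete, here is a minimal plain-Java sketch with no Corda or Kryo dependency. It only shows that any collection inheriting add() from AbstractCollection without overriding it, such as the key-set view of a map, throws when something tries to refill it element by element, which is what the CollectionSerializer frames in the stack trace suggest is happening. The names NoAddBag and canRefill are hypothetical and exist only for this illustration; the sketch does not identify which collection in the flow is the actual culprit.

```java
import java.util.AbstractCollection;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AddNotSupportedDemo {

    /** Hypothetical collection that implements only what AbstractCollection requires. */
    static final class NoAddBag<E> extends AbstractCollection<E> {
        private final List<E> items;

        NoAddBag(List<E> items) {
            this.items = new ArrayList<>(items);
        }

        @Override
        public Iterator<E> iterator() {
            return items.iterator();
        }

        @Override
        public int size() {
            return items.size();
        }
        // add() is NOT overridden, so AbstractCollection.add() is used and throws.
    }

    /**
     * Hypothetical helper: tries to fill the target by calling add() for each
     * element, roughly the way a generic collection deserializer would.
     * Returns false if the target rejects add().
     */
    static boolean canRefill(Collection<String> target, List<String> elements) {
        try {
            for (String element : elements) {
                target.add(element);
            }
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> elements = Arrays.asList("a", "b");

        Map<String, Long> map = new LinkedHashMap<>();
        map.put("x", 1L);

        // Key-set views extend AbstractSet and inherit AbstractCollection.add().
        System.out.println("map.keySet(): " + canRefill(map.keySet(), elements));       // false
        System.out.println("NoAddBag:     " + canRefill(new NoAddBag<>(elements), elements)); // false
        // A regular mutable collection overrides add() and works.
        System.out.println("ArrayList:    " + canRefill(new ArrayList<>(), elements));  // true
    }
}
```

If a view like this ends up on the flow's stack at a suspension point, copying it into an ordinary mutable collection (for example an ArrayList or HashSet) before the flow suspends is one thing worth trying, though whether that resolves this particular error is an assumption.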

Also, when the test is run under a debugger, thread starvation warnings may be observed:

[WARN] 19:55:47,111 [HikariPool-3 housekeeper] pool.HikariPool. - HikariPool-3 - Thread starvation or clock leap detected (housekeeper delta=1m35s902ms765µs600ns).
[WARN] 19:55:47,111 [HikariPool-1 housekeeper] pool.HikariPool. - HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=1m49s546ms362µs501ns).
[WARN] 19:55:47,113 [HikariPool-2 housekeeper] pool.HikariPool. - HikariPool-2 - Thread starvation or clock leap detected (housekeeper delta=1m37s789ms188µs200ns).
[WARN] 19:55:47,114 [HikariPool-4 housekeeper] pool.HikariPool. - HikariPool-4 - Thread starvation or clock leap detected (housekeeper delta=1m33s803ms330µs901ns).