Stack: Micronaut-Kafka, Proto3, gRPC, BloomRPC

Goal: receive a gRPC request from BloomRPC and post it to a Kafka topic

Context: I coded a gRPC endpoint that receives a simple call successfully, but when I try to post the request object to a Kafka topic I get:

12:38:18.775 [DefaultDispatcher-worker-1] ERROR i.m.r.intercept.RecoveryInterceptor - Type [com.mybank.producer.DebitProducer$Intercepted] executed with error: Exception sending producer record for method [void sendRequestMessage(String key,DebitRequest message)]: Error serializing object to JSON: Direct self-reference leading to cycle (through reference chain: com.mybank.endpoint.DebitRequest["unknownFields"]->com.google.protobuf.UnknownFieldSet["defaultInstanceForType"])
io.micronaut.messaging.exceptions.MessagingClientException: Exception sending producer record for method [void sendRequestMessage(String key,DebitRequest message)]: Error serializing object to JSON: Direct self-reference leading to cycle (through reference chain: com.mybank.endpoint.DebitRequest["unknownFields"]->com.google.protobuf.UnknownFieldSet["defaultInstanceForType"])
    at io.micronaut.configuration.kafka.intercept.KafkaClientIntroductionAdvice.wrapException(KafkaClientIntroductionAdvice.java:564)

I guess my issue is related to "...Error serializing object to JSON". Do I really have to convert the object to JSON? Can't I just post it to the Kafka topic as it is? What am I missing here? I can easily work around the issue by creating a new object and copying each property of the request object into it, like this:

endpoint (controller) method

override suspend fun sendDebit(request: DebitRequest): DebitReply {

    // Copy the fields of the generated message into a plain DTO
    val dtoDebiter = Debiter()
    dtoDebiter.id = request.id.toString()
    dtoDebiter.name = request.name

    // Keep the returned status so it can be sent back in the reply
    val postStatus = transactionService.postDebitTransaction(dtoDebiter)
    return DebitReply.newBuilder().setMessage(postStatus).build()
}

Model created only for the conversion (doesn't this solution seem odd?)

class Debiter {
    lateinit var id: String
    lateinit var name: String
}

So, my direct question is: can I reuse the stub class generated from the proto file as the model both for receiving the request and for posting it to the Kafka topic? If so, what is wrong in the code below? If not, what is the recommended approach?
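For reference, a minimal sketch of what a non-JSON route could look like. Every generated message implements com.google.protobuf.MessageLite and can write itself out with toByteArray(), so a small Kafka Serializer could pass those bytes to the producer and bypass Jackson entirely. The class name ProtobufSerializer is hypothetical (it does not exist in my project or in Micronaut), the sketch is untested, and it would still have to be registered as the producer's value serializer:

```kotlin
import com.google.protobuf.MessageLite
import org.apache.kafka.common.serialization.Serializer

// Hypothetical helper, not part of Micronaut or Kafka:
// serializes any generated protobuf message to its binary wire format.
class ProtobufSerializer<T : MessageLite> : Serializer<T> {

    // Kafka calls this for every record value; a null value stays null (tombstone).
    override fun serialize(topic: String?, data: T?): ByteArray? =
        data?.toByteArray()
}
```

On the consuming side the bytes could presumably be turned back into the message with the generated DebitRequest.parseFrom(bytes).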

Full code:

transaction.proto

syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.mybank.endpoint";
option java_outer_classname = "TransactionProto";
option objc_class_prefix = "HLW";

package com.mybank.endpoint;

service Account {
  rpc SendDebit (DebitRequest) returns (DebitReply) {}
}

message DebitRequest {
  int64 id = 1;
  string name = 2;
}

message DebitReply {
  string message = 1;
}

transaction endpoint (controller)

package com.mybank.endpoint

import com.mybank.dto.Debiter
import com.mybank.service.TransactionService
import javax.inject.Singleton

@Singleton
@Suppress("unused")
class TransactionEndpoint(val transactionService: TransactionService) : AccountGrpcKt.AccountCoroutineImplBase(){

    override suspend fun sendDebit(request: DebitRequest): DebitReply {

        val postStatus: String = transactionService.postDebitTransaction(request)
        return DebitReply.newBuilder().setMessage(postStatus).build()
    }

}

transaction service (not really relevant here)

package com.mybank.service

import com.mybank.dto.Debiter
import com.mybank.dto.Transaction
import com.mybank.dto.Transactions
import com.mybank.endpoint.DebitRequest
import com.mybank.producer.DebitProducer
import javax.inject.Inject
import javax.inject.Named
import javax.inject.Singleton

@Singleton
class TransactionService(){

    @Inject
    @Named("debitProducer")
    lateinit var debitProducer : DebitProducer

    fun postDebitTransaction(debit: DebitRequest) : String{

        debitProducer.sendRequestMessage ("1", debit)

        return "posted"
    }
}

kafka producer

package com.mybank.producer

import com.mybank.dto.Transaction
import com.mybank.dto.Transactions
import com.mybank.endpoint.DebitRequest
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.Topic

@KafkaClient
interface DebitProducer {

    // Abstract method: Micronaut generates the implementation that sends the record
    @Topic("debit")
    fun sendRequestMessage(@KafkaKey key: String?, message: DebitRequest?)

}

build.gradle

plugins {
    id "org.jetbrains.kotlin.jvm" version "1.3.72"
    id "org.jetbrains.kotlin.kapt" version "1.3.72"
    id "org.jetbrains.kotlin.plugin.allopen" version "1.3.72"
    id "application"
    id 'com.google.protobuf' version '0.8.13'
}

version "0.2"
group "account-control"

repositories {
    mavenLocal()
    jcenter()
}

configurations {
    // for dependencies that are needed for development only
    developmentOnly
}

dependencies {
    kapt(enforcedPlatform("io.micronaut:micronaut-bom:$micronautVersion"))
    kapt("io.micronaut:micronaut-inject-java")
    kapt("io.micronaut:micronaut-validation")

    implementation(enforcedPlatform("io.micronaut:micronaut-bom:$micronautVersion"))
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8:${kotlinVersion}")
    implementation("org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinxCoroutinesVersion")
    implementation("io.micronaut:micronaut-runtime")
//    implementation("io.micronaut.grpc:micronaut-grpc-runtime")
    implementation("io.micronaut.grpc:micronaut-grpc-server-runtime:$micronautGrpcVersion")
    implementation("io.micronaut.grpc:micronaut-grpc-client-runtime:$micronautGrpcVersion")
    implementation("io.grpc:grpc-kotlin-stub:${grpcKotlinVersion}")

    //Kafka
    implementation("io.micronaut.kafka:micronaut-kafka")

    //vertx
    implementation("io.micronaut.sql:micronaut-vertx-mysql-client")
    //implementation("io.micronaut.configuration:micronaut-vertx-mysql-client")
    compile 'io.vertx:vertx-lang-kotlin:3.9.4'

    //mongodb
    implementation("org.mongodb:mongodb-driver-reactivestreams:4.1.1")

    runtimeOnly("ch.qos.logback:logback-classic:1.2.3")
    runtimeOnly("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8")

    kaptTest("io.micronaut:micronaut-inject-java")

    testImplementation enforcedPlatform("io.micronaut:micronaut-bom:$micronautVersion")
    testImplementation("org.junit.jupiter:junit-jupiter-api:5.3.0")
    testImplementation("io.micronaut.test:micronaut-test-junit5")
    testImplementation("org.mockito:mockito-junit-jupiter:2.22.0")

    testRuntime("org.junit.jupiter:junit-jupiter-engine:5.3.0")
    testRuntime("org.jetbrains.spek:spek-junit-platform-engine:1.1.5")
}

test.classpath += configurations.developmentOnly

mainClassName = "account-control.Application"

test {
    useJUnitPlatform()
}

allOpen {
    annotation("io.micronaut.aop.Around")
}

compileKotlin {
    kotlinOptions {
        jvmTarget = '11' 
        //Will retain parameter names for Java reflection
        javaParameters = true 
    }
}
//compileKotlin.dependsOn(generateProto)

compileTestKotlin {
    kotlinOptions {
        jvmTarget = '11' 
        javaParameters = true 
    }
}

tasks.withType(JavaExec) {
    classpath += configurations.developmentOnly
    jvmArgs('-XX:TieredStopAtLevel=1', '-Dcom.sun.management.jmxremote')
}

sourceSets {
    main {
        java {
            srcDirs 'build/generated/source/proto/main/grpc'
            srcDirs 'build/generated/source/proto/main/grpckt'
            srcDirs 'build/generated/source/proto/main/java'
        }
    }
}

protobuf {
    protoc { artifact = "com.google.protobuf:protoc:${protocVersion}" }
    plugins {
        grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" }
        grpckt { artifact = "io.grpc:protoc-gen-grpc-kotlin:${grpcKotlinVersion}" }
    }
    generateProtoTasks {
        all()*.plugins {
            grpc {}
            grpckt {}
        }
    }
}

In case it is relevant, here is part of the stub generated from the proto file:

    // Generated by the protocol buffer compiler.  DO NOT EDIT!
    // source: transaction.proto
    
    package com.mybank.endpoint;
    
    /**
     * Protobuf type {@code com.mybank.endpoint.DebitRequest}
     */
    public final class DebitRequest extends
        com.google.protobuf.GeneratedMessageV3 implements
        // @@protoc_insertion_point(message_implements:com.mybank.endpoint.DebitRequest)
        DebitRequestOrBuilder {
    private static final long serialVersionUID = 0L;
      // Use DebitRequest.newBuilder() to construct.
      private DebitRequest(com.google.protobuf.GeneratedMessageV3.Builder<?> builder) {
        super(builder);
      }
      private DebitRequest() {
        name_ = "";
      }
    
      @java.lang.Override
      @SuppressWarnings({"unused"})
      protected java.lang.Object newInstance(
          UnusedPrivateParameter unused) {
        return new DebitRequest();
      }
    
      @java.lang.Override
      public final com.google.protobuf.UnknownFieldSet
      getUnknownFields() {
        return this.unknownFields;
      }
      private DebitRequest(
          com.google.protobuf.CodedInputStream input,
          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
          throws com.google.protobuf.InvalidProtocolBufferException {
        this();
...

*** First edit

@KafkaClient(
        id = "debit-client",
        acks = KafkaClient.Acknowledge.ALL,
        properties = [Property(name = ProducerConfig.RETRIES_CONFIG, value = "5")]
        // How do I add these two properties in order to use a Protobuf serializer?
        // kafka.producers.*.key-serializer
        // kafka.producers.*.value-serializer
)
interface DebitProducer {

    @Topic("debit")
    fun sendRequestMessage(@KafkaKey key: String?, message: DebitRequest?)

}
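A sketch of how the two serializer settings might be passed through the annotation itself, using the standard Kafka property names from ProducerConfig. This is untested and rests on two assumptions: that Micronaut forwards these properties to the underlying producer instead of choosing its own serializers, and that a value serializer class exists under the hypothetical name com.mybank.producer.ProtobufSerializer:

```kotlin
import com.mybank.endpoint.DebitRequest
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Property
import org.apache.kafka.clients.producer.ProducerConfig

@KafkaClient(
    id = "debit-client",
    acks = KafkaClient.Acknowledge.ALL,
    properties = [
        Property(name = ProducerConfig.RETRIES_CONFIG, value = "5"),
        // "key.serializer": plain strings for the record key
        Property(
            name = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            value = "org.apache.kafka.common.serialization.StringSerializer"
        ),
        // "value.serializer": hypothetical class name, assumed to implement
        // org.apache.kafka.common.serialization.Serializer<DebitRequest>
        Property(
            name = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            value = "com.mybank.producer.ProtobufSerializer"
        )
    ]
)
interface DebitProducer {

    @Topic("debit")
    fun sendRequestMessage(@KafkaKey key: String?, message: DebitRequest?)
}
```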

*** Second edit

I tried adding the serializer via application.yaml:

micronaut:
    application:
        name: account-control
grpc:
    server:
        port: 8082
kafka:
    producers:
        product-client:
            value:
                serializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
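One detail that may matter here: as far as I understand, the key under kafka.producers is matched against the id of the @KafkaClient, and my client is declared with id "debit-client", not "product-client". A sketch of the same fragment keyed by that id (untested; the Confluent serializer additionally requires a schema.registry.url setting, which is not shown because I am not sure where it belongs in this file):

```yaml
kafka:
    producers:
        debit-client:            # assumed to match @KafkaClient(id = "debit-client")
            value:
                serializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
```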

build.gradle

    //kafka-protobuf-serializer
    implementation("io.confluent:kafka-protobuf-serializer:6.0.0")

and now I get the following error during the Gradle build:

Execution failed for task ':extractIncludeProto'.
> Could not resolve all files for configuration ':compileProtoPath'.
   > Could not resolve com.squareup.wire:wire-schema:3.2.2.
     Required by:
         project : > io.confluent:kafka-protobuf-serializer:6.0.0 > io.confluent:kafka-protobuf-provider:6.0.0
      > The consumer was configured to find a component, preferably only the resources files. However we cannot choose between the following variants of com.squareup.wire:wire-schema:3.2.2:
          - jvm-api
          - jvm-runtime
          - metadata-api
        All of them match the consumer attributes:
          - Variant 'jvm-api' capability com.squareup.wire:wire-schema:3.2.2 declares a component, packaged as a jar:
              - Unmatched attributes:
                  - Provides release status but the consumer didn't ask for it
                  - Provides an API but the consumer didn't ask for it
                  - Provides attribute 'org.jetbrains.kotlin.platform.type' with value 'jvm' but the consumer didn't ask for it
          - Variant 'jvm-runtime' capability com.squareup.wire:wire-schema:3.2.2 declares a component, packaged as a jar:
              - Unmatched attributes:
                  - Provides release status but the consumer didn't ask for it
                  - Provides a runtime but the consumer didn't ask for it
                  - Provides attribute 'org.jetbrains.kotlin.platform.type' with value 'jvm' but the consumer didn't ask for it
          - Variant 'metadata-api' capability com.squareup.wire:wire-schema:3.2.2:
              - Unmatched attributes:
                  - Doesn't say anything about its elements (required them preferably only the resources files)
                  - Provides release status but the consumer didn't ask for it
                  - Provides a usage of 'kotlin-api' but the consumer didn't ask for it
                  - Provides attribute 'org.jetbrains.kotlin.platform.type' with value 'common' but the consumer didn't ask for it

* Try:
Run with --info or --debug option to get more log output. Run with --scan to get full insights.

* Exception is:
org.gradle.api.tasks.TaskExecutionException: Execution failed for task ':extractIncludeProto'.
    at org.gradle.api.internal.tasks.execution.CatchExceptionTaskExecuter.execute(CatchExceptionTaskExecuter.java:38)
    at org.gradle.api.internal.tasks.execution.EventFiringTaskExecuter$1.executeTask(EventFiringTaskExecuter.java:77)
    at org.gradle.api.internal.tasks.execution.EventFiringTaskExecuter$1.call(EventFiringTaskExecuter.java:55)
    at org.gradle.api.internal.tasks.execution.EventFiringTaskExecuter$1.call(EventFiringTaskExecuter.java:52)
    at org.gradle.internal.operations.DefaultBuildOperationExecutor$CallableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:416)
    at org.gradle.internal.operations.DefaultBuildOperationExecutor$CallableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:406)
    at org.gradle.internal.operations.DefaultBuildOperationExecutor$1.execute(DefaultBuildOperationExecutor.java:165)
    at org.gradle.internal.operations.DefaultBuildOperationExecutor.execute(DefaultBuildOperationExecutor.java:250)
    at org.gradle.internal.operations.DefaultBuildOperationExecutor.execute(DefaultBuildOperationExecutor.java:158)
    at org.gradle.internal.operations.DefaultBuildOperationExecutor.call(DefaultBuildOperationExecutor.java:102)
    at org.gradle.internal.operations.DelegatingBuildOperationExecutor.call(DelegatingBuildOperationExecutor.java:36)
    at org.gradle.api.internal.tasks.execution.EventFiringTaskExecuter.execute(EventFiringTaskExecuter.java:52)
    at org.gradle.execution.plan.LocalTaskNodeExecutor.execute(LocalTaskNodeExecutor.java:41)
    at org.gradle.execution.taskgraph.DefaultTaskExecutionGraph$InvokeNodeExecutorsAction.execute(DefaultTaskExecutionGraph.java:370)
    at org.gradle.execution.taskgraph.DefaultTaskExecutionGraph$InvokeNodeExecutorsAction.execute(DefaultTaskExecutionGraph.java:357)
    at org.gradle.execution.taskgraph.DefaultTaskExecutionGraph$BuildOperationAwareExecutionAction.execute(DefaultTaskExecutionGraph.java:350)
    at org.gradle.execution.taskgraph.DefaultTaskExecutionGraph$BuildOperationAwareExecutionAction.execute(DefaultTaskExecutionGraph.java:336)
    at org.gradle.execution.plan.DefaultPlanExecutor$ExecutorWorker.lambda$run$0(DefaultPlanExecutor.java:127)
    at org.gradle.execution.plan.DefaultPlanExecutor$ExecutorWorker.execute(DefaultPlanExecutor.java:191)
    at org.gradle.execution.plan.DefaultPlanExecutor$ExecutorWorker.executeNextNode(DefaultPlanExecutor.java:182)
    at org.gradle.execution.plan.DefaultPlanExecutor$ExecutorWorker.run(DefaultPlanExecutor.java:124)
    at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
    at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48)
    at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56)
Caused by: org.gradle.api.internal.artifacts.ivyservice.DefaultLenientConfiguration$ArtifactResolveException: Could not resolve all files for configuration ':compileProtoPath'.
    at org.gradle.api.internal.artifacts.configurations.DefaultConfiguration.rethrowFailure(DefaultConfiguration.java:1265)
    at org.gradle.api.internal.artifacts.configurations.DefaultConfiguration.access$1800(DefaultConfiguration.java:141)
    at org.gradle.api.internal.artifacts.configurations.DefaultConfiguration$ConfigurationFileCollection.visitContents(DefaultConfiguration.java:1242)
    at org.gradle.api.internal.artifacts.configurations.DefaultConfiguration$ConfigurationFileCollection.visitContents(DefaultConfiguration.java:1235)
    at org.gradle.api.internal.artifacts.configurations.DefaultConfiguration.visitContents(DefaultConfiguration.java:489)
    at org.gradle.api.internal.file.AbstractFileCollection.visitStructure(AbstractFileCollection.java:265)
    at org.gradle.api.internal.file.CompositeFileCollection.visitContents(CompositeFileCollection.java:152)
    at org.gradle.api.internal.file.AbstractFileCollection.visitStructure(AbstractFileCollection.java:265)
    at org.gradle.internal.fingerprint.impl.DefaultFileCollectionSnapshotter.snapshot(DefaultFileCollectionSnapshotter.java:50)
    at org.gradle.internal.fingerprint.impl.AbstractFileCollectionFingerprinter.fingerprint(AbstractFileCollectionFingerprinter.java:47)
    at ...
