Stack: Micronaut-Kafka, Proto3, gRPC, BloomRPC
Goal: receive a gRPC request from BloomRPC and post it to a Kafka topic
Context: I coded a gRPC endpoint which receives a simple call successfully, but when I try to post the received object to a Kafka topic I get:
12:38:18.775 [DefaultDispatcher-worker-1] ERROR i.m.r.intercept.RecoveryInterceptor - Type [com.mybank.producer.DebitProducer$Intercepted] executed with error: Exception sending producer record for method [void sendRequestMessage(String key,DebitRequest message)]: Error serializing object to JSON: Direct self-reference leading to cycle (through reference chain: com.mybank.endpoint.DebitRequest["unknownFields"]->com.google.protobuf.UnknownFieldSet["defaultInstanceForType"])
io.micronaut.messaging.exceptions.MessagingClientException: Exception sending producer record for method [void sendRequestMessage(String key,DebitRequest message)]: Error serializing object to JSON: Direct self-reference leading to cycle (through reference chain: com.mybank.endpoint.DebitRequest["unknownFields"]->com.google.protobuf.UnknownFieldSet["defaultInstanceForType"])
at io.micronaut.configuration.kafka.intercept.KafkaClientIntroductionAdvice.wrapException(KafkaClientIntroductionAdvice.java:564)
I guess my issue is somehow related to "...Error serializing object to JSON". Do I really have to convert such an object to JSON? Can I not just post it to the Kafka topic as it is? What am I missing here? Well, I can easily work around this issue by creating a new object and copying each property of the request object into it, like this:
endpoint (controller) method
/**
 * Handles the `SendDebit` RPC by copying the incoming protobuf request into a
 * plain [Debiter] DTO and handing that DTO to the transaction service.
 *
 * @param request the protobuf request received over gRPC
 * @return a [DebitReply] whose message is the status returned by the service
 */
override suspend fun sendDebit(request: DebitRequest): DebitReply {
    // Copy only the fields the DTO needs; the protobuf id (int64) is stored as text.
    val dtoDebiter = Debiter().apply {
        id = request.id.toString()
        name = request.name
    }
    // Keep the service result: it is the text sent back to the caller.
    // NOTE(review): assumes postDebitTransaction(Debiter) returns a String, like the
    // DebitRequest variant shown in the service class - confirm.
    val postStatus = transactionService.postDebitTransaction(dtoDebiter)
    return DebitReply.newBuilder().setMessage(postStatus).build()
}
Model created only for this conversion (doesn't this solution seem weird?)
/**
 * Plain DTO holding the two values copied out of a debit request.
 *
 * Both properties are `lateinit`: the object is created empty and must have
 * [id] and [name] assigned before either of them is read.
 */
class Debiter {

    /** Identifier of the debit, kept as text. */
    lateinit var id: String

    /** Name carried by the debit request. */
    lateinit var name: String
}
So, my straight question is: can I reuse the same autogenerated stub produced from the proto file as a model both for receiving the request and for posting it to the Kafka topic? If so, what is wrong in the code below? If not, what is the recommended approach?
Full code:
transaction.proto
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.mybank.endpoint";
option java_outer_classname = "TransactionProto";
option objc_class_prefix = "HLW";
package com.mybank.endpoint;
// gRPC service with a single unary RPC for submitting a debit.
service Account {
// Takes one DebitRequest and answers with one DebitReply.
rpc SendDebit (DebitRequest) returns (DebitReply) {}
}
// Request payload of Account.SendDebit.
message DebitRequest {
// 64-bit integer identifier (field number 1).
int64 id = 1;
// Name string (field number 2).
string name = 2;
}
// Response payload of Account.SendDebit.
message DebitReply {
// Free-form text (field number 1); the Kotlin endpoint in this post fills it
// with the status string returned by the transaction service.
string message = 1;
}
transaction endpoint (controller)
package com.mybank.endpoint
import com.mybank.dto.Debiter
import com.mybank.service.TransactionService
import javax.inject.Singleton
/**
 * gRPC endpoint backing the `Account` service.
 *
 * Extends the generated coroutine base class and delegates the actual work to
 * [transactionService].
 */
@Singleton
@Suppress("unused")
class TransactionEndpoint(val transactionService: TransactionService) : AccountGrpcKt.AccountCoroutineImplBase() {

    /**
     * Forwards the received [request] to the transaction service and wraps the
     * returned status text in a [DebitReply].
     */
    override suspend fun sendDebit(request: DebitRequest): DebitReply =
        transactionService.postDebitTransaction(request).let { status ->
            DebitReply.newBuilder().setMessage(status).build()
        }
}
transaction service (not really relevant here)
package com.mybank.service
import com.mybank.dto.Debiter
import com.mybank.dto.Transaction
import com.mybank.dto.Transactions
import com.mybank.endpoint.DebitRequest
import com.mybank.producer.DebitProducer
import javax.inject.Inject
import javax.inject.Named
import javax.inject.Singleton
/**
 * Service layer between the gRPC endpoint and the Kafka producer.
 */
@Singleton
class TransactionService {

    /** Kafka client used to publish debits; set by field injection after construction. */
    @Inject
    @Named("debitProducer")
    lateinit var debitProducer: DebitProducer

    /**
     * Sends [debit] through the producer under the fixed record key "1".
     *
     * @return the literal "posted" once the producer call has returned
     */
    fun postDebitTransaction(debit: DebitRequest): String {
        debitProducer.sendRequestMessage(key = "1", message = debit)
        return "posted"
    }
}
kafka producer
package com.mybank.producer
import com.mybank.dto.Transaction
import com.mybank.dto.Transactions
import com.mybank.endpoint.DebitRequest
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.Topic
/**
 * Declarative Kafka producer for debit requests.
 *
 * The interface has no hand-written implementation: Micronaut provides one for
 * interfaces annotated with `@KafkaClient`, so the method is left abstract
 * instead of carrying an empty default body.
 */
@KafkaClient
interface DebitProducer {

    /**
     * Publishes [message] to the `debit` topic with [key] as the record key.
     *
     * NOTE(review): the error log quoted in this post shows the value being
     * serialized as JSON, which fails on the protobuf-generated class. A
     * protobuf-aware value serializer has to be configured for this client.
     */
    @Topic("debit")
    fun sendRequestMessage(@KafkaKey key: String?, message: DebitRequest?)
}
build.gradle
plugins {
id "org.jetbrains.kotlin.jvm" version "1.3.72"
id "org.jetbrains.kotlin.kapt" version "1.3.72"
id "org.jetbrains.kotlin.plugin.allopen" version "1.3.72"
id "application"
id 'com.google.protobuf' version '0.8.13'
}
version "0.2"
group "account-control"
repositories {
mavenLocal()
jcenter()
}
configurations {
// for dependencies that are needed for development only
developmentOnly
}
dependencies {
    // Annotation processing for Micronaut (bean definitions, validation)
    kapt(enforcedPlatform("io.micronaut:micronaut-bom:$micronautVersion"))
    kapt("io.micronaut:micronaut-inject-java")
    kapt("io.micronaut:micronaut-validation")

    // Core: Micronaut BOM, Kotlin, coroutines, gRPC
    implementation(enforcedPlatform("io.micronaut:micronaut-bom:$micronautVersion"))
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8:${kotlinVersion}")
    implementation("org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinxCoroutinesVersion")
    implementation("io.micronaut:micronaut-runtime")
    // implementation("io.micronaut.grpc:micronaut-grpc-runtime")
    implementation("io.micronaut.grpc:micronaut-grpc-server-runtime:$micronautGrpcVersion")
    implementation("io.micronaut.grpc:micronaut-grpc-client-runtime:$micronautGrpcVersion")
    implementation("io.grpc:grpc-kotlin-stub:${grpcKotlinVersion}")

    // Kafka
    implementation("io.micronaut.kafka:micronaut-kafka")

    // Vert.x
    implementation("io.micronaut.sql:micronaut-vertx-mysql-client")
    //implementation("io.micronaut.configuration:micronaut-vertx-mysql-client")
    // Was `compile`, a deprecated configuration; use `implementation` like every other entry.
    implementation("io.vertx:vertx-lang-kotlin:3.9.4")

    // MongoDB
    implementation("org.mongodb:mongodb-driver-reactivestreams:4.1.1")

    runtimeOnly("ch.qos.logback:logback-classic:1.2.3")
    runtimeOnly("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8")

    // Tests
    kaptTest("io.micronaut:micronaut-inject-java")
    testImplementation enforcedPlatform("io.micronaut:micronaut-bom:$micronautVersion")
    testImplementation("org.junit.jupiter:junit-jupiter-api:5.3.0")
    testImplementation("io.micronaut.test:micronaut-test-junit5")
    testImplementation("org.mockito:mockito-junit-jupiter:2.22.0")
    // Was `testRuntime`, a deprecated configuration; `testRuntimeOnly` replaces it.
    testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.3.0")
    testRuntimeOnly("org.jetbrains.spek:spek-junit-platform-engine:1.1.5")
}
test.classpath += configurations.developmentOnly
mainClassName = "account-control.Application"
test {
useJUnitPlatform()
}
allOpen {
annotation("io.micronaut.aop.Around")
}
compileKotlin {
kotlinOptions {
jvmTarget = '11'
//Will retain parameter names for Java reflection
javaParameters = true
}
}
//compileKotlin.dependsOn(generateProto)
compileTestKotlin {
kotlinOptions {
jvmTarget = '11'
javaParameters = true
}
}
tasks.withType(JavaExec) {
classpath += configurations.developmentOnly
jvmArgs('-XX:TieredStopAtLevel=1', '-Dcom.sun.management.jmxremote')
}
// Adds three generated-code directories as extra Java source roots of the main
// source set. NOTE(review): these look like the output folders of the protobuf
// plugin (grpc, grpckt, java) - confirm they match the plugin's configuration.
sourceSets {
main {
java {
srcDirs 'build/generated/source/proto/main/grpc'
srcDirs 'build/generated/source/proto/main/grpckt'
srcDirs 'build/generated/source/proto/main/java'
}
}
}
// Protobuf code generation: which protoc binary and which protoc plugins to run.
protobuf {
// protoc compiler artifact; protocVersion is a property not shown in this file.
protoc { artifact = "com.google.protobuf:protoc:${protocVersion}" }
plugins {
// Java gRPC generator plugin.
grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" }
// Kotlin gRPC generator plugin.
grpckt { artifact = "io.grpc:protoc-gen-grpc-kotlin:${grpcKotlinVersion}" }
}
generateProtoTasks {
// Enable both plugins on every proto generation task.
all()*.plugins {
grpc {}
grpckt {}
}
}
}
in case it is relevant, here is a piece of the stub autogenerated by proto
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: transaction.proto
package com.mybank.endpoint;
/**
* Protobuf type {@code com.mybank.endpoint.DebitRequest}
*/
public final class DebitRequest extends
com.google.protobuf.GeneratedMessageV3 implements
// @@protoc_insertion_point(message_implements:com.mybank.endpoint.DebitRequest)
DebitRequestOrBuilder {
private static final long serialVersionUID = 0L;
// Use DebitRequest.newBuilder() to construct.
private DebitRequest(com.google.protobuf.GeneratedMessageV3.Builder<?> builder) {
super(builder);
}
private DebitRequest() {
name_ = "";
}
@java.lang.Override
@SuppressWarnings({"unused"})
protected java.lang.Object newInstance(
UnusedPrivateParameter unused) {
return new DebitRequest();
}
@java.lang.Override
public final com.google.protobuf.UnknownFieldSet
getUnknownFields() {
return this.unknownFields;
}
private DebitRequest(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
this();
...
*** First edition
@KafkaClient(
id="debit-client",
acks = KafkaClient.Acknowledge.ALL,
properties = [Property(name = ProducerConfig.RETRIES_CONFIG, value = "5")]
//How add these two properties in order to use Protobuf Serializer
//kafka.producers.*.key-serializer
//kafka.producers.*.value-serializer
)
public interface DebitProducer {
@Topic("debit")
fun sendRequestMessage(@KafkaKey key: String?, message: DebitRequest?) {
}
*** Second Edition
I tried adding the serializer via application.yaml
micronaut:
  application:
    name: account-control
grpc:
  server:
    port: 8082
kafka:
  producers:
    # Per-producer settings are keyed by the @KafkaClient id. The client in this
    # post declares id = "debit-client", so the key has to match that id;
    # "product-client" would configure a different (non-existent) producer.
    debit-client:
      value:
        # NOTE(review): Confluent's protobuf serializer normally also needs a
        # schema registry URL configured for this producer - confirm.
        serializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
build.gradle
//kafka-protobuf-serializer
implementation("io.confluent:kafka-protobuf-serializer:6.0.0")
and now I get during gradle build
Execution failed for task ':extractIncludeProto'.
> Could not resolve all files for configuration ':compileProtoPath'.
> Could not resolve com.squareup.wire:wire-schema:3.2.2.
Required by:
project : > io.confluent:kafka-protobuf-serializer:6.0.0 > io.confluent:kafka-protobuf-provider:6.0.0
> The consumer was configured to find a component, preferably only the resources files. However we cannot choose between the following variants of com.squareup.wire:wire-schema:3.2.2:
- jvm-api
- jvm-runtime
- metadata-api
All of them match the consumer attributes:
- Variant 'jvm-api' capability com.squareup.wire:wire-schema:3.2.2 declares a component, packaged as a jar:
- Unmatched attributes:
- Provides release status but the consumer didn't ask for it
- Provides an API but the consumer didn't ask for it
- Provides attribute 'org.jetbrains.kotlin.platform.type' with value 'jvm' but the consumer didn't ask for it
- Variant 'jvm-runtime' capability com.squareup.wire:wire-schema:3.2.2 declares a component, packaged as a jar:
- Unmatched attributes:
- Provides release status but the consumer didn't ask for it
- Provides a runtime but the consumer didn't ask for it
- Provides attribute 'org.jetbrains.kotlin.platform.type' with value 'jvm' but the consumer didn't ask for it
- Variant 'metadata-api' capability com.squareup.wire:wire-schema:3.2.2:
- Unmatched attributes:
- Doesn't say anything about its elements (required them preferably only the resources files)
- Provides release status but the consumer didn't ask for it
- Provides a usage of 'kotlin-api' but the consumer didn't ask for it
- Provides attribute 'org.jetbrains.kotlin.platform.type' with value 'common' but the consumer didn't ask for it
* Try:
Run with --info or --debug option to get more log output. Run with --scan to get full insights.
* Exception is:
org.gradle.api.tasks.TaskExecutionException: Execution failed for task ':extractIncludeProto'.
at org.gradle.api.internal.tasks.execution.CatchExceptionTaskExecuter.execute(CatchExceptionTaskExecuter.java:38)
at org.gradle.api.internal.tasks.execution.EventFiringTaskExecuter$1.executeTask(EventFiringTaskExecuter.java:77)
at org.gradle.api.internal.tasks.execution.EventFiringTaskExecuter$1.call(EventFiringTaskExecuter.java:55)
at org.gradle.api.internal.tasks.execution.EventFiringTaskExecuter$1.call(EventFiringTaskExecuter.java:52)
at org.gradle.internal.operations.DefaultBuildOperationExecutor$CallableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:416)
at org.gradle.internal.operations.DefaultBuildOperationExecutor$CallableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:406)
at org.gradle.internal.operations.DefaultBuildOperationExecutor$1.execute(DefaultBuildOperationExecutor.java:165)
at org.gradle.internal.operations.DefaultBuildOperationExecutor.execute(DefaultBuildOperationExecutor.java:250)
at org.gradle.internal.operations.DefaultBuildOperationExecutor.execute(DefaultBuildOperationExecutor.java:158)
at org.gradle.internal.operations.DefaultBuildOperationExecutor.call(DefaultBuildOperationExecutor.java:102)
at org.gradle.internal.operations.DelegatingBuildOperationExecutor.call(DelegatingBuildOperationExecutor.java:36)
at org.gradle.api.internal.tasks.execution.EventFiringTaskExecuter.execute(EventFiringTaskExecuter.java:52)
at org.gradle.execution.plan.LocalTaskNodeExecutor.execute(LocalTaskNodeExecutor.java:41)
at org.gradle.execution.taskgraph.DefaultTaskExecutionGraph$InvokeNodeExecutorsAction.execute(DefaultTaskExecutionGraph.java:370)
at org.gradle.execution.taskgraph.DefaultTaskExecutionGraph$InvokeNodeExecutorsAction.execute(DefaultTaskExecutionGraph.java:357)
at org.gradle.execution.taskgraph.DefaultTaskExecutionGraph$BuildOperationAwareExecutionAction.execute(DefaultTaskExecutionGraph.java:350)
at org.gradle.execution.taskgraph.DefaultTaskExecutionGraph$BuildOperationAwareExecutionAction.execute(DefaultTaskExecutionGraph.java:336)
at org.gradle.execution.plan.DefaultPlanExecutor$ExecutorWorker.lambda$run$0(DefaultPlanExecutor.java:127)
at org.gradle.execution.plan.DefaultPlanExecutor$ExecutorWorker.execute(DefaultPlanExecutor.java:191)
at org.gradle.execution.plan.DefaultPlanExecutor$ExecutorWorker.executeNextNode(DefaultPlanExecutor.java:182)
at org.gradle.execution.plan.DefaultPlanExecutor$ExecutorWorker.run(DefaultPlanExecutor.java:124)
at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48)
at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56)
Caused by: org.gradle.api.internal.artifacts.ivyservice.DefaultLenientConfiguration$ArtifactResolveException: Could not resolve all files for configuration ':compileProtoPath'.
at org.gradle.api.internal.artifacts.configurations.DefaultConfiguration.rethrowFailure(DefaultConfiguration.java:1265)
at org.gradle.api.internal.artifacts.configurations.DefaultConfiguration.access$1800(DefaultConfiguration.java:141)
at org.gradle.api.internal.artifacts.configurations.DefaultConfiguration$ConfigurationFileCollection.visitContents(DefaultConfiguration.java:1242)
at org.gradle.api.internal.artifacts.configurations.DefaultConfiguration$ConfigurationFileCollection.visitContents(DefaultConfiguration.java:1235)
at org.gradle.api.internal.artifacts.configurations.DefaultConfiguration.visitContents(DefaultConfiguration.java:489)
at org.gradle.api.internal.file.AbstractFileCollection.visitStructure(AbstractFileCollection.java:265)
at org.gradle.api.internal.file.CompositeFileCollection.visitContents(CompositeFileCollection.java:152)
at org.gradle.api.internal.file.AbstractFileCollection.visitStructure(AbstractFileCollection.java:265)
at org.gradle.internal.fingerprint.impl.DefaultFileCollectionSnapshotter.snapshot(DefaultFileCollectionSnapshotter.java:50)
at org.gradle.internal.fingerprint.impl.AbstractFileCollectionFingerprinter.fingerprint(AbstractFileCollectionFingerprinter.java:47)
at ...
at org.gradle.execution.plan.DefaultPlanExecutor$ExecutorWorker.executeNextNode(DefaultPlanExecutor.java:182)
at org.gradle.execution.plan.DefaultPlanExecutor$ExecutorWorker.run(DefaultPlanExecutor.java:124)
at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48)
at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56)