I am trying to use a Supplier/Consumer to produce and consume messages from a Kinesis data stream. Is there a way to add the partition key dynamically?
private final BlockingQueue<Message<String>> messages = new LinkedBlockingQueue<>();

@Bean
public Supplier<Message<String>> produceMessages() {
    // poll() returns null when the queue is empty
    return () -> this.messages.poll();
}

@Override
public void produce(main.Test request, StreamObserver<Test> response) {
    // Attach the partition key as a header on each outgoing message
    Message<String> input = MessageBuilder.withPayload(request.getMessage())
            .setHeader("partitionKey", "los").build();
    this.messages.offer(input);
    response.onCompleted();
}
application.properties
spring.cloud.stream.bindings.produceMessages-out-0.producer.partitionKeyExpression=headers['partitionKey']
We don't know what channel1 is, but according to the Spring Cloud Stream docs it has to be like this: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_producer_properties
You can find a sample here: https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/kinesis-samples/kinesis-produce-consume
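To make the idea behind headers['partitionKey'] concrete, here is a rough plain-Java model of the pattern from the question. It deliberately uses no Spring types: Msg, PartitionKeyDemo and PARTITION_KEY are hypothetical names made up for this sketch, and the lambda only stands in for what the expression conceptually does (read the header from each message). It does not claim to show how the Kinesis binder itself applies the key.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical demo class; a plain-Java model, not Spring Cloud Stream code.
class PartitionKeyDemo {

    // Stand-in for a Spring Message: a payload plus a header map.
    static final class Msg {
        final String payload;
        final Map<String, Object> headers;

        Msg(String payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    private final BlockingQueue<Msg> messages = new LinkedBlockingQueue<>();

    // Mirrors produceMessages(): poll() returns null when the queue is empty.
    Supplier<Msg> produceMessages() {
        return messages::poll;
    }

    // Mirrors produce(): the caller chooses the key for each message.
    void produce(String payload, String partitionKey) {
        messages.offer(new Msg(payload, Map.of("partitionKey", partitionKey)));
    }

    // Stand-in for evaluating headers['partitionKey'] against one message.
    static final Function<Msg, Object> PARTITION_KEY =
            m -> m.headers.get("partitionKey");

    public static void main(String[] args) {
        PartitionKeyDemo demo = new PartitionKeyDemo();
        demo.produce("first", "los");
        demo.produce("second", "nyc");

        Supplier<Msg> supplier = demo.produceMessages();
        Msg m;
        // Drain the queue and show the key resolved for each message.
        while ((m = supplier.get()) != null) {
            System.out.println(m.payload + " -> " + PARTITION_KEY.apply(m));
        }
    }
}
```

Running main prints "first -> los" and then "second -> nyc": the key is resolved per message rather than fixed in configuration, which is what makes it dynamic.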