I am in the middle of migrating code from Spring Boot 2.7.7/Spring Cloud 2021.0.5 to Spring Boot 3.0.1/Spring Cloud 2022.0.0. As part of this migration, I am now using io.micrometer:micrometer-tracing-bridge-otel. While my code works functionally, I have noticed that the traceId is no longer propagated across HTTP REST or Kinesis messaging service boundaries (i.e., my HTTP REST client/server and Kinesis messaging producer/consumer microservices log separate traceIds). Is there an additional dependency (or dependencies) I need to add to my projects to ensure that the traceId gets propagated across service boundaries?

FYI, with Spring Boot 2.7.7/Spring Cloud 2021.0.5, I was able to propagate the traceId across Kinesis producers and consumers with the following configuration (no separate configuration was needed to propagate the traceId across HTTP REST boundaries). Adding the b3 header was the key to getting traceId propagation working:

spring:
  cloud:
    stream:
      bindings:
        myEvent-out-0:
          content-type: application/*+avro
          destination: my-event
      kinesis:
        binder:
          auto-create-stream: true
          headers:
            - b3
          kpl-kcl-enabled: true

I noticed here it states that "by default we don't support joined spans (this means that when you have e.g. an HTTP span, you will no longer see the same span being there on the client and sender side, you will see two separate spans now)." So, when it states that by default it's not supported, does this mean that this is an optional configuration? This statement confuses me, and I'm wondering why the decision was to not join spans as the default auto-configuration when this is clearly what is needed for log correlation across a distributed architecture.

So, in summary, I am seeking guidance regarding how to configure Micrometer Tracing so that traceIds are always propagated across service boundaries, whether they are HTTP REST or message boundaries. What is the bare minimum configuration required to get this working?

UPDATE

Through extensive testing, I figured out that if you declare a RestTemplateBuilder, this causes traceId propagation to not work across HTTP REST boundaries. So, if you have a bean defined like the following and you have run across this post because your traceIds are not propagating correctly, REMOVE IT!!!

@Bean
public RestTemplateBuilder restTemplateBuilder() {
  return new RestTemplateBuilder();
}

UPDATE 2

I have added the following bean to my producer microservice in an attempt to add the traceparent header since it doesn't get added automatically yet with a StreamBridge configuration:

@Bean
@GlobalChannelInterceptor(patterns = {"*-out-0"})
public ChannelInterceptor customInterceptor(Tracer tracer) {
  return new ChannelInterceptor() {
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
      var payload = message.getPayload();
      var headers = message.getHeaders();
      return MessageBuilder.withPayload(payload)
          .setHeader("traceparent", tracer.currentSpan().context().traceId())
          .copyHeaders(headers)
          .build();
    }
  };
}

With this bean added, I see the message header arriving at the consumer microservice (I log the received headers out), but the trace id in my consumer microservice's logs does not match what is received via the traceparent header.
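
One possible reason for the mismatch, assuming the consumer's propagator expects the W3C Trace Context format: the interceptor above puts only the trace id into the header, whereas a version-00 traceparent value has four dash-separated fields (version, trace id, parent id, flags). The sketch below is plain Java with no Spring or Micrometer dependency; the class name TraceparentValidator and the method isValid are hypothetical names chosen for illustration, not part of any library.

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of Spring or Micrometer.
final class TraceparentValidator {

    // Version 00 layout: 00-<32 hex trace id>-<16 hex parent id>-<2 hex flags>
    private static final Pattern VERSION_00 =
            Pattern.compile("00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})");

    static boolean isValid(String value) {
        if (value == null) {
            return false;
        }
        var matcher = VERSION_00.matcher(value);
        if (!matcher.matches()) {
            return false;
        }
        // The W3C spec treats all-zero trace ids and parent ids as invalid.
        boolean zeroTraceId = matcher.group(1).equals("0".repeat(32));
        boolean zeroParentId = matcher.group(2).equals("0".repeat(16));
        return !zeroTraceId && !zeroParentId;
    }

    public static void main(String[] args) {
        // A bare trace id, which is what the interceptor above sends: prints false
        System.out.println(isValid("4bf92f3577b34da6a3ce929d0e0e4736"));
        // A complete traceparent value: prints true
        System.out.println(isValid("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"));
    }
}
```

A check like this only confirms the header's shape; it says nothing about whether the consumer actually reads the header.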

UPDATE 3

Is the following bean what I should implement on the consumer side for *-in-0 patterns?

@Bean
@GlobalChannelInterceptor(patterns = {"*-in-0"})
public ChannelInterceptor customInterceptor(Tracer tracer, ObservationPropagationChannelInterceptor observationPropagationChannelInterceptor) {
  return new ChannelInterceptor() {
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
      observationPropagationChannelInterceptor.preSend(message,channel);
      return message;
    }
  };
}

UPDATE 4

So, here's the latest status.

Producer

Note that I hardcoded the traceparent value for testing purposes. I was concerned that the code I had in place before only calculated the trace id and not the full traceparent value.

@Bean
@GlobalChannelInterceptor(patterns = {"*-out-0"})
public ChannelInterceptor customInterceptor(Tracer tracer) {
  return new ChannelInterceptor() {
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
      var payload = message.getPayload();
      var headers = message.getHeaders();
      return MessageBuilder.withPayload(payload)
          .setHeader(
              "traceparent",
              "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01") // tracer.currentSpan().context().traceId())
          .copyHeaders(headers)
          .build();
    }
  };
}

spring:
  cloud:
    stream:
      bindings:
        myEvent-out-0:
          content-type: application/*+avro
          destination: my-event
      function:
        autodetect: false
      kinesis:
        binder:
          auto-create-stream: true
          headers:
            - traceparent
          kpl-kcl-enabled: true
  integration:
    management:
      observation-patterns: myEvent-out-0

Consumer

@Bean
public ObservationPropagationChannelInterceptor observationPropagationChannelInterceptor(
    ObservationRegistry observationRegistry) {
  return new ObservationPropagationChannelInterceptor(observationRegistry);
}

spring:
  cloud:
    function:
      definition: myEvent
    stream:
      bindings:
        myEvent-in-0:
          consumer:
            # Note that these values are the defaults
            back-off-initial-interval: 1000
            back-off-max-interval: 10000
            back-off-multiplier: 2.0
            max-attempts: 3
          content-type: application/*+avro
          destination: my-event
          error-handler-definition: errorHandler
          group: my-event-group
        myEvent-out-0:
          content-type: application/*+avro
          destination: my-event-result
      kinesis:
        binder:
          auto-create-stream: true
          headers:
            - traceparent
          kpl-kcl-enabled: true
        bindings:
          myEvent-in-0: 
            consumer:
              checkpoint-mode: record
              listener-mode: record
  integration:
    management:
      observation-patterns: "*"

Log output in consumer

2023-02-01 | 10:59:51.477 | TaskExecutor-250 | DEBUG |com.example.MyEventStreamEventProcessorImpl | Trace: 4f909d6f9bfd860f6d8b6b54cb9245d8 | Span: e560d7749587860b | Received message header key 'traceparent' and value '00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01'

As you can see in the log output, the traceparent header is actually sent by the producer and is received by the consumer as a Kinesis header, but it is not used to populate the trace id value that you see in the log.

What else needs to be done in the consumer so that the sent traceparent value is used to populate the trace id value on the consumer? I believe I have implemented everything that has been recommended thus far.

UPDATE 5

@Bean
public Function<Message<MyEvent>, Message<MyEvent>> myEvent(MyEventProcessor myEventProcessor) {
  return myEventProcessor::processMyEvent;
}

UPDATE 6

The traceparent header does not get populated in the producer unless I have the following bean specified. Note that I have to construct the traceparent header value myself, as I couldn't find how to obtain it from an existing Spring class. Without this bean in place, the traceparent header was populated, but its trace id differed from the one I see in the consumer logs.

  @Bean
  @GlobalChannelInterceptor(patterns = {"*-out-0"})
  public ChannelInterceptor customInterceptor(Tracer tracer) {
    return new ChannelInterceptor() {
      @Override
      public Message<?> preSend(Message<?> message, MessageChannel channel) {
        // Read the current trace context once instead of on every access.
        var context = tracer.currentSpan().context();
        // W3C traceparent layout: version-traceId-spanId-flags ("01" = sampled).
        var flags = Boolean.TRUE.equals(context.sampled()) ? "01" : "00";
        return MessageBuilder.withPayload(message.getPayload())
            .setHeader(
                "traceparent",
                "00-" + context.traceId() + "-" + context.spanId() + "-" + flags)
            .copyHeaders(message.getHeaders())
            .build();
      }
    };
  }
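
The string assembly in that interceptor can be isolated and checked without Spring on the classpath. This is a minimal sketch, assuming the trace id and span id are already lowercase hex strings of 32 and 16 characters; TraceparentBuilder and build are hypothetical names, and the sample ids are taken from the log output quoted further down in this post.

```java
// Hypothetical helper for illustration; not part of Spring or Micrometer.
final class TraceparentBuilder {

    // Assembles a version-00 W3C traceparent value: 00-<traceId>-<spanId>-<flags>.
    static String build(String traceId, String spanId, Boolean sampled) {
        if (traceId == null || traceId.length() != 32) {
            throw new IllegalArgumentException("trace id must be 32 hex characters");
        }
        if (spanId == null || spanId.length() != 16) {
            throw new IllegalArgumentException("span id must be 16 hex characters");
        }
        // A null or false sampling decision maps to "00", true maps to "01".
        String flags = Boolean.TRUE.equals(sampled) ? "01" : "00";
        return String.join("-", "00", traceId, spanId, flags);
    }

    public static void main(String[] args) {
        // prints 00-63dacf25bcfd0eca4ec1769d243f5ab3-4ec1769d243f5ab3-00
        System.out.println(build("63dacf25bcfd0eca4ec1769d243f5ab3", "4ec1769d243f5ab3", null));
    }
}
```

Rejecting ids of the wrong length is a design choice of this sketch; a tracer that produces shorter ids would need padding instead.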

UPDATE 7

I replaced the previous bean with the following one, and I now get the stack trace below, so this approach doesn't seem to work unless I am missing something.

  @Bean
  public NewDestinationBindingCallback newDestinationBindingCallback(ObservationRegistry observationRegistry) {
    return (channelName, channel, producerProperties, extendedProducerProperties) -> {
      ((AbstractMessageChannel)channel).registerObservationRegistry(observationRegistry);
    };
  }

Caused by: java.lang.NullPointerException: null
    at java.base/java.util.Objects.requireNonNull(Objects.java:208)
    at io.micrometer.common.ImmutableKeyValue.<init>(ImmutableKeyValue.java:38)
    at io.micrometer.common.KeyValue.of(KeyValue.java:48)
    at io.micrometer.common.KeyValues.of(KeyValues.java:282)
    at org.springframework.integration.support.management.observation.DefaultMessageSenderObservationConvention.getLowCardinalityKeyValues(DefaultMessageSenderObservationConvention.java:42)
    at org.springframework.integration.support.management.observation.DefaultMessageSenderObservationConvention.getLowCardinalityKeyValues(DefaultMessageSenderObservationConvention.java:29)
    at io.micrometer.observation.SimpleObservation.start(SimpleObservation.java:134)
    at io.micrometer.observation.Observation.observe(Observation.java:557)
    at org.springframework.integration.channel.AbstractMessageChannel.sendWithObservation(AbstractMessageChannel.java:338)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:321)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:297)
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:183)
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:144)
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:140)

UPDATE 8

OK, the following bean definition works! I no longer have to specify the @GlobalChannelInterceptor on the producer side!

  @Bean
  public NewDestinationBindingCallback newDestinationBindingCallback(
      ObservationRegistry observationRegistry) {
    return (channelName, channel, producerProperties, extendedProducerProperties) -> {
      var abstractMessageChannel = ((AbstractMessageChannel) channel);
      abstractMessageChannel.setBeanName(channelName);
      abstractMessageChannel.registerObservationRegistry(observationRegistry);
    };
  }

Answer by Artem Bilan (best answer)

Not sure if this will help you, but I have learned that Spring Boot 3 uses W3C propagation by default: https://github.com/micrometer-metrics/tracing/wiki/Spring-Cloud-Sleuth-3.1-Migration-Guide.

Therefore, the header you need to embed into the Kinesis producer record is exactly traceparent.

See more info in Spring Boot docs: https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#actuator.micrometer-tracing

UPDATE

To enable observation on the MessageChannel associated with your Kinesis producer from the binder, you need to add this configuration property:

spring.integration.management.observation-patterns=myEvent-out-0

The respective tracing headers are then populated into the message, which is handled by the KinesisMessageHandler and, therefore, written into the Kinesis record body.

Here is a sample to demonstrate how to propagate tracing via Kinesis Binder: https://github.com/artembilan/sandbox/tree/master/kinesis-binder-observation-demo

UPDATE 2

To propagate tracing via observation on the consumer side in Spring Cloud Stream, we have to add this bean:

    @Bean
    ConsumerEndpointCustomizer<MessageProducerSupport> consumerEndpointCustomizer(
            ObservationRegistry observationRegistry) {

        return (endpoint, destinationName, group) -> endpoint.registerObservationRegistry(observationRegistry);
    }

The point is that Spring Cloud Stream does not register endpoints as beans, so we have to instrument them manually. Not sure yet how this can be done from the framework side automatically...

UPDATE 3

To propagate a trace on the consumer side, we have to make a fix in the Kinesis Binder. Here is a workaround which demonstrates that the trace is propagated between producer and consumer:

@Bean
public Consumer<Message<String>> kinesisConsumer(QueueChannel testBuffer, ObservationRegistry observationRegistry) {
    return message ->
            IntegrationObservation.HANDLER.observation(
                            null,
                            DefaultMessageReceiverObservationConvention.INSTANCE,
                            () -> new MessageReceiverContext(message, "traceHandler"),
                            observationRegistry)
                    .observe(() -> testBuffer.send(message));
}

@Bean
public QueueChannel testBuffer() {
    return new QueueChannel();
}

And I see this in logs:

2023-02-01T15:44:21.766-05:00 DEBUG [,63dacf25bcfd0eca4ec1769d243f5ab3,4ec1769d243f5ab3] 29052 --- [oundedElastic-1] o.s.i.a.outbound.KinesisMessageHandler   : org.springframework.integration.aws.outbound.KinesisMessageHandler@2caeea83 received message: GenericMessage [payload=byte[112], headers={http_requestMethod=GET, http_requestUrl=/test?name=foo, id=a571a72e-d13a-e6cf-5779-a02f863748a5, contentType=application/json, traceparent=00-63dacf25bcfd0eca4ec1769d243f5ab3-4ec1769d243f5ab3-00, timestamp=1675284261766}]
2023-02-01T15:44:24.558-05:00 DEBUG [,63dacf25bcfd0eca4ec1769d243f5ab3,b09fdf7c0e74342a] 29052 --- [esis-consumer-1] o.s.integration.channel.QueueChannel     : preSend on channel 'bean 'testBuffer'; defined in: 'com.example.kinesisbinderobservationdemo.KinesisBinderObservationDemoApplication'; from source: 'public org.springframework.integration.channel.QueueChannel com.example.kinesisbinderobservationdemo.KinesisBinderObservationDemoApplication.testBuffer()'', message: MutableMessage [payload=foo, headers={aws_shard=shardId-000000000000, traceparent=00-63dacf25bcfd0eca4ec1769d243f5ab3-b09fdf7c0e74342a-00, id=4dbe462e-1cd8-f52b-19d2-0f22f0444fcd, sourceData={SequenceNumber: 49637618706029534393522719555344717515294078114444869634,ApproximateArrivalTimestamp: Wed Feb 01 15:44:21 EST 2023,Data: java.nio.HeapByteBuffer[pos=0 lim=112 cap=112],PartitionKey: 1711100227,EncryptionType: NONE}, contentType=application/json, aws_receivedPartitionKey=1711100227, aws_receivedStream=my-event, aws_receivedSequenceNumber=49637618706029534393522719555344717515294078114444869634, timestamp=1675284264554}]
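
To read those two log lines: the traceparent values share the trace id but carry different span ids, which is what propagation without joined spans looks like. The following plain-Java sketch makes that comparison explicit; TraceparentComparison, Traceparent, parse and sameTrace are hypothetical names for illustration, and the parsing is a simple split on dashes with no further validation.

```java
// Hypothetical helper for illustration; not part of Spring or Micrometer.
final class TraceparentComparison {

    // The four dash-separated fields of a W3C traceparent value.
    record Traceparent(String version, String traceId, String parentId, String flags) {

        static Traceparent parse(String value) {
            String[] parts = value.split("-");
            if (parts.length != 4) {
                throw new IllegalArgumentException("not a traceparent value: " + value);
            }
            return new Traceparent(parts[0], parts[1], parts[2], parts[3]);
        }
    }

    // Two headers belong to the same trace when their trace id fields match.
    static boolean sameTrace(String first, String second) {
        return Traceparent.parse(first).traceId().equals(Traceparent.parse(second).traceId());
    }

    public static void main(String[] args) {
        // Header values copied from the producer and consumer log lines above.
        String producer = "00-63dacf25bcfd0eca4ec1769d243f5ab3-4ec1769d243f5ab3-00";
        String consumer = "00-63dacf25bcfd0eca4ec1769d243f5ab3-b09fdf7c0e74342a-00";

        // Same trace id on both sides: prints true
        System.out.println(sameTrace(producer, consumer));
        // Each side has its own span id: prints false
        System.out.println(Traceparent.parse(producer).parentId()
                .equals(Traceparent.parse(consumer).parentId()));
    }
}
```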

UPDATE 4

To wrap a function call in an Observation, you have to do something like this:

@Bean
public Function<Message<MyEvent>, Message<MyEvent>> myEvent(MyEventProcessor myEventProcessor,
        ObservationRegistry observationRegistry) {

    return message ->
            IntegrationObservation.HANDLER.observation(
                            null,
                            DefaultMessageReceiverObservationConvention.INSTANCE,
                            () -> new MessageReceiverContext(message, "tracedFunction"),
                            observationRegistry)
                    .observe(() -> myEventProcessor.processMyEvent(message));
}