After reading the documentation, I feel like all I needed to do was add the software.amazon.awssdk:sts dependency and Spring would autoconfigure the StsWebIdentityTokenFileCredentialsProvider, but I always get the following error when writing a message to Kinesis via KPL:

2023-08-07 | 10:34:20.256 | kpl-daemon-0003  | WARN  | c.a.s.k.producer.LogInputStreamReader    | Trace:                                  | Span:                  | [2023-08-07 14:34:20.256507] [0x00000071][0x00007f379cdfa6c0] [warning] [AWS Log: ERROR](AWSXmlClient)HTTP response code: 403
Resolved remote host IP address: xx.xxx.xxx.xxx
Request ID: xxxxxxxxxxxxxxx
Exception name: InvalidClientTokenId
Error message: The security token included in the request is invalid.
4 response headers:
content-length : 306
content-type : text/xml
date : Mon, 07 Aug 2023 14:34:19 GMT
x-amzn-requestid : xxxxxxxxxxxxxxxxxxxx
2023-08-07 | 10:34:20.256 | kpl-daemon-0003  | ERROR | c.a.s.k.producer.LogInputStreamReader    | Trace:                                  | Span:                  | [2023-08-07 14:34:20.256566] [0x00000071][0x00007f379cdfa6c0] [error] [pipeline.h:228] Failed to get StreamARN using STS GetCallerIdentity | Code: InvalidClientTokenId | Message: The security token included in the request is invalid. | Request was: Action=GetCallerIdentity&Version=2011-06-15
2023-08-07 | 10:34:20.256 | kpl-daemon-0003  | WARN  | c.a.s.k.producer.LogInputStreamReader    | Trace:                                  | Span:                  | terminate called after throwing an instance of 'boost::wrapexcept<boost::exception_detail::error_info_injector<boost::log::v2s_mt_posix::system_error> >'
2023-08-07 | 10:34:20.257 | kpl-daemon-0003  | WARN  | c.a.s.k.producer.LogInputStreamReader    | Trace:                                  | Span:                  |   what():  Failed to set TLS value: Invalid argument
2023-08-07 | 10:34:21.276 | kpl-daemon-0005  | ERROR | c.a.s.kinesis.producer.KinesisProducer   | Trace:                                  | Span:                  | Error in child process
java.lang.RuntimeException: EOF reached during read
        at com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:532)
        at com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:508)
        at com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:504)
        at com.amazonaws.services.kinesis.producer.Daemon.readSome(Daemon.java:553)
        at com.amazonaws.services.kinesis.producer.Daemon.receiveMessage(Daemon.java:243)
        at com.amazonaws.services.kinesis.producer.Daemon.access$500(Daemon.java:61)
        at com.amazonaws.services.kinesis.producer.Daemon$3.run(Daemon.java:298)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
2023-08-07 | 10:34:21.285 | -pool-1-thread-0 | ERROR | o.s.integration.handler.LoggingHandler   | Trace:                                  | Span:                  | org.springframework.integration.aws.support.AwsRequestFailureException, failedMessage=GenericMessage [payload=byte[27], headers={...omitted for security purposes...
        at org.springframework.integration.aws.outbound.AbstractAwsMessageHandler.handleResponse(AbstractAwsMessageHandler.java:148)
        at org.springframework.integration.aws.outbound.AbstractAwsMessageHandler.lambda$handleMessageInternal$0(AbstractAwsMessageHandler.java:117)
        at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
        at org.springframework.integration.aws.outbound.KplMessageHandler$2.onFailure(KplMessageHandler.java:488)
        at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1124)
        at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
        at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270)
        at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038)
        at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:808)
        at com.google.common.util.concurrent.SettableFuture.setException(SettableFuture.java:55)
        at com.amazonaws.services.kinesis.producer.KinesisProducer$MessageHandler$2.run(KinesisProducer.java:200)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: EOF reached during read
        at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source)
        ... 13 more
Caused by: java.lang.RuntimeException: EOF reached during read
        at com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:532)
        at com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:508)
        at com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:504)
        at com.amazonaws.services.kinesis.producer.Daemon.readSome(Daemon.java:553)
        at com.amazonaws.services.kinesis.producer.Daemon.receiveMessage(Daemon.java:243)
        at com.amazonaws.services.kinesis.producer.Daemon.access$500(Daemon.java:61)
        at com.amazonaws.services.kinesis.producer.Daemon$3.run(Daemon.java:298)
        ... 3 more


So, it appears that this bean isn't created via autoconfiguration and that a different credential provider is. What else needs to be done to ensure the StsWebIdentityTokenFileCredentialsProvider is autoconfigured with the correct properties?

We have service accounts set up in EKS, and I've been able to kubectl exec into the container to ensure that the AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE environment variables are mapped correctly.

1

There are 1 answers

0
Keith Bennett On BEST ANSWER

After many trials and errors, here is the solution that finally works:

import com.amazonaws.auth.WebIdentityTokenCredentialsProvider;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.stream.binder.kinesis.config.KinesisBinderConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import software.amazon.awssdk.regions.providers.AwsRegionProvider;

@AutoConfiguration
@AutoConfigureAfter(CredentialsProviderAutoConfiguration.class)
@AutoConfigureBefore(KinesisBinderConfiguration.class)
@Slf4j
public class KinesisProducerAutoConfiguration {

  @Value("${spring.cloud.aws.credentials.sts.role-arn}")
  private String roleArn;

  @Value("${spring.cloud.aws.credentials.sts.web-identity-token-file}")
  private String webIdentityTokenFile;

  @Bean
  @Primary
  @ConditionalOnMissingBean
  @ConditionalOnProperty(name = "spring.cloud.stream.kinesis.binder.kpl-kcl-enabled")
  public KinesisProducerConfiguration kinesisProducerConfiguration(
      AwsRegionProvider awsRegionProvider) {
    log.debug("Creating KinesisProducerConfiguration...");
    var kinesisProducerConfiguration = new KinesisProducerConfiguration();
    var webIdentityTokenCredentialsProvider =
        WebIdentityTokenCredentialsProvider.builder()
            .roleArn(roleArn)
            .roleSessionName(UUID.randomUUID().toString())
            .webIdentityTokenFile(webIdentityTokenFile)
            .build();
    kinesisProducerConfiguration.setCredentialsProvider(webIdentityTokenCredentialsProvider);
    kinesisProducerConfiguration.setRegion(awsRegionProvider.getRegion().id());
    return kinesisProducerConfiguration;
  }
}

And in my src/main/resources/bootstrap.yml file I have the following defined:

spring:
  cloud:
    aws:
      credentials:
        sts:
          role-arn: ${AWS_ROLE_ARN}
          web-identity-token-file: ${AWS_WEB_IDENTITY_TOKEN_FILE}

I learned that the default Spring Cloud Stream Binder for Kinesis implementation does not work that can be found here. This is because version 0.15.7 of the KPL library requires the use of STS (read more about this requirement here). We are deploying our containers to EKS, and previously we set up service accounts that provide the environment variable values mapped into the yml properties in bootstrap.yml you can see posted in the solution. One of the odd things I noticed is that on this line, you can see that the secret access key and access key id arguments are passed in the wrong order when constructing the BasicAwsCredentials object, but even then the solution as it's currently coded in the Kinesis binder won't work. I know this because I tried a similar adapter-style solution with the BasicAwsCredentials arguments passed in the correct order within the autoconfiguration class I posted in the solution.