Spring WebFlux Security Context is Empty After Setting with ReactiveSecurityContextHolder

32 views Asked by At

I'm working with Spring Boot and Spring Security for a reactive application, specifically dealing with authentication within a Kafka consumer. Despite setting the security context using ReactiveSecurityContextHolder.withSecurityContext(Mono.just(securityContext)), immediately accessing the context within the same reactive chain results in an empty context. I'm following the recommended practices for reactive applications. Below is the snippet where the context is set and accessed:

public <T> Mono<Void> authenticateAndVerifyContext(ReceiverRecord<String, T> record) {

        return extractCredentialsFromBasicAuth(record)
                .flatMap(this::authenticate)
                .flatMap(authentication -> {
                    SecurityContext securityContext = new SecurityContextImpl(authentication);
                    log.info("securityContext: {}", securityContext);
                    return Mono.deferContextual(view -> Mono.empty())
                               .contextWrite(ReactiveSecurityContextHolder.withSecurityContext(Mono.just(securityContext)))
                               .then(ReactiveSecurityContextHolder.getContext()
                                                                  .flatMap(context -> {
                                                                      if (context.getAuthentication() != null && context.getAuthentication().isAuthenticated()) {
                                                                          log.info("Security context successfully updated. User: {}", context.getAuthentication().getName());
                                                                      } else {
                                                                          log.error("Authentication failed");
                                                                      }
                                                                      return Mono.empty();
                                                                  })
                                                                  .switchIfEmpty(Mono.defer(() -> {
                                                                      log.error("Security context is empty");
                                                                      return Mono.empty();
                                                                  })));
                }).then();
    }

    private <T> Mono<UsernamePasswordAuthenticationToken> extractCredentialsFromBasicAuth(
            final ReceiverRecord<String, T> record) {

        log.info("kafka authorization: {}",
                 new String(record.headers().lastHeader(AUTHORIZATION_KEY).value(), StandardCharsets.UTF_8));

        return Mono.justOrEmpty(record.headers().lastHeader(AUTHORIZATION_KEY))
                   .map(header -> new String(header.value(), StandardCharsets.UTF_8))
                   .map(this::decodeBasicAuthHeader)
                   .flatMap(this::createAuthenticationToken);
    }

    private String decodeBasicAuthHeader(final String headerValue) {

        return new String(Base64.getDecoder().decode(headerValue.substring(6)), StandardCharsets.UTF_8);
    }


    private Mono<UsernamePasswordAuthenticationToken> createAuthenticationToken(final String credentials) {

        final String[] parts = credentials.split(":", 2);
        if (parts.length == 2) {
            return Mono.just(new UsernamePasswordAuthenticationToken(parts[0], parts[1]));
        } else {
            return Mono.error(new IllegalArgumentException("Invalid basic authentication token"));
        }
    }

This behavior occurs despite seemingly correct usage of .contextWrite and ReactiveSecurityContextHolder.getContext(). I'm using Spring Boot 3.1.8 and Spring Security 6.1.6. Has anyone experienced similar issues or can offer insight into why the security context appears empty?

I tried setting the security context within a reactive flow using ReactiveSecurityContextHolder.withSecurityContext(Mono.just(securityContext)) and expected to retrieve this context later in the same flow. However, upon attempting to access the context immediately after setting it, using ReactiveSecurityContextHolder.getContext(), the context appears to be empty. This unexpected behavior occurs in a reactive Kafka consumer setup, despite following the recommended practices for Spring WebFlux and Spring Security reactive applications.

1

There are 1 answers

0
Alejo On

I finally understood my mistake; reactive programming still feels a bit counter-intuitive to me. The error was in the order in which I was writing the security context. To make it work correctly and for my entire flow to take the new security context, I had to place it at the end of the chain, which I understand is executed first.

I discovered this by following this documentation, where they show that in a 'handleRequest' method they place 'contextWrite' at the end: https://spring.io/blog/2023/03/28/context-propagation-with-project-reactor-1-the-basics

This is what my corrected code looks like (It's worth noting that this is a dummy and concise code to test authentication manually):

private final ReactiveAuthenticationManager authenticationManager;

public <T> Mono<Void> authenticateAndVerifyContext(ReceiverRecord<String, T> record) {

    final Mono<SecurityContext> securityContext = initializeSecurityContext(record);

    return Mono.just(record)
               .then(ReactiveSecurityContextHolder
                           .getContext()
                           .flatMap(context -> {
                               if (context.getAuthentication() != null && context.getAuthentication().isAuthenticated()) {
                                   log.info("Username: {}", context.getAuthentication().getName());
                               } else {
                                   log.error("Authentication failed!");
                               }
                               return Mono.empty();
                           })
                           .switchIfEmpty(Mono.defer(() -> {
                               log.error("Security context is empty!");
                               return Mono.empty();
                           })))
               .contextWrite(ReactiveSecurityContextHolder.withSecurityContext(securityContext))
               .then();
}
  
  public <T> Mono<SecurityContext> initializeSecurityContext(final ReceiverRecord<String, T> record) {

        return authenticate(record).map(SecurityContextImpl::new);
    }
  
  private <T> Mono<Authentication> authenticate(final ReceiverRecord<String, T> record) {

        return extractCredentialsFromBasicAuth(record)
                .flatMap(this::authenticate)
                .flatMap(this::checkRole);
    }
  
  private Mono<Authentication> authenticate(final UsernamePasswordAuthenticationToken authToken) {

        return authenticationManager.authenticate(authToken)
                                    .onErrorMap(e -> new BadCredentialsException(
                                            "Authentication failed for user " + authToken.getName()));
    }
  
  private <T> Mono<UsernamePasswordAuthenticationToken> extractCredentialsFromBasicAuth(
            final ReceiverRecord<String, T> record) {

        return Mono.justOrEmpty(record.headers().lastHeader(AUTHORIZATION_KEY))
                   .map(header -> new String(header.value(), StandardCharsets.UTF_8))
                   .map(this::decodeBasicAuthHeader)
                   .flatMap(this::createAuthenticationToken);
    }
  
  private String decodeBasicAuthHeader(final String headerValue) {

        return new String(Base64.getDecoder().decode(headerValue.substring(6)), StandardCharsets.UTF_8);
    }
  
  private Mono<UsernamePasswordAuthenticationToken> createAuthenticationToken(final String credentials) {

        final String[] parts = credentials.split(":", 2);
        if (parts.length == 2) {
            return Mono.just(new UsernamePasswordAuthenticationToken(parts[0], parts[1]));
        } else {
            return Mono.error(new BadCredentialsException("Invalid basic authentication token"));
        }
    }
  
  private Mono<Authentication> checkRole(final Authentication authentication) {

        return Flux.fromIterable(authentication.getAuthorities())
                   .map(GrantedAuthority::getAuthority)
                   .any(role -> REQUIRED_ROLES.stream().anyMatch(requiredRole -> requiredRole.name().equals(role)))
                   .flatMap(hasRequiredRole -> hasRequiredRole ? Mono.just(authentication) : Mono.empty())
                   .switchIfEmpty(Mono.error(new AccessDeniedException("Not authorized")));
    }