Spring WebFlux: Reactive MongoDB


I'm new to Reactor, and I want to refactor this simple Spring Data method (in Kotlin) into reactive style:

fun save(user: User): Mono<User> {
    if (findByEmail(user.email).block() != null) {
        throw UserAlreadyExistsException()
    }

    user.password = passwordEncoder.encode(user.password)
    return userRepository.save(user)
}

Thanks

There are 3 answers

Simon Baslé

(forgive me if the Kotlin syntax is wrong and if I'm doing things in Java style :o)

fun save(user: User): Mono<User> {
    //we'll prepare several helpful Monos and finally combine them.
    //as long as we don't subscribe to them, nothing happens.

    //first we want to short-circuit if the user is found (by email).
    //the mono below will onError in that case, or be empty
    val failExistingUser: Mono<User> = findByEmail(user.email)
        .flatMap { Mono.error<User>(UserAlreadyExistsException()) }

    //later we'll need to encode the password. This is likely to be
    //a blocking call that takes some time, so we isolate that call
    //in a Mono that executes on the bounded elastic Scheduler
    //(Schedulers.elastic() in older Reactor versions). Note this
    //does not execute immediately, since it's not subscribed to yet...
    val encodedPassword: Mono<String> = Mono
        .fromCallable { passwordEncoder.encode(user.password) }
        .subscribeOn(Schedulers.boundedElastic())

    //lastly the save part. We want to combine the original User with
    //the result of the encoded password (zipWith replaces the
    //two-argument and(...) of early Reactor 3.0 releases).
    val saveUser: Mono<User> = user.toMono() //this is a Kotlin extension
        .zipWith(encodedPassword) { u, p ->
            u.password = p
            u
        }
        //Once this is done and the user has been updated, save it
        .flatMap { updatedUser -> userRepository.save(updatedUser) }

    //saveUser above is now a Mono that represents the completion of
    //password encoding, user update and DB save.

    //what we return is a combination of our first and last Monos.
    //when something subscribes to this combination:
    // - if the user is found, the combination errors
    // - otherwise, it subscribes to saveUser, which triggers the rest of the process
    return failExistingUser.switchIfEmpty(saveUser)
}

A shortened version without intermediate variables or comments:

fun save(user: User): Mono<User> {
    return findByEmail(user.email)
        .flatMap { Mono.error<User>(UserAlreadyExistsException()) }
        .switchIfEmpty(user.toMono())
        .zipWith(Mono.fromCallable { passwordEncoder.encode(user.password) }
                     .subscribeOn(Schedulers.boundedElastic())) { u, p ->
            u.password = p
            u
        }
        .flatMap { updatedUser -> userRepository.save(updatedUser) }
}
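
If you want to try the pattern without a database, here is a minimal sketch. Everything in it is a stand-in I made up for the demo: `InMemoryUserService`, the map used as storage, and the fake `encode` function are not the real Spring Data repository or `PasswordEncoder`. It only assumes Reactor is on the classpath.

```kotlin
import reactor.core.publisher.Mono
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-ins for the question's types (illustration only).
data class User(val email: String, var password: String)
class UserAlreadyExistsException : RuntimeException("user already exists")

class InMemoryUserService {
    // Assumed storage: a map instead of a reactive MongoDB repository.
    private val users = ConcurrentHashMap<String, User>()

    // Emits the stored user, or completes empty when none is found.
    fun findByEmail(email: String): Mono<User> =
        users[email]?.let { Mono.just(it) } ?: Mono.empty()

    // Placeholder "encoder"; a real encoder would hash the password.
    private fun encode(raw: String): String = "encoded:$raw"

    fun save(user: User): Mono<User> =
        findByEmail(user.email)
            // A found user turns into an error signal.
            .flatMap { Mono.error<User>(UserAlreadyExistsException()) }
            // Only subscribed to (and only executed) when no user was found.
            .switchIfEmpty(Mono.fromCallable {
                user.password = encode(user.password)
                users[user.email] = user
                user
            })
}

fun main() {
    val service = InMemoryUserService()
    // block() is acceptable in a plain main(), not inside a WebFlux handler.
    println(service.save(User("a@example.com", "pw")).block())
    val second = runCatching { service.save(User("a@example.com", "other")).block() }
    println(second.exceptionOrNull() is UserAlreadyExistsException)
}
```

The first save goes through and the second one fails, because the lookup now finds the user.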

x80486

Something like this should work:

  open fun save(req: ServerRequest): Mono<ServerResponse> {
    logger.info { "${req.method()} ${req.path()}" }
    return req.bodyToMono<User>().flatMap { user ->
      // You might need to "work out" this check since I don't know what you are doing.
      // It avoids block(), which throws on non-blocking threads such as Netty's event loop.
      findByEmail(user.email)
        .flatMap { Mono.error<User>(UserAlreadyExistsException()) }
        .switchIfEmpty(Mono.defer {
          user.password = passwordEncoder.encode(user.password)
          repository.save(user)
        })
        .flatMap { saved ->
          logger.debug { "Entity saved successfully! Result: $saved" }
          ServerResponse.created(URI.create("${req.path()}/${saved.id}")).build()
        }
    }
  }

Note that I'm using MicroUtils/kotlin-logging. Delete the log statements if you don't know the library or just don't want them.

Basically, you first need to "consume" (that is, subscribe to) what's coming in the ServerRequest in order to access its content.
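
To make the "subscribe first" point concrete: a Mono does no work until something subscribes to it. The following is a minimal sketch, assuming only Reactor on the classpath. The function name and the counter are made up for the demo, and `block()` is used only because this runs in a plain `main`, outside WebFlux.

```kotlin
import reactor.core.publisher.Mono
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: returns how often the callable ran
// before and after the Mono was subscribed to.
fun callsBeforeAndAfterSubscribe(): Pair<Int, Int> {
    val calls = AtomicInteger(0)
    // Declaring the Mono does not run the callable.
    val mono = Mono.fromCallable { calls.incrementAndGet() }
    val before = calls.get()
    // block() subscribes and waits for the result.
    mono.block()
    return before to calls.get()
}

fun main() {
    println(callsBeforeAndAfterSubscribe()) // (0, 1)
}
```

In a WebFlux handler you return the Mono and let the framework subscribe, instead of calling `block()` yourself.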

Alternatively, instead of throwing an exception, you can also have an actual flow that handles that scenario; something like:

open fun ...
  return ServerResponse.ok()
      // Keep doing stuff here...if something is wrong
      .switchIfEmpty(ServerResponse.notFound().build())
}

You can adjust the example to your User type in case you really want to pass it instead of a ServerRequest.

PaleNisko

You can use the hasElement() function on Mono. Take a look at these extension functions for Mono:

inline fun <T> Mono<T>.errorIfEmpty(crossinline onError: () -> Throwable): Mono<T> {
    // Note: the source is subscribed to twice, once by hasElement() and once when `this` is returned.
    return this.hasElement()
            .flatMap { if (it) this else Mono.error(onError()) }
}

inline fun <T> Mono<T>.errorIfNotEmpty(crossinline onError: (T) -> Throwable): Mono<T> {
    // flatMap only runs when an element is present, so no block() is needed;
    // an empty source stays empty.
    return this.flatMap { Mono.error<T>(onError(it)) }
}

The problem with switchIfEmpty is that the expression passed as its argument is always evaluated, whether or not the source is empty. Code like this will always create a Foo object:

mono.switchIfEmpty(Foo())

You can write your own extension that evaluates the expression passed as its argument lazily:

inline fun <T> Mono<T>.switchIfEmpty(crossinline default: () -> Mono<T>): Mono<T> {
    return this.hasElement()
            .flatMap { if (it) this else default() }
}
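
As a comparison, Reactor's own `Mono.defer` can also postpone building the fallback until it is actually subscribed to. Here is a small sketch that counts how often a fallback factory runs when the source is not empty. The function name and the counter are made up for the demo, and it assumes only Reactor on the classpath.

```kotlin
import reactor.core.publisher.Mono
import java.util.concurrent.atomic.AtomicInteger

// Returns how many times the fallback factory ran for a NON-empty source.
fun fallbackCreations(lazy: Boolean): Int {
    val created = AtomicInteger(0)
    // Hypothetical fallback factory that counts every invocation.
    val fallback: () -> Mono<String> = {
        created.incrementAndGet()
        Mono.just("fallback")
    }
    val source = Mono.just("value")
    val result = if (lazy) {
        // The factory runs only if switchIfEmpty subscribes to the deferred Mono.
        source.switchIfEmpty(Mono.defer { fallback() })
    } else {
        // The factory runs right here, while the pipeline is being assembled.
        source.switchIfEmpty(fallback())
    }
    result.block()
    return created.get()
}

fun main() {
    println(fallbackCreations(lazy = false)) // 1
    println(fallbackCreations(lazy = true))  // 0
}
```

Unlike the hasElement() approach, this subscribes to the source only once.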

Here are two more extension functions. You can use them, for example, to check whether a password is correct:

inline fun <T> Mono<T>.errorIf(crossinline predicate: (T) -> Boolean, crossinline throwable: (T) -> Throwable): Mono<T> {
    return this.flatMap { if (predicate(it)) Mono.error(throwable(it)) else Mono.just(it) }
}

inline fun <T> Mono<T>.errorIfNot(crossinline predicate: (T) -> Boolean, crossinline throwable: (T) -> Throwable): Mono<T> {
    return this.errorIf(predicate = { !predicate(it) }, throwable = throwable)
}
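
A usage sketch for the password check, under stated assumptions: `Account`, `WrongPasswordException` and `login` are hypothetical names, and the plain string comparison only stands in for whatever check you really use (with Spring Security that would probably go through your PasswordEncoder). `errorIf` is restated with explicit type arguments so the snippet compiles on its own.

```kotlin
import reactor.core.publisher.Mono

// Restated from above so this sketch is self-contained.
inline fun <T : Any> Mono<T>.errorIf(
    crossinline predicate: (T) -> Boolean,
    crossinline throwable: (T) -> Throwable
): Mono<T> =
    this.flatMap { if (predicate(it)) Mono.error<T>(throwable(it)) else Mono.just(it) }

// Hypothetical types for illustration only.
data class Account(val email: String, val passwordHash: String)
class WrongPasswordException(email: String) : RuntimeException("Wrong password for $email")

// Passes the account through when the hash matches, errors otherwise.
fun login(found: Mono<Account>, presentedHash: String): Mono<Account> =
    found.errorIf({ it.passwordHash != presentedHash }) { WrongPasswordException(it.email) }

fun main() {
    val account = Account("a@example.com", "hash-1")
    // block() is only for the demo; in WebFlux you would return the Mono.
    println(login(Mono.just(account), "hash-1").block()?.email)
    val failure = runCatching { login(Mono.just(account), "nope").block() }
    println(failure.exceptionOrNull() is WrongPasswordException)
}
```

An empty source stays empty here, so combine it with errorIfEmpty if "user not found" should also be an error.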