When fetching 4000 or more values from a database (PostgreSQL), filtering the elements hangs (using Reactive Streams)


The flow of values from the database needs to be filtered, and the filtering must be done in a reactive, non-blocking style. For each incoming element, I take its email field and validate it. Processing should stop after the first valid result.

However, several thousand elements may have to be processed before the first suitable value appears.

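import org.springframework.data.annotation.Id;

public class User {

    @Id
    Long id;

    String email;

    String phoneHome;

    // getters and setters (getId(), getEmail(), ...) omitted
}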

There is a search by phone number:

private Mono<Response> findUserByHomePhone(Response response) {

    return Mono.just(response)
            .flatMap(this::retrieveUserList);
}

private Mono<Response> retrieveUserList(Response response) {

    String phone = retrievePhoneFromResponse(response);

    return Mono.from(userService.getByPhone(phone)
                    .groupBy(User::getId)
                    .flatMap(this::processGroupedObjects)
                    .switchOnFirst((signal, flux) -> getFirsFoundElement(signal, response)))
            .as(Log.of(log,
                    "Search by phone {}",
                    phone(response))::info);
}
  • userService.getByPhone(phone): a method defined in a ReactiveCrudRepository is called through the service class (which is annotated with @Transactional). This method sends a query of this form to the database:
select * from users where phone = $1
  • .groupBy(User::getId): we iterate through the flow of elements, where each element is a User, and group them by id.

  • .flatMap(this::processGroupedObjects)

    • processes the grouped objects. This is where the freeze seems to happen. In debug mode I could not step through all the elements, because the first few thousand elements have a null email.
  • .switchOnFirst((signal, flux) -> getFirsFoundElement(signal, response)): as soon as a user with a non-empty email field arrives, we return the result.

private Mono<Response> getFirsFoundElement(Signal<? extends User> signal, Response response) {

    boolean isFoundElement = signal.hasValue();

    if (isFoundElement) {
        return Mono.just(response);
    }

    return Mono.error(new Exception());
}


private Flux<User> processGroupedObjects(GroupedFlux<Long, User> group) {

    return group
            .mapNotNull(this::checkEmailOnNull);
}

private User checkEmailOnNull(User user) {

    String email = user.getEmail();

    if (Strings.isEmpty(email)) {
        return null;
    }

    return user;
}
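For reference, `mapNotNull` (available since Reactor 3.4) drops every element for which the mapper returns `null`, so on a plain, ungrouped `Flux` the null check behaves like a `filter`. The sketch below uses a hypothetical, simplified `User` record instead of the real entity and checks that both forms keep the same users. It suggests the null check itself is an unlikely cause of the freeze, and that the per-id grouping around it is the more likely suspect.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class MapNotNullVsFilter {

    // Hypothetical, simplified stand-in for the question's User entity.
    record User(long id, String email) {}

    // Same logic as checkEmailOnNull: returning null means "drop this user".
    static User checkEmailOnNull(User user) {
        String email = user.email();
        return (email == null || email.isEmpty()) ? null : user;
    }

    // mapNotNull skips elements whose mapped value is null.
    static List<User> viaMapNotNull(Flux<User> users) {
        return users.mapNotNull(MapNotNullVsFilter::checkEmailOnNull).collectList().block();
    }

    // filter keeps elements for which the predicate is true.
    static List<User> viaFilter(Flux<User> users) {
        return users.filter(user -> checkEmailOnNull(user) != null).collectList().block();
    }

    public static void main(String[] args) {
        Flux<User> users = Flux.just(new User(1, null), new User(2, ""), new User(3, "c@example.com"));
        // block() is only used to get plain lists for this demo.
        System.out.println(viaMapNotNull(users));
        System.out.println(viaFilter(users));
    }
}
```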

However, at approximately this point (processGroupedObjects(GroupedFlux<Long, User> group)) the processing freezes, and I do not see any errors in the console.

The number of source elements in the database ranges from 4,000 to 25,000 (the algorithm worked with 3,000).

I found this note about GroupedFlux:

Grouping is best suited for when you have a medium to low number of groups. The groups must also imperatively be consumed (such as by a flatMap) so that groupBy continues fetching data from upstream and feeding more groups. Sometimes, these two constraints multiply and lead to hangs, such as when you have a high cardinality and the concurrency of the flatMap consuming the groups is too low.

However, it is not clear to me what the documentation means. I think it describes my case, but I am not sure.

Does anyone have an idea why the freeze occurs and how it could be fixed?


There are 2 answers

Igor Artamonov (best answer)

As you found out, grouping doesn't work for a flow with many groups, so I suggest avoiding groups altogether.
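To make the quoted documentation concrete: `groupBy(User::getId)` creates one group per distinct id, so a few thousand rows mean a few thousand groups (high cardinality). A group only completes when the source completes, while `flatMap` subscribes to a bounded number of inner publishers at once (its `concurrency`, 256 by default), which is likely the hang-prone combination the note describes. Grouping is safe when there are few keys and every group is consumed. The sketch below, using a hypothetical simplified `User` record and made-up data, groups by whether an email is present (only two keys), so `flatMap` can drain every group to completion.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import reactor.core.publisher.Flux;

public class LowCardinalityGrouping {

    // Hypothetical, simplified stand-in for the question's User entity.
    record User(long id, String email) {}

    // Made-up data: every 1000th user has an email, the rest have null.
    static Flux<User> sampleUsers(int total) {
        List<User> users = LongStream.rangeClosed(1, total)
                .mapToObj(id -> new User(id, id % 1000 == 0 ? "user" + id + "@example.com" : null))
                .collect(Collectors.toList());
        return Flux.fromIterable(users);
    }

    // Only two keys (true / false), and flatMap subscribes to both groups and
    // drains each one with count(), so groupBy keeps requesting data upstream.
    static List<String> countByEmailPresence(Flux<User> users) {
        return users
                .groupBy(user -> user.email() != null)
                .flatMap(group -> group.count().map(count -> group.key() + "=" + count))
                .sort() // natural String order: "false=..." before "true=..."
                .collectList()
                .block(); // blocking only to return a plain List in this demo
    }

    public static void main(String[] args) {
        System.out.println(countByEmailPresence(sampleUsers(25_000)));
    }
}
```

With a key like `id`, the same pipeline would need one drained group per row, which is exactly the situation the documentation warns about.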

Since your goal is just to find any user with an email, it's simple:

userService.getByPhone(phone)
  .filter((user) -> user.getEmail() != null)
  .next()
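A self-contained sketch of this approach, assuming a hypothetical simplified `User` record and generated data in place of the repository call: 25,000 users where only ids from 24,990 onward have an email. `next()` emits the first user that passes the filter and then cancels the upstream, so no further rows are processed; if no user matches, the resulting `Mono` completes empty.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FirstUserWithEmail {

    // Hypothetical, simplified stand-in for the question's User entity.
    record User(long id, String email) {}

    // Made-up data replacing the repository call: only ids >= firstWithEmail have an email.
    static Flux<User> sampleUsers(int total, long firstWithEmail) {
        List<User> users = LongStream.rangeClosed(1, total)
                .mapToObj(id -> new User(id, id >= firstWithEmail ? "user" + id + "@example.com" : null))
                .collect(Collectors.toList());
        return Flux.fromIterable(users);
    }

    // filter drops users without an email; next() emits the first survivor,
    // then cancels the upstream. Completes empty if nobody has an email.
    static Mono<User> findFirstWithEmail(Flux<User> users) {
        return users
                .filter(user -> user.email() != null && !user.email().isEmpty())
                .next();
    }

    public static void main(String[] args) {
        // block() only to print the result in this demo; keep the real chain non-blocking.
        System.out.println(findFirstWithEmail(sampleUsers(25_000, 24_990)).block());
    }
}
```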

If you want the emails of all users, you don't need groupBy either; just use distinct:

userService.getByPhone(phone)
  .filter((user) -> user.getEmail() != null)
  .distinct((user) -> user.getId())
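A runnable sketch of the `distinct` variant, assuming a hypothetical simplified `User` record and a hand-written source that contains a duplicate row. Memory still grows with the number of distinct ids, since `distinct` keeps every key it has seen, but no per-id groups have to be consumed.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class DistinctUsersWithEmail {

    // Hypothetical, simplified stand-in for the question's User entity.
    record User(long id, String email) {}

    // Keeps users with an email and drops repeated ids (the first occurrence wins).
    // distinct(keySelector) remembers the keys it has seen, but it does not open
    // a separate group per id the way groupBy does.
    static Flux<User> usersWithEmail(Flux<User> users) {
        return users
                .filter(user -> user.email() != null)
                .distinct(User::id);
    }

    public static void main(String[] args) {
        Flux<User> source = Flux.just(
                new User(1, null),
                new User(2, "a@example.com"),
                new User(2, "a@example.com"),
                new User(3, "b@example.com"));
        // block() only to print a plain list in this demo.
        List<User> result = usersWithEmail(source).collectList().block();
        System.out.println(result);
    }
}
```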
  
skyho

I organized the filter like this:

private Mono<Response> filterUsersByEmail(Response response) {

    String phone = retrieveMobilePhoneFromResponse(response);

    return userService
            .findByMobilePhone(phone)
            .filter(user -> hasEmail(user))
            .next()
            // no user with an email: fail instead of completing empty
            .switchIfEmpty(Mono.defer(() -> Mono.error(new Exception())))
            .map(user -> response);
}

private static boolean hasEmail(User user) {

    String email = user.getEmail();
    return !Strings.isEmpty(email);
}
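A runnable sketch of the same filter / next / switchIfEmpty chain, assuming a hypothetical simplified `User` record, a `String` in place of the question's `Response` type, and `NoSuchElementException` instead of a bare `Exception`. It uses the `Mono.error(Supplier)` overload, which creates the error lazily, just like `Mono.defer(() -> Mono.error(...))`.

```java
import java.util.NoSuchElementException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FilterUsersByEmail {

    // Hypothetical, simplified stand-in for the question's User entity.
    record User(long id, String email) {}

    static boolean hasEmail(User user) {
        return user.email() != null && !user.email().isEmpty();
    }

    // Take the first user with an email, fail if there is none, then map to the response.
    // `response` is a plain String here instead of the question's Response type.
    static Mono<String> filterUsersByEmail(Flux<User> users, String response) {
        return users
                .filter(FilterUsersByEmail::hasEmail)
                .next()
                // the exception is only created if the upstream turns out to be empty
                .switchIfEmpty(Mono.error(() -> new NoSuchElementException("no user with an email")))
                .map(user -> response);
    }

    public static void main(String[] args) {
        Flux<User> users = Flux.just(new User(1, null), new User(2, ""), new User(3, "c@example.com"));
        // block() only to print results in this demo.
        System.out.println(filterUsersByEmail(users, "ok").block());
        System.out.println(filterUsersByEmail(Flux.just(new User(1, null)), "ok")
                .onErrorReturn(NoSuchElementException.class, "not found")
                .block());
    }
}
```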