Should I use concatMap in reactive for DB call?

21 views Asked by At
@Timed
  public Mono<Void> process() {
    var fluxSources = storeRepository.findAllSources();
    var fluxItemsToPublish = itemRepository.findAllNonMarketPlaceItems();

    var fluxPickupMappingJson =
        fluxSources
            .filter(SourceIdValidator::isValidSource)
            .buffer(6)
            .flatMap(this::getMappingData, 1)
            .onErrorContinue(
                (throwable, data) ->
                    logger.error("Error getting mapping data for sources : {}", data));

    return fluxPickupMappingJson
        .concatMap(dbCompareService::process)
        .filter(deltas -> !deltas.isEmpty())
        .doOnNext(d -> System.out.println("Response: " + d))
        .doOnNext(deltas -> logger.info("affected sources: {}", deltas))
        .flatMap(deltas -> this.publishDeltas(deltas, fluxItemsToPublish))
        .then();
  }
@Service
public class FcPickupMappingDbCompareService {
  private FcPickupMappingRepository pickupMappingRepository;
  private LocationMappingsApiResponseMapper mapper;
  private static final Logger logger = LogManager.getLogger();
  private final ReactiveTransactionManager transactionManager;
  private final TransactionalOperator operator;

  public FcPickupMappingDbCompareService(
      FcPickupMappingRepository pickupMappingRepository,
      LocationMappingsApiResponseMapper mapper,
      ReactiveTransactionManager transactionManager,
      TransactionalOperator operator) {
    this.pickupMappingRepository = pickupMappingRepository;
    this.mapper = mapper;
    this.transactionManager = transactionManager;
    this.operator = operator;
  }

  public Mono<List<String>> process(String locationMappingApiJson) {
    System.out.println("JSON: " + locationMappingApiJson);
    System.out.println("Thread Name: " + Thread.currentThread().getName());
    return mapper
        .fromJson(locationMappingApiJson)
        .flatMapMany(fcPickupMap -> Flux.fromIterable(fcPickupMap.entrySet()))
        .flatMap(mapping -> compareAndCollectSources(mapping.getKey(), mapping.getValue()))
        .collectList();
  }

  private Mono<String> compareAndCollectSources(
      String source, List<LocationMappingJson> newMappingJson) {
    var existingDAO =
        pickupMappingRepository
            .findAllByReceivingLocation(source)
            .filter(dao -> !dao.fulfillmentLocation().equals(dao.receivingLocation()))
            .collect(Collectors.toList());

    var newMappingDAO =
        newMappingJson.stream()
            .filter(LoaderUtil::isFCMapping)
            .filter(LocationMappingJsonValidator::isValid)
            .map(FcPickupMappingDAOFactory::from)
            .collect(Collectors.toList());

    return existingDAO
        .filter(mappingFromDB -> isChangeDetected(mappingFromDB, newMappingDAO))
        .flatMap(mappingFromDB -> replaceExistingWithNew(source, newMappingDAO));
  }

  private Mono<String> replaceExistingWithNew(
      String receivingLocation, List<FcPickupMappingDAO> newLocationIds) {
    return pickupMappingRepository
        .deleteAllByReceivingLocation(receivingLocation)
        .flatMap(deletedRows -> iterateAndUpsert(receivingLocation, newLocationIds))
        .as(operator::transactional)
        .doOnError(
            err ->
                logger.error(
                    "PickupMappingDbCompareService - error updating mapping sources for receivingLocation: {}",
                    receivingLocation))
        .onErrorResume(err -> Mono.empty());
  }

  private Mono<String> iterateAndUpsert(String receivingLocation, List<FcPickupMappingDAO> newMappingDAOs) {
    return Flux.fromIterable(new HashSet<>(newMappingDAOs))
        .filter(dao -> !dao.fulfillmentLocation().equals(dao.receivingLocation()))
        .flatMap(pickupMappingRepository::upsert).then(Mono.just(receivingLocation));
  }
}

Problem: When dbCompareService::process is being called, I am doing some DB operations such as reading from DB and based on some validations, I am deleting and doing some upserts. So, now since I am using ReactiveCrudRepository, elements from upstream publisher (which is fluxPickupMappingJson in this case) is emitting elements eagerly not waiting for completing the inner publisher. So, in order to solve this issue I have used this -> concatMap(dbCompareService::process) and it worked fine. However, I just want to know whether this seems fine or Is there any other approach to solve this issue?

In a nutshell - Since, I have write queries (upsert in this case) in dbCompareService::process method so I need to complete one operation before sending the next one in.

Please let me know if there are any other solutions to solve this problem.

0

There are 0 answers