/**
 * Loads mapping data for every valid source, reconciles it against the database and publishes
 * the resulting deltas.
 *
 * <p>Pipeline, in order:
 *
 * <ol>
 *   <li>Sources are filtered with {@code SourceIdValidator::isValidSource} and grouped into
 *       batches of up to 6.
 *   <li>{@code getMappingData} is invoked per batch with a concurrency of 1, so only one batch
 *       lookup is in flight at a time.
 *   <li>Each mapping payload is handed to {@code dbCompareService::process} through
 *       {@code concatMap}, which subscribes to the next inner publisher only after the previous
 *       one has completed; payloads are therefore reconciled strictly one after another.
 *   <li>Empty delta lists are dropped; non-empty ones are logged and passed to
 *       {@code publishDeltas} together with the non-marketplace items.
 * </ol>
 *
 * @return a {@code Mono} that completes once the whole pipeline has terminated
 */
@Timed
public Mono<Void> process() {
  var fluxSources = storeRepository.findAllSources();
  var fluxItemsToPublish = itemRepository.findAllNonMarketPlaceItems();
  var fluxPickupMappingJson =
      fluxSources
          .filter(SourceIdValidator::isValidSource)
          .buffer(6)
          .flatMap(this::getMappingData, 1)
          // Best-effort: a failing batch is logged (with its cause) and skipped so that the
          // remaining batches are still processed.
          .onErrorContinue(
              (throwable, data) ->
                  logger.error("Error getting mapping data for sources : {}", data, throwable));
  return fluxPickupMappingJson
      // concatMap (not flatMap) keeps the DB reconciliation sequential per payload.
      .concatMap(dbCompareService::process)
      .filter(deltas -> !deltas.isEmpty())
      .doOnNext(deltas -> logger.info("affected sources: {}", deltas))
      .flatMap(deltas -> this.publishDeltas(deltas, fluxItemsToPublish))
      .then();
}
/**
 * Compares location-mapping data received as JSON with the pickup mappings stored in the
 * database and replaces the stored mappings of every receiving location for which a change is
 * detected.
 */
@Service
public class FcPickupMappingDbCompareService {
  private static final Logger logger = LogManager.getLogger();

  private final FcPickupMappingRepository pickupMappingRepository;
  private final LocationMappingsApiResponseMapper mapper;
  // NOTE(review): not referenced in the visible code; kept so the constructor contract is
  // unchanged.
  private final ReactiveTransactionManager transactionManager;
  private final TransactionalOperator operator;

  public FcPickupMappingDbCompareService(
      FcPickupMappingRepository pickupMappingRepository,
      LocationMappingsApiResponseMapper mapper,
      ReactiveTransactionManager transactionManager,
      TransactionalOperator operator) {
    this.pickupMappingRepository = pickupMappingRepository;
    this.mapper = mapper;
    this.transactionManager = transactionManager;
    this.operator = operator;
  }

  /**
   * Parses the given JSON payload and reconciles each contained source with the database.
   *
   * @param locationMappingApiJson raw JSON payload handed to the mapper
   * @return a {@code Mono} emitting the sources whose stored mappings were replaced; sources
   *     without a detected change, or whose replacement failed, are not included
   */
  public Mono<List<String>> process(String locationMappingApiJson) {
    logger.debug(
        "Processing location mapping JSON on thread {}: {}",
        Thread.currentThread().getName(),
        locationMappingApiJson);
    return mapper
        .fromJson(locationMappingApiJson)
        .flatMapMany(fcPickupMap -> Flux.fromIterable(fcPickupMap.entrySet()))
        .flatMap(mapping -> compareAndCollectSources(mapping.getKey(), mapping.getValue()))
        .collectList();
  }

  /**
   * Compares the stored mappings of {@code source} with the new mappings and, when a change is
   * detected, replaces the stored ones.
   *
   * @param source receiving location whose mappings are compared
   * @param newMappingJson new mappings for that location as delivered by the API
   * @return a {@code Mono} emitting {@code source} when its mappings were replaced, otherwise an
   *     empty {@code Mono}
   */
  private Mono<String> compareAndCollectSources(
      String source, List<LocationMappingJson> newMappingJson) {
    // Stored mappings, ignoring rows where a location is mapped to itself.
    var existingDAO =
        pickupMappingRepository
            .findAllByReceivingLocation(source)
            .filter(dao -> !dao.fulfillmentLocation().equals(dao.receivingLocation()))
            .collect(Collectors.toList());
    var newMappingDAO =
        newMappingJson.stream()
            .filter(LoaderUtil::isFCMapping)
            .filter(LocationMappingJsonValidator::isValid)
            .map(FcPickupMappingDAOFactory::from)
            .collect(Collectors.toList());
    return existingDAO
        .filter(mappingFromDB -> isChangeDetected(mappingFromDB, newMappingDAO))
        .flatMap(mappingFromDB -> replaceExistingWithNew(source, newMappingDAO));
  }

  /**
   * Deletes all stored mappings of the receiving location and upserts the new ones, with both
   * steps wrapped by the transactional operator.
   *
   * <p>Failures are logged and swallowed (best-effort): the returned {@code Mono} then completes
   * empty, so the location is simply not reported as affected.
   *
   * @param receivingLocation location whose mappings are replaced
   * @param newLocationIds mappings to store for that location
   * @return a {@code Mono} emitting {@code receivingLocation} on success, empty on failure
   */
  private Mono<String> replaceExistingWithNew(
      String receivingLocation, List<FcPickupMappingDAO> newLocationIds) {
    return pickupMappingRepository
        .deleteAllByReceivingLocation(receivingLocation)
        .flatMap(deletedRows -> iterateAndUpsert(receivingLocation, newLocationIds))
        .as(operator::transactional)
        .doOnError(
            err ->
                // The throwable is passed as the trailing argument so the cause is not lost
                // before onErrorResume discards the error.
                logger.error(
                    "PickupMappingDbCompareService - error updating mapping sources for receivingLocation: {}",
                    receivingLocation,
                    err))
        .onErrorResume(err -> Mono.empty());
  }

  /**
   * Upserts the distinct new mappings, skipping those that map a location to itself.
   *
   * @param receivingLocation value emitted once all upserts have completed
   * @param newMappingDAOs mappings to upsert; duplicates are removed via a {@code HashSet}
   * @return a {@code Mono} emitting {@code receivingLocation} after the upserts complete
   */
  private Mono<String> iterateAndUpsert(
      String receivingLocation, List<FcPickupMappingDAO> newMappingDAOs) {
    return Flux.fromIterable(new HashSet<>(newMappingDAOs))
        .filter(dao -> !dao.fulfillmentLocation().equals(dao.receivingLocation()))
        .flatMap(pickupMappingRepository::upsert)
        .then(Mono.just(receivingLocation));
  }
}
Problem:
When dbCompareService::process is called, it performs several DB operations: it reads from the DB and, based on some validations, deletes rows and performs upserts. Because I am using ReactiveCrudRepository, the upstream publisher (fluxPickupMappingJson in this case) emits elements eagerly, without waiting for the inner publisher to complete. To solve this, I used concatMap(dbCompareService::process), and it worked fine. However, I would like to know whether this approach is sound, or whether there is a better way to solve this issue.
In a nutshell: since the dbCompareService::process method issues write queries (upserts in this case), one operation must complete before the next one is sent in.
Please let me know if there are any other ways to solve this problem.