diff --git a/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java b/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java index 4138c28..a075ffa 100644 --- a/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java +++ b/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java @@ -15,14 +15,12 @@ import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.*; -import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.transaction.KafkaTransactionManager; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; -import java.util.concurrent.Semaphore; +import java.util.concurrent.ConcurrentHashMap; @Configuration @EnableKafka @@ -132,6 +130,6 @@ public KafkaTemplate kafkatemplateEndoftraitement(final Producer @Bean public Map kafkaWorkInProgress() { - return Collections.synchronizedMap(new HashMap<>()); + return new ConcurrentHashMap<>(); } } diff --git a/src/main/java/fr/abes/bestppn/kafka/KafkaWorkInProgress.java b/src/main/java/fr/abes/bestppn/kafka/KafkaWorkInProgress.java index 5799d76..80e3dc2 100644 --- a/src/main/java/fr/abes/bestppn/kafka/KafkaWorkInProgress.java +++ b/src/main/java/fr/abes/bestppn/kafka/KafkaWorkInProgress.java @@ -72,12 +72,8 @@ public int getNbActiveThreads() { return this.nbActiveThreads.get(); } - public void incrementNbLignesTraitees() { - this.nbLignesTraitees.incrementAndGet(); - } - - public int getNbLignesTraitees() { - return this.nbLignesTraitees.get(); + public int incrementNbLignesTraiteesAndGet() { + return this.nbLignesTraitees.incrementAndGet(); } public void setIsOnError(boolean error) { diff --git a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java index ab7a6fe..81c8673 100644 --- a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java +++ b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java @@ -75,7 +75,6 @@ public void kbartFromkafkaListener(ConsumerRecord ligneKbart) { LigneKbartDto ligneKbartDto = mapper.readValue(ligneKbart.value(), LigneKbartDto.class); String providerName = Utils.extractProvider(ligneKbart.key()); - workInProgress.get(filename).incrementNbLignesTraitees(); executorService.execute(() -> { try { log.info("Partition;" + ligneKbart.partition() + ";offset;" + ligneKbart.offset() + ";fichier;" + ligneKbart.key() + ";" + Thread.currentThread().getName()); @@ -89,7 +88,7 @@ public void kbartFromkafkaListener(ConsumerRecord ligneKbart) { Header lastHeader = ligneKbart.headers().lastHeader("nbLinesTotal"); if (lastHeader != null) { int nbLignesTotal = Integer.parseInt(new String(lastHeader.value())); - if (nbLignesTotal == workInProgress.get(filename).getNbLignesTraitees()) { + if (nbLignesTotal == workInProgress.get(filename).incrementNbLignesTraiteesAndGet()) { if (workInProgress.get(filename).getSemaphore().tryAcquire()) { workInProgress.get(filename).setNbtotalLinesInExecutionReport(nbLignesTotal); handleFichier(filename); @@ -152,7 +151,7 @@ private void handleFichier(String filename) { log.error("Le nom du fichier " + filename + " n'est pas correct. " + e); emailService.sendProductionErrorEmail(filename, e.getMessage()); } finally { - log.info("Traitement terminé pour fichier " + filename + " / nb lignes " + workInProgress.get(filename).getNbLignesTraitees()); + log.info("Traitement terminé pour fichier " + filename + " / nb lignes " + workInProgress.get(filename).incrementNbLignesTraiteesAndGet()); workInProgress.remove(filename); } } diff --git a/src/main/java/fr/abes/bestppn/model/entity/ExecutionReport.java b/src/main/java/fr/abes/bestppn/model/entity/ExecutionReport.java index c77fea3..812e0dc 100644 --- a/src/main/java/fr/abes/bestppn/model/entity/ExecutionReport.java +++ b/src/main/java/fr/abes/bestppn/model/entity/ExecutionReport.java @@ -8,7 +8,7 @@ public class ExecutionReport { @Getter @Setter - private int nbtotalLines; + private AtomicInteger nbtotalLines; private AtomicInteger nbBestPpnFind; @@ -20,8 +20,13 @@ public ExecutionReport() { nbBestPpnFind = new AtomicInteger(0); nbLinesWithInputDataErrors = new AtomicInteger(0); nbLinesWithErrorsInBestPPNSearch = new AtomicInteger(0); - nbtotalLines = 0; + nbtotalLines = new AtomicInteger(0); } + + public void setNbtotalLines(int nbTotalLignes) { + this.nbtotalLines.set(nbTotalLignes); + } + public int getNbBestPpnFind() { return nbBestPpnFind.get(); } @@ -35,7 +40,7 @@ public int getNbLinesWithErrorsInBestPPNSearch() { } public int getNbLinesOk(){ - return nbtotalLines - nbLinesWithErrorsInBestPPNSearch.get() - nbLinesWithInputDataErrors.get(); + return nbtotalLines.get() - nbLinesWithErrorsInBestPPNSearch.get() - nbLinesWithInputDataErrors.get(); } public void incrementNbBestPpnFind() { @@ -46,13 +51,6 @@ public void incrementNbLinesWithInputDataErrors() { nbLinesWithInputDataErrors.incrementAndGet(); } - public void clear(){ - nbtotalLines = 0; - nbBestPpnFind.set(0); - nbLinesWithInputDataErrors.set(0); - nbLinesWithErrorsInBestPPNSearch.set(0); - } - public void incrementNbLinesWithErrorsInBestPPNSearch() { nbLinesWithErrorsInBestPPNSearch.incrementAndGet(); } diff --git a/src/main/java/fr/abes/bestppn/service/ExecutionReportService.java b/src/main/java/fr/abes/bestppn/service/ExecutionReportService.java deleted file mode 100644 index 2faa4d4..0000000 --- a/src/main/java/fr/abes/bestppn/service/ExecutionReportService.java +++ /dev/null @@ -1,32 +0,0 @@ -package fr.abes.bestppn.service; - -import fr.abes.bestppn.model.entity.ExecutionReport; -import lombok.Getter; -import org.springframework.stereotype.Service; - -@Service -public class ExecutionReportService { - @Getter - private ExecutionReport executionReport = new ExecutionReport(); - - public void setNbtotalLines(int nbtotalLines) { - this.executionReport.setNbtotalLines(nbtotalLines); - } - public void addNbBestPpnFind(){ - executionReport.incrementNbBestPpnFind(); - } - - public void addNbLinesWithInputDataErrors(){ - executionReport.incrementNbLinesWithInputDataErrors(); - } - - public void addNbLinesWithErrorsInBestPPNSearch(){ - executionReport.incrementNbLinesWithErrorsInBestPPNSearch(); - } - - public void clearExecutionReport() { - executionReport.clear(); - } - - -} diff --git a/src/main/java/fr/abes/bestppn/service/KbartService.java b/src/main/java/fr/abes/bestppn/service/KbartService.java index 1fe4db5..f17e3c7 100644 --- a/src/main/java/fr/abes/bestppn/service/KbartService.java +++ b/src/main/java/fr/abes/bestppn/service/KbartService.java @@ -50,8 +50,14 @@ public void processConsumerRecord(LigneKbartDto ligneFromKafka, String providerN BestPpn bestPpn = service.getBestPpn(ligneFromKafka, providerName, isForced, false); switch (Objects.requireNonNull(bestPpn.getDestination())) { case BEST_PPN_BACON -> ligneFromKafka.setBestPpn(bestPpn.getPpn()); - case PRINT_PPN_SUDOC -> workInProgress.get(filename).addPpnToCreate(getLigneKbartImprime(bestPpn, ligneFromKafka)); + case PRINT_PPN_SUDOC -> { + //on ne lance la création dans le Sudoc que pour les monographie + if (ligneFromKafka.getPublicationType().equals("monograph")) { + workInProgress.get(filename).addPpnToCreate(getLigneKbartImprime(bestPpn, ligneFromKafka)); + } + } case NO_PPN_FOUND_SUDOC -> { + //on ne lance la création dans le Sudoc que pour les monographie if (ligneFromKafka.getPublicationType().equals("monograph")) { workInProgress.get(filename).addPpnFromKbartToCreate(ligneFromKafka); }