From 1ecea3148a056f9310c45ede120db6081aca3e96 Mon Sep 17 00:00:00 2001 From: pierre-maraval Date: Thu, 23 Nov 2023 14:57:39 +0100 Subject: [PATCH] CDE-297 : Ajout semaphore Refactor divers --- .../bestppn/configuration/KafkaConfig.java | 2 +- .../abes/bestppn/entity/ExecutionReport.java | 10 +- .../fr/abes/bestppn/kafka/TopicConsumer.java | 134 ++++++++---------- .../service/ExecutionReportService.java | 5 +- .../fr/abes/bestppn/service/KbartService.java | 7 +- .../abes/bestppn/service/LogFileService.java | 20 ++- src/main/resources/application-dev.properties | 2 + .../resources/application-prod.properties | 4 +- .../resources/application-test.properties | 4 +- 9 files changed, 96 insertions(+), 92 deletions(-) diff --git a/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java b/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java index b22d57a..d2e720a 100644 --- a/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java +++ b/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java @@ -160,7 +160,7 @@ public KafkaTemplate kafkatemplateEndoftraitement(final Producer } @Bean - public Semaphore semaphore() { + public Semaphore semaphoreNbLines() { return new Semaphore(1); } } diff --git a/src/main/java/fr/abes/bestppn/entity/ExecutionReport.java b/src/main/java/fr/abes/bestppn/entity/ExecutionReport.java index cc74a1a..768f416 100644 --- a/src/main/java/fr/abes/bestppn/entity/ExecutionReport.java +++ b/src/main/java/fr/abes/bestppn/entity/ExecutionReport.java @@ -22,7 +22,11 @@ public ExecutionReport() { } public int getNbtotalLines() { - return nbtotalLines; + return this.nbtotalLines; + } + + public void setNbtotalLines(int nbtotalLines) { + this.nbtotalLines = nbtotalLines; } public int getNbBestPpnFind() { @@ -37,6 +41,10 @@ public int getNbLinesWithErrorsInBestPPNSearch() { return nbLinesWithErrorsInBestPPNSearch.get(); } + public int getNbLinesOk(){ + return nbtotalLines - nbLinesWithErrorsInBestPPNSearch.get() - nbLinesWithInputDataErrors.get(); + } + public void incrementNbBestPpnFind() { nbBestPpnFind.incrementAndGet(); } diff --git a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java index 2860933..e5bfb9e 100644 --- a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java +++ b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java @@ -27,13 +27,6 @@ public class TopicConsumer { private final KbartService service; - @Value("${topic.name.source.kbart}") - private String topicKbart; - @Value("${topic.name.source.nbLines}") - private String topicKbartNbLines; - @Value("${topic.name.source.kbart.errors}") - private String topicKbartErrors; - @Value("${spring.kafka.concurrency.nbThread}") private int nbThread; private final EmailService emailService; @@ -52,17 +45,16 @@ public class TopicConsumer { private final LogFileService logFileService; - private int totalLine = 0; + private final Semaphore semaphoreNbLines; - private final Semaphore semaphore; - public TopicConsumer(KbartService service, EmailService emailService, ProviderRepository providerRepository, ExecutionReportService executionReportService, LogFileService logFileService, Semaphore semaphore) { + public TopicConsumer(KbartService service, EmailService emailService, ProviderRepository providerRepository, ExecutionReportService executionReportService, LogFileService logFileService, Semaphore semaphoreNbLines) { this.service = service; this.emailService = emailService; this.providerRepository = providerRepository; this.executionReportService = executionReportService; this.logFileService = logFileService; - this.semaphore = semaphore; + this.semaphoreNbLines = semaphoreNbLines; } @PostConstruct @@ -80,90 +72,75 @@ public void listenKbartFromKafka(ConsumerRecord lignesKbart) { log.info("Paquet reçu : Partition : " + lignesKbart.partition() + " / offset " + lignesKbart.offset() + " / value : " + lignesKbart.value()); try { //traitement de chaque ligne kbart - if (lignesKbart.topic().equals(topicKbart)) { - this.filename = extractFilenameFromHeader(lignesKbart.headers().toArray()); - ThreadContext.put("package", (filename)); //Ajoute le nom de fichier dans le contexte du thread pour log4j - String providerName = Utils.extractProvider(filename); - executorService.execute(() -> { - try { - service.processConsumerRecord(lignesKbart, providerName, isForced); - } catch (ExecutionException | InterruptedException | IOException | URISyntaxException e) { - isOnError = true; - log.error(e.getMessage()); - emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage()); - executionReportService.addNbLinesWithInputDataErrors(); - } catch (IllegalPpnException | BestPpnException e) { - isOnError = true; - log.error(e.getMessage()); - emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage()); - executionReportService.addNbLinesWithErrorsInBestPPNSearch(); - } - }); - } + this.filename = extractFilenameFromHeader(lignesKbart.headers().toArray()); + ThreadContext.put("package", (filename)); //Ajoute le nom de fichier dans le contexte du thread pour log4j + String providerName = Utils.extractProvider(filename); + executorService.execute(() -> { + try { + service.processConsumerRecord(lignesKbart, providerName, isForced); + } catch (ExecutionException | InterruptedException | IOException | URISyntaxException e) { + addDataError(e.getMessage()); + } catch (IllegalPpnException | BestPpnException e) { + addBestPPNSearchError(e.getMessage()); + } + }); } catch (IllegalProviderException e) { - isOnError = true; - log.error("Erreur dans les données en entrée, provider incorrect"); - emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage()); - executionReportService.addNbLinesWithInputDataErrors(); + addDataError(e.getMessage()); } - } - @KafkaListener(topics = {"${topic.name.source.nbLines}"}, groupId = "nbLinesLocal", containerFactory = "kafkaNbLinesListenerContainerFactory") + + @KafkaListener(topics = {"${topic.name.source.nbLines}"}, groupId = "${topic.groupid.source.nbLines}", containerFactory = "kafkaNbLinesListenerContainerFactory") public void listenNbLines(ConsumerRecord nbLines) { try { - semaphore.acquire(); - log.info("Permit acquis"); + semaphoreNbLines.acquire(); + log.debug("Permit acquis"); if (this.filename.equals(extractFilenameFromHeader(nbLines.headers().toArray()))) { log.info("condition vérifiée : " + nbLines.value()); - totalLine = Integer.parseInt(nbLines.value()); + executionReportService.setNbtotalLines(Integer.parseInt(nbLines.value())); if (!isOnError) { String providerName = Utils.extractProvider(filename); Optional providerOpt = providerRepository.findByProvider(providerName); service.commitDatas(providerOpt, providerName, filename); //quel que soit le résultat du traitement, on envoie le rapport par mail - log.info("Nombre de best ppn trouvé : " + executionReportService.getExecutionReport().getNbBestPpnFind() + "/" + totalLine); + log.info("Nombre de best ppn trouvé : " + executionReportService.getExecutionReport().getNbBestPpnFind() + "/" + executionReportService.getExecutionReport().getNbtotalLines()); emailService.sendMailWithAttachment(filename); - logFileService.createExecutionReport(filename, totalLine, executionReportService.getNbLinesOk(), executionReportService.getExecutionReport().getNbLinesWithInputDataErrors(), executionReportService.getExecutionReport().getNbLinesWithErrorsInBestPPNSearch(), isForced); - emailService.clearMailAttachment(); - executionReportService.clearExecutionReport(); - totalLine = 0; + logFileService.createExecutionReport(filename, executionReportService.getExecutionReport(), isForced); } else { isOnError = false; } } } catch (IllegalPackageException | IllegalDateException e) { - isOnError = true; - log.error(e.getMessage()); - emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage()); - executionReportService.addNbLinesWithErrorsInBestPPNSearch(); - } catch (IllegalProviderException e) { - isOnError = true; - log.error("Erreur dans les données en entrée, provider incorrect"); - emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage()); - executionReportService.addNbLinesWithInputDataErrors(); - } catch (ExecutionException | InterruptedException | IOException e) { - isOnError = true; - log.error(e.getMessage()); - emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage()); - executionReportService.addNbLinesWithInputDataErrors(); + addBestPPNSearchError(e.getMessage()); + } catch (IllegalProviderException | ExecutionException | InterruptedException | IOException e) { + addDataError(e.getMessage()); } finally { - semaphore.release(); - log.info("semaphore libéré"); + semaphoreNbLines.release(); + log.debug("semaphore libéré"); + emailService.clearMailAttachment(); + executionReportService.clearExecutionReport(); + service.clearListesKbart(); } } - /* @KafkaListener(topics = {"${topic.name.source.kbart.errors}"}, groupId = "errorsLocal", containerFactory = "kafkaKbartListenerContainerFactory") + @KafkaListener(topics = {"${topic.name.source.kbart.errors}"}, groupId = "${topic.groupid.source.errors}", containerFactory = "kafkaKbartListenerContainerFactory") public void listenErrors(ConsumerRecord error) { - executorService.shutdownNow(); - isOnError = true; - log.error(error.value()); - emailService.addLineKbartToMailAttachementWithErrorMessage(error.value()); - emailService.sendMailWithAttachment(filename); - logFileService.createExecutionReport(filename, totalLine, executionReportService.getNbLinesOk(), executionReportService.getExecutionReport().getNbLinesWithInputDataErrors(), executionReportService.getExecutionReport().getNbLinesWithErrorsInBestPPNSearch(), isForced); - emailService.clearMailAttachment(); - executionReportService.clearExecutionReport(); - } */ + try { + if (this.filename.equals(extractFilenameFromHeader(error.headers().toArray()))) { + //erreur lors du chargement du fichier détectée, on arrête tous les threads en cours pour ne pas envoyer de lignes dans le topic + executorService.shutdown(); + isOnError = true; + log.error(error.value()); + emailService.addLineKbartToMailAttachementWithErrorMessage(error.value()); + logFileService.createExecutionReport(filename, executionReportService.getExecutionReport(), isForced); + emailService.clearMailAttachment(); + executionReportService.clearExecutionReport(); + service.clearListesKbart(); + } + } catch (IOException e) { + addDataError(e.getMessage()); + } + } private String extractFilenameFromHeader(Header[] headers) { String nomFichier = ""; @@ -177,4 +154,19 @@ private String extractFilenameFromHeader(Header[] headers) { } return nomFichier; } + + + private void addBestPPNSearchError(String message) { + isOnError = true; + log.error(message); + emailService.addLineKbartToMailAttachementWithErrorMessage(message); + executionReportService.addNbLinesWithErrorsInBestPPNSearch(); + } + + private void addDataError(String message) { + isOnError = true; + log.error(message); + emailService.addLineKbartToMailAttachementWithErrorMessage(message); + executionReportService.addNbLinesWithInputDataErrors(); + } } diff --git a/src/main/java/fr/abes/bestppn/service/ExecutionReportService.java b/src/main/java/fr/abes/bestppn/service/ExecutionReportService.java index 848f73a..ce228e1 100644 --- a/src/main/java/fr/abes/bestppn/service/ExecutionReportService.java +++ b/src/main/java/fr/abes/bestppn/service/ExecutionReportService.java @@ -9,10 +9,9 @@ public class ExecutionReportService { @Getter private ExecutionReport executionReport = new ExecutionReport(); - public int getNbLinesOk(){ - return executionReport.getNbtotalLines() - executionReport.getNbLinesWithErrorsInBestPPNSearch() - executionReport.getNbLinesWithInputDataErrors(); + public void setNbtotalLines(int nbtotalLines) { + this.executionReport.setNbtotalLines(nbtotalLines); } - public void addNbBestPpnFind(){ executionReport.incrementNbBestPpnFind(); } diff --git a/src/main/java/fr/abes/bestppn/service/KbartService.java b/src/main/java/fr/abes/bestppn/service/KbartService.java index 93c3cef..6c7e7d2 100644 --- a/src/main/java/fr/abes/bestppn/service/KbartService.java +++ b/src/main/java/fr/abes/bestppn/service/KbartService.java @@ -87,16 +87,17 @@ public void processConsumerRecord(ConsumerRecord lignesKbart, St public void commitDatas(Optional providerOpt, String providerName, String filename) throws IllegalPackageException, IllegalDateException, ExecutionException, InterruptedException, IOException { ProviderPackage provider = providerService.handlerProvider(providerOpt, filename, providerName); - producer.sendKbart(kbartToSend, provider, filename); producer.sendPrintNotice(ppnToCreate, filename); producer.sendPpnExNihilo(ppnFromKbartToCreate, provider, filename); + clearListesKbart(); + producer.sendEndOfTraitmentReport(filename); + } + public void clearListesKbart() { kbartToSend.clear(); ppnToCreate.clear(); ppnFromKbartToCreate.clear(); - - producer.sendEndOfTraitmentReport(filename); } private static LigneKbartImprime getLigneKbartImprime(PpnWithDestinationDto ppnWithDestinationDto, LigneKbartDto ligneFromKafka) { diff --git a/src/main/java/fr/abes/bestppn/service/LogFileService.java b/src/main/java/fr/abes/bestppn/service/LogFileService.java index fac2e9e..44c0ff2 100644 --- a/src/main/java/fr/abes/bestppn/service/LogFileService.java +++ b/src/main/java/fr/abes/bestppn/service/LogFileService.java @@ -1,5 +1,6 @@ package fr.abes.bestppn.service; +import fr.abes.bestppn.entity.ExecutionReport; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -20,13 +21,10 @@ public class LogFileService { /** * Méthode qui créé le rapport d'execution dans un fichier log indépendant du reste de l'application * @param fileName le nom du fichier - * @param totalLines le nombre total de lignes pour le fichier concerné - * @param linesOk le nombre de lignes OK pour le fichier concerné - * @param linesWithInputDataErrors le nombre de lignes contenant des erreurs de données - * @param linesWithErrorsInBestPPNSearch le nombre total de lignes contenant des erreurs lors de la recherche du bestPpn + * @param executionReport le rapport d'exécution qui sert à alimenter le fichier * @throws IOException exception levée */ - public void createExecutionReport(String fileName, int totalLines, int linesOk, int linesWithInputDataErrors, int linesWithErrorsInBestPPNSearch, boolean injectKafka) throws IOException { + public void createExecutionReport(String fileName, ExecutionReport executionReport, boolean isForced) throws IOException { try { // Création du fichier de log Logger logger = Logger.getLogger("ExecutionReport"); @@ -37,17 +35,17 @@ public void createExecutionReport(String fileName, int totalLines, int linesOk, SimpleFormatter formatter = new SimpleFormatter(); fh.setFormatter(formatter); logger.setUseParentHandlers(false); // désactive l'affichage du log dans le terminal - logger.info("TOTAL LINES : " + totalLines + System.lineSeparator() - + "LINES OK : " + linesOk + System.lineSeparator() - + "LINES WITH INPUT DATA ERRORS : " + linesWithInputDataErrors + System.lineSeparator() - + "LINES WITH ERRORS IN BESTPPN SEARCH : " + linesWithErrorsInBestPPNSearch + System.lineSeparator() - + "FORCE_OPTION : " + injectKafka + System.lineSeparator()); + logger.info("TOTAL LINES : " + executionReport.getNbtotalLines() + System.lineSeparator() + + "LINES OK : " + executionReport.getNbLinesOk() + System.lineSeparator() + + "LINES WITH INPUT DATA ERRORS : " + executionReport.getNbLinesWithInputDataErrors() + System.lineSeparator() + + "LINES WITH ERRORS IN BESTPPN SEARCH : " + executionReport.getNbLinesWithErrorsInBestPPNSearch() + System.lineSeparator() + + "FORCE_OPTION : " + isForced + System.lineSeparator()); // Fermeture du fichier de log fh.close(); // Copie le fichier existant vers le répertoire temporaire en ajoutant sa date de création - if (source != null && Files.exists(source)) { + if (Files.exists(source)) { // Vérification du chemin et création si inexistant String tempLogWithSeparator = "tempLog" + File.separator; diff --git a/src/main/resources/application-dev.properties b/src/main/resources/application-dev.properties index 4985a7a..c926880 100644 --- a/src/main/resources/application-dev.properties +++ b/src/main/resources/application-dev.properties @@ -42,5 +42,7 @@ mail.ws.url= mail.ws.recipient= topic.groupid.source.kbart=lignesKbart +topic.groupid.source.nbLines=nbLines +topic.groupid.source.errors=errors logging.level.fr.abes.bestppn=INFO \ No newline at end of file diff --git a/src/main/resources/application-prod.properties b/src/main/resources/application-prod.properties index 6a527f4..4346a6c 100644 --- a/src/main/resources/application-prod.properties +++ b/src/main/resources/application-prod.properties @@ -40,4 +40,6 @@ spring.sql.bacon.init.mode=never mail.ws.url= mail.ws.recipient= -topic.groupid.source.kbart=lignesKbart \ No newline at end of file +topic.groupid.source.kbart=lignesKbart +topic.groupid.source.nbLines=nbLines +topic.groupid.source.errors=errors \ No newline at end of file diff --git a/src/main/resources/application-test.properties b/src/main/resources/application-test.properties index 6a527f4..4346a6c 100644 --- a/src/main/resources/application-test.properties +++ b/src/main/resources/application-test.properties @@ -40,4 +40,6 @@ spring.sql.bacon.init.mode=never mail.ws.url= mail.ws.recipient= -topic.groupid.source.kbart=lignesKbart \ No newline at end of file +topic.groupid.source.kbart=lignesKbart +topic.groupid.source.nbLines=nbLines +topic.groupid.source.errors=errors \ No newline at end of file