From d8fb76920b73f859cb3c8f939ce92f66c3b1d6ea Mon Sep 17 00:00:00 2001 From: pierre-maraval Date: Thu, 23 Nov 2023 08:32:41 +0100 Subject: [PATCH] CDE-297 : Refactor : Correction exception et modification algorithme --- .../bestppn/configuration/KafkaConfig.java | 3 +- .../fr/abes/bestppn/kafka/TopicConsumer.java | 99 ++++++++++++------- .../fr/abes/bestppn/kafka/TopicProducer.java | 3 +- .../fr/abes/bestppn/service/EmailService.java | 4 +- .../fr/abes/bestppn/service/KbartService.java | 24 +---- .../fr/abes/bestppn/service/WsService.java | 4 - 6 files changed, 71 insertions(+), 66 deletions(-) diff --git a/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java b/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java index b5bea0e..8e182d1 100644 --- a/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java +++ b/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java @@ -99,8 +99,7 @@ public ProducerFactory producerFactoryLigneKbartImpri @Bean public ProducerFactory producerFactory() { - DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(producerConfigs()); - return factory; + return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean diff --git a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java index 8780346..01e2442 100644 --- a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java +++ b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java @@ -6,8 +6,10 @@ import fr.abes.bestppn.service.EmailService; import fr.abes.bestppn.service.ExecutionReportService; import fr.abes.bestppn.service.KbartService; +import fr.abes.bestppn.service.LogFileService; import fr.abes.bestppn.utils.Utils; import jakarta.annotation.PostConstruct; +import jakarta.validation.constraints.Email; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Header; @@ -18,9 +20,7 @@ import java.io.IOException; import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; +import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -48,19 +48,23 @@ public class TopicConsumer { private boolean isOnError = false; - private final List
headerList = new ArrayList<>(); + private final Set
headerList = new HashSet<>(); private ExecutorService executorService; private final ProviderRepository providerRepository; - private final ExecutionReportService executionReportService; - public TopicConsumer(KbartService service, EmailService emailService, ProviderRepository providerRepository, ExecutionReportService executionReportService) { + private final LogFileService logFileService; + + private int totalLine = 0; + + public TopicConsumer(KbartService service, EmailService emailService, ProviderRepository providerRepository, ExecutionReportService executionReportService, LogFileService logFileService) { this.service = service; this.emailService = emailService; this.providerRepository = providerRepository; this.executionReportService = executionReportService; + this.logFileService = logFileService; } @PostConstruct @@ -71,49 +75,68 @@ public void initExecutor() { * Listener Kafka qui écoute un topic et récupère les messages dès qu'ils y arrivent. * @param lignesKbart message kafka récupéré par le Consumer Kafka */ - @KafkaListener(topics = {"${topic.name.source.kbart}, ${topic.name.source.kbart.errors}, ${topic.name.source.nbLines}"}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "${spring.kafka.concurrency.nbThread}") + @KafkaListener(topics = {"${topic.name.source.kbart}", "${topic.name.source.kbart.errors}", "${topic.name.source.nbLines}"}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "${spring.kafka.concurrency.nbThread}") public void listenKbartFromKafka(ConsumerRecord lignesKbart) throws Exception { log.info("Paquet reçu : Partition : " + lignesKbart.partition() + " / offset " + lignesKbart.offset()); + //initialisation des paramètres extractDataFromHeader(lignesKbart.headers().toArray()); ThreadContext.put("package", (filename)); //Ajoute le nom de fichier dans le contexte du thread pour log4j - String providerName = Utils.extractProvider(filename); - Optional providerOpt = providerRepository.findByProvider(providerName); - if (lignesKbart.topic().equals(topicKbart)) { - executorService.execute(() -> { + try { + String providerName = Utils.extractProvider(filename); + Optional providerOpt = providerRepository.findByProvider(providerName); + //traitement de chaque ligne kbart + if (lignesKbart.topic().equals(topicKbart)) { + executorService.execute(() -> { + try { + service.processConsumerRecord(lignesKbart, providerName, isForced); + } catch (ExecutionException | InterruptedException | IOException | URISyntaxException e) { + isOnError = true; + log.error(e.getMessage()); + emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage()); + executionReportService.addNbLinesWithInputDataErrors(); + } catch (IllegalPpnException | BestPpnException e) { + isOnError = true; + log.error(e.getMessage()); + emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage()); + executionReportService.addNbLinesWithErrorsInBestPPNSearch(); + } + }); + } + //réception du nombre total de lignes, indique la fin du package en cours + if (lignesKbart.topic().equals(topicKbartNbLines)) { + Thread.sleep(1000); + totalLine = Integer.parseInt(lignesKbart.value()); + //on laisse les threads terminer leur traitement + executorService.awaitTermination(1, TimeUnit.HOURS); try { - service.processConsumerRecord(lignesKbart, providerName, isForced); - } catch (ExecutionException | InterruptedException | IOException | URISyntaxException e) { - isOnError = true; - log.error(e.getMessage()); - emailService.addLineToMailAttachementWithErrorMessage(e.getMessage()); - executionReportService.addNbLinesWithInputDataErrors(); - } catch (IllegalPpnException |BestPpnException e) { + if (!isOnError) { + service.commitDatas(providerOpt, providerName, filename, headerList); + //quel que soit le résultat du traitement, on envoie le rapport par mail + log.info("Nombre de best ppn trouvé : " + executionReportService.getExecutionReport().getNbBestPpnFind() + "/" + totalLine); + emailService.sendMailWithAttachment(filename); + logFileService.createExecutionReport(filename, totalLine, executionReportService.getNbLinesOk(), executionReportService.getExecutionReport().getNbLinesWithInputDataErrors(), executionReportService.getExecutionReport().getNbLinesWithErrorsInBestPPNSearch(), isForced); + emailService.clearMailAttachment(); + executionReportService.clearExecutionReport(); + totalLine = 0; + } else { + isOnError = false; + } + } catch (IllegalPackageException | IllegalDateException e) { isOnError = true; log.error(e.getMessage()); - emailService.addLineToMailAttachementWithErrorMessage(e.getMessage()); + emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage()); executionReportService.addNbLinesWithErrorsInBestPPNSearch(); } - }); - } - if (lignesKbart.topic().equals(topicKbartNbLines)) { - executorService.awaitTermination(1, TimeUnit.HOURS); - try { - if (!isOnError) - service.commitDatas(providerOpt, providerName, filename, headerList, isForced); - } catch (IllegalPackageException | IllegalDateException e) { - isOnError = true; - log.error(e.getMessage()); - emailService.addLineToMailAttachementWithErrorMessage(e.getMessage()); - executionReportService.addNbLinesWithErrorsInBestPPNSearch(); - } catch (IllegalProviderException e) { + } + if (lignesKbart.topic().equals(topicKbartErrors)) { + executorService.shutdownNow(); isOnError = true; - log.error("Erreur dans les données en entrée, provider incorrect"); - emailService.addLineToMailAttachementWithErrorMessage(e.getMessage()); - executionReportService.addNbLinesWithInputDataErrors(); } - } - if (lignesKbart.topic().equals(topicKbartErrors)) { - executorService.shutdownNow(); + } catch (IllegalProviderException e) { + isOnError = true; + log.error("Erreur dans les données en entrée, provider incorrect"); + emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage()); + executionReportService.addNbLinesWithInputDataErrors(); } } diff --git a/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java b/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java index 1704baf..95955f3 100644 --- a/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java +++ b/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -180,7 +181,7 @@ public byte[] value() { * Envoie un message de fin de traitement sur le topic kafka endOfTraitment_kbart2kafka * @param headerList list de Header (contient le nom du package et la date) */ - public void sendEndOfTraitmentReport(List
headerList) throws ExecutionException, InterruptedException { + public void sendEndOfTraitmentReport(Set
headerList) throws ExecutionException, InterruptedException { try { ProducerRecord record = new ProducerRecord<>(topicEndOfTraitment, null, "", "OK", headerList); kafkatemplateEndoftraitement.send(record); diff --git a/src/main/java/fr/abes/bestppn/service/EmailService.java b/src/main/java/fr/abes/bestppn/service/EmailService.java index 4810ed1..919152d 100644 --- a/src/main/java/fr/abes/bestppn/service/EmailService.java +++ b/src/main/java/fr/abes/bestppn/service/EmailService.java @@ -40,13 +40,13 @@ public class EmailService { private final PackageKbartDto mailAttachment = new PackageKbartDto(); - public void addLineToMailAttachementWithErrorMessage(String messageError) { + public void addLineKbartToMailAttachementWithErrorMessage(String messageError) { LigneKbartDto ligneVide = new LigneKbartDto(); ligneVide.setErrorType(messageError); mailAttachment.addKbartDto(ligneVide); } - public void addPackageToMailAttachment(LigneKbartDto dto) { + public void addLineKbartToMailAttachment(LigneKbartDto dto) { mailAttachment.addKbartDto(dto); } diff --git a/src/main/java/fr/abes/bestppn/service/KbartService.java b/src/main/java/fr/abes/bestppn/service/KbartService.java index d76b3e9..d97cf09 100644 --- a/src/main/java/fr/abes/bestppn/service/KbartService.java +++ b/src/main/java/fr/abes/bestppn/service/KbartService.java @@ -18,10 +18,7 @@ import java.io.IOException; import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Optional; +import java.util.*; import java.util.concurrent.ExecutionException; @Service @@ -35,8 +32,6 @@ public class KbartService { private final EmailService serviceMail; - private final LogFileService logFileService; - @Getter private final List kbartToSend = Collections.synchronizedList(new ArrayList<>()); @@ -51,14 +46,11 @@ public class KbartService { private final ExecutionReportService executionReportService; - private String totalLine = ""; - - public KbartService(ObjectMapper mapper, BestPpnService service, TopicProducer producer, EmailService serviceMail, LogFileService logFileService, ProviderService providerService, ExecutionReportService executionReportService) { + public KbartService(ObjectMapper mapper, BestPpnService service, TopicProducer producer, EmailService serviceMail, ProviderService providerService, ExecutionReportService executionReportService) { this.mapper = mapper; this.service = service; this.producer = producer; this.serviceMail = serviceMail; - this.logFileService = logFileService; this.providerService = providerService; this.executionReportService = executionReportService; } @@ -90,10 +82,10 @@ public void processConsumerRecord(ConsumerRecord lignesKbart, St log.info("Bestppn déjà existant sur la ligne : " + lignesKbart + ",PPN : " + ligneFromKafka.getBestPpn()); kbartToSend.add(ligneFromKafka); } - serviceMail.addPackageToMailAttachment(ligneFromKafka); + serviceMail.addLineKbartToMailAttachment(ligneFromKafka); } - public void commitDatas(Optional providerOpt, String providerName, String filename, List
headerList, boolean isForced) throws IllegalPackageException, IllegalDateException, ExecutionException, InterruptedException, MessagingException, IOException { + public void commitDatas(Optional providerOpt, String providerName, String filename, Set
headerList) throws IllegalPackageException, IllegalDateException, ExecutionException, InterruptedException, MessagingException, IOException { ProviderPackage provider = providerService.handlerProvider(providerOpt, filename, providerName); producer.sendKbart(kbartToSend, provider, filename); @@ -104,13 +96,7 @@ public void commitDatas(Optional providerOpt, String providerName, Str ppnToCreate.clear(); ppnFromKbartToCreate.clear(); - log.info("Nombre de best ppn trouvé : " + executionReportService.getExecutionReport().getNbBestPpnFind() + "/" + totalLine); - serviceMail.sendMailWithAttachment(filename); - producer.sendEndOfTraitmentReport(headerList); // Appel le producer pour l'envoi du message de fin de traitement. - logFileService.createExecutionReport(filename, Integer.parseInt(totalLine), executionReportService.getNbLinesOk(), executionReportService.getExecutionReport().getNbLinesWithInputDataErrors(), executionReportService.getExecutionReport().getNbLinesWithErrorsInBestPPNSearch(), isForced); - - serviceMail.clearMailAttachment(); - executionReportService.clearExecutionReport(); + producer.sendEndOfTraitmentReport(headerList); } private static LigneKbartImprime getLigneKbartImprime(PpnWithDestinationDto ppnWithDestinationDto, LigneKbartDto ligneFromKafka) { diff --git a/src/main/java/fr/abes/bestppn/service/WsService.java b/src/main/java/fr/abes/bestppn/service/WsService.java index 8ae0036..18754b8 100644 --- a/src/main/java/fr/abes/bestppn/service/WsService.java +++ b/src/main/java/fr/abes/bestppn/service/WsService.java @@ -84,12 +84,10 @@ public String getCall(String url, Map params) throws RestClientE return restTemplate.getForObject(formedUrl.toString(), String.class); } - @ExecutionTime public ResultWsSudocDto callOnlineId2Ppn(String type, String id, @Nullable String provider) throws RestClientException, IllegalArgumentException { return getResultWsSudocDto(type, id, provider, urlOnlineId2Ppn); } - @ExecutionTime public ResultWsSudocDto callPrintId2Ppn(String type, String id, @Nullable String provider) throws RestClientException, IllegalArgumentException { return getResultWsSudocDto(type, id, provider, urlPrintId2Ppn); } @@ -113,7 +111,6 @@ private ResultWsSudocDto getResultWsSudocDto(String type, String id, @Nullable S return result; } - @ExecutionTime public ResultDat2PpnWebDto callDat2Ppn(String date, String author, String title) throws JsonProcessingException { SearchDatWebDto searchDatWebDto = new SearchDatWebDto(title); if (author != null && !author.isEmpty()) { @@ -125,7 +122,6 @@ public ResultDat2PpnWebDto callDat2Ppn(String date, String author, String title) return mapper.readValue(postCall(urlDat2Ppn, mapper.writeValueAsString(searchDatWebDto)), ResultDat2PpnWebDto.class); } - @ExecutionTime public ResultWsSudocDto callDoi2Ppn(String doi, @Nullable String provider) throws JsonProcessingException { Map params = new HashMap<>(); params.put("doi", doi);