From 15a57abc6a2800395adfa9178df0b40c4394b3dd Mon Sep 17 00:00:00 2001 From: EryneKL <97091460+EryneKL@users.noreply.github.com> Date: Wed, 15 Nov 2023 09:57:59 +0100 Subject: [PATCH 1/4] =?UTF-8?q?FEAT=20:=20CDE-277-Mesurer-et-noter-les-per?= =?UTF-8?q?formances-sur-plusieurs-fichiers=20:=20=20=20=20=20=20-=20ajout?= =?UTF-8?q?=20de=20d=C3=A9tails=20sur=20les=20log.debug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/fr/abes/bestppn/utils/ExecutionTimeAspect.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/main/java/fr/abes/bestppn/utils/ExecutionTimeAspect.java b/src/main/java/fr/abes/bestppn/utils/ExecutionTimeAspect.java index 643c7b2..657b7c0 100644 --- a/src/main/java/fr/abes/bestppn/utils/ExecutionTimeAspect.java +++ b/src/main/java/fr/abes/bestppn/utils/ExecutionTimeAspect.java @@ -5,6 +5,8 @@ import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; +import java.util.Arrays; + @Aspect @Slf4j public class ExecutionTimeAspect { @@ -18,7 +20,13 @@ public Object measureExecutionTime(ProceedingJoinPoint joinPoint) throws Throwab long endTime = System.currentTimeMillis(); double executionTime = (endTime - startTime) / 1000; + log.debug("------------------------------------------------------"); + log.debug("Classe : " + joinPoint.getSignature().getDeclaringTypeName()); + log.debug("Méthode : " + joinPoint.getSignature().getName()); + log.debug("Paramètres : " + Arrays.toString(joinPoint.getArgs())); log.debug("Temps d'exécution : " + executionTime + " secondes"); + log.debug("------------------------------------------------------"); + return result; } } From 688affd65c58acdd3bf6b5dfc00ab10f003c48cd Mon Sep 17 00:00:00 2001 From: Jerome Villiseck Date: Wed, 15 Nov 2023 13:21:26 +0100 Subject: [PATCH 2/4] cast double pr eviter troncature temps execution --- src/main/java/fr/abes/bestppn/utils/ExecutionTimeAspect.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/fr/abes/bestppn/utils/ExecutionTimeAspect.java b/src/main/java/fr/abes/bestppn/utils/ExecutionTimeAspect.java index 643c7b2..872e4cd 100644 --- a/src/main/java/fr/abes/bestppn/utils/ExecutionTimeAspect.java +++ b/src/main/java/fr/abes/bestppn/utils/ExecutionTimeAspect.java @@ -16,7 +16,7 @@ public Object measureExecutionTime(ProceedingJoinPoint joinPoint) throws Throwab Object result = joinPoint.proceed(); long endTime = System.currentTimeMillis(); - double executionTime = (endTime - startTime) / 1000; + double executionTime = (double) (endTime - startTime) / 1000; log.debug("Temps d'exécution : " + executionTime + " secondes"); return result; From b936c076e5d84edc2f04cf3f03e2030704446acc Mon Sep 17 00:00:00 2001 From: pierre-maraval Date: Mon, 20 Nov 2023 14:37:13 +0100 Subject: [PATCH 3/4] CDE-269 : Ajout multi threaded consumer --- .../bestppn/configuration/KafkaConfig.java | 1 + .../fr/abes/bestppn/kafka/TopicConsumer.java | 229 +--------------- .../fr/abes/bestppn/kafka/TopicProducer.java | 6 +- .../fr/abes/bestppn/service/KbartService.java | 254 ++++++++++++++++++ .../fr/abes/bestppn/service/WsService.java | 4 +- src/main/resources/application-dev.properties | 1 + .../resources/application-prod.properties | 1 + .../resources/application-test.properties | 1 + 8 files changed, 269 insertions(+), 228 deletions(-) create mode 100644 src/main/java/fr/abes/bestppn/service/KbartService.java diff --git a/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java b/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java index d1d6595..a0543bb 100644 --- a/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java +++ b/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java @@ -46,6 +46,7 @@ public ConsumerFactory consumerKbartFactory() { props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolationLevel); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); return new DefaultKafkaConsumerFactory<>(props); } diff --git a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java index f1a4247..240922a 100644 --- a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java +++ b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java @@ -1,240 +1,27 @@ package fr.abes.bestppn.kafka; -import com.fasterxml.jackson.databind.ObjectMapper; -import fr.abes.LigneKbartImprime; -import fr.abes.bestppn.dto.PackageKbartDto; -import fr.abes.bestppn.dto.kafka.LigneKbartDto; -import fr.abes.bestppn.dto.kafka.PpnWithDestinationDto; -import fr.abes.bestppn.entity.ExecutionReport; -import fr.abes.bestppn.entity.bacon.Provider; -import fr.abes.bestppn.entity.bacon.ProviderPackage; -import fr.abes.bestppn.exception.*; -import fr.abes.bestppn.repository.bacon.LigneKbartRepository; -import fr.abes.bestppn.repository.bacon.ProviderPackageRepository; -import fr.abes.bestppn.repository.bacon.ProviderRepository; -import fr.abes.bestppn.service.BestPpnService; -import fr.abes.bestppn.service.EmailService; -import fr.abes.bestppn.service.LogFileService; -import fr.abes.bestppn.utils.Utils; -import jakarta.mail.MessagingException; -import lombok.RequiredArgsConstructor; +import fr.abes.bestppn.service.KbartService; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.header.Header; -import org.apache.logging.log4j.ThreadContext; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; -import org.springframework.web.client.RestClientException; - -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.ExecutionException; @Slf4j @Service -@RequiredArgsConstructor public class TopicConsumer { - @Autowired - private ObjectMapper mapper; - - @Autowired - private BestPpnService service; - - @Autowired - private TopicProducer producer; - - @Autowired - private EmailService serviceMail; - - @Autowired - private LogFileService logFileService; - - private final List kbartToSend = new ArrayList<>(); - - private final List ppnToCreate = new ArrayList<>(); - - private final List ppnFromKbartToCreate = new ArrayList<>(); - - private final PackageKbartDto mailAttachment = new PackageKbartDto(); - - private final ProviderPackageRepository providerPackageRepository; - - private final ProviderRepository providerRepository; - - private final LigneKbartRepository ligneKbartRepository; - - private final List
headerList = new ArrayList<>(); + private final KbartService service; - private boolean isOnError = false; - - boolean injectKafka = false; - - private ExecutionReport executionReport = new ExecutionReport(); - - private String filename = ""; - - private String totalLine = ""; + public TopicConsumer(KbartService service) { + this.service = service; + } /** * Listener Kafka qui écoute un topic et récupère les messages dès qu'ils y arrivent. * @param lignesKbart message kafka récupéré par le Consumer Kafka */ - @KafkaListener(topics = {"${topic.name.source.kbart}"}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory") + @KafkaListener(topics = {"${topic.name.source.kbart}"}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "${spring.kafka.concurrency.nbThread}") public void listenKbartFromKafka(ConsumerRecord lignesKbart) throws Exception { - try { - String currentLine = ""; - for (Header header : lignesKbart.headers().toArray()) { - if (header.key().equals("FileName")) { - filename = new String(header.value()); - headerList.add(header); - if (filename.contains("_FORCE")) { - injectKafka = true; - } - } else if (header.key().equals("CurrentLine")) { - currentLine = new String(header.value()); - headerList.add(header); - } else if (header.key().equals("TotalLine")) { - totalLine = new String(header.value()); - executionReport.setNbtotalLines(Integer.parseInt(totalLine)); - headerList.add(header); - } - } - ThreadContext.put("package", (filename + "[line : " + currentLine + "]")); // Ajoute le numéro de ligne courante au contexte log4j2 pour inscription dans le header kafka - - String nbLine = currentLine + "/" + totalLine; - String providerName = Utils.extractProvider(filename); - Optional providerOpt = providerRepository.findByProvider(providerName); - if (lignesKbart.value().equals("OK")) { - if (!isOnError) { - ProviderPackage provider = handlerProvider(providerOpt, filename, providerName); - - producer.sendKbart(kbartToSend, provider, filename); - producer.sendPrintNotice(ppnToCreate, filename); - producer.sendPpnExNihilo(ppnFromKbartToCreate, provider, filename); - } else { - isOnError = false; - } - log.info("Nombre de best ppn trouvé : " + executionReport.getNbBestPpnFind() + "/" + totalLine); - serviceMail.sendMailWithAttachment(filename, mailAttachment); - producer.sendEndOfTraitmentReport(headerList); // Appel le producer pour l'envoi du message de fin de traitement. - logFileService.createExecutionReport(filename, Integer.parseInt(totalLine), executionReport.getNbLinesOk(), executionReport.getNbLinesWithInputDataErrors(), executionReport.getNbLinesWithErrorsInBestPPNSearch(), injectKafka); - kbartToSend.clear(); - ppnToCreate.clear(); - ppnFromKbartToCreate.clear(); - mailAttachment.clearKbartDto(); - executionReport.clear(); - } else { - LigneKbartDto ligneFromKafka = mapper.readValue(lignesKbart.value(), LigneKbartDto.class); - if (ligneFromKafka.isBestPpnEmpty()) { - log.info("Debut du calcul du bestppn sur la ligne : " + nbLine); - log.info(ligneFromKafka.toString()); - PpnWithDestinationDto ppnWithDestinationDto = service.getBestPpn(ligneFromKafka, providerName, injectKafka); - switch (ppnWithDestinationDto.getDestination()) { - case BEST_PPN_BACON -> { - ligneFromKafka.setBestPpn(ppnWithDestinationDto.getPpn()); - executionReport.addNbBestPpnFind(); - } - case PRINT_PPN_SUDOC -> { - LigneKbartImprime ligne = LigneKbartImprime.newBuilder() - .setPpn(ppnWithDestinationDto.getPpn()) - .setPublicationTitle(ligneFromKafka.getPublicationTitle()) - .setPrintIdentifier(ligneFromKafka.getPrintIdentifier()) - .setOnlineIdentifier(ligneFromKafka.getOnlineIdentifier()) - .setDateFirstIssueOnline(ligneFromKafka.getDateFirstIssueOnline()) - .setNumFirstVolOnline(ligneFromKafka.getNumFirstVolOnline()) - .setNumFirstIssueOnline(ligneFromKafka.getNumFirstIssueOnline()) - .setDateLastIssueOnline(ligneFromKafka.getDateLastIssueOnline()) - .setNumLastVolOnline(ligneFromKafka.getNumLastVolOnline()) - .setNumLastIssueOnline(ligneFromKafka.getNumLastIssueOnline()) - .setTitleUrl(ligneFromKafka.getTitleUrl()) - .setFirstAuthor(ligneFromKafka.getFirstAuthor()) - .setTitleId(ligneFromKafka.getTitleId()) - .setEmbargoInfo(ligneFromKafka.getEmbargoInfo()) - .setCoverageDepth(ligneFromKafka.getCoverageDepth()) - .setNotes(ligneFromKafka.getNotes()) - .setPublisherName(ligneFromKafka.getPublisherName()) - .setPublicationType(ligneFromKafka.getPublicationType()) - .setDateMonographPublishedPrint(ligneFromKafka.getDateMonographPublishedPrint()) - .setDateMonographPublishedOnline(ligneFromKafka.getDateMonographPublishedOnline()) - .setMonographVolume(ligneFromKafka.getMonographVolume()) - .setMonographEdition(ligneFromKafka.getMonographEdition()) - .setFirstEditor(ligneFromKafka.getFirstEditor()) - .setParentPublicationTitleId(ligneFromKafka.getParentPublicationTitleId()) - .setPrecedingPublicationTitleId(ligneFromKafka.getPrecedingPublicationTitleId()) - .setAccessType(ligneFromKafka.getAccessType()) - .build(); - ppnToCreate.add(ligne); - } - case NO_PPN_FOUND_SUDOC -> { - if (ligneFromKafka.getPublicationType().equals("monograph")) { - ppnFromKbartToCreate.add(ligneFromKafka); - } - } - } - //on envoie vers bacon même si on n'a pas trouvé de bestppn - kbartToSend.add(ligneFromKafka); - } else { - log.info("Bestppn déjà existant sur la ligne : " + nbLine + ", le voici : " + ligneFromKafka.getBestPpn()); - kbartToSend.add(ligneFromKafka); - } - mailAttachment.addKbartDto(ligneFromKafka); - } - } catch (IllegalProviderException e) { - isOnError = true; - log.error("Erreur dans les données en entrée, provider incorrect"); - addLineToMailAttachementWithErrorMessage(e.getMessage()); - executionReport.addNbLinesWithInputDataErrors(); - } catch (URISyntaxException | RestClientException | IllegalArgumentException | IOException | - IllegalPackageException | IllegalDateException e) { - isOnError = true; - log.error(e.getMessage()); - addLineToMailAttachementWithErrorMessage(e.getMessage()); - executionReport.addNbLinesWithInputDataErrors(); - } catch (IllegalPpnException | BestPpnException e) { - isOnError = true; - log.error(e.getMessage()); - addLineToMailAttachementWithErrorMessage(e.getMessage()); - executionReport.addNbLinesWithErrorsInBestPPNSearch(); - } catch (MessagingException | ExecutionException | InterruptedException | RuntimeException e) { - log.error(e.getMessage()); - producer.sendEndOfTraitmentReport(headerList); - logFileService.createExecutionReport(filename, Integer.parseInt(totalLine), executionReport.getNbLinesOk(), executionReport.getNbLinesWithInputDataErrors(), executionReport.getNbLinesWithErrorsInBestPPNSearch(), injectKafka); - } - } - - private ProviderPackage handlerProvider(Optional providerOpt, String filename, String providerName) throws IllegalPackageException, IllegalDateException { - String packageName = Utils.extractPackageName(filename); - Date packageDate = Utils.extractDate(filename); - if (providerOpt.isPresent()) { - Provider provider = providerOpt.get(); - - Optional providerPackageOpt = providerPackageRepository.findByPackageNameAndDatePAndProviderIdtProvider(packageName,packageDate,provider.getIdtProvider()); - if( providerPackageOpt.isPresent()){ - log.info("clear row package : " + providerPackageOpt.get()); - ligneKbartRepository.deleteAllByIdProviderPackage(providerPackageOpt.get().getIdProviderPackage()); - return providerPackageOpt.get(); - } else { - //pas d'info de package, on le crée - return providerPackageRepository.save(new ProviderPackage(packageName, packageDate, provider.getIdtProvider(), 'N')); - } - } else { - //pas de provider, ni de package, on les crée tous les deux - Provider newProvider = new Provider(providerName); - Provider savedProvider = providerRepository.save(newProvider); - log.info("Le provider " + savedProvider.getProvider() + " a été créé."); - ProviderPackage providerPackage = new ProviderPackage(packageName, packageDate, savedProvider.getIdtProvider(), 'N'); - return providerPackageRepository.save(providerPackage); - } - } - - private void addLineToMailAttachementWithErrorMessage(String messageError) { - LigneKbartDto ligneVide = new LigneKbartDto(); - ligneVide.setErrorType(messageError); - mailAttachment.addKbartDto(ligneVide); + log.info("Paquet reçu : Partition : " + lignesKbart.partition() + " / offset " + lignesKbart.offset()); + service.processConsumerRecord(lignesKbart); } } diff --git a/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java b/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java index fc15e73..1704baf 100644 --- a/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java +++ b/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java @@ -68,18 +68,14 @@ public TopicProducer(KafkaTemplate kafkaTemplateConne * @param filename : nom du fichier du traitement en cours */ @Transactional(transactionManager = "kafkaTransactionManagerKbartConnect", rollbackFor = {BestPpnException.class, JsonProcessingException.class}) - public void sendKbart(List kbart, ProviderPackage provider, String filename) throws JsonProcessingException, BestPpnException, ExecutionException, InterruptedException { - int numLigneCourante = 0; + public void sendKbart(List kbart, ProviderPackage provider, String filename) { for (LigneKbartDto ligne : kbart) { - numLigneCourante++; ligne.setIdProviderPackage(provider.getIdProviderPackage()); ligne.setProviderPackagePackage(provider.getPackageName()); ligne.setProviderPackageDateP(provider.getDateP()); ligne.setProviderPackageIdtProvider(provider.getProviderIdtProvider()); List
headerList = new ArrayList<>(); headerList.add(constructHeader("filename", filename.getBytes())); - if (numLigneCourante == kbart.size()) - headerList.add(constructHeader("OK", "true".getBytes())); sendObject(ligne, topicKbart, headerList); } if (!kbart.isEmpty()) diff --git a/src/main/java/fr/abes/bestppn/service/KbartService.java b/src/main/java/fr/abes/bestppn/service/KbartService.java new file mode 100644 index 0000000..2814990 --- /dev/null +++ b/src/main/java/fr/abes/bestppn/service/KbartService.java @@ -0,0 +1,254 @@ +package fr.abes.bestppn.service; + +import com.fasterxml.jackson.databind.ObjectMapper; +import fr.abes.LigneKbartImprime; +import fr.abes.bestppn.dto.PackageKbartDto; +import fr.abes.bestppn.dto.kafka.LigneKbartDto; +import fr.abes.bestppn.dto.kafka.PpnWithDestinationDto; +import fr.abes.bestppn.entity.ExecutionReport; +import fr.abes.bestppn.entity.bacon.Provider; +import fr.abes.bestppn.entity.bacon.ProviderPackage; +import fr.abes.bestppn.exception.*; +import fr.abes.bestppn.kafka.TopicProducer; +import fr.abes.bestppn.repository.bacon.LigneKbartRepository; +import fr.abes.bestppn.repository.bacon.ProviderPackageRepository; +import fr.abes.bestppn.repository.bacon.ProviderRepository; +import fr.abes.bestppn.utils.Utils; +import jakarta.mail.MessagingException; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.logging.log4j.ThreadContext; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.web.client.RestClientException; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; + +@Service +@Slf4j +public class KbartService { + private final ObjectMapper mapper; + + private final BestPpnService service; + + private final TopicProducer producer; + + private final EmailService serviceMail; + + private final LogFileService logFileService; + + private final List kbartToSend = new ArrayList<>(); + + private final List ppnToCreate = new ArrayList<>(); + + private final List ppnFromKbartToCreate = new ArrayList<>(); + + private final PackageKbartDto mailAttachment = new PackageKbartDto(); + + private final ProviderPackageRepository providerPackageRepository; + + private final ProviderRepository providerRepository; + + private final LigneKbartRepository ligneKbartRepository; + private final List
headerList = new ArrayList<>(); + + private boolean isOnError = false; + + boolean injectKafka = false; + + private ExecutionReport executionReport = new ExecutionReport(); + + private String filename = ""; + + private String totalLine = ""; + + public KbartService(ObjectMapper mapper, BestPpnService service, TopicProducer producer, EmailService serviceMail, LogFileService logFileService, ProviderPackageRepository providerPackageRepository, ProviderRepository providerRepository, LigneKbartRepository ligneKbartRepository) { + this.mapper = mapper; + this.service = service; + this.producer = producer; + this.serviceMail = serviceMail; + this.logFileService = logFileService; + this.providerPackageRepository = providerPackageRepository; + this.providerRepository = providerRepository; + this.ligneKbartRepository = ligneKbartRepository; + } + + @Transactional + public void processConsumerRecord(ConsumerRecord lignesKbart) throws ExecutionException, InterruptedException, IOException { + try { + String currentLine = extractDataFromHeader(lignesKbart.headers().toArray()); + ThreadContext.put("package", (filename + "[line : " + currentLine + "]")); // Ajoute le numéro de ligne courante au contexte log4j2 pour inscription dans le header kafka + + String nbLine = currentLine + "/" + totalLine; + String providerName = Utils.extractProvider(filename); + Optional providerOpt = providerRepository.findByProvider(providerName); + if (lignesKbart.value().equals("OK")) { + commitDatas(providerOpt, providerName); + } else { + handleLigneKbart(lignesKbart.value(), nbLine, providerName); + } + } catch (IllegalProviderException e) { + isOnError = true; + log.error("Erreur dans les données en entrée, provider incorrect"); + addLineToMailAttachementWithErrorMessage(e.getMessage()); + executionReport.addNbLinesWithInputDataErrors(); + } catch (URISyntaxException | RestClientException | IllegalArgumentException | IOException | + IllegalPackageException | IllegalDateException e) { + isOnError = true; + log.error(e.getMessage()); + addLineToMailAttachementWithErrorMessage(e.getMessage()); + executionReport.addNbLinesWithInputDataErrors(); + } catch (IllegalPpnException | BestPpnException e) { + isOnError = true; + log.error(e.getMessage()); + addLineToMailAttachementWithErrorMessage(e.getMessage()); + executionReport.addNbLinesWithErrorsInBestPPNSearch(); + } catch (MessagingException | ExecutionException | InterruptedException | RuntimeException e) { + log.error(e.getMessage()); + producer.sendEndOfTraitmentReport(headerList); + logFileService.createExecutionReport(filename, Integer.parseInt(totalLine), executionReport.getNbLinesOk(), executionReport.getNbLinesWithInputDataErrors(), executionReport.getNbLinesWithErrorsInBestPPNSearch(), injectKafka); + } + } + + private void handleLigneKbart(String lignesKbart, String nbLine, String providerName) throws IOException, IllegalPpnException, BestPpnException, URISyntaxException { + LigneKbartDto ligneFromKafka = mapper.readValue(lignesKbart, LigneKbartDto.class); + if (ligneFromKafka.isBestPpnEmpty()) { + log.info("Debut du calcul du bestppn sur la ligne : " + nbLine); + log.info(ligneFromKafka.toString()); + PpnWithDestinationDto ppnWithDestinationDto = service.getBestPpn(ligneFromKafka, providerName, injectKafka); + switch (ppnWithDestinationDto.getDestination()) { + case BEST_PPN_BACON -> { + ligneFromKafka.setBestPpn(ppnWithDestinationDto.getPpn()); + executionReport.addNbBestPpnFind(); + } + case PRINT_PPN_SUDOC -> { + ppnToCreate.add(getLigneKbartImprime(ppnWithDestinationDto, ligneFromKafka)); + } + case NO_PPN_FOUND_SUDOC -> { + if (ligneFromKafka.getPublicationType().equals("monograph")) { + ppnFromKbartToCreate.add(ligneFromKafka); + } + } + } + //on envoie vers bacon même si on n'a pas trouvé de bestppn + kbartToSend.add(ligneFromKafka); + } else { + log.info("Bestppn déjà existant sur la ligne : " + nbLine + ", le voici : " + ligneFromKafka.getBestPpn()); + kbartToSend.add(ligneFromKafka); + } + mailAttachment.addKbartDto(ligneFromKafka); + } + + private void commitDatas(Optional providerOpt, String providerName) throws IllegalPackageException, IllegalDateException, BestPpnException, ExecutionException, InterruptedException, MessagingException, IOException { + if (!isOnError) { + ProviderPackage provider = handlerProvider(providerOpt, filename, providerName); + + producer.sendKbart(kbartToSend, provider, filename); + producer.sendPrintNotice(ppnToCreate, filename); + producer.sendPpnExNihilo(ppnFromKbartToCreate, provider, filename); + } else { + isOnError = false; + } + log.info("Nombre de best ppn trouvé : " + executionReport.getNbBestPpnFind() + "/" + totalLine); + serviceMail.sendMailWithAttachment(filename, mailAttachment); + producer.sendEndOfTraitmentReport(headerList); // Appel le producer pour l'envoi du message de fin de traitement. + logFileService.createExecutionReport(filename, Integer.parseInt(totalLine), executionReport.getNbLinesOk(), executionReport.getNbLinesWithInputDataErrors(), executionReport.getNbLinesWithErrorsInBestPPNSearch(), injectKafka); + kbartToSend.clear(); + ppnToCreate.clear(); + ppnFromKbartToCreate.clear(); + mailAttachment.clearKbartDto(); + executionReport.clear(); + } + + private static LigneKbartImprime getLigneKbartImprime(PpnWithDestinationDto ppnWithDestinationDto, LigneKbartDto ligneFromKafka) { + LigneKbartImprime ligne = LigneKbartImprime.newBuilder() + .setPpn(ppnWithDestinationDto.getPpn()) + .setPublicationTitle(ligneFromKafka.getPublicationTitle()) + .setPrintIdentifier(ligneFromKafka.getPrintIdentifier()) + .setOnlineIdentifier(ligneFromKafka.getOnlineIdentifier()) + .setDateFirstIssueOnline(ligneFromKafka.getDateFirstIssueOnline()) + .setNumFirstVolOnline(ligneFromKafka.getNumFirstVolOnline()) + .setNumFirstIssueOnline(ligneFromKafka.getNumFirstIssueOnline()) + .setDateLastIssueOnline(ligneFromKafka.getDateLastIssueOnline()) + .setNumLastVolOnline(ligneFromKafka.getNumLastVolOnline()) + .setNumLastIssueOnline(ligneFromKafka.getNumLastIssueOnline()) + .setTitleUrl(ligneFromKafka.getTitleUrl()) + .setFirstAuthor(ligneFromKafka.getFirstAuthor()) + .setTitleId(ligneFromKafka.getTitleId()) + .setEmbargoInfo(ligneFromKafka.getEmbargoInfo()) + .setCoverageDepth(ligneFromKafka.getCoverageDepth()) + .setNotes(ligneFromKafka.getNotes()) + .setPublisherName(ligneFromKafka.getPublisherName()) + .setPublicationType(ligneFromKafka.getPublicationType()) + .setDateMonographPublishedPrint(ligneFromKafka.getDateMonographPublishedPrint()) + .setDateMonographPublishedOnline(ligneFromKafka.getDateMonographPublishedOnline()) + .setMonographVolume(ligneFromKafka.getMonographVolume()) + .setMonographEdition(ligneFromKafka.getMonographEdition()) + .setFirstEditor(ligneFromKafka.getFirstEditor()) + .setParentPublicationTitleId(ligneFromKafka.getParentPublicationTitleId()) + .setPrecedingPublicationTitleId(ligneFromKafka.getPrecedingPublicationTitleId()) + .setAccessType(ligneFromKafka.getAccessType()) + .build(); + return ligne; + } + + private String extractDataFromHeader(Header[] headers) { + String currentLine = ""; + for (Header header : headers) { + if (header.key().equals("FileName")) { + filename = new String(header.value()); + headerList.add(header); + if (filename.contains("_FORCE")) { + injectKafka = true; + } + } else if (header.key().equals("CurrentLine")) { + currentLine = new String(header.value()); + headerList.add(header); + } else if (header.key().equals("TotalLine")) { + totalLine = new String(header.value()); + executionReport.setNbtotalLines(Integer.parseInt(totalLine)); + headerList.add(header); + } + } + return currentLine; + } + + private ProviderPackage handlerProvider(Optional providerOpt, String filename, String providerName) throws IllegalPackageException, IllegalDateException { + String packageName = Utils.extractPackageName(filename); + Date packageDate = Utils.extractDate(filename); + if (providerOpt.isPresent()) { + Provider provider = providerOpt.get(); + + Optional providerPackageOpt = providerPackageRepository.findByPackageNameAndDatePAndProviderIdtProvider(packageName,packageDate,provider.getIdtProvider()); + if( providerPackageOpt.isPresent()){ + log.info("clear row package : " + providerPackageOpt.get()); + ligneKbartRepository.deleteAllByIdProviderPackage(providerPackageOpt.get().getIdProviderPackage()); + return providerPackageOpt.get(); + } else { + //pas d'info de package, on le crée + return providerPackageRepository.save(new ProviderPackage(packageName, packageDate, provider.getIdtProvider(), 'N')); + } + } else { + //pas de provider, ni de package, on les crée tous les deux + Provider newProvider = new Provider(providerName); + Provider savedProvider = providerRepository.save(newProvider); + log.info("Le provider " + savedProvider.getProvider() + " a été créé."); + ProviderPackage providerPackage = new ProviderPackage(packageName, packageDate, savedProvider.getIdtProvider(), 'N'); + return providerPackageRepository.save(providerPackage); + } + } + + private void addLineToMailAttachementWithErrorMessage(String messageError) { + LigneKbartDto ligneVide = new LigneKbartDto(); + ligneVide.setErrorType(messageError); + mailAttachment.addKbartDto(ligneVide); + } + +} diff --git a/src/main/java/fr/abes/bestppn/service/WsService.java b/src/main/java/fr/abes/bestppn/service/WsService.java index 59f3a3d..8ae0036 100644 --- a/src/main/java/fr/abes/bestppn/service/WsService.java +++ b/src/main/java/fr/abes/bestppn/service/WsService.java @@ -64,7 +64,7 @@ public String getRestCall(String url, String... params) throws RestClientExcepti formedUrl.append("/"); formedUrl.append(param); } - log.info(formedUrl.toString()); + log.debug(formedUrl.toString()); return restTemplate.getForObject(formedUrl.toString(), String.class); } @@ -80,7 +80,7 @@ public String getCall(String url, Map params) throws RestClientE } formedUrl.deleteCharAt(formedUrl.length() - 1); } - log.info(formedUrl.toString()); + log.debug(formedUrl.toString()); return restTemplate.getForObject(formedUrl.toString(), String.class); } diff --git a/src/main/resources/application-dev.properties b/src/main/resources/application-dev.properties index 6b36a75..d721e89 100644 --- a/src/main/resources/application-dev.properties +++ b/src/main/resources/application-dev.properties @@ -3,6 +3,7 @@ spring.kafka.producer.bootstrap-servers= spring.kafka.consumer.bootstrap-servers= spring.kafka.registry.url= spring.kafka.auto.register.schema=false +spring.kafka.concurrency.nbThread= url.onlineId2Ppn= url.printId2Ppn= diff --git a/src/main/resources/application-prod.properties b/src/main/resources/application-prod.properties index 128ce49..6a527f4 100644 --- a/src/main/resources/application-prod.properties +++ b/src/main/resources/application-prod.properties @@ -3,6 +3,7 @@ spring.kafka.producer.bootstrap-servers= spring.kafka.consumer.bootstrap-servers= spring.kafka.registry.url= spring.kafka.auto.register.schema=false +spring.kafka.concurrency.nbThread= url.onlineId2Ppn= url.printId2Ppn= diff --git a/src/main/resources/application-test.properties b/src/main/resources/application-test.properties index 128ce49..6a527f4 100644 --- a/src/main/resources/application-test.properties +++ b/src/main/resources/application-test.properties @@ -3,6 +3,7 @@ spring.kafka.producer.bootstrap-servers= spring.kafka.consumer.bootstrap-servers= spring.kafka.registry.url= spring.kafka.auto.register.schema=false +spring.kafka.concurrency.nbThread= url.onlineId2Ppn= url.printId2Ppn= From 1ac2eb297dafe26aeaf259ffb3ddfd4312e04c7f Mon Sep 17 00:00:00 2001 From: pierre-maraval Date: Mon, 20 Nov 2023 15:09:26 +0100 Subject: [PATCH 4/4] CDE-269 : ajout transaction timeout Modif niveaux log environnement de DEV --- src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java | 4 ++++ src/main/resources/application-dev.properties | 4 +++- src/main/resources/application.properties | 2 ++ 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java b/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java index d1d6595..c2d93f6 100644 --- a/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java +++ b/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java @@ -38,6 +38,9 @@ public class KafkaConfig { @Value("${spring.kafka.auto.register.schema}") private boolean autoRegisterSchema; + @Value("${spring.kafka.producer.transaction-timeout}") + private Integer transactionTimeout; + @Bean public ConsumerFactory consumerKbartFactory() { Map props = new HashMap<>(); @@ -67,6 +70,7 @@ public Map producerConfigsWithTransaction() { props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl); props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, autoRegisterSchema); + props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeout); return props; } diff --git a/src/main/resources/application-dev.properties b/src/main/resources/application-dev.properties index 6b36a75..c4d53ce 100644 --- a/src/main/resources/application-dev.properties +++ b/src/main/resources/application-dev.properties @@ -33,7 +33,7 @@ spring.jpa.bacon.database-platform=org.hibernate.dialect.OracleDialect spring.jpa.bacon.properties.hibernate.dialect=org.hibernate.dialect.OracleDialect spring.jpa.bacon.generate-ddl=false spring.jpa.bacon.hibernate.ddl-auto=none -spring.jpa.bacon.show-sql=true +spring.jpa.bacon.show-sql=false spring.sql.bacon.init.mode=never # Mailing @@ -41,3 +41,5 @@ mail.ws.url= mail.ws.recipient= topic.groupid.source.kbart=lignesKbart + +logging.level.fr.abes.bestppn=INFO \ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 9df4eb6..1397f55 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -25,8 +25,10 @@ auto.create.topics.enable=true spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.transaction-id-prefix=tx- +spring.kafka.producer.transaction-timeout=1800000 spring.kafka.consumer.properties.isolation.level=read_committed + # Topic Kafka topic.name.target.kbart=bacon.kbart.withppn.toload topic.name.target.noticeimprime=bacon.kbart.sudoc.imprime.tocreate