diff --git a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java index 36bf791..a68e2ad 100644 --- a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java +++ b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java @@ -21,9 +21,7 @@ import java.io.IOException; import java.net.URISyntaxException; import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.*; @Slf4j @Service @@ -64,15 +62,47 @@ public void initExecutor() { */ @KafkaListener(topics = {"${topic.name.source.kbart}"}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "${spring.kafka.concurrency.nbThread}") public void kbartFromkafkaListener(ConsumerRecord ligneKbart) { - String filename = ligneKbart.key(); - try { - //traitement de chaque ligne kbart - if (!this.workInProgress.containsKey(ligneKbart.key())) { - //nouveau fichier trouvé dans le topic, on initialise les variables partagées - workInProgress.put(filename, new KafkaWorkInProgress(ligneKbart.key().contains("_FORCE"), ligneKbart.key().contains("_BYPASS"))); + if (!this.workInProgress.containsKey(filename)) { + //nouveau fichier trouvé dans le topic, on initialise les variables partagées + workInProgress.put(filename, new KafkaWorkInProgress(ligneKbart.key().contains("_FORCE"), ligneKbart.key().contains("_BYPASS"))); + } + if (conditionOnLastMessage(filename, ligneKbart)) + traiterMessage(ligneKbart, filename); + else { + scheduleTimeOutCheck(filename); + } + } + + /** + * + * @param filename + * @param ligneKbart + * @return + */ + private boolean conditionOnLastMessage(String filename, ConsumerRecord ligneKbart) { + return workInProgress.get(filename).getNbLignesTraitees().get() <= Integer.parseInt(new String(ligneKbart.headers().lastHeader("nbLinesTotal").value())); + } + + /** + * Méthode permettant d'effectuer certaines actions si un message n'est pas reçu au bout d'un certain temps sur un fichier donné + */ + private void scheduleTimeOutCheck(String filename) { + ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); + scheduledExecutorService.schedule(() -> { + //si pas de message reçu dans les 5 minutes suivant la réception du message précédent, on supprime l'objet temporaire, et on lève une erreur + workInProgress.get(filename).setIsOnError(true); + log.error("Une erreur s'est produit dans le processus de traitement des messages, veuillez relancer le traitement."); + if (workInProgress.get(filename).getSemaphore().tryAcquire()) { + handleFichier(filename); } + }, 5, TimeUnit.SECONDS); + } + + private void traiterMessage(ConsumerRecord ligneKbart, String filename) { + try { + //traitement de chaque ligne kbart LigneKbartDto ligneKbartDto = mapper.readValue(ligneKbart.value(), LigneKbartDto.class); String providerName = Utils.extractProvider(ligneKbart.key()); executorService.execute(() -> { @@ -102,7 +132,6 @@ public void kbartFromkafkaListener(ConsumerRecord ligneKbart) { if (lastHeader != null) { int nbLignesTotal = Integer.parseInt(new String(lastHeader.value())); if (nbLignesTotal == workInProgress.get(filename).incrementNbLignesTraiteesAndGet()) { - if (workInProgress.get(filename).getSemaphore().tryAcquire()) { workInProgress.get(filename).setNbtotalLinesInExecutionReport(nbLignesTotal); handleFichier(filename); diff --git a/src/main/java/fr/abes/bestppn/model/dto/kafka/LigneKbartDto.java b/src/main/java/fr/abes/bestppn/model/dto/kafka/LigneKbartDto.java index 73f6e87..6172eb6 100644 --- a/src/main/java/fr/abes/bestppn/model/dto/kafka/LigneKbartDto.java +++ b/src/main/java/fr/abes/bestppn/model/dto/kafka/LigneKbartDto.java @@ -159,11 +159,11 @@ public String getAuthor() { @JsonIgnore public String getAnneeFromDate_monograph_published_print() { - return this.dateMonographPublishedPrint; + return this.dateMonographPublishedPrint.substring(0, 4); } @JsonIgnore public String getAnneeFromDate_monograph_published_online() { - return this.dateMonographPublishedOnline; + return this.dateMonographPublishedOnline.substring(0, 4); } } diff --git a/src/main/java/fr/abes/bestppn/service/BestPpnService.java b/src/main/java/fr/abes/bestppn/service/BestPpnService.java index b44e0ef..ffa4c48 100644 --- a/src/main/java/fr/abes/bestppn/service/BestPpnService.java +++ b/src/main/java/fr/abes/bestppn/service/BestPpnService.java @@ -124,10 +124,10 @@ private void feedPpnListFromDat(LigneKbartDto kbart, Map ppnEle if (isSendLogs) messages.add(message); ResultWsSudocDto resultCallWs = null; if (!kbart.getAnneeFromDate_monograph_published_online().isEmpty()) { - log.debug("Appel dat2ppn : date_monograph_published_online : " + kbart.getAnneeFromDate_monograph_published_online() + " / publication_title : " + kbart.getPublicationTitle() + " auteur : " + kbart.getAuthor()); + log.debug("Appel dat2ppn : date_monograph_published_online : " + kbart.getDateMonographPublishedOnline() + " / publication_title : " + kbart.getPublicationTitle() + " auteur : " + kbart.getAuthor()); resultCallWs = service.callDat2Ppn(kbart.getAnneeFromDate_monograph_published_online(), kbart.getAuthor(), kbart.getPublicationTitle(), providerName); } else if (ppnElecScoredList.isEmpty() && !kbart.getAnneeFromDate_monograph_published_print().isEmpty()) { - log.debug("Appel dat2ppn : date_monograph_published_print : " + kbart.getAnneeFromDate_monograph_published_print() + " / publication_title : " + kbart.getPublicationTitle() + " auteur : " + kbart.getAuthor()); + log.debug("Appel dat2ppn : date_monograph_published_print : " + kbart.getDateMonographPublishedPrint() + " / publication_title : " + kbart.getPublicationTitle() + " auteur : " + kbart.getAuthor()); resultCallWs = service.callDat2Ppn(kbart.getAnneeFromDate_monograph_published_print(), kbart.getAuthor(), kbart.getPublicationTitle(), providerName); } if(resultCallWs != null && !resultCallWs.getPpns().isEmpty()) { diff --git a/src/main/java/fr/abes/bestppn/service/WsService.java b/src/main/java/fr/abes/bestppn/service/WsService.java index a49acc2..4ec27dc 100644 --- a/src/main/java/fr/abes/bestppn/service/WsService.java +++ b/src/main/java/fr/abes/bestppn/service/WsService.java @@ -115,6 +115,15 @@ private ResultWsSudocDto getResultWsSudocDto(String type, String id, @Nullable S return result; } + /** + * Appel ws dat2ppn + * @param date date au format YYYY à rechercher + * @param author auteur à rechercher + * @param title titre à rechercher + * @param providerName nom du provider + * @return résultat de l'appel à dat2ppn + * @throws JsonProcessingException erreur construction objet résultat + */ public ResultWsSudocDto callDat2Ppn(String date, String author, String title, String providerName) throws JsonProcessingException { SearchDatWebDto searchDatWebDto = new SearchDatWebDto(title, providerName); if (author != null && !author.isEmpty()) {