diff --git a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java index 36bf791..3fe23a0 100644 --- a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java +++ b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java @@ -64,15 +64,13 @@ public void initExecutor() { */ @KafkaListener(topics = {"${topic.name.source.kbart}"}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "${spring.kafka.concurrency.nbThread}") public void kbartFromkafkaListener(ConsumerRecord ligneKbart) { - String filename = ligneKbart.key(); + if (!this.workInProgress.containsKey(filename)) { + //nouveau fichier trouvé dans le topic, on initialise les variables partagées + workInProgress.put(filename, new KafkaWorkInProgress(ligneKbart.key().contains("_FORCE"), ligneKbart.key().contains("_BYPASS"))); + } try { //traitement de chaque ligne kbart - if (!this.workInProgress.containsKey(ligneKbart.key())) { - //nouveau fichier trouvé dans le topic, on initialise les variables partagées - workInProgress.put(filename, new KafkaWorkInProgress(ligneKbart.key().contains("_FORCE"), ligneKbart.key().contains("_BYPASS"))); - } - LigneKbartDto ligneKbartDto = mapper.readValue(ligneKbart.value(), LigneKbartDto.class); String providerName = Utils.extractProvider(ligneKbart.key()); executorService.execute(() -> { @@ -90,6 +88,8 @@ public void kbartFromkafkaListener(ConsumerRecord ligneKbart) { } catch (BestPpnException e) { if (!workInProgress.get(filename).isForced()) { workInProgress.get(filename).setIsOnError(true); + } else { + workInProgress.get(filename).addKbartToSend(ligneKbartDto); } log.error(e.getMessage()); ligneKbartDto.setErrorType(e.getMessage()); @@ -102,7 +102,6 @@ public void kbartFromkafkaListener(ConsumerRecord ligneKbart) { if (lastHeader != null) { int nbLignesTotal = Integer.parseInt(new String(lastHeader.value())); if (nbLignesTotal == workInProgress.get(filename).incrementNbLignesTraiteesAndGet()) { - if (workInProgress.get(filename).getSemaphore().tryAcquire()) { workInProgress.get(filename).setNbtotalLinesInExecutionReport(nbLignesTotal); handleFichier(filename); @@ -120,8 +119,8 @@ public void kbartFromkafkaListener(ConsumerRecord ligneKbart) { workInProgress.get(filename).addLineKbartToMailAttachementWithErrorMessage(new LigneKbartDto(), e.getMessage()); workInProgress.get(filename).addNbLinesWithInputDataErrorsInExecutionReport(); } - } + } private void handleFichier(String filename) { //on attend que l'ensemble des threads aient terminé de travailler avant de lancer le commit diff --git a/src/main/java/fr/abes/bestppn/model/dto/kafka/LigneKbartDto.java b/src/main/java/fr/abes/bestppn/model/dto/kafka/LigneKbartDto.java index 73f6e87..6172eb6 100644 --- a/src/main/java/fr/abes/bestppn/model/dto/kafka/LigneKbartDto.java +++ b/src/main/java/fr/abes/bestppn/model/dto/kafka/LigneKbartDto.java @@ -159,11 +159,11 @@ public String getAuthor() { @JsonIgnore public String getAnneeFromDate_monograph_published_print() { - return this.dateMonographPublishedPrint; + return this.dateMonographPublishedPrint.substring(0, 4); } @JsonIgnore public String getAnneeFromDate_monograph_published_online() { - return this.dateMonographPublishedOnline; + return this.dateMonographPublishedOnline.substring(0, 4); } } diff --git a/src/main/java/fr/abes/bestppn/service/BestPpnService.java b/src/main/java/fr/abes/bestppn/service/BestPpnService.java index b44e0ef..ffa4c48 100644 --- a/src/main/java/fr/abes/bestppn/service/BestPpnService.java +++ b/src/main/java/fr/abes/bestppn/service/BestPpnService.java @@ -124,10 +124,10 @@ private void feedPpnListFromDat(LigneKbartDto kbart, Map ppnEle if (isSendLogs) messages.add(message); ResultWsSudocDto resultCallWs = null; if (!kbart.getAnneeFromDate_monograph_published_online().isEmpty()) { - log.debug("Appel dat2ppn : date_monograph_published_online : " + kbart.getAnneeFromDate_monograph_published_online() + " / publication_title : " + kbart.getPublicationTitle() + " auteur : " + kbart.getAuthor()); + log.debug("Appel dat2ppn : date_monograph_published_online : " + kbart.getDateMonographPublishedOnline() + " / publication_title : " + kbart.getPublicationTitle() + " auteur : " + kbart.getAuthor()); resultCallWs = service.callDat2Ppn(kbart.getAnneeFromDate_monograph_published_online(), kbart.getAuthor(), kbart.getPublicationTitle(), providerName); } else if (ppnElecScoredList.isEmpty() && !kbart.getAnneeFromDate_monograph_published_print().isEmpty()) { - log.debug("Appel dat2ppn : date_monograph_published_print : " + kbart.getAnneeFromDate_monograph_published_print() + " / publication_title : " + kbart.getPublicationTitle() + " auteur : " + kbart.getAuthor()); + log.debug("Appel dat2ppn : date_monograph_published_print : " + kbart.getDateMonographPublishedPrint() + " / publication_title : " + kbart.getPublicationTitle() + " auteur : " + kbart.getAuthor()); resultCallWs = service.callDat2Ppn(kbart.getAnneeFromDate_monograph_published_print(), kbart.getAuthor(), kbart.getPublicationTitle(), providerName); } if(resultCallWs != null && !resultCallWs.getPpns().isEmpty()) { diff --git a/src/main/java/fr/abes/bestppn/service/WsService.java b/src/main/java/fr/abes/bestppn/service/WsService.java index a49acc2..4ec27dc 100644 --- a/src/main/java/fr/abes/bestppn/service/WsService.java +++ b/src/main/java/fr/abes/bestppn/service/WsService.java @@ -115,6 +115,15 @@ private ResultWsSudocDto getResultWsSudocDto(String type, String id, @Nullable S return result; } + /** + * Appel ws dat2ppn + * @param date date au format YYYY à rechercher + * @param author auteur à rechercher + * @param title titre à rechercher + * @param providerName nom du provider + * @return résultat de l'appel à dat2ppn + * @throws JsonProcessingException erreur construction objet résultat + */ public ResultWsSudocDto callDat2Ppn(String date, String author, String title, String providerName) throws JsonProcessingException { SearchDatWebDto searchDatWebDto = new SearchDatWebDto(title, providerName); if (author != null && !author.isEmpty()) {