diff --git a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java index 81c8673..36bf791 100644 --- a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java +++ b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java @@ -82,6 +82,19 @@ public void kbartFromkafkaListener(ConsumerRecord ligneKbart) { String origineNbCurrentLine = new String(ligneKbart.headers().lastHeader("nbCurrentLines").value()); ThreadContext.put("package", (filename + ";" + origineNbCurrentLine)); //Ajoute le nom de fichier dans le contexte du thread pour log4j service.processConsumerRecord(ligneKbartDto, providerName, workInProgress.get(filename).isForced(), workInProgress.get(filename).isBypassed(), filename); + } catch (IOException | URISyntaxException | RestClientException | IllegalDoiException e) { + //erreurs non bloquantes, on n'arrête pas le programme + log.warn(e.getMessage()); + ligneKbartDto.setErrorType(e.getMessage()); + workInProgress.get(filename).addNbLinesWithInputDataErrorsInExecutionReport(); + } catch (BestPpnException e) { + if (!workInProgress.get(filename).isForced()) { + workInProgress.get(filename).setIsOnError(true); + } + log.error(e.getMessage()); + ligneKbartDto.setErrorType(e.getMessage()); + workInProgress.get(filename).addNbLinesWithErrorsInExecutionReport(); + } finally { if (ligneKbartDto.getBestPpn() != null && !ligneKbartDto.getBestPpn().isEmpty()) workInProgress.get(filename).addNbBestPpnFindedInExecutionReport(); workInProgress.get(filename).addLineKbartToMailAttachment(ligneKbartDto); @@ -89,37 +102,24 @@ public void kbartFromkafkaListener(ConsumerRecord ligneKbart) { if (lastHeader != null) { int nbLignesTotal = Integer.parseInt(new String(lastHeader.value())); if (nbLignesTotal == workInProgress.get(filename).incrementNbLignesTraiteesAndGet()) { + if (workInProgress.get(filename).getSemaphore().tryAcquire()) { workInProgress.get(filename).setNbtotalLinesInExecutionReport(nbLignesTotal); handleFichier(filename); } } } - } catch (IOException | URISyntaxException | RestClientException | IllegalDoiException e) { - //erreurs non bloquantes, on n'arrête pas le programme - log.warn(e.getMessage()); - workInProgress.get(filename).addLineKbartToMailAttachementWithErrorMessage(ligneKbartDto, e.getMessage()); - workInProgress.get(filename).addNbLinesWithInputDataErrorsInExecutionReport(); - } catch (BestPpnException e) { - if (!workInProgress.get(filename).isForced()) { - workInProgress.get(filename).setIsOnError(true); - } - log.error(e.getMessage()); - workInProgress.get(filename).addLineKbartToMailAttachementWithErrorMessage(ligneKbartDto, e.getMessage()); - workInProgress.get(filename).addNbLinesWithErrorsInExecutionReport(); - } finally { //on ne décrémente pas le nb de thread si l'objet de suivi a été supprimé après la production des messages dans le second topic if (workInProgress.get(filename) != null) workInProgress.get(filename).decrementThreads(); } }); - } catch(IllegalProviderException | JsonProcessingException e){ - workInProgress.get(filename).setIsOnError(true); - log.warn(e.getMessage()); - workInProgress.get(filename).addLineKbartToMailAttachementWithErrorMessage(new LigneKbartDto(), e.getMessage()); - workInProgress.get(filename).addNbLinesWithInputDataErrorsInExecutionReport(); - } - + } catch(IllegalProviderException | JsonProcessingException e){ + workInProgress.get(filename).setIsOnError(true); + log.warn(e.getMessage()); + workInProgress.get(filename).addLineKbartToMailAttachementWithErrorMessage(new LigneKbartDto(), e.getMessage()); + workInProgress.get(filename).addNbLinesWithInputDataErrorsInExecutionReport(); + } } @@ -135,9 +135,7 @@ private void handleFichier(String filename) { } } while (workInProgress.get(filename).getNbActiveThreads() > 1); try { - if (workInProgress.get(filename).isOnError()) { - log.error("Fichier " + filename + " : Une erreur s'est produite dans le traitement du fichier"); - } else { + if (!workInProgress.get(filename).isOnError()) { String providerName = Utils.extractProvider(filename); service.commitDatas(providerName, filename); //quel que soit le résultat du traitement, on envoie le rapport par mail @@ -151,7 +149,7 @@ private void handleFichier(String filename) { log.error("Le nom du fichier " + filename + " n'est pas correct. " + e); emailService.sendProductionErrorEmail(filename, e.getMessage()); } finally { - log.info("Traitement terminé pour fichier " + filename + " / nb lignes " + workInProgress.get(filename).incrementNbLignesTraiteesAndGet()); + log.info("Traitement terminé pour fichier " + filename + " / nb lignes " + workInProgress.get(filename).getNbLignesTraitees()); workInProgress.remove(filename); } } diff --git a/src/main/java/fr/abes/bestppn/service/BestPpnService.java b/src/main/java/fr/abes/bestppn/service/BestPpnService.java index 96fe6f2..b44e0ef 100644 --- a/src/main/java/fr/abes/bestppn/service/BestPpnService.java +++ b/src/main/java/fr/abes/bestppn/service/BestPpnService.java @@ -51,7 +51,7 @@ public BestPpnService(WsService service, NoticeService noticeService, TopicProdu this.checkUrlService = checkUrlService; } - public BestPpn getBestPpn(LigneKbartDto kbart, String provider, boolean injectKafka, boolean isSendLogs) throws IOException, BestPpnException, URISyntaxException, RestClientException, IllegalArgumentException, IllegalDoiException { + public BestPpn getBestPpn(LigneKbartDto kbart, String provider, boolean isForced, boolean isSendLogs) throws IOException, BestPpnException, URISyntaxException, RestClientException, IllegalArgumentException, IllegalDoiException { List messages = new ArrayList<>(); Map ppnElecScoredList = new HashMap<>(); Set ppnPrintResultList = new HashSet<>(); @@ -76,7 +76,7 @@ public BestPpn getBestPpn(LigneKbartDto kbart, String provider, boolean injectKa feedPpnListFromDat(kbart, ppnElecScoredList, ppnPrintResultList, provider, isSendLogs, messages); } - return getBestPpnByScore(kbart, ppnElecScoredList, ppnPrintResultList, injectKafka, isSendLogs, messages); + return getBestPpnByScore(kbart, ppnElecScoredList, ppnPrintResultList, isForced, isSendLogs, messages); } private void feedPpnListFromOnline(LigneKbartDto kbart, String provider, Map ppnElecScoredList, Set ppnPrintResultList, boolean isSendLogs, List messages) throws IOException, URISyntaxException, IllegalArgumentException, BestPpnException {