From af699ed9d876746e3e3acdbb305060f4d27ab323 Mon Sep 17 00:00:00 2001 From: pierre-maraval Date: Thu, 25 Apr 2024 14:16:39 +0200 Subject: [PATCH] =?UTF-8?q?Suppression=20timeout=20sur=20lecture=20des=20m?= =?UTF-8?q?essages=20suite=20=C3=A0=20fausse=20manip?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../fr/abes/bestppn/kafka/TopicConsumer.java | 40 ++----------------- 1 file changed, 4 insertions(+), 36 deletions(-) diff --git a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java index 0b0ab0a..3fe23a0 100644 --- a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java +++ b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java @@ -21,7 +21,9 @@ import java.io.IOException; import java.net.URISyntaxException; import java.util.Map; -import java.util.concurrent.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; @Slf4j @Service @@ -67,40 +69,6 @@ public void kbartFromkafkaListener(ConsumerRecord ligneKbart) { //nouveau fichier trouvé dans le topic, on initialise les variables partagées workInProgress.put(filename, new KafkaWorkInProgress(ligneKbart.key().contains("_FORCE"), ligneKbart.key().contains("_BYPASS"))); } - if (conditionOnLastMessage(filename, ligneKbart)) - traiterMessage(ligneKbart, filename); - else { - scheduleTimeOutCheck(filename); - } - } - - /** - * - * @param filename - * @param ligneKbart - * @return - */ - private boolean conditionOnLastMessage(String filename, ConsumerRecord ligneKbart) { - return workInProgress.get(filename).getNbLignesTraitees().get() <= Integer.parseInt(new String(ligneKbart.headers().lastHeader("nbLinesTotal").value())); - } - - /** - * Méthode permettant d'effectuer certaines actions si un message n'est pas reçu au bout d'un certain temps sur un fichier donné - */ - private void scheduleTimeOutCheck(String filename) { - ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); - scheduledExecutorService.schedule(() -> { - //si pas de message reçu dans les 5 minutes suivant la réception du message précédent, on supprime l'objet temporaire, et on lève une erreur - workInProgress.get(filename).setIsOnError(true); - log.error("Une erreur s'est produit dans le processus de traitement des messages, veuillez relancer le traitement."); - if (workInProgress.get(filename).getSemaphore().tryAcquire()) { - handleFichier(filename); - } - }, 5, TimeUnit.SECONDS); - - } - - private void traiterMessage(ConsumerRecord ligneKbart, String filename) { try { //traitement de chaque ligne kbart LigneKbartDto ligneKbartDto = mapper.readValue(ligneKbart.value(), LigneKbartDto.class); @@ -151,8 +119,8 @@ private void traiterMessage(ConsumerRecord ligneKbart, String fi workInProgress.get(filename).addLineKbartToMailAttachementWithErrorMessage(new LigneKbartDto(), e.getMessage()); workInProgress.get(filename).addNbLinesWithInputDataErrorsInExecutionReport(); } - } + } private void handleFichier(String filename) { //on attend que l'ensemble des threads aient terminé de travailler avant de lancer le commit