diff --git a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java index 0b0ab0a..3fe23a0 100644 --- a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java +++ b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java @@ -21,7 +21,9 @@ import java.io.IOException; import java.net.URISyntaxException; import java.util.Map; -import java.util.concurrent.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; @Slf4j @Service @@ -67,40 +69,6 @@ public void kbartFromkafkaListener(ConsumerRecord ligneKbart) { //nouveau fichier trouvé dans le topic, on initialise les variables partagées workInProgress.put(filename, new KafkaWorkInProgress(ligneKbart.key().contains("_FORCE"), ligneKbart.key().contains("_BYPASS"))); } - if (conditionOnLastMessage(filename, ligneKbart)) - traiterMessage(ligneKbart, filename); - else { - scheduleTimeOutCheck(filename); - } - } - - /** - * - * @param filename - * @param ligneKbart - * @return - */ - private boolean conditionOnLastMessage(String filename, ConsumerRecord ligneKbart) { - return workInProgress.get(filename).getNbLignesTraitees().get() <= Integer.parseInt(new String(ligneKbart.headers().lastHeader("nbLinesTotal").value())); - } - - /** - * Méthode permettant d'effectuer certaines actions si un message n'est pas reçu au bout d'un certain temps sur un fichier donné - */ - private void scheduleTimeOutCheck(String filename) { - ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); - scheduledExecutorService.schedule(() -> { - //si pas de message reçu dans les 5 minutes suivant la réception du message précédent, on supprime l'objet temporaire, et on lève une erreur - workInProgress.get(filename).setIsOnError(true); - log.error("Une erreur s'est produit dans le processus de traitement des messages, veuillez relancer le traitement."); - if (workInProgress.get(filename).getSemaphore().tryAcquire()) { - handleFichier(filename); - } - }, 5, TimeUnit.SECONDS); - - } - - private void traiterMessage(ConsumerRecord ligneKbart, String filename) { try { //traitement de chaque ligne kbart LigneKbartDto ligneKbartDto = mapper.readValue(ligneKbart.value(), LigneKbartDto.class); @@ -151,8 +119,8 @@ private void traiterMessage(ConsumerRecord ligneKbart, String fi workInProgress.get(filename).addLineKbartToMailAttachementWithErrorMessage(new LigneKbartDto(), e.getMessage()); workInProgress.get(filename).addNbLinesWithInputDataErrorsInExecutionReport(); } - } + } private void handleFichier(String filename) { //on attend que l'ensemble des threads aient terminé de travailler avant de lancer le commit