Skip to content

Commit

Permalink
Suppression timeout sur lecture des messages suite à fausse manip
Browse files Browse the repository at this point in the history
  • Loading branch information
pierre-maraval committed Apr 25, 2024
1 parent 7741d23 commit af699ed
Showing 1 changed file with 4 additions and 36 deletions.
40 changes: 4 additions & 36 deletions src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
@Service
Expand Down Expand Up @@ -67,40 +69,6 @@ public void kbartFromkafkaListener(ConsumerRecord<String, String> ligneKbart) {
//nouveau fichier trouvé dans le topic, on initialise les variables partagées
workInProgress.put(filename, new KafkaWorkInProgress(ligneKbart.key().contains("_FORCE"), ligneKbart.key().contains("_BYPASS")));
}
if (conditionOnLastMessage(filename, ligneKbart))
traiterMessage(ligneKbart, filename);
else {
scheduleTimeOutCheck(filename);
}
}

/**
*
* @param filename
* @param ligneKbart
* @return
*/
private boolean conditionOnLastMessage(String filename, ConsumerRecord<String, String> ligneKbart) {
return workInProgress.get(filename).getNbLignesTraitees().get() <= Integer.parseInt(new String(ligneKbart.headers().lastHeader("nbLinesTotal").value()));
}

/**
* Méthode permettant d'effectuer certaines actions si un message n'est pas reçu au bout d'un certain temps sur un fichier donné
*/
private void scheduleTimeOutCheck(String filename) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.schedule(() -> {
//si pas de message reçu dans les 5 minutes suivant la réception du message précédent, on supprime l'objet temporaire, et on lève une erreur
workInProgress.get(filename).setIsOnError(true);
log.error("Une erreur s'est produit dans le processus de traitement des messages, veuillez relancer le traitement.");
if (workInProgress.get(filename).getSemaphore().tryAcquire()) {
handleFichier(filename);
}
}, 5, TimeUnit.SECONDS);

}

private void traiterMessage(ConsumerRecord<String, String> ligneKbart, String filename) {
try {
//traitement de chaque ligne kbart
LigneKbartDto ligneKbartDto = mapper.readValue(ligneKbart.value(), LigneKbartDto.class);
Expand Down Expand Up @@ -151,8 +119,8 @@ private void traiterMessage(ConsumerRecord<String, String> ligneKbart, String fi
workInProgress.get(filename).addLineKbartToMailAttachementWithErrorMessage(new LigneKbartDto(), e.getMessage());
workInProgress.get(filename).addNbLinesWithInputDataErrorsInExecutionReport();
}
}

}

private void handleFichier(String filename) {
//on attend que l'ensemble des threads aient terminé de travailler avant de lancer le commit
Expand Down

0 comments on commit af699ed

Please sign in to comment.