Skip to content

Commit

Permalink
FEAT : CDE-403-sur-un-gros-fichier-le-bad-ecrase-son-contenu-au-fil-d…
Browse files Browse the repository at this point in the history
…u-traitement

     - surpression d'un sleep avant le log de fin de traitement
  • Loading branch information
EryneKL committed Apr 5, 2024
1 parent 67de75b commit 446a968
Showing 1 changed file with 2 additions and 5 deletions.
7 changes: 2 additions & 5 deletions src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,6 @@ public void kbartFromkafkaListener(ConsumerRecord<String, String> ligneKbart) {
log.error(e.getMessage());
workInProgress.get(filename).addLineKbartToMailAttachementWithErrorMessage(ligneKbartDto, e.getMessage());
workInProgress.get(filename).addNbLinesWithErrorsInExecutionReport();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
//on ne décrémente pas le nb de thread si l'objet de suivi a été supprimé après la production des messages dans le second topic
if (workInProgress.get(filename) != null)
Expand All @@ -125,7 +123,7 @@ public void kbartFromkafkaListener(ConsumerRecord<String, String> ligneKbart) {
}


private void handleFichier(String filename) throws InterruptedException {
private void handleFichier(String filename) {
//on attend que l'ensemble des threads aient terminé de travailler avant de lancer le commit
do {
try {
Expand Down Expand Up @@ -153,14 +151,13 @@ private void handleFichier(String filename) throws InterruptedException {
log.error("Le nom du fichier " + filename + " n'est pas correct. " + e);
emailService.sendProductionErrorEmail(filename, e.getMessage());
} finally {
Thread.sleep(5000); // pour attendre que tous les threads de best-ppn-api aient terminé leurs traitements
log.info("Traitement terminé pour fichier " + filename + " / nb lignes " + workInProgress.get(filename).incrementNbLignesTraiteesAndGet());
workInProgress.remove(filename);
}
}

@KafkaListener(topics = {"${topic.name.source.kbart.errors}"}, groupId = "${topic.groupid.source.errors}", containerFactory = "kafkaKbartListenerContainerFactory")
public void errorsListener(ConsumerRecord<String, String> error) throws InterruptedException {
public void errorsListener(ConsumerRecord<String, String> error) {
log.error(error.value());
String filename = error.key();
do {
Expand Down

0 comments on commit 446a968

Please sign in to comment.