Skip to content

Commit

Permalink
Merge pull request #84
Browse files Browse the repository at this point in the history
FEAT : CDE-403-sur-un-gros-fichier-le-bad-ecrase-son-contenu-au-fil-d…
  • Loading branch information
jvk88511334 authored Apr 5, 2024
2 parents 2fa3df1 + 67de75b commit fd19ae0
Showing 1 changed file with 5 additions and 2 deletions.
7 changes: 5 additions & 2 deletions src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ public void kbartFromkafkaListener(ConsumerRecord<String, String> ligneKbart) {
log.error(e.getMessage());
workInProgress.get(filename).addLineKbartToMailAttachementWithErrorMessage(ligneKbartDto, e.getMessage());
workInProgress.get(filename).addNbLinesWithErrorsInExecutionReport();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
//on ne décrémente pas le nb de thread si l'objet de suivi a été supprimé après la production des messages dans le second topic
if (workInProgress.get(filename) != null)
Expand All @@ -123,7 +125,7 @@ public void kbartFromkafkaListener(ConsumerRecord<String, String> ligneKbart) {
}


private void handleFichier(String filename) {
private void handleFichier(String filename) throws InterruptedException {
//on attend que l'ensemble des threads aient terminé de travailler avant de lancer le commit
do {
try {
Expand Down Expand Up @@ -151,13 +153,14 @@ private void handleFichier(String filename) {
log.error("Le nom du fichier " + filename + " n'est pas correct. " + e);
emailService.sendProductionErrorEmail(filename, e.getMessage());
} finally {
Thread.sleep(5000); // pour attendre que tous les threads de best-ppn-api aient terminé leurs traitements
log.info("Traitement terminé pour fichier " + filename + " / nb lignes " + workInProgress.get(filename).incrementNbLignesTraiteesAndGet());
workInProgress.remove(filename);
}
}

@KafkaListener(topics = {"${topic.name.source.kbart.errors}"}, groupId = "${topic.groupid.source.errors}", containerFactory = "kafkaKbartListenerContainerFactory")
public void errorsListener(ConsumerRecord<String, String> error) {
public void errorsListener(ConsumerRecord<String, String> error) throws InterruptedException {
log.error(error.value());
String filename = error.key();
do {
Expand Down

0 comments on commit fd19ae0

Please sign in to comment.