diff --git a/src/main/java/fr/abes/bestppn/kafka/KafkaWorkInProgress.java b/src/main/java/fr/abes/bestppn/kafka/KafkaWorkInProgress.java index 08b6031..ad2f10a 100644 --- a/src/main/java/fr/abes/bestppn/kafka/KafkaWorkInProgress.java +++ b/src/main/java/fr/abes/bestppn/kafka/KafkaWorkInProgress.java @@ -4,15 +4,12 @@ import fr.abes.bestppn.model.dto.PackageKbartDto; import fr.abes.bestppn.model.dto.kafka.LigneKbartDto; import fr.abes.bestppn.model.entity.ExecutionReport; -import jakarta.annotation.PreDestroy; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.Semaphore; +import java.sql.Timestamp; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -44,6 +41,8 @@ public class KafkaWorkInProgress { private final List ppnFromKbartToCreate; + private long timestamp; + public KafkaWorkInProgress(boolean isForced, boolean isBypassed) { this.isForced = isForced; @@ -55,6 +54,7 @@ public KafkaWorkInProgress(boolean isForced, boolean isBypassed) { this.kbartToSend = Collections.synchronizedList(new ArrayList<>()); this.ppnToCreate = Collections.synchronizedList(new ArrayList<>()); this.ppnFromKbartToCreate = Collections.synchronizedList(new ArrayList<>()); + this.timestamp = Calendar.getInstance().getTimeInMillis(); } public void incrementThreads() { diff --git a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java index 6f578ff..049608c 100644 --- a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java +++ b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java @@ -11,18 +11,22 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.logging.log4j.ThreadContext; +import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; import org.springframework.web.client.RestClientException; import java.io.IOException; import java.net.URISyntaxException; +import java.util.Calendar; import java.util.Map; import java.util.concurrent.ExecutionException; @Slf4j @Service public class TopicConsumer { + @Value("${delay.max.topic}") + private int maxDelayBetweenMessage; private final ObjectMapper mapper; private final KbartService service; @@ -50,6 +54,11 @@ public TopicConsumer(ObjectMapper mapper, KbartService service, EmailService ema @KafkaListener(topics = {"${topic.name.source.kbart}"}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "${abes.kafka.concurrency.nbThread}") public void kbartFromkafkaListener(ConsumerRecord ligneKbart) { String filename = extractFilenameFromKey(ligneKbart.key()); + long now = Calendar.getInstance().getTimeInMillis(); + //si on a pas reçu de message depuis plus de maxDelayBetweenMessage + if (this.workInProgress.get(filename).getTimestamp() + maxDelayBetweenMessage < now) { + workInProgress.remove(filename); + } if (!this.workInProgress.containsKey(filename)) { //nouveau fichier trouvé dans le topic, on initialise les variables partagées workInProgress.put(filename, new KafkaWorkInProgress(ligneKbart.key().contains("_FORCE"), ligneKbart.key().contains("_BYPASS"))); diff --git a/src/main/java/fr/abes/bestppn/service/KbartService.java b/src/main/java/fr/abes/bestppn/service/KbartService.java index b077254..b8a4f21 100644 --- a/src/main/java/fr/abes/bestppn/service/KbartService.java +++ b/src/main/java/fr/abes/bestppn/service/KbartService.java @@ -14,6 +14,7 @@ import java.io.IOException; import java.net.URISyntaxException; +import java.util.Calendar; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -64,6 +65,8 @@ public void processConsumerRecord(LigneKbartDto ligneFromKafka, String providerN } } workInProgress.get(filename).addKbartToSend(ligneFromKafka); + //quel que soit le résultat du calcul du best ppn on met à jour le timestamp correspondant à la consommation du message + workInProgress.get(filename).setTimestamp(Calendar.getInstance().getTimeInMillis()); } public void commitDatas(String providerName, String filename) throws IllegalPackageException, IllegalDateException, ExecutionException, InterruptedException, IOException, IllegalProviderException { diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 45283e9..9111e36 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -32,6 +32,8 @@ topic.name.source.nbLines=bacon.kbart.toload.nbLines topic.name.target.kbart.bypass.toload=bacon.kbart.bypass.toload spring.jpa.open-in-view=false +# 5 minutes en ms +delay.max.topic=300000 # SpringDoc Swagger springdoc.swagger-ui.path=/convergence-documentation