Skip to content

Commit

Permalink
CDE-426 : Refactor : ajout Timestamp dans workInProgress pour control…
Browse files Browse the repository at this point in the history
…er que la réception de 2 messages n'est pas trop espacée, indiquant un fichier rechargé suite à message non consommé
  • Loading branch information
pierre-maraval committed Aug 30, 2024
1 parent a8440ee commit 83c5859
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 5 deletions.
10 changes: 5 additions & 5 deletions src/main/java/fr/abes/bestppn/kafka/KafkaWorkInProgress.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,12 @@
import fr.abes.bestppn.model.dto.PackageKbartDto;
import fr.abes.bestppn.model.dto.kafka.LigneKbartDto;
import fr.abes.bestppn.model.entity.ExecutionReport;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.sql.Timestamp;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -44,6 +41,8 @@ public class KafkaWorkInProgress {

private final List<LigneKbartDto> ppnFromKbartToCreate;

private long timestamp;


public KafkaWorkInProgress(boolean isForced, boolean isBypassed) {
this.isForced = isForced;
Expand All @@ -55,6 +54,7 @@ public KafkaWorkInProgress(boolean isForced, boolean isBypassed) {
this.kbartToSend = Collections.synchronizedList(new ArrayList<>());
this.ppnToCreate = Collections.synchronizedList(new ArrayList<>());
this.ppnFromKbartToCreate = Collections.synchronizedList(new ArrayList<>());
this.timestamp = Calendar.getInstance().getTimeInMillis();
}

public void incrementThreads() {
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,22 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.logging.log4j.ThreadContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestClientException;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Calendar;
import java.util.Map;
import java.util.concurrent.ExecutionException;

@Slf4j
@Service
public class TopicConsumer {
@Value("${delay.max.topic}")
private int maxDelayBetweenMessage;
private final ObjectMapper mapper;
private final KbartService service;

Expand Down Expand Up @@ -50,6 +54,11 @@ public TopicConsumer(ObjectMapper mapper, KbartService service, EmailService ema
@KafkaListener(topics = {"${topic.name.source.kbart}"}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "${abes.kafka.concurrency.nbThread}")
public void kbartFromkafkaListener(ConsumerRecord<String, String> ligneKbart) {
String filename = extractFilenameFromKey(ligneKbart.key());
long now = Calendar.getInstance().getTimeInMillis();
//si on a pas reçu de message depuis plus de maxDelayBetweenMessage
if (this.workInProgress.get(filename).getTimestamp() + maxDelayBetweenMessage < now) {
workInProgress.remove(filename);
}
if (!this.workInProgress.containsKey(filename)) {
//nouveau fichier trouvé dans le topic, on initialise les variables partagées
workInProgress.put(filename, new KafkaWorkInProgress(ligneKbart.key().contains("_FORCE"), ligneKbart.key().contains("_BYPASS")));
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/fr/abes/bestppn/service/KbartService.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Calendar;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -64,6 +65,8 @@ public void processConsumerRecord(LigneKbartDto ligneFromKafka, String providerN
}
}
workInProgress.get(filename).addKbartToSend(ligneFromKafka);
//quel que soit le résultat du calcul du best ppn on met à jour le timestamp correspondant à la consommation du message
workInProgress.get(filename).setTimestamp(Calendar.getInstance().getTimeInMillis());
}

public void commitDatas(String providerName, String filename) throws IllegalPackageException, IllegalDateException, ExecutionException, InterruptedException, IOException, IllegalProviderException {
Expand Down
2 changes: 2 additions & 0 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ topic.name.source.nbLines=bacon.kbart.toload.nbLines
topic.name.target.kbart.bypass.toload=bacon.kbart.bypass.toload

spring.jpa.open-in-view=false
# 5 minutes en ms
delay.max.topic=300000

# SpringDoc Swagger
springdoc.swagger-ui.path=/convergence-documentation
Expand Down

0 comments on commit 83c5859

Please sign in to comment.