Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge develop dans main #109

Merged
merged 14 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README-developpement.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ La classe `TopicConsumer.java` comporte deux `@KafkaListener` qui ont les rôles
1. `kbartFromkafkaListener()` :
- un objet `KafkaWorkInProgress` sera créé et placé dans la `Map<String, KafkaWorkInProgress>` pour chaque nouveau nom de fichier kbart détecté à la lecture des messages dans le topic kafka `bacon.kbart.toload`
- lit les messages kafka à partir du topic `bacon.kbart.toload` (chaque message correspond à une ligne d'un fichier kbart lu par l'API [kbart2kafka](https://github.com/abes-esr/kbart2kafka))
> :information_source: Le nom du fichier kbart se trouve dans la `key` du message. Le numéro de la ligne courante `nbCurrentLines` ainsi que le nombre de ligne total du fichier kbart `nbLinesTotal` sont passés dans le header du message
> :information_source: Le nom du fichier kbart se trouve dans la `key` du message. Le numéro de la ligne courante `nbCurrentLines` ainsi que le nombre de ligne total du fichier kbart `nbLinesTotal` sont passés dans la dto du message
- demande le calcul du best ppn pour chaque ligne. Le calcul du best ppn sera ignoré si le nom du fichier comporte la chaine de caractère `_BYPASS`
- demande l'envoi des lignes vers de nouveaux topics
- gère les actions consécutives aux erreurs relevées durant le traitement d'une ligne. Certaines erreurs seront ignorées si le nom du fichier comporte la chaine de caractère `_FORCE`
Expand Down
20 changes: 5 additions & 15 deletions src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,12 @@
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${spring.kafka.concurrency.nbThread}")
private int nbThread;

@Value("${spring.kafka.consumer.bootstrap-servers}")
@Value("${abes.kafka.bootstrap-servers}")
private String bootstrapAddress;


@Value("${spring.kafka.consumer.properties.isolation.level}")
private String isolationLevel;

@Value("${spring.kafka.registry.url}")
@Value("${abes.kafka.registry.url}")
private String registryUrl;

@Value("${spring.kafka.auto.register.schema}")
private boolean autoRegisterSchema;

@Bean
public ConsumerFactory<String, String> consumerKbartFactory() {
Expand All @@ -47,8 +38,9 @@ public ConsumerFactory<String, String> consumerKbartFactory() {
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,("SchedulerCoordinator"+ UUID.randomUUID()));
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolationLevel);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
return new DefaultKafkaConsumerFactory<>(props);
}

Expand All @@ -67,11 +59,9 @@ public Map<String, Object> producerConfigsWithTransaction() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionIdPrefix);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, autoRegisterSchema);
//props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeout);
props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false);
return props;
}

Expand Down
18 changes: 5 additions & 13 deletions src/main/java/fr/abes/bestppn/kafka/KafkaWorkInProgress.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,12 @@
import fr.abes.bestppn.model.dto.PackageKbartDto;
import fr.abes.bestppn.model.dto.kafka.LigneKbartDto;
import fr.abes.bestppn.model.entity.ExecutionReport;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.sql.Timestamp;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -38,14 +35,14 @@ public class KafkaWorkInProgress {

private final AtomicInteger nbActiveThreads;

private final Semaphore semaphore;

private final List<LigneKbartDto> kbartToSend;

private final List<LigneKbartImprime> ppnToCreate;

private final List<LigneKbartDto> ppnFromKbartToCreate;

private long timestamp;


public KafkaWorkInProgress(boolean isForced, boolean isBypassed) {
this.isForced = isForced;
Expand All @@ -54,10 +51,10 @@ public KafkaWorkInProgress(boolean isForced, boolean isBypassed) {
this.isOnError = new AtomicBoolean(false);
this.nbLignesTraitees = new AtomicInteger(0);
this.nbActiveThreads = new AtomicInteger(0);
this.semaphore = new Semaphore(1);
this.kbartToSend = Collections.synchronizedList(new ArrayList<>());
this.ppnToCreate = Collections.synchronizedList(new ArrayList<>());
this.ppnFromKbartToCreate = Collections.synchronizedList(new ArrayList<>());
this.timestamp = Calendar.getInstance().getTimeInMillis();
}

public void incrementThreads() {
Expand Down Expand Up @@ -109,11 +106,6 @@ public void addLineKbartToMailAttachment(LigneKbartDto dto) {
mailAttachment.addKbartDto(dto);
}

@PreDestroy
public void onDestroy() {
this.semaphore.release();
}

public void addPpnToCreate(LigneKbartImprime ligneKbartImprime) {
this.ppnToCreate.add(ligneKbartImprime);
}
Expand Down
69 changes: 30 additions & 39 deletions src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import fr.abes.bestppn.utils.Utils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.logging.log4j.ThreadContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
Expand All @@ -19,12 +18,15 @@

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Calendar;
import java.util.Map;
import java.util.concurrent.ExecutionException;

@Slf4j
@Service
public class TopicConsumer {
@Value("${delay.max.topic}")
private int maxDelayBetweenMessage;
private final ObjectMapper mapper;
private final KbartService service;

Expand All @@ -49,21 +51,27 @@ public TopicConsumer(ObjectMapper mapper, KbartService service, EmailService ema
*
* @param ligneKbart message kafka récupéré par le Consumer Kafka
*/
@KafkaListener(topics = {"${topic.name.source.kbart}"}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "${spring.kafka.concurrency.nbThread}")
@KafkaListener(topics = {"${topic.name.source.kbart}"}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "${abes.kafka.concurrency.nbThread}")
public void kbartFromkafkaListener(ConsumerRecord<String, String> ligneKbart) {
String filename = ligneKbart.key();
String filename = extractFilenameFromKey(ligneKbart.key());
long now = Calendar.getInstance().getTimeInMillis();
//si on a pas reçu de message depuis plus de maxDelayBetweenMessage
if (this.workInProgress.containsKey(filename) && (this.workInProgress.get(filename).getTimestamp() + maxDelayBetweenMessage < now)) {
workInProgress.remove(filename);
log.debug("détection de l'ancien lancement de fichier " + filename );
}
if (!this.workInProgress.containsKey(filename)) {
//nouveau fichier trouvé dans le topic, on initialise les variables partagées
workInProgress.put(filename, new KafkaWorkInProgress(ligneKbart.key().contains("_FORCE"), ligneKbart.key().contains("_BYPASS")));
}
try {
//traitement de chaque ligne kbart
LigneKbartDto ligneKbartDto = mapper.readValue(ligneKbart.value(), LigneKbartDto.class);
String providerName = Utils.extractProvider(ligneKbart.key());
String providerName = Utils.extractProvider(filename);
try {
log.info("Partition;" + ligneKbart.partition() + ";offset;" + ligneKbart.offset() + ";fichier;" + ligneKbart.key() + ";" + Thread.currentThread().getName());
log.info("Partition;" + ligneKbart.partition() + ";offset;" + ligneKbart.offset() + ";fichier;" + filename + ";" + Thread.currentThread().getName());
workInProgress.get(filename).incrementThreads();
String origineNbCurrentLine = new String(ligneKbart.headers().lastHeader("nbCurrentLines").value());
int origineNbCurrentLine = ligneKbartDto.getNbCurrentLines();
ThreadContext.put("package", (filename + ";" + origineNbCurrentLine)); //Ajoute le nom de fichier dans le contexte du thread pour log4j
service.processConsumerRecord(ligneKbartDto, providerName, workInProgress.get(filename).isForced(), workInProgress.get(filename).isBypassed(), filename);
} catch (IOException | URISyntaxException | RestClientException | IllegalDoiException e) {
Expand All @@ -84,22 +92,13 @@ public void kbartFromkafkaListener(ConsumerRecord<String, String> ligneKbart) {
if (ligneKbartDto.getBestPpn() != null && !ligneKbartDto.getBestPpn().isEmpty())
workInProgress.get(filename).addNbBestPpnFindedInExecutionReport();
workInProgress.get(filename).addLineKbartToMailAttachment(ligneKbartDto);
Header lastHeader = ligneKbart.headers().lastHeader("nbLinesTotal");
if (lastHeader != null) {
int nbLignesTotal = Integer.parseInt(new String(lastHeader.value()));
int nbCurrentLine = workInProgress.get(filename).incrementNbLignesTraiteesAndGet();
log.debug("Ligne en cours : {} NbLignesTotal : {}", nbCurrentLine, nbLignesTotal);
if (nbLignesTotal == nbCurrentLine) {
log.debug("Commit du fichier {}", filename);
if (workInProgress.get(filename).getSemaphore().tryAcquire()) {
log.debug("pas d'erreur dans le topic d'erreur");
workInProgress.get(filename).setNbtotalLinesInExecutionReport(nbLignesTotal);
handleFichier(filename);
}
}
} else {
String errorMsg = "Header absent de la ligne : " + ligneKbart.headers();
log.error(errorMsg);
int nbLignesTotal = ligneKbartDto.getNbLinesTotal();
int nbCurrentLine = workInProgress.get(filename).incrementNbLignesTraiteesAndGet();
log.debug("Ligne en cours : {} NbLignesTotal : {}", nbCurrentLine, nbLignesTotal);
if (nbLignesTotal == nbCurrentLine) {
log.debug("Commit du fichier {}", filename);
workInProgress.get(filename).setNbtotalLinesInExecutionReport(nbLignesTotal);
handleFichier(filename);
}
//on ne décrémente pas le nb de thread si l'objet de suivi a été supprimé après la production des messages dans le second topic
if (workInProgress.get(filename) != null)
Expand All @@ -111,7 +110,6 @@ public void kbartFromkafkaListener(ConsumerRecord<String, String> ligneKbart) {
workInProgress.get(filename).addLineKbartToMailAttachementWithErrorMessage(new LigneKbartDto(), e.getMessage());
workInProgress.get(filename).addNbLinesWithInputDataErrorsInExecutionReport();
}

}

private void handleFichier(String filename) {
Expand Down Expand Up @@ -148,23 +146,16 @@ private void handleFichier(String filename) {
@KafkaListener(topics = {"${topic.name.source.kbart.errors}"}, groupId = "${topic.groupid.source.errors}", containerFactory = "kafkaKbartListenerContainerFactory")
public void errorsListener(ConsumerRecord<String, String> error) {
log.error(error.value());
String filename = error.key();
do {
try {
//ajout d'un sleep sur la durée du poll kafka pour être sur que le consumer de kbart ait lu au moins une fois
Thread.sleep(80);
} catch (InterruptedException e) {
log.warn("Erreur de sleep sur attente fin de traitement");
}
} while (workInProgress.get(filename) != null && workInProgress.get(filename).getNbActiveThreads() != 0);
String filename = extractFilenameFromKey(error.key());
if (workInProgress.containsKey(filename)) {
String fileNameFromError = error.key();
if (workInProgress.get(filename).getSemaphore().tryAcquire()) {
emailService.sendProductionErrorEmail(fileNameFromError, error.value());
logFileService.createExecutionReport(fileNameFromError, workInProgress.get(filename).getExecutionReport(), workInProgress.get(filename).isForced());
workInProgress.get(filename).setIsOnError(true);
handleFichier(fileNameFromError);
}
emailService.sendProductionErrorEmail(filename, error.value());
logFileService.createExecutionReport(filename, workInProgress.get(filename).getExecutionReport(), workInProgress.get(filename).isForced());
workInProgress.get(filename).setIsOnError(true);
handleFichier(filename);
}
}

private String extractFilenameFromKey (String key) {
return key.substring(0, key.lastIndexOf('_'));
}
}
Loading
Loading