CDE-297 : Feat : add a semaphore to gate production to the second topic until processing of the kbart lines has completed
pierre-maraval committed Nov 23, 2023
1 parent d8fb769 commit efaa393
Showing 5 changed files with 123 additions and 54 deletions.
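In outline, the commit gates end-of-package production behind a single-permit semaphore: kbart lines are processed on a thread pool, and a second listener on the nbLines topic must hold the permit before committing data and producing the end-of-processing message to the second topic. A minimal, self-contained sketch of the pattern (hypothetical names, not the project's classes):

import java.util.concurrent.Semaphore;

// Hypothetical sketch of the gating pattern introduced by this commit.
class PackageGate {
    private final Semaphore semaphore = new Semaphore(1); // single permit = mutex

    // Called by the listener on the nbLines topic once the total line count arrives.
    void onTotalLineCount(int totalLines) throws InterruptedException {
        semaphore.acquire();             // wait for the permit before producing
        try {
            commitAndReport(totalLines); // produce to the second topic
        } finally {
            semaphore.release();         // always hand the permit back
        }
    }

    private void commitAndReport(int totalLines) {
        System.out.println("end of package: " + totalLines + " lines");
    }
}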
42 changes: 42 additions & 0 deletions src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java
@@ -4,7 +4,9 @@
import fr.abes.LigneKbartImprime;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -14,11 +16,15 @@
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.transaction.KafkaTransactionManager;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Semaphore;

@Configuration
@EnableKafka
@@ -53,6 +59,18 @@ public ConsumerFactory<String, String> consumerKbartFactory() {
return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConsumerFactory<String, String> consumerLineFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "SchedulerCoordinator" + UUID.randomUUID());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolationLevel);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(props, new ErrorHandlingDeserializer<>(new StringDeserializer()), new ErrorHandlingDeserializer<>(new StringDeserializer()));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaKbartListenerContainerFactory() {
@@ -62,6 +80,25 @@ public ConsumerFactory<String, String> consumerKbartFactory() {
return factory;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaNbLinesListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerLineFactory());
factory.setCommonErrorHandler(new CommonErrorHandler() {
@Override
public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {
CommonErrorHandler.super.handleOtherException(thrownException, consumer, container, batchListener);
}

@Override
public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, MessageListenerContainer container) {
return CommonErrorHandler.super.handleOne(thrownException, record, consumer, container);
}
});
return factory;
}

@Bean
public Map<String, Object> producerConfigsWithTransaction() {
Map<String, Object> props = new HashMap<>();
@@ -121,4 +158,9 @@ public KafkaTransactionManager<String, LigneKbartImprime> kafkaTransactionManage
public KafkaTemplate<String, String> kafkatemplateEndoftraitement(final ProducerFactory producerFactory) {
return new KafkaTemplate<>(producerFactory);
}

@Bean
public Semaphore semaphore() {
return new Semaphore(1);
}
}
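A note on the new semaphore() bean: new Semaphore(1) is a counting semaphore with a single permit, used as a mutex. Unlike ReentrantLock, a semaphore has no owning thread, so the permit may legally be released by a thread other than the one that acquired it; in this commit the nbLines listener acquires and releases it within the same method. A minimal sketch of the contract (hypothetical class, not part of the commit):

import java.util.concurrent.Semaphore;

public class SemaphoreContractSketch {
    public static void main(String[] args) throws InterruptedException {
        Semaphore gate = new Semaphore(1);     // single permit: acts as a mutex
        gate.acquire();                        // take the only permit
        System.out.println(gate.tryAcquire()); // false: a second taker would block
        gate.release();                        // hand the permit back
        System.out.println(gate.tryAcquire()); // true: the permit is free again
    }
}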
121 changes: 73 additions & 48 deletions src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java
@@ -9,7 +9,6 @@
import fr.abes.bestppn.service.LogFileService;
import fr.abes.bestppn.utils.Utils;
import jakarta.annotation.PostConstruct;
import jakarta.validation.constraints.Email;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
@@ -21,10 +20,7 @@
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;

@Slf4j
@Service
@@ -48,7 +44,6 @@ public class TopicConsumer {

private boolean isOnError = false;

private final Set<Header> headerList = new HashSet<>();
private ExecutorService executorService;

private final ProviderRepository providerRepository;
@@ -59,33 +54,36 @@ public class TopicConsumer {

private int totalLine = 0;

public TopicConsumer(KbartService service, EmailService emailService, ProviderRepository providerRepository, ExecutionReportService executionReportService, LogFileService logFileService) {
private final Semaphore semaphore;

public TopicConsumer(KbartService service, EmailService emailService, ProviderRepository providerRepository, ExecutionReportService executionReportService, LogFileService logFileService, Semaphore semaphore) {
this.service = service;
this.emailService = emailService;
this.providerRepository = providerRepository;
this.executionReportService = executionReportService;
this.logFileService = logFileService;
this.semaphore = semaphore;
}

@PostConstruct
public void initExecutor() {
executorService = Executors.newFixedThreadPool(nbThread);
}

/**
* Kafka listener that listens on a topic and picks up messages as soon as they arrive.
*
* @param lignesKbart Kafka message fetched by the Kafka consumer
*/
@KafkaListener(topics = {"${topic.name.source.kbart}", "${topic.name.source.kbart.errors}", "${topic.name.source.nbLines}"}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "${spring.kafka.concurrency.nbThread}")
public void listenKbartFromKafka(ConsumerRecord<String, String> lignesKbart) throws Exception {
log.info("Paquet reçu : Partition : " + lignesKbart.partition() + " / offset " + lignesKbart.offset());
//initialisation des paramètres
extractDataFromHeader(lignesKbart.headers().toArray());
ThreadContext.put("package", (filename)); //Ajoute le nom de fichier dans le contexte du thread pour log4j
@KafkaListener(topics = {"${topic.name.source.kbart}",}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "${spring.kafka.concurrency.nbThread}")
public void listenKbartFromKafka(ConsumerRecord<String, String> lignesKbart) {
log.info("Paquet reçu : Partition : " + lignesKbart.partition() + " / offset " + lignesKbart.offset() + " / value : " + lignesKbart.value());
try {
String providerName = Utils.extractProvider(filename);
Optional<Provider> providerOpt = providerRepository.findByProvider(providerName);
// process each kbart line
if (lignesKbart.topic().equals(topicKbart)) {
this.filename = extractFilenameFromHeader(lignesKbart.headers().toArray());
ThreadContext.put("package", (filename)); //Ajoute le nom de fichier dans le contexte du thread pour log4j
String providerName = Utils.extractProvider(filename);
executorService.execute(() -> {
try {
service.processConsumerRecord(lignesKbart, providerName, isForced);
Expand All @@ -102,54 +100,81 @@ public void listenKbartFromKafka(ConsumerRecord<String, String> lignesKbart) thr
}
});
}
// the total line count has arrived: it marks the end of the current package
if (lignesKbart.topic().equals(topicKbartNbLines)) {
Thread.sleep(1000);
totalLine = Integer.parseInt(lignesKbart.value());
// let the threads finish their processing
executorService.awaitTermination(1, TimeUnit.HOURS);
try {
if (!isOnError) {
service.commitDatas(providerOpt, providerName, filename, headerList);
// whatever the outcome of the processing, send the report by email
log.info("Number of best ppns found: " + executionReportService.getExecutionReport().getNbBestPpnFind() + "/" + totalLine);
emailService.sendMailWithAttachment(filename);
logFileService.createExecutionReport(filename, totalLine, executionReportService.getNbLinesOk(), executionReportService.getExecutionReport().getNbLinesWithInputDataErrors(), executionReportService.getExecutionReport().getNbLinesWithErrorsInBestPPNSearch(), isForced);
emailService.clearMailAttachment();
executionReportService.clearExecutionReport();
totalLine = 0;
} else {
isOnError = false;
}
} catch (IllegalPackageException | IllegalDateException e) {
isOnError = true;
log.error(e.getMessage());
emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage());
executionReportService.addNbLinesWithErrorsInBestPPNSearch();
} catch (IllegalProviderException e) {
isOnError = true;
log.error("Erreur dans les données en entrée, provider incorrect");
emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage());
executionReportService.addNbLinesWithInputDataErrors();
}

}

@KafkaListener(topics = {"${topic.name.source.nbLines}"}, groupId = "nbLinesLocal", containerFactory = "kafkaNbLinesListenerContainerFactory")
public void listenNbLines(ConsumerRecord<String, String> nbLines) {
try {
semaphore.acquire();
log.info("Permit acquis");
if (this.filename.equals(extractFilenameFromHeader(nbLines.headers().toArray()))) {
log.info("condition vérifiée : " + nbLines.value());
totalLine = Integer.parseInt(nbLines.value());
if (!isOnError) {
String providerName = Utils.extractProvider(filename);
Optional<Provider> providerOpt = providerRepository.findByProvider(providerName);
service.commitDatas(providerOpt, providerName, filename);
// whatever the outcome of the processing, send the report by email
log.info("Number of best ppns found: " + executionReportService.getExecutionReport().getNbBestPpnFind() + "/" + totalLine);
emailService.sendMailWithAttachment(filename);
logFileService.createExecutionReport(filename, totalLine, executionReportService.getNbLinesOk(), executionReportService.getExecutionReport().getNbLinesWithInputDataErrors(), executionReportService.getExecutionReport().getNbLinesWithErrorsInBestPPNSearch(), isForced);
emailService.clearMailAttachment();
executionReportService.clearExecutionReport();
totalLine = 0;
} else {
isOnError = false;
}
}
if (lignesKbart.topic().equals(topicKbartErrors)) {
executorService.shutdownNow();
isOnError = true;
}
} catch (IllegalPackageException | IllegalDateException e) {
isOnError = true;
log.error(e.getMessage());
emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage());
executionReportService.addNbLinesWithErrorsInBestPPNSearch();
} catch (IllegalProviderException e) {
isOnError = true;
log.error("Erreur dans les données en entrée, provider incorrect");
emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage());
executionReportService.addNbLinesWithInputDataErrors();
} catch (ExecutionException | InterruptedException | IOException e) {
isOnError = true;
log.error(e.getMessage());
emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage());
executionReportService.addNbLinesWithInputDataErrors();
} finally {
semaphore.release();
log.info("semaphore libéré");
}

}

private void extractDataFromHeader(Header[] headers) {
/* @KafkaListener(topics = {"${topic.name.source.kbart.errors}"}, groupId = "errorsLocal", containerFactory = "kafkaKbartListenerContainerFactory")
public void listenErrors(ConsumerRecord<String, String> error) {
executorService.shutdownNow();
isOnError = true;
log.error(error.value());
emailService.addLineKbartToMailAttachementWithErrorMessage(error.value());
emailService.sendMailWithAttachment(filename);
logFileService.createExecutionReport(filename, totalLine, executionReportService.getNbLinesOk(), executionReportService.getExecutionReport().getNbLinesWithInputDataErrors(), executionReportService.getExecutionReport().getNbLinesWithErrorsInBestPPNSearch(), isForced);
emailService.clearMailAttachment();
executionReportService.clearExecutionReport();
} */

private String extractFilenameFromHeader(Header[] headers) {
String nomFichier = "";
for (Header header : headers) {
if (header.key().equals("FileName")) {
filename = new String(header.value());
headerList.add(header);
if (filename.contains("_FORCE")) {
nomFichier = new String(header.value());
if (nomFichier.contains("_FORCE")) {
isForced = true;
}
}
}
return nomFichier;
}
}
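One possible hardening of listenNbLines, not part of this commit: bound the wait with tryAcquire so a permit that is never released cannot block the nbLines consumer indefinitely. A hedged sketch under that assumption (hypothetical class and timeout):

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class BoundedGateSketch {
    private final Semaphore semaphore = new Semaphore(1);

    void onNbLines(String totalLineValue) throws InterruptedException {
        // Fail fast instead of blocking forever on acquire().
        if (!semaphore.tryAcquire(10, TimeUnit.MINUTES)) {
            System.err.println("Permit not acquired within 10 minutes, giving up");
            return;
        }
        try {
            System.out.println("commit and report for " + totalLineValue + " lines");
        } finally {
            semaphore.release(); // the release stays in a finally block, as in the commit
        }
    }
}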
8 changes: 5 additions & 3 deletions src/main/java/fr/abes/bestppn/kafka/TopicProducer.java
@@ -12,6 +12,7 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
@@ -108,7 +109,7 @@ public void sendPrintNotice(List<LigneKbartImprime> ligneKbartImprimes, String f
* @param filename : name of the file to process
*/
@Transactional(transactionManager = "kafkaTransactionManagerKbartConnect")
public void sendPpnExNihilo(List<LigneKbartDto> ppnFromKbartToCreate, ProviderPackage provider, String filename) throws JsonProcessingException {
public void sendPpnExNihilo(List<LigneKbartDto> ppnFromKbartToCreate, ProviderPackage provider, String filename) {
for (LigneKbartDto ligne : ppnFromKbartToCreate) {
ligne.setIdProviderPackage(provider.getIdProviderPackage());
ligne.setProviderPackagePackage(provider.getPackageName());
@@ -179,9 +180,10 @@ public byte[] value() {

/**
* Sends an end-of-processing message to the kafka topic endOfTraitment_kbart2kafka
* @param headerList list of Header (contains the package name and the date)
*/
public void sendEndOfTraitmentReport(Set<Header> headerList) throws ExecutionException, InterruptedException {
public void sendEndOfTraitmentReport(String filename) {
List<Header> headerList = new ArrayList<>();
headerList.add(new RecordHeader("FileName", filename.getBytes(StandardCharsets.UTF_8)));
try {
ProducerRecord<String, String> record = new ProducerRecord<>(topicEndOfTraitment, null, "", "OK", headerList);
kafkatemplateEndoftraitement.send(record);
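For reference, a minimal sketch of the header round-trip that sendEndOfTraitmentReport now relies on: the producer attaches the file name as a FileName header, and the consuming side reads it back the way extractFilenameFromHeader does in TopicConsumer.java above (assumed file name; the topic name comes from the Javadoc):

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

class HeaderRoundTripSketch {
    public static void main(String[] args) {
        List<Header> headers = new ArrayList<>();
        headers.add(new RecordHeader("FileName", "somePackage_2023-11-23.tsv".getBytes(StandardCharsets.UTF_8)));
        // Same record shape as sendEndOfTraitmentReport: empty key, "OK" value, FileName header.
        ProducerRecord<String, String> record =
                new ProducerRecord<>("endOfTraitment_kbart2kafka", null, "", "OK", headers);
        // A consumer reads the same header back:
        String filename = new String(record.headers().lastHeader("FileName").value(), StandardCharsets.UTF_8);
        System.out.println(filename); // somePackage_2023-11-23.tsv
    }
}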
2 changes: 1 addition & 1 deletion src/main/java/fr/abes/bestppn/service/EmailService.java
@@ -50,7 +50,7 @@ public void addLineKbartToMailAttachment(LigneKbartDto dto) {
mailAttachment.addKbartDto(dto);
}

public void sendMailWithAttachment(String packageName) throws MessagingException {
public void sendMailWithAttachment(String packageName) {
try {
// Build the path for the .csv file
Path csvPath = Path.of("rapport_" + packageName + ".csv");
4 changes: 2 additions & 2 deletions src/main/java/fr/abes/bestppn/service/KbartService.java
@@ -85,7 +85,7 @@ public void processConsumerRecord(ConsumerRecord<String, String> lignesKbart, St
serviceMail.addLineKbartToMailAttachment(ligneFromKafka);
}

public void commitDatas(Optional<Provider> providerOpt, String providerName, String filename, Set<Header> headerList) throws IllegalPackageException, IllegalDateException, ExecutionException, InterruptedException, MessagingException, IOException {
public void commitDatas(Optional<Provider> providerOpt, String providerName, String filename) throws IllegalPackageException, IllegalDateException, ExecutionException, InterruptedException, IOException {
ProviderPackage provider = providerService.handlerProvider(providerOpt, filename, providerName);

producer.sendKbart(kbartToSend, provider, filename);
@@ -96,7 +96,7 @@ public void commitDatas(Optional<Provider> providerOpt, String providerName, Str
ppnToCreate.clear();
ppnFromKbartToCreate.clear();

producer.sendEndOfTraitmentReport(headerList);
producer.sendEndOfTraitmentReport(filename);
}

private static LigneKbartImprime getLigneKbartImprime(PpnWithDestinationDto ppnWithDestinationDto, LigneKbartDto ligneFromKafka) {
