diff --git a/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java b/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java
index 8e182d1..b22d57a 100644
--- a/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java
+++ b/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java
@@ -4,7 +4,9 @@ import fr.abes.LigneKbartImprime;
 import io.confluent.kafka.serializers.KafkaAvroSerializer;
 import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -14,11 +16,15 @@ import org.springframework.kafka.annotation.EnableKafka;
 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
 import org.springframework.kafka.core.*;
+import org.springframework.kafka.listener.CommonErrorHandler;
+import org.springframework.kafka.listener.MessageListenerContainer;
+import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
 import org.springframework.kafka.transaction.KafkaTransactionManager;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Semaphore;
 
 @Configuration
 @EnableKafka
@@ -53,6 +59,18 @@ public ConsumerFactory<String, String> consumerKbartFactory() {
         return new DefaultKafkaConsumerFactory<>(props);
     }
 
+    @Bean
+    public ConsumerFactory<String, String> consumerLineFactory() {
+        Map<String, Object> props = new HashMap<>();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, ("SchedulerCoordinator" + UUID.randomUUID()));
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolationLevel);
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+        return new DefaultKafkaConsumerFactory<>(props, new ErrorHandlingDeserializer<>(new StringDeserializer()), new ErrorHandlingDeserializer<>(new StringDeserializer()));
+    }
+
     @Bean
     public ConcurrentKafkaListenerContainerFactory<String, String> kafkaKbartListenerContainerFactory() {
@@ -62,6 +80,25 @@ public ConsumerFactory<String, String> consumerKbartFactory() {
         return factory;
     }
 
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaNbLinesListenerContainerFactory() {
+        ConcurrentKafkaListenerContainerFactory<String, String> factory =
+                new ConcurrentKafkaListenerContainerFactory<>();
+        factory.setConsumerFactory(consumerLineFactory());
+        factory.setCommonErrorHandler(new CommonErrorHandler() {
+            @Override
+            public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {
+                CommonErrorHandler.super.handleOtherException(thrownException, consumer, container, batchListener);
+            }
+
+            @Override
+            public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, MessageListenerContainer container) {
+                return CommonErrorHandler.super.handleOne(thrownException, record, consumer, container);
+            }
+        });
+        return factory;
+    }
+
     @Bean
     public Map<String, Object> producerConfigsWithTransaction() {
         Map<String, Object> props = new HashMap<>();
@@ -121,4 +158,9 @@ public KafkaTransactionManager<String, String> kafkaTransactionManage
     public KafkaTemplate<String, String> kafkatemplateEndoftraitement(final ProducerFactory<String, String> producerFactory) {
         return new KafkaTemplate<>(producerFactory);
     }
+
+    @Bean
+    public Semaphore semaphore() {
+        return new Semaphore(1);
+    }
 }
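Note on the handler above: the anonymous CommonErrorHandler only calls back into the interface's default methods, so it behaves exactly as if no custom handler had been set. If the intent is a concrete retry-then-skip policy, spring-kafka's DefaultErrorHandler expresses that directly. A minimal sketch, assuming spring-kafka 2.8+; the class and bean names are illustrative, not part of the change:

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.listener.DefaultErrorHandler;
    import org.springframework.util.backoff.FixedBackOff;

    @Configuration
    public class NbLinesErrorHandlingSketch {

        // Same wiring as kafkaNbLinesListenerContainerFactory, but with an
        // explicit policy: retry a failed record twice, 1 second apart, then
        // log it and skip it. The consumer factory bean is injected by name.
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> nbLinesFactoryWithBackoff(
                ConsumerFactory<String, String> consumerLineFactory) {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerLineFactory);
            factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
            return factory;
        }
    }
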
diff --git a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java
index 01e2442..2860933 100644
--- a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java
+++ b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java
@@ -9,7 +9,6 @@ import fr.abes.bestppn.service.LogFileService;
 import fr.abes.bestppn.utils.Utils;
 import jakarta.annotation.PostConstruct;
-import jakarta.validation.constraints.Email;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.header.Header;
@@ -21,10 +20,7 @@ import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 @Slf4j
 @Service
@@ -48,7 +44,6 @@ public class TopicConsumer {
     private boolean isOnError = false;
 
-    private final Set<Header> headerList = new HashSet<>();
     private ExecutorService executorService;
 
     private final ProviderRepository providerRepository;
@@ -59,33 +54,36 @@ public class TopicConsumer {
     private int totalLine = 0;
 
-    public TopicConsumer(KbartService service, EmailService emailService, ProviderRepository providerRepository, ExecutionReportService executionReportService, LogFileService logFileService) {
+    private final Semaphore semaphore;
+
+    public TopicConsumer(KbartService service, EmailService emailService, ProviderRepository providerRepository, ExecutionReportService executionReportService, LogFileService logFileService, Semaphore semaphore) {
         this.service = service;
         this.emailService = emailService;
         this.providerRepository = providerRepository;
         this.executionReportService = executionReportService;
         this.logFileService = logFileService;
+        this.semaphore = semaphore;
     }
 
     @PostConstruct
     public void initExecutor() {
         executorService = Executors.newFixedThreadPool(nbThread);
     }
+
     /**
      * Kafka listener that watches a topic and picks up messages as soon as they arrive.
+     *
      * @param lignesKbart Kafka message picked up by the Kafka consumer
      */
-    @KafkaListener(topics = {"${topic.name.source.kbart}", "${topic.name.source.kbart.errors}", "${topic.name.source.nbLines}"}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "${spring.kafka.concurrency.nbThread}")
-    public void listenKbartFromKafka(ConsumerRecord<String, String> lignesKbart) throws Exception {
-        log.info("Paquet reçu : Partition : " + lignesKbart.partition() + " / offset " + lignesKbart.offset());
-        //initialize the parameters
-        extractDataFromHeader(lignesKbart.headers().toArray());
-        ThreadContext.put("package", (filename)); //adds the file name to the thread context for log4j
+    @KafkaListener(topics = {"${topic.name.source.kbart}"}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "${spring.kafka.concurrency.nbThread}")
+    public void listenKbartFromKafka(ConsumerRecord<String, String> lignesKbart) {
+        log.info("Paquet reçu : Partition : " + lignesKbart.partition() + " / offset " + lignesKbart.offset() + " / value : " + lignesKbart.value());
         try {
-            String providerName = Utils.extractProvider(filename);
-            Optional<Provider> providerOpt = providerRepository.findByProvider(providerName);
             //process each kbart line
             if (lignesKbart.topic().equals(topicKbart)) {
+                this.filename = extractFilenameFromHeader(lignesKbart.headers().toArray());
+                ThreadContext.put("package", (filename)); //adds the file name to the thread context for log4j
+                String providerName = Utils.extractProvider(filename);
                 executorService.execute(() -> {
                     try {
                         service.processConsumerRecord(lignesKbart, providerName, isForced);
@@ -102,54 +100,81 @@ public void listenKbartFromKafka(ConsumerRecord<String, String> lignesKbart) thr
                 }
             });
         }
-        //receipt of the total number of lines, which marks the end of the current package
-        if (lignesKbart.topic().equals(topicKbartNbLines)) {
-            Thread.sleep(1000);
-            totalLine = Integer.parseInt(lignesKbart.value());
-            //let the threads finish their work
-            executorService.awaitTermination(1, TimeUnit.HOURS);
-            try {
-                if (!isOnError) {
-                    service.commitDatas(providerOpt, providerName, filename, headerList);
-                    //whatever the outcome of the processing, send the report by email
-                    log.info("Nombre de best ppn trouvé : " + executionReportService.getExecutionReport().getNbBestPpnFind() + "/" + totalLine);
-                    emailService.sendMailWithAttachment(filename);
-                    logFileService.createExecutionReport(filename, totalLine, executionReportService.getNbLinesOk(), executionReportService.getExecutionReport().getNbLinesWithInputDataErrors(), executionReportService.getExecutionReport().getNbLinesWithErrorsInBestPPNSearch(), isForced);
-                    emailService.clearMailAttachment();
-                    executionReportService.clearExecutionReport();
-                    totalLine = 0;
-                } else {
-                    isOnError = false;
-                }
-            } catch (IllegalPackageException | IllegalDateException e) {
-                isOnError = true;
-                log.error(e.getMessage());
-                emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage());
-                executionReportService.addNbLinesWithErrorsInBestPPNSearch();
+        } catch (IllegalProviderException e) {
+            isOnError = true;
+            log.error("Erreur dans les données en entrée, provider incorrect");
+            emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage());
+            executionReportService.addNbLinesWithInputDataErrors();
+        }
+    }
+
+    @KafkaListener(topics = {"${topic.name.source.nbLines}"}, groupId = "nbLinesLocal", containerFactory = "kafkaNbLinesListenerContainerFactory")
+    public void listenNbLines(ConsumerRecord<String, String> nbLines) {
+        try {
+            semaphore.acquire();
+            log.info("Permit acquis");
+            if (this.filename.equals(extractFilenameFromHeader(nbLines.headers().toArray()))) {
+                log.info("condition vérifiée : " + nbLines.value());
+                totalLine = Integer.parseInt(nbLines.value());
+                if (!isOnError) {
+                    String providerName = Utils.extractProvider(filename);
+                    Optional<Provider> providerOpt = providerRepository.findByProvider(providerName);
+                    service.commitDatas(providerOpt, providerName, filename);
+                    //whatever the outcome of the processing, send the report by email
+                    log.info("Nombre de best ppn trouvé : " + executionReportService.getExecutionReport().getNbBestPpnFind() + "/" + totalLine);
+                    emailService.sendMailWithAttachment(filename);
+                    logFileService.createExecutionReport(filename, totalLine, executionReportService.getNbLinesOk(), executionReportService.getExecutionReport().getNbLinesWithInputDataErrors(), executionReportService.getExecutionReport().getNbLinesWithErrorsInBestPPNSearch(), isForced);
+                    emailService.clearMailAttachment();
+                    executionReportService.clearExecutionReport();
+                    totalLine = 0;
+                } else {
+                    isOnError = false;
+                }
+            }
-        if (lignesKbart.topic().equals(topicKbartErrors)) {
-            executorService.shutdownNow();
-            isOnError = true;
-        }
+        } catch (IllegalPackageException | IllegalDateException e) {
+            isOnError = true;
+            log.error(e.getMessage());
+            emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage());
+            executionReportService.addNbLinesWithErrorsInBestPPNSearch();
         } catch (IllegalProviderException e) {
             isOnError = true;
             log.error("Erreur dans les données en entrée, provider incorrect");
             emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage());
             executionReportService.addNbLinesWithInputDataErrors();
+        } catch (ExecutionException | InterruptedException | IOException e) {
+            isOnError = true;
+            log.error(e.getMessage());
+            emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage());
+            executionReportService.addNbLinesWithInputDataErrors();
+        } finally {
+            semaphore.release();
+            log.info("semaphore libéré");
+        }
     }
 
-    private void extractDataFromHeader(Header[] headers) {
+    /* @KafkaListener(topics = {"${topic.name.source.kbart.errors}"}, groupId = "errorsLocal", containerFactory = "kafkaKbartListenerContainerFactory")
+    public void listenErrors(ConsumerRecord<String, String> error) {
+        executorService.shutdownNow();
+        isOnError = true;
+        log.error(error.value());
+        emailService.addLineKbartToMailAttachementWithErrorMessage(error.value());
+        emailService.sendMailWithAttachment(filename);
+        logFileService.createExecutionReport(filename, totalLine, executionReportService.getNbLinesOk(), executionReportService.getExecutionReport().getNbLinesWithInputDataErrors(), executionReportService.getExecutionReport().getNbLinesWithErrorsInBestPPNSearch(), isForced);
+        emailService.clearMailAttachment();
+        executionReportService.clearExecutionReport();
+    } */
+
+    private String extractFilenameFromHeader(Header[] headers) {
+        String nomFichier = "";
         for (Header header : headers) {
             if (header.key().equals("FileName")) {
-                filename = new String(header.value());
-                headerList.add(header);
-                if (filename.contains("_FORCE")) {
+                nomFichier = new String(header.value());
+                if (nomFichier.contains("_FORCE")) {
                     isForced = true;
                 }
             }
         }
+        return nomFichier;
     }
 }
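With the shared Set of headers gone, the "FileName" header is now the only contract between the two listeners: extractFilenameFromHeader reads it on both topics, and a "_FORCE" suffix in the name switches the run to forced mode. A stand-alone illustration of that convention; the file name value here is made up:

    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.internals.RecordHeader;

    import java.nio.charset.StandardCharsets;

    public class FileNameHeaderDemo {
        public static void main(String[] args) {
            // A record's headers carrying the package name under "FileName",
            // the way the upstream producer is expected to set it.
            Header[] headers = { new RecordHeader("FileName",
                    "somePackage_2023-09-01_FORCE.tsv".getBytes(StandardCharsets.UTF_8)) };

            String filename = "";
            boolean isForced = false;
            for (Header header : headers) {
                if (header.key().equals("FileName")) {
                    filename = new String(header.value(), StandardCharsets.UTF_8);
                    isForced = filename.contains("_FORCE");
                }
            }
            System.out.println(filename + " / forced = " + isForced); // forced = true
        }
    }
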
diff --git a/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java b/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java
index 95955f3..94ffdeb 100644
--- a/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java
+++ b/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java
@@ -12,6 +12,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.kafka.core.KafkaTemplate;
@@ -108,7 +109,7 @@ public void sendPrintNotice(List<LigneKbartImprime> ligneKbartImprimes, String f
      * @param filename : name of the file to process
      */
     @Transactional(transactionManager = "kafkaTransactionManagerKbartConnect")
-    public void sendPpnExNihilo(List<LigneKbartDto> ppnFromKbartToCreate, ProviderPackage provider, String filename) throws JsonProcessingException {
+    public void sendPpnExNihilo(List<LigneKbartDto> ppnFromKbartToCreate, ProviderPackage provider, String filename) {
         for (LigneKbartDto ligne : ppnFromKbartToCreate) {
             ligne.setIdProviderPackage(provider.getIdProviderPackage());
             ligne.setProviderPackagePackage(provider.getPackageName());
@@ -179,9 +180,10 @@ public byte[] value() {
 
     /**
      * Sends an end-of-processing message on the endOfTraitment_kbart2kafka Kafka topic.
-     * @param headerList list of Header (contains the package name and the date)
      */
-    public void sendEndOfTraitmentReport(Set<Header> headerList) throws ExecutionException, InterruptedException {
+    public void sendEndOfTraitmentReport(String filename) {
+        List<Header> headerList = new ArrayList<>();
+        headerList.add(new RecordHeader("FileName", filename.getBytes(StandardCharsets.UTF_8)));
         try {
             ProducerRecord<String, String> record = new ProducerRecord<>(topicEndOfTraitment, null, "", "OK", headerList);
             kafkatemplateEndoftraitement.send(record);
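For reference, the record published by the new sendEndOfTraitmentReport carries no partition, an empty key and "OK" as its value; the package name travels only in the "FileName" header. A self-contained sketch of that record shape (topic name taken from the javadoc above, file name illustrative):

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.internals.RecordHeader;

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    public class EndOfTraitmentRecordDemo {
        public static void main(String[] args) {
            List<Header> headerList = new ArrayList<>();
            headerList.add(new RecordHeader("FileName",
                    "somePackage_2023-09-01.tsv".getBytes(StandardCharsets.UTF_8)));

            // Same shape as in sendEndOfTraitmentReport: no partition, empty
            // key, "OK" value, package name carried by the header only.
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("endOfTraitment_kbart2kafka", null, "", "OK", headerList);

            // A downstream consumer reads the name back from the header:
            String filename = new String(record.headers().lastHeader("FileName").value(),
                    StandardCharsets.UTF_8);
            System.out.println(record.topic() + " -> " + filename);
        }
    }
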
diff --git a/src/main/java/fr/abes/bestppn/service/EmailService.java b/src/main/java/fr/abes/bestppn/service/EmailService.java
index 919152d..c1bb96d 100644
--- a/src/main/java/fr/abes/bestppn/service/EmailService.java
+++ b/src/main/java/fr/abes/bestppn/service/EmailService.java
@@ -50,7 +50,7 @@ public void addLineKbartToMailAttachment(LigneKbartDto dto) {
         mailAttachment.addKbartDto(dto);
     }
 
-    public void sendMailWithAttachment(String packageName) throws MessagingException {
+    public void sendMailWithAttachment(String packageName) {
         try {
             // Build the path for the .csv file
             Path csvPath = Path.of("rapport_" + packageName + ".csv");
diff --git a/src/main/java/fr/abes/bestppn/service/KbartService.java b/src/main/java/fr/abes/bestppn/service/KbartService.java
index d97cf09..93c3cef 100644
--- a/src/main/java/fr/abes/bestppn/service/KbartService.java
+++ b/src/main/java/fr/abes/bestppn/service/KbartService.java
@@ -85,7 +85,7 @@ public void processConsumerRecord(ConsumerRecord<String, String> lignesKbart, St
         serviceMail.addLineKbartToMailAttachment(ligneFromKafka);
     }
 
-    public void commitDatas(Optional<Provider> providerOpt, String providerName, String filename, Set<Header> headerList) throws IllegalPackageException, IllegalDateException, ExecutionException, InterruptedException, MessagingException, IOException {
+    public void commitDatas(Optional<Provider> providerOpt, String providerName, String filename) throws IllegalPackageException, IllegalDateException, ExecutionException, InterruptedException, IOException {
         ProviderPackage provider = providerService.handlerProvider(providerOpt, filename, providerName);
 
         producer.sendKbart(kbartToSend, provider, filename);
@@ -96,7 +96,7 @@ public void commitDatas(Optional<Provider> providerOpt, String providerName, Str
         ppnToCreate.clear();
         ppnFromKbartToCreate.clear();
 
-        producer.sendEndOfTraitmentReport(headerList);
+        producer.sendEndOfTraitmentReport(filename);
     }
 
     private static LigneKbartImprime getLigneKbartImprime(PpnWithDestinationDto ppnWithDestinationDto, LigneKbartDto ligneFromKafka) {
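A closing note on the coordination pattern these changes introduce: the single-permit semaphore bean makes the commit phase in listenNbLines mutually exclusive across listener threads. The usual shape for that pattern is to call acquire() before entering the try block, so the finally only ever releases a permit that was actually obtained; java.util.concurrent.Semaphore.release() otherwise adds an extra permit. A minimal sketch of that shape, independent of Kafka and with illustrative names only:

    import java.util.concurrent.Semaphore;

    public class CommitGateSketch {
        private final Semaphore semaphore = new Semaphore(1);

        // Runs the commit phase under the single permit. acquire() sits
        // before the try: if it is interrupted, nothing was acquired, the
        // finally is never reached, and no spurious permit is released.
        public void commitExclusively(Runnable commitPhase) throws InterruptedException {
            semaphore.acquire();
            try {
                commitPhase.run();
            } finally {
                semaphore.release();
            }
        }
    }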