From e536ed8ba2b614e173d7f15affeddb0f4c9c88ef Mon Sep 17 00:00:00 2001 From: pierre-maraval Date: Tue, 9 Jul 2024 14:27:48 +0200 Subject: [PATCH 01/11] FIX : modification variables configuration kafka --- .../abes/bestppn/configuration/KafkaConfig.java | 17 ++++------------- .../fr/abes/bestppn/kafka/TopicConsumer.java | 2 +- .../repository/bacon/LigneKbartRepository.java | 1 - src/main/resources/application-dev.properties | 14 ++++++-------- src/main/resources/application-prod.properties | 16 +++++++--------- src/main/resources/application-test.properties | 14 ++++++-------- src/main/resources/application.properties | 5 ----- 7 files changed, 24 insertions(+), 45 deletions(-) diff --git a/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java b/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java index dd1e636..baa7755 100644 --- a/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java +++ b/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java @@ -24,21 +24,12 @@ @Configuration @EnableKafka public class KafkaConfig { - @Value("${spring.kafka.concurrency.nbThread}") - private int nbThread; - - @Value("${spring.kafka.consumer.bootstrap-servers}") + @Value("${abes.kafka.bootstrap-servers}") private String bootstrapAddress; - - @Value("${spring.kafka.consumer.properties.isolation.level}") - private String isolationLevel; - - @Value("${spring.kafka.registry.url}") + @Value("${abes.kafka.registry.url}") private String registryUrl; - @Value("${spring.kafka.auto.register.schema}") - private boolean autoRegisterSchema; @Bean public ConsumerFactory consumerKbartFactory() { @@ -47,7 +38,7 @@ public ConsumerFactory consumerKbartFactory() { props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,("SchedulerCoordinator"+ UUID.randomUUID())); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolationLevel); + props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); return new DefaultKafkaConsumerFactory<>(props); } @@ -70,7 +61,7 @@ public Map producerConfigsWithTransaction() { //props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionIdPrefix); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl); - props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, autoRegisterSchema); + props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false); //props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeout); return props; } diff --git a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java index 5f4face..601dc83 100644 --- a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java +++ b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java @@ -49,7 +49,7 @@ public TopicConsumer(ObjectMapper mapper, KbartService service, EmailService ema * * @param ligneKbart message kafka récupéré par le Consumer Kafka */ - @KafkaListener(topics = {"${topic.name.source.kbart}"}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "${spring.kafka.concurrency.nbThread}") + @KafkaListener(topics = {"${topic.name.source.kbart}"}, groupId = "${topic.groupid.source.kbart}", containerFactory = 
"kafkaKbartListenerContainerFactory", concurrency = "${abes.kafka.concurrency.nbThread}") public void kbartFromkafkaListener(ConsumerRecord ligneKbart) { String filename = ligneKbart.key(); if (!this.workInProgress.containsKey(filename)) { diff --git a/src/main/java/fr/abes/bestppn/repository/bacon/LigneKbartRepository.java b/src/main/java/fr/abes/bestppn/repository/bacon/LigneKbartRepository.java index 92459cd..b9fa950 100644 --- a/src/main/java/fr/abes/bestppn/repository/bacon/LigneKbartRepository.java +++ b/src/main/java/fr/abes/bestppn/repository/bacon/LigneKbartRepository.java @@ -9,7 +9,6 @@ @Repository @BaconDbConfiguration public interface LigneKbartRepository extends JpaRepository { - @Transactional void deleteAllByIdProviderPackage(Integer idProviderPackage); } diff --git a/src/main/resources/application-dev.properties b/src/main/resources/application-dev.properties index d8cb9bc..68ccd08 100644 --- a/src/main/resources/application-dev.properties +++ b/src/main/resources/application-dev.properties @@ -1,9 +1,10 @@ # Producer properties -spring.kafka.producer.bootstrap-servers= -spring.kafka.consumer.bootstrap-servers= -spring.kafka.registry.url= -spring.kafka.auto.register.schema=false -spring.kafka.concurrency.nbThread= +abes.kafka.bootstrap-servers= +abes.kafka.registry.url= +abes.kafka.concurrency.nbThread= + +topic.groupid.source.kbart=lignesKbart +topic.groupid.source.errors=errors url.onlineId2Ppn= url.printId2Ppn= @@ -41,8 +42,5 @@ spring.sql.bacon.init.mode=never mail.ws.url= mail.ws.recipient= -topic.groupid.source.kbart=lignesKbart -topic.groupid.source.errors=errors - logging.level.root=info logging.level.fr.abes.bestppn=debug \ No newline at end of file diff --git a/src/main/resources/application-prod.properties b/src/main/resources/application-prod.properties index 1d1fcd5..49771a1 100644 --- a/src/main/resources/application-prod.properties +++ b/src/main/resources/application-prod.properties @@ -1,9 +1,10 @@ # Producer properties -spring.kafka.producer.bootstrap-servers= -spring.kafka.consumer.bootstrap-servers= -spring.kafka.registry.url= -spring.kafka.auto.register.schema=false -spring.kafka.concurrency.nbThread= +abes.kafka.bootstrap-servers= +abes.kafka.registry.url= +abes.kafka.concurrency.nbThread= + +topic.groupid.source.kbart=lignesKbart +topic.groupid.source.errors=errors url.onlineId2Ppn= url.printId2Ppn= @@ -38,7 +39,4 @@ spring.sql.bacon.init.mode=never # Mailing mail.ws.url= -mail.ws.recipient= - -topic.groupid.source.kbart=lignesKbart -topic.groupid.source.errors=errors \ No newline at end of file +mail.ws.recipient= \ No newline at end of file diff --git a/src/main/resources/application-test.properties b/src/main/resources/application-test.properties index 1f877ab..9448c79 100644 --- a/src/main/resources/application-test.properties +++ b/src/main/resources/application-test.properties @@ -1,9 +1,10 @@ # Producer properties -spring.kafka.producer.bootstrap-servers= -spring.kafka.consumer.bootstrap-servers= -spring.kafka.registry.url= -spring.kafka.auto.register.schema=false -spring.kafka.concurrency.nbThread= +abes.kafka.bootstrap-servers= +abes.kafka.registry.url= +abes.kafka.concurrency.nbThread= + +topic.groupid.source.kbart=lignesKbart +topic.groupid.source.errors=errors url.onlineId2Ppn= url.printId2Ppn= @@ -39,6 +40,3 @@ spring.sql.bacon.init.mode=never # Mailing mail.ws.url= mail.ws.recipient= - -topic.groupid.source.kbart=lignesKbart -topic.groupid.source.errors=errors \ No newline at end of file diff --git 
a/src/main/resources/application.properties b/src/main/resources/application.properties index f476b7b..45283e9 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -20,11 +20,6 @@ server.port=8083 log4j2.logdir=logs logging.config=classpath:log4j2-all.xml -# Common Kafka Properties -spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer -spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer -spring.kafka.consumer.properties.isolation.level=read_committed - # Topic Kafka topic.name.target.kbart=bacon.kbart.withppn.toload From a8ba1ade6bc5aa1b46e3313beef38c3cef5a2cf1 Mon Sep 17 00:00:00 2001 From: pierre-maraval Date: Tue, 9 Jul 2024 14:33:24 +0200 Subject: [PATCH 02/11] FIX : modification variables configuration kafka log4j --- src/main/resources/log4j2-all.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/resources/log4j2-all.xml b/src/main/resources/log4j2-all.xml index 480b1d5..536f411 100644 --- a/src/main/resources/log4j2-all.xml +++ b/src/main/resources/log4j2-all.xml @@ -7,7 +7,7 @@ ${bundle:application:application.name} ${bundle:application:application.name}_error ${bundle:application:application.name}_debug - ${env:SPRING_KAFKA_PRODUCER_BOOTSTRAP_SERVERS} + ${env:ABES_KAFKA_BOOTSTRAP_SERVERS} 100 MB From 085e736ffbed1afff6ca6a7dd6271e888ae0c432 Mon Sep 17 00:00:00 2001 From: pierre-maraval Date: Thu, 11 Jul 2024 08:00:05 +0200 Subject: [PATCH 03/11] =?UTF-8?q?FIX=20:=20Commit=20des=20offset=20kafka?= =?UTF-8?q?=20Suppression=20s=C3=A9maphore?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bestppn/configuration/KafkaConfig.java | 5 +-- .../bestppn/kafka/KafkaWorkInProgress.java | 8 ---- .../fr/abes/bestppn/kafka/TopicConsumer.java | 27 ++++--------- .../fr/abes/bestppn/kafka/TopicProducer.java | 7 +--- .../bacon/LigneKbartRepository.java | 3 +- .../fr/abes/bestppn/service/KbartService.java | 1 - .../fr/abes/bestppn/service/WsService.java | 38 ++++++++++++++++--- src/main/resources/log4j2-all.xml | 4 ++ 8 files changed, 48 insertions(+), 45 deletions(-) diff --git a/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java b/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java index baa7755..887d290 100644 --- a/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java +++ b/src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java @@ -38,8 +38,9 @@ public ConsumerFactory consumerKbartFactory() { props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,("SchedulerCoordinator"+ UUID.randomUUID())); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10); + props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000); return new DefaultKafkaConsumerFactory<>(props); } @@ -58,11 +59,9 @@ public Map producerConfigsWithTransaction() { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - //props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionIdPrefix); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); 
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl); props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false); - //props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeout); return props; } diff --git a/src/main/java/fr/abes/bestppn/kafka/KafkaWorkInProgress.java b/src/main/java/fr/abes/bestppn/kafka/KafkaWorkInProgress.java index 80e3dc2..08b6031 100644 --- a/src/main/java/fr/abes/bestppn/kafka/KafkaWorkInProgress.java +++ b/src/main/java/fr/abes/bestppn/kafka/KafkaWorkInProgress.java @@ -38,8 +38,6 @@ public class KafkaWorkInProgress { private final AtomicInteger nbActiveThreads; - private final Semaphore semaphore; - private final List kbartToSend; private final List ppnToCreate; @@ -54,7 +52,6 @@ public KafkaWorkInProgress(boolean isForced, boolean isBypassed) { this.isOnError = new AtomicBoolean(false); this.nbLignesTraitees = new AtomicInteger(0); this.nbActiveThreads = new AtomicInteger(0); - this.semaphore = new Semaphore(1); this.kbartToSend = Collections.synchronizedList(new ArrayList<>()); this.ppnToCreate = Collections.synchronizedList(new ArrayList<>()); this.ppnFromKbartToCreate = Collections.synchronizedList(new ArrayList<>()); @@ -109,11 +106,6 @@ public void addLineKbartToMailAttachment(LigneKbartDto dto) { mailAttachment.addKbartDto(dto); } - @PreDestroy - public void onDestroy() { - this.semaphore.release(); - } - public void addPpnToCreate(LigneKbartImprime ligneKbartImprime) { this.ppnToCreate.add(ligneKbartImprime); } diff --git a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java index 601dc83..3f9334d 100644 --- a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java +++ b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java @@ -14,6 +14,7 @@ import org.apache.logging.log4j.ThreadContext; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Service; import org.springframework.web.client.RestClientException; @@ -91,11 +92,8 @@ public void kbartFromkafkaListener(ConsumerRecord ligneKbart) { log.debug("Ligne en cours : {} NbLignesTotal : {}", nbCurrentLine, nbLignesTotal); if (nbLignesTotal == nbCurrentLine) { log.debug("Commit du fichier {}", filename); - if (workInProgress.get(filename).getSemaphore().tryAcquire()) { - log.debug("pas d'erreur dans le topic d'erreur"); - workInProgress.get(filename).setNbtotalLinesInExecutionReport(nbLignesTotal); - handleFichier(filename); - } + workInProgress.get(filename).setNbtotalLinesInExecutionReport(nbLignesTotal); + handleFichier(filename); } } else { String errorMsg = "Header absent de la ligne : " + ligneKbart.headers(); @@ -111,7 +109,6 @@ public void kbartFromkafkaListener(ConsumerRecord ligneKbart) { workInProgress.get(filename).addLineKbartToMailAttachementWithErrorMessage(new LigneKbartDto(), e.getMessage()); workInProgress.get(filename).addNbLinesWithInputDataErrorsInExecutionReport(); } - } private void handleFichier(String filename) { @@ -149,22 +146,12 @@ private void handleFichier(String filename) { public void errorsListener(ConsumerRecord error) { log.error(error.value()); String filename = error.key(); - do { - try { - //ajout d'un sleep sur la durée du poll kafka pour être sur que le consumer de kbart ait lu au moins une fois - Thread.sleep(80); - } catch (InterruptedException e) { - log.warn("Erreur de sleep sur attente fin de traitement"); 
- } - } while (workInProgress.get(filename) != null && workInProgress.get(filename).getNbActiveThreads() != 0); if (workInProgress.containsKey(filename)) { String fileNameFromError = error.key(); - if (workInProgress.get(filename).getSemaphore().tryAcquire()) { - emailService.sendProductionErrorEmail(fileNameFromError, error.value()); - logFileService.createExecutionReport(fileNameFromError, workInProgress.get(filename).getExecutionReport(), workInProgress.get(filename).isForced()); - workInProgress.get(filename).setIsOnError(true); - handleFichier(fileNameFromError); - } + emailService.sendProductionErrorEmail(fileNameFromError, error.value()); + logFileService.createExecutionReport(fileNameFromError, workInProgress.get(filename).getExecutionReport(), workInProgress.get(filename).isForced()); + workInProgress.get(filename).setIsOnError(true); + handleFichier(fileNameFromError); } } } diff --git a/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java b/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java index 12263b7..07fd9bf 100644 --- a/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java +++ b/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java @@ -33,7 +33,7 @@ @Service @RequiredArgsConstructor public class TopicProducer { - @Value("${spring.kafka.concurrency.nbThread}") + @Value("${abes.kafka.concurrency.nbThread}") private int nbThread; @Value("${topic.name.target.kbart}") @@ -55,17 +55,14 @@ public class TopicProducer { private KafkaTemplate kafkaTemplateImprime; - private KafkaTemplate kafkatemplateEndoftraitement; - private ExecutorService executorService; private UtilsMapper utilsMapper; @Autowired - public TopicProducer(KafkaTemplate kafkaTemplateConnect, KafkaTemplate kafkaTemplateImprime, KafkaTemplate kafkatemplateEndoftraitement, UtilsMapper utilsMapper) { + public TopicProducer(KafkaTemplate kafkaTemplateConnect, KafkaTemplate kafkaTemplateImprime, UtilsMapper utilsMapper) { this.kafkaTemplateConnect = kafkaTemplateConnect; this.kafkaTemplateImprime = kafkaTemplateImprime; - this.kafkatemplateEndoftraitement = kafkatemplateEndoftraitement; this.utilsMapper = utilsMapper; } diff --git a/src/main/java/fr/abes/bestppn/repository/bacon/LigneKbartRepository.java b/src/main/java/fr/abes/bestppn/repository/bacon/LigneKbartRepository.java index b9fa950..a87066c 100644 --- a/src/main/java/fr/abes/bestppn/repository/bacon/LigneKbartRepository.java +++ b/src/main/java/fr/abes/bestppn/repository/bacon/LigneKbartRepository.java @@ -4,11 +4,10 @@ import fr.abes.bestppn.model.entity.bacon.LigneKbart; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Repository; -import org.springframework.transaction.annotation.Transactional; @Repository @BaconDbConfiguration public interface LigneKbartRepository extends JpaRepository { - @Transactional + void deleteAllByIdProviderPackage(Integer idProviderPackage); } diff --git a/src/main/java/fr/abes/bestppn/service/KbartService.java b/src/main/java/fr/abes/bestppn/service/KbartService.java index 1680339..b077254 100644 --- a/src/main/java/fr/abes/bestppn/service/KbartService.java +++ b/src/main/java/fr/abes/bestppn/service/KbartService.java @@ -38,7 +38,6 @@ public KbartService(BestPpnService service, TopicProducer producer, ProviderServ this.workInProgress = workInProgress; } - @Transactional public void processConsumerRecord(LigneKbartDto ligneFromKafka, String providerName, boolean isForced, Boolean isBypassed, String filename) throws IOException, BestPpnException, URISyntaxException, 
IllegalDoiException { log.info("Début calcul BestPpn pour la ligne " + ligneFromKafka); if (!isBypassed) { diff --git a/src/main/java/fr/abes/bestppn/service/WsService.java b/src/main/java/fr/abes/bestppn/service/WsService.java index 4ec27dc..75ac369 100644 --- a/src/main/java/fr/abes/bestppn/service/WsService.java +++ b/src/main/java/fr/abes/bestppn/service/WsService.java @@ -18,9 +18,17 @@ import org.springframework.web.client.RestClientException; import org.springframework.web.client.RestTemplate; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static java.net.http.HttpResponse.BodyHandlers.ofString; @Service @Slf4j @@ -41,10 +49,13 @@ public class WsService { private final ObjectMapper mapper; + private HttpClient httpClient; + public WsService(ObjectMapper mapper) { this.headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON); this.mapper = mapper; + this.httpClient = HttpClient.newBuilder().connectTimeout(Duration.ofMillis(5000)).build(); } @@ -68,7 +79,7 @@ public String getRestCall(String url, String... params) throws RestClientExcepti return restTemplate.getForObject(formedUrl.toString(), String.class); } - public String getCall(String url, Map<String, String> params) throws RestClientException { + public String getCall(String url, Map<String, String> params) throws RestClientException, ExecutionException, InterruptedException { StringBuilder formedUrl = new StringBuilder(url); if (!params.isEmpty()) { formedUrl.append("?"); @@ -81,8 +92,17 @@ public String getCall(String url, Map<String, String> params) throws RestClientE formedUrl.deleteCharAt(formedUrl.length() - 1); } log.debug(formedUrl.toString()); - RestTemplate restTemplate = new RestTemplate(); - return restTemplate.getForObject(formedUrl.toString(), String.class); + /*RestTemplate restTemplate = new RestTemplate(); + String result = restTemplate.getForObject(formedUrl.toString(), String.class);*/ + HttpRequest.Builder request = HttpRequest.newBuilder().GET().uri(URI.create(formedUrl.toString())); + CompletableFuture<String> result = httpClient.sendAsync(request.build(), ofString()).thenApplyAsync(res -> { + if (res.statusCode() != 200) { + throw new IllegalStateException("Invalid rest response " + res); + } else { + return res.body(); + } + }); + return result.get(); } public ResultWsSudocDto callOnlineId2Ppn(String type, String id, @Nullable String provider) throws RestClientException, IllegalArgumentException { @@ -143,9 +163,15 @@ public ResultWsSudocDto callDoi2Ppn(String doi, @Nullable String provider) throw params.put("provider", provider); ResultWsSudocDto result; try { - result = mapper.readValue(getCall(urlDoi2Ppn, params), ResultWsSudocDto.class); + String resultCall = getCall(urlDoi2Ppn, params); + result = mapper.readValue(resultCall, ResultWsSudocDto.class); result.setUrl(urlDoi2Ppn + "?provider=" + provider + "&doi=" + doi); - } catch (RestClientException ex) { + } catch (ExecutionException | InterruptedException e) { + log.error("doi : {} / provider {} : n'est pas au bon format.", doi, provider); + throw new IllegalDoiException(e.getMessage()); + } + + /*catch (RestClientException ex) { if (ex.getMessage().contains("Le DOI n'est pas au bon format")) { log.error("doi : {} / provider {} : n'est pas au bon format.", doi, provider); throw new
IllegalDoiException(ex.getMessage()); @@ -153,7 +179,7 @@ public ResultWsSudocDto callDoi2Ppn(String doi, @Nullable String provider) throw log.warn("doi : {} / provider {} : Impossible d'accéder au ws doi2ppn.", doi, provider); throw new RestClientException(ex.getMessage()); } - } + }*/ return result; } diff --git a/src/main/resources/log4j2-all.xml b/src/main/resources/log4j2-all.xml index 536f411..2e3fa7e 100644 --- a/src/main/resources/log4j2-all.xml +++ b/src/main/resources/log4j2-all.xml @@ -140,6 +140,10 @@ + + + + From cdbc51aec190900e497af21a460602c07da07661 Mon Sep 17 00:00:00 2001 From: pierre-maraval Date: Thu, 11 Jul 2024 08:00:47 +0200 Subject: [PATCH 04/11] =?UTF-8?q?FIX=20:=20Commit=20des=20offset=20kafka?= =?UTF-8?q?=20Suppression=20s=C3=A9maphore?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/resources/log4j2-all.xml | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/src/main/resources/log4j2-all.xml b/src/main/resources/log4j2-all.xml index 2e3fa7e..c442fb1 100644 --- a/src/main/resources/log4j2-all.xml +++ b/src/main/resources/log4j2-all.xml @@ -129,18 +129,7 @@ - - - - - - - - - - - - + From a0365aed434789d30bda1346301f72556fa7a522 Mon Sep 17 00:00:00 2001 From: pierre-maraval Date: Fri, 12 Jul 2024 10:02:23 +0200 Subject: [PATCH 05/11] FIX : Appel doi avec uppercase --- src/main/java/fr/abes/bestppn/service/WsService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/fr/abes/bestppn/service/WsService.java b/src/main/java/fr/abes/bestppn/service/WsService.java index 75ac369..85b8c43 100644 --- a/src/main/java/fr/abes/bestppn/service/WsService.java +++ b/src/main/java/fr/abes/bestppn/service/WsService.java @@ -159,7 +159,7 @@ public ResultWsSudocDto callDat2Ppn(String date, String author, String title, St public ResultWsSudocDto callDoi2Ppn(String doi, @Nullable String provider) throws JsonProcessingException, IllegalDoiException { Map params = new HashMap<>(); - params.put("doi", doi); + params.put("doi", doi.toUpperCase()); params.put("provider", provider); ResultWsSudocDto result; try { From f52a30ddf2069893de67ae7624e1e9d941d15167 Mon Sep 17 00:00:00 2001 From: TristanReamot Date: Fri, 30 Aug 2024 09:01:55 +0200 Subject: [PATCH 06/11] =?UTF-8?q?CDE=20424=20Opti=20Passage=20de=20Current?= =?UTF-8?q?=20Line=20et=20Nb=20Total=20Lines=20de=20header=20=C3=A0=20dto?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README-developpement.md | 2 +- .../fr/abes/bestppn/kafka/TopicConsumer.java | 25 ++++++------------- .../fr/abes/bestppn/kafka/TopicProducer.java | 14 ----------- .../bestppn/model/dto/PackageKbartDto.java | 4 +-- .../model/dto/kafka/LigneKbartDto.java | 6 +++++ 5 files changed, 16 insertions(+), 35 deletions(-) diff --git a/README-developpement.md b/README-developpement.md index 9de6662..5b7b12f 100644 --- a/README-developpement.md +++ b/README-developpement.md @@ -37,7 +37,7 @@ La classe `TopicConsumer.java` comporte deux `@KafkaListener` qui ont les rôles 1. 
`kbartFromkafkaListener()` : - un objet `KafkaWorkInProgress` sera créé et placé dans la `Map` pour chaque nouveau nom de fichier kbart détecté à la lecture des messages dans le topic kafka `bacon.kbart.toload` - lit les messages kafka à partir du topic `bacon.kbart.toload` (chaque message correspond à une ligne d'un fichier kbart lu par l'API [kbart2kafka](https://github.com/abes-esr/kbart2kafka)) - > :information_source: Le nom du fichier kbart se trouve dans la `key` du message. Le numéro de la ligne courante `nbCurrentLines` ainsi que le nombre de ligne total du fichier kbart `nbLinesTotal` sont passés dans le header du message + > :information_source: Le nom du fichier kbart se trouve dans la `key` du message. Le numéro de la ligne courante `nbCurrentLines` ainsi que le nombre de ligne total du fichier kbart `nbLinesTotal` sont passés dans la dto du message - demande le calcul du best ppn pour chaque ligne. Le calcul du best ppn sera ignoré si le nom du fichier comporte la chaine de caractère `_BYPASS` - demande l'envoi des lignes vers de nouveaux topics - gère les actions consécutives aux erreurs relevées durant le traitement d'une ligne. Certaines erreurs seront ignorées si le nom du fichier comporte la chaine de caractère `_FORCE` diff --git a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java index 3f9334d..5afe3f2 100644 --- a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java +++ b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java @@ -10,11 +10,8 @@ import fr.abes.bestppn.utils.Utils; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.header.Header; import org.apache.logging.log4j.ThreadContext; -import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Service; import org.springframework.web.client.RestClientException; @@ -64,7 +61,7 @@ public void kbartFromkafkaListener(ConsumerRecord ligneKbart) { try { log.info("Partition;" + ligneKbart.partition() + ";offset;" + ligneKbart.offset() + ";fichier;" + ligneKbart.key() + ";" + Thread.currentThread().getName()); workInProgress.get(filename).incrementThreads(); - String origineNbCurrentLine = new String(ligneKbart.headers().lastHeader("nbCurrentLines").value()); + int origineNbCurrentLine = ligneKbartDto.getNbCurrentLines(); ThreadContext.put("package", (filename + ";" + origineNbCurrentLine)); //Ajoute le nom de fichier dans le contexte du thread pour log4j service.processConsumerRecord(ligneKbartDto, providerName, workInProgress.get(filename).isForced(), workInProgress.get(filename).isBypassed(), filename); } catch (IOException | URISyntaxException | RestClientException | IllegalDoiException e) { @@ -85,19 +82,13 @@ public void kbartFromkafkaListener(ConsumerRecord ligneKbart) { if (ligneKbartDto.getBestPpn() != null && !ligneKbartDto.getBestPpn().isEmpty()) workInProgress.get(filename).addNbBestPpnFindedInExecutionReport(); workInProgress.get(filename).addLineKbartToMailAttachment(ligneKbartDto); - Header lastHeader = ligneKbart.headers().lastHeader("nbLinesTotal"); - if (lastHeader != null) { - int nbLignesTotal = Integer.parseInt(new String(lastHeader.value())); - int nbCurrentLine = workInProgress.get(filename).incrementNbLignesTraiteesAndGet(); - log.debug("Ligne en cours : {} NbLignesTotal : {}", nbCurrentLine, nbLignesTotal); - 
if (nbLignesTotal == nbCurrentLine) { - log.debug("Commit du fichier {}", filename); - workInProgress.get(filename).setNbtotalLinesInExecutionReport(nbLignesTotal); - handleFichier(filename); - } - } else { - String errorMsg = "Header absent de la ligne : " + ligneKbart.headers(); - log.error(errorMsg); + int nbLignesTotal = ligneKbartDto.getNbLinesTotal(); + int nbCurrentLine = workInProgress.get(filename).incrementNbLignesTraiteesAndGet(); + log.debug("Ligne en cours : {} NbLignesTotal : {}", nbCurrentLine, nbLignesTotal); + if (nbLignesTotal == nbCurrentLine) { + log.debug("Commit du fichier {}", filename); + workInProgress.get(filename).setNbtotalLinesInExecutionReport(nbLignesTotal); + handleFichier(filename); } //on ne décrémente pas le nb de thread si l'objet de suivi a été supprimé après la production des messages dans le second topic if (workInProgress.get(filename) != null) diff --git a/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java b/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java index 07fd9bf..55769c5 100644 --- a/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java +++ b/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java @@ -196,18 +196,4 @@ private void logEnvoi(SendResult result, ProducerRecord re log.debug(String.format("Sent record(key=%s value=%s) meta(topic=%s, partition=%d, offset=%d, headers=%s)", record.key(), record.value(), metadata.topic(), metadata.partition(), metadata.offset(), Stream.of(result.getProducerRecord().headers().toArray()).map(h -> h.key() + ":" + Arrays.toString(h.value())).collect(Collectors.joining(";")))); } - - private Header constructHeader(String key, byte[] value) { - return new Header() { - @Override - public String key() { - return key; - } - - @Override - public byte[] value() { - return value; - } - }; - } } diff --git a/src/main/java/fr/abes/bestppn/model/dto/PackageKbartDto.java b/src/main/java/fr/abes/bestppn/model/dto/PackageKbartDto.java index fd211ab..6df2986 100644 --- a/src/main/java/fr/abes/bestppn/model/dto/PackageKbartDto.java +++ b/src/main/java/fr/abes/bestppn/model/dto/PackageKbartDto.java @@ -10,6 +10,7 @@ import java.util.Collections; import java.util.Date; import java.util.List; +import java.util.stream.Collectors; @Getter @Setter @@ -26,7 +27,4 @@ public void addKbartDto(LigneKbartDto ligneKbartDto) { this.kbartDtos.add(ligneKbartDto); } - public void clearKbartDto(){ - this.kbartDtos.clear(); - } } diff --git a/src/main/java/fr/abes/bestppn/model/dto/kafka/LigneKbartDto.java b/src/main/java/fr/abes/bestppn/model/dto/kafka/LigneKbartDto.java index 29c20a6..5720a94 100644 --- a/src/main/java/fr/abes/bestppn/model/dto/kafka/LigneKbartDto.java +++ b/src/main/java/fr/abes/bestppn/model/dto/kafka/LigneKbartDto.java @@ -12,6 +12,12 @@ @Data @NoArgsConstructor public class LigneKbartDto { + + @JsonProperty("nbCurrentLines") + private int nbCurrentLines; + @JsonProperty("nbLinesTotal") + private int nbLinesTotal; + @CsvBindByName(column = "publication_title") @CsvBindByPosition(position = 0) @JsonProperty("publication_title") From 9f0d19a5cf6e5c5d994b1acad8568c659c447bc9 Mon Sep 17 00:00:00 2001 From: TristanReamot Date: Fri, 30 Aug 2024 09:51:04 +0200 Subject: [PATCH 07/11] =?UTF-8?q?CDE=20424=20Opti=20Modification=20de=20la?= =?UTF-8?q?=20cl=C3=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../fr/abes/bestppn/kafka/TopicConsumer.java | 8 ++++++-- .../fr/abes/bestppn/kafka/TopicProducer.java | 16 ++++++++++------ 2 files changed, 16 insertions(+), 8 
deletions(-) diff --git a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java index 5afe3f2..dd01703 100644 --- a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java +++ b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java @@ -49,7 +49,7 @@ public TopicConsumer(ObjectMapper mapper, KbartService service, EmailService ema */ @KafkaListener(topics = {"${topic.name.source.kbart}"}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "${abes.kafka.concurrency.nbThread}") public void kbartFromkafkaListener(ConsumerRecord<String, String> ligneKbart) { - String filename = ligneKbart.key(); + String filename = extractFilenameFromKey(ligneKbart.key()); if (!this.workInProgress.containsKey(filename)) { //nouveau fichier trouvé dans le topic, on initialise les variables partagées workInProgress.put(filename, new KafkaWorkInProgress(ligneKbart.key().contains("_FORCE"), ligneKbart.key().contains("_BYPASS"))); @@ -136,7 +136,7 @@ private void handleFichier(String filename) { @KafkaListener(topics = {"${topic.name.source.kbart.errors}"}, groupId = "${topic.groupid.source.errors}", containerFactory = "kafkaKbartListenerContainerFactory") public void errorsListener(ConsumerRecord<String, String> error) { log.error(error.value()); - String filename = error.key(); + String filename = extractFilenameFromKey(error.key()); if (workInProgress.containsKey(filename)) { String fileNameFromError = error.key(); emailService.sendProductionErrorEmail(fileNameFromError, error.value()); @@ -145,4 +145,8 @@ public void errorsListener(ConsumerRecord<String, String> error) { handleFichier(fileNameFromError); } } + + private String extractFilenameFromKey (String key) { + return key.substring(0, key.lastIndexOf('_')); + } } diff --git a/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java b/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java index 55769c5..5a68edc 100644 --- a/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java +++ b/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java @@ -26,6 +26,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -94,6 +95,7 @@ public void sendBypassToLoad(List<LigneKbartDto> kbart, ProviderPackage provider private void sendToTopic(List<LigneKbartDto> kbart, ProviderPackage provider, String filename, String destinationTopic) { Integer nbLigneTotal = kbart.size(); + AtomicInteger index = new AtomicInteger(0); for (LigneKbartDto ligneKbartDto : kbart) { ligneKbartDto.setIdProviderPackage(provider.getIdProviderPackage()); ligneKbartDto.setProviderPackagePackage(provider.getPackageName()); @@ -103,7 +105,7 @@ private void sendToTopic(List<LigneKbartDto> kbart, ProviderPackage provider, St List<Header>
headerList = new ArrayList<>(); executorService.execute(() -> { headerList.add(new RecordHeader("nbLinesTotal", String.valueOf(nbLigneTotal).getBytes())); - ProducerRecord<String, LigneKbartConnect> record = new ProducerRecord<>(destinationTopic, new Random().nextInt(nbThread), filename, ligneKbartConnect, headerList); + ProducerRecord<String, LigneKbartConnect> record = new ProducerRecord<>(destinationTopic, new Random().nextInt(nbThread), filename+"_"+index.getAndIncrement(), ligneKbartConnect, headerList); CompletableFuture<SendResult<String, LigneKbartConnect>> result = kafkaTemplateConnect.send(record); assert result != null : "Result est null, donc exception"; result.whenComplete((sr, ex) -> { @@ -125,8 +127,9 @@ private void sendToTopic(List<LigneKbartDto> kbart, ProviderPackage provider, St */ public void sendPrintNotice(List<LigneKbartImprime> ligneKbartImprimes, String filename) { Integer nbLigneTotal = ligneKbartImprimes.size(); + int index = 0; for (LigneKbartImprime ppnToCreate : ligneKbartImprimes) { - sendNoticeImprime(ppnToCreate, topicNoticeImprimee, filename, nbLigneTotal); + sendNoticeImprime(ppnToCreate, topicNoticeImprimee, filename+"_"+(index++), nbLigneTotal); } if (!ligneKbartImprimes.isEmpty()) log.debug("message envoyé vers {}", topicNoticeImprimee); @@ -140,12 +143,13 @@ public void sendPrintNotice(List<LigneKbartImprime> ligneKbartImprimes, String f */ public void sendPpnExNihilo(List<LigneKbartDto> ppnFromKbartToCreate, ProviderPackage provider, String filename) { Integer nbLigneTotal = ppnFromKbartToCreate.size(); + int index = 0; for (LigneKbartDto ligne : ppnFromKbartToCreate) { ligne.setIdProviderPackage(provider.getIdProviderPackage()); ligne.setProviderPackagePackage(provider.getPackageName()); ligne.setProviderPackageDateP(provider.getDateP()); ligne.setProviderPackageIdtProvider(provider.getProviderIdtProvider()); - sendNoticeExNihilo(ligne, topicKbartPpnToCreate, filename, nbLigneTotal); + sendNoticeExNihilo(ligne, topicKbartPpnToCreate, filename+"_"+(index++), nbLigneTotal); } if (!ppnFromKbartToCreate.isEmpty()) log.debug("message envoyé vers {}", topicKbartPpnToCreate); @@ -175,14 +179,14 @@ private void sendNoticeExNihilo(LigneKbartDto ligneKbartDto, String topic, Strin * Méthode envoyant un objet de notice imprimé sur un topic Kafka * @param ligne : ligne contenant la ligne kbart, le ppn de la notice imprimée et le provider * @param topic : topic d'envoi de la ligne - * @param filename : clé kafka de la ligne correspondant au nom du fichier + * @param key : clé kafka de la ligne correspondant au nom du fichier + numéro séquentiel * @param nbLignesTotal : nombre de lignes totales du fichier */ - private void sendNoticeImprime(LigneKbartImprime ligne, String topic, String filename, Integer nbLignesTotal) { + private void sendNoticeImprime(LigneKbartImprime ligne, String topic, String key, Integer nbLignesTotal) { List<Header>
headerList = new ArrayList<>(); headerList.add(new RecordHeader("nbLinesTotal", String.valueOf(nbLignesTotal).getBytes())); try { - ProducerRecord<String, LigneKbartImprime> record = new ProducerRecord<>(topic, null, filename, ligne, headerList); + ProducerRecord<String, LigneKbartImprime> record = new ProducerRecord<>(topic, null, key, ligne, headerList); final SendResult<String, LigneKbartImprime> result = kafkaTemplateImprime.send(record).get(); logEnvoi(result, record); } catch (Exception e) { From 43cd7cad7530e9d13312bcc345f988d938d58812 Mon Sep 17 00:00:00 2001 From: TristanReamot Date: Fri, 30 Aug 2024 10:18:25 +0200 Subject: [PATCH 08/11] Update TopicConsumer.java --- .../java/fr/abes/bestppn/kafka/TopicConsumer.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java index dd01703..6f578ff 100644 --- a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java +++ b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java @@ -57,9 +57,9 @@ public void kbartFromkafkaListener(ConsumerRecord<String, String> ligneKbart) { try { //traitement de chaque ligne kbart LigneKbartDto ligneKbartDto = mapper.readValue(ligneKbart.value(), LigneKbartDto.class); - String providerName = Utils.extractProvider(ligneKbart.key()); + String providerName = Utils.extractProvider(filename); try { - log.info("Partition;" + ligneKbart.partition() + ";offset;" + ligneKbart.offset() + ";fichier;" + ligneKbart.key() + ";" + Thread.currentThread().getName()); + log.info("Partition;" + ligneKbart.partition() + ";offset;" + ligneKbart.offset() + ";fichier;" + filename + ";" + Thread.currentThread().getName()); workInProgress.get(filename).incrementThreads(); int origineNbCurrentLine = ligneKbartDto.getNbCurrentLines(); ThreadContext.put("package", (filename + ";" + origineNbCurrentLine)); //Ajoute le nom de fichier dans le contexte du thread pour log4j @@ -138,11 +138,10 @@ public void errorsListener(ConsumerRecord<String, String> error) { log.error(error.value()); String filename = extractFilenameFromKey(error.key()); if (workInProgress.containsKey(filename)) { - String fileNameFromError = error.key(); - emailService.sendProductionErrorEmail(fileNameFromError, error.value()); - logFileService.createExecutionReport(fileNameFromError, workInProgress.get(filename).getExecutionReport(), workInProgress.get(filename).isForced()); + emailService.sendProductionErrorEmail(filename, error.value()); + logFileService.createExecutionReport(filename, workInProgress.get(filename).getExecutionReport(), workInProgress.get(filename).isForced()); workInProgress.get(filename).setIsOnError(true); - handleFichier(fileNameFromError); + handleFichier(filename); } } From 83c5859b4cfb4252b72ffa79857b2effe6bd9b59 Mon Sep 17 00:00:00 2001 From: pierre-maraval Date: Fri, 30 Aug 2024 11:26:44 +0200 Subject: [PATCH 09/11] =?UTF-8?q?CDE-426=20:=20Refactor=20:=20ajout=20Time?= =?UTF-8?q?stamp=20dans=20workInProgress=20pour=20controler=20que=20la=20r?= =?UTF-8?q?=C3=A9ception=20de=202=20messages=20n'est=20pas=20trop=20espac?= =?UTF-8?q?=C3=A9e,=20indiquant=20un=20fichier=20recharg=C3=A9=20suite=20?= =?UTF-8?q?=C3=A0=20message=20non=20consomm=C3=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../fr/abes/bestppn/kafka/KafkaWorkInProgress.java | 10 +++++----- src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java | 9 +++++++++ .../java/fr/abes/bestppn/service/KbartService.java | 3 +++ src/main/resources/application.properties | 2 ++ 4 files changed, 19 insertions(+), 5
deletions(-) diff --git a/src/main/java/fr/abes/bestppn/kafka/KafkaWorkInProgress.java b/src/main/java/fr/abes/bestppn/kafka/KafkaWorkInProgress.java index 08b6031..ad2f10a 100644 --- a/src/main/java/fr/abes/bestppn/kafka/KafkaWorkInProgress.java +++ b/src/main/java/fr/abes/bestppn/kafka/KafkaWorkInProgress.java @@ -4,15 +4,12 @@ import fr.abes.bestppn.model.dto.PackageKbartDto; import fr.abes.bestppn.model.dto.kafka.LigneKbartDto; import fr.abes.bestppn.model.entity.ExecutionReport; -import jakarta.annotation.PreDestroy; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.Semaphore; +import java.sql.Timestamp; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -44,6 +41,8 @@ public class KafkaWorkInProgress { private final List ppnFromKbartToCreate; + private long timestamp; + public KafkaWorkInProgress(boolean isForced, boolean isBypassed) { this.isForced = isForced; @@ -55,6 +54,7 @@ public KafkaWorkInProgress(boolean isForced, boolean isBypassed) { this.kbartToSend = Collections.synchronizedList(new ArrayList<>()); this.ppnToCreate = Collections.synchronizedList(new ArrayList<>()); this.ppnFromKbartToCreate = Collections.synchronizedList(new ArrayList<>()); + this.timestamp = Calendar.getInstance().getTimeInMillis(); } public void incrementThreads() { diff --git a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java index 6f578ff..049608c 100644 --- a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java +++ b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java @@ -11,18 +11,22 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.logging.log4j.ThreadContext; +import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; import org.springframework.web.client.RestClientException; import java.io.IOException; import java.net.URISyntaxException; +import java.util.Calendar; import java.util.Map; import java.util.concurrent.ExecutionException; @Slf4j @Service public class TopicConsumer { + @Value("${delay.max.topic}") + private int maxDelayBetweenMessage; private final ObjectMapper mapper; private final KbartService service; @@ -50,6 +54,11 @@ public TopicConsumer(ObjectMapper mapper, KbartService service, EmailService ema @KafkaListener(topics = {"${topic.name.source.kbart}"}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "${abes.kafka.concurrency.nbThread}") public void kbartFromkafkaListener(ConsumerRecord ligneKbart) { String filename = extractFilenameFromKey(ligneKbart.key()); + long now = Calendar.getInstance().getTimeInMillis(); + //si on a pas reçu de message depuis plus de maxDelayBetweenMessage + if (this.workInProgress.get(filename).getTimestamp() + maxDelayBetweenMessage < now) { + workInProgress.remove(filename); + } if (!this.workInProgress.containsKey(filename)) { //nouveau fichier trouvé dans le topic, on initialise les variables partagées workInProgress.put(filename, new KafkaWorkInProgress(ligneKbart.key().contains("_FORCE"), ligneKbart.key().contains("_BYPASS"))); diff --git a/src/main/java/fr/abes/bestppn/service/KbartService.java 
b/src/main/java/fr/abes/bestppn/service/KbartService.java index b077254..b8a4f21 100644 --- a/src/main/java/fr/abes/bestppn/service/KbartService.java +++ b/src/main/java/fr/abes/bestppn/service/KbartService.java @@ -14,6 +14,7 @@ import java.io.IOException; import java.net.URISyntaxException; +import java.util.Calendar; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -64,6 +65,8 @@ public void processConsumerRecord(LigneKbartDto ligneFromKafka, String providerN } } workInProgress.get(filename).addKbartToSend(ligneFromKafka); + //quel que soit le résultat du calcul du best ppn on met à jour le timestamp correspondant à la consommation du message + workInProgress.get(filename).setTimestamp(Calendar.getInstance().getTimeInMillis()); } public void commitDatas(String providerName, String filename) throws IllegalPackageException, IllegalDateException, ExecutionException, InterruptedException, IOException, IllegalProviderException { diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 45283e9..9111e36 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -32,6 +32,8 @@ topic.name.source.nbLines=bacon.kbart.toload.nbLines topic.name.target.kbart.bypass.toload=bacon.kbart.bypass.toload spring.jpa.open-in-view=false +# 5 minutes en ms +delay.max.topic=300000 # SpringDoc Swagger springdoc.swagger-ui.path=/convergence-documentation From 2da9787680dd721f14facb5930dc4e8f1017ac8c Mon Sep 17 00:00:00 2001 From: pierre-maraval Date: Fri, 30 Aug 2024 14:37:32 +0200 Subject: [PATCH 10/11] =?UTF-8?q?CDE-427=20:=20suppression=20fonction=20ra?= =?UTF-8?q?ndom=20pour=20s=C3=A9lection=20de=20la=20partition=20Optimisati?= =?UTF-8?q?on=20des=20m=C3=A9thodes=20d'envois=20=C3=A0=20Kafka=20:=20cons?= =?UTF-8?q?truction=20des=20objets=20et=20des=20listes=20en=20dehors=20des?= =?UTF-8?q?=20boucles=20d'envoi?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../fr/abes/bestppn/kafka/TopicProducer.java | 93 ++++++++++++------- 1 file changed, 60 insertions(+), 33 deletions(-) diff --git a/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java b/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java index 5a68edc..2be867e 100644 --- a/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java +++ b/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -59,12 +58,14 @@ public class TopicProducer { private ExecutorService executorService; private UtilsMapper utilsMapper; + private AtomicInteger lastThreadUsed; @Autowired public TopicProducer(KafkaTemplate kafkaTemplateConnect, KafkaTemplate kafkaTemplateImprime, UtilsMapper utilsMapper) { this.kafkaTemplateConnect = kafkaTemplateConnect; this.kafkaTemplateImprime = kafkaTemplateImprime; this.utilsMapper = utilsMapper; + this.lastThreadUsed = new AtomicInteger(0); } @PostConstruct @@ -85,9 +86,10 @@ public void sendKbart(List kbart, ProviderPackage provider, Strin /** * Méthode d'nvoi d'une ligne kbart vers topic kafka si option bypass activée - * @param kbart : ligne kbart à envoyer - * @param provider : provider - * @param filename : nom du fichier du traitement en cours + * + * @param kbart : ligne kbart à envoyer + * @param provider : 
provider + * @param filename : nom du fichier du traitement en cours */ public void sendBypassToLoad(List<LigneKbartDto> kbart, ProviderPackage provider, String filename) { sendToTopic(kbart, provider, filename, topicKbartBypassToload); @@ -95,19 +97,22 @@ private void sendToTopic(List<LigneKbartDto> kbart, ProviderPackage provider, String filename, String destinationTopic) { Integer nbLigneTotal = kbart.size(); + List<Header>
headerList = new ArrayList<>(); + headerList.add(new RecordHeader("nbLinesTotal", String.valueOf(nbLigneTotal).getBytes())); AtomicInteger index = new AtomicInteger(0); + //construction de la liste des lignes à envoyer dans le topic + List<LigneKbartConnect> lignesToSend = new ArrayList<>(); for (LigneKbartDto ligneKbartDto : kbart) { ligneKbartDto.setIdProviderPackage(provider.getIdProviderPackage()); ligneKbartDto.setProviderPackagePackage(provider.getPackageName()); ligneKbartDto.setProviderPackageDateP(provider.getDateP()); ligneKbartDto.setProviderPackageIdtProvider(provider.getProviderIdtProvider()); - LigneKbartConnect ligneKbartConnect = utilsMapper.map(ligneKbartDto, LigneKbartConnect.class); - List<Header>
headerList = new ArrayList<>(); + lignesToSend.add(utilsMapper.map(ligneKbartDto, LigneKbartConnect.class)); + } + lignesToSend.forEach(ligneKbartConnect -> { executorService.execute(() -> { - headerList.add(new RecordHeader("nbLinesTotal", String.valueOf(nbLigneTotal).getBytes())); - ProducerRecord<String, LigneKbartConnect> record = new ProducerRecord<>(destinationTopic, new Random().nextInt(nbThread), filename+"_"+index.getAndIncrement(), ligneKbartConnect, headerList); + ProducerRecord<String, LigneKbartConnect> record = new ProducerRecord<>(destinationTopic, calculatePartition(this.nbThread), filename + "_" + index.getAndIncrement(), ligneKbartConnect, headerList); CompletableFuture<SendResult<String, LigneKbartConnect>> result = kafkaTemplateConnect.send(record); - assert result != null : "Result est null, donc exception"; result.whenComplete((sr, ex) -> { try { logEnvoi(result.get(), record); @@ -116,7 +121,7 @@ private void sendToTopic(List<LigneKbartDto> kbart, ProviderPackage provider, St } }); }); - } + }); } /** @@ -129,7 +134,7 @@ public void sendPrintNotice(List<LigneKbartImprime> ligneKbartImprimes, String f Integer nbLigneTotal = ligneKbartImprimes.size(); int index = 0; for (LigneKbartImprime ppnToCreate : ligneKbartImprimes) { - sendNoticeImprime(ppnToCreate, topicNoticeImprimee, filename+"_"+(index++), nbLigneTotal); + sendNoticeImprime(ppnToCreate, topicNoticeImprimee, filename + "_" + (index++), nbLigneTotal); } if (!ligneKbartImprimes.isEmpty()) log.debug("message envoyé vers {}", topicNoticeImprimee); @@ -138,55 +143,64 @@ public void sendPrintNotice(List<LigneKbartImprime> ligneKbartImprimes, String f /** * Méthode d'envoi d'une ligne Kbart pour création de notice ExNihilo + * * @param ppnFromKbartToCreate : liste de lignes kbart - * @param filename : nom du fichier à traiter + * @param filename : nom du fichier à traiter */ public void sendPpnExNihilo(List<LigneKbartDto> ppnFromKbartToCreate, ProviderPackage provider, String filename) { Integer nbLigneTotal = ppnFromKbartToCreate.size(); - int index = 0; + List<LigneKbartDto> lignesToSend = new ArrayList<>(); for (LigneKbartDto ligne : ppnFromKbartToCreate) { ligne.setIdProviderPackage(provider.getIdProviderPackage()); ligne.setProviderPackagePackage(provider.getPackageName()); ligne.setProviderPackageDateP(provider.getDateP()); ligne.setProviderPackageIdtProvider(provider.getProviderIdtProvider()); - sendNoticeExNihilo(ligne, topicKbartPpnToCreate, filename+"_"+(index++), nbLigneTotal); + lignesToSend.add(ligne); } + sendNoticeExNihilo(lignesToSend, topicKbartPpnToCreate, filename, nbLigneTotal); if (!ppnFromKbartToCreate.isEmpty()) log.debug("message envoyé vers {}", topicKbartPpnToCreate); } /** * Méthode envoyant un objet de notice imprimé sur un topic Kafka - * @param ligneKbartDto : ligne contenant la ligne kbart, et le provider - * @param topic : topic d'envoi de la ligne - * @param filemame : clé kafka de la ligne correspondant au nom de fichier + * + * @param lignesKbartDto : ligne contenant la ligne kbart, et le provider + * @param topic : topic d'envoi de la ligne + * @param filename : clé kafka de la ligne correspondant au nom de fichier */ - private void sendNoticeExNihilo(LigneKbartDto ligneKbartDto, String topic, String filemame, Integer nbLignesTotal) { + private void sendNoticeExNihilo(List<LigneKbartDto> lignesKbartDto, String topic, String filename, Integer nbLignesTotal) { List<Header>
headerList = new ArrayList<>(); - LigneKbartConnect ligne = utilsMapper.map(ligneKbartDto, LigneKbartConnect.class); - try { - headerList.add(new RecordHeader("nbLinesTotal", String.valueOf(nbLignesTotal).getBytes())); - ProducerRecord<String, LigneKbartConnect> record = new ProducerRecord<>(topic, null, filemame, ligne, headerList); - final SendResult<String, LigneKbartConnect> result = kafkaTemplateConnect.send(record).get(); - logEnvoi(result, record); - } catch (Exception e) { - String message = "Error sending message to topic " + topic; - throw new RuntimeException(message, e); - } + headerList.add(new RecordHeader("nbLinesTotal", String.valueOf(nbLignesTotal).getBytes())); + List<LigneKbartConnect> lignesToSend = new ArrayList<>(); + lignesKbartDto.forEach(ligne -> lignesToSend.add(utilsMapper.map(ligne, LigneKbartConnect.class))); + AtomicInteger index = new AtomicInteger(); + lignesToSend.forEach(ligne -> { + try { + ProducerRecord<String, LigneKbartConnect> record = new ProducerRecord<>(topic, null, filename + "_" + index.getAndIncrement(), ligne, headerList); + final SendResult<String, LigneKbartConnect> result = kafkaTemplateConnect.send(record).get(); + logEnvoi(result, record); + } catch (Exception e) { + String message = "Error sending message to topic " + topic; + throw new RuntimeException(message, e); + } + }); } /** * Méthode envoyant un objet de notice imprimé sur un topic Kafka - * @param ligne : ligne contenant la ligne kbart, le ppn de la notice imprimée et le provider - * @param topic : topic d'envoi de la ligne - * @param key : clé kafka de la ligne correspondant au nom du fichier + numéro séquentiel + * + * @param ligne : ligne contenant la ligne kbart, le ppn de la notice imprimée et le provider + * @param topic : topic d'envoi de la ligne + * @param filename : clé kafka de la ligne correspondant au nom du fichier + numéro séquentiel * @param nbLignesTotal : nombre de lignes totales du fichier */ - private void sendNoticeImprime(LigneKbartImprime ligne, String topic, String key, Integer nbLignesTotal) { + private void sendNoticeImprime(LigneKbartImprime ligne, String topic, String filename, Integer nbLignesTotal) { List<Header>
headerList = new ArrayList<>(); headerList.add(new RecordHeader("nbLinesTotal", String.valueOf(nbLignesTotal).getBytes())); + AtomicInteger index = new AtomicInteger(0); try { - ProducerRecord<String, LigneKbartImprime> record = new ProducerRecord<>(topic, null, key, ligne, headerList); + ProducerRecord<String, LigneKbartImprime> record = new ProducerRecord<>(topic, null, filename + "_" + index.getAndIncrement(), ligne, headerList); final SendResult<String, LigneKbartImprime> result = kafkaTemplateImprime.send(record).get(); logEnvoi(result, record); } catch (Exception e) { @@ -200,4 +214,17 @@ private void logEnvoi(SendResult result, ProducerRecord re log.debug(String.format("Sent record(key=%s value=%s) meta(topic=%s, partition=%d, offset=%d, headers=%s)", record.key(), record.value(), metadata.topic(), metadata.partition(), metadata.offset(), Stream.of(result.getProducerRecord().headers().toArray()).map(h -> h.key() + ":" + Arrays.toString(h.value())).collect(Collectors.joining(";")))); } + + public Integer calculatePartition(int nbPartitions) throws ArithmeticException { + + if (nbPartitions == 0) { + throw new ArithmeticException("Nombre de threads = 0"); + } + + if (lastThreadUsed.get() >= nbPartitions) { + lastThreadUsed.set(0); + } + + return lastThreadUsed.getAndIncrement(); + } } From e5a2b933b9f74bd354bc09a3ece884e1b68df7d3 Mon Sep 17 00:00:00 2001 From: TristanReamot Date: Fri, 30 Aug 2024 15:29:16 +0200 Subject: [PATCH 11/11] =?UTF-8?q?CDE=20427=20Opti=20Pr=C3=A9cision=20sur?= =?UTF-8?q?=20condition=20et=20ligne=20debug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java index 049608c..e436877 100644 --- a/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java +++ b/src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java @@ -56,8 +56,9 @@ public void kbartFromkafkaListener(ConsumerRecord<String, String> ligneKbart) { String filename = extractFilenameFromKey(ligneKbart.key()); long now = Calendar.getInstance().getTimeInMillis(); //si on a pas reçu de message depuis plus de maxDelayBetweenMessage - if (this.workInProgress.get(filename).getTimestamp() + maxDelayBetweenMessage < now) { + if (this.workInProgress.containsKey(filename) && (this.workInProgress.get(filename).getTimestamp() + maxDelayBetweenMessage < now)) { workInProgress.remove(filename); + log.debug("détection de l'ancien lancement de fichier " + filename ); } if (!this.workInProgress.containsKey(filename)) { //nouveau fichier trouvé dans le topic, on initialise les variables partagées
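
Taken together, the last patches of the series replace two fragile mechanisms: patch 09 attaches a timestamp to each `KafkaWorkInProgress` so that an entry left over from an unconsumed run can be evicted once `delay.max.topic` (300000 ms) has elapsed, patch 11 guards that check against a missing entry, and patch 10 swaps the `new Random().nextInt(nbThread)` partition choice for a round-robin counter. The sketch below restates these mechanisms outside the Spring Kafka plumbing so they can be compiled and run in isolation; `extractFilenameFromKey` and `calculatePartition` mirror the code added in patches 07 and 10, while the `WorkInProgress` holder, the `onMessage` driver and the `main` method are simplifications assumed for this sketch, not repository code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class KafkaCoordinationSketch {

    // Mirrors delay.max.topic from application.properties (5 minutes, in ms).
    static final long MAX_DELAY_BETWEEN_MESSAGES = 300_000L;

    // Minimal stand-in for KafkaWorkInProgress: only the timestamp matters here.
    static class WorkInProgress {
        volatile long timestamp = System.currentTimeMillis();
    }

    final Map<String, WorkInProgress> workInProgress = new ConcurrentHashMap<>();
    final AtomicInteger lastThreadUsed = new AtomicInteger(0);

    // Patch 07: kafka keys carry a sequence suffix ("<filename>_<n>");
    // strip it to recover the filename used as the workInProgress key.
    static String extractFilenameFromKey(String key) {
        return key.substring(0, key.lastIndexOf('_'));
    }

    // Patches 09 and 11: evict the shared state of a file whose previous run
    // went quiet for too long, so a reloaded file starts from scratch.
    void onMessage(String key) {
        String filename = extractFilenameFromKey(key);
        long now = System.currentTimeMillis();
        WorkInProgress wip = workInProgress.get(filename);
        if (wip != null && wip.timestamp + MAX_DELAY_BETWEEN_MESSAGES < now) {
            workInProgress.remove(filename);
        }
        // (Re)create the entry and refresh its timestamp on every consumed message.
        workInProgress.computeIfAbsent(filename, f -> new WorkInProgress()).timestamp = now;
    }

    // Patch 10: deterministic round-robin partition choice,
    // replacing new Random().nextInt(nbThread).
    int calculatePartition(int nbPartitions) {
        if (nbPartitions == 0) {
            throw new ArithmeticException("Nombre de threads = 0");
        }
        if (lastThreadUsed.get() >= nbPartitions) {
            lastThreadUsed.set(0);
        }
        return lastThreadUsed.getAndIncrement();
    }

    public static void main(String[] args) {
        KafkaCoordinationSketch sketch = new KafkaCoordinationSketch();
        sketch.onMessage("provider_package_2024-07-09_1"); // filename "provider_package_2024-07-09"
        for (int i = 0; i < 5; i++) {
            System.out.print(sketch.calculatePartition(3) + " "); // prints 0 1 2 0 1
        }
        System.out.println();
        System.out.println(sketch.workInProgress.keySet()); // [provider_package_2024-07-09]
    }
}
```

One caveat on the round-robin logic as written in the diff: the check-then-set sequence is not atomic, so two concurrent producers can both pass the `>= nbPartitions` test before either resets the counter and one of them can receive an out-of-range partition number. An atomic rotation such as `lastThreadUsed.getAndUpdate(i -> (i + 1) % nbPartitions)` would avoid that window.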