
Commit

Merge remote-tracking branch 'origin/develop' into CDE-293-appel-best-ppn-prod-ko-quand-caracteres-accentues-dans-luri
SamuelQuetin committed Nov 21, 2023
2 parents 9abe126 + e7de626 commit ebf50e7
Showing 10 changed files with 287 additions and 230 deletions.
5 changes: 5 additions & 0 deletions src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java
@@ -38,6 +38,9 @@ public class KafkaConfig {
     @Value("${spring.kafka.auto.register.schema}")
     private boolean autoRegisterSchema;

+    @Value("${spring.kafka.producer.transaction-timeout}")
+    private Integer transactionTimeout;
+
     @Bean
     public ConsumerFactory<String, String> consumerKbartFactory() {
         Map<String, Object> props = new HashMap<>();
@@ -46,6 +49,7 @@ public ConsumerFactory<String, String> consumerKbartFactory() {
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolationLevel);
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
         return new DefaultKafkaConsumerFactory<>(props);
     }

@@ -67,6 +71,7 @@ public Map<String, Object> producerConfigsWithTransaction() {
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
         props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
         props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, autoRegisterSchema);
+        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeout);
         return props;
     }

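Note on the new producer setting: ProducerConfig.TRANSACTION_TIMEOUT_CONFIG maps to transaction.timeout.ms, expressed in milliseconds. The broker aborts any transaction left open longer than this, and it rejects values above its own transaction.max.timeout.ms (15 minutes by default) when the producer initializes. Below is a minimal standalone sketch of a transactional producer using the same knob — not this repository's code; the broker address, topic, transactional id, and timeout value are placeholders:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class TransactionalProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-tx"); // required for transactions
        // Same knob the commit adds: how long the broker waits before aborting
        // an open transaction. Must stay <= the broker's transaction.max.timeout.ms.
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 120000);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
            producer.commitTransaction();
        }
    }
}

Raising the timeout keeps long-running batches from being aborted mid-flight, at the cost of holding transactional state on the broker longer.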
229 changes: 8 additions & 221 deletions src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java
@@ -1,240 +1,27 @@
 package fr.abes.bestppn.kafka;

-import com.fasterxml.jackson.databind.ObjectMapper;
-import fr.abes.LigneKbartImprime;
-import fr.abes.bestppn.dto.PackageKbartDto;
-import fr.abes.bestppn.dto.kafka.LigneKbartDto;
-import fr.abes.bestppn.dto.kafka.PpnWithDestinationDto;
-import fr.abes.bestppn.entity.ExecutionReport;
-import fr.abes.bestppn.entity.bacon.Provider;
-import fr.abes.bestppn.entity.bacon.ProviderPackage;
-import fr.abes.bestppn.exception.*;
-import fr.abes.bestppn.repository.bacon.LigneKbartRepository;
-import fr.abes.bestppn.repository.bacon.ProviderPackageRepository;
-import fr.abes.bestppn.repository.bacon.ProviderRepository;
-import fr.abes.bestppn.service.BestPpnService;
-import fr.abes.bestppn.service.EmailService;
-import fr.abes.bestppn.service.LogFileService;
-import fr.abes.bestppn.utils.Utils;
-import jakarta.mail.MessagingException;
-import lombok.RequiredArgsConstructor;
+import fr.abes.bestppn.service.KbartService;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.header.Header;
-import org.apache.logging.log4j.ThreadContext;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Service;
-import org.springframework.web.client.RestClientException;
-
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.ExecutionException;

 @Slf4j
 @Service
-@RequiredArgsConstructor
 public class TopicConsumer {
-    @Autowired
-    private ObjectMapper mapper;
-
-    @Autowired
-    private BestPpnService service;
-
-    @Autowired
-    private TopicProducer producer;
-
-    @Autowired
-    private EmailService serviceMail;
-
-    @Autowired
-    private LogFileService logFileService;
-
-    private final List<LigneKbartDto> kbartToSend = new ArrayList<>();
-
-    private final List<LigneKbartImprime> ppnToCreate = new ArrayList<>();
-
-    private final List<LigneKbartDto> ppnFromKbartToCreate = new ArrayList<>();
-
-    private final PackageKbartDto mailAttachment = new PackageKbartDto();
-
-    private final ProviderPackageRepository providerPackageRepository;
-
-    private final ProviderRepository providerRepository;
-
-    private final LigneKbartRepository ligneKbartRepository;
-
-    private final List<Header> headerList = new ArrayList<>();
+    private final KbartService service;

-    private boolean isOnError = false;
-
-    boolean injectKafka = false;
-
-    private ExecutionReport executionReport = new ExecutionReport();
-
-    private String filename = "";
-
-    private String totalLine = "";
+    public TopicConsumer(KbartService service) {
+        this.service = service;
+    }

     /**
      * Kafka listener that watches a topic and picks up messages as soon as they arrive.
      * @param lignesKbart Kafka message picked up by the Kafka consumer
      */
-    @KafkaListener(topics = {"${topic.name.source.kbart}"}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory")
+    @KafkaListener(topics = {"${topic.name.source.kbart}"}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "${spring.kafka.concurrency.nbThread}")
     public void listenKbartFromKafka(ConsumerRecord<String, String> lignesKbart) throws Exception {
-        try {
-            String currentLine = "";
-            for (Header header : lignesKbart.headers().toArray()) {
-                if (header.key().equals("FileName")) {
-                    filename = new String(header.value());
-                    headerList.add(header);
-                    if (filename.contains("_FORCE")) {
-                        injectKafka = true;
-                    }
-                } else if (header.key().equals("CurrentLine")) {
-                    currentLine = new String(header.value());
-                    headerList.add(header);
-                } else if (header.key().equals("TotalLine")) {
-                    totalLine = new String(header.value());
-                    executionReport.setNbtotalLines(Integer.parseInt(totalLine));
-                    headerList.add(header);
-                }
-            }
-            ThreadContext.put("package", (filename + "[line : " + currentLine + "]")); // add the current line number to the log4j2 context so it is written into the Kafka header
-
-            String nbLine = currentLine + "/" + totalLine;
-            String providerName = Utils.extractProvider(filename);
-            Optional<Provider> providerOpt = providerRepository.findByProvider(providerName);
-            if (lignesKbart.value().equals("OK")) {
-                if (!isOnError) {
-                    ProviderPackage provider = handlerProvider(providerOpt, filename, providerName);
-
-                    producer.sendKbart(kbartToSend, provider, filename);
-                    producer.sendPrintNotice(ppnToCreate, filename);
-                    producer.sendPpnExNihilo(ppnFromKbartToCreate, provider, filename);
-                } else {
-                    isOnError = false;
-                }
-                log.info("Nombre de best ppn trouvé : " + executionReport.getNbBestPpnFind() + "/" + totalLine);
-                serviceMail.sendMailWithAttachment(filename, mailAttachment);
-                producer.sendEndOfTraitmentReport(headerList); // call the producer to send the end-of-processing message
-                logFileService.createExecutionReport(filename, Integer.parseInt(totalLine), executionReport.getNbLinesOk(), executionReport.getNbLinesWithInputDataErrors(), executionReport.getNbLinesWithErrorsInBestPPNSearch(), injectKafka);
-                kbartToSend.clear();
-                ppnToCreate.clear();
-                ppnFromKbartToCreate.clear();
-                mailAttachment.clearKbartDto();
-                executionReport.clear();
-            } else {
-                LigneKbartDto ligneFromKafka = mapper.readValue(lignesKbart.value(), LigneKbartDto.class);
-                if (ligneFromKafka.isBestPpnEmpty()) {
-                    log.info("Debut du calcul du bestppn sur la ligne : " + nbLine);
-                    log.info(ligneFromKafka.toString());
-                    PpnWithDestinationDto ppnWithDestinationDto = service.getBestPpn(ligneFromKafka, providerName, injectKafka);
-                    switch (ppnWithDestinationDto.getDestination()) {
-                        case BEST_PPN_BACON -> {
-                            ligneFromKafka.setBestPpn(ppnWithDestinationDto.getPpn());
-                            executionReport.addNbBestPpnFind();
-                        }
-                        case PRINT_PPN_SUDOC -> {
-                            LigneKbartImprime ligne = LigneKbartImprime.newBuilder()
-                                    .setPpn(ppnWithDestinationDto.getPpn())
-                                    .setPublicationTitle(ligneFromKafka.getPublicationTitle())
-                                    .setPrintIdentifier(ligneFromKafka.getPrintIdentifier())
-                                    .setOnlineIdentifier(ligneFromKafka.getOnlineIdentifier())
-                                    .setDateFirstIssueOnline(ligneFromKafka.getDateFirstIssueOnline())
-                                    .setNumFirstVolOnline(ligneFromKafka.getNumFirstVolOnline())
-                                    .setNumFirstIssueOnline(ligneFromKafka.getNumFirstIssueOnline())
-                                    .setDateLastIssueOnline(ligneFromKafka.getDateLastIssueOnline())
-                                    .setNumLastVolOnline(ligneFromKafka.getNumLastVolOnline())
-                                    .setNumLastIssueOnline(ligneFromKafka.getNumLastIssueOnline())
-                                    .setTitleUrl(ligneFromKafka.getTitleUrl())
-                                    .setFirstAuthor(ligneFromKafka.getFirstAuthor())
-                                    .setTitleId(ligneFromKafka.getTitleId())
-                                    .setEmbargoInfo(ligneFromKafka.getEmbargoInfo())
-                                    .setCoverageDepth(ligneFromKafka.getCoverageDepth())
-                                    .setNotes(ligneFromKafka.getNotes())
-                                    .setPublisherName(ligneFromKafka.getPublisherName())
-                                    .setPublicationType(ligneFromKafka.getPublicationType())
-                                    .setDateMonographPublishedPrint(ligneFromKafka.getDateMonographPublishedPrint())
-                                    .setDateMonographPublishedOnline(ligneFromKafka.getDateMonographPublishedOnline())
-                                    .setMonographVolume(ligneFromKafka.getMonographVolume())
-                                    .setMonographEdition(ligneFromKafka.getMonographEdition())
-                                    .setFirstEditor(ligneFromKafka.getFirstEditor())
-                                    .setParentPublicationTitleId(ligneFromKafka.getParentPublicationTitleId())
-                                    .setPrecedingPublicationTitleId(ligneFromKafka.getPrecedingPublicationTitleId())
-                                    .setAccessType(ligneFromKafka.getAccessType())
-                                    .build();
-                            ppnToCreate.add(ligne);
-                        }
-                        case NO_PPN_FOUND_SUDOC -> {
-                            if (ligneFromKafka.getPublicationType().equals("monograph")) {
-                                ppnFromKbartToCreate.add(ligneFromKafka);
-                            }
-                        }
-                    }
-                    // send to bacon even if no bestppn was found
-                    kbartToSend.add(ligneFromKafka);
-                } else {
-                    log.info("Bestppn déjà existant sur la ligne : " + nbLine + ", le voici : " + ligneFromKafka.getBestPpn());
-                    kbartToSend.add(ligneFromKafka);
-                }
-                mailAttachment.addKbartDto(ligneFromKafka);
-            }
-        } catch (IllegalProviderException e) {
-            isOnError = true;
-            log.error("Erreur dans les données en entrée, provider incorrect");
-            addLineToMailAttachementWithErrorMessage(e.getMessage());
-            executionReport.addNbLinesWithInputDataErrors();
-        } catch (URISyntaxException | RestClientException | IllegalArgumentException | IOException |
-                 IllegalPackageException | IllegalDateException e) {
-            isOnError = true;
-            log.error(e.getMessage());
-            addLineToMailAttachementWithErrorMessage(e.getMessage());
-            executionReport.addNbLinesWithInputDataErrors();
-        } catch (IllegalPpnException | BestPpnException e) {
-            isOnError = true;
-            log.error(e.getMessage());
-            addLineToMailAttachementWithErrorMessage(e.getMessage());
-            executionReport.addNbLinesWithErrorsInBestPPNSearch();
-        } catch (MessagingException | ExecutionException | InterruptedException | RuntimeException e) {
-            log.error(e.getMessage());
-            producer.sendEndOfTraitmentReport(headerList);
-            logFileService.createExecutionReport(filename, Integer.parseInt(totalLine), executionReport.getNbLinesOk(), executionReport.getNbLinesWithInputDataErrors(), executionReport.getNbLinesWithErrorsInBestPPNSearch(), injectKafka);
-        }
-    }
-
-    private ProviderPackage handlerProvider(Optional<Provider> providerOpt, String filename, String providerName) throws IllegalPackageException, IllegalDateException {
-        String packageName = Utils.extractPackageName(filename);
-        Date packageDate = Utils.extractDate(filename);
-        if (providerOpt.isPresent()) {
-            Provider provider = providerOpt.get();
-
-            Optional<ProviderPackage> providerPackageOpt = providerPackageRepository.findByPackageNameAndDatePAndProviderIdtProvider(packageName, packageDate, provider.getIdtProvider());
-            if (providerPackageOpt.isPresent()) {
-                log.info("clear row package : " + providerPackageOpt.get());
-                ligneKbartRepository.deleteAllByIdProviderPackage(providerPackageOpt.get().getIdProviderPackage());
-                return providerPackageOpt.get();
-            } else {
-                // no package info yet, so create it
-                return providerPackageRepository.save(new ProviderPackage(packageName, packageDate, provider.getIdtProvider(), 'N'));
-            }
-        } else {
-            // neither provider nor package exists, so create both
-            Provider newProvider = new Provider(providerName);
-            Provider savedProvider = providerRepository.save(newProvider);
-            log.info("Le provider " + savedProvider.getProvider() + " a été créé.");
-            ProviderPackage providerPackage = new ProviderPackage(packageName, packageDate, savedProvider.getIdtProvider(), 'N');
-            return providerPackageRepository.save(providerPackage);
-        }
-    }
-
-    private void addLineToMailAttachementWithErrorMessage(String messageError) {
-        LigneKbartDto ligneVide = new LigneKbartDto();
-        ligneVide.setErrorType(messageError);
-        mailAttachment.addKbartDto(ligneVide);
+        log.info("Paquet reçu : Partition : " + lignesKbart.partition() + " / offset " + lignesKbart.offset());
+        service.processConsumerRecord(lignesKbart);
     }
 }
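For readability, here is the TopicConsumer that results from this commit, reassembled from the kept and added lines above (blank-line placement is approximate). The class shrinks to a thin delegate: per-line state, the best-PPN routing, and all error handling move into KbartService, and the new concurrency attribute lets Spring run up to ${spring.kafka.concurrency.nbThread} listener containers (effective parallelism remains capped by the topic's partition count).

package fr.abes.bestppn.kafka;

import fr.abes.bestppn.service.KbartService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class TopicConsumer {
    private final KbartService service;

    public TopicConsumer(KbartService service) {
        this.service = service;
    }

    /**
     * Kafka listener that watches a topic and picks up messages as soon as they arrive.
     * @param lignesKbart Kafka message picked up by the Kafka consumer
     */
    @KafkaListener(topics = {"${topic.name.source.kbart}"}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "${spring.kafka.concurrency.nbThread}")
    public void listenKbartFromKafka(ConsumerRecord<String, String> lignesKbart) throws Exception {
        // Log the partition/offset of the received record, then delegate all processing.
        log.info("Paquet reçu : Partition : " + lignesKbart.partition() + " / offset " + lignesKbart.offset());
        service.processConsumerRecord(lignesKbart);
    }
}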
6 changes: 1 addition & 5 deletions src/main/java/fr/abes/bestppn/kafka/TopicProducer.java
@@ -68,18 +68,14 @@ public TopicProducer(KafkaTemplate<String, LigneKbartConnect> kafkaTemplateConne
      * @param filename : name of the file being processed
      */
     @Transactional(transactionManager = "kafkaTransactionManagerKbartConnect", rollbackFor = {BestPpnException.class, JsonProcessingException.class})
-    public void sendKbart(List<LigneKbartDto> kbart, ProviderPackage provider, String filename) throws JsonProcessingException, BestPpnException, ExecutionException, InterruptedException {
-        int numLigneCourante = 0;
+    public void sendKbart(List<LigneKbartDto> kbart, ProviderPackage provider, String filename) {
         for (LigneKbartDto ligne : kbart) {
-            numLigneCourante++;
             ligne.setIdProviderPackage(provider.getIdProviderPackage());
             ligne.setProviderPackagePackage(provider.getPackageName());
             ligne.setProviderPackageDateP(provider.getDateP());
             ligne.setProviderPackageIdtProvider(provider.getProviderIdtProvider());
             List<Header> headerList = new ArrayList<>();
             headerList.add(constructHeader("filename", filename.getBytes()));
-            if (numLigneCourante == kbart.size())
-                headerList.add(constructHeader("OK", "true".getBytes()));
             sendObject(ligne, topicKbart, headerList);
         }
         if (!kbart.isEmpty())
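Two behavioral changes are visible in this hunk: sendKbart no longer declares checked exceptions, and the numLigneCourante counter is gone, so the final kbart line no longer carries the extra "OK" header. The helper constructHeader is called here but not shown in this excerpt; a plausible shape — an assumption, not this project's confirmed code — simply wraps Kafka's RecordHeader:

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

// Hypothetical sketch of the helper used above; the real implementation is not in this diff.
class HeaderUtil {
    static Header constructHeader(String key, byte[] value) {
        return new RecordHeader(key, value);
    }
}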
(7 more changed files not shown in this view.)
