Skip to content

Commit

Permalink
CDE-297 : Refactor :
Browse files Browse the repository at this point in the history
Correction exception et modification algorithme
  • Loading branch information
pierre-maraval committed Nov 23, 2023
1 parent e6ee9e8 commit d8fb769
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 66 deletions.
3 changes: 1 addition & 2 deletions src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ public ProducerFactory<String, LigneKbartImprime> producerFactoryLigneKbartImpri

@Bean
public ProducerFactory<String, String> producerFactory() {
DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
return factory;
return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
Expand Down
99 changes: 61 additions & 38 deletions src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import fr.abes.bestppn.service.EmailService;
import fr.abes.bestppn.service.ExecutionReportService;
import fr.abes.bestppn.service.KbartService;
import fr.abes.bestppn.service.LogFileService;
import fr.abes.bestppn.utils.Utils;
import jakarta.annotation.PostConstruct;
import jakarta.validation.constraints.Email;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
Expand All @@ -18,9 +20,7 @@

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -48,19 +48,23 @@ public class TopicConsumer {

private boolean isOnError = false;

private final List<Header> headerList = new ArrayList<>();
private final Set<Header> headerList = new HashSet<>();
private ExecutorService executorService;

private final ProviderRepository providerRepository;


private final ExecutionReportService executionReportService;

public TopicConsumer(KbartService service, EmailService emailService, ProviderRepository providerRepository, ExecutionReportService executionReportService) {
private final LogFileService logFileService;

private int totalLine = 0;

public TopicConsumer(KbartService service, EmailService emailService, ProviderRepository providerRepository, ExecutionReportService executionReportService, LogFileService logFileService) {
this.service = service;
this.emailService = emailService;
this.providerRepository = providerRepository;
this.executionReportService = executionReportService;
this.logFileService = logFileService;
}

@PostConstruct
Expand All @@ -71,49 +75,68 @@ public void initExecutor() {
* Listener Kafka qui écoute un topic et récupère les messages dès qu'ils y arrivent.
* @param lignesKbart message kafka récupéré par le Consumer Kafka
*/
@KafkaListener(topics = {"${topic.name.source.kbart}, ${topic.name.source.kbart.errors}, ${topic.name.source.nbLines}"}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "${spring.kafka.concurrency.nbThread}")
@KafkaListener(topics = {"${topic.name.source.kbart}", "${topic.name.source.kbart.errors}", "${topic.name.source.nbLines}"}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "${spring.kafka.concurrency.nbThread}")
public void listenKbartFromKafka(ConsumerRecord<String, String> lignesKbart) throws Exception {
log.info("Paquet reçu : Partition : " + lignesKbart.partition() + " / offset " + lignesKbart.offset());
//initialisation des paramètres
extractDataFromHeader(lignesKbart.headers().toArray());
ThreadContext.put("package", (filename)); //Ajoute le nom de fichier dans le contexte du thread pour log4j
String providerName = Utils.extractProvider(filename);
Optional<Provider> providerOpt = providerRepository.findByProvider(providerName);
if (lignesKbart.topic().equals(topicKbart)) {
executorService.execute(() -> {
try {
String providerName = Utils.extractProvider(filename);
Optional<Provider> providerOpt = providerRepository.findByProvider(providerName);
//traitement de chaque ligne kbart
if (lignesKbart.topic().equals(topicKbart)) {
executorService.execute(() -> {
try {
service.processConsumerRecord(lignesKbart, providerName, isForced);
} catch (ExecutionException | InterruptedException | IOException | URISyntaxException e) {
isOnError = true;
log.error(e.getMessage());
emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage());
executionReportService.addNbLinesWithInputDataErrors();
} catch (IllegalPpnException | BestPpnException e) {
isOnError = true;
log.error(e.getMessage());
emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage());
executionReportService.addNbLinesWithErrorsInBestPPNSearch();
}
});
}
//réception du nombre total de lignes, indique la fin du package en cours
if (lignesKbart.topic().equals(topicKbartNbLines)) {
Thread.sleep(1000);
totalLine = Integer.parseInt(lignesKbart.value());
//on laisse les threads terminer leur traitement
executorService.awaitTermination(1, TimeUnit.HOURS);
try {
service.processConsumerRecord(lignesKbart, providerName, isForced);
} catch (ExecutionException | InterruptedException | IOException | URISyntaxException e) {
isOnError = true;
log.error(e.getMessage());
emailService.addLineToMailAttachementWithErrorMessage(e.getMessage());
executionReportService.addNbLinesWithInputDataErrors();
} catch (IllegalPpnException |BestPpnException e) {
if (!isOnError) {
service.commitDatas(providerOpt, providerName, filename, headerList);
//quel que soit le résultat du traitement, on envoie le rapport par mail
log.info("Nombre de best ppn trouvé : " + executionReportService.getExecutionReport().getNbBestPpnFind() + "/" + totalLine);
emailService.sendMailWithAttachment(filename);
logFileService.createExecutionReport(filename, totalLine, executionReportService.getNbLinesOk(), executionReportService.getExecutionReport().getNbLinesWithInputDataErrors(), executionReportService.getExecutionReport().getNbLinesWithErrorsInBestPPNSearch(), isForced);
emailService.clearMailAttachment();
executionReportService.clearExecutionReport();
totalLine = 0;
} else {
isOnError = false;
}
} catch (IllegalPackageException | IllegalDateException e) {
isOnError = true;
log.error(e.getMessage());
emailService.addLineToMailAttachementWithErrorMessage(e.getMessage());
emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage());
executionReportService.addNbLinesWithErrorsInBestPPNSearch();
}
});
}
if (lignesKbart.topic().equals(topicKbartNbLines)) {
executorService.awaitTermination(1, TimeUnit.HOURS);
try {
if (!isOnError)
service.commitDatas(providerOpt, providerName, filename, headerList, isForced);
} catch (IllegalPackageException | IllegalDateException e) {
isOnError = true;
log.error(e.getMessage());
emailService.addLineToMailAttachementWithErrorMessage(e.getMessage());
executionReportService.addNbLinesWithErrorsInBestPPNSearch();
} catch (IllegalProviderException e) {
}
if (lignesKbart.topic().equals(topicKbartErrors)) {
executorService.shutdownNow();
isOnError = true;
log.error("Erreur dans les données en entrée, provider incorrect");
emailService.addLineToMailAttachementWithErrorMessage(e.getMessage());
executionReportService.addNbLinesWithInputDataErrors();
}
}
if (lignesKbart.topic().equals(topicKbartErrors)) {
executorService.shutdownNow();
} catch (IllegalProviderException e) {
isOnError = true;
log.error("Erreur dans les données en entrée, provider incorrect");
emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage());
executionReportService.addNbLinesWithInputDataErrors();
}

}
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/fr/abes/bestppn/kafka/TopicProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -180,7 +181,7 @@ public byte[] value() {
* Envoie un message de fin de traitement sur le topic kafka endOfTraitment_kbart2kafka
* @param headerList list de Header (contient le nom du package et la date)
*/
public void sendEndOfTraitmentReport(List<Header> headerList) throws ExecutionException, InterruptedException {
public void sendEndOfTraitmentReport(Set<Header> headerList) throws ExecutionException, InterruptedException {
try {
ProducerRecord<String, String> record = new ProducerRecord<>(topicEndOfTraitment, null, "", "OK", headerList);
kafkatemplateEndoftraitement.send(record);
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/fr/abes/bestppn/service/EmailService.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ public class EmailService {

private final PackageKbartDto mailAttachment = new PackageKbartDto();

public void addLineToMailAttachementWithErrorMessage(String messageError) {
public void addLineKbartToMailAttachementWithErrorMessage(String messageError) {
LigneKbartDto ligneVide = new LigneKbartDto();
ligneVide.setErrorType(messageError);
mailAttachment.addKbartDto(ligneVide);
}

public void addPackageToMailAttachment(LigneKbartDto dto) {
public void addLineKbartToMailAttachment(LigneKbartDto dto) {
mailAttachment.addKbartDto(dto);
}

Expand Down
24 changes: 5 additions & 19 deletions src/main/java/fr/abes/bestppn/service/KbartService.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.ExecutionException;

@Service
Expand All @@ -35,8 +32,6 @@ public class KbartService {

private final EmailService serviceMail;

private final LogFileService logFileService;

@Getter
private final List<LigneKbartDto> kbartToSend = Collections.synchronizedList(new ArrayList<>());

Expand All @@ -51,14 +46,11 @@ public class KbartService {
private final ExecutionReportService executionReportService;


private String totalLine = "";

public KbartService(ObjectMapper mapper, BestPpnService service, TopicProducer producer, EmailService serviceMail, LogFileService logFileService, ProviderService providerService, ExecutionReportService executionReportService) {
public KbartService(ObjectMapper mapper, BestPpnService service, TopicProducer producer, EmailService serviceMail, ProviderService providerService, ExecutionReportService executionReportService) {
this.mapper = mapper;
this.service = service;
this.producer = producer;
this.serviceMail = serviceMail;
this.logFileService = logFileService;
this.providerService = providerService;
this.executionReportService = executionReportService;
}
Expand Down Expand Up @@ -90,10 +82,10 @@ public void processConsumerRecord(ConsumerRecord<String, String> lignesKbart, St
log.info("Bestppn déjà existant sur la ligne : " + lignesKbart + ",PPN : " + ligneFromKafka.getBestPpn());
kbartToSend.add(ligneFromKafka);
}
serviceMail.addPackageToMailAttachment(ligneFromKafka);
serviceMail.addLineKbartToMailAttachment(ligneFromKafka);
}

public void commitDatas(Optional<Provider> providerOpt, String providerName, String filename, List<Header> headerList, boolean isForced) throws IllegalPackageException, IllegalDateException, ExecutionException, InterruptedException, MessagingException, IOException {
public void commitDatas(Optional<Provider> providerOpt, String providerName, String filename, Set<Header> headerList) throws IllegalPackageException, IllegalDateException, ExecutionException, InterruptedException, MessagingException, IOException {
ProviderPackage provider = providerService.handlerProvider(providerOpt, filename, providerName);

producer.sendKbart(kbartToSend, provider, filename);
Expand All @@ -104,13 +96,7 @@ public void commitDatas(Optional<Provider> providerOpt, String providerName, Str
ppnToCreate.clear();
ppnFromKbartToCreate.clear();

log.info("Nombre de best ppn trouvé : " + executionReportService.getExecutionReport().getNbBestPpnFind() + "/" + totalLine);
serviceMail.sendMailWithAttachment(filename);
producer.sendEndOfTraitmentReport(headerList); // Appel le producer pour l'envoi du message de fin de traitement.
logFileService.createExecutionReport(filename, Integer.parseInt(totalLine), executionReportService.getNbLinesOk(), executionReportService.getExecutionReport().getNbLinesWithInputDataErrors(), executionReportService.getExecutionReport().getNbLinesWithErrorsInBestPPNSearch(), isForced);

serviceMail.clearMailAttachment();
executionReportService.clearExecutionReport();
producer.sendEndOfTraitmentReport(headerList);
}

private static LigneKbartImprime getLigneKbartImprime(PpnWithDestinationDto ppnWithDestinationDto, LigneKbartDto ligneFromKafka) {
Expand Down
4 changes: 0 additions & 4 deletions src/main/java/fr/abes/bestppn/service/WsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,10 @@ public String getCall(String url, Map<String, String> params) throws RestClientE
return restTemplate.getForObject(formedUrl.toString(), String.class);
}

@ExecutionTime
public ResultWsSudocDto callOnlineId2Ppn(String type, String id, @Nullable String provider) throws RestClientException, IllegalArgumentException {
return getResultWsSudocDto(type, id, provider, urlOnlineId2Ppn);
}

@ExecutionTime
public ResultWsSudocDto callPrintId2Ppn(String type, String id, @Nullable String provider) throws RestClientException, IllegalArgumentException {
return getResultWsSudocDto(type, id, provider, urlPrintId2Ppn);
}
Expand All @@ -113,7 +111,6 @@ private ResultWsSudocDto getResultWsSudocDto(String type, String id, @Nullable S
return result;
}

@ExecutionTime
public ResultDat2PpnWebDto callDat2Ppn(String date, String author, String title) throws JsonProcessingException {
SearchDatWebDto searchDatWebDto = new SearchDatWebDto(title);
if (author != null && !author.isEmpty()) {
Expand All @@ -125,7 +122,6 @@ public ResultDat2PpnWebDto callDat2Ppn(String date, String author, String title)
return mapper.readValue(postCall(urlDat2Ppn, mapper.writeValueAsString(searchDatWebDto)), ResultDat2PpnWebDto.class);
}

@ExecutionTime
public ResultWsSudocDto callDoi2Ppn(String doi, @Nullable String provider) throws JsonProcessingException {
Map<String, String> params = new HashMap<>();
params.put("doi", doi);
Expand Down

0 comments on commit d8fb769

Please sign in to comment.