Cde 297 modifier traitement dans bestppnapi2 #36

Merged · 18 commits · Nov 30, 2023
14 changes: 12 additions & 2 deletions src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java
@@ -19,10 +19,14 @@
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Semaphore;

@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${spring.kafka.concurrency.nbThread}")
private int nbThread;

@Value("${spring.kafka.consumer.bootstrap-servers}")
private String bootstrapAddress;

@@ -53,6 +57,7 @@ public ConsumerFactory<String, String> consumerKbartFactory() {
return new DefaultKafkaConsumerFactory<>(props);
}


@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaKbartListenerContainerFactory() {
@@ -99,8 +104,7 @@ public ProducerFactory<String, LigneKbartImprime> producerFactoryLigneKbartImprime

@Bean
public ProducerFactory<String, String> producerFactory() {
DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
return factory;
return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
@@ -122,4 +126,10 @@ public KafkaTransactionManager<String, LigneKbartImprime> kafkaTransactionManage
public KafkaTemplate<String, String> kafkatemplateEndoftraitement(final ProducerFactory producerFactory) {
return new KafkaTemplate<>(producerFactory);
}

@Bean
public Semaphore semaphore() {
return new Semaphore(1);
}

}
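
For context, a minimal sketch (not part of the PR) of how the single-permit Semaphore bean above is meant to be used: only one consumer thread can win the permit, so the shared end-of-file handling runs at most once per file. The property value shown and the placement are assumptions; the real call sites are in TopicConsumer further down.

// Illustrative only — assumes an entry such as spring.kafka.concurrency.nbThread=4
// in application.properties for the @Value field above.
Semaphore semaphore = new Semaphore(1);
if (semaphore.tryAcquire()) {          // only the first thread to reach this point succeeds
    try {
        // end-of-file handling would run here (handleFichier() in this PR)
    } finally {
        semaphore.release();           // hand the permit back before the next file
    }
}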
@@ -2,7 +2,7 @@

import fr.abes.bestppn.dto.kafka.LigneKbartDto;
import fr.abes.bestppn.exception.BestPpnException;
import fr.abes.bestppn.exception.IllegalPpnException;
import fr.abes.bestppn.exception.IllegalDoiException;
import fr.abes.bestppn.service.BestPpnService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.media.Content;
@@ -41,7 +41,7 @@ public String bestPpn(@RequestParam(name = "provider") String provider, @Request
@RequestParam(name = "publication_type") String publicationType, @RequestParam(name = "online_identifier", required = false) String onlineIdentifier,
@RequestParam(name = "print_identifier", required = false) String printIdentifier, @RequestParam(name = "titleUrl", required = false) String titleUrl,
@RequestParam(name = "date_monograph_published_online", required = false) String dateMonographPublishedOnline, @RequestParam(name = "date_monograph_published_print", required = false) String dateMonographPublishedPrint,
@RequestParam(name = "first_author", required = false) String firstAuthor, @RequestParam(name = "force", required = false) Boolean force) throws IOException, IllegalPpnException {
@RequestParam(name = "first_author", required = false) String firstAuthor, @RequestParam(name = "force", required = false) Boolean force) throws IOException {
try {
LigneKbartDto ligneKbartDto = new LigneKbartDto();
ligneKbartDto.setPublicationType(publicationType);
@@ -56,7 +56,7 @@ public String bestPpn(@RequestParam(name = "provider") String provider, @Request
return service.getBestPpn(ligneKbartDto, provider, injectKafka).getPpn();
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Une url dans le champ title_url du kbart n'est pas correcte");
} catch (BestPpnException | RestClientException | IllegalArgumentException e) {
} catch (BestPpnException | RestClientException | IllegalArgumentException | IllegalDoiException e) {
return e.getMessage();
}
}
61 changes: 45 additions & 16 deletions src/main/java/fr/abes/bestppn/entity/ExecutionReport.java
@@ -2,36 +2,65 @@

import lombok.Data;

@Data
import java.util.concurrent.atomic.AtomicInteger;


public class ExecutionReport {
private int nbtotalLines = 0;
private int nbtotalLines;

private int nbBestPpnFind = 0;
private AtomicInteger nbBestPpnFind;

private int nbLinesWithInputDataErrors = 0;
private AtomicInteger nbLinesWithInputDataErrors;

private int nbLinesWithErrorsInBestPPNSearch = 0;
private AtomicInteger nbLinesWithErrorsInBestPPNSearch;

public int getNbLinesOk(){
return nbtotalLines - nbLinesWithErrorsInBestPPNSearch - nbLinesWithInputDataErrors;
public ExecutionReport() {
nbBestPpnFind = new AtomicInteger(0);
nbLinesWithInputDataErrors = new AtomicInteger(0);
nbLinesWithErrorsInBestPPNSearch = new AtomicInteger(0);
nbtotalLines = 0;
}

public void addNbBestPpnFind(){
nbBestPpnFind++;
public int getNbtotalLines() {
return this.nbtotalLines;
}

public void addNbLinesWithInputDataErrors(){
nbLinesWithInputDataErrors++;
public void setNbtotalLines(int nbtotalLines) {
this.nbtotalLines = nbtotalLines;
}

public void addNbLinesWithErrorsInBestPPNSearch(){
nbLinesWithErrorsInBestPPNSearch++;
public int getNbBestPpnFind() {
return nbBestPpnFind.get();
}

public int getNbLinesWithInputDataErrors() {
return nbLinesWithInputDataErrors.get();
}

public int getNbLinesWithErrorsInBestPPNSearch() {
return nbLinesWithErrorsInBestPPNSearch.get();
}

public int getNbLinesOk(){
return nbtotalLines - nbLinesWithErrorsInBestPPNSearch.get() - nbLinesWithInputDataErrors.get();
}

public void incrementNbBestPpnFind() {
nbBestPpnFind.incrementAndGet();
}

public void incrementNbLinesWithInputDataErrors() {
nbLinesWithInputDataErrors.incrementAndGet();
}

public void clear(){
nbtotalLines = 0;
nbBestPpnFind = 0;
nbLinesWithInputDataErrors = 0;
nbLinesWithErrorsInBestPPNSearch = 0;
nbBestPpnFind.set(0);
nbLinesWithInputDataErrors.set(0);
nbLinesWithErrorsInBestPPNSearch.set(0);
}

public void incrementNbLinesWithErrorsInBestPPNSearch() {
nbLinesWithErrorsInBestPPNSearch.incrementAndGet();
}
}
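
The move from plain int counters to AtomicInteger matters because the report is now updated from several Kafka listener threads at once: a plain counter++ is a read-modify-write and can silently lose increments under contention. A self-contained illustrative sketch (not from the PR):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CounterDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger nbBestPpnFind = new AtomicInteger(0);
        for (int i = 0; i < 1000; i++) {
            pool.execute(nbBestPpnFind::incrementAndGet); // atomic, safe from any thread
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println(nbBestPpnFind.get()); // reliably prints 1000; a plain int could print less
    }
}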
@@ -30,12 +30,6 @@ protected ResponseEntity<Object> handleIllegalArgumentException(IllegalArgumentE
return buildResponseEntity(new ApiReturnError(HttpStatus.BAD_REQUEST, error, ex));
}

@ExceptionHandler(IllegalPpnException.class)
protected ResponseEntity<Object> handleIllegalPpnException(IllegalPpnException ex) {
String error = "Erreur dans la récupération d'une notice";
log.debug(ex.getLocalizedMessage());
return buildResponseEntity(new ApiReturnError(HttpStatus.SERVICE_UNAVAILABLE, error, ex));
}

@ExceptionHandler(BestPpnException.class)
protected ResponseEntity<Object> handleBestPpnException(BestPpnException ex) {
7 changes: 7 additions & 0 deletions src/main/java/fr/abes/bestppn/exception/IllegalDoiException.java
@@ -0,0 +1,7 @@
package fr.abes.bestppn.exception;

public class IllegalDoiException extends Exception {
public IllegalDoiException(String message) {
super(message);
}
}
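
A hypothetical usage sketch for the new checked exception; the method name, the validation rule and the message below are illustrative only, the real DOI handling lives in the service layer:

// Hypothetical example: a caller validating a DOI-like identifier and
// surfacing the problem through the new checked exception.
private String checkDoi(String doi) throws IllegalDoiException {
    if (doi == null || !doi.startsWith("10.")) {
        throw new IllegalDoiException("Invalid DOI: " + doi);
    }
    return doi;
}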

This file was deleted.

192 changes: 188 additions & 4 deletions src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java
@@ -1,27 +1,211 @@
package fr.abes.bestppn.kafka;

import fr.abes.bestppn.entity.bacon.Provider;
import fr.abes.bestppn.exception.*;
import fr.abes.bestppn.repository.bacon.ProviderRepository;
import fr.abes.bestppn.service.EmailService;
import fr.abes.bestppn.service.ExecutionReportService;
import fr.abes.bestppn.service.KbartService;
import fr.abes.bestppn.service.LogFileService;
import fr.abes.bestppn.utils.Utils;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.logging.log4j.ThreadContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Service
public class TopicConsumer {
private final KbartService service;

public TopicConsumer(KbartService service) {
@Value("${spring.kafka.concurrency.nbThread}")
private int nbThread;
private final EmailService emailService;

private String filename = "";

private boolean isForced = false;

private boolean isOnError = false;

private ExecutorService executorService;

private final ProviderRepository providerRepository;

private final ExecutionReportService executionReportService;

private final LogFileService logFileService;

private AtomicInteger nbLignesTraitees;

private final Semaphore semaphore;

private final AtomicInteger nbActiveThreads;


public TopicConsumer(KbartService service, EmailService emailService, ProviderRepository providerRepository, ExecutionReportService executionReportService, LogFileService logFileService, Semaphore semaphore) {
this.service = service;
this.emailService = emailService;
this.providerRepository = providerRepository;
this.executionReportService = executionReportService;
this.logFileService = logFileService;
this.semaphore = semaphore;
this.nbLignesTraitees = new AtomicInteger(0);
this.nbActiveThreads = new AtomicInteger(0);
}


@PostConstruct
public void initExecutor() {
executorService = Executors.newFixedThreadPool(nbThread);
}

/**
* Kafka listener that listens on a topic and picks up messages as soon as they arrive.
*
* @param lignesKbart kafka message picked up by the Kafka consumer
*/
@KafkaListener(topics = {"${topic.name.source.kbart}"}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "${spring.kafka.concurrency.nbThread}")
public void listenKbartFromKafka(ConsumerRecord<String, String> lignesKbart) throws Exception {
log.info("Paquet reçu : Partition : " + lignesKbart.partition() + " / offset " + lignesKbart.offset());
service.processConsumerRecord(lignesKbart);
public void kbartFromkafkaListener(ConsumerRecord<String, String> lignesKbart) {
log.warn("Paquet reçu : Partition : " + lignesKbart.partition() + " / offset " + lignesKbart.offset() + " / value : " + lignesKbart.value());
try {
//processing of each kbart line
this.filename = extractFilenameFromHeader(lignesKbart.headers().toArray());
ThreadContext.put("package", (filename)); //adds the file name to the thread context for log4j
String providerName = Utils.extractProvider(filename);
executorService.execute(() -> {
try {
this.nbActiveThreads.incrementAndGet();
this.nbLignesTraitees.incrementAndGet();
service.processConsumerRecord(lignesKbart, providerName, isForced);
Header lastHeader = lignesKbart.headers().lastHeader("nbLinesTotal");
if (lastHeader != null) {
int nbLignesTotal = Integer.parseInt(new String(lastHeader.value()));
if (nbLignesTotal == nbLignesTraitees.get() && semaphore.tryAcquire()) {
executionReportService.setNbtotalLines(nbLignesTotal);
handleFichier();
}
}
} catch (IOException | URISyntaxException | IllegalDoiException e) {
//non-blocking errors: they are recorded in the report, but processing is not stopped
log.error(e.getMessage());
emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage());
executionReportService.addNbLinesWithInputDataErrors();
} catch (BestPpnException e) {
if (isForced) {
//if insertion is forced, the program is not stopped when the bestPpn calculation fails
log.error(e.getMessage());
emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage());
executionReportService.addNbLinesWithErrorsInBestPPNSearch();
} else {
addBestPPNSearchError(e.getMessage());
}
} finally {
this.nbActiveThreads.addAndGet(-1);
}
});
} catch (IllegalProviderException e) {
addDataError(e.getMessage());
}
}

private void handleFichier() {
try {
if (!isOnError) {
String providerName = Utils.extractProvider(filename);
Optional<Provider> providerOpt = providerRepository.findByProvider(providerName);
service.commitDatas(providerOpt, providerName, filename);
//whatever the outcome of the processing, the report is sent by email
log.warn("Nombre de best ppn trouvé : " + executionReportService.getExecutionReport().getNbBestPpnFind() + "/" + executionReportService.getExecutionReport().getNbtotalLines());
logFileService.createExecutionReport(filename, executionReportService.getExecutionReport(), isForced);
} else {
isOnError = false;
}
emailService.sendMailWithAttachment(filename);
} catch (IllegalPackageException | IllegalDateException | IllegalProviderException | ExecutionException |
InterruptedException | IOException e) {
addDataError(e.getMessage());
} finally {
log.warn("Traitement terminé pour fichier " + this.filename + " / nb lignes " + nbLignesTraitees);
emailService.clearMailAttachment();
executionReportService.clearExecutionReport();
service.clearListesKbart();
nbLignesTraitees = new AtomicInteger(0);
clearSharedObjects();
semaphore.release();
}
}

@KafkaListener(topics = {"${topic.name.source.kbart.errors}"}, groupId = "${topic.groupid.source.errors}", containerFactory = "kafkaKbartListenerContainerFactory")
public void errorsListener(ConsumerRecord<String, String> error) {
log.error(error.value());
try {
do {
try {
//sleep for the duration of the kafka poll to make sure the kbart consumer has read at least once
Thread.sleep(80);
} catch (InterruptedException e) {
log.warn("Erreur de sleep sur attente fin de traitement");
}
} while (this.nbActiveThreads.get() != 0);
log.warn("boucle infinie terminée");
if (this.filename.equals(extractFilenameFromHeader(error.headers().toArray()))) {
if (semaphore.tryAcquire()) {
emailService.addLineKbartToMailAttachementWithErrorMessage(error.value());
logFileService.createExecutionReport(filename, executionReportService.getExecutionReport(), isForced);
isOnError = true;
handleFichier();
}
}
} catch (IOException e) {
addDataError(e.getMessage());
}
}

private void clearSharedObjects() {
emailService.clearMailAttachment();
executionReportService.clearExecutionReport();
service.clearListesKbart();
}

private String extractFilenameFromHeader(Header[] headers) {
String nomFichier = "";
for (Header header : headers) {
if (header.key().equals("FileName")) {
nomFichier = new String(header.value());
if (nomFichier.contains("_FORCE")) {
isForced = true;
}
}
}
return nomFichier;
}


private void addBestPPNSearchError(String message) {
isOnError = true;
log.error(message);
emailService.addLineKbartToMailAttachementWithErrorMessage(message);
executionReportService.addNbLinesWithErrorsInBestPPNSearch();
}

private void addDataError(String message) {
isOnError = true;
log.error(message);
emailService.addLineKbartToMailAttachementWithErrorMessage(message);
executionReportService.addNbLinesWithInputDataErrors();
}
}
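
Read as a whole, the listener follows a submit-and-track pattern: each listener thread hands its record to a fixed-size pool, a counter tracks in-flight tasks so the error listener can wait for them to drain, and the single-permit semaphore serializes the end-of-file step. The sketch below restates that pattern in isolation; it is an assumed reading of the PR and the names are illustrative, not the PR code itself.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class LineProcessingCoordinator {
    private final ExecutorService pool = Executors.newFixedThreadPool(4);
    private final AtomicInteger inFlight = new AtomicInteger(0);
    private final Semaphore endOfFile = new Semaphore(1);

    void onRecord(Runnable processLine, boolean lastLineReached) {
        pool.execute(() -> {
            inFlight.incrementAndGet();
            try {
                processLine.run();
                if (lastLineReached && endOfFile.tryAcquire()) {
                    // end-of-file handling runs at most once per file;
                    // the permit is released once the report has been sent
                }
            } finally {
                inFlight.decrementAndGet();
            }
        });
    }

    void awaitDrain() throws InterruptedException {
        while (inFlight.get() != 0) {
            Thread.sleep(80); // poll until all in-flight lines have finished
        }
    }
}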