Skip to content

Commit

Permalink
CDE-297 : Ajout semaphore
Browse files Browse the repository at this point in the history
Refactor divers
  • Loading branch information
pierre-maraval committed Nov 23, 2023
1 parent efaa393 commit 1ecea31
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public KafkaTemplate<String, String> kafkatemplateEndoftraitement(final Producer
}

@Bean
public Semaphore semaphore() {
public Semaphore semaphoreNbLines() {
return new Semaphore(1);
}
}
10 changes: 9 additions & 1 deletion src/main/java/fr/abes/bestppn/entity/ExecutionReport.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ public ExecutionReport() {
}

public int getNbtotalLines() {
return nbtotalLines;
return this.nbtotalLines;
}

public void setNbtotalLines(int nbtotalLines) {
this.nbtotalLines = nbtotalLines;
}

public int getNbBestPpnFind() {
Expand All @@ -37,6 +41,10 @@ public int getNbLinesWithErrorsInBestPPNSearch() {
return nbLinesWithErrorsInBestPPNSearch.get();
}

public int getNbLinesOk(){
return nbtotalLines - nbLinesWithErrorsInBestPPNSearch.get() - nbLinesWithInputDataErrors.get();
}

public void incrementNbBestPpnFind() {
nbBestPpnFind.incrementAndGet();
}
Expand Down
134 changes: 63 additions & 71 deletions src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,6 @@
public class TopicConsumer {
private final KbartService service;

@Value("${topic.name.source.kbart}")
private String topicKbart;
@Value("${topic.name.source.nbLines}")
private String topicKbartNbLines;
@Value("${topic.name.source.kbart.errors}")
private String topicKbartErrors;

@Value("${spring.kafka.concurrency.nbThread}")
private int nbThread;
private final EmailService emailService;
Expand All @@ -52,17 +45,16 @@ public class TopicConsumer {

private final LogFileService logFileService;

private int totalLine = 0;
private final Semaphore semaphoreNbLines;

private final Semaphore semaphore;

public TopicConsumer(KbartService service, EmailService emailService, ProviderRepository providerRepository, ExecutionReportService executionReportService, LogFileService logFileService, Semaphore semaphore) {
public TopicConsumer(KbartService service, EmailService emailService, ProviderRepository providerRepository, ExecutionReportService executionReportService, LogFileService logFileService, Semaphore semaphoreNbLines) {
this.service = service;
this.emailService = emailService;
this.providerRepository = providerRepository;
this.executionReportService = executionReportService;
this.logFileService = logFileService;
this.semaphore = semaphore;
this.semaphoreNbLines = semaphoreNbLines;
}

@PostConstruct
Expand All @@ -80,90 +72,75 @@ public void listenKbartFromKafka(ConsumerRecord<String, String> lignesKbart) {
log.info("Paquet reçu : Partition : " + lignesKbart.partition() + " / offset " + lignesKbart.offset() + " / value : " + lignesKbart.value());
try {
//traitement de chaque ligne kbart
if (lignesKbart.topic().equals(topicKbart)) {
this.filename = extractFilenameFromHeader(lignesKbart.headers().toArray());
ThreadContext.put("package", (filename)); //Ajoute le nom de fichier dans le contexte du thread pour log4j
String providerName = Utils.extractProvider(filename);
executorService.execute(() -> {
try {
service.processConsumerRecord(lignesKbart, providerName, isForced);
} catch (ExecutionException | InterruptedException | IOException | URISyntaxException e) {
isOnError = true;
log.error(e.getMessage());
emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage());
executionReportService.addNbLinesWithInputDataErrors();
} catch (IllegalPpnException | BestPpnException e) {
isOnError = true;
log.error(e.getMessage());
emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage());
executionReportService.addNbLinesWithErrorsInBestPPNSearch();
}
});
}
this.filename = extractFilenameFromHeader(lignesKbart.headers().toArray());
ThreadContext.put("package", (filename)); //Ajoute le nom de fichier dans le contexte du thread pour log4j
String providerName = Utils.extractProvider(filename);
executorService.execute(() -> {
try {
service.processConsumerRecord(lignesKbart, providerName, isForced);
} catch (ExecutionException | InterruptedException | IOException | URISyntaxException e) {
addDataError(e.getMessage());
} catch (IllegalPpnException | BestPpnException e) {
addBestPPNSearchError(e.getMessage());
}
});
} catch (IllegalProviderException e) {
isOnError = true;
log.error("Erreur dans les données en entrée, provider incorrect");
emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage());
executionReportService.addNbLinesWithInputDataErrors();
addDataError(e.getMessage());
}

}

@KafkaListener(topics = {"${topic.name.source.nbLines}"}, groupId = "nbLinesLocal", containerFactory = "kafkaNbLinesListenerContainerFactory")

@KafkaListener(topics = {"${topic.name.source.nbLines}"}, groupId = "${topic.groupid.source.nbLines}", containerFactory = "kafkaNbLinesListenerContainerFactory")
public void listenNbLines(ConsumerRecord<String, String> nbLines) {
try {
semaphore.acquire();
log.info("Permit acquis");
semaphoreNbLines.acquire();
log.debug("Permit acquis");
if (this.filename.equals(extractFilenameFromHeader(nbLines.headers().toArray()))) {
log.info("condition vérifiée : " + nbLines.value());
totalLine = Integer.parseInt(nbLines.value());
executionReportService.setNbtotalLines(Integer.parseInt(nbLines.value()));
if (!isOnError) {
String providerName = Utils.extractProvider(filename);
Optional<Provider> providerOpt = providerRepository.findByProvider(providerName);
service.commitDatas(providerOpt, providerName, filename);
//quel que soit le résultat du traitement, on envoie le rapport par mail
log.info("Nombre de best ppn trouvé : " + executionReportService.getExecutionReport().getNbBestPpnFind() + "/" + totalLine);
log.info("Nombre de best ppn trouvé : " + executionReportService.getExecutionReport().getNbBestPpnFind() + "/" + executionReportService.getExecutionReport().getNbtotalLines());
emailService.sendMailWithAttachment(filename);
logFileService.createExecutionReport(filename, totalLine, executionReportService.getNbLinesOk(), executionReportService.getExecutionReport().getNbLinesWithInputDataErrors(), executionReportService.getExecutionReport().getNbLinesWithErrorsInBestPPNSearch(), isForced);
emailService.clearMailAttachment();
executionReportService.clearExecutionReport();
totalLine = 0;
logFileService.createExecutionReport(filename, executionReportService.getExecutionReport(), isForced);
} else {
isOnError = false;
}
}
} catch (IllegalPackageException | IllegalDateException e) {
isOnError = true;
log.error(e.getMessage());
emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage());
executionReportService.addNbLinesWithErrorsInBestPPNSearch();
} catch (IllegalProviderException e) {
isOnError = true;
log.error("Erreur dans les données en entrée, provider incorrect");
emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage());
executionReportService.addNbLinesWithInputDataErrors();
} catch (ExecutionException | InterruptedException | IOException e) {
isOnError = true;
log.error(e.getMessage());
emailService.addLineKbartToMailAttachementWithErrorMessage(e.getMessage());
executionReportService.addNbLinesWithInputDataErrors();
addBestPPNSearchError(e.getMessage());
} catch (IllegalProviderException | ExecutionException | InterruptedException | IOException e) {
addDataError(e.getMessage());
} finally {
semaphore.release();
log.info("semaphore libéré");
semaphoreNbLines.release();
log.debug("semaphore libéré");
emailService.clearMailAttachment();
executionReportService.clearExecutionReport();
service.clearListesKbart();
}
}

/* @KafkaListener(topics = {"${topic.name.source.kbart.errors}"}, groupId = "errorsLocal", containerFactory = "kafkaKbartListenerContainerFactory")
@KafkaListener(topics = {"${topic.name.source.kbart.errors}"}, groupId = "${topic.groupid.source.errors}", containerFactory = "kafkaKbartListenerContainerFactory")
public void listenErrors(ConsumerRecord<String, String> error) {
executorService.shutdownNow();
isOnError = true;
log.error(error.value());
emailService.addLineKbartToMailAttachementWithErrorMessage(error.value());
emailService.sendMailWithAttachment(filename);
logFileService.createExecutionReport(filename, totalLine, executionReportService.getNbLinesOk(), executionReportService.getExecutionReport().getNbLinesWithInputDataErrors(), executionReportService.getExecutionReport().getNbLinesWithErrorsInBestPPNSearch(), isForced);
emailService.clearMailAttachment();
executionReportService.clearExecutionReport();
} */
try {
if (this.filename.equals(extractFilenameFromHeader(error.headers().toArray()))) {
//erreur lors du chargement du fichier détectée, on arrête tous les threads en cours pour ne pas envoyer de lignes dans le topic
executorService.shutdown();
isOnError = true;
log.error(error.value());
emailService.addLineKbartToMailAttachementWithErrorMessage(error.value());
logFileService.createExecutionReport(filename, executionReportService.getExecutionReport(), isForced);
emailService.clearMailAttachment();
executionReportService.clearExecutionReport();
service.clearListesKbart();
}
} catch (IOException e) {
addDataError(e.getMessage());
}
}

private String extractFilenameFromHeader(Header[] headers) {
String nomFichier = "";
Expand All @@ -177,4 +154,19 @@ private String extractFilenameFromHeader(Header[] headers) {
}
return nomFichier;
}


private void addBestPPNSearchError(String message) {
isOnError = true;
log.error(message);
emailService.addLineKbartToMailAttachementWithErrorMessage(message);
executionReportService.addNbLinesWithErrorsInBestPPNSearch();
}

private void addDataError(String message) {
isOnError = true;
log.error(message);
emailService.addLineKbartToMailAttachementWithErrorMessage(message);
executionReportService.addNbLinesWithInputDataErrors();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ public class ExecutionReportService {
@Getter
private ExecutionReport executionReport = new ExecutionReport();

public int getNbLinesOk(){
return executionReport.getNbtotalLines() - executionReport.getNbLinesWithErrorsInBestPPNSearch() - executionReport.getNbLinesWithInputDataErrors();
public void setNbtotalLines(int nbtotalLines) {
this.executionReport.setNbtotalLines(nbtotalLines);
}

public void addNbBestPpnFind(){
executionReport.incrementNbBestPpnFind();
}
Expand Down
7 changes: 4 additions & 3 deletions src/main/java/fr/abes/bestppn/service/KbartService.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,17 @@ public void processConsumerRecord(ConsumerRecord<String, String> lignesKbart, St

public void commitDatas(Optional<Provider> providerOpt, String providerName, String filename) throws IllegalPackageException, IllegalDateException, ExecutionException, InterruptedException, IOException {
ProviderPackage provider = providerService.handlerProvider(providerOpt, filename, providerName);

producer.sendKbart(kbartToSend, provider, filename);
producer.sendPrintNotice(ppnToCreate, filename);
producer.sendPpnExNihilo(ppnFromKbartToCreate, provider, filename);
clearListesKbart();
producer.sendEndOfTraitmentReport(filename);
}

public void clearListesKbart() {
kbartToSend.clear();
ppnToCreate.clear();
ppnFromKbartToCreate.clear();

producer.sendEndOfTraitmentReport(filename);
}

private static LigneKbartImprime getLigneKbartImprime(PpnWithDestinationDto ppnWithDestinationDto, LigneKbartDto ligneFromKafka) {
Expand Down
20 changes: 9 additions & 11 deletions src/main/java/fr/abes/bestppn/service/LogFileService.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package fr.abes.bestppn.service;

import fr.abes.bestppn.entity.ExecutionReport;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

Expand All @@ -20,13 +21,10 @@ public class LogFileService {
/**
* Méthode qui créé le rapport d'execution dans un fichier log indépendant du reste de l'application
* @param fileName le nom du fichier
* @param totalLines le nombre total de lignes pour le fichier concerné
* @param linesOk le nombre de lignes OK pour le fichier concerné
* @param linesWithInputDataErrors le nombre de lignes contenant des erreurs de données
* @param linesWithErrorsInBestPPNSearch le nombre total de lignes contenant des erreurs lors de la recherche du bestPpn
* @param executionReport le rapport d'exécution qui sert à alimenter le fichier
* @throws IOException exception levée
*/
public void createExecutionReport(String fileName, int totalLines, int linesOk, int linesWithInputDataErrors, int linesWithErrorsInBestPPNSearch, boolean injectKafka) throws IOException {
public void createExecutionReport(String fileName, ExecutionReport executionReport, boolean isForced) throws IOException {
try {
// Création du fichier de log
Logger logger = Logger.getLogger("ExecutionReport");
Expand All @@ -37,17 +35,17 @@ public void createExecutionReport(String fileName, int totalLines, int linesOk,
SimpleFormatter formatter = new SimpleFormatter();
fh.setFormatter(formatter);
logger.setUseParentHandlers(false); // désactive l'affichage du log dans le terminal
logger.info("TOTAL LINES : " + totalLines + System.lineSeparator()
+ "LINES OK : " + linesOk + System.lineSeparator()
+ "LINES WITH INPUT DATA ERRORS : " + linesWithInputDataErrors + System.lineSeparator()
+ "LINES WITH ERRORS IN BESTPPN SEARCH : " + linesWithErrorsInBestPPNSearch + System.lineSeparator()
+ "FORCE_OPTION : " + injectKafka + System.lineSeparator());
logger.info("TOTAL LINES : " + executionReport.getNbtotalLines() + System.lineSeparator()
+ "LINES OK : " + executionReport.getNbLinesOk() + System.lineSeparator()
+ "LINES WITH INPUT DATA ERRORS : " + executionReport.getNbLinesWithInputDataErrors() + System.lineSeparator()
+ "LINES WITH ERRORS IN BESTPPN SEARCH : " + executionReport.getNbLinesWithErrorsInBestPPNSearch() + System.lineSeparator()
+ "FORCE_OPTION : " + isForced + System.lineSeparator());

// Fermeture du fichier de log
fh.close();

// Copie le fichier existant vers le répertoire temporaire en ajoutant sa date de création
if (source != null && Files.exists(source)) {
if (Files.exists(source)) {

// Vérification du chemin et création si inexistant
String tempLogWithSeparator = "tempLog" + File.separator;
Expand Down
2 changes: 2 additions & 0 deletions src/main/resources/application-dev.properties
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,7 @@ mail.ws.url=
mail.ws.recipient=

topic.groupid.source.kbart=lignesKbart
topic.groupid.source.nbLines=nbLines
topic.groupid.source.errors=errors

logging.level.fr.abes.bestppn=INFO
4 changes: 3 additions & 1 deletion src/main/resources/application-prod.properties
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,6 @@ spring.sql.bacon.init.mode=never
mail.ws.url=
mail.ws.recipient=

topic.groupid.source.kbart=lignesKbart
topic.groupid.source.kbart=lignesKbart
topic.groupid.source.nbLines=nbLines
topic.groupid.source.errors=errors
4 changes: 3 additions & 1 deletion src/main/resources/application-test.properties
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,6 @@ spring.sql.bacon.init.mode=never
mail.ws.url=
mail.ws.recipient=

topic.groupid.source.kbart=lignesKbart
topic.groupid.source.kbart=lignesKbart
topic.groupid.source.nbLines=nbLines
topic.groupid.source.errors=errors

0 comments on commit 1ecea31

Please sign in to comment.