Skip to content

Commit

Permalink
Merge pull request #68 from abes-esr/develop
Browse files Browse the repository at this point in the history
develop to main
  • Loading branch information
pierre-maraval authored Feb 8, 2024
2 parents e2e8223 + b6ed931 commit 0da8ad8
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 56 deletions.
6 changes: 2 additions & 4 deletions src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.transaction.KafkaTransactionManager;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ConcurrentHashMap;

@Configuration
@EnableKafka
Expand Down Expand Up @@ -132,6 +130,6 @@ public KafkaTemplate<String, String> kafkatemplateEndoftraitement(final Producer

@Bean
public Map<String, KafkaWorkInProgress> kafkaWorkInProgress() {
return Collections.synchronizedMap(new HashMap<>());
return new ConcurrentHashMap<>();
}
}
8 changes: 2 additions & 6 deletions src/main/java/fr/abes/bestppn/kafka/KafkaWorkInProgress.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,8 @@ public int getNbActiveThreads() {
return this.nbActiveThreads.get();
}

public void incrementNbLignesTraitees() {
this.nbLignesTraitees.incrementAndGet();
}

public int getNbLignesTraitees() {
return this.nbLignesTraitees.get();
public int incrementNbLignesTraiteesAndGet() {
return this.nbLignesTraitees.incrementAndGet();
}

public void setIsOnError(boolean error) {
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public void kbartFromkafkaListener(ConsumerRecord<String, String> ligneKbart) {

LigneKbartDto ligneKbartDto = mapper.readValue(ligneKbart.value(), LigneKbartDto.class);
String providerName = Utils.extractProvider(ligneKbart.key());
workInProgress.get(filename).incrementNbLignesTraitees();
executorService.execute(() -> {
try {
log.info("Partition;" + ligneKbart.partition() + ";offset;" + ligneKbart.offset() + ";fichier;" + ligneKbart.key() + ";" + Thread.currentThread().getName());
Expand All @@ -89,7 +88,7 @@ public void kbartFromkafkaListener(ConsumerRecord<String, String> ligneKbart) {
Header lastHeader = ligneKbart.headers().lastHeader("nbLinesTotal");
if (lastHeader != null) {
int nbLignesTotal = Integer.parseInt(new String(lastHeader.value()));
if (nbLignesTotal == workInProgress.get(filename).getNbLignesTraitees()) {
if (nbLignesTotal == workInProgress.get(filename).incrementNbLignesTraiteesAndGet()) {
if (workInProgress.get(filename).getSemaphore().tryAcquire()) {
workInProgress.get(filename).setNbtotalLinesInExecutionReport(nbLignesTotal);
handleFichier(filename);
Expand Down Expand Up @@ -152,7 +151,7 @@ private void handleFichier(String filename) {
log.error("Le nom du fichier " + filename + " n'est pas correct. " + e);
emailService.sendProductionErrorEmail(filename, e.getMessage());
} finally {
log.info("Traitement terminé pour fichier " + filename + " / nb lignes " + workInProgress.get(filename).getNbLignesTraitees());
log.info("Traitement terminé pour fichier " + filename + " / nb lignes " + workInProgress.get(filename).incrementNbLignesTraiteesAndGet());
workInProgress.remove(filename);
}
}
Expand Down
18 changes: 8 additions & 10 deletions src/main/java/fr/abes/bestppn/model/entity/ExecutionReport.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

public class ExecutionReport {
@Getter @Setter
private int nbtotalLines;
private AtomicInteger nbtotalLines;

private AtomicInteger nbBestPpnFind;

Expand All @@ -20,8 +20,13 @@ public ExecutionReport() {
nbBestPpnFind = new AtomicInteger(0);
nbLinesWithInputDataErrors = new AtomicInteger(0);
nbLinesWithErrorsInBestPPNSearch = new AtomicInteger(0);
nbtotalLines = 0;
nbtotalLines = new AtomicInteger(0);
}

public void setNbtotalLines(int nbTotalLignes) {
this.nbtotalLines.set(nbTotalLignes);
}

public int getNbBestPpnFind() {
return nbBestPpnFind.get();
}
Expand All @@ -35,7 +40,7 @@ public int getNbLinesWithErrorsInBestPPNSearch() {
}

public int getNbLinesOk(){
return nbtotalLines - nbLinesWithErrorsInBestPPNSearch.get() - nbLinesWithInputDataErrors.get();
return nbtotalLines.get() - nbLinesWithErrorsInBestPPNSearch.get() - nbLinesWithInputDataErrors.get();
}

public void incrementNbBestPpnFind() {
Expand All @@ -46,13 +51,6 @@ public void incrementNbLinesWithInputDataErrors() {
nbLinesWithInputDataErrors.incrementAndGet();
}

public void clear(){
nbtotalLines = 0;
nbBestPpnFind.set(0);
nbLinesWithInputDataErrors.set(0);
nbLinesWithErrorsInBestPPNSearch.set(0);
}

public void incrementNbLinesWithErrorsInBestPPNSearch() {
nbLinesWithErrorsInBestPPNSearch.incrementAndGet();
}
Expand Down
32 changes: 0 additions & 32 deletions src/main/java/fr/abes/bestppn/service/ExecutionReportService.java

This file was deleted.

8 changes: 7 additions & 1 deletion src/main/java/fr/abes/bestppn/service/KbartService.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,14 @@ public void processConsumerRecord(LigneKbartDto ligneFromKafka, String providerN
BestPpn bestPpn = service.getBestPpn(ligneFromKafka, providerName, isForced, false);
switch (Objects.requireNonNull(bestPpn.getDestination())) {
case BEST_PPN_BACON -> ligneFromKafka.setBestPpn(bestPpn.getPpn());
case PRINT_PPN_SUDOC -> workInProgress.get(filename).addPpnToCreate(getLigneKbartImprime(bestPpn, ligneFromKafka));
case PRINT_PPN_SUDOC -> {
//on ne lance la création dans le Sudoc que pour les monographie
if (ligneFromKafka.getPublicationType().equals("monograph")) {
workInProgress.get(filename).addPpnToCreate(getLigneKbartImprime(bestPpn, ligneFromKafka));
}
}
case NO_PPN_FOUND_SUDOC -> {
//on ne lance la création dans le Sudoc que pour les monographie
if (ligneFromKafka.getPublicationType().equals("monograph")) {
workInProgress.get(filename).addPpnFromKbartToCreate(ligneFromKafka);
}
Expand Down

0 comments on commit 0da8ad8

Please sign in to comment.