Skip to content

Commit

Permalink
Merge pull request #106 from abes-esr/CDE-424-opti-suppression-header
Browse files Browse the repository at this point in the history
Cde 424 opti suppression header
  • Loading branch information
SamuelQuetin authored Aug 30, 2024
2 parents a0365ae + 43cd7ca commit a8440ee
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 49 deletions.
2 changes: 1 addition & 1 deletion README-developpement.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ La classe `TopicConsumer.java` comporte deux `@KafkaListener` qui ont les rôles
1. `kbartFromkafkaListener()` :
- un objet `KafkaWorkInProgress` sera créé et placé dans la `Map<String, KafkaWorkInProgress>` pour chaque nouveau nom de fichier kbart détecté à la lecture des messages dans le topic kafka `bacon.kbart.toload`
- lit les messages kafka à partir du topic `bacon.kbart.toload` (chaque message correspond à une ligne d'un fichier kbart lu par l'API [kbart2kafka](https://github.com/abes-esr/kbart2kafka))
> :information_source: Le nom du fichier kbart se trouve dans la `key` du message. Le numéro de la ligne courante `nbCurrentLines` ainsi que le nombre de ligne total du fichier kbart `nbLinesTotal` sont passés dans le header du message
> :information_source: Le nom du fichier kbart se trouve dans la `key` du message. Le numéro de la ligne courante `nbCurrentLines` ainsi que le nombre de ligne total du fichier kbart `nbLinesTotal` sont passés dans la dto du message
- demande le calcul du best ppn pour chaque ligne. Le calcul du best ppn sera ignoré si le nom du fichier comporte la chaine de caractère `_BYPASS`
- demande l'envoi des lignes vers de nouveaux topics
- gère les actions consécutives aux erreurs relevées durant le traitement d'une ligne. Certaines erreurs seront ignorées si le nom du fichier comporte la chaine de caractère `_FORCE`
Expand Down
44 changes: 19 additions & 25 deletions src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,8 @@
import fr.abes.bestppn.utils.Utils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.logging.log4j.ThreadContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestClientException;

Expand Down Expand Up @@ -52,19 +49,19 @@ public TopicConsumer(ObjectMapper mapper, KbartService service, EmailService ema
*/
@KafkaListener(topics = {"${topic.name.source.kbart}"}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "${abes.kafka.concurrency.nbThread}")
public void kbartFromkafkaListener(ConsumerRecord<String, String> ligneKbart) {
String filename = ligneKbart.key();
String filename = extractFilenameFromKey(ligneKbart.key());
if (!this.workInProgress.containsKey(filename)) {
//nouveau fichier trouvé dans le topic, on initialise les variables partagées
workInProgress.put(filename, new KafkaWorkInProgress(ligneKbart.key().contains("_FORCE"), ligneKbart.key().contains("_BYPASS")));
}
try {
//traitement de chaque ligne kbart
LigneKbartDto ligneKbartDto = mapper.readValue(ligneKbart.value(), LigneKbartDto.class);
String providerName = Utils.extractProvider(ligneKbart.key());
String providerName = Utils.extractProvider(filename);
try {
log.info("Partition;" + ligneKbart.partition() + ";offset;" + ligneKbart.offset() + ";fichier;" + ligneKbart.key() + ";" + Thread.currentThread().getName());
log.info("Partition;" + ligneKbart.partition() + ";offset;" + ligneKbart.offset() + ";fichier;" + filename + ";" + Thread.currentThread().getName());
workInProgress.get(filename).incrementThreads();
String origineNbCurrentLine = new String(ligneKbart.headers().lastHeader("nbCurrentLines").value());
int origineNbCurrentLine = ligneKbartDto.getNbCurrentLines();
ThreadContext.put("package", (filename + ";" + origineNbCurrentLine)); //Ajoute le nom de fichier dans le contexte du thread pour log4j
service.processConsumerRecord(ligneKbartDto, providerName, workInProgress.get(filename).isForced(), workInProgress.get(filename).isBypassed(), filename);
} catch (IOException | URISyntaxException | RestClientException | IllegalDoiException e) {
Expand All @@ -85,19 +82,13 @@ public void kbartFromkafkaListener(ConsumerRecord<String, String> ligneKbart) {
if (ligneKbartDto.getBestPpn() != null && !ligneKbartDto.getBestPpn().isEmpty())
workInProgress.get(filename).addNbBestPpnFindedInExecutionReport();
workInProgress.get(filename).addLineKbartToMailAttachment(ligneKbartDto);
Header lastHeader = ligneKbart.headers().lastHeader("nbLinesTotal");
if (lastHeader != null) {
int nbLignesTotal = Integer.parseInt(new String(lastHeader.value()));
int nbCurrentLine = workInProgress.get(filename).incrementNbLignesTraiteesAndGet();
log.debug("Ligne en cours : {} NbLignesTotal : {}", nbCurrentLine, nbLignesTotal);
if (nbLignesTotal == nbCurrentLine) {
log.debug("Commit du fichier {}", filename);
workInProgress.get(filename).setNbtotalLinesInExecutionReport(nbLignesTotal);
handleFichier(filename);
}
} else {
String errorMsg = "Header absent de la ligne : " + ligneKbart.headers();
log.error(errorMsg);
int nbLignesTotal = ligneKbartDto.getNbLinesTotal();
int nbCurrentLine = workInProgress.get(filename).incrementNbLignesTraiteesAndGet();
log.debug("Ligne en cours : {} NbLignesTotal : {}", nbCurrentLine, nbLignesTotal);
if (nbLignesTotal == nbCurrentLine) {
log.debug("Commit du fichier {}", filename);
workInProgress.get(filename).setNbtotalLinesInExecutionReport(nbLignesTotal);
handleFichier(filename);
}
//on ne décrémente pas le nb de thread si l'objet de suivi a été supprimé après la production des messages dans le second topic
if (workInProgress.get(filename) != null)
Expand Down Expand Up @@ -145,13 +136,16 @@ private void handleFichier(String filename) {
@KafkaListener(topics = {"${topic.name.source.kbart.errors}"}, groupId = "${topic.groupid.source.errors}", containerFactory = "kafkaKbartListenerContainerFactory")
public void errorsListener(ConsumerRecord<String, String> error) {
log.error(error.value());
String filename = error.key();
String filename = extractFilenameFromKey(error.key());
if (workInProgress.containsKey(filename)) {
String fileNameFromError = error.key();
emailService.sendProductionErrorEmail(fileNameFromError, error.value());
logFileService.createExecutionReport(fileNameFromError, workInProgress.get(filename).getExecutionReport(), workInProgress.get(filename).isForced());
emailService.sendProductionErrorEmail(filename, error.value());
logFileService.createExecutionReport(filename, workInProgress.get(filename).getExecutionReport(), workInProgress.get(filename).isForced());
workInProgress.get(filename).setIsOnError(true);
handleFichier(fileNameFromError);
handleFichier(filename);
}
}

private String extractFilenameFromKey (String key) {
return key.substring(0, key.lastIndexOf('_'));
}
}
30 changes: 10 additions & 20 deletions src/main/java/fr/abes/bestppn/kafka/TopicProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -94,6 +95,7 @@ public void sendBypassToLoad(List<LigneKbartDto> kbart, ProviderPackage provider

private void sendToTopic(List<LigneKbartDto> kbart, ProviderPackage provider, String filename, String destinationTopic) {
Integer nbLigneTotal = kbart.size();
AtomicInteger index = new AtomicInteger(0);
for (LigneKbartDto ligneKbartDto : kbart) {
ligneKbartDto.setIdProviderPackage(provider.getIdProviderPackage());
ligneKbartDto.setProviderPackagePackage(provider.getPackageName());
Expand All @@ -103,7 +105,7 @@ private void sendToTopic(List<LigneKbartDto> kbart, ProviderPackage provider, St
List<Header> headerList = new ArrayList<>();
executorService.execute(() -> {
headerList.add(new RecordHeader("nbLinesTotal", String.valueOf(nbLigneTotal).getBytes()));
ProducerRecord<String, LigneKbartConnect> record = new ProducerRecord<>(destinationTopic, new Random().nextInt(nbThread), filename, ligneKbartConnect, headerList);
ProducerRecord<String, LigneKbartConnect> record = new ProducerRecord<>(destinationTopic, new Random().nextInt(nbThread), filename+"_"+index.getAndIncrement(), ligneKbartConnect, headerList);
CompletableFuture<SendResult<String, LigneKbartConnect>> result = kafkaTemplateConnect.send(record);
assert result != null : "Result est null, donc exception";
result.whenComplete((sr, ex) -> {
Expand All @@ -125,8 +127,9 @@ private void sendToTopic(List<LigneKbartDto> kbart, ProviderPackage provider, St
*/
public void sendPrintNotice(List<LigneKbartImprime> ligneKbartImprimes, String filename) {
Integer nbLigneTotal = ligneKbartImprimes.size();
int index = 0;
for (LigneKbartImprime ppnToCreate : ligneKbartImprimes) {
sendNoticeImprime(ppnToCreate, topicNoticeImprimee, filename, nbLigneTotal);
sendNoticeImprime(ppnToCreate, topicNoticeImprimee, filename+"_"+(index++), nbLigneTotal);
}
if (!ligneKbartImprimes.isEmpty())
log.debug("message envoyé vers {}", topicNoticeImprimee);
Expand All @@ -140,12 +143,13 @@ public void sendPrintNotice(List<LigneKbartImprime> ligneKbartImprimes, String f
*/
public void sendPpnExNihilo(List<LigneKbartDto> ppnFromKbartToCreate, ProviderPackage provider, String filename) {
Integer nbLigneTotal = ppnFromKbartToCreate.size();
int index = 0;
for (LigneKbartDto ligne : ppnFromKbartToCreate) {
ligne.setIdProviderPackage(provider.getIdProviderPackage());
ligne.setProviderPackagePackage(provider.getPackageName());
ligne.setProviderPackageDateP(provider.getDateP());
ligne.setProviderPackageIdtProvider(provider.getProviderIdtProvider());
sendNoticeExNihilo(ligne, topicKbartPpnToCreate, filename, nbLigneTotal);
sendNoticeExNihilo(ligne, topicKbartPpnToCreate, filename+"_"+(index++), nbLigneTotal);
}
if (!ppnFromKbartToCreate.isEmpty())
log.debug("message envoyé vers {}", topicKbartPpnToCreate);
Expand Down Expand Up @@ -175,14 +179,14 @@ private void sendNoticeExNihilo(LigneKbartDto ligneKbartDto, String topic, Strin
* Méthode envoyant un objet de notice imprimé sur un topic Kafka
* @param ligne : ligne contenant la ligne kbart, le ppn de la notice imprimée et le provider
* @param topic : topic d'envoi de la ligne
* @param filename : clé kafka de la ligne correspondant au nom du fichier
* @param key : clé kafka de la ligne correspondant au nom du fichier + numéro séquenciel
* @param nbLignesTotal : nombre de lignes totales du fichier
*/
private void sendNoticeImprime(LigneKbartImprime ligne, String topic, String filename, Integer nbLignesTotal) {
private void sendNoticeImprime(LigneKbartImprime ligne, String topic, String key, Integer nbLignesTotal) {
List<Header> headerList = new ArrayList<>();
headerList.add(new RecordHeader("nbLinesTotal", String.valueOf(nbLignesTotal).getBytes()));
try {
ProducerRecord<String, LigneKbartImprime> record = new ProducerRecord<>(topic, null, filename, ligne, headerList);
ProducerRecord<String, LigneKbartImprime> record = new ProducerRecord<>(topic, null, key, ligne, headerList);
final SendResult<String, LigneKbartImprime> result = kafkaTemplateImprime.send(record).get();
logEnvoi(result, record);
} catch (Exception e) {
Expand All @@ -196,18 +200,4 @@ private void logEnvoi(SendResult<String, ?> result, ProducerRecord<String, ?> re
log.debug(String.format("Sent record(key=%s value=%s) meta(topic=%s, partition=%d, offset=%d, headers=%s)",
record.key(), record.value(), metadata.topic(), metadata.partition(), metadata.offset(), Stream.of(result.getProducerRecord().headers().toArray()).map(h -> h.key() + ":" + Arrays.toString(h.value())).collect(Collectors.joining(";"))));
}

private Header constructHeader(String key, byte[] value) {
return new Header() {
@Override
public String key() {
return key;
}

@Override
public byte[] value() {
return value;
}
};
}
}
4 changes: 1 addition & 3 deletions src/main/java/fr/abes/bestppn/model/dto/PackageKbartDto.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;

@Getter
@Setter
Expand All @@ -26,7 +27,4 @@ public void addKbartDto(LigneKbartDto ligneKbartDto) {
this.kbartDtos.add(ligneKbartDto);
}

public void clearKbartDto(){
this.kbartDtos.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
@Data
@NoArgsConstructor
public class LigneKbartDto {

@JsonProperty("nbCurrentLines")
private int nbCurrentLines;
@JsonProperty("nbLinesTotal")
private int nbLinesTotal;

@CsvBindByName(column = "publication_title")
@CsvBindByPosition(position = 0)
@JsonProperty("publication_title")
Expand Down

0 comments on commit a8440ee

Please sign in to comment.