Merge pull request #12 from abes-esr/CDE-170-modifier-le-consumer-pour-creer-un-kafka-connect-entre-le-topic-et-base-bacon

CDE-170: modify the consumer to create a Kafka Connect link between the topic and the BACON database
SamuelQuetin authored Sep 28, 2023
2 parents 6831004 + bd28dc2 commit eb138c4
Showing 15 changed files with 2,692 additions and 47 deletions.
37 changes: 37 additions & 0 deletions pom.xml
@@ -85,6 +85,16 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>5.3.0</version>
</dependency>
<!-- BDD -->
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
@@ -269,8 +279,35 @@
</plugin>
</plugins>
</pluginManagement>
<plugins>
<!-- generates the classes for the Kafka schema registry -->
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.11.2</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/fr/abes/bestppn/dto/connect/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>

<scm>
<connection>scm:git:https://github.com/abes-esr/best-ppn-api.git</connection>
<tag>HEAD</tag>
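Note: the avro-maven-plugin configured above generates the LigneKbartConnect class from an Avro schema expected under src/main/resources/avro/. The .avsc file itself is not shown in this diff; purely as a sketch, such a schema could look like the following (field names inferred from the setters used in TopicProducer below, everything else hypothetical):

{
  "namespace": "fr.abes.bestppn.dto.connect",
  "type": "record",
  "name": "LigneKbartConnect",
  "fields": [
    {"name": "PUBLICATIONTITLE", "type": ["null", "string"], "default": null},
    {"name": "PRINTIDENTIFIER", "type": ["null", "string"], "default": null},
    {"name": "ONLINEIDENTIFIER", "type": ["null", "string"], "default": null},
    {"name": "BESTPPN", "type": ["null", "string"], "default": null}
  ]
}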
13 changes: 12 additions & 1 deletion src/main/java/fr/abes/bestppn/configuration/KafkaConfig.java
@@ -1,6 +1,10 @@
package fr.abes.bestppn.configuration;

import fr.abes.bestppn.dto.connect.LigneKbartConnect;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -28,6 +32,9 @@ public class KafkaConfig {
@Value("${spring.kafka.consumer.properties.isolation.level}")
private String isolationLevel;

@Value("${spring.kafka.registry.url}")
private String registryUrl;

@Bean
public ConsumerFactory<String, String> consumerKbartFactory() {
Map<String, Object> props = new HashMap<>();
@@ -53,7 +60,8 @@ public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
+ props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
return props;
}

@@ -73,4 +81,7 @@ public KafkaTransactionManager<String, String> kafkaTransactionManager(){
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}

@Bean
public KafkaProducer<String, LigneKbartConnect> kafkaProducer() {
    return new KafkaProducer<>(producerConfigs());
}
}
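For the consuming side of the new topic (not part of this diff), a client reading the Avro records back as LigneKbartConnect objects would typically mirror this setup with the Confluent deserializer. A minimal sketch, assuming a standard Confluent Schema Registry deployment:

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
// return the generated LigneKbartConnect class rather than a GenericRecord
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);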
2,344 changes: 2,344 additions & 0 deletions src/main/java/fr/abes/bestppn/dto/connect/LigneKbartConnect.java

Large diff not rendered (this class is generated from the Avro schema by the avro-maven-plugin).

13 changes: 12 additions & 1 deletion src/main/java/fr/abes/bestppn/dto/kafka/LigneKbartDto.java
@@ -7,6 +7,8 @@
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;

@Data
@NoArgsConstructor
public class LigneKbartDto {
@@ -110,10 +112,19 @@ public class LigneKbartDto {
@CsvBindByPosition(position = 24)
@JsonProperty("access_type")
private String accessType;
@CsvBindByName(column = "bestPpn")
@CsvBindByName(column = "best_ppn")
@CsvBindByPosition(position = 25)
@JsonProperty("best_ppn")
private String bestPpn;

@JsonProperty("provider_package_package")
private String providerPackagePackage;
@JsonProperty("provider_package_date_p")
private Date providerPackageDateP;
@JsonProperty("provider_package_idt_provider")
private Integer providerPackageIdtProvider;


@JsonIgnore
@CsvBindByName(column = "errorType")
@CsvBindByPosition(position = 26)
38 changes: 21 additions & 17 deletions src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java
@@ -89,26 +89,14 @@ public void listenKbartFromKafka(ConsumerRecord<String, String> lignesKbart) {

if(lignesKbart.value().equals("OK") ){
if( !isOnError ) {
- if (providerOpt.isPresent()) {
-     Provider provider = providerOpt.get();
-     ProviderPackageId providerPackageId = new ProviderPackageId(Utils.extractPackageName(filename), Utils.extractDate(filename), provider.getIdtProvider());
-     Optional<ProviderPackage> providerPackage = providerPackageRepository.findByProviderPackageId(providerPackageId);
-     // no package info: create it
-     providerPackage.orElseGet(() -> providerPackageRepository.save(new ProviderPackage(providerPackageId, 'N')));
- } else {
-     // neither provider nor package: create both
-     Provider newProvider = new Provider(providerName);
-     Provider savedProvider = providerRepository.save(newProvider);
-     ProviderPackage providerPackage = new ProviderPackage(new ProviderPackageId(Utils.extractPackageName(filename), Utils.extractDate(filename), savedProvider.getIdtProvider()), 'N');
-     providerPackageRepository.save(providerPackage);
- }
+ ProviderPackage provider = handlerProvider(providerOpt, filename, providerName);

// TODO check whether it is worth stripping "_FORCE" from the FileName header parameter before sending to the producer
// fileName = fileName.contains("_FORCE") ? fileName.replace("_FORCE", "") : fileName;

- producer.sendKbart(kbartToSend, lignesKbart.headers());
- producer.sendPrintNotice(ppnToCreate, lignesKbart.headers());
- producer.sendPpnExNihilo(ppnFromKbartToCreate, lignesKbart.headers());
+ producer.sendKbart(kbartToSend, provider);
+ producer.sendPrintNotice(ppnToCreate, provider);
+ producer.sendPpnExNihilo(ppnFromKbartToCreate, provider);
+ //producer.sendOk(lignesKbart.headers());
} else {
isOnError = false;
}
@@ -158,6 +146,22 @@ public void listenKbartFromKafka(ConsumerRecord<String, String> lignesKbart) {
}
}

private ProviderPackage handlerProvider(Optional<Provider> providerOpt, String filename, String providerName) throws IllegalPackageException, IllegalDateException {
if (providerOpt.isPresent()) {
Provider provider = providerOpt.get();
ProviderPackageId providerPackageId = new ProviderPackageId(Utils.extractPackageName(filename), Utils.extractDate(filename), provider.getIdtProvider());
Optional<ProviderPackage> providerPackage = providerPackageRepository.findByProviderPackageId(providerPackageId);
// no package info: create it
return providerPackage.orElseGet(() -> providerPackageRepository.save(new ProviderPackage(providerPackageId, 'N')));
} else {
// neither provider nor package: create both
Provider newProvider = new Provider(providerName);
Provider savedProvider = providerRepository.save(newProvider);
ProviderPackage providerPackage = new ProviderPackage(new ProviderPackageId(Utils.extractPackageName(filename), Utils.extractDate(filename), savedProvider.getIdtProvider()), 'N');
return providerPackageRepository.save(providerPackage);
}
}

private void addLineToMailAttachementWithErrorMessage(String messageError) {
LigneKbartDto ligneVide = new LigneKbartDto();
ligneVide.setErrorType(messageError);
88 changes: 73 additions & 15 deletions src/main/java/fr/abes/bestppn/kafka/TopicProducer.java
@@ -2,13 +2,16 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import fr.abes.bestppn.dto.connect.LigneKbartConnect;
import fr.abes.bestppn.dto.kafka.LigneKbartDto;
import fr.abes.bestppn.dto.kafka.PpnKbartProviderDto;
import fr.abes.bestppn.entity.bacon.ProviderPackage;
import fr.abes.bestppn.exception.BestPpnException;
import fr.abes.bestppn.utils.Utils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
- import org.apache.kafka.common.header.Header;
- import org.apache.kafka.common.header.Headers;
+ import org.apache.kafka.clients.producer.KafkaProducer;
+ import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
@@ -18,6 +21,9 @@
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.List;

@Slf4j
Expand All @@ -37,45 +43,97 @@ public class TopicProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Autowired
private KafkaProducer<String, LigneKbartConnect> producer;

private final ObjectMapper mapper;


@Transactional(transactionManager = "kafkaTransactionManager", rollbackFor = {BestPpnException.class, JsonProcessingException.class})
- public void sendKbart(List<LigneKbartDto> kbart, Headers headers) throws JsonProcessingException, BestPpnException {
+ public void sendKbart(List<LigneKbartDto> kbart, ProviderPackage provider) throws JsonProcessingException, BestPpnException {
for (LigneKbartDto ligne : kbart) {
ligne.setProviderPackagePackage(provider.getProviderPackageId().getPackageName());
ligne.setProviderPackageDateP(provider.getProviderPackageId().getDateP());
ligne.setProviderPackageIdtProvider(provider.getProviderPackageId().getProviderIdtProvider());
if( ligne.isBestPpnEmpty()){
throw new BestPpnException("La ligne " + ligne +" n'a pas de BestPpn.");
}
- setHeadersAndSend(headers, mapper.writeValueAsString(ligne), topicKbart);
+ sendObject(ligne, topicKbart);
}
log.debug("message envoyé vers {}", topicKbart);
}


@Transactional(transactionManager = "kafkaTransactionManager")
- public void sendPrintNotice(List<PpnKbartProviderDto> ppnKbartProviderDtoList, Headers headers) throws JsonProcessingException {
+ public void sendPrintNotice(List<PpnKbartProviderDto> ppnKbartProviderDtoList, ProviderPackage provider) throws JsonProcessingException {
for (PpnKbartProviderDto ppnToCreate : ppnKbartProviderDtoList) {
- setHeadersAndSend(headers, mapper.writeValueAsString(ppnToCreate), topicNoticeImprimee);
+ ppnToCreate.getKbart().setProviderPackagePackage(provider.getProviderPackageId().getPackageName());
+ ppnToCreate.getKbart().setProviderPackageDateP(provider.getProviderPackageId().getDateP());
+ ppnToCreate.getKbart().setProviderPackageIdtProvider(provider.getProviderPackageId().getProviderIdtProvider());
+ send(mapper.writeValueAsString(ppnToCreate), topicNoticeImprimee);
}
log.debug("message envoyé vers {}", topicNoticeImprimee);
}

@Transactional(transactionManager = "kafkaTransactionManager")
- public void sendPpnExNihilo(List<LigneKbartDto> ppnFromKbartToCreate, Headers headers) throws JsonProcessingException {
+ public void sendPpnExNihilo(List<LigneKbartDto> ppnFromKbartToCreate, ProviderPackage provider) throws JsonProcessingException {
for (LigneKbartDto ligne : ppnFromKbartToCreate) {
- setHeadersAndSend(headers, mapper.writeValueAsString(ligne), topicKbartPpnToCreate);
+ ligne.setProviderPackagePackage(provider.getProviderPackageId().getPackageName());
+ ligne.setProviderPackageDateP(provider.getProviderPackageId().getDateP());
+ ligne.setProviderPackageIdtProvider(provider.getProviderPackageId().getProviderIdtProvider());
+ send(mapper.writeValueAsString(ligne), topicKbartPpnToCreate);
}
log.debug("message envoyé vers {}", topicKbartPpnToCreate);
}

- private void setHeadersAndSend(Headers headers, String value, String topic) {
-     MessageBuilder<String> messageBuilder = MessageBuilder
+ private void send(String value, String topic) {
+     Message<String> message = MessageBuilder
          .withPayload(value)
-         .setHeader(KafkaHeaders.TOPIC, topic);
-     for (Header header : headers.toArray()) {
-         messageBuilder.setHeader(header.key(), header.value());
-     }
-     Message<String> message = messageBuilder.build();
+         .setHeader(KafkaHeaders.TOPIC, topic).build();
kafkaTemplate.send(message);
}

private void sendObject(LigneKbartDto ligneKbartDto, String topic) {
LigneKbartConnect ligne = new LigneKbartConnect();
ligne.setPUBLICATIONTITLE(ligneKbartDto.getPublicationTitle());
ligne.setPRINTIDENTIFIER(ligneKbartDto.getPrintIdentifier());
ligne.setONLINEIDENTIFIER(ligneKbartDto.getOnlineIdentifier());
ligne.setDATEFIRSTISSUEONLINE(Utils.formatDate(ligneKbartDto.getDateFirstIssueOnline(), true));
ligne.setDATELASTISSUEONLINE(Utils.formatDate(ligneKbartDto.getDateLastIssueOnline(), false));
ligne.setDATEMONOGRAPHPUBLISHEDPRINT(Utils.formatDate(ligneKbartDto.getDateMonographPublishedPrint(), true));
ligne.setDATEMONOGRAPHPUBLISHEDONLIN(Utils.formatDate(ligneKbartDto.getDateMonographPublishedOnline(), true));
ligne.setNUMFIRSTVOLONLINE((ligneKbartDto.getNumFirstVolOnline() != null) ? ligneKbartDto.getNumFirstVolOnline().toString() : "");
ligne.setNUMFIRSTISSUEONLINE((ligneKbartDto.getNumFirstIssueOnline() != null) ? ligneKbartDto.getNumFirstIssueOnline().toString() : "");
ligne.setNUMLASTVOLONLINE((ligneKbartDto.getNumLastVolOnline() != null) ? ligneKbartDto.getNumLastVolOnline().toString() : "");
ligne.setNUMLASTISSUEONLINE((ligneKbartDto.getNumLastIssueOnline() != null) ? ligneKbartDto.getNumLastIssueOnline().toString() : "");
ligne.setTITLEURL(ligneKbartDto.getTitleUrl());
ligne.setFIRSTAUTHOR(ligneKbartDto.getFirstAuthor());
ligne.setTITLEID(ligneKbartDto.getTitleId());
ligne.setEMBARGOINFO(ligneKbartDto.getEmbargoInfo());
ligne.setCOVERAGEDEPTH(ligneKbartDto.getCoverageDepth());
ligne.setNOTES(ligneKbartDto.getNotes());
ligne.setPUBLISHERNAME(ligneKbartDto.getPublisherName());
ligne.setPUBLICATIONTYPE(ligneKbartDto.getPublicationType());
ligne.setMONOGRAPHVOLUME((ligneKbartDto.getMonographVolume() != null) ? ligneKbartDto.getMonographVolume().toString() : "");
ligne.setMONOGRAPHEDITION(ligneKbartDto.getMonographEdition());
ligne.setFIRSTEDITOR(ligneKbartDto.getFirstEditor());
ligne.setPARENTPUBLICATIONTITLEID(ligneKbartDto.getParentPublicationTitleId());
ligne.setPRECEDINGPUBLICATIONTITLEID(ligneKbartDto.getPrecedingPublicationTitleId());
ligne.setACCESSTYPE(ligneKbartDto.getAccessType());
ligne.setPROVIDERPACKAGEPACKAGE(ligneKbartDto.getProviderPackagePackage());
ligne.setPROVIDERPACKAGEDATEP(Utils.convertDateToLocalDate(ligneKbartDto.getProviderPackageDateP()));
ligne.setPROVIDERPACKAGEIDTPROVIDER(ligneKbartDto.getProviderPackageIdtProvider());
ligne.setBESTPPN(ligneKbartDto.getBestPpn());

ProducerRecord<String, LigneKbartConnect> record = new ProducerRecord<>(topic, ligne);
producer.send(record, (recordMetadata, e) -> {
if (e == null) {
log.debug("Envoi à Kafka " + recordMetadata);
}
else {
log.error(e.getMessage());
}
});
}

}
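The Avro records written by sendObject are meant to be picked up by a Kafka Connect sink that loads them into the BACON database (the goal of CDE-170); the connector configuration itself lives outside this repository. Purely as an illustration, a Confluent JDBC sink connector consuming this topic could be configured along these lines (connector name, topic, hosts, and connection URL below are all hypothetical):

name=bacon-kbart-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
# hypothetical topic name
topics=bestppn.kbart
# hypothetical Oracle connection to the BACON database
connection.url=jdbc:oracle:thin:@//bacon-host:1521/BACON
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
insert.mode=insert
auto.create=false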
31 changes: 27 additions & 4 deletions src/main/java/fr/abes/bestppn/utils/Utils.java
@@ -8,10 +8,9 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
- import java.util.Collections;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.Map;
+ import java.time.LocalDate;
+ import java.time.ZoneId;
+ import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@@ -92,4 +91,28 @@ public static Date extractDate(String filename) throws IllegalDateException {
}
}

public static LocalDate convertDateToLocalDate(Date dateToConvert) {
    // guard against a missing package date to avoid a NullPointerException
    if (dateToConvert == null) return null;
    return dateToConvert.toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
}

/**
* Formats a date read from a kbart input file and returns it ready to be sent
* @param date : the date to format
* @param debut : true for a start date, false for an end date
* @return the formatted date, or null if the format is not recognized
*/
public static String formatDate(String date, boolean debut) {
if (date == null) return null;
String yearRegExp = "([\\d]{4})";
String dateRegExp = "\\d{4}-\\d{2}-\\d{2}";
int day = (debut) ? 1 : 31;
int month = (debut) ? Calendar.JANUARY : Calendar.DECEMBER;
if (date.matches(yearRegExp)) {
return new GregorianCalendar(Integer.parseInt(date), month, day).toZonedDateTime().toLocalDate().toString();
}
if (date.matches(dateRegExp)) {
return date;
}
return null;
}
}
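To make the contract of formatDate concrete, a hypothetical usage sketch (not part of the diff): a bare year expands to the first or last day of that year depending on the debut flag, a full date passes through unchanged, and anything else yields null.

Utils.formatDate("2023", true);        // "2023-01-01" (start date)
Utils.formatDate("2023", false);       // "2023-12-31" (end date)
Utils.formatDate("2023-09-28", false); // "2023-09-28" (already formatted)
Utils.formatDate("sept. 2023", true);  // null (unrecognized format)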
1 change: 1 addition & 0 deletions src/main/resources/application-dev.properties
@@ -1,6 +1,7 @@
# Producer properties
spring.kafka.producer.bootstrap-servers=
spring.kafka.consumer.bootstrap-servers=
spring.kafka.registry.url=

url.onlineId2Ppn=
url.printId2Ppn=
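The new spring.kafka.registry.url property points the producer at the Confluent Schema Registry backing the Avro serializer; it is left blank in the repository and filled per environment. For illustration only, a typical local development value would be:

spring.kafka.registry.url=http://localhost:8081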
1 change: 1 addition & 0 deletions src/main/resources/application-prod.properties
@@ -1,6 +1,7 @@
# Producer properties
spring.kafka.producer.bootstrap-servers=
spring.kafka.consumer.bootstrap-servers=
spring.kafka.registry.url=

url.onlineId2Ppn=
url.printId2Ppn=
1 change: 1 addition & 0 deletions src/main/resources/application-test.properties
@@ -1,6 +1,7 @@
# Producer properties
spring.kafka.producer.bootstrap-servers=
spring.kafka.consumer.bootstrap-servers=
spring.kafka.registry.url=

url.onlineId2Ppn=
url.printId2Ppn=
2 changes: 0 additions & 2 deletions src/main/resources/application.properties
@@ -22,8 +22,6 @@ logging.config=classpath:log4j2-all.xml

# Common Kafka Properties
auto.create.topics.enable=true
- spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
- spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.transaction-id-prefix=tx-
spring.kafka.consumer.properties.isolation.level= read_committed
