Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cde 204 renommer topic dans producer #10

Merged
merged 8 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.0.5</version>
<version>3.1.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>fr.abes</groupId>
Expand Down Expand Up @@ -154,7 +154,7 @@
<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>5.7.1</version>
<version>5.8</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
Expand All @@ -177,7 +177,7 @@
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.13.2</version>
<version>2.15.2</version>
</dependency>

<!-- TEST -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public ObjectMapper objectMapper() {
ObjectMapper objectMapper = builder.build();
objectMapper.registerModule(new JavaTimeModule());
return objectMapper;
// return JsonMapper.builder().addModule(new JavaTimeModule()).build();
}

@Bean
Expand Down
16 changes: 8 additions & 8 deletions src/main/java/fr/abes/bestppn/controller/BestPpnController.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ public String bestPpn(@RequestParam(name = "provider") String provider, @Request
@RequestParam(name = "first_author", required = false) String firstAuthor, @RequestParam(name = "force", required = false) Boolean force) throws IOException, IllegalPpnException {
try {
LigneKbartDto ligneKbartDto = new LigneKbartDto();
ligneKbartDto.setPublication_type(publicationType);
ligneKbartDto.setPublication_title((publicationTitle != null) ? publicationTitle : "");
ligneKbartDto.setOnline_identifier((onlineIdentifier != null) ? onlineIdentifier : "");
ligneKbartDto.setPrint_identifier((printIdentifier != null) ? printIdentifier : "");
ligneKbartDto.setTitle_url((doi != null) ? doi : "");
ligneKbartDto.setDate_monograph_published_print((dateMonographPublishedPrint != null) ? dateMonographPublishedPrint : "");
ligneKbartDto.setDate_monograph_published_online((dateMonographPublishedOnline != null) ? dateMonographPublishedOnline : "");
ligneKbartDto.setFirst_author((firstAuthor != null) ? firstAuthor : "");
ligneKbartDto.setPublicationType(publicationType);
ligneKbartDto.setPublicationTitle((publicationTitle != null) ? publicationTitle : "");
ligneKbartDto.setOnlineIdentifier((onlineIdentifier != null) ? onlineIdentifier : "");
ligneKbartDto.setPrintIdentifier((printIdentifier != null) ? printIdentifier : "");
ligneKbartDto.setTitleUrl((doi != null) ? doi : "");
ligneKbartDto.setDateMonographPublishedPrint((dateMonographPublishedPrint != null) ? dateMonographPublishedPrint : "");
ligneKbartDto.setDateMonographPublishedOnline((dateMonographPublishedOnline != null) ? dateMonographPublishedOnline : "");
ligneKbartDto.setFirstAuthor((firstAuthor != null) ? firstAuthor : "");
boolean injectKafka = (force != null) ? force : false;
return service.getBestPpn(ligneKbartDto, provider, injectKafka).getPpn();
} catch (URISyntaxException e) {
Expand Down
92 changes: 60 additions & 32 deletions src/main/java/fr/abes/bestppn/dto/kafka/LigneKbartDto.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fr.abes.bestppn.dto.kafka;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.opencsv.bean.CsvBindByName;
import com.opencsv.bean.CsvBindByPosition;
import lombok.Data;
Expand All @@ -11,80 +12,105 @@
public class LigneKbartDto {
@CsvBindByName(column = "publication_title")
@CsvBindByPosition(position = 0)
private String publication_title;
@JsonProperty("publication_title")
private String publicationTitle;
@CsvBindByName(column = "print_identifier")
@CsvBindByPosition(position = 1)
private String print_identifier;
@JsonProperty("print_identifier")
private String printIdentifier;
@CsvBindByName(column = "online_identifier")
@CsvBindByPosition(position = 2)
private String online_identifier;
@JsonProperty("online_identifier")
private String onlineIdentifier;
@CsvBindByName(column = "date_first_issue_online")
@CsvBindByPosition(position = 3)
private String date_first_issue_online;
@JsonProperty("date_first_issue_online")
private String dateFirstIssueOnline;
@CsvBindByName(column = "num_first_vol_online")
@CsvBindByPosition(position = 4)
private Integer num_first_vol_online;
@JsonProperty("num_first_vol_online")
private Integer numFirstVolOnline;
@CsvBindByName(column = "num_first_issue_online")
@CsvBindByPosition(position = 5)
private Integer num_first_issue_online;
@JsonProperty("num_first_issue_online")
private Integer numFirstIssueOnline;
@CsvBindByName(column = "date_last_issue_online")
@CsvBindByPosition(position = 6)
private String date_last_issue_online;
@JsonProperty("date_last_issue_online")
private String dateLastIssueOnline;
@CsvBindByName(column = "num_last_vol_online")
@CsvBindByPosition(position = 7)
private Integer num_last_vol_online;
@JsonProperty("num_last_vol_online")
private Integer numLastVolOnline;
@CsvBindByName(column = "num_last_issue_online")
@CsvBindByPosition(position = 8)
private Integer num_last_issue_online;
@JsonProperty("num_last_issue_online")
private Integer numLastIssueOnline;
@CsvBindByName(column = "title_url")
@CsvBindByPosition(position = 9)
private String title_url;
@JsonProperty("title_url")
private String titleUrl;
@CsvBindByName(column = "first_author")
@CsvBindByPosition(position = 10)
private String first_author;
@JsonProperty("first_author")
private String firstAuthor;
@CsvBindByName(column = "title_id")
@CsvBindByPosition(position = 11)
private String title_id;
@JsonProperty("title_id")
private String titleId;
@CsvBindByName(column = "embargo_info")
@CsvBindByPosition(position = 12)
private String embargo_info;
@JsonProperty("embargo_info")
private String embargoInfo;
@CsvBindByName(column = "coverage_depth")
@CsvBindByPosition(position = 13)
private String coverage_depth;
@JsonProperty("coverage_depth")
private String coverageDepth;
@CsvBindByName(column = "notes")
@CsvBindByPosition(position = 14)
@JsonProperty("notes")
private String notes;
@CsvBindByName(column = "publisher_name")
@CsvBindByPosition(position = 15)
private String publisher_name;
@JsonProperty("publisher_name")
private String publisherName;
@CsvBindByName(column = "publication_type")
@CsvBindByPosition(position = 16)
private String publication_type;
@JsonProperty("publication_type")
private String publicationType;
@CsvBindByName(column = "date_monograph_published_print")
@CsvBindByPosition(position = 17)
private String date_monograph_published_print;
@JsonProperty("date_monograph_published_print")
private String dateMonographPublishedPrint;
@CsvBindByName(column = "date_monograph_published_online")
@CsvBindByPosition(position = 18)
private String date_monograph_published_online;
@JsonProperty("date_monograph_published_online")
private String dateMonographPublishedOnline;
@CsvBindByName(column = "monograph_volume")
@CsvBindByPosition(position = 19)
private Integer monograph_volume;
@JsonProperty("monograph_volume")
private Integer monographVolume;
@CsvBindByName(column = "monograph_edition")
@CsvBindByPosition(position = 20)
private String monograph_edition;
@JsonProperty("monograph_edition")
private String monographEdition;
@CsvBindByName(column = "first_editor")
@CsvBindByPosition(position = 21)
private String first_editor;
@JsonProperty("first_editor")
private String firstEditor;
@CsvBindByName(column = "parent_publication_title_id")
@CsvBindByPosition(position = 22)
private String parent_publication_title_id;
@JsonProperty("parent_publication_title_id")
private String parentPublicationTitleId;
@CsvBindByName(column = "preceding_publication_title_id")
@CsvBindByPosition(position = 23)
private String preceding_publication_title_id;
@JsonProperty("preceding_publication_title_id")
private String precedingPublicationTitleId;
@CsvBindByName(column = "access_type")
@CsvBindByPosition(position = 24)
private String access_type;
@CsvBindByName(column = "access_type")
@JsonProperty("access_type")
private String accessType;
@CsvBindByName(column = "bestPpn")
@CsvBindByPosition(position = 25)
private String bestPpn;

Expand All @@ -96,14 +122,16 @@ public class LigneKbartDto {

@Override
public int hashCode() {
return this.publication_title.hashCode() * this.online_identifier.hashCode() * this.print_identifier.hashCode();
return this.publicationTitle.hashCode() * this.onlineIdentifier.hashCode() * this.printIdentifier.hashCode();
}

@Override
public String toString() {
return "publication title : " + this.publication_title + " / publication_type : " + this.publication_type +
(this.online_identifier.isEmpty() ? "" : " / online_identifier : " + this.online_identifier) +
(this.print_identifier.isEmpty() ? "" : " / print_identifier : " + this.print_identifier);
if (this.publicationTitle != null)
return "publication title : " + this.publicationTitle + " / publication_type : " + this.publicationType +
(this.onlineIdentifier.isEmpty() ? "" : " / online_identifier : " + this.onlineIdentifier) +
(this.printIdentifier.isEmpty() ? "" : " / print_identifier : " + this.printIdentifier);
return "";
}

@JsonIgnore
Expand All @@ -113,16 +141,16 @@ public boolean isBestPpnEmpty() {

@JsonIgnore
public String getAuthor() {
return (!this.first_author.isEmpty()) ? this.first_author : this.first_editor;
return (!this.firstAuthor.isEmpty()) ? this.firstAuthor : this.firstEditor;
}

@JsonIgnore
public String getAnneeFromDate_monograph_published_print() {
return this.date_monograph_published_print;
return this.dateMonographPublishedPrint;
}

@JsonIgnore
public String getAnneeFromDate_monograph_published_online() {
return this.date_monograph_published_online;
return this.dateMonographPublishedOnline;
}
}
9 changes: 8 additions & 1 deletion src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public class TopicConsumer {

private final List<PpnKbartProviderDto> ppnToCreate = new ArrayList<>();

private final List<LigneKbartDto> ppnFromKbartToCreate = new ArrayList<>();

private final PackageKbartDto mailAttachment = new PackageKbartDto();

private final ProviderPackageRepository providerPackageRepository;
Expand All @@ -63,7 +65,7 @@ public class TopicConsumer {
* Listener Kafka qui écoute un topic et récupère les messages dès qu'ils y arrivent.
* @param lignesKbart message kafka récupéré par le Consumer Kafka
*/
@KafkaListener(topics = {"${topic.name.source.kbart}"}, groupId = "lignesKbart", containerFactory = "kafkaKbartListenerContainerFactory")
@KafkaListener(topics = {"${topic.name.source.kbart}"}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory")
public void listenKbartFromKafka(ConsumerRecord<String, String> lignesKbart) {
try {
String filename = "";
Expand Down Expand Up @@ -106,6 +108,7 @@ public void listenKbartFromKafka(ConsumerRecord<String, String> lignesKbart) {

producer.sendKbart(kbartToSend, lignesKbart.headers());
producer.sendPrintNotice(ppnToCreate, lignesKbart.headers());
producer.sendPpnExNihilo(ppnFromKbartToCreate, lignesKbart.headers());
} else {
isOnError = false;
}
Expand All @@ -114,6 +117,7 @@ public void listenKbartFromKafka(ConsumerRecord<String, String> lignesKbart) {
serviceMail.sendMailWithAttachment(filename,mailAttachment);
kbartToSend.clear();
ppnToCreate.clear();
ppnFromKbartToCreate.clear();
mailAttachment.clearKbartDto();
} else {
LigneKbartDto ligneFromKafka = mapper.readValue(lignesKbart.value(), LigneKbartDto.class);
Expand All @@ -128,6 +132,9 @@ public void listenKbartFromKafka(ConsumerRecord<String, String> lignesKbart) {
kbartToSend.add(ligneFromKafka);
}
case PRINT_PPN_SUDOC -> ppnToCreate.add(new PpnKbartProviderDto(ppnWithDestinationDto.getPpn(),ligneFromKafka,providerName));
case NO_PPN_FOUND_SUDOC -> {
if (ligneFromKafka.getPublicationType().equals("monograph")) ppnFromKbartToCreate.add(ligneFromKafka);
}
}
} else {
log.info("Bestppn déjà existant sur la ligne : " + nbLine + ", le voici : " + ligneFromKafka.getBestPpn());
Expand Down
15 changes: 14 additions & 1 deletion src/main/java/fr/abes/bestppn/kafka/TopicProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ public class TopicProducer {
@Value("${topic.name.target.noticeimprime}")
private String topicNoticeImprimee;

@Value("${topic.name.target.ppnFromKbart}")
private String topicKbartPpnToCreate;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

Expand All @@ -44,15 +47,24 @@ public void sendKbart(List<LigneKbartDto> kbart, Headers headers) throws JsonPro
}
setHeadersAndSend(headers, mapper.writeValueAsString(ligne), topicKbart);
}
log.debug("message envoyé vers {}", topicKbart);
}



@Transactional(transactionManager = "kafkaTransactionManager")
public void sendPrintNotice(List<PpnKbartProviderDto> ppnKbartProviderDtoList, Headers headers) throws JsonProcessingException {
for (PpnKbartProviderDto ppnToCreate : ppnKbartProviderDtoList) {
setHeadersAndSend(headers, mapper.writeValueAsString(ppnToCreate), topicNoticeImprimee);
}
log.debug("message envoyé vers {}", topicNoticeImprimee);
}

@Transactional(transactionManager = "kafkaTransactionManager")
public void sendPpnExNihilo(List<LigneKbartDto> ppnFromKbartToCreate, Headers headers) throws JsonProcessingException {
for (LigneKbartDto ligne : ppnFromKbartToCreate) {
setHeadersAndSend(headers, mapper.writeValueAsString(ligne), topicKbartPpnToCreate);
}
log.debug("message envoyé vers {}", topicKbartPpnToCreate);
}

private void setHeadersAndSend(Headers headers, String value, String topic) {
Expand All @@ -65,4 +77,5 @@ private void setHeadersAndSend(Headers headers, String value, String topic) {
Message<String> message = messageBuilder.build();
kafkaTemplate.send(message);
}

}
Loading
Loading