Skip to content

Commit

Permalink
Implémentation Task
Browse files Browse the repository at this point in the history
  • Loading branch information
jvk88511334 committed Nov 29, 2023
1 parent 6b9e166 commit 8d64100
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 2 deletions.
87 changes: 87 additions & 0 deletions src/main/java/fr/abes/bestppn/kafka/Task.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package fr.abes.bestppn.kafka;

import fr.abes.bestppn.exception.BestPpnException;
import fr.abes.bestppn.exception.IllegalDoiException;
import fr.abes.bestppn.service.EmailService;
import fr.abes.bestppn.service.ExecutionReportService;
import fr.abes.bestppn.service.KbartService;
import lombok.SneakyThrows;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

public class Task implements Runnable {
private final List<ConsumerRecord<String, String>> lignesKbartRecords;
private final KbartService service;
private final String providerName;
private final boolean isForced;
private volatile boolean stopped = false;
private volatile boolean started = false;
private volatile boolean finished = false;
private final CompletableFuture<Long> completion = new CompletableFuture<>();
private final ReentrantLock startStopLock = new ReentrantLock();
private final AtomicLong currentOffset = new AtomicLong();
private Logger log = LoggerFactory.getLogger(Task.class);

public Task(List<ConsumerRecord<String, String>> records, KbartService service, String providerName, boolean isForced) {
this.lignesKbartRecords = records;
this.service = service;
this.providerName = providerName;
this.isForced = isForced;
}

@SneakyThrows
@Override
public void run() {
startStopLock.lock();
if (stopped){
return;
}
started = true;
startStopLock.unlock();

for (ConsumerRecord<String, String> record : lignesKbartRecords) {
if (stopped)
break;
// process record here and make sure you catch all exceptions;
this.service.processConsumerRecord(record, this.providerName, this.isForced);
currentOffset.set(record.offset() + 1);
}
finished = true;
completion.complete(currentOffset.get());
}

public long getCurrentOffset() {
return currentOffset.get();
}

public void stop() {
startStopLock.lock();
this.stopped = true;
if (!started) {
finished = true;
completion.complete(currentOffset.get());
}
startStopLock.unlock();
}

public long waitForCompletion() {
try {
return completion.get();
} catch (InterruptedException | ExecutionException e) {
return -1;
}
}

public boolean isFinished() {
return finished;
}
}
25 changes: 23 additions & 2 deletions src/main/java/fr/abes/bestppn/kafka/TopicConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.logging.log4j.ThreadContext;
import org.springframework.beans.factory.annotation.Value;
Expand Down Expand Up @@ -50,6 +53,9 @@ public class TopicConsumer {

private AtomicInteger nbLignesTraitees;

private final Map<TopicPartition, Task> activeTasks = new HashMap<>();
private final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();

public TopicConsumer(KbartService service, EmailService emailService, ProviderRepository providerRepository, ExecutionReportService executionReportService, LogFileService logFileService) {
this.service = service;
this.emailService = emailService;
Expand All @@ -73,7 +79,6 @@ public void initExecutor() {
@KafkaListener(topics = {"${topic.name.source.kbart}",}, groupId = "${topic.groupid.source.kbart}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "${spring.kafka.concurrency.nbThread}")
public void kbartFromkafkaListener(ConsumerRecord<String, String> lignesKbart) {
log.info("Paquet reçu : Partition : " + lignesKbart.partition() + " / offset " + lignesKbart.offset() + " / value : " + lignesKbart.value());

try {
//traitement de chaque ligne kbart
this.filename = extractFilenameFromHeader(lignesKbart.headers().toArray());
Expand Down Expand Up @@ -115,7 +120,6 @@ public void kbartFromkafkaListener(ConsumerRecord<String, String> lignesKbart) {
addDataError(e.getMessage());
}
}

private void handleFichier(int nbLines) {
executionReportService.setNbtotalLines(nbLines);
try {
Expand Down Expand Up @@ -144,6 +148,23 @@ private void handleFichier(int nbLines) {
}
}

//TODO https://github.com/inovatrend/mtc-demo/blob/master/src/main/java/com/inovatrend/mtcdemo/Task.java
//TODO https://www.confluent.io/blog/kafka-consumer-multi-threaded-messaging/

private void handleFetchedRecords(ConsumerRecords<String, String> records, KbartService service, String providerName, boolean isForced) {
if (records.count() > 0) {
List<TopicPartition> partitionsToPause = new ArrayList<>();
records.partitions().forEach(partition -> {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
Task task = new Task(partitionRecords, service, providerName, isForced);
partitionsToPause.add(partition);
executorService.execute(task);
activeTasks.put(partition, task);
});
//TODO reprendre consumer.pause(partitionsToPause);
}
}

@KafkaListener(topics = {"${topic.name.source.kbart.errors}"}, groupId = "${topic.groupid.source.errors}", containerFactory = "kafkaKbartListenerContainerFactory")
public void errorsListener(ConsumerRecord<String, String> error) {
try {
Expand Down

0 comments on commit 8d64100

Please sign in to comment.