Skip to content

Commit

Permalink
CDE-427 : suppression fonction random pour sélection de la partition
Browse files Browse the repository at this point in the history
Optimisation des méthodes d'envois à Kafka : construction des objets et des listes en dehors des boucles d'envoi
  • Loading branch information
pierre-maraval committed Aug 30, 2024
1 parent 6193066 commit 2da9787
Showing 1 changed file with 60 additions and 33 deletions.
93 changes: 60 additions & 33 deletions src/main/java/fr/abes/bestppn/kafka/TopicProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -59,12 +58,14 @@ public class TopicProducer {
private ExecutorService executorService;

private UtilsMapper utilsMapper;
private AtomicInteger lastThreadUsed;

@Autowired
public TopicProducer(KafkaTemplate<String, LigneKbartConnect> kafkaTemplateConnect, KafkaTemplate<String, LigneKbartImprime> kafkaTemplateImprime, UtilsMapper utilsMapper) {
this.kafkaTemplateConnect = kafkaTemplateConnect;
this.kafkaTemplateImprime = kafkaTemplateImprime;
this.utilsMapper = utilsMapper;
this.lastThreadUsed = new AtomicInteger(0);
}

@PostConstruct
Expand All @@ -85,29 +86,33 @@ public void sendKbart(List<LigneKbartDto> kbart, ProviderPackage provider, Strin

/**
* Méthode d'nvoi d'une ligne kbart vers topic kafka si option bypass activée
* @param kbart : ligne kbart à envoyer
* @param provider : provider
* @param filename : nom du fichier du traitement en cours
*
* @param kbart : ligne kbart à envoyer
* @param provider : provider
* @param filename : nom du fichier du traitement en cours
*/
public void sendBypassToLoad(List<LigneKbartDto> kbart, ProviderPackage provider, String filename) {
sendToTopic(kbart, provider, filename, topicKbartBypassToload);
}

private void sendToTopic(List<LigneKbartDto> kbart, ProviderPackage provider, String filename, String destinationTopic) {
Integer nbLigneTotal = kbart.size();
List<Header> headerList = new ArrayList<>();
headerList.add(new RecordHeader("nbLinesTotal", String.valueOf(nbLigneTotal).getBytes()));
AtomicInteger index = new AtomicInteger(0);
//construction de la liste des lignes à envoyer dans le topic
List<LigneKbartConnect> lignesToSend = new ArrayList<>();
for (LigneKbartDto ligneKbartDto : kbart) {
ligneKbartDto.setIdProviderPackage(provider.getIdProviderPackage());
ligneKbartDto.setProviderPackagePackage(provider.getPackageName());
ligneKbartDto.setProviderPackageDateP(provider.getDateP());
ligneKbartDto.setProviderPackageIdtProvider(provider.getProviderIdtProvider());
LigneKbartConnect ligneKbartConnect = utilsMapper.map(ligneKbartDto, LigneKbartConnect.class);
List<Header> headerList = new ArrayList<>();
lignesToSend.add(utilsMapper.map(ligneKbartDto, LigneKbartConnect.class));
}
lignesToSend.forEach(ligneKbartConnect -> {
executorService.execute(() -> {
headerList.add(new RecordHeader("nbLinesTotal", String.valueOf(nbLigneTotal).getBytes()));
ProducerRecord<String, LigneKbartConnect> record = new ProducerRecord<>(destinationTopic, new Random().nextInt(nbThread), filename+"_"+index.getAndIncrement(), ligneKbartConnect, headerList);
ProducerRecord<String, LigneKbartConnect> record = new ProducerRecord<>(destinationTopic, calculatePartition(this.nbThread), filename + "_" + index.getAndIncrement(), ligneKbartConnect, headerList);
CompletableFuture<SendResult<String, LigneKbartConnect>> result = kafkaTemplateConnect.send(record);
assert result != null : "Result est null, donc exception";
result.whenComplete((sr, ex) -> {
try {
logEnvoi(result.get(), record);
Expand All @@ -116,7 +121,7 @@ private void sendToTopic(List<LigneKbartDto> kbart, ProviderPackage provider, St
}
});
});
}
});
}

/**
Expand All @@ -129,7 +134,7 @@ public void sendPrintNotice(List<LigneKbartImprime> ligneKbartImprimes, String f
Integer nbLigneTotal = ligneKbartImprimes.size();
int index = 0;
for (LigneKbartImprime ppnToCreate : ligneKbartImprimes) {
sendNoticeImprime(ppnToCreate, topicNoticeImprimee, filename+"_"+(index++), nbLigneTotal);
sendNoticeImprime(ppnToCreate, topicNoticeImprimee, filename + "_" + (index++), nbLigneTotal);
}
if (!ligneKbartImprimes.isEmpty())
log.debug("message envoyé vers {}", topicNoticeImprimee);
Expand All @@ -138,55 +143,64 @@ public void sendPrintNotice(List<LigneKbartImprime> ligneKbartImprimes, String f

/**
* Méthode d'envoi d'une ligne Kbart pour création de notice ExNihilo
*
* @param ppnFromKbartToCreate : liste de lignes kbart
* @param filename : nom du fichier à traiter
* @param filename : nom du fichier à traiter
*/
public void sendPpnExNihilo(List<LigneKbartDto> ppnFromKbartToCreate, ProviderPackage provider, String filename) {
Integer nbLigneTotal = ppnFromKbartToCreate.size();
int index = 0;
List<LigneKbartDto> lignesToSend = new ArrayList<>();
for (LigneKbartDto ligne : ppnFromKbartToCreate) {
ligne.setIdProviderPackage(provider.getIdProviderPackage());
ligne.setProviderPackagePackage(provider.getPackageName());
ligne.setProviderPackageDateP(provider.getDateP());
ligne.setProviderPackageIdtProvider(provider.getProviderIdtProvider());
sendNoticeExNihilo(ligne, topicKbartPpnToCreate, filename+"_"+(index++), nbLigneTotal);
lignesToSend.add(ligne);
}
sendNoticeExNihilo(lignesToSend, topicKbartPpnToCreate, filename, nbLigneTotal);
if (!ppnFromKbartToCreate.isEmpty())
log.debug("message envoyé vers {}", topicKbartPpnToCreate);
}

/**
* Méthode envoyant un objet de notice imprimé sur un topic Kafka
* @param ligneKbartDto : ligne contenant la ligne kbart, et le provider
* @param topic : topic d'envoi de la ligne
* @param filemame : clé kafka de la ligne correspondant au nom de fichier
*
* @param lignesKbartDto : ligne contenant la ligne kbart, et le provider
* @param topic : topic d'envoi de la ligne
* @param filename : clé kafka de la ligne correspondant au nom de fichier
*/
private void sendNoticeExNihilo(LigneKbartDto ligneKbartDto, String topic, String filemame, Integer nbLignesTotal) {
private void sendNoticeExNihilo(List<LigneKbartDto> lignesKbartDto, String topic, String filename, Integer nbLignesTotal) {
List<Header> headerList = new ArrayList<>();
LigneKbartConnect ligne = utilsMapper.map(ligneKbartDto, LigneKbartConnect.class);
try {
headerList.add(new RecordHeader("nbLinesTotal", String.valueOf(nbLignesTotal).getBytes()));
ProducerRecord<String, LigneKbartConnect> record = new ProducerRecord<>(topic, null, filemame, ligne, headerList);
final SendResult<String, LigneKbartConnect> result = kafkaTemplateConnect.send(record).get();
logEnvoi(result, record);
} catch (Exception e) {
String message = "Error sending message to topic " + topic;
throw new RuntimeException(message, e);
}
headerList.add(new RecordHeader("nbLinesTotal", String.valueOf(nbLignesTotal).getBytes()));
List<LigneKbartConnect> lignesToSend = new ArrayList<>();
lignesKbartDto.forEach(ligne -> lignesToSend.add(utilsMapper.map(lignesKbartDto, LigneKbartConnect.class)));
AtomicInteger index = new AtomicInteger();
lignesToSend.forEach(ligne -> {
try {
ProducerRecord<String, LigneKbartConnect> record = new ProducerRecord<>(topic, null, filename + "_" + index.getAndIncrement(), ligne, headerList);
final SendResult<String, LigneKbartConnect> result = kafkaTemplateConnect.send(record).get();
logEnvoi(result, record);
} catch (Exception e) {
String message = "Error sending message to topic " + topic;
throw new RuntimeException(message, e);
}
});
}

/**
* Méthode envoyant un objet de notice imprimé sur un topic Kafka
* @param ligne : ligne contenant la ligne kbart, le ppn de la notice imprimée et le provider
* @param topic : topic d'envoi de la ligne
* @param key : clé kafka de la ligne correspondant au nom du fichier + numéro séquenciel
*
* @param ligne : ligne contenant la ligne kbart, le ppn de la notice imprimée et le provider
* @param topic : topic d'envoi de la ligne
* @param filename : clé kafka de la ligne correspondant au nom du fichier + numéro séquenciel
* @param nbLignesTotal : nombre de lignes totales du fichier
*/
private void sendNoticeImprime(LigneKbartImprime ligne, String topic, String key, Integer nbLignesTotal) {
private void sendNoticeImprime(LigneKbartImprime ligne, String topic, String filename, Integer nbLignesTotal) {
List<Header> headerList = new ArrayList<>();
headerList.add(new RecordHeader("nbLinesTotal", String.valueOf(nbLignesTotal).getBytes()));
AtomicInteger index = new AtomicInteger(0);
try {
ProducerRecord<String, LigneKbartImprime> record = new ProducerRecord<>(topic, null, key, ligne, headerList);
ProducerRecord<String, LigneKbartImprime> record = new ProducerRecord<>(topic, null, filename + "_" + index.getAndIncrement(), ligne, headerList);
final SendResult<String, LigneKbartImprime> result = kafkaTemplateImprime.send(record).get();
logEnvoi(result, record);
} catch (Exception e) {
Expand All @@ -200,4 +214,17 @@ private void logEnvoi(SendResult<String, ?> result, ProducerRecord<String, ?> re
log.debug(String.format("Sent record(key=%s value=%s) meta(topic=%s, partition=%d, offset=%d, headers=%s)",
record.key(), record.value(), metadata.topic(), metadata.partition(), metadata.offset(), Stream.of(result.getProducerRecord().headers().toArray()).map(h -> h.key() + ":" + Arrays.toString(h.value())).collect(Collectors.joining(";"))));
}

public Integer calculatePartition(int nbPartitions) throws ArithmeticException {

if (nbPartitions == 0) {
throw new ArithmeticException("Nombre de threads = 0");
}

if (lastThreadUsed.get() >= nbPartitions) {
lastThreadUsed.set(0);
}

return lastThreadUsed.getAndIncrement();
}
}

0 comments on commit 2da9787

Please sign in to comment.