diff --git a/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java b/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java index 5a68edc..2be867e 100644 --- a/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java +++ b/src/main/java/fr/abes/bestppn/kafka/TopicProducer.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -59,12 +58,14 @@ public class TopicProducer { private ExecutorService executorService; private UtilsMapper utilsMapper; + private AtomicInteger lastThreadUsed; @Autowired public TopicProducer(KafkaTemplate kafkaTemplateConnect, KafkaTemplate kafkaTemplateImprime, UtilsMapper utilsMapper) { this.kafkaTemplateConnect = kafkaTemplateConnect; this.kafkaTemplateImprime = kafkaTemplateImprime; this.utilsMapper = utilsMapper; + this.lastThreadUsed = new AtomicInteger(0); } @PostConstruct @@ -85,9 +86,10 @@ public void sendKbart(List kbart, ProviderPackage provider, Strin /** * Méthode d'nvoi d'une ligne kbart vers topic kafka si option bypass activée - * @param kbart : ligne kbart à envoyer - * @param provider : provider - * @param filename : nom du fichier du traitement en cours + * + * @param kbart : ligne kbart à envoyer + * @param provider : provider + * @param filename : nom du fichier du traitement en cours */ public void sendBypassToLoad(List kbart, ProviderPackage provider, String filename) { sendToTopic(kbart, provider, filename, topicKbartBypassToload); @@ -95,19 +97,22 @@ public void sendBypassToLoad(List kbart, ProviderPackage provider private void sendToTopic(List kbart, ProviderPackage provider, String filename, String destinationTopic) { Integer nbLigneTotal = kbart.size(); + List
headerList = new ArrayList<>(); + headerList.add(new RecordHeader("nbLinesTotal", String.valueOf(nbLigneTotal).getBytes())); AtomicInteger index = new AtomicInteger(0); + //construction de la liste des lignes à envoyer dans le topic + List lignesToSend = new ArrayList<>(); for (LigneKbartDto ligneKbartDto : kbart) { ligneKbartDto.setIdProviderPackage(provider.getIdProviderPackage()); ligneKbartDto.setProviderPackagePackage(provider.getPackageName()); ligneKbartDto.setProviderPackageDateP(provider.getDateP()); ligneKbartDto.setProviderPackageIdtProvider(provider.getProviderIdtProvider()); - LigneKbartConnect ligneKbartConnect = utilsMapper.map(ligneKbartDto, LigneKbartConnect.class); - List
headerList = new ArrayList<>(); + lignesToSend.add(utilsMapper.map(ligneKbartDto, LigneKbartConnect.class)); + } + lignesToSend.forEach(ligneKbartConnect -> { executorService.execute(() -> { - headerList.add(new RecordHeader("nbLinesTotal", String.valueOf(nbLigneTotal).getBytes())); - ProducerRecord record = new ProducerRecord<>(destinationTopic, new Random().nextInt(nbThread), filename+"_"+index.getAndIncrement(), ligneKbartConnect, headerList); + ProducerRecord record = new ProducerRecord<>(destinationTopic, calculatePartition(this.nbThread), filename + "_" + index.getAndIncrement(), ligneKbartConnect, headerList); CompletableFuture> result = kafkaTemplateConnect.send(record); - assert result != null : "Result est null, donc exception"; result.whenComplete((sr, ex) -> { try { logEnvoi(result.get(), record); @@ -116,7 +121,7 @@ private void sendToTopic(List kbart, ProviderPackage provider, St } }); }); - } + }); } /** @@ -129,7 +134,7 @@ public void sendPrintNotice(List ligneKbartImprimes, String f Integer nbLigneTotal = ligneKbartImprimes.size(); int index = 0; for (LigneKbartImprime ppnToCreate : ligneKbartImprimes) { - sendNoticeImprime(ppnToCreate, topicNoticeImprimee, filename+"_"+(index++), nbLigneTotal); + sendNoticeImprime(ppnToCreate, topicNoticeImprimee, filename + "_" + (index++), nbLigneTotal); } if (!ligneKbartImprimes.isEmpty()) log.debug("message envoyé vers {}", topicNoticeImprimee); @@ -138,55 +143,64 @@ public void sendPrintNotice(List ligneKbartImprimes, String f /** * Méthode d'envoi d'une ligne Kbart pour création de notice ExNihilo + * * @param ppnFromKbartToCreate : liste de lignes kbart - * @param filename : nom du fichier à traiter + * @param filename : nom du fichier à traiter */ public void sendPpnExNihilo(List ppnFromKbartToCreate, ProviderPackage provider, String filename) { Integer nbLigneTotal = ppnFromKbartToCreate.size(); - int index = 0; + List lignesToSend = new ArrayList<>(); for (LigneKbartDto ligne : ppnFromKbartToCreate) { ligne.setIdProviderPackage(provider.getIdProviderPackage()); ligne.setProviderPackagePackage(provider.getPackageName()); ligne.setProviderPackageDateP(provider.getDateP()); ligne.setProviderPackageIdtProvider(provider.getProviderIdtProvider()); - sendNoticeExNihilo(ligne, topicKbartPpnToCreate, filename+"_"+(index++), nbLigneTotal); + lignesToSend.add(ligne); } + sendNoticeExNihilo(lignesToSend, topicKbartPpnToCreate, filename, nbLigneTotal); if (!ppnFromKbartToCreate.isEmpty()) log.debug("message envoyé vers {}", topicKbartPpnToCreate); } /** * Méthode envoyant un objet de notice imprimé sur un topic Kafka - * @param ligneKbartDto : ligne contenant la ligne kbart, et le provider - * @param topic : topic d'envoi de la ligne - * @param filemame : clé kafka de la ligne correspondant au nom de fichier + * + * @param lignesKbartDto : ligne contenant la ligne kbart, et le provider + * @param topic : topic d'envoi de la ligne + * @param filename : clé kafka de la ligne correspondant au nom de fichier */ - private void sendNoticeExNihilo(LigneKbartDto ligneKbartDto, String topic, String filemame, Integer nbLignesTotal) { + private void sendNoticeExNihilo(List lignesKbartDto, String topic, String filename, Integer nbLignesTotal) { List
headerList = new ArrayList<>(); - LigneKbartConnect ligne = utilsMapper.map(ligneKbartDto, LigneKbartConnect.class); - try { - headerList.add(new RecordHeader("nbLinesTotal", String.valueOf(nbLignesTotal).getBytes())); - ProducerRecord record = new ProducerRecord<>(topic, null, filemame, ligne, headerList); - final SendResult result = kafkaTemplateConnect.send(record).get(); - logEnvoi(result, record); - } catch (Exception e) { - String message = "Error sending message to topic " + topic; - throw new RuntimeException(message, e); - } + headerList.add(new RecordHeader("nbLinesTotal", String.valueOf(nbLignesTotal).getBytes())); + List lignesToSend = new ArrayList<>(); + lignesKbartDto.forEach(ligne -> lignesToSend.add(utilsMapper.map(lignesKbartDto, LigneKbartConnect.class))); + AtomicInteger index = new AtomicInteger(); + lignesToSend.forEach(ligne -> { + try { + ProducerRecord record = new ProducerRecord<>(topic, null, filename + "_" + index.getAndIncrement(), ligne, headerList); + final SendResult result = kafkaTemplateConnect.send(record).get(); + logEnvoi(result, record); + } catch (Exception e) { + String message = "Error sending message to topic " + topic; + throw new RuntimeException(message, e); + } + }); } /** * Méthode envoyant un objet de notice imprimé sur un topic Kafka - * @param ligne : ligne contenant la ligne kbart, le ppn de la notice imprimée et le provider - * @param topic : topic d'envoi de la ligne - * @param key : clé kafka de la ligne correspondant au nom du fichier + numéro séquenciel + * + * @param ligne : ligne contenant la ligne kbart, le ppn de la notice imprimée et le provider + * @param topic : topic d'envoi de la ligne + * @param filename : clé kafka de la ligne correspondant au nom du fichier + numéro séquenciel * @param nbLignesTotal : nombre de lignes totales du fichier */ - private void sendNoticeImprime(LigneKbartImprime ligne, String topic, String key, Integer nbLignesTotal) { + private void sendNoticeImprime(LigneKbartImprime ligne, String topic, String filename, Integer nbLignesTotal) { List
headerList = new ArrayList<>(); headerList.add(new RecordHeader("nbLinesTotal", String.valueOf(nbLignesTotal).getBytes())); + AtomicInteger index = new AtomicInteger(0); try { - ProducerRecord record = new ProducerRecord<>(topic, null, key, ligne, headerList); + ProducerRecord record = new ProducerRecord<>(topic, null, filename + "_" + index.getAndIncrement(), ligne, headerList); final SendResult result = kafkaTemplateImprime.send(record).get(); logEnvoi(result, record); } catch (Exception e) { @@ -200,4 +214,17 @@ private void logEnvoi(SendResult result, ProducerRecord re log.debug(String.format("Sent record(key=%s value=%s) meta(topic=%s, partition=%d, offset=%d, headers=%s)", record.key(), record.value(), metadata.topic(), metadata.partition(), metadata.offset(), Stream.of(result.getProducerRecord().headers().toArray()).map(h -> h.key() + ":" + Arrays.toString(h.value())).collect(Collectors.joining(";")))); } + + public Integer calculatePartition(int nbPartitions) throws ArithmeticException { + + if (nbPartitions == 0) { + throw new ArithmeticException("Nombre de threads = 0"); + } + + if (lastThreadUsed.get() >= nbPartitions) { + lastThreadUsed.set(0); + } + + return lastThreadUsed.getAndIncrement(); + } }