From 138b78f72d80c370264822f881427b185eca3b69 Mon Sep 17 00:00:00 2001 From: Panos Karampis Date: Thu, 10 Oct 2024 23:28:38 +0200 Subject: [PATCH] disable tx listener --- .../panos/promofacie/TransactionListener.java | 160 +++++++++--------- .../dk/panos/promofacie/task/Scheduler.java | 5 +- src/main/resources/application.properties | 1 - 3 files changed, 83 insertions(+), 83 deletions(-) diff --git a/src/main/java/dk/panos/promofacie/TransactionListener.java b/src/main/java/dk/panos/promofacie/TransactionListener.java index 3a040bc..3083093 100644 --- a/src/main/java/dk/panos/promofacie/TransactionListener.java +++ b/src/main/java/dk/panos/promofacie/TransactionListener.java @@ -1,80 +1,80 @@ -package dk.panos.promofacie; - -import dk.panos.promofacie.kafka.KafkaVerificationService; -import dk.panos.promofacie.redis.RedisVerificationService; -import dk.panos.promofacie.redis.redis_model.Verification; -import dk.panos.promofacie.service.BlockchainVerificationService; -import io.quarkus.arc.profile.IfBuildProfile; -import io.quarkus.runtime.StartupEvent; -import io.smallrye.mutiny.Multi; -import io.smallrye.mutiny.Uni; -import io.smallrye.mutiny.tuples.Tuple2; -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.enterprise.event.Observes; -import jakarta.inject.Inject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.time.Duration; -import java.time.temporal.ChronoUnit; -import java.util.function.Function; - -@ApplicationScoped -@IfBuildProfile("prod") -public class TransactionListener { - private static Logger log = LoggerFactory.getLogger(TransactionListener.class); - @Inject - RedisVerificationService verificationService; - @Inject - BlockchainVerificationService blockchainVerificationService; - @Inject - KafkaVerificationService kafkaVerificationService; - - public void listen(@Observes StartupEvent ev) { - Multi> transactionExistsCheck = verificationService.process() - .onItem().invoke(items -> log.debug("I have the following items for verification {}", items)) - .toMulti() - .onItem() - .transformToIterable(Function.identity()) - .onItem() - .invoke(verification -> log.debug("Picked {} for verification", verification)) - .onItem() - .transformToUni(verification -> - blockchainVerificationService.transactionExists(verification.address(), verification.discordId(), verification.dateTime()) - .onFailure().invoke(err -> log.error(err.getMessage())) - .onFailure().retry().withBackOff(Duration.ofMillis(100)) - .atMost(100) - .map(outcome -> Tuple2.of(verification, outcome)) - ) - .merge(); - - Multi.createFrom().ticks().every(Duration.of(5, ChronoUnit.MINUTES)) - .onItem() - .invoke(tick -> log.debug("Checking for new transactions on tick {}", tick)).onItem() - .transformToMulti(tick -> transactionExistsCheck) - .merge() - .onItem() - .transformToUni(outcome -> { - Verification verification = outcome.getItem1(); - boolean success = outcome.getItem2(); - if (success) { - log.debug("Sending success for {}", verification); - return kafkaVerificationService.sendSuccess(verification); - } else { - return verificationService.queue(verification) - .onItem().transformToUni(itemsAdded -> { - if (itemsAdded > 0) { - log.debug("Retrying {}", verification); - return Uni.createFrom().voidItem(); - } else { - log.debug("Sending failure for {} due to reached max amount of tries", verification); - return kafkaVerificationService.sendFailure(verification); - } - }); - } - }).merge() - .onFailure().retry().indefinitely() - .subscribe().with(nothing -> { - }); - } -} +//package dk.panos.promofacie; +// +//import dk.panos.promofacie.kafka.KafkaVerificationService; +//import dk.panos.promofacie.redis.RedisVerificationService; +//import dk.panos.promofacie.redis.redis_model.Verification; +//import dk.panos.promofacie.service.BlockchainVerificationService; +//import io.quarkus.arc.profile.IfBuildProfile; +//import io.quarkus.runtime.StartupEvent; +//import io.smallrye.mutiny.Multi; +//import io.smallrye.mutiny.Uni; +//import io.smallrye.mutiny.tuples.Tuple2; +//import jakarta.enterprise.context.ApplicationScoped; +//import jakarta.enterprise.event.Observes; +//import jakarta.inject.Inject; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +//import java.time.Duration; +//import java.time.temporal.ChronoUnit; +//import java.util.function.Function; +// +//@ApplicationScoped +////@IfBuildProfile("prod") +//public class TransactionListener { +// private static Logger log = LoggerFactory.getLogger(TransactionListener.class); +// @Inject +// RedisVerificationService verificationService; +// @Inject +// BlockchainVerificationService blockchainVerificationService; +// @Inject +// KafkaVerificationService kafkaVerificationService; +// +// public void listen(@Observes StartupEvent ev) { +// Multi> transactionExistsCheck = verificationService.process() +// .onItem().invoke(items -> log.debug("I have the following items for verification {}", items)) +// .toMulti() +// .onItem() +// .transformToIterable(Function.identity()) +// .onItem() +// .invoke(verification -> log.debug("Picked {} for verification", verification)) +// .onItem() +// .transformToUni(verification -> +// blockchainVerificationService.transactionExists(verification.address(), verification.discordId(), verification.dateTime()) +// .onFailure().invoke(err -> log.error(err.getMessage())) +// .onFailure().retry().withBackOff(Duration.ofMillis(100)) +// .atMost(100) +// .map(outcome -> Tuple2.of(verification, outcome)) +// ) +// .merge(); +// +// Multi.createFrom().ticks().every(Duration.of(5, ChronoUnit.MINUTES)) +// .onItem() +// .invoke(tick -> log.debug("Checking for new transactions on tick {}", tick)).onItem() +// .transformToMulti(tick -> transactionExistsCheck) +// .merge() +// .onItem() +// .transformToUni(outcome -> { +// Verification verification = outcome.getItem1(); +// boolean success = outcome.getItem2(); +// if (success) { +// log.debug("Sending success for {}", verification); +// return kafkaVerificationService.sendSuccess(verification); +// } else { +// return verificationService.queue(verification) +// .onItem().transformToUni(itemsAdded -> { +// if (itemsAdded > 0) { +// log.debug("Retrying {}", verification); +// return Uni.createFrom().voidItem(); +// } else { +// log.debug("Sending failure for {} due to reached max amount of tries", verification); +// return kafkaVerificationService.sendFailure(verification); +// } +// }); +// } +// }).merge() +// .onFailure().retry().indefinitely() +// .subscribe().with(nothing -> { +// }); +// } +//} diff --git a/src/main/java/dk/panos/promofacie/task/Scheduler.java b/src/main/java/dk/panos/promofacie/task/Scheduler.java index 150f189..4fc9618 100644 --- a/src/main/java/dk/panos/promofacie/task/Scheduler.java +++ b/src/main/java/dk/panos/promofacie/task/Scheduler.java @@ -2,6 +2,7 @@ import dk.panos.promofacie.service.RoleService; import io.quarkus.arc.profile.IfBuildProfile; +import io.quarkus.hibernate.reactive.panache.common.WithSession; import io.quarkus.hibernate.reactive.panache.common.WithTransaction; import io.quarkus.scheduler.Scheduled; import io.smallrye.mutiny.Uni; @@ -18,8 +19,8 @@ public class Scheduler { @Inject RoleService roleService; - @Scheduled(every = "5m") - @WithTransaction + @Scheduled(every = "10m") + @WithSession Uni cronJobWithExpressionInConfig() { log.debug("Scheduling with cron job"); return roleService.applyRoles().onItem() diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index f43041b..7536466 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,4 +1,3 @@ -quarkus.log.category."dk.promofacie".level=${LOG_LEVEL:INFO} quarkus.rest-client.radix-api.url=https://mainnet.radixdlt.com quarkus.rest-client.radix-api.connect-timeout=150000 quarkus.rest-client.radix-api.read-timeout=150000