diff --git a/src/main/java/dk/panos/promofacie/TransactionListener.java b/src/main/java/dk/panos/promofacie/TransactionListener.java index 3083093..a787153 100644 --- a/src/main/java/dk/panos/promofacie/TransactionListener.java +++ b/src/main/java/dk/panos/promofacie/TransactionListener.java @@ -1,80 +1,72 @@ -//package dk.panos.promofacie; -// -//import dk.panos.promofacie.kafka.KafkaVerificationService; -//import dk.panos.promofacie.redis.RedisVerificationService; -//import dk.panos.promofacie.redis.redis_model.Verification; -//import dk.panos.promofacie.service.BlockchainVerificationService; -//import io.quarkus.arc.profile.IfBuildProfile; -//import io.quarkus.runtime.StartupEvent; -//import io.smallrye.mutiny.Multi; -//import io.smallrye.mutiny.Uni; -//import io.smallrye.mutiny.tuples.Tuple2; -//import jakarta.enterprise.context.ApplicationScoped; -//import jakarta.enterprise.event.Observes; -//import jakarta.inject.Inject; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//import java.time.Duration; -//import java.time.temporal.ChronoUnit; -//import java.util.function.Function; -// -//@ApplicationScoped -////@IfBuildProfile("prod") -//public class TransactionListener { -// private static Logger log = LoggerFactory.getLogger(TransactionListener.class); -// @Inject -// RedisVerificationService verificationService; -// @Inject -// BlockchainVerificationService blockchainVerificationService; -// @Inject -// KafkaVerificationService kafkaVerificationService; -// -// public void listen(@Observes StartupEvent ev) { -// Multi> transactionExistsCheck = verificationService.process() -// .onItem().invoke(items -> log.debug("I have the following items for verification {}", items)) -// .toMulti() -// .onItem() -// .transformToIterable(Function.identity()) -// .onItem() -// .invoke(verification -> log.debug("Picked {} for verification", verification)) -// .onItem() -// .transformToUni(verification -> -// blockchainVerificationService.transactionExists(verification.address(), verification.discordId(), verification.dateTime()) -// .onFailure().invoke(err -> log.error(err.getMessage())) -// .onFailure().retry().withBackOff(Duration.ofMillis(100)) -// .atMost(100) -// .map(outcome -> Tuple2.of(verification, outcome)) -// ) -// .merge(); -// -// Multi.createFrom().ticks().every(Duration.of(5, ChronoUnit.MINUTES)) -// .onItem() -// .invoke(tick -> log.debug("Checking for new transactions on tick {}", tick)).onItem() -// .transformToMulti(tick -> transactionExistsCheck) -// .merge() -// .onItem() -// .transformToUni(outcome -> { -// Verification verification = outcome.getItem1(); -// boolean success = outcome.getItem2(); -// if (success) { -// log.debug("Sending success for {}", verification); -// return kafkaVerificationService.sendSuccess(verification); -// } else { -// return verificationService.queue(verification) -// .onItem().transformToUni(itemsAdded -> { -// if (itemsAdded > 0) { -// log.debug("Retrying {}", verification); -// return Uni.createFrom().voidItem(); -// } else { -// log.debug("Sending failure for {} due to reached max amount of tries", verification); -// return kafkaVerificationService.sendFailure(verification); -// } -// }); -// } -// }).merge() -// .onFailure().retry().indefinitely() -// .subscribe().with(nothing -> { -// }); -// } -//} +package dk.panos.promofacie; + +import dk.panos.promofacie.kafka.KafkaVerificationService; +import dk.panos.promofacie.redis.RedisVerificationService; +import dk.panos.promofacie.redis.redis_model.Verification; +import dk.panos.promofacie.service.BlockchainVerificationService; +import io.quarkus.arc.profile.IfBuildProfile; +import io.quarkus.runtime.StartupEvent; +import io.quarkus.scheduler.Scheduled; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.tuples.Tuple2; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.function.Function; + +@ApplicationScoped +@IfBuildProfile("prod") +public class TransactionListener { + private static Logger log = LoggerFactory.getLogger(TransactionListener.class); + @Inject + RedisVerificationService verificationService; + @Inject + BlockchainVerificationService blockchainVerificationService; + @Inject + KafkaVerificationService kafkaVerificationService; + + @Scheduled(every = "5m") + Uni listen() { + return verificationService.process() + .onItem().invoke(items -> log.debug("I have the following items for verification {}", items)) + .toMulti() + .onItem() + .transformToIterable(Function.identity()) + .onItem() + .invoke(verification -> log.debug("Picked {} for verification", verification)) + .onItem() + .transformToUni(verification -> + blockchainVerificationService.transactionExists(verification.address(), verification.discordId(), verification.dateTime()) + .onFailure().invoke(err -> log.error(err.getMessage())) + .onFailure().retry().withBackOff(Duration.ofMillis(100)) + .atMost(100) + .map(outcome -> Tuple2.of(verification, outcome)) + ).merge(4).onItem() + .transformToUni(outcome -> { + Verification verification = outcome.getItem1(); + boolean success = outcome.getItem2(); + if (success) { + log.debug("Sending success for {}", verification); + return kafkaVerificationService.sendSuccess(verification); + } else { + return verificationService.queue(verification) + .onItem().transformToUni(itemsAdded -> { + if (itemsAdded > 0) { + log.debug("Retrying {}", verification); + return Uni.createFrom().voidItem(); + } else { + log.debug("Sending failure for {} due to reached max amount of tries", verification); + return kafkaVerificationService.sendFailure(verification); + } + }); + } + }) + .concatenate().collect().asList().replaceWithVoid(); + } +} diff --git a/src/main/java/dk/panos/promofacie/service/RoleService.java b/src/main/java/dk/panos/promofacie/service/RoleService.java index 7a8a01e..53f42e5 100644 --- a/src/main/java/dk/panos/promofacie/service/RoleService.java +++ b/src/main/java/dk/panos/promofacie/service/RoleService.java @@ -4,6 +4,7 @@ import dk.panos.promofacie.radix.RadixClient; import dk.panos.promofacie.radix.model.AddressStateDetails; import dk.panos.promofacie.radix.model.GetAddressDetails; +import io.quarkus.hibernate.reactive.panache.common.WithSession; import io.quarkus.hibernate.reactive.panache.common.WithTransaction; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; @@ -60,7 +61,7 @@ public RoleService(JDA discordAPI) { } - @WithTransaction + @WithSession public Uni applyRoles() { return Wallet.listAll().onItem() .transformToMulti(list -> Multi.createFrom().iterable(list)) diff --git a/src/main/java/dk/panos/promofacie/task/Scheduler.java b/src/main/java/dk/panos/promofacie/task/Scheduler.java index 4fc9618..081b82d 100644 --- a/src/main/java/dk/panos/promofacie/task/Scheduler.java +++ b/src/main/java/dk/panos/promofacie/task/Scheduler.java @@ -19,8 +19,7 @@ public class Scheduler { @Inject RoleService roleService; - @Scheduled(every = "10m") - @WithSession + @Scheduled(every = "5m") Uni cronJobWithExpressionInConfig() { log.debug("Scheduling with cron job"); return roleService.applyRoles().onItem()