Skip to content

Commit

Permalink
switch to scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
Panos Karampis committed Oct 10, 2024
1 parent 138b78f commit 538b3d1
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 83 deletions.
152 changes: 72 additions & 80 deletions src/main/java/dk/panos/promofacie/TransactionListener.java
Original file line number Diff line number Diff line change
@@ -1,80 +1,72 @@
//package dk.panos.promofacie;
//
//import dk.panos.promofacie.kafka.KafkaVerificationService;
//import dk.panos.promofacie.redis.RedisVerificationService;
//import dk.panos.promofacie.redis.redis_model.Verification;
//import dk.panos.promofacie.service.BlockchainVerificationService;
//import io.quarkus.arc.profile.IfBuildProfile;
//import io.quarkus.runtime.StartupEvent;
//import io.smallrye.mutiny.Multi;
//import io.smallrye.mutiny.Uni;
//import io.smallrye.mutiny.tuples.Tuple2;
//import jakarta.enterprise.context.ApplicationScoped;
//import jakarta.enterprise.event.Observes;
//import jakarta.inject.Inject;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//
//import java.time.Duration;
//import java.time.temporal.ChronoUnit;
//import java.util.function.Function;
//
//@ApplicationScoped
////@IfBuildProfile("prod")
//public class TransactionListener {
// private static Logger log = LoggerFactory.getLogger(TransactionListener.class);
// @Inject
// RedisVerificationService verificationService;
// @Inject
// BlockchainVerificationService blockchainVerificationService;
// @Inject
// KafkaVerificationService kafkaVerificationService;
//
// public void listen(@Observes StartupEvent ev) {
// Multi<Tuple2<Verification, Boolean>> transactionExistsCheck = verificationService.process()
// .onItem().invoke(items -> log.debug("I have the following items for verification {}", items))
// .toMulti()
// .onItem()
// .transformToIterable(Function.identity())
// .onItem()
// .invoke(verification -> log.debug("Picked {} for verification", verification))
// .onItem()
// .transformToUni(verification ->
// blockchainVerificationService.transactionExists(verification.address(), verification.discordId(), verification.dateTime())
// .onFailure().invoke(err -> log.error(err.getMessage()))
// .onFailure().retry().withBackOff(Duration.ofMillis(100))
// .atMost(100)
// .map(outcome -> Tuple2.of(verification, outcome))
// )
// .merge();
//
// Multi.createFrom().ticks().every(Duration.of(5, ChronoUnit.MINUTES))
// .onItem()
// .invoke(tick -> log.debug("Checking for new transactions on tick {}", tick)).onItem()
// .transformToMulti(tick -> transactionExistsCheck)
// .merge()
// .onItem()
// .transformToUni(outcome -> {
// Verification verification = outcome.getItem1();
// boolean success = outcome.getItem2();
// if (success) {
// log.debug("Sending success for {}", verification);
// return kafkaVerificationService.sendSuccess(verification);
// } else {
// return verificationService.queue(verification)
// .onItem().transformToUni(itemsAdded -> {
// if (itemsAdded > 0) {
// log.debug("Retrying {}", verification);
// return Uni.createFrom().voidItem();
// } else {
// log.debug("Sending failure for {} due to reached max amount of tries", verification);
// return kafkaVerificationService.sendFailure(verification);
// }
// });
// }
// }).merge()
// .onFailure().retry().indefinitely()
// .subscribe().with(nothing -> {
// });
// }
//}
package dk.panos.promofacie;

import dk.panos.promofacie.kafka.KafkaVerificationService;
import dk.panos.promofacie.redis.RedisVerificationService;
import dk.panos.promofacie.redis.redis_model.Verification;
import dk.panos.promofacie.service.BlockchainVerificationService;
import io.quarkus.arc.profile.IfBuildProfile;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.scheduler.Scheduled;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Tuple2;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.function.Function;

@ApplicationScoped
@IfBuildProfile("prod")
public class TransactionListener {
private static Logger log = LoggerFactory.getLogger(TransactionListener.class);
@Inject
RedisVerificationService verificationService;
@Inject
BlockchainVerificationService blockchainVerificationService;
@Inject
KafkaVerificationService kafkaVerificationService;

@Scheduled(every = "5m")
Uni<Void> listen() {
return verificationService.process()
.onItem().invoke(items -> log.debug("I have the following items for verification {}", items))
.toMulti()
.onItem()
.transformToIterable(Function.identity())
.onItem()
.invoke(verification -> log.debug("Picked {} for verification", verification))
.onItem()
.transformToUni(verification ->
blockchainVerificationService.transactionExists(verification.address(), verification.discordId(), verification.dateTime())
.onFailure().invoke(err -> log.error(err.getMessage()))
.onFailure().retry().withBackOff(Duration.ofMillis(100))
.atMost(100)
.map(outcome -> Tuple2.of(verification, outcome))
).merge(4).onItem()
.transformToUni(outcome -> {
Verification verification = outcome.getItem1();
boolean success = outcome.getItem2();
if (success) {
log.debug("Sending success for {}", verification);
return kafkaVerificationService.sendSuccess(verification);
} else {
return verificationService.queue(verification)
.onItem().transformToUni(itemsAdded -> {
if (itemsAdded > 0) {
log.debug("Retrying {}", verification);
return Uni.createFrom().voidItem();
} else {
log.debug("Sending failure for {} due to reached max amount of tries", verification);
return kafkaVerificationService.sendFailure(verification);
}
});
}
})
.concatenate().collect().asList().replaceWithVoid();
}
}
3 changes: 2 additions & 1 deletion src/main/java/dk/panos/promofacie/service/RoleService.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import dk.panos.promofacie.radix.RadixClient;
import dk.panos.promofacie.radix.model.AddressStateDetails;
import dk.panos.promofacie.radix.model.GetAddressDetails;
import io.quarkus.hibernate.reactive.panache.common.WithSession;
import io.quarkus.hibernate.reactive.panache.common.WithTransaction;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
Expand Down Expand Up @@ -60,7 +61,7 @@ public RoleService(JDA discordAPI) {
}


@WithTransaction
@WithSession
public Uni<Void> applyRoles() {
return Wallet.<Wallet>listAll().onItem()
.transformToMulti(list -> Multi.createFrom().iterable(list))
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/dk/panos/promofacie/task/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ public class Scheduler {
@Inject
RoleService roleService;

@Scheduled(every = "10m")
@WithSession
@Scheduled(every = "5m")
Uni<Void> cronJobWithExpressionInConfig() {
log.debug("Scheduling with cron job");
return roleService.applyRoles().onItem()
Expand Down

0 comments on commit 538b3d1

Please sign in to comment.