Skip to content

Commit

Permalink
disable tx listener
Browse files Browse the repository at this point in the history
  • Loading branch information
Panos Karampis committed Oct 10, 2024
1 parent 5137882 commit 138b78f
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 83 deletions.
160 changes: 80 additions & 80 deletions src/main/java/dk/panos/promofacie/TransactionListener.java
Original file line number Diff line number Diff line change
@@ -1,80 +1,80 @@
package dk.panos.promofacie;

import dk.panos.promofacie.kafka.KafkaVerificationService;
import dk.panos.promofacie.redis.RedisVerificationService;
import dk.panos.promofacie.redis.redis_model.Verification;
import dk.panos.promofacie.service.BlockchainVerificationService;
import io.quarkus.arc.profile.IfBuildProfile;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Tuple2;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.function.Function;

@ApplicationScoped
@IfBuildProfile("prod")
public class TransactionListener {
private static Logger log = LoggerFactory.getLogger(TransactionListener.class);
@Inject
RedisVerificationService verificationService;
@Inject
BlockchainVerificationService blockchainVerificationService;
@Inject
KafkaVerificationService kafkaVerificationService;

public void listen(@Observes StartupEvent ev) {
Multi<Tuple2<Verification, Boolean>> transactionExistsCheck = verificationService.process()
.onItem().invoke(items -> log.debug("I have the following items for verification {}", items))
.toMulti()
.onItem()
.transformToIterable(Function.identity())
.onItem()
.invoke(verification -> log.debug("Picked {} for verification", verification))
.onItem()
.transformToUni(verification ->
blockchainVerificationService.transactionExists(verification.address(), verification.discordId(), verification.dateTime())
.onFailure().invoke(err -> log.error(err.getMessage()))
.onFailure().retry().withBackOff(Duration.ofMillis(100))
.atMost(100)
.map(outcome -> Tuple2.of(verification, outcome))
)
.merge();

Multi.createFrom().ticks().every(Duration.of(5, ChronoUnit.MINUTES))
.onItem()
.invoke(tick -> log.debug("Checking for new transactions on tick {}", tick)).onItem()
.transformToMulti(tick -> transactionExistsCheck)
.merge()
.onItem()
.transformToUni(outcome -> {
Verification verification = outcome.getItem1();
boolean success = outcome.getItem2();
if (success) {
log.debug("Sending success for {}", verification);
return kafkaVerificationService.sendSuccess(verification);
} else {
return verificationService.queue(verification)
.onItem().transformToUni(itemsAdded -> {
if (itemsAdded > 0) {
log.debug("Retrying {}", verification);
return Uni.createFrom().voidItem();
} else {
log.debug("Sending failure for {} due to reached max amount of tries", verification);
return kafkaVerificationService.sendFailure(verification);
}
});
}
}).merge()
.onFailure().retry().indefinitely()
.subscribe().with(nothing -> {
});
}
}
//package dk.panos.promofacie;
//
//import dk.panos.promofacie.kafka.KafkaVerificationService;
//import dk.panos.promofacie.redis.RedisVerificationService;
//import dk.panos.promofacie.redis.redis_model.Verification;
//import dk.panos.promofacie.service.BlockchainVerificationService;
//import io.quarkus.arc.profile.IfBuildProfile;
//import io.quarkus.runtime.StartupEvent;
//import io.smallrye.mutiny.Multi;
//import io.smallrye.mutiny.Uni;
//import io.smallrye.mutiny.tuples.Tuple2;
//import jakarta.enterprise.context.ApplicationScoped;
//import jakarta.enterprise.event.Observes;
//import jakarta.inject.Inject;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//
//import java.time.Duration;
//import java.time.temporal.ChronoUnit;
//import java.util.function.Function;
//
//@ApplicationScoped
////@IfBuildProfile("prod")
//public class TransactionListener {
// private static Logger log = LoggerFactory.getLogger(TransactionListener.class);
// @Inject
// RedisVerificationService verificationService;
// @Inject
// BlockchainVerificationService blockchainVerificationService;
// @Inject
// KafkaVerificationService kafkaVerificationService;
//
// public void listen(@Observes StartupEvent ev) {
// Multi<Tuple2<Verification, Boolean>> transactionExistsCheck = verificationService.process()
// .onItem().invoke(items -> log.debug("I have the following items for verification {}", items))
// .toMulti()
// .onItem()
// .transformToIterable(Function.identity())
// .onItem()
// .invoke(verification -> log.debug("Picked {} for verification", verification))
// .onItem()
// .transformToUni(verification ->
// blockchainVerificationService.transactionExists(verification.address(), verification.discordId(), verification.dateTime())
// .onFailure().invoke(err -> log.error(err.getMessage()))
// .onFailure().retry().withBackOff(Duration.ofMillis(100))
// .atMost(100)
// .map(outcome -> Tuple2.of(verification, outcome))
// )
// .merge();
//
// Multi.createFrom().ticks().every(Duration.of(5, ChronoUnit.MINUTES))
// .onItem()
// .invoke(tick -> log.debug("Checking for new transactions on tick {}", tick)).onItem()
// .transformToMulti(tick -> transactionExistsCheck)
// .merge()
// .onItem()
// .transformToUni(outcome -> {
// Verification verification = outcome.getItem1();
// boolean success = outcome.getItem2();
// if (success) {
// log.debug("Sending success for {}", verification);
// return kafkaVerificationService.sendSuccess(verification);
// } else {
// return verificationService.queue(verification)
// .onItem().transformToUni(itemsAdded -> {
// if (itemsAdded > 0) {
// log.debug("Retrying {}", verification);
// return Uni.createFrom().voidItem();
// } else {
// log.debug("Sending failure for {} due to reached max amount of tries", verification);
// return kafkaVerificationService.sendFailure(verification);
// }
// });
// }
// }).merge()
// .onFailure().retry().indefinitely()
// .subscribe().with(nothing -> {
// });
// }
//}
5 changes: 3 additions & 2 deletions src/main/java/dk/panos/promofacie/task/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import dk.panos.promofacie.service.RoleService;
import io.quarkus.arc.profile.IfBuildProfile;
import io.quarkus.hibernate.reactive.panache.common.WithSession;
import io.quarkus.hibernate.reactive.panache.common.WithTransaction;
import io.quarkus.scheduler.Scheduled;
import io.smallrye.mutiny.Uni;
Expand All @@ -18,8 +19,8 @@ public class Scheduler {
@Inject
RoleService roleService;

@Scheduled(every = "5m")
@WithTransaction
@Scheduled(every = "10m")
@WithSession
Uni<Void> cronJobWithExpressionInConfig() {
log.debug("Scheduling with cron job");
return roleService.applyRoles().onItem()
Expand Down
1 change: 0 additions & 1 deletion src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
quarkus.log.category."dk.promofacie".level=${LOG_LEVEL:INFO}
quarkus.rest-client.radix-api.url=https://mainnet.radixdlt.com
quarkus.rest-client.radix-api.connect-timeout=150000
quarkus.rest-client.radix-api.read-timeout=150000
Expand Down

0 comments on commit 138b78f

Please sign in to comment.