diff --git a/src/main/java/dk/panos/promofacie/service/RoleService.java b/src/main/java/dk/panos/promofacie/service/RoleService.java index 69a9c3b..7a8a01e 100644 --- a/src/main/java/dk/panos/promofacie/service/RoleService.java +++ b/src/main/java/dk/panos/promofacie/service/RoleService.java @@ -7,8 +7,12 @@ import io.quarkus.hibernate.reactive.panache.common.WithTransaction; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.infrastructure.Infrastructure; import io.smallrye.mutiny.tuples.Tuple2; import io.smallrye.mutiny.tuples.Tuple3; +import io.smallrye.mutiny.vertx.MutinyHelper; +import io.vertx.core.Context; +import io.vertx.core.Vertx; import io.vertx.mutiny.pgclient.PgPool; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -66,7 +70,8 @@ public Uni applyRoles() { }) .onItem() .transformToUni(wallet -> radixClient.getAddressStateDetails(new GetAddressDetails(List.of(wallet.getAddress()))) - .onItem().transform(detail -> { + .onItem() + .transform(detail -> { Optional vaultHoldingDANOpt = detail.items().stream() .flatMap(item -> item.fungibleResources().items().stream()) .filter(resourceItem -> resourceItem.explicitMetadata().items().stream() @@ -128,15 +133,18 @@ public Uni applyRoles() { } Map> additions = tuple2.getItem1(); Map> removals = tuple2.getItem2(); + Context context = Vertx.currentContext(); Set> addOperation = additions.entrySet().stream() .flatMap(entry -> { return entry.getValue().stream().map(member -> Uni .createFrom() .item(addRoles(entry.getKey(), member).get()) .onItem() - .delayIt().by(Duration.of(1, ChronoUnit.SECONDS)) - .onItem() .invoke(RestAction::queue) + .runSubscriptionOn(Infrastructure.getDefaultWorkerPool()) // Switch to a worker thread + .emitOn(MutinyHelper.executor(context)) + .onItem() + .delayIt().by(Duration.of(500, ChronoUnit.MILLIS)) .onItem() .transform(any -> member)); }).collect(Collectors.toSet()); @@ -147,9 +155,11 @@ public Uni applyRoles() { .createFrom() .item(removeRoles(entry.getKey(), member).get()) .onItem() - .delayIt().by(Duration.of(1, ChronoUnit.SECONDS)) - .onItem() .invoke(RestAction::queue) + .runSubscriptionOn(Infrastructure.getDefaultWorkerPool()) // Switch to a worker thread + .emitOn(MutinyHelper.executor(context)) + .onItem() + .delayIt().by(Duration.of(500, ChronoUnit.MILLIS)) .onItem() .transform(any -> member)); }).collect(Collectors.toSet()); diff --git a/src/main/java/dk/panos/promofacie/task/Scheduler.java b/src/main/java/dk/panos/promofacie/task/Scheduler.java index 462b09c..150f189 100644 --- a/src/main/java/dk/panos/promofacie/task/Scheduler.java +++ b/src/main/java/dk/panos/promofacie/task/Scheduler.java @@ -11,7 +11,7 @@ import org.slf4j.LoggerFactory; @ApplicationScoped -@IfBuildProfile("prod") +//@IfBuildProfile("prod") public class Scheduler { private static final Logger log = LoggerFactory.getLogger(Scheduler.class);