Skip to content

Commit

Permalink
vertex stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
Panos Karampis committed Oct 10, 2024
1 parent 6f12677 commit 5137882
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 6 deletions.
20 changes: 15 additions & 5 deletions src/main/java/dk/panos/promofacie/service/RoleService.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
import io.quarkus.hibernate.reactive.panache.common.WithTransaction;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.tuples.Tuple2;
import io.smallrye.mutiny.tuples.Tuple3;
import io.smallrye.mutiny.vertx.MutinyHelper;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.mutiny.pgclient.PgPool;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
Expand Down Expand Up @@ -66,7 +70,8 @@ public Uni<Void> applyRoles() {
})
.onItem()
.transformToUni(wallet -> radixClient.getAddressStateDetails(new GetAddressDetails(List.of(wallet.getAddress())))
.onItem().transform(detail -> {
.onItem()
.transform(detail -> {
Optional<AddressStateDetails.ResourceItem> vaultHoldingDANOpt = detail.items().stream()
.flatMap(item -> item.fungibleResources().items().stream())
.filter(resourceItem -> resourceItem.explicitMetadata().items().stream()
Expand Down Expand Up @@ -128,15 +133,18 @@ public Uni<Void> applyRoles() {
}
Map<Role, List<Member>> additions = tuple2.getItem1();
Map<Role, List<Member>> removals = tuple2.getItem2();
Context context = Vertx.currentContext();
Set<Uni<Member>> addOperation = additions.entrySet().stream()
.flatMap(entry -> {
return entry.getValue().stream().map(member -> Uni
.createFrom()
.item(addRoles(entry.getKey(), member).get())
.onItem()
.delayIt().by(Duration.of(1, ChronoUnit.SECONDS))
.onItem()
.invoke(RestAction::queue)
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool()) // Switch to a worker thread
.emitOn(MutinyHelper.executor(context))
.onItem()
.delayIt().by(Duration.of(500, ChronoUnit.MILLIS))
.onItem()
.transform(any -> member));
}).collect(Collectors.toSet());
Expand All @@ -147,9 +155,11 @@ public Uni<Void> applyRoles() {
.createFrom()
.item(removeRoles(entry.getKey(), member).get())
.onItem()
.delayIt().by(Duration.of(1, ChronoUnit.SECONDS))
.onItem()
.invoke(RestAction::queue)
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool()) // Switch to a worker thread
.emitOn(MutinyHelper.executor(context))
.onItem()
.delayIt().by(Duration.of(500, ChronoUnit.MILLIS))
.onItem()
.transform(any -> member));
}).collect(Collectors.toSet());
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/dk/panos/promofacie/task/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.slf4j.LoggerFactory;

@ApplicationScoped
@IfBuildProfile("prod")
//@IfBuildProfile("prod")
public class Scheduler {
private static final Logger log = LoggerFactory.getLogger(Scheduler.class);

Expand Down

0 comments on commit 5137882

Please sign in to comment.