Skip to content

Commit

Permalink
[MODAUD-195]. Add worker pool size env vars to each consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
BKadirkhodjaev committed Nov 1, 2024
1 parent 0cc1dee commit c0a0784
Showing 1 changed file with 22 additions and 15 deletions.
37 changes: 22 additions & 15 deletions mod-audit-server/src/main/java/org/folio/rest/impl/InitAPIs.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,23 @@ public class InitAPIs implements InitAPI {

@Value("${acq.orders.kafka.consumer.instancesNumber:1}")
private int acqOrderConsumerInstancesNumber;
@Value("${acq.orders.kafka.consumer.pool.size:5}")
private int acqOrderConsumerPoolSize;

@Value("${acq.order-lines.kafka.consumer.instancesNumber:1}")
private int acqOrderLineConsumerInstancesNumber;
@Value("${acq.order-lines.kafka.consumer.pool.size:5}")
private int acqOrderLineConsumerPoolSize;

@Value("${acq.pieces.kafka.consumer.instancesNumber:1}")
private int acqPieceConsumerInstancesNumber;
@Value("${acq.orders.kafka.consumer.pool.size:5}")
private int acqPieceConsumerPoolSize;

@Value("${acq.invoices.kafka.consumer.instancesNumber:1}")
private int acqInvoiceConsumerInstancesNumber;
@Value("${acq.orders.kafka.consumer.pool.size:5}")
private int acqInvoiceConsumerPoolSize;

@Override
public void init(Vertx vertx, Context context, Handler<AsyncResult<Boolean>> handler) {
Expand Down Expand Up @@ -70,21 +81,10 @@ private Future<?> deployConsumersVerticles(Vertx vertx) {
Promise<String> pieceEventsConsumer = Promise.promise();
Promise<String> invoiceEventsConsumer = Promise.promise();

vertx.deployVerticle(getVerticleName(verticleFactory, OrderEventConsumersVerticle.class),
new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER)
.setInstances(acqOrderConsumerInstancesNumber), orderEventsConsumer);

vertx.deployVerticle(getVerticleName(verticleFactory, OrderLineEventConsumersVerticle.class),
new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER)
.setInstances(acqOrderLineConsumerInstancesNumber), orderLineEventsConsumer);

vertx.deployVerticle(getVerticleName(verticleFactory, PieceEventConsumersVerticle.class),
new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER)
.setInstances(acqPieceConsumerInstancesNumber), pieceEventsConsumer);

vertx.deployVerticle(getVerticleName(verticleFactory, InvoiceEventConsumersVerticle.class),
new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER)
.setInstances(acqInvoiceConsumerInstancesNumber), invoiceEventsConsumer);
deployVerticle(vertx, verticleFactory, OrderEventConsumersVerticle.class, acqOrderConsumerInstancesNumber, acqOrderConsumerPoolSize, orderEventsConsumer);
deployVerticle(vertx, verticleFactory, OrderLineEventConsumersVerticle.class, acqOrderLineConsumerInstancesNumber, acqOrderLineConsumerPoolSize, orderLineEventsConsumer);
deployVerticle(vertx, verticleFactory, PieceEventConsumersVerticle.class, acqPieceConsumerInstancesNumber, acqPieceConsumerPoolSize, pieceEventsConsumer);
deployVerticle(vertx, verticleFactory, InvoiceEventConsumersVerticle.class, acqInvoiceConsumerInstancesNumber, acqInvoiceConsumerPoolSize, invoiceEventsConsumer);

LOGGER.info("deployConsumersVerticles:: All consumer verticles were successfully deployed");
return GenericCompositeFuture.all(Arrays.asList(
Expand All @@ -94,6 +94,13 @@ private Future<?> deployConsumersVerticles(Vertx vertx) {
invoiceEventsConsumer.future()));
}

private <T> void deployVerticle(Vertx vertx, VerticleFactory verticleFactory, Class<T> consumerClass,
int acqOrderConsumerInstancesNumber, int acqOrderConsumerPoolSize, Promise<String> orderEventsConsumer) {
DeploymentOptions deploymentOptions = new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER)
.setInstances(acqOrderConsumerInstancesNumber).setWorkerPoolSize(acqOrderConsumerPoolSize);
vertx.deployVerticle(getVerticleName(verticleFactory, consumerClass), deploymentOptions, orderEventsConsumer);
}

private <T> String getVerticleName(VerticleFactory verticleFactory, Class<T> clazz) {
LOGGER.debug("getVerticleName:: Retrieving Verticle name");
return verticleFactory.prefix() + ":" + clazz.getName();
Expand Down

0 comments on commit c0a0784

Please sign in to comment.