Skip to content

Commit

Permalink
feat: add priorities for jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
mckrava committed Dec 7, 2023
1 parent 548bc45 commit 7fcaab0
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 69 deletions.
48 changes: 24 additions & 24 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,27 +30,27 @@ services:
networks:
- aggregator_dev

aggregator_app_nest:
build:
context: ./
dockerfile: local.Dockerfile
image: aggregator_app_nest
container_name: aggregator_app_nest
depends_on:
- aggregator_queue_redis
- aggregator_main_pg_db
environment:
AGGREGATOR_DB_HOST: aggregator_main_pg_db
AGGREGATOR_DB_PORT: 5432
AGGREGATOR_REDIS_HOST: aggregator_queue_redis
AGGREGATOR_REDIS_PORT: 6379
AGGREGATOR_HISTORY_RENEW_INTERVAL_MS: 30000
APP_PORT: 8080
NODE_ENV: development
ports:
- '8080:8080'
volumes:
- ./:/app
- /app/node_modules
networks:
- aggregator_dev
# aggregator_app_nest:
# build:
# context: ./
# dockerfile: local.Dockerfile
# image: aggregator_app_nest
# container_name: aggregator_app_nest
# depends_on:
# - aggregator_queue_redis
# - aggregator_main_pg_db
# environment:
# AGGREGATOR_DB_HOST: aggregator_main_pg_db
# AGGREGATOR_DB_PORT: 5432
# AGGREGATOR_REDIS_HOST: aggregator_queue_redis
# AGGREGATOR_REDIS_PORT: 6379
# AGGREGATOR_HISTORY_RENEW_INTERVAL_MS: 30000
# APP_PORT: 8080
# NODE_ENV: development
# ports:
# - '8080:8080'
# volumes:
# - ./:/app
# - /app/node_modules
# networks:
# - aggregator_dev
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export class AccountSyncSchedulerService {
jobName: SubIdAggregatorJobName.REFRESH_TX_HISTORY_FOR_ACCOUNT_SCHEDULED,
jobOptions: {
jobId: this.getHistoryRenewJobId(decoratedPublicKey),
priority: 2,
repeat: {
every: this.appConfig.AGGREGATOR_HISTORY_RENEW_INTERVAL_MS,
limit: 120_960, // 7 days with interval 5 sec
Expand Down
5 changes: 4 additions & 1 deletion src/modules/apiGateway/gql/transactionsHistory.resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ export class TransactionsHistoryResolver {
@Args('args')
args: EnqueueAccountAggregationJobInput,
): Promise<RefreshTxHistoryResponseDto> {
await this.accountAggregationFlowProducer.enqueueTask(args);
await this.accountAggregationFlowProducer.enqueueTask({
...args,
jobOptions: { ...args, priority: 1 },
});

return { success: true };
}
Expand Down
7 changes: 3 additions & 4 deletions src/modules/dataAggregator/services/aggregation.helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,15 @@ export class AggregationHelper {

const aggregationChunkResults = await Promise.allSettled(
chunksRanges.map((range) => {
return this.datasourceChunksParallelHandlingProducer.enqueueAndWaitCollectTransferEventDataChunk(
return this.datasourceChunksParallelHandlingProducer.enqueueAndWaitCollectTransferEventDataChunkJobProducer(
{
blockchainTag: inputData.blockchainTag,
event: inputData.event,
publicKey: inputData.publicKey,
sourceUrl: inputData.sourceUrl,
chunkStartBlock: range[0],
chunkEndBlock: range[1],
onDemand: inputData.onDemand,
},
);
}),
Expand Down Expand Up @@ -226,9 +227,7 @@ export class AggregationHelper {
const runQuery = async (offset: number = 0) => {
const currentOffset = offset;
console.log(
`${pubicKeyShort} :: query START :: ${inputData.blockchainTag} :: ${
inputData.chunkStartBlock
}/${inputData.chunkEndBlock} :: offset ${currentOffset}`,
`${pubicKeyShort} :: query START :: ${inputData.blockchainTag} :: ${inputData.chunkStartBlock}/${inputData.chunkEndBlock} :: offset ${currentOffset}`,
);

const resp = await this.dataSourceUtils.getTransfersByAccount({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export class DataAggregatorService {
const txAccount = await this.accountService.getOrCreateAccount(
publicKeyDecorated,
);
const isJobOnDemand = job.name.indexOf('ON_DEMAND') >= 0;

const aggregationResultByChain = await Promise.allSettled(
this.blockchainService.blockchainDataSourceConfigs
Expand All @@ -41,13 +42,14 @@ export class DataAggregatorService {
sourceUrl: chainData.events[eventName],
latestProcessedBlock:
txAccount.latestProcessedBlock[chainData.tag][eventName],
onDemand: isJobOnDemand,
});
}
return chainEvents;
})
.flat()
.map((collectReqInput) =>
this.datasourceHandlingProducer.collectEventDataFromDataSource(
this.datasourceHandlingProducer.collectEventDataFromDataSourceJobProducer(
collectReqInput,
),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ export class CollectEventDataChunkFromDataSourceInput {
sourceUrl: string;
chunkStartBlock: number;
chunkEndBlock: number | null;
onDemand?: boolean
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ export class CollectEventDataFromDataSourceInput {
publicKey: string;
sourceUrl: string;
latestProcessedBlock: number;
onDemand?: boolean;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export class DatasourceChunksParallelHandlingProducer {
private datasourceChunksParallelHandlingQueue: Queue,
) {}

async enqueueAndWaitCollectTransferEventDataChunk(
async enqueueAndWaitCollectTransferEventDataChunkJobProducer(
requestData: CollectEventDataChunkFromDataSourceInput,
) {
return new Promise<{
Expand All @@ -31,7 +31,7 @@ export class DatasourceChunksParallelHandlingProducer {
removeOnComplete: true,
removeOnFail: false,
stackTraceLimit: 100,

priority: requestData.onDemand ? 1 : 2,
},
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export class DatasourceHandlingProducer {
return crypto.randomUUID();
}

async collectEventDataFromDataSource(
async collectEventDataFromDataSourceJobProducer(
requestData: CollectEventDataFromDataSourceInput,
) {
return new Promise<CollectEventDataFromDataSourceResponse>(
Expand All @@ -37,19 +37,10 @@ export class DatasourceHandlingProducer {
jobId: this.getJobId(requestData),
removeOnComplete: true,
removeOnFail: false,
priority: requestData.onDemand ? 1 : 2,
},
);

// const logsInterval = setInterval(async () => {
// const logs = await this.datasourceHandlingQueue.getJobLogs(job.id);
// if (logs.count !== 0) {
// console.log(`Job ${job.name}/${job.id}`);
// console.dir(logs.logs, {
// depth: null,
// });
// }
// }, 500);

const jobResult = await job.finished();

// TODO add result check
Expand All @@ -59,30 +50,30 @@ export class DatasourceHandlingProducer {
},
);
}

async enqueueAndWaitCollectTransferEventDataChunk(
requestData: CollectEventDataChunkFromDataSourceInput,
) {
return new Promise<{
jobResult: CollectTransfersChunkHandlerResponseResponse;
}>(async (resolve, reject) => {
const job = await this.datasourceHandlingQueue.add(
'TRANSFER_CHUNK',
requestData,
{
attempts: 20,
jobId: crypto.randomUUID(),
removeOnComplete: false,
removeOnFail: false,
},
);

const jobResult = await job.finished();

// TODO add result check
// TODO Add a watchdog to check if the job has finished periodically. Since pubsub does not give any guarantees.

resolve({ jobResult: JSON.parse(jobResult) });
});
}
//
// async enqueueAndWaitCollectTransferEventDataChunkJobProducer(
// requestData: CollectEventDataChunkFromDataSourceInput,
// ) {
// return new Promise<{
// jobResult: CollectTransfersChunkHandlerResponseResponse;
// }>(async (resolve, reject) => {
// const job = await this.datasourceHandlingQueue.add(
// 'TRANSFER_CHUNK',
// requestData,
// {
// attempts: 20,
// jobId: crypto.randomUUID(),
// removeOnComplete: false,
// removeOnFail: false,
// },
// );
//
// const jobResult = await job.finished();
//
// // TODO add result check
// // TODO Add a watchdog to check if the job has finished periodically. Since pubsub does not give any guarantees.
//
// resolve({ jobResult: JSON.parse(jobResult) });
// });
// }
}

0 comments on commit 7fcaab0

Please sign in to comment.