Skip to content

Commit

Permalink
feat: moved chunk workers from parallel to concurrent
Browse files Browse the repository at this point in the history
  • Loading branch information
mckrava committed Nov 22, 2023
1 parent 2bac075 commit c8d234e
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 72 deletions.
24 changes: 17 additions & 7 deletions src/modules/dataAggregator/services/aggregation.helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,22 @@ export class AggregationHelper {
const responseBuffer: GetTransfersByAccountQuery['transfers'] = [];
let index = 1;

const pubicKeyShort = `${inputData.publicKey.substring(
0,
5,
)}..${inputData.publicKey.substring(
inputData.publicKey.length - 5,
inputData.publicKey.length - 1,
)}`;

const runQuery = async (offset: number = 0) => {
const currentOffset = offset;
console.log(
`${pubicKeyShort} :: query START :: ${inputData.blockchainTag} :: ${
inputData.chunkStartBlock
}/${inputData.chunkEndBlock} :: offset ${currentOffset}`,
);

const resp = await this.dataSourceUtils.getTransfersByAccount({
limit: pageSize,
offset: currentOffset,
Expand All @@ -225,16 +239,12 @@ export class AggregationHelper {
blockNumber_lt: inputData.chunkEndBlock,
queryUrl: inputData.sourceUrl,
});
console.log(
`${pubicKeyShort} :: query COMPLETED :: ${inputData.blockchainTag} :: ${inputData.chunkStartBlock}/${inputData.chunkEndBlock} `,
);
if (resp.transfers.length === 0) return;
responseBuffer.push(...resp.transfers);

console.log(
`runQuery :: ${inputData.blockchainTag} :: ${
inputData.chunkStartBlock
}/${inputData.chunkEndBlock} :: index ${index} :: offset ${
currentOffset + pageSize
}`,
);
index++;
await runQuery(currentOffset + pageSize);
};
Expand Down
20 changes: 10 additions & 10 deletions src/modules/queueProcessor/queueProcessor.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@ import { DatasourceChunkParallelHandlingConsumer } from './services/consumers/da
},
{
name: SubIdAggregatorQueueName.DATASOURCE_CHUNKS_PARALLEL_HANDLING,
processors: [
{
concurrency: 101,
name: 'TRANSFER_CHUNK',
path: join(
__dirname,
'services/workers/collectTransfersDataChunk.worker.js',
),
},
],
// processors: [
// {
// concurrency: 101,
// name: 'TRANSFER_CHUNK',
// path: join(
// __dirname,
// 'services/workers/collectTransfersDataChunk.worker.js',
// ),
// },
// ],
},
),
BullBoardModule.forFeature(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,28 @@ import { CollectEventDataChunkFromDataSourceInput } from '../../dto/collectEvent
export class DatasourceChunkParallelHandlingConsumer {
  constructor(private aggregationHelper: AggregationHelper) {}

  // NOTE(review): an earlier, disabled variant of collectAccountTransfersChunk
  // used to live here (same body, kept commented out while the processor was
  // wired through a separate worker file instead). It has been superseded by
  // the active implementation below.

  /**
   * Bull processor for 'TRANSFER_CHUNK' jobs (up to 200 handled concurrently).
   *
   * Collects one chunk of transfer-event data for the job payload and drives
   * the job's lifecycle manually: the lock is taken up front, released on both
   * the success and the failure path, and the job is then moved to its
   * terminal state explicitly.
   */
  @Process({
    name: 'TRANSFER_CHUNK',
    concurrency: 200,
  })
  async collectAccountTransfersChunk(
    job: Job<CollectEventDataChunkFromDataSourceInput>,
  ) {
    await job.takeLock();

    try {
      const chunkResult =
        await this.aggregationHelper.collectTransferEventDataChunk(job.data);

      await job.releaseLock();
      // presumably the second argument asks Bull to skip the lock check,
      // since the lock was just released above — TODO confirm against Bull docs
      await job.moveToCompleted(JSON.stringify(chunkResult), true);
    } catch (e) {
      await job.releaseLock();
      const message = (e as Error).message || 'Something went wrong.';
      await job.moveToFailed({ message });
    }

    return {};
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,30 +33,30 @@ export class DatasourceHandlingConsumer {
return {};
}

// Bull processor for 'TRANSFER_CHUNK' jobs; up to 200 jobs of this name may
// run concurrently on this consumer.
@Process({
name: 'TRANSFER_CHUNK',
concurrency: 200,
})
// Collects one chunk of transfer-event data for the job's payload.
// The job lock is managed manually: taken before the work starts and
// released on both the success and the failure path before the job is
// moved to its terminal state.
async collectAccountTransfersChunk(
job: Job<CollectEventDataChunkFromDataSourceInput>,
) {
await job.takeLock();

try {
const result = await this.aggregationHelper.collectTransferEventDataChunk(
job.data,
);

await job.releaseLock();
// NOTE(review): the second argument presumably tells Bull to ignore the
// (already released) lock when completing — confirm against the Bull API.
await job.moveToCompleted(JSON.stringify(result), true);
} catch (e) {
await job.releaseLock();
// Fall back to a generic message when the thrown value has none.
await job.moveToFailed({
message: (e as Error).message || 'Something went wrong.',
});
}
return {};
}
// @Process({
// name: 'TRANSFER_CHUNK',
// concurrency: 200,
// })
// async collectAccountTransfersChunk(
// job: Job<CollectEventDataChunkFromDataSourceInput>,
// ) {
// await job.takeLock();
//
// try {
// const result = await this.aggregationHelper.collectTransferEventDataChunk(
// job.data,
// );
//
// await job.releaseLock();
// await job.moveToCompleted(JSON.stringify(result), true);
// } catch (e) {
// await job.releaseLock();
// await job.moveToFailed({
// message: (e as Error).message || 'Something went wrong.',
// });
// }
// return {};
// }

@Process({
name: NativeTransactionKind.VOTE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@ export class DatasourceChunksParallelHandlingProducer {
'TRANSFER_CHUNK',
requestData,
{
attempts: 20,
attempts: 5,
timeout: 60 * 1000,
jobId: crypto.randomUUID(),
removeOnComplete: false,
removeOnComplete: true,
removeOnFail: false,
stackTraceLimit: 100,

},
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ export default async function (job: Job, cb: DoneCallback) {

const runQuery = async (offset: number = 0) => {
const currentOffset = offset;
console.log(
`runQuery started :: ${inputData.blockchainTag} :: ${
inputData.chunkStartBlock
}/${inputData.chunkEndBlock} :: index ${index} :: offset ${
currentOffset + pageSize
}`,
);

const resp = await dataSourceUtils.getTransfersByAccount({
limit: pageSize,
offset: currentOffset,
Expand All @@ -31,7 +39,7 @@ export default async function (job: Job, cb: DoneCallback) {
responseBuffer.push(...resp.transfers);

console.log(
`runQuery :: ${inputData.blockchainTag} :: ${
`runQuery completed :: ${inputData.blockchainTag} :: ${
inputData.chunkStartBlock
}/${inputData.chunkEndBlock} :: index ${index} :: offset ${
currentOffset + pageSize
Expand Down
2 changes: 1 addition & 1 deletion src/modulesConfig/bullModule.forRoot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export default {
settings: {
lockDuration: 20000, // Check for stalled jobs each 2 min
lockRenewTime: 10000,
stalledInterval: 10 * 60 * 1000,
stalledInterval: 5 * 60 * 1000,
maxStalledCount: 1,
},
};
Expand Down
6 changes: 3 additions & 3 deletions src/utils/dataSourceUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ export class DataSourceUtils {
}

async getTransfersByAccount(data: GetTransfersByAccountArgs) {
console.log(
`request started :: ${data.blockNumber_gt}/${data.blockNumber_lt}`,
);
// console.log(
// `request started :: ${data.blockNumber_gt}/${data.blockNumber_lt}`,
// );
const res = await this.requestWithRetry<GetTransfersByAccountQuery>(
this.squidQueryRequest<GetTransfersByAccountQuery, QueryTransfersArgs>(
{
Expand Down

0 comments on commit c8d234e

Please sign in to comment.