Skip to content

Commit

Permalink
feat: updated subquery api calls flow
Browse files Browse the repository at this point in the history
  • Loading branch information
mckrava committed Dec 25, 2023
1 parent b3bc76b commit 0be832d
Show file tree
Hide file tree
Showing 19 changed files with 2,097 additions and 1,245 deletions.
14 changes: 7 additions & 7 deletions deployment/feature-based/all.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,20 @@ data:
AGGREGATOR_REDIS_PREFIX: 'aggregator_queue_<BRANCH>'
AGGREGATOR_REDIS_PORT: '6379'
AGGREGATOR_REDIS_ENABLE_SSL: 'false'
AGGREGATOR_HISTORY_RENEW_INTERVAL_MS: '60000'
AGGREGATOR_GS_MAIN_CHUNK_BLOCKS_SIZE: '1000000'
AGGREGATOR_HISTORY_RENEW_INTERVAL_MS: '120000'
AGGREGATOR_GS_MAIN_CHUNK_BLOCKS_SIZE: '2000000'

DATA_SOURCE__SUBSQUID__POLKADOT__TRANSFER: 'https://squid.subsquid.io/gs-main-polkadot/graphql'
DATA_SOURCE__SUBSQUID__KUSAMA__TRANSFER: 'https://squid.subsquid.io/gs-main-kusama/graphql'
DATA_SOURCE__SUBSQUID__MOONBEAM__TRANSFER: 'https://squid.subsquid.io/gs-main-moonbeam/graphql'
DATA_SOURCE__SUBSQUID__MOONRIVER__TRANSFER: 'https://squid.subsquid.io/gs-main-moonriver/graphql'
DATA_SOURCE__SUBSQUID__ASTAR__TRANSFER: 'https://squid.subsquid.io/gs-main-astar/graphql'

DATA_SOURCE__SUBQUERY__POLKADOT__TRANSFER: 'https://api.subquery.network/sq/nova-wallet/nova-wallet-polkadot'
DATA_SOURCE__SUBQUERY__KUSAMA__TRANSFER: 'https://api.subquery.network/sq/nova-wallet/nova-wallet-kusama'
DATA_SOURCE__SUBQUERY__MOONBEAM__TRANSFER: 'https://api.subquery.network/sq/nova-wallet/nova-wallet-moonbeam'
DATA_SOURCE__SUBQUERY__MOONRIVER__TRANSFER: 'https://api.subquery.network/sq/nova-wallet/nova-wallet-moonriver'
DATA_SOURCE__SUBQUERY__ASTAR__TRANSFER: 'https://api.subquery.network/sq/nova-wallet/nova-wallet-astar'
DATA_SOURCE__SUBQUERY__POLKADOT__TRANSFER: 'QmaKL376q6hmmsEAAeaFjKGDXJUjPp9qRPC1qjkNNUNomR'
DATA_SOURCE__SUBQUERY__KUSAMA__TRANSFER: 'QmPgkh7a5ziXfiHE11AMXBA5qkF2d3NdoNKKKeo68VeouC'
DATA_SOURCE__SUBQUERY__MOONBEAM__TRANSFER: 'QmSZNKPCNxAuiSjgDyTiK7VJBFdCcp8b11NzuUYvE7akEi'
DATA_SOURCE__SUBQUERY__MOONRIVER__TRANSFER: 'QmQnXCkFo9hhxLJHMBotCB2S8zhMR4tBkeiFHrLTBHjXja'
DATA_SOURCE__SUBQUERY__ASTAR__TRANSFER: 'QmZt47LXCtv9unSAaD9SbdEFfQQTLv9CTMHQKNKh2VDfwX'

DATA_SOURCE_PROVIDER_TRANSFER: 'SUBQUERY'

Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"codegen:subquery-nova-query": "graphql-codegen --require dotenv/config --config codegenConfig/codegen-subquery-nova-query.ts"
},
"dependencies": {
"@apollo/client": "^3.8.8",
"@apollo/server": "^4.7.2",
"@bull-board/api": "^5.8.4",
"@bull-board/express": "^5.8.4",
Expand All @@ -38,6 +39,7 @@
"@nestjs/schedule": "^4.0.0",
"@nestjs/typeorm": "^9.0.1",
"@polkadot/util-crypto": "^12.4.2",
"@subql/apollo-links": "^1.2.5",
"@subsocial/api": "^0.8.14",
"@subsocial/utils": "^0.8.14",
"@subsquid/openreader": "^3.1.7",
Expand Down
3 changes: 2 additions & 1 deletion src/dependencyServiceModule.module.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { Module } from '@nestjs/common';
import { Global, Module } from '@nestjs/common';
import { CryptoUtils } from './utils/cryptoUtils';
import { CommonUtils } from './utils/commonUtils';
import { DataSourceUtils } from './utils/dataSources/dataSourceUtils';

@Global()
@Module({
providers: [CryptoUtils, CommonUtils, DataSourceUtils],
exports: [CryptoUtils, CommonUtils, DataSourceUtils],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export class DataSourceEmptyResponsePlaceholderDto {
resultPlaceholder?: Record<any, any>
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
export class GetIndexerLastProcessedHeightArgs {
queryUrl: string;
import { ApolloLink } from '@apollo/client/core';
import { DataSourceEmptyResponsePlaceholderDto } from './dataSourceEmptyResponsePlaceholder.dto';

export class GetIndexerLastProcessedHeightArgs extends DataSourceEmptyResponsePlaceholderDto {
queryUrl: string | ApolloLink;
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { BlockchainTag } from '../../../constants/blockchain';
import { ApolloLink } from '@apollo/client/core';
import { DataSourceEmptyResponsePlaceholderDto } from './dataSourceEmptyResponsePlaceholder.dto';

export class GetTransfersByAccountArgs {
export class GetTransfersByAccountArgs extends DataSourceEmptyResponsePlaceholderDto {
blockchainTag: BlockchainTag;
limit: number;
offset: number;
blockNumber_gt: number;
blockNumber_lt: number | null;
publicKey?: string;
address?: string;
queryUrl: string;
queryUrl: string | ApolloLink;
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { TransferDirection } from '../../../constants/common';

export type TransferDecoratedDto = {
id: string;
direction?: TransferDirection | null;
transfer?: {
id: string;
Expand Down
10 changes: 9 additions & 1 deletion src/modules/dataAggregator/services/aggregation.helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ export class AggregationHelper {
queryUrl: inputData.sourceUrl,
});

console.log(
`${inputData.blockchainTag} indexer height - ${sourceIndexerStatus.height}`,
);

let chunksRanges = this.getChunksRanges({
latestProcessedBlock: inputData.latestProcessedBlock,
totalBlocks: sourceIndexerStatus.height,
Expand Down Expand Up @@ -251,7 +255,11 @@ export class AggregationHelper {
const pageSize = 1000;
await runQuery();
return {
fetchedChunkData: responseBuffer,
fetchedChunkData:
this.dataSourceUtils.getListWithoutDuplicates<TransferDecoratedDto>(
responseBuffer,
'id',
),
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export class DataAggregatorService {
.map((chainData) => {
const chainEvents: CollectEventDataFromDataSourceInput[] = [];
for (const eventName in chainData.events) {
console.log(chainData.events[eventName])
chainEvents.push({
event: eventName as NativeTransactionKind,
publicKey: publicKeyDecorated,
Expand Down
14 changes: 9 additions & 5 deletions src/modules/entities/blockchain/blockchain.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import {
supportedBlockchainDetails,
} from '../../../constants/blockchain';
import { AppConfig } from '../../../config.module';
import { DataSourceUtils } from '../../../utils/dataSources/dataSourceUtils';
import { NativeTransactionKind } from '../../../constants/common';

@Injectable()
export class BlockchainService {
Expand All @@ -17,6 +19,7 @@ export class BlockchainService {
@InjectRepository(Blockchain)
public readonly blockchainRepository: Repository<Blockchain>,
private appConfig: AppConfig,
private dataSourceUtils: DataSourceUtils,
) {
this.setDataSourceEndpoints();
}
Expand All @@ -27,14 +30,15 @@ export class BlockchainService {

for (const eventName in chainConfig.events) {
chainConfigUpdated.events[eventName] =
this.appConfig[
`DATA_SOURCE__${
this.appConfig[`DATA_SOURCE_PROVIDER_${eventName}`]
}__${chainConfig.tag}__${eventName}`
];
this.dataSourceUtils.getQueryEndpoint(
chainConfig.tag,
eventName as NativeTransactionKind,
);
}
this.blockchainDataSourceConfigs.push(chainConfigUpdated);
}

console.dir(this.blockchainDataSourceConfigs, { depth: null });
}

async getOrCreateBlockchain(blockchainId: string): Promise<Blockchain> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,30 @@ export class DatasourceChunksParallelHandlingProducer {
'TRANSFER_CHUNK',
requestData,
{
attempts: 5,
attempts: 3,
timeout: 60 * 1000,
jobId: crypto.randomUUID(),
removeOnComplete: true,
removeOnComplete: false,
removeOnFail: false,
stackTraceLimit: 100,
priority: requestData.onDemand ? 1 : 2,
},
);

const jobResult = await job.finished();
// TODO add result check
// TODO Add a watchdog to check if the job has finished periodically. Since pubsub does not give any guarantees.
resolve({ jobResult: JSON.parse(jobResult) });
// Watchdog to check if the job has finished periodically. Since pubsub does not give any guarantees.

const intervalInst = setInterval(async () => {
const jobStatus = await job.getState();

if (jobStatus === 'completed' || jobStatus === 'failed') {
clearInterval(intervalInst);
const jobRes =
await this.datasourceChunksParallelHandlingQueue.getJob(job.id);
await jobRes.remove();
resolve({ jobResult: JSON.parse(jobRes.returnvalue) });
return;
}
}, 500);
} catch (e) {
reject(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,27 @@ export class DatasourceHandlingProducer {
requestData.event,
requestData,
{
attempts: 20,
attempts: 5,
jobId: this.getJobId(requestData),
removeOnComplete: true,
removeOnComplete: false,
removeOnFail: false,
priority: requestData.onDemand ? 1 : 2,
},
);

const jobResult = await job.finished();
// Watchdog to check if the job has finished periodically. Since pubsub does not give any guarantees.

// TODO add result check
// TODO Add a watchdog to check if the job has finished periodically. Since pubsub does not give any guarantees.
const intervalInst = setInterval(async () => {
const jobStatus = await job.getState();

resolve({ jobResult: JSON.parse(jobResult), requestData });
if (jobStatus === 'completed' || jobStatus === 'failed') {
clearInterval(intervalInst);
const jobRes = await this.datasourceHandlingQueue.getJob(job.id);
await jobRes.remove();
resolve({ jobResult: JSON.parse(jobRes.returnvalue), requestData });
return;
}
}, 500);
},
);
}
Expand Down
48 changes: 48 additions & 0 deletions src/utils/dataSources/common.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
import { GraphQLClient, RequestOptions, Variables } from 'graphql-request';
import {
ApolloClient,
ApolloLink,
ApolloQueryResult,
InMemoryCache,
} from '@apollo/client/core';
import { ApolloClientOptions } from '@apollo/client/core/ApolloClient';
import { OperationVariables } from '@apollo/client/core/types';
import type { DocumentNode } from 'graphql';

export class CommonDataSourceUtils {
constructor() {}
Expand Down Expand Up @@ -37,4 +46,43 @@ export class CommonDataSourceUtils {
});
return client.request({ queryUrl, ...config });
}

async indexerQueryRequestApolloClient<
T,
V extends OperationVariables = OperationVariables,
>(
config: { variables: V; document: DocumentNode },
queryUrl: string | ApolloLink,
): Promise<T> {
if (!queryUrl) throw new Error('queryUrlOrLinks is not provided');

const options: ApolloClientOptions<any> = {
cache: new InMemoryCache(),
defaultOptions: {
watchQuery: {
fetchPolicy: 'no-cache',
},
query: {
fetchPolicy: 'no-cache',
},
},
};

if (typeof queryUrl === 'string') {
options.uri = queryUrl;
} else {
options.link = queryUrl;
}

const client = new ApolloClient(options);

return (
await client.query<T, V>({
query: config.document,
variables: config.variables,
fetchPolicy: 'no-cache',
})
).data;
}

}
4 changes: 4 additions & 0 deletions src/utils/dataSources/dataSourceDecorators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ export class DataSourceDecorators {
for (const node of queryResponse.historyElements.nodes) {
const transferData = node.transfer as TransferDto;
decoratedData.transfers.push({
id: this.commonUtils.getTransferId({
blockNumber: node.blockNumber,
eventIndex: transferData.eventIdx,
}),
direction: this.getTransferDirection({
from: transferData.from,
to: transferData.to,
Expand Down
58 changes: 40 additions & 18 deletions src/utils/dataSources/dataSourceUtils.subQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@ import {
GetTransfersByAccountSubQueryQueryVariables,
} from '../graphQl/subQueryNova/subquery-nova-query';
import { CryptoUtils } from '../cryptoUtils';
import { deploymentHttpLink } from '@subql/apollo-links';

// @Injectable()
export class DataSourceUtilsSubQuery extends CommonDataSourceUtils {
private cryptoUtils: CryptoUtils = new CryptoUtils();

async getTransfersByAccount(data: GetTransfersByAccountArgs) {
const res = await this.requestWithRetry<GetTransfersByAccountSubQueryQuery>(
this.indexerQueryRequest<
let res = data.resultPlaceholder;

try {
res = await this.indexerQueryRequestApolloClient<
GetTransfersByAccountSubQueryQuery,
GetTransfersByAccountSubQueryQueryVariables
>(
Expand Down Expand Up @@ -48,27 +51,46 @@ export class DataSourceUtilsSubQuery extends CommonDataSourceUtils {
},
},
data.queryUrl,
),
{ retries: 5, everyMs: 1_500 },
);
);
} catch (e) {
console.log(e);
}

return res;
}

async getIndexerLastProcessedHeight(data: GetIndexerLastProcessedHeightArgs) {
const res =
await this.requestWithRetry<GetIndexerLastProcessedHeightSubQueryQuery>(
this.indexerQueryRequest<
GetIndexerLastProcessedHeightSubQueryQuery,
{}
>(
{
document: GET_INDEXER_LAST_PROCESSED_HEIGHT,
variables: {},
},
data.queryUrl,
),
{ retries: 5, everyMs: 1_500 },
let res = data.resultPlaceholder;

try {
res = this.indexerQueryRequestApolloClient<
GetIndexerLastProcessedHeightSubQueryQuery,
{}
>(
{
document: GET_INDEXER_LAST_PROCESSED_HEIGHT,
variables: {},
},
data.queryUrl,
);
} catch (e) {
console.log(e);
}

return res;
}

getQueryEndpointApolloLinks(deploymentId: string) {
const options = {
authUrl: 'https://kepler-auth.subquery.network', // this is for testnet, use https://kepler-auth.subquery.network for kepler
deploymentId: deploymentId,
httpOptions: { fetchOptions: { timeout: 5000 } },
maxRetries: 30,
logger: console, // or any other custom logger
};

const { link, cleanup } = deploymentHttpLink(options);

return link;
}
}
Loading

0 comments on commit 0be832d

Please sign in to comment.