From 8f79fdcb527f5f9011f62cf7e1d029145edc92a1 Mon Sep 17 00:00:00 2001 From: koekiebox Date: Thu, 29 Aug 2024 10:05:49 +0300 Subject: [PATCH] feat(workweek): asset caching. --- .../provisioning/dashboards/example.json | 38 ++++-------- .../backend/src/accounting/psql/service.ts | 2 + packages/backend/src/accounting/service.ts | 8 ++- .../src/accounting/tigerbeetle/service.ts | 2 + packages/backend/src/asset/service.ts | 26 ++++++-- packages/backend/src/config/app.ts | 8 +-- packages/backend/src/index.ts | 23 ++++--- .../open_payments/payment/incoming/service.ts | 47 +++++++++++---- .../src/open_payments/quote/service.ts | 22 +++++-- .../src/open_payments/wallet_address/model.ts | 17 ++++-- .../wallet_address/service.test.ts | 3 +- .../open_payments/wallet_address/service.ts | 60 +++++++++++-------- .../src/payment-method/ilp/connector/index.ts | 2 +- .../src/payment-method/ilp/peer/service.ts | 47 ++++++++------- pnpm-lock.yaml | 23 ------- .../scripts/create-outgoing-payments.js | 2 +- 16 files changed, 189 insertions(+), 141 deletions(-) diff --git a/localenv/telemetry/grafana/provisioning/dashboards/example.json b/localenv/telemetry/grafana/provisioning/dashboards/example.json index 58ef92d6c7..1df8c88614 100644 --- a/localenv/telemetry/grafana/provisioning/dashboards/example.json +++ b/localenv/telemetry/grafana/provisioning/dashboards/example.json @@ -90,9 +90,7 @@ "id": 12, "options": { "legend": { - "calcs": [ - "p95" - ], + "calcs": ["p95"], "displayMode": "table", "placement": "right", "showLegend": true @@ -201,9 +199,7 @@ "id": 9, "options": { "legend": { - "calcs": [ - "p95" - ], + "calcs": ["p95"], "displayMode": "table", "placement": "right", "showLegend": true @@ -365,9 +361,7 @@ "id": 7, "options": { "legend": { - "calcs": [ - "p95" - ], + "calcs": ["p95"], "displayMode": "table", "placement": "right", "showLegend": true @@ -624,9 +618,7 @@ "id": 10, "options": { "legend": { - "calcs": [ - "p95" - ], + "calcs": ["p95"], "displayMode": "table", "placement": "right", "showLegend": true @@ -776,9 +768,7 @@ "namePlacement": "auto", "orientation": "horizontal", "reduceOptions": { - "calcs": [ - "lastNotNull" - ], + "calcs": ["lastNotNull"], "fields": "", "values": false }, @@ -850,9 +840,7 @@ "footer": { "countRows": false, "fields": "", - "reducer": [ - "sum" - ], + "reducer": ["sum"], "show": false }, "showHeader": true, @@ -881,9 +869,7 @@ "operator": "=", "scope": "resource", "tag": "service.name", - "value": [ - "RAFIKI_NETWORK" - ], + "value": ["RAFIKI_NETWORK"], "valueType": "string" }, { @@ -1150,9 +1136,7 @@ "orientation": "auto", "percentChangeColorMode": "standard", "reduceOptions": { - "calcs": [ - "lastNotNull" - ], + "calcs": ["lastNotNull"], "fields": "", "values": false }, @@ -1247,9 +1231,7 @@ "orientation": "auto", "percentChangeColorMode": "standard", "reduceOptions": { - "calcs": [ - "lastNotNull" - ], + "calcs": ["lastNotNull"], "fields": "", "values": false }, @@ -1330,4 +1312,4 @@ "uid": "fdr58stwkr6yof", "version": 2, "weekStart": "" -} \ No newline at end of file +} diff --git a/packages/backend/src/accounting/psql/service.ts b/packages/backend/src/accounting/psql/service.ts index ecdf64f177..35f5d58a8e 100644 --- a/packages/backend/src/accounting/psql/service.ts +++ b/packages/backend/src/accounting/psql/service.ts @@ -35,8 +35,10 @@ import { } from './ledger-transfer' import { LedgerTransfer, LedgerTransferType } from './ledger-transfer/model' import { TelemetryService } from '../../telemetry/service' +import { AssetService } from '../../asset/service' export interface ServiceDependencies extends BaseService { + fetchAssetService?: () => Promise knex: TransactionOrKnex withdrawalThrottleDelay?: number telemetry?: TelemetryService diff --git a/packages/backend/src/accounting/service.ts b/packages/backend/src/accounting/service.ts index bdbbc3c2fc..f8e36e53fa 100644 --- a/packages/backend/src/accounting/service.ts +++ b/packages/backend/src/accounting/service.ts @@ -1,6 +1,7 @@ import { TransactionOrKnex } from 'objection' import { BaseService } from '../shared/baseService' import { TransferError, isTransferError } from './errors' +import { AssetService } from '../asset/service' export enum LiquidityAccountType { ASSET = 'ASSET', @@ -40,6 +41,7 @@ export interface LiquidityAccount { export interface OnCreditOptions { totalReceived: bigint withdrawalThrottleDelay?: number + fetchAssetService?: () => Promise } export interface OnDebitOptions { @@ -133,6 +135,7 @@ export interface TransferToCreate { } export interface BaseAccountingServiceDependencies extends BaseService { + fetchAssetService?: () => Promise withdrawalThrottleDelay?: number } @@ -160,7 +163,7 @@ export async function createAccountToAccountTransfer( getAccountBalance } = args - const { withdrawalThrottleDelay } = deps + const { withdrawalThrottleDelay, fetchAssetService } = deps const { sourceAccount, destinationAccount, sourceAmount, destinationAmount } = transferArgs @@ -226,7 +229,8 @@ export async function createAccountToAccountTransfer( await destinationAccount.onCredit({ totalReceived, - withdrawalThrottleDelay + withdrawalThrottleDelay, + fetchAssetService }) } }, diff --git a/packages/backend/src/accounting/tigerbeetle/service.ts b/packages/backend/src/accounting/tigerbeetle/service.ts index 479bb754d5..30adbac492 100644 --- a/packages/backend/src/accounting/tigerbeetle/service.ts +++ b/packages/backend/src/accounting/tigerbeetle/service.ts @@ -32,6 +32,7 @@ import { } from './transfers' import { toTigerBeetleId } from './utils' import { TelemetryService } from '../../telemetry/service' +import { AssetService } from '../../asset/service' export enum TigerBeetleAccountCode { LIQUIDITY_WEB_MONETIZATION = 1, @@ -68,6 +69,7 @@ export const convertToTigerBeetleTransferCode: { } export interface ServiceDependencies extends BaseService { + fetchAssetService?: () => Promise tigerBeetle: Client withdrawalThrottleDelay?: number telemetry?: TelemetryService diff --git a/packages/backend/src/asset/service.ts b/packages/backend/src/asset/service.ts index 3070a1d905..d29de805e8 100644 --- a/packages/backend/src/asset/service.ts +++ b/packages/backend/src/asset/service.ts @@ -7,6 +7,8 @@ import { BaseService } from '../shared/baseService' import { AccountingService, LiquidityAccountType } from '../accounting/service' import { WalletAddress } from '../open_payments/wallet_address/model' import { Peer } from '../payment-method/ilp/peer/model' +import { Quote } from '../open_payments/quote/model' +import { IncomingPayment } from '../open_payments/payment/incoming/model' export interface AssetOptions { code: string @@ -28,30 +30,33 @@ export interface DeleteOptions { deletedAt: Date } +export type ToSetOn = Quote | IncomingPayment | WalletAddress | Peer | undefined + export interface AssetService { create(options: CreateOptions): Promise update(options: UpdateOptions): Promise delete(options: DeleteOptions): Promise get(id: string): Promise + setOn(obj: ToSetOn): Promise getPage(pagination?: Pagination, sortOrder?: SortOrder): Promise getAll(): Promise } interface ServiceDependencies extends BaseService { - accountingService: AccountingService, + accountingService: AccountingService assetCache: Map } export async function createAssetService({ logger, knex, - accountingService + accountingService, + assetCache }: ServiceDependencies): Promise { const log = logger.child({ service: 'AssetService' }) - const assetCache = new Map() const deps: ServiceDependencies = { logger: log, knex, @@ -64,6 +69,7 @@ export async function createAssetService({ update: (options) => updateAsset(deps, options), delete: (options) => deleteAsset(deps, options), get: (id) => getAsset(deps, id), + setOn: (toSetOn) => setAssetOn(deps, toSetOn), getPage: (pagination?, sortOrder?) => getAssetsPage(deps, pagination, sortOrder), getAll: () => getAll(deps) @@ -182,7 +188,10 @@ async function getAsset( const inMem = deps.assetCache.get(id) if (inMem) return inMem - return Asset.query(deps.knex).whereNull('deletedAt').findById(id) + const asset = await Asset.query(deps.knex).whereNull('deletedAt').findById(id) + if (asset) deps.assetCache.set(asset.id, asset) + + return asset } async function getAssetsPage( @@ -198,3 +207,12 @@ async function getAssetsPage( async function getAll(deps: ServiceDependencies): Promise { return Asset.query(deps.knex).whereNull('deletedAt') } + +async function setAssetOn( + deps: ServiceDependencies, + obj: ToSetOn +): Promise { + if (!obj) return + const asset = await getAsset(deps, obj.assetId) + if (asset) obj.asset = asset +} diff --git a/packages/backend/src/config/app.ts b/packages/backend/src/config/app.ts index 9368e43a82..6a9bbc081d 100644 --- a/packages/backend/src/config/app.ts +++ b/packages/backend/src/config/app.ts @@ -122,16 +122,16 @@ export const Config = { quoteLifespan: envInt('QUOTE_LIFESPAN', 5 * 60_000), // milliseconds walletAddressWorkers: envInt('WALLET_ADDRESS_WORKERS', 1), - walletAddressWorkerIdle: envInt('WALLET_ADDRESS_WORKER_IDLE', 200), // milliseconds + walletAddressWorkerIdle: envInt('WALLET_ADDRESS_WORKER_IDLE', 3000), // milliseconds authServerGrantUrl: envString('AUTH_SERVER_GRANT_URL'), authServerIntrospectionUrl: envString('AUTH_SERVER_INTROSPECTION_URL'), outgoingPaymentWorkers: envInt('OUTGOING_PAYMENT_WORKERS', 1), - outgoingPaymentWorkerIdle: envInt('OUTGOING_PAYMENT_WORKER_IDLE', 200), // milliseconds + outgoingPaymentWorkerIdle: envInt('OUTGOING_PAYMENT_WORKER_IDLE', 3000), // milliseconds incomingPaymentWorkers: envInt('INCOMING_PAYMENT_WORKERS', 1), - incomingPaymentWorkerIdle: envInt('INCOMING_PAYMENT_WORKER_IDLE', 200), // milliseconds + incomingPaymentWorkerIdle: envInt('INCOMING_PAYMENT_WORKER_IDLE', 3000), // milliseconds pollIncomingPaymentCreatedWebhook: envBool( 'POLL_INCOMING_PAYMENT_CREATED_WEBHOOK', false @@ -146,7 +146,7 @@ export const Config = { ), // milliseconds webhookWorkers: envInt('WEBHOOK_WORKERS', 1), - webhookWorkerIdle: envInt('WEBHOOK_WORKER_IDLE', 200), // milliseconds + webhookWorkerIdle: envInt('WEBHOOK_WORKER_IDLE', 3000), // milliseconds webhookUrl: envString('WEBHOOK_URL'), webhookTimeout: envInt('WEBHOOK_TIMEOUT', 2000), // milliseconds webhookMaxRetry: envInt('WEBHOOK_MAX_RETRY', 10), diff --git a/packages/backend/src/index.ts b/packages/backend/src/index.ts index 6123bde11e..2d46234ac1 100644 --- a/packages/backend/src/index.ts +++ b/packages/backend/src/index.ts @@ -56,6 +56,7 @@ import { createStreamCredentialsService } from './payment-method/ilp/stream-cred import { createRatesService } from './rates/service' import { TelemetryService, createTelemetryService } from './telemetry/service' import { createWebhookService } from './webhook/service' +import { Asset } from './asset/model' BigInt.prototype.toJSON = function () { return this.toString() @@ -202,9 +203,10 @@ export function initIocContainer( const logger = await deps.use('logger') const knex = await deps.use('knex') return await createAssetService({ - logger: logger, - knex: knex, - accountingService: await deps.use('accountingService') + logger, + knex, + accountingService: await deps.use('accountingService'), + assetCache: new Map() }) }) @@ -232,7 +234,8 @@ export function initIocContainer( knex, tigerBeetle, withdrawalThrottleDelay: config.withdrawalThrottleDelay, - telemetry + telemetry, + fetchAssetService: () => Promise.resolve(deps.use('assetService')) }) } @@ -240,7 +243,8 @@ export function initIocContainer( logger, knex, withdrawalThrottleDelay: config.withdrawalThrottleDelay, - telemetry + telemetry, + fetchAssetService: () => Promise.resolve(deps.use('assetService')) }) }) container.singleton('peerService', async (deps) => { @@ -280,7 +284,8 @@ export function initIocContainer( knex: await deps.use('knex'), logger: logger, accountingService: await deps.use('accountingService'), - webhookService: await deps.use('webhookService') + webhookService: await deps.use('webhookService'), + assetService: await deps.use('assetService') }) }) container.singleton('spspRoutes', async (deps) => { @@ -298,7 +303,8 @@ export function initIocContainer( knex: await deps.use('knex'), accountingService: await deps.use('accountingService'), walletAddressService: await deps.use('walletAddressService'), - config: await deps.use('config') + config: await deps.use('config'), + assetService: await deps.use('assetService') }) }) container.singleton('remoteIncomingPaymentService', async (deps) => { @@ -469,7 +475,8 @@ export function initIocContainer( ), telemetry: config.enableTelemetry ? await deps.use('telemetry') - : undefined + : undefined, + assetService: await deps.use('assetService') }) }) diff --git a/packages/backend/src/open_payments/payment/incoming/service.ts b/packages/backend/src/open_payments/payment/incoming/service.ts index 6d7bd48e0a..b043329379 100644 --- a/packages/backend/src/open_payments/payment/incoming/service.ts +++ b/packages/backend/src/open_payments/payment/incoming/service.ts @@ -18,6 +18,7 @@ import { Amount } from '../../amount' import { IncomingPaymentError } from './errors' import { IAppConfig } from '../../../config/app' import { poll } from '../../../shared/utils' +import { AssetService } from '../../../asset/service' export const POSITIVE_SLIPPAGE = BigInt(1) // First retry waits 10 seconds @@ -49,6 +50,7 @@ export interface ServiceDependencies extends BaseService { knex: TransactionOrKnex accountingService: AccountingService walletAddressService: WalletAddressService + assetService: AssetService config: IAppConfig } @@ -79,9 +81,11 @@ async function getIncomingPayment( ): Promise { const incomingPayment = await IncomingPayment.query(deps.knex) .get(options) - .withGraphFetched('[asset, walletAddress]') - if (incomingPayment) return await addReceivedAmount(deps, incomingPayment) - else return + .withGraphFetched('[walletAddress]') + await deps.assetService.setOn(incomingPayment) + if (incomingPayment) { + return await addReceivedAmount(deps, incomingPayment) + } else return } async function createIncomingPayment( @@ -133,7 +137,8 @@ async function createIncomingPayment( state: IncomingPaymentState.Pending, processAt: expiresAt }) - .withGraphFetched('[asset, walletAddress]') + .withGraphFetched('[walletAddress]') + await deps.assetService.setOn(incomingPayment) await IncomingPaymentEvent.query(trx || deps.knex).insert({ incomingPaymentId: incomingPayment.id, @@ -185,11 +190,13 @@ async function getApprovedOrCanceledIncomingPayment( deps: ServiceDependencies, options: GetOptions ) { - return IncomingPayment.query(deps.knex) + const incomingPayment = await IncomingPayment.query(deps.knex) .get(options) - .withGraphFetched('[asset, walletAddress]') + .withGraphFetched('[walletAddress]') .whereNotNull('approvedAt') .orWhereNotNull('cancelledAt') + await deps.assetService.setOn(incomingPayment) + return incomingPayment } // Fetch (and lock) an incoming payment for work. @@ -206,7 +213,13 @@ async function processNextIncomingPayment( // If an incoming payment is locked, don't wait — just come back for it later. .skipLocked() .where('processAt', '<=', now) - .withGraphFetched('[asset, walletAddress]') + .withGraphFetched('[walletAddress]') + + if (incomingPayments && incomingPayments.length) { + for (const incomingPayment of incomingPayments) { + await deps_.assetService.setOn(incomingPayment) + } + } const incomingPayment = incomingPayments[0] if (!incomingPayment) return @@ -307,7 +320,14 @@ async function getWalletAddressPage( ): Promise { const page = await IncomingPayment.query(deps.knex) .list(options) - .withGraphFetched('[asset, walletAddress]') + .withGraphFetched('[walletAddress]') + .withGraphFetched('[walletAddress]') + if (page && page.length) { + for (const incomingPayment of page) { + await deps.assetService.setOn(incomingPayment) + } + } + const amounts = await deps.accountingService.getAccountsTotalReceived( page.map((payment: IncomingPayment) => payment.id) ) @@ -340,7 +360,8 @@ async function approveIncomingPayment( const payment = await IncomingPayment.query(trx) .findById(id) .forUpdate() - .withGraphFetched('[asset, walletAddress]') + .withGraphFetched('[walletAddress]') + await deps.assetService.setOn(payment) if (!payment) return IncomingPaymentError.UnknownPayment if (payment.state !== IncomingPaymentState.Pending) @@ -372,7 +393,8 @@ async function cancelIncomingPayment( const payment = await IncomingPayment.query(trx) .findById(id) .forUpdate() - .withGraphFetched('[asset, walletAddress]') + .withGraphFetched('[walletAddress]') + await deps.assetService.setOn(payment) if (!payment) return IncomingPaymentError.UnknownPayment if (payment.state !== IncomingPaymentState.Pending) @@ -404,8 +426,10 @@ async function completeIncomingPayment( const payment = await IncomingPayment.query(trx) .findById(id) .forUpdate() - .withGraphFetched('[asset, walletAddress]') + .withGraphFetched('[walletAddress]') if (!payment) return IncomingPaymentError.UnknownPayment + + await deps.assetService.setOn(payment) if ( ![IncomingPaymentState.Pending, IncomingPaymentState.Processing].includes( payment.state @@ -434,6 +458,5 @@ async function addReceivedAmount( assetCode: payment.asset.code, assetScale: payment.asset.scale } - return payment } diff --git a/packages/backend/src/open_payments/quote/service.ts b/packages/backend/src/open_payments/quote/service.ts index 3010af0f9e..4638800119 100644 --- a/packages/backend/src/open_payments/quote/service.ts +++ b/packages/backend/src/open_payments/quote/service.ts @@ -21,6 +21,7 @@ import { PaymentMethodHandlerErrorCode } from '../../payment-method/handler/errors' import { TelemetryService } from '../../telemetry/service' +import { AssetService } from '../../asset/service' const MAX_INT64 = BigInt('9223372036854775807') @@ -35,6 +36,7 @@ export interface ServiceDependencies extends BaseService { walletAddressService: WalletAddressService feeService: FeeService paymentMethodHandlerService: PaymentMethodHandlerService + assetService: AssetService telemetry?: TelemetryService } @@ -56,9 +58,11 @@ async function getQuote( deps: ServiceDependencies, options: GetOptions ): Promise { - return Quote.query(deps.knex) + const quote = await Quote.query(deps.knex) .get(options) - .withGraphFetched('[asset, fee, walletAddress]') + .withGraphFetched('[fee, walletAddress]') + await deps.assetService.setOn(quote) + return quote } interface QuoteOptionsBase { @@ -176,7 +180,9 @@ async function createQuote( feeId: sendingFee?.id, estimatedExchangeRate: quote.estimatedExchangeRate }) - .withGraphFetched('[asset, fee, walletAddress]') + .withGraphFetched('[fee, walletAddress]') + + await deps.assetService.setOn(createdQuote) return await finalizeQuote( { @@ -397,7 +403,13 @@ async function getWalletAddressPage( deps: ServiceDependencies, options: ListOptions ): Promise { - return await Quote.query(deps.knex) + const quotes = await Quote.query(deps.knex) .list(options) - .withGraphFetched('[asset, fee, walletAddress]') + .withGraphFetched('[fee, walletAddress]') + if (quotes && quotes.length) { + for (const quote of quotes) { + await deps.assetService.setOn(quote) + } + } + return quotes } diff --git a/packages/backend/src/open_payments/wallet_address/model.ts b/packages/backend/src/open_payments/wallet_address/model.ts index 81dd603a1d..bd8c0db2be 100644 --- a/packages/backend/src/open_payments/wallet_address/model.ts +++ b/packages/backend/src/open_payments/wallet_address/model.ts @@ -67,10 +67,11 @@ export class WalletAddress public async onCredit({ totalReceived, - withdrawalThrottleDelay + withdrawalThrottleDelay, + fetchAssetService }: OnCreditOptions): Promise { if (this.asset.withdrawalThreshold !== null) { - const walletAddress = await WalletAddress.query() + let walletAddressQuery = WalletAddress.query() .patchAndFetchById(this.id, { processAt: new Date() }) @@ -78,10 +79,16 @@ export class WalletAddress 'totalEventsAmount', totalReceived - this.asset.withdrawalThreshold ]) - .withGraphFetched('asset') - if (walletAddress) { - return walletAddress + if (!fetchAssetService) { + //TODO console.log('JASON: lets do the query please: ', fetchAssetService) + walletAddressQuery = walletAddressQuery.withGraphFetched('asset') + } else { + //TODO console.log('JASON: No Need!: ', fetchAssetService) } + const walletAddress = await walletAddressQuery + if (fetchAssetService) + await (await fetchAssetService()).setOn(walletAddress) + if (walletAddress) return walletAddress } if (withdrawalThrottleDelay !== undefined && !this.processAt) { await this.$query().patch({ diff --git a/packages/backend/src/open_payments/wallet_address/service.test.ts b/packages/backend/src/open_payments/wallet_address/service.test.ts index 40ce06f726..637cd6e81b 100644 --- a/packages/backend/src/open_payments/wallet_address/service.test.ts +++ b/packages/backend/src/open_payments/wallet_address/service.test.ts @@ -581,7 +581,8 @@ describe('Open Payments Wallet Address Service', (): void => { await expect( walletAddress.onCredit({ totalReceived: totalEventsAmount + BigInt(1), - withdrawalThrottleDelay + withdrawalThrottleDelay, + fetchAssetService: undefined }) ).resolves.toMatchObject({ processAt: startingProcessAt || delayProcessAt diff --git a/packages/backend/src/open_payments/wallet_address/service.ts b/packages/backend/src/open_payments/wallet_address/service.ts index 3a62c5e865..13c4813760 100644 --- a/packages/backend/src/open_payments/wallet_address/service.ts +++ b/packages/backend/src/open_payments/wallet_address/service.ts @@ -25,6 +25,7 @@ import { Pagination, SortOrder } from '../../shared/baseModel' import { WebhookService } from '../../webhook/service' import { poll } from '../../shared/utils' import { WalletAddressAdditionalProperty } from './additional_property/model' +import { AssetService } from '../../asset/service' interface Options { publicName?: string @@ -77,6 +78,7 @@ interface ServiceDependencies extends BaseService { knex: TransactionOrKnex accountingService: AccountingService webhookService: WebhookService + assetService: AssetService } export async function createWalletAddressService({ @@ -84,7 +86,8 @@ export async function createWalletAddressService({ config, knex, accountingService, - webhookService + webhookService, + assetService }: ServiceDependencies): Promise { const log = logger.child({ service: 'WalletAddressService' @@ -94,7 +97,8 @@ export async function createWalletAddressService({ logger: log, knex, accountingService, - webhookService + webhookService, + assetService } return { create: (options) => createWalletAddress(deps, options), @@ -164,14 +168,16 @@ async function createWalletAddress( ? cleanAdditionalProperties(options.additionalProperties) : undefined - return await WalletAddress.query(deps.knex) - .insertGraphAndFetch({ - url: options.url, - publicName: options.publicName, - assetId: options.assetId, - additionalProperties: additionalProperties - }) - .withGraphFetched('asset') + const walletAddress = await WalletAddress.query( + deps.knex + ).insertGraphAndFetch({ + url: options.url, + publicName: options.publicName, + assetId: options.assetId, + additionalProperties: additionalProperties + }) + await deps.assetService.setOn(walletAddress) + return walletAddress } catch (err) { if (err instanceof ForeignKeyViolationError) { if (err.constraint === 'walletaddresses_assetid_foreign') { @@ -203,8 +209,8 @@ async function updateWalletAddress( const updatedWalletAddress = await walletAddress .$query(trx) .patchAndFetch(update) - .withGraphFetched('asset') .throwIfNotFound() + await deps.assetService.setOn(updatedWalletAddress) // Override all existing additional properties if new ones are provided if (additionalProperties) { @@ -239,9 +245,9 @@ async function getWalletAddress( deps: ServiceDependencies, id: string ): Promise { - return await WalletAddress.query(deps.knex) - .findById(id) - .withGraphFetched('asset') + const walletAddress = await WalletAddress.query(deps.knex).findById(id) + await deps.assetService.setOn(walletAddress) + return walletAddress } async function getWalletAdditionalProperties( @@ -295,9 +301,8 @@ async function getWalletAddressByUrl( deps: ServiceDependencies, url: string ): Promise { - const walletAddress = await WalletAddress.query(deps.knex) - .findOne({ url }) - .withGraphFetched('asset') + const walletAddress = await WalletAddress.query(deps.knex).findOne({ url }) + await deps.assetService.setOn(walletAddress) return walletAddress || undefined } @@ -306,9 +311,15 @@ async function getWalletAddressPage( pagination?: Pagination, sortOrder?: SortOrder ): Promise { - return await WalletAddress.query(deps.knex) - .getPage(pagination, sortOrder) - .withGraphFetched('asset') + const walletAddresses = await WalletAddress.query(deps.knex).getPage( + pagination, + sortOrder + ) + if (walletAddresses && walletAddresses.length) { + for (const walletAddress of walletAddresses) + await deps.assetService.setOn(walletAddress) + } + return walletAddresses } // Returns the id of the processed wallet address (if any). @@ -342,7 +353,10 @@ async function processNextWalletAddresses( // If a wallet address is locked, don't wait — just come back for it later. .skipLocked() .where('processAt', '<=', now) - .withGraphFetched('asset') + if (walletAddresses && walletAddresses.length) { + for (const walletAddress of walletAddresses) + await deps_.assetService.setOn(walletAddress) + } const deps = { ...deps_, @@ -425,10 +439,6 @@ async function deactivateOpenIncomingPaymentsByWalletAddress( .where('expiresAt', '>', expiresAt) } -export interface CreateSubresourceOptions { - walletAddressId: string -} - export interface WalletAddressSubresourceService< M extends WalletAddressSubresource > { diff --git a/packages/backend/src/payment-method/ilp/connector/index.ts b/packages/backend/src/payment-method/ilp/connector/index.ts index ae3c859b6e..b7fa506c6a 100644 --- a/packages/backend/src/payment-method/ilp/connector/index.ts +++ b/packages/backend/src/payment-method/ilp/connector/index.ts @@ -148,7 +148,7 @@ function compose( if (i <= index) return Promise.reject(new Error('next() called multiple times')) index = i - let m = middlewares[i] + const m = middlewares[i] if (i === middlewares.length) m.fn = next if (!m.fn) return Promise.resolve() try { diff --git a/packages/backend/src/payment-method/ilp/peer/service.ts b/packages/backend/src/payment-method/ilp/peer/service.ts index fc39c49a60..9e3f995d91 100644 --- a/packages/backend/src/payment-method/ilp/peer/service.ts +++ b/packages/backend/src/payment-method/ilp/peer/service.ts @@ -114,7 +114,9 @@ async function getPeer( deps: ServiceDependencies, id: string ): Promise { - return Peer.query(deps.knex).findById(id).withGraphFetched('asset') + const peer = await Peer.query(deps.knex).findById(id) + await deps.assetService.setOn(peer) + return peer } async function createPeer( @@ -131,16 +133,15 @@ async function createPeer( try { return await Peer.transaction(deps.knex, async (trx) => { - const peer = await Peer.query(trx) - .insertAndFetch({ - assetId: options.assetId, - http: options.http, - maxPacketAmount: options.maxPacketAmount, - staticIlpAddress: options.staticIlpAddress, - name: options.name, - liquidityThreshold: options.liquidityThreshold - }) - .withGraphFetched('asset') + const peer = await Peer.query(trx).insertAndFetch({ + assetId: options.assetId, + http: options.http, + maxPacketAmount: options.maxPacketAmount, + staticIlpAddress: options.staticIlpAddress, + name: options.name, + liquidityThreshold: options.liquidityThreshold + }) + await deps.assetService.setOn(peer) if (options.http?.incoming) { const err = await addIncomingHttpTokens({ @@ -232,10 +233,12 @@ async function updatePeer( throw err } } - return await Peer.query(trx) + + const peer = await Peer.query(trx) .patchAndFetchById(options.id, options) - .withGraphFetched('asset') .throwIfNotFound() + await deps.assetService.setOn(peer) + return peer }) } catch (err) { if (err instanceof NotFoundError) { @@ -314,7 +317,6 @@ async function getPeerByDestinationAddress( // for `staticIlpAddress`s in the accounts table: // new RegExp('^' + staticIlpAddress + '($|\\.)')).test(destinationAddress) const peerQuery = Peer.query(deps.knex) - .withGraphFetched('asset') .where( raw('?', [destinationAddress]), 'like', @@ -344,6 +346,7 @@ async function getPeerByDestinationAddress( } const peer = await peerQuery.first() + await deps.assetService.setOn(peer) return peer || undefined } @@ -375,18 +378,18 @@ async function getPeersPage( pagination?: Pagination, sortOrder?: SortOrder ): Promise { - return await Peer.query(deps.knex) - .getPage(pagination, sortOrder) - .withGraphFetched('asset') + const peers = await Peer.query(deps.knex).getPage(pagination, sortOrder) + if (peers && peers.length) { + for (const peer of peers) await deps.assetService.setOn(peer) + } + return peers } async function deletePeer( deps: ServiceDependencies, id: string ): Promise { - return Peer.query(deps.knex) - .withGraphFetched('asset') - .deleteById(id) - .returning('*') - .first() + const peer = await Peer.query(deps.knex).deleteById(id).returning('*').first() + await deps.assetService.setOn(peer) + return peer } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 4b90f661df..b8ee2545c4 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -745,21 +745,6 @@ importers: specifier: ^6.7.5 version: 6.7.5 - test/grpc-http-benchmark: - dependencies: - '@grpc/grpc-js': - specifier: ^1.11.1 - version: 1.11.1 - '@grpc/proto-loader': - specifier: ^0.7.13 - version: 0.7.13 - '@koa/router': - specifier: ^12.0.0 - version: 12.0.0 - koa: - specifier: ^2.15.3 - version: 2.15.3 - test/integration: devDependencies: '@apollo/client': @@ -4218,14 +4203,6 @@ packages: '@js-sdsl/ordered-map': 4.4.2 dev: false - /@grpc/grpc-js@1.11.1: - resolution: {integrity: sha512-gyt/WayZrVPH2w/UTLansS7F9Nwld472JxxaETamrM8HNlsa+jSLNyKAZmhxI2Me4c3mQHFiS1wWHDY1g1Kthw==} - engines: {node: '>=12.10.0'} - dependencies: - '@grpc/proto-loader': 0.7.13 - '@js-sdsl/ordered-map': 4.4.2 - dev: false - /@grpc/proto-loader@0.7.13: resolution: {integrity: sha512-AiXO/bfe9bmxBjxxtYxFAXGZvMaN5s8kO+jBHAJCON8rJoB5YS/D6X7ZNc6XQkuHNmyl4CYaMI1fJ/Gn27RGGw==} engines: {node: '>=6'} diff --git a/test/performance/scripts/create-outgoing-payments.js b/test/performance/scripts/create-outgoing-payments.js index f77009a3d8..67ddd4cb0d 100644 --- a/test/performance/scripts/create-outgoing-payments.js +++ b/test/performance/scripts/create-outgoing-payments.js @@ -2,7 +2,7 @@ import http from 'k6/http' import { fail } from 'k6' export const options = { // A number specifying the number of VUs to run concurrently. - vus: 9, + vus: 10, // A string specifying the total duration of the test run. duration: '600s' }