Skip to content

Commit

Permalink
feat(backend): add tracing, metrics, get peer tweak
Browse files Browse the repository at this point in the history
  • Loading branch information
BlairCurrey committed Aug 27, 2024
1 parent 9d2e24e commit 1b728ee
Show file tree
Hide file tree
Showing 10 changed files with 521 additions and 357 deletions.
430 changes: 308 additions & 122 deletions localenv/telemetry/grafana/provisioning/dashboards/example.json

Large diffs are not rendered by default.

17 changes: 16 additions & 1 deletion packages/backend/src/accounting/psql/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,28 @@ export async function getAccountTotalSent(
deps: ServiceDependencies,
accountRef: string
): Promise<bigint | undefined> {
deps.telemetry &&
deps.telemetry.startTimer('psql_getAccountTotalSent', {
callName: 'psql_getAccountTotalSent'
})
const account = await getLiquidityAccount(deps, accountRef)

if (!account) {
deps.telemetry &&
deps.telemetry.startTimer('psql_getAccountTotalSent', {
callName: 'psql_getAccountTotalSent'
})
return
}

return (await getAccountBalances(deps, account)).debitsPosted
const totalsSent = (await getAccountBalances(deps, account)).debitsPosted

deps.telemetry &&
deps.telemetry.startTimer('psql_getAccountTotalSent', {
callName: 'psql_getAccountTotalSent'
})

return totalsSent
}

export async function getAccountsTotalSent(
Expand Down
8 changes: 8 additions & 0 deletions packages/backend/src/accounting/tigerbeetle/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
getAccountTransfers
} from './transfers'
import { toTigerBeetleId } from './utils'
import { TelemetryService } from '../../telemetry/service'

export enum TigerBeetleAccountCode {
LIQUIDITY_WEB_MONETIZATION = 1,
Expand Down Expand Up @@ -69,6 +70,7 @@ export const convertToTigerBeetleTransferCode: {
export interface ServiceDependencies extends BaseService {
tigerBeetle: Client
withdrawalThrottleDelay?: number
telemetry?: TelemetryService
}

export function createAccountingService(
Expand Down Expand Up @@ -218,10 +220,16 @@ export async function getAccountTotalSent(
deps: ServiceDependencies,
id: string
): Promise<bigint | undefined> {
deps.telemetry &&
deps.telemetry.startTimer('tb_getAccountTotalSent', {
callName: 'tb_getAccountTotalSent'
})
const account = (await getAccounts(deps, [id]))[0]
if (account) {
deps.telemetry && deps.telemetry.stopTimer('tb_getAccountTotalSent')
return account.debits_posted
}
deps.telemetry && deps.telemetry.stopTimer('tb_getAccountTotalSent')
}

export async function getAccountsTotalSent(
Expand Down
11 changes: 6 additions & 5 deletions packages/backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ export function initIocContainer(
const logger = await deps.use('logger')
const knex = await deps.use('knex')
const config = await deps.use('config')
const telemetry = await deps.use('telemetry')

if (config.useTigerBeetle) {
container.singleton('tigerBeetle', async (deps) => {
Expand All @@ -228,17 +229,16 @@ export function initIocContainer(
logger,
knex,
tigerBeetle,
withdrawalThrottleDelay: config.withdrawalThrottleDelay
withdrawalThrottleDelay: config.withdrawalThrottleDelay,
telemetry
})
}

return createPsqlAccountingService({
logger,
knex,
withdrawalThrottleDelay: config.withdrawalThrottleDelay,
telemetry: config.enableTelemetry
? await deps.use('telemetry')
: undefined
telemetry
})
})
container.singleton('peerService', async (deps) => {
Expand Down Expand Up @@ -346,7 +346,8 @@ export function initIocContainer(
walletAddressService: await deps.use('walletAddressService'),
remoteIncomingPaymentService: await deps.use(
'remoteIncomingPaymentService'
)
),
telemetry: deps.use('telemetry') ? await deps.use('telemetry') : undefined
})
})

Expand Down
37 changes: 25 additions & 12 deletions packages/backend/src/open_payments/payment/outgoing/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,24 +76,24 @@ export async function handleSending(
throw LifecycleError.BadState
}

const payStartTime = Date.now()
await deps.paymentMethodHandlerService.pay('ILP', {
receiver,
outgoingPayment: payment,
finalDebitAmount: maxDebitAmount,
finalReceiveAmount: maxReceiveAmount
})
const payEndTime = Date.now()
deps.telemetry &&
deps.telemetry.startTimer('ilp_pay_time_ms', {
description: 'Time to complete an ILP payment',
callName: 'paymentMethodHandlerService.pay (ILP)'
}),
await deps.paymentMethodHandlerService.pay('ILP', {
receiver,
outgoingPayment: payment,
finalDebitAmount: maxDebitAmount,
finalReceiveAmount: maxReceiveAmount
})

if (deps.telemetry) {
const payDuration = payEndTime - payStartTime
await Promise.all([
deps.telemetry.incrementCounter('transactions_total', 1, {
description: 'Count of funded transactions'
}),
deps.telemetry.recordHistogram('ilp_pay_time_ms', payDuration, {
description: 'Time to complete an ILP payment'
}),
deps.telemetry.stopTimer('ilp_pay_time_ms'),
deps.telemetry.incrementCounterWithTransactionAmountDifference(
'transaction_fee_amounts',
payment.sentAmount,
Expand Down Expand Up @@ -144,17 +144,24 @@ export async function handleFailed(
payment: OutgoingPayment,
error: string
): Promise<void> {
deps.telemetry &&
deps.telemetry.startTimer('handleFailed', { callName: 'handleFailed ' })
await payment.$query(deps.knex).patch({
state: OutgoingPaymentState.Failed,
error
})
await sendWebhookEvent(deps, payment, OutgoingPaymentEventType.PaymentFailed)
deps.telemetry && deps.telemetry.startTimer('handleFailed')
}

async function handleCompleted(
deps: ServiceDependencies,
payment: OutgoingPayment
): Promise<void> {
deps.telemetry &&
deps.telemetry.startTimer('handleCompleted', {
callName: 'handleCompleted'
})
await payment.$query(deps.knex).patch({
state: OutgoingPaymentState.Completed
})
Expand All @@ -164,6 +171,7 @@ async function handleCompleted(
payment,
OutgoingPaymentEventType.PaymentCompleted
)
deps.telemetry && deps.telemetry.stopTimer('handleCompleted')
}

export async function sendWebhookEvent(
Expand All @@ -172,6 +180,10 @@ export async function sendWebhookEvent(
type: OutgoingPaymentEventType,
trx?: TransactionOrKnex
): Promise<void> {
deps.telemetry &&
deps.telemetry.startTimer('sendWebhookEvent', {
callName: 'outgoingPaymentLifecycle_sendwebhookEvent'
})
// TigerBeetle accounts are only created as the OutgoingPayment is funded.
// So default the amountSent and balance to 0 for outgoing payments still in the funding state
const amountSent =
Expand Down Expand Up @@ -201,6 +213,7 @@ export async function sendWebhookEvent(
data: payment.toData({ amountSent, balance }),
withdrawal
})
deps.telemetry && deps.telemetry.stopTimer('sendWebhookEvent')
}

function validateAssets(
Expand Down
13 changes: 11 additions & 2 deletions packages/backend/src/open_payments/receiver/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
errorToMessage as receiverErrorToMessage
} from './errors'
import { isRemoteIncomingPaymentError } from '../payment/incoming_remote/errors'
import { TelemetryService } from '../../telemetry/service'

interface CreateReceiverArgs {
walletAddressUrl: string
Expand All @@ -33,6 +34,7 @@ export interface ServiceDependencies extends BaseService {
incomingPaymentService: IncomingPaymentService
walletAddressService: WalletAddressService
remoteIncomingPaymentService: RemoteIncomingPaymentService
telemetry?: TelemetryService
}

const INCOMING_PAYMENT_URL_REGEX =
Expand Down Expand Up @@ -136,21 +138,28 @@ async function getReceiver(
deps: ServiceDependencies,
url: string
): Promise<Receiver | undefined> {
deps.telemetry &&
deps.telemetry.startTimer('getReceiver', { callName: 'getReceiver' })
try {
const localIncomingPayment = await getLocalIncomingPayment(deps, url)
if (localIncomingPayment) {
return new Receiver(localIncomingPayment, true)
const receiver = new Receiver(localIncomingPayment, true)
deps.telemetry && deps.telemetry.stopTimer('getReceiver')
return receiver
}

const remoteIncomingPayment = await getRemoteIncomingPayment(deps, url)
if (remoteIncomingPayment) {
return new Receiver(remoteIncomingPayment, false)
const receiver = new Receiver(remoteIncomingPayment, false)
deps.telemetry && deps.telemetry.stopTimer('getReceiver')
return receiver
}
} catch (err) {
deps.logger.error(
{ errorMessage: err instanceof Error && err.message },
'Could not get incoming payment'
)
deps.telemetry && deps.telemetry.stopTimer('getReceiver')
}
}

Expand Down
109 changes: 82 additions & 27 deletions packages/backend/src/payment-method/ilp/connector/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,49 +68,104 @@ export async function createConnectorService({
streamServer,
telemetry
},
compose([
// Incoming Rules
createIncomingErrorHandlerMiddleware(ilpAddress),
createStreamAddressMiddleware(),
createAccountMiddleware(ilpAddress),
createIncomingMaxPacketAmountMiddleware(),
createIncomingRateLimitMiddleware({}),
createIncomingThroughputMiddleware(),
createIldcpMiddleware(ilpAddress),
compose(
[
// Incoming Rules
{
name: 'createIncomingErrorHandlerMiddleware',
fn: createIncomingErrorHandlerMiddleware(ilpAddress)
},
{
name: 'createStreamAddressMiddleware',
fn: createStreamAddressMiddleware()
},
{
name: 'createAccountMiddleware',
fn: createAccountMiddleware(ilpAddress)
},
{
name: 'createIncomingMaxPacketAmountMiddleware',
fn: createIncomingMaxPacketAmountMiddleware()
},
{
name: 'createIncomingRateLimitMiddleware',
fn: createIncomingRateLimitMiddleware({})
},
{
name: 'createIncomingThroughputMiddleware',
fn: createIncomingThroughputMiddleware()
},
{
name: 'createIldcpMiddleware',
fn: createIldcpMiddleware(ilpAddress)
},

// Local pay
createBalanceMiddleware(),
// Local pay
{ name: 'createBalanceMiddleware', fn: createBalanceMiddleware() },

// Outgoing Rules
createStreamController(),
createOutgoingThroughputMiddleware(),
createOutgoingReduceExpiryMiddleware({}),
createOutgoingExpireMiddleware(),
createOutgoingValidateFulfillmentMiddleware(),
// Outgoing Rules
{ name: 'createStreamController', fn: createStreamController() },
{
name: 'createOutgoingThroughputMiddleware',
fn: createOutgoingThroughputMiddleware()
},
{
name: 'createOutgoingReduceExpiryMiddleware',
fn: createOutgoingReduceExpiryMiddleware({})
},
{
name: 'createOutgoingExpireMiddleware',
fn: createOutgoingExpireMiddleware()
},
{
name: 'createOutgoingValidateFulfillmentMiddleware',
fn: createOutgoingValidateFulfillmentMiddleware()
},

// Send outgoing packets
createClientController()
])
// Send outgoing packets
{ name: 'createClientController', fn: createClientController() }
],
telemetry
)
)
}

// Adapted from koa-compose
function compose(middlewares: ILPMiddleware[]): ILPMiddleware {
function compose(
middlewares: { name: string; fn: ILPMiddleware }[],
telemetry: TelemetryService | undefined
): ILPMiddleware {
return function (ctx: ILPContext, next: () => Promise<void>): Promise<void> {
// last called middleware
let index = -1
return (function dispatch(i: number): Promise<void> {

telemetry &&
telemetry.startTimer('connector_middleware_stack', {
callName: 'connector_middleware_stack'
})

async function dispatch(i: number): Promise<void> {
if (i <= index)
return Promise.reject(new Error('next() called multiple times'))
index = i
let fn = middlewares[i]
if (i === middlewares.length) fn = next
if (!fn) return Promise.resolve()
let m = middlewares[i]
if (i === middlewares.length) m.fn = next
if (!m.fn) return Promise.resolve()
try {
return Promise.resolve(fn(ctx, dispatch.bind(null, i + 1)))
telemetry &&
telemetry.startTimer('connector_middleware', {
callName: m.name
})
const p = Promise.resolve(m.fn(ctx, dispatch.bind(null, i + 1)))
telemetry && telemetry.stopTimer('connector_middleware')
return p
} catch (err) {
return Promise.reject(err)
}
})(0)
}

return dispatch(0).finally(() => {
telemetry && telemetry.stopTimer('connector_middleware_stack')
})
}
}
2 changes: 1 addition & 1 deletion packages/backend/src/payment-method/ilp/peer/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ async function getPeerByDestinationAddress(
// for `staticIlpAddress`s in the accounts table:
// new RegExp('^' + staticIlpAddress + '($|\\.)')).test(destinationAddress)
const peerQuery = Peer.query(deps.knex)
.withGraphJoined('asset')
.withGraphFetched('asset')
.where(
raw('?', [destinationAddress]),
'like',
Expand Down
Loading

0 comments on commit 1b728ee

Please sign in to comment.