Skip to content

Commit

Permalink
feat(2981): address review comments from @mkurapov. Enhancements.
Browse files Browse the repository at this point in the history
  • Loading branch information
koekiebox committed Nov 26, 2024
1 parent e0a0aed commit 285b2ef
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 90 deletions.
77 changes: 67 additions & 10 deletions packages/backend/src/open_payments/payment/incoming/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,16 @@ async function getIncomingPayment(
): Promise<IncomingPayment | undefined> {
const incomingPayment = await IncomingPayment.query(deps.knex)
.get(options)
.withGraphFetched('[asset, walletAddress]')
if (incomingPayment) return await addReceivedAmount(deps, incomingPayment)
if (incomingPayment) {
const asset = await deps.assetService.get(incomingPayment.assetId)
if (asset) incomingPayment.asset = asset

incomingPayment.walletAddress = await deps.walletAddressService.get(
incomingPayment.walletAddressId
)

return await addReceivedAmount(deps, incomingPayment)
}
else return
}

Expand Down Expand Up @@ -165,7 +173,13 @@ async function createIncomingPayment(
state: IncomingPaymentState.Pending,
processAt: expiresAt
})
.withGraphFetched('[asset, walletAddress]')

const asset = await deps.assetService.get(incomingPayment.assetId)
if (asset) incomingPayment.asset = asset

incomingPayment.walletAddress = await deps.walletAddressService.get(
incomingPayment.walletAddressId
)

await IncomingPaymentEvent.query(trx || deps.knex).insert({
incomingPaymentId: incomingPayment.id,
Expand Down Expand Up @@ -217,11 +231,19 @@ async function getApprovedOrCanceledIncomingPayment(
deps: ServiceDependencies,
options: GetOptions
) {
return IncomingPayment.query(deps.knex)
const incomingPayment = await IncomingPayment.query(deps.knex)
.get(options)
.withGraphFetched('[asset, walletAddress]')
.whereNotNull('approvedAt')
.orWhereNotNull('cancelledAt')
if (incomingPayment) {
const asset = await deps.assetService.get(incomingPayment.assetId)
if (asset) incomingPayment.asset = asset

incomingPayment.walletAddress = await deps.walletAddressService.get(
incomingPayment.walletAddressId
)
}
return incomingPayment
}

// Fetch (and lock) an incoming payment for work.
Expand All @@ -238,11 +260,17 @@ async function processNextIncomingPayment(
// If an incoming payment is locked, don't wait — just come back for it later.
.skipLocked()
.where('processAt', '<=', now)
.withGraphFetched('[asset, walletAddress]')

const incomingPayment = incomingPayments[0]
if (!incomingPayment) return

const asset = await deps_.assetService.get(incomingPayment.assetId)
if (asset) incomingPayment.asset = asset

incomingPayment.walletAddress = await deps_.walletAddressService.get(
incomingPayment.walletAddressId
)

const deps = {
...deps_,
knex: trx,
Expand Down Expand Up @@ -338,7 +366,15 @@ async function getWalletAddressPage(
): Promise<IncomingPayment[]> {
const page = await IncomingPayment.query(deps.knex)
.list(options)
.withGraphFetched('[asset, walletAddress]')
for (const payment of page) {
const asset = await deps.assetService.get(payment.assetId)
if (asset) payment.asset = asset

payment.walletAddress = await deps.walletAddressService.get(
payment.walletAddressId
)
}

const amounts = await deps.accountingService.getAccountsTotalReceived(
page.map((payment: IncomingPayment) => payment.id)
)
Expand Down Expand Up @@ -371,9 +407,16 @@ async function approveIncomingPayment(
const payment = await IncomingPayment.query(trx)
.findById(id)
.forUpdate()
.withGraphFetched('[asset, walletAddress]')

if (!payment) return IncomingPaymentError.UnknownPayment

const asset = await deps.assetService.get(payment.assetId)
if (asset) payment.asset = asset

payment.walletAddress = await deps.walletAddressService.get(
payment.walletAddressId
)

if (payment.state !== IncomingPaymentState.Pending)
return IncomingPaymentError.WrongState

Expand Down Expand Up @@ -403,9 +446,16 @@ async function cancelIncomingPayment(
const payment = await IncomingPayment.query(trx)
.findById(id)
.forUpdate()
.withGraphFetched('[asset, walletAddress]')

if (!payment) return IncomingPaymentError.UnknownPayment

const asset = await deps.assetService.get(payment.assetId)
if (asset) payment.asset = asset

payment.walletAddress = await deps.walletAddressService.get(
payment.walletAddressId
)

if (payment.state !== IncomingPaymentState.Pending)
return IncomingPaymentError.WrongState

Expand Down Expand Up @@ -435,8 +485,15 @@ async function completeIncomingPayment(
const payment = await IncomingPayment.query(trx)
.findById(id)
.forUpdate()
.withGraphFetched('[asset, walletAddress]')
if (!payment) return IncomingPaymentError.UnknownPayment

const asset = await deps.assetService.get(payment.assetId)
if (asset) payment.asset = asset

payment.walletAddress = await deps.walletAddressService.get(
payment.walletAddressId
)

if (
![IncomingPaymentState.Pending, IncomingPaymentState.Processing].includes(
payment.state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ async function cancelOutgoingPayment(
...(options.reason ? { cancellationReason: options.reason } : {})
}
})
.withGraphFetched('[quote]')
.withGraphFetched('quote')
const asset = await deps.assetService.get(payment.quote.assetId)
if (asset) payment.quote.asset = asset

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ describe('Open Payments Wallet Address Service', (): void => {
).resolves.toBeUndefined()
})

test('Creating wallet address with case intensiveness', async (): Promise<void> => {
test('Creating wallet address with case insensitiveness', async (): Promise<void> => {
const url = 'https://Alice.me/pay'
await expect(
walletAddressService.create({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,100 +24,105 @@ export function createBalanceMiddleware(): ILPMiddleware {
const stopTimer = services.telemetry.startTimer('balance_middleware_next', {
callName: 'balanceMiddleware:next'
})
try {
const { amount } = request.prepare
const logger = services.logger.child(
{ module: 'balance-middleware' },
const { amount } = request.prepare
const logger = services.logger.child(
{ module: 'balance-middleware' },
{
redact: ['transferOptions.destinationAccount.http.outgoing.authToken']
}
)

// Ignore zero amount packets
if (amount === '0') {
await next()
stopTimer()
return
}

const sourceAmount = BigInt(amount)
const destinationAmountOrError = await services.rates.convertSource({
sourceAmount,
sourceAsset: accounts.incoming.asset,
destinationAsset: accounts.outgoing.asset
})
if (isConvertError(destinationAmountOrError)) {
logger.error(
{
redact: ['transferOptions.destinationAccount.http.outgoing.authToken']
}
amount,
destinationAmountOrError,
sourceAsset: accounts.incoming.asset,
destinationAsset: accounts.outgoing.asset
},
'Could not get rates'
)
throw new CannotReceiveError(
`Exchange rate error: ${destinationAmountOrError}`
)
}
const { amount: destinationAmount } = destinationAmountOrError

// Ignore zero amount packets
if (amount === '0') {
await next()
return
}
request.prepare.amount = destinationAmount.toString()

const sourceAmount = BigInt(amount)
const destinationAmountOrError = await services.rates.convertSource({
if (state.unfulfillable) {
await next()
stopTimer()
return
}

// Update balances on prepare:
const createPendingTransfer = async (): Promise<
Transaction | undefined
> => {
const transferOptions = {
sourceAccount: accounts.incoming,
destinationAccount: accounts.outgoing,
sourceAmount,
sourceAsset: accounts.incoming.asset,
destinationAsset: accounts.outgoing.asset
})
if (isConvertError(destinationAmountOrError)) {
destinationAmount,
transferType: TransferType.TRANSFER,
timeout: AppConfig.tigerBeetleTwoPhaseTimeout
}
const trxOrError =
await services.accounting.createTransfer(transferOptions)
if (isTransferError(trxOrError)) {
logger.error(
{
amount,
destinationAmountOrError,
sourceAsset: accounts.incoming.asset,
destinationAsset: accounts.outgoing.asset
},
'Could not get rates'
)
throw new CannotReceiveError(
`Exchange rate error: ${destinationAmountOrError}`
{ transferOptions, transferError: trxOrError },
'Could not create transfer'
)
switch (trxOrError) {
case TransferError.InsufficientBalance:
case TransferError.InsufficientLiquidity:
throw new InsufficientLiquidityError(trxOrError)
default:
// TODO: map transfer errors to ILP errors
ctxThrow(500, destinationAmountOrError.toString())
}
} else {
stopTimer()
return trxOrError
}
const { amount: destinationAmount } = destinationAmountOrError
}

request.prepare.amount = destinationAmount.toString()
if (state.streamDestination) {
await next()
stopTimer()
}

if (!state.streamDestination || response.fulfill) {
// TODO: make this single-phase if streamDestination === true
const trx = await createPendingTransfer()

if (state.unfulfillable) {
if (!state.streamDestination) {
await next()
return
stopTimer()
}

// Update balances on prepare:
const createPendingTransfer = async (): Promise<
Transaction | undefined
> => {
const transferOptions = {
sourceAccount: accounts.incoming,
destinationAccount: accounts.outgoing,
sourceAmount,
destinationAmount,
transferType: TransferType.TRANSFER,
timeout: AppConfig.tigerBeetleTwoPhaseTimeout
}
const trxOrError =
await services.accounting.createTransfer(transferOptions)
if (isTransferError(trxOrError)) {
logger.error(
{ transferOptions, transferError: trxOrError },
'Could not create transfer'
)
switch (trxOrError) {
case TransferError.InsufficientBalance:
case TransferError.InsufficientLiquidity:
throw new InsufficientLiquidityError(trxOrError)
default:
// TODO: map transfer errors to ILP errors
ctxThrow(500, destinationAmountOrError.toString())
}
if (trx) {
if (response.fulfill) {
await trx.post()
} else {
return trxOrError
await trx.void()
}
}

if (state.streamDestination) await next()

if (!state.streamDestination || response.fulfill) {
// TODO: make this single-phase if streamDestination === true
const trx = await createPendingTransfer()

if (!state.streamDestination) await next()

if (trx) {
if (response.fulfill) {
await trx.post()
} else {
await trx.void()
}
}
}
} finally {
stopTimer()
}
}
}

0 comments on commit 285b2ef

Please sign in to comment.