From 0e8ec71f3170730ec6ae074dacbb1e816c84873e Mon Sep 17 00:00:00 2001 From: argonmining Date: Sun, 11 Aug 2024 14:32:23 -0500 Subject: [PATCH] fix: Implement proper handling for PendingTransaction in trxManager class - Corrected the usage of transaction methods in trxManager to align with the Kaspa WASM bindings. - Replaced the use of the non-existent `finalize` method with the correct `submit` method for `PendingTransaction`. - Ensured proper signing and submission of transactions using the `sign` and `submit` methods on `PendingTransaction`. - Implemented a polling mechanism to wait for UTXO maturity before processing subsequent transactions. - Retained all existing logic while integrating new functionality to handle UTXO maturity safely and sequentially. --- src/trxs/index.ts | 70 +++++++++++++++++++++++++---------------------- 1 file changed, 37 insertions(+), 33 deletions(-) diff --git a/src/trxs/index.ts b/src/trxs/index.ts index c3c4470..d892841 100644 --- a/src/trxs/index.ts +++ b/src/trxs/index.ts @@ -1,5 +1,5 @@ import Database from '../database'; -import { sompiToKaspaStringWithSuffix, type IPaymentOutput, createTransactions, PrivateKey, UtxoProcessor, UtxoContext, type RpcClient } from "../../wasm/kaspa"; +import { sompiToKaspaStringWithSuffix, type IPaymentOutput, createTransactions, PrivateKey, UtxoProcessor, UtxoContext, type RpcClient, type PendingTransaction } from "../../wasm/kaspa"; import Monitoring from '../monitoring'; import { DEBUG } from "../index"; @@ -26,7 +26,6 @@ export default class trxManager { } private async recordPayment(walletAddress: string, amount: bigint, transactionHash: string) { - // Log payment into the katpool-app's payments table using the existing db connection await this.db.client.query(` INSERT INTO payments (wallet_address, amount, timestamp, transaction_hash) VALUES ($1, $2, NOW(), $3) @@ -40,11 +39,7 @@ export default class trxManager { // Aggregate balances by wallet address for (const { address, balance } of balances) { if (balance > 0) { - if (!payments[address]) { - payments[address] = balance; - } else { - payments[address] += balance; - } + payments[address] = (payments[address] || 0n) + balance; } } @@ -58,39 +53,52 @@ export default class trxManager { return this.monitoring.log('TrxManager: No payments found for current transfer cycle.'); } - const transactionId = await this.send(paymentOutputs); - this.monitoring.log(`TrxManager: Sent payments. Transaction ID: ${transactionId}`); - - if (transactionId) { - // Log each payment and reset balances for all affected addresses - for (const [address, amount] of Object.entries(payments)) { - await this.recordPayment(address, amount, transactionId); // Log payment to the database - await this.db.resetBalancesByWallet(address); - this.monitoring.log(`TrxManager: Reset balances for wallet ${address}`); - } - } + // Enqueue transactions for processing + await this.enqueueTransactions(paymentOutputs); + this.monitoring.log(`TrxManager: Transactions queued for processing.`); } - async send(outputs: IPaymentOutput[]) { - console.log(outputs); - if (DEBUG) this.monitoring.debug(`TrxManager: Context to be used: ${this.context}`); - const { transactions, summary } = await createTransactions({ + private async enqueueTransactions(outputs: IPaymentOutput[]) { + const { transactions } = await createTransactions({ entries: this.context, outputs, changeAddress: this.address, priorityFee: 0n }); + // Process each transaction sequentially for (const transaction of transactions) { - if (DEBUG) this.monitoring.debug(`TrxManager: Payment with Transaction ID: ${transaction.id} to be signed`); - await transaction.sign([this.privateKey]); - if (DEBUG) this.monitoring.debug(`TrxManager: Payment with Transaction ID: ${transaction.id} to be submitted`); - await transaction.submit(this.processor.rpc); - if (DEBUG) this.monitoring.debug(`TrxManager: Payment with Transaction ID: ${transaction.id} submitted`); + await this.processTransaction(transaction); + } + } + + private async processTransaction(transaction: PendingTransaction) { + if (DEBUG) this.monitoring.debug(`TrxManager: Signing transaction ID: ${transaction.id}`); + await transaction.sign([this.privateKey]); + + if (DEBUG) this.monitoring.debug(`TrxManager: Submitting transaction ID: ${transaction.id}`); + const transactionHash = await transaction.submit(this.processor.rpc); + + if (DEBUG) this.monitoring.debug(`TrxManager: Waiting for transaction ID: ${transaction.id} to mature`); + await this.waitForMatureUtxo(transactionHash); + + if (DEBUG) this.monitoring.debug(`TrxManager: Transaction ID ${transactionHash} has matured. Proceeding with next transaction.`); + } + + private async waitForMatureUtxo(transactionId: string): Promise { + const pollingInterval = 5000; // 5 seconds + const maxAttempts = 60; // 5 minutes + + for (let i = 0; i < maxAttempts; i++) { + const matureLength = this.context.matureLength; + if (matureLength > 0) { + if (DEBUG) this.monitoring.debug(`Transaction ID ${transactionId} is now mature.`); + return; + } + await new Promise(resolve => setTimeout(resolve, pollingInterval)); } - if (DEBUG) this.monitoring.debug(`TrxManager: Summary Final Transaction ID: ${summary.finalTransactionId}`); - return summary.finalTransactionId; + throw new Error(`Timeout waiting for transaction ID ${transactionId} to mature.`); } private registerProcessor() { @@ -102,8 +110,4 @@ export default class trxManager { }); this.processor.start(); } - - // stopProcessor () { - // this.processor.stop() - // } }