Skip to content

Commit

Permalink
fix: Implement proper handling for PendingTransaction in trxManager c…
Browse files Browse the repository at this point in the history
…lass

- Corrected the usage of transaction methods in trxManager to align with the Kaspa WASM bindings.
- Replaced the use of the non-existent `finalize` method with the correct `submit` method for `PendingTransaction`.
- Ensured proper signing and submission of transactions using the `sign` and `submit` methods on `PendingTransaction`.
- Implemented a polling mechanism to wait for UTXO maturity before processing subsequent transactions.
- Retained all existing logic while integrating new functionality to handle UTXO maturity safely and sequentially.
  • Loading branch information
argonmining committed Aug 11, 2024
1 parent 823dc21 commit 0e8ec71
Showing 1 changed file with 37 additions and 33 deletions.
70 changes: 37 additions & 33 deletions src/trxs/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import Database from '../database';
import { sompiToKaspaStringWithSuffix, type IPaymentOutput, createTransactions, PrivateKey, UtxoProcessor, UtxoContext, type RpcClient } from "../../wasm/kaspa";
import { sompiToKaspaStringWithSuffix, type IPaymentOutput, createTransactions, PrivateKey, UtxoProcessor, UtxoContext, type RpcClient, type PendingTransaction } from "../../wasm/kaspa";
import Monitoring from '../monitoring';
import { DEBUG } from "../index";

Expand All @@ -26,7 +26,6 @@ export default class trxManager {
}

private async recordPayment(walletAddress: string, amount: bigint, transactionHash: string) {
// Log payment into the katpool-app's payments table using the existing db connection
await this.db.client.query(`
INSERT INTO payments (wallet_address, amount, timestamp, transaction_hash)
VALUES ($1, $2, NOW(), $3)
Expand All @@ -40,11 +39,7 @@ export default class trxManager {
// Aggregate balances by wallet address
for (const { address, balance } of balances) {
if (balance > 0) {
if (!payments[address]) {
payments[address] = balance;
} else {
payments[address] += balance;
}
payments[address] = (payments[address] || 0n) + balance;
}
}

Expand All @@ -58,39 +53,52 @@ export default class trxManager {
return this.monitoring.log('TrxManager: No payments found for current transfer cycle.');
}

const transactionId = await this.send(paymentOutputs);
this.monitoring.log(`TrxManager: Sent payments. Transaction ID: ${transactionId}`);

if (transactionId) {
// Log each payment and reset balances for all affected addresses
for (const [address, amount] of Object.entries(payments)) {
await this.recordPayment(address, amount, transactionId); // Log payment to the database
await this.db.resetBalancesByWallet(address);
this.monitoring.log(`TrxManager: Reset balances for wallet ${address}`);
}
}
// Enqueue transactions for processing
await this.enqueueTransactions(paymentOutputs);
this.monitoring.log(`TrxManager: Transactions queued for processing.`);
}

async send(outputs: IPaymentOutput[]) {
console.log(outputs);
if (DEBUG) this.monitoring.debug(`TrxManager: Context to be used: ${this.context}`);
const { transactions, summary } = await createTransactions({
private async enqueueTransactions(outputs: IPaymentOutput[]) {
const { transactions } = await createTransactions({
entries: this.context,
outputs,
changeAddress: this.address,
priorityFee: 0n
});

// Process each transaction sequentially
for (const transaction of transactions) {
if (DEBUG) this.monitoring.debug(`TrxManager: Payment with Transaction ID: ${transaction.id} to be signed`);
await transaction.sign([this.privateKey]);
if (DEBUG) this.monitoring.debug(`TrxManager: Payment with Transaction ID: ${transaction.id} to be submitted`);
await transaction.submit(this.processor.rpc);
if (DEBUG) this.monitoring.debug(`TrxManager: Payment with Transaction ID: ${transaction.id} submitted`);
await this.processTransaction(transaction);
}
}

private async processTransaction(transaction: PendingTransaction) {
if (DEBUG) this.monitoring.debug(`TrxManager: Signing transaction ID: ${transaction.id}`);
await transaction.sign([this.privateKey]);

if (DEBUG) this.monitoring.debug(`TrxManager: Submitting transaction ID: ${transaction.id}`);
const transactionHash = await transaction.submit(this.processor.rpc);

if (DEBUG) this.monitoring.debug(`TrxManager: Waiting for transaction ID: ${transaction.id} to mature`);
await this.waitForMatureUtxo(transactionHash);

if (DEBUG) this.monitoring.debug(`TrxManager: Transaction ID ${transactionHash} has matured. Proceeding with next transaction.`);
}

private async waitForMatureUtxo(transactionId: string): Promise<void> {
const pollingInterval = 5000; // 5 seconds
const maxAttempts = 60; // 5 minutes

for (let i = 0; i < maxAttempts; i++) {
const matureLength = this.context.matureLength;
if (matureLength > 0) {
if (DEBUG) this.monitoring.debug(`Transaction ID ${transactionId} is now mature.`);
return;
}
await new Promise(resolve => setTimeout(resolve, pollingInterval));
}

if (DEBUG) this.monitoring.debug(`TrxManager: Summary Final Transaction ID: ${summary.finalTransactionId}`);
return summary.finalTransactionId;
throw new Error(`Timeout waiting for transaction ID ${transactionId} to mature.`);
}

private registerProcessor() {
Expand All @@ -102,8 +110,4 @@ export default class trxManager {
});
this.processor.start();
}

// stopProcessor () {
// this.processor.stop()
// }
}

0 comments on commit 0e8ec71

Please sign in to comment.