From b81bb07065af79cf8c41b1525948a70c5fe804b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakob=20Pov=C5=A1i=C4=8D?= Date: Fri, 8 Dec 2023 13:03:18 +0100 Subject: [PATCH 1/3] Update configuration and add new methods --- apps/alpha-liquidator/src/config.ts | 6 +- apps/alpha-liquidator/src/liquidator.ts | 112 ++++++++++++++++++++-- apps/alpha-liquidator/src/utils/chunks.ts | 108 +++++++++++++-------- packages/marginfi-client-v2/src/client.ts | 32 ++++++- 4 files changed, 210 insertions(+), 48 deletions(-) diff --git a/apps/alpha-liquidator/src/config.ts b/apps/alpha-liquidator/src/config.ts index b842f51a1d..d79eb19df7 100644 --- a/apps/alpha-liquidator/src/config.ts +++ b/apps/alpha-liquidator/src/config.ts @@ -31,6 +31,7 @@ if (!process.env.RPC_ENDPOINT) { /*eslint sort-keys: "error"*/ let envSchema = z.object({ ACCOUNT_COOL_DOWN_SECONDS: z.string().default("120").transform((s) => parseInt(s, 10)), + ACCOUNT_REFRESH_INTERVAL_SECONDS: z.string().default("600").transform((s) => parseInt(s, 10)), EXCLUDE_ISOLATED_BANKS: z.string().optional().default("false").transform((s) => s === "true" || s === "1"), IS_DEV: z .string() @@ -67,9 +68,9 @@ let envSchema = z.object({ .default("false") .transform((s) => s === "true" || s === "1"), SENTRY_DSN: z.string().optional(), - SLEEP_INTERVAL: z + SLEEP_INTERVAL_SECONDS: z .string() - .default("10000") + .default("5") .transform((s) => parseInt(s, 10)), SORT_ACCOUNTS_MODE: z .string() @@ -83,6 +84,7 @@ let envSchema = z.object({ return Keypair.fromSecretKey(new Uint8Array(JSON.parse(keypairStr))); } }), + WS_RESET_INTERVAL_SECONDS: z.string().optional().default("300").transform((s) => parseInt(s, 10)), }); type EnvSchema = z.infer; diff --git a/apps/alpha-liquidator/src/liquidator.ts b/apps/alpha-liquidator/src/liquidator.ts index 2fcdc0fa02..035779fe71 100644 --- a/apps/alpha-liquidator/src/liquidator.ts +++ b/apps/alpha-liquidator/src/liquidator.ts @@ -1,4 +1,4 @@ -import { Connection, LAMPORTS_PER_SOL, PublicKey, VersionedTransaction } from "@solana/web3.js"; +import { AccountInfo, Connection, LAMPORTS_PER_SOL, PublicKey, VersionedTransaction } from "@solana/web3.js"; import { MarginRequirementType, MarginfiAccount, @@ -15,6 +15,7 @@ import { captureException, captureMessage, env_config } from "./config"; import BN, { min } from "bn.js"; import { BankMetadataMap, loadBankMetadatas } from "./utils/bankMetadata"; import { Bank } from "@mrgnlabs/marginfi-client-v2/dist/models/bank"; +import { chunkedGetRawMultipleAccountInfos, convertBase64StringArrayToBuffer } from "./utils/chunks"; const DUST_THRESHOLD = new BigNumber(10).pow(USDC_DECIMALS - 2); const DUST_THRESHOLD_UI = new BigNumber(0.01); @@ -34,6 +35,8 @@ function getDebugLogger(context: string) { class Liquidator { private bankMetadataMap: BankMetadataMap; private accountCooldowns: Map = new Map(); + accountInfos: Map = new Map(); + accountKeys: PublicKey[] = []; constructor( readonly connection: Connection, @@ -78,6 +81,7 @@ class Liquidator { console.log("Start with DEBUG=mfi:* to see more logs"); + await this.startLiquidatorDataLoader(); await this.mainLoop(); } @@ -96,6 +100,16 @@ class Liquidator { await this.account.reload(); } + private async rebalanceIfNeeded(): Promise { + if (await this.needsToBeRebalanced()) { + await this.rebalancingStage(); + + return true; + } + + return false; + } + private async mainLoop() { const debug = getDebugLogger("main-loop"); drawSpinner("Scanning"); @@ -105,21 +119,20 @@ class Liquidator { while (true) { await this.swapNonUsdcInTokenAccounts(); debug("Started main loop iteration"); - if (await this.needsToBeRebalanced()) { - await this.rebalancingStage(); + if (await this.rebalanceIfNeeded()) { continue; } // Don't sleep after liquidating an account, start rebalance immediately if (!(await this.liquidationStage())) { - await sleep(env_config.SLEEP_INTERVAL); + await sleep(env_config.SLEEP_INTERVAL_SECONDS * 1000); this.reload(); } } } catch (e) { console.error(e); captureException(e); - await sleep(env_config.SLEEP_INTERVAL); + await sleep(env_config.SLEEP_INTERVAL_SECONDS * 1000); } } } @@ -193,6 +206,91 @@ class Liquidator { debug("Swap transaction sent: %s", txid); } + private async startLiquidatorDataLoader() { + const debug = getDebugLogger("start-liquidator-data-loader"); + debug("Starting liquidator data loader"); + + // Start a job that periodically loads all marginfi account pubkeys, and then refreshes them in batches. + // Start a websocket that updates the accounts. + debug("Loading all Marginfi accounts for the first time"); + await this.loadAllMarginfiAccounts(); + debug("Starting websocket account updater"); + this.startWebsocketAccountUpdater(); + + setInterval(async () => { + debug("Refreshing all Marginfi accounts"); + await this.loadAllMarginfiAccounts(); + }, env_config.ACCOUNT_REFRESH_INTERVAL_SECONDS * 1000); + } + + private async loadAllMarginfiAccounts() { + console.log("Loading data, this may take a moment...") + const debug = getDebugLogger("load-all-marginfi-accounts"); + debug("Loading all Marginfi accounts"); + const allKeys = (await this.client.getAllMarginfiAccountAddresses()); + debug("Retrieved all Marginfi account addresses, found: %d", allKeys.length); + const [slot, ais] = await chunkedGetRawMultipleAccountInfos(this.connection, allKeys.map((k) => k.toBase58()), 16 * 64, 64); + debug("Received account information for slot %d, got: %d accounts", slot, ais.size); + this.accountKeys = allKeys; + + const totalAccounts = ais.size; + let processedAccounts = 0; + for (const [key, accountInfo] of ais) { + + const pubkey = new PublicKey(key); + const account = MarginfiAccountWrapper.fromAccountDataRaw(pubkey, this.client, accountInfo.data); + this.accountInfos.set(pubkey, account); + + processedAccounts++; + if (processedAccounts % 1000 === 0) { + const progress = ((processedAccounts / totalAccounts) * 100).toFixed(2); + debug("Processed %d accounts out of %d (%s%%)", processedAccounts, totalAccounts, progress); + } + } + if (processedAccounts % 1000 !== 0) { + const progress = ((processedAccounts / totalAccounts) * 100).toFixed(2); + debug("Final progress: %s%%", progress); + } + + console.log("Finished loading all Marginfi accounts"); + } + + private async startWebsocketAccountUpdater() { + const debug = getDebugLogger("start-websocket-account-updater"); + debug("Starting websocket account updater"); + /// Start a websocket that updates the accounts. + let connection = 0; + + const fn = () => { + if (connection != 0) { + debug("Resetting websocket connection"); + this.connection.removeAccountChangeListener(connection); + } + + debug("Starting websocket connection"); + connection = this.connection.onProgramAccountChange(this.client.program.programId, (info) => { + const pubkey = info.accountId; + const accountInfo = info.accountInfo; + + if (accountInfo.data.length !== this.client.program.account.marginfiAccount.size) { + debug("Received account update for account with public key: %s, but data length is incorrect", pubkey.toBase58()); + return; + } + + try { + const account = MarginfiAccountWrapper.fromAccountDataRaw(pubkey, this.client, accountInfo.data); + this.accountInfos.set(pubkey, account); + debug("Updated Marginfi account for public key: %s", pubkey.toBase58()); + } catch (error) { + debug("Failed to decode Marginfi account for public key: %s, Error: %s", pubkey.toBase58(), error); + } + }); + } + + setInterval(() => fn, env_config.WS_RESET_INTERVAL_SECONDS * 1000); + fn() + } + /** * 1. step of the account re-balancing @@ -236,7 +334,7 @@ class Liquidator { this.reload(); } - this.swapNonUsdcInTokenAccounts(); + await this.swapNonUsdcInTokenAccounts(); } /** @@ -476,7 +574,7 @@ class Liquidator { private async liquidationStage(): Promise { const debug = getDebugLogger("liquidation-stage"); debug("Started liquidation stage"); - const allAccounts = await this.client.getAllMarginfiAccounts(); + const allAccounts = Array.from(this.accountInfos.values()); const targetAccounts = allAccounts.filter((account) => { if (this.account_whitelist) { return ( diff --git a/apps/alpha-liquidator/src/utils/chunks.ts b/apps/alpha-liquidator/src/utils/chunks.ts index 366ccb580f..48aa1e8044 100644 --- a/apps/alpha-liquidator/src/utils/chunks.ts +++ b/apps/alpha-liquidator/src/utils/chunks.ts @@ -1,5 +1,6 @@ -import { AccountInfo, Connection } from "@solana/web3.js"; +import { AccountInfo, Connection, PublicKey } from "@solana/web3.js"; import { commitment } from "./connection"; +import { deserializeAccountInfosMap } from "./accountInfos"; export function chunks(array: T[], size: number): T[][] { return Array.apply(0, new Array(Math.ceil(array.length / size))).map((_, index) => @@ -20,46 +21,77 @@ export async function chunkedGetRawMultipleAccountInfos( pks: string[], batchChunkSize: number = 1000, maxAccountsChunkSize: number = 100 -): Promise<[number, Map>]> { +): Promise<[number, Map>]> { const accountInfoMap = new Map>(); let contextSlot = 0; + const debug = require("debug")("mfi:chunkedGetRawMultipleAccountInfos"); + debug(`Starting chunkedGetRawMultipleAccountInfos with ${pks.length} public keys`); - const accountInfos: Array | null> = ( - await Promise.all( - chunks(pks, batchChunkSize).map(async (batchPubkeys) => { - const batch = chunks(batchPubkeys, maxAccountsChunkSize).map((pubkeys) => ({ - methodName: "getMultipleAccounts", - // @ts-expect-error solana web3.js doesnt type zstd but infact it is supported - // Using zstd instead of base64 because it is faster when fetching - // base64 was 3x slower than zstd fetching from rpc - args: connection._buildArgs([pubkeys], commitment, "base64+zstd"), - })); - - return ( - // getMultipleAccounts is quite slow, so we use fetch directly - connection - // @ts-ignore - ._rpcBatchRequest(batch) - .then((batchResults: Result[]) => { - contextSlot = Math.max(...batchResults.map((res) => res.result.context.slot)); - - const accounts = batchResults.reduce((acc, res) => { - acc.push(...res.result.value); - return acc; - }, [] as Result["result"]["value"]); - return accounts; - }) - ); - }) - ) - ).flat(); - - accountInfos.forEach((item, index) => { - const publicKey = pks[index]; - if (item) { - accountInfoMap.set(publicKey, item); + const batches = chunkArray(pks, batchChunkSize); + + for (let i = 0; i < batches.length; i++) { + const batch = batches[i]; + debug(`Processing batch ${i + 1}/${batches.length} of ${batch.length} public keys`); + + const batchRequest = chunkArray(batch, maxAccountsChunkSize).map((pubkeys) => ({ + methodName: "getMultipleAccounts", + // @ts-expect-error solana web3.js doesnt type zstd but infact it is supported + // Using zstd instead of base64 because it is faster when fetching + // base64 was 3x slower than zstd fetching from rpc + args: connection._buildArgs([pubkeys], commitment, "base64+zstd"), + })); + + let accountInfos: Array | null> = []; + let retries = 0; + const maxRetries = 3; + + while (retries < maxRetries && accountInfos.length === 0) { + try { + accountInfos = await connection + // @ts-ignore + ._rpcBatchRequest(batchRequest) + .then((batchResults: Result[]) => { + contextSlot = Math.max(...batchResults.map((res) => res.result.context.slot)); + + const accounts = batchResults.reduce((acc, res) => { + acc.push(...res.result.value); + return acc; + }, [] as Result["result"]["value"]); + + debug(`Received batch result with ${accounts.length} accounts`); + return accounts; + }); + } catch (error) { + debug(`Batch request failed on retry ${retries}: ${error}`); + retries++; + } + } + + if (accountInfos.length === 0) { + throw new Error(`Failed to fetch account infos after ${maxRetries} retries`); } - }); - return [contextSlot, accountInfoMap]; + accountInfos.forEach((item, index) => { + const publicKey = batch[index]; + if (item) { + accountInfoMap.set(publicKey, item); + } + }); + } + + return [contextSlot, await deserializeAccountInfosMap(accountInfoMap)]; +} + +export function convertBase64StringArrayToBuffer(stringArray: string[]): Buffer { + return Buffer.concat(stringArray.map(s => Buffer.from(s, 'base64'))); } + +export function chunkArray(array: T[], chunkSize: number): T[][] { + const chunks: T[][] = []; + for (let i = 0; i < array.length; i += chunkSize) { + chunks.push(array.slice(i, i + chunkSize)); + } + return chunks; +} + + diff --git a/packages/marginfi-client-v2/src/client.ts b/packages/marginfi-client-v2/src/client.ts index fc31acfbd3..7652d3df31 100644 --- a/packages/marginfi-client-v2/src/client.ts +++ b/packages/marginfi-client-v2/src/client.ts @@ -29,7 +29,7 @@ import { Wallet, } from "@mrgnlabs/mrgn-common"; import { MarginfiGroup } from "./models/group"; -import { BankRaw, parseOracleSetup, parsePriceInfo, Bank, OraclePrice, ADDRESS_LOOKUP_TABLE_FOR_GROUP } from "."; +import { BankRaw, parseOracleSetup, parsePriceInfo, Bank, OraclePrice, ADDRESS_LOOKUP_TABLE_FOR_GROUP, MarginfiAccountRaw } from "."; import { MarginfiAccountWrapper } from "./models/account/wrapper"; import { ProcessTransactionError, ProcessTransactionErrorType, parseErrorFromLogs } from "./errors"; @@ -245,6 +245,36 @@ class MarginfiClient { ).map((a) => MarginfiAccountWrapper.fromAccountParsed(a.publicKey, this, a.account as MarginfiAccountRaw)); } + async getAllMarginfiAccountPubkeys(): Promise { + return (await this.provider.connection.getProgramAccounts(this.programId, { + filters: [{ + memcmp: { + bytes: this.config.groupPk.toBase58(), + offset: 8, // marginfiGroup is the first field in the account, so only offset is the discriminant + }, + }], + dataSlice: { offset: 0, length: 0 } + })).map(a => a.pubkey); + } + + + /** + * Fetches multiple marginfi accounts based on an array of public keys using the getMultipleAccounts RPC call. + * + * @param pubkeys - The public keys of the marginfi accounts to fetch. + * @returns An array of MarginfiAccountWrapper instances. + */ + async getMultipleMarginfiAccounts(pubkeys: PublicKey[]): Promise { + const accountsInfo = await this.provider.connection.getMultipleAccountsInfo(pubkeys); + return accountsInfo + .map((accountInfo, index) => { + if (accountInfo === null) { + throw new Error(`Account not found for pubkey: ${pubkeys[index].toBase58()}`); + } + return MarginfiAccountWrapper.fromAccountParsed(pubkeys[index], this, this.program.coder.accounts.decode("MarginfiAccount", accountInfo.data)); + }); + } + /** * Retrieves the addresses of all marginfi accounts in the underlying group. * From 4b0d619498c9f87a06bfe6af843054bb5ad5b99d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakob=20Pov=C5=A1i=C4=8D?= Date: Fri, 8 Dec 2023 13:07:14 +0100 Subject: [PATCH 2/3] Remove unused import in client.ts --- packages/marginfi-client-v2/src/client.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/marginfi-client-v2/src/client.ts b/packages/marginfi-client-v2/src/client.ts index 7652d3df31..bd9443f5cf 100644 --- a/packages/marginfi-client-v2/src/client.ts +++ b/packages/marginfi-client-v2/src/client.ts @@ -18,7 +18,7 @@ import { AccountType, Environment, MarginfiConfig, MarginfiProgram } from "./typ import { MARGINFI_IDL } from "./idl"; import { getConfig } from "./config"; import instructions from "./instructions"; -import { MarginRequirementType, MarginfiAccountRaw } from "./models/account"; +import { MarginRequirementType } from "./models/account"; import { DEFAULT_COMMITMENT, DEFAULT_CONFIRM_OPTS, From 7df90a746792cbe5dc0caed28aa4953fb3dc1bf6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakob=20Pov=C5=A1i=C4=8D?= Date: Sat, 9 Dec 2023 17:44:53 +0100 Subject: [PATCH 3/3] Update configuration and fix rebalancing bug --- apps/alpha-liquidator/src/config.ts | 4 ++- apps/alpha-liquidator/src/liquidator.ts | 33 ++++++++++++++----- apps/alpha-liquidator/src/utils/connection.ts | 1 + packages/marginfi-client-v2/src/client.ts | 4 +-- 4 files changed, 30 insertions(+), 12 deletions(-) diff --git a/apps/alpha-liquidator/src/config.ts b/apps/alpha-liquidator/src/config.ts index d79eb19df7..c66e5cf6f5 100644 --- a/apps/alpha-liquidator/src/config.ts +++ b/apps/alpha-liquidator/src/config.ts @@ -31,7 +31,8 @@ if (!process.env.RPC_ENDPOINT) { /*eslint sort-keys: "error"*/ let envSchema = z.object({ ACCOUNT_COOL_DOWN_SECONDS: z.string().default("120").transform((s) => parseInt(s, 10)), - ACCOUNT_REFRESH_INTERVAL_SECONDS: z.string().default("600").transform((s) => parseInt(s, 10)), + /// 30 minutes + ACCOUNT_REFRESH_INTERVAL_SECONDS: z.string().default("1800").transform((s) => parseInt(s, 10)), EXCLUDE_ISOLATED_BANKS: z.string().optional().default("false").transform((s) => s === "true" || s === "1"), IS_DEV: z .string() @@ -84,6 +85,7 @@ let envSchema = z.object({ return Keypair.fromSecretKey(new Uint8Array(JSON.parse(keypairStr))); } }), + WS_ENDPOINT: z.string().url().optional(), WS_RESET_INTERVAL_SECONDS: z.string().optional().default("300").transform((s) => parseInt(s, 10)), }); diff --git a/apps/alpha-liquidator/src/liquidator.ts b/apps/alpha-liquidator/src/liquidator.ts index 035779fe71..6dbf7b1819 100644 --- a/apps/alpha-liquidator/src/liquidator.ts +++ b/apps/alpha-liquidator/src/liquidator.ts @@ -81,6 +81,12 @@ class Liquidator { console.log("Start with DEBUG=mfi:* to see more logs"); + try { + await this.rebalanceIfNeeded(); + } catch (e) { + console.error("Error during initial rebalance: ", e); + } + await this.startLiquidatorDataLoader(); await this.mainLoop(); } @@ -227,7 +233,15 @@ class Liquidator { console.log("Loading data, this may take a moment...") const debug = getDebugLogger("load-all-marginfi-accounts"); debug("Loading all Marginfi accounts"); - const allKeys = (await this.client.getAllMarginfiAccountAddresses()); + let allKeys = []; + + // If whitelist is set, filter out all accounts that are not in the whitelist + if (env_config.MARGINFI_ACCOUNT_WHITELIST) { + allKeys = env_config.MARGINFI_ACCOUNT_WHITELIST; + } else { + allKeys = (await this.client.getAllMarginfiAccountAddresses()); + } + debug("Retrieved all Marginfi account addresses, found: %d", allKeys.length); const [slot, ais] = await chunkedGetRawMultipleAccountInfos(this.connection, allKeys.map((k) => k.toBase58()), 16 * 64, 64); debug("Received account information for slot %d, got: %d accounts", slot, ais.size); @@ -242,15 +256,11 @@ class Liquidator { this.accountInfos.set(pubkey, account); processedAccounts++; - if (processedAccounts % 1000 === 0) { + if (processedAccounts % 5000 === 0) { const progress = ((processedAccounts / totalAccounts) * 100).toFixed(2); debug("Processed %d accounts out of %d (%s%%)", processedAccounts, totalAccounts, progress); } } - if (processedAccounts % 1000 !== 0) { - const progress = ((processedAccounts / totalAccounts) * 100).toFixed(2); - debug("Final progress: %s%%", progress); - } console.log("Finished loading all Marginfi accounts"); } @@ -273,14 +283,12 @@ class Liquidator { const accountInfo = info.accountInfo; if (accountInfo.data.length !== this.client.program.account.marginfiAccount.size) { - debug("Received account update for account with public key: %s, but data length is incorrect", pubkey.toBase58()); return; } try { const account = MarginfiAccountWrapper.fromAccountDataRaw(pubkey, this.client, accountInfo.data); this.accountInfos.set(pubkey, account); - debug("Updated Marginfi account for public key: %s", pubkey.toBase58()); } catch (error) { debug("Failed to decode Marginfi account for public key: %s, Error: %s", pubkey.toBase58(), error); } @@ -450,7 +458,6 @@ class Liquidator { private async rebalancingStage() { const debug = getDebugLogger("rebalancing-stage"); debug("Starting rebalancing stage"); - captureMessage("Starting rebalancing stage"); await this.sellNonUsdcDeposits(); await this.repayAllDebt(); await this.depositRemainingUsdc(); @@ -480,6 +487,14 @@ class Liquidator { } } + private async swapNonUsdcInTokenAccounts2() { + const debug = getDebugLogger("swap-non-usdc-in-token-accounts"); + debug("Swapping any remaining non-usdc to usdc"); + + const banks = Array.from(this.client.banks.values()).filter((bank) => !bank.mint.equals(USDC_MINT)); + const usdcBank = this.client.getBankByMint(USDC_MINT)!; + } + private async swapNonUsdcInTokenAccounts() { const debug = getDebugLogger("swap-non-usdc-in-token-accounts"); debug("Swapping any remaining non-usdc to usdc"); diff --git a/apps/alpha-liquidator/src/utils/connection.ts b/apps/alpha-liquidator/src/utils/connection.ts index 2cec04c3e3..415f7ea1c5 100644 --- a/apps/alpha-liquidator/src/utils/connection.ts +++ b/apps/alpha-liquidator/src/utils/connection.ts @@ -14,6 +14,7 @@ const createConnection = () => new Connection(env_config.RPC_ENDPOINT, { commitment, fetch: fetchWithRetry, + wsEndpoint: env_config.WS_ENDPOINT, }); export let connection = createConnection(); diff --git a/packages/marginfi-client-v2/src/client.ts b/packages/marginfi-client-v2/src/client.ts index bd9443f5cf..23f045d45f 100644 --- a/packages/marginfi-client-v2/src/client.ts +++ b/packages/marginfi-client-v2/src/client.ts @@ -464,7 +464,7 @@ class MarginfiClient { try { const getLatestBlockhashAndContext = await connection.getLatestBlockhashAndContext(); - minContextSlot = getLatestBlockhashAndContext.context.slot; + minContextSlot = getLatestBlockhashAndContext.context.slot - 4; blockhash = getLatestBlockhashAndContext.value.blockhash; lastValidBlockHeight = getLatestBlockhashAndContext.value.lastValidBlockHeight; @@ -526,7 +526,7 @@ class MarginfiClient { }; signature = await connection.sendTransaction(versionedTransaction, { - minContextSlot: mergedOpts.minContextSlot, + // minContextSlot: mergedOpts.minContextSlot, skipPreflight: mergedOpts.skipPreflight, preflightCommitment: mergedOpts.preflightCommitment, maxRetries: mergedOpts.maxRetries,