Skip to content

Commit

Permalink
Merge pull request #414 from mrgnlabs/j/ws-liquidator
Browse files Browse the repository at this point in the history
Websocket liquidator
  • Loading branch information
jkbpvsc authored Dec 9, 2023
2 parents ea68190 + 7df90a7 commit 9387587
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 52 deletions.
8 changes: 6 additions & 2 deletions apps/alpha-liquidator/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ if (!process.env.RPC_ENDPOINT) {
/*eslint sort-keys: "error"*/
let envSchema = z.object({
ACCOUNT_COOL_DOWN_SECONDS: z.string().default("120").transform((s) => parseInt(s, 10)),
/// 30 minutes
ACCOUNT_REFRESH_INTERVAL_SECONDS: z.string().default("1800").transform((s) => parseInt(s, 10)),
EXCLUDE_ISOLATED_BANKS: z.string().optional().default("false").transform((s) => s === "true" || s === "1"),
IS_DEV: z
.string()
Expand Down Expand Up @@ -67,9 +69,9 @@ let envSchema = z.object({
.default("false")
.transform((s) => s === "true" || s === "1"),
SENTRY_DSN: z.string().optional(),
SLEEP_INTERVAL: z
SLEEP_INTERVAL_SECONDS: z
.string()
.default("10000")
.default("5")
.transform((s) => parseInt(s, 10)),
SORT_ACCOUNTS_MODE: z
.string()
Expand All @@ -83,6 +85,8 @@ let envSchema = z.object({
return Keypair.fromSecretKey(new Uint8Array(JSON.parse(keypairStr)));
}
}),
WS_ENDPOINT: z.string().url().optional(),
WS_RESET_INTERVAL_SECONDS: z.string().optional().default("300").transform((s) => parseInt(s, 10)),
});

type EnvSchema = z.infer<typeof envSchema>;
Expand Down
129 changes: 121 additions & 8 deletions apps/alpha-liquidator/src/liquidator.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Connection, LAMPORTS_PER_SOL, PublicKey, VersionedTransaction } from "@solana/web3.js";
import { AccountInfo, Connection, LAMPORTS_PER_SOL, PublicKey, VersionedTransaction } from "@solana/web3.js";
import {
MarginRequirementType,
MarginfiAccount,
Expand All @@ -15,6 +15,7 @@ import { captureException, captureMessage, env_config } from "./config";
import BN, { min } from "bn.js";
import { BankMetadataMap, loadBankMetadatas } from "./utils/bankMetadata";
import { Bank } from "@mrgnlabs/marginfi-client-v2/dist/models/bank";
import { chunkedGetRawMultipleAccountInfos, convertBase64StringArrayToBuffer } from "./utils/chunks";

const DUST_THRESHOLD = new BigNumber(10).pow(USDC_DECIMALS - 2);
const DUST_THRESHOLD_UI = new BigNumber(0.01);
Expand All @@ -34,6 +35,8 @@ function getDebugLogger(context: string) {
class Liquidator {
private bankMetadataMap: BankMetadataMap;
private accountCooldowns: Map<string, number> = new Map();
accountInfos: Map<PublicKey, MarginfiAccountWrapper> = new Map();
accountKeys: PublicKey[] = [];

constructor(
readonly connection: Connection,
Expand Down Expand Up @@ -78,6 +81,13 @@ class Liquidator {

console.log("Start with DEBUG=mfi:* to see more logs");

try {
await this.rebalanceIfNeeded();
} catch (e) {
console.error("Error during initial rebalance: ", e);
}

await this.startLiquidatorDataLoader();
await this.mainLoop();
}

Expand All @@ -96,6 +106,16 @@ class Liquidator {
await this.account.reload();
}

private async rebalanceIfNeeded(): Promise<boolean> {
if (await this.needsToBeRebalanced()) {
await this.rebalancingStage();

return true;
}

return false;
}

private async mainLoop() {
const debug = getDebugLogger("main-loop");
drawSpinner("Scanning");
Expand All @@ -105,21 +125,20 @@ class Liquidator {
while (true) {
await this.swapNonUsdcInTokenAccounts();
debug("Started main loop iteration");
if (await this.needsToBeRebalanced()) {
await this.rebalancingStage();
if (await this.rebalanceIfNeeded()) {
continue;
}

// Don't sleep after liquidating an account, start rebalance immediately
if (!(await this.liquidationStage())) {
await sleep(env_config.SLEEP_INTERVAL);
await sleep(env_config.SLEEP_INTERVAL_SECONDS * 1000);
this.reload();
}
}
} catch (e) {
console.error(e);
captureException(e);
await sleep(env_config.SLEEP_INTERVAL);
await sleep(env_config.SLEEP_INTERVAL_SECONDS * 1000);
}
}
}
Expand Down Expand Up @@ -193,6 +212,93 @@ class Liquidator {
debug("Swap transaction sent: %s", txid);
}

private async startLiquidatorDataLoader() {
const debug = getDebugLogger("start-liquidator-data-loader");
debug("Starting liquidator data loader");

// Start a job that periodically loads all marginfi account pubkeys, and then refreshes them in batches.
// Start a websocket that updates the accounts.
debug("Loading all Marginfi accounts for the first time");
await this.loadAllMarginfiAccounts();
debug("Starting websocket account updater");
this.startWebsocketAccountUpdater();

setInterval(async () => {
debug("Refreshing all Marginfi accounts");
await this.loadAllMarginfiAccounts();
}, env_config.ACCOUNT_REFRESH_INTERVAL_SECONDS * 1000);
}

private async loadAllMarginfiAccounts() {
console.log("Loading data, this may take a moment...")
const debug = getDebugLogger("load-all-marginfi-accounts");
debug("Loading all Marginfi accounts");
let allKeys = [];

// If whitelist is set, filter out all accounts that are not in the whitelist
if (env_config.MARGINFI_ACCOUNT_WHITELIST) {
allKeys = env_config.MARGINFI_ACCOUNT_WHITELIST;
} else {
allKeys = (await this.client.getAllMarginfiAccountAddresses());
}

debug("Retrieved all Marginfi account addresses, found: %d", allKeys.length);
const [slot, ais] = await chunkedGetRawMultipleAccountInfos(this.connection, allKeys.map((k) => k.toBase58()), 16 * 64, 64);
debug("Received account information for slot %d, got: %d accounts", slot, ais.size);
this.accountKeys = allKeys;

const totalAccounts = ais.size;
let processedAccounts = 0;
for (const [key, accountInfo] of ais) {

const pubkey = new PublicKey(key);
const account = MarginfiAccountWrapper.fromAccountDataRaw(pubkey, this.client, accountInfo.data);
this.accountInfos.set(pubkey, account);

processedAccounts++;
if (processedAccounts % 5000 === 0) {
const progress = ((processedAccounts / totalAccounts) * 100).toFixed(2);
debug("Processed %d accounts out of %d (%s%%)", processedAccounts, totalAccounts, progress);
}
}

console.log("Finished loading all Marginfi accounts");
}

private async startWebsocketAccountUpdater() {
const debug = getDebugLogger("start-websocket-account-updater");
debug("Starting websocket account updater");
/// Start a websocket that updates the accounts.
let connection = 0;

const fn = () => {
if (connection != 0) {
debug("Resetting websocket connection");
this.connection.removeAccountChangeListener(connection);
}

debug("Starting websocket connection");
connection = this.connection.onProgramAccountChange(this.client.program.programId, (info) => {
const pubkey = info.accountId;
const accountInfo = info.accountInfo;

if (accountInfo.data.length !== this.client.program.account.marginfiAccount.size) {
return;
}

try {
const account = MarginfiAccountWrapper.fromAccountDataRaw(pubkey, this.client, accountInfo.data);
this.accountInfos.set(pubkey, account);
} catch (error) {
debug("Failed to decode Marginfi account for public key: %s, Error: %s", pubkey.toBase58(), error);
}
});
}

setInterval(() => fn, env_config.WS_RESET_INTERVAL_SECONDS * 1000);
fn()
}

/**
* 1. step of the account re-balancing
Expand Down Expand Up @@ -236,7 +342,7 @@ class Liquidator {
this.reload();
}

this.swapNonUsdcInTokenAccounts();
await this.swapNonUsdcInTokenAccounts();
}

/**
Expand Down Expand Up @@ -352,7 +458,6 @@ class Liquidator {
private async rebalancingStage() {
const debug = getDebugLogger("rebalancing-stage");
debug("Starting rebalancing stage");
captureMessage("Starting rebalancing stage");
await this.sellNonUsdcDeposits();
await this.repayAllDebt();
await this.depositRemainingUsdc();
Expand Down Expand Up @@ -382,6 +487,14 @@ class Liquidator {
}
}

private async swapNonUsdcInTokenAccounts2() {
const debug = getDebugLogger("swap-non-usdc-in-token-accounts");
debug("Swapping any remaining non-usdc to usdc");

const banks = Array.from(this.client.banks.values()).filter((bank) => !bank.mint.equals(USDC_MINT));
const usdcBank = this.client.getBankByMint(USDC_MINT)!;
}

private async swapNonUsdcInTokenAccounts() {
const debug = getDebugLogger("swap-non-usdc-in-token-accounts");
debug("Swapping any remaining non-usdc to usdc");
Expand Down Expand Up @@ -476,7 +589,7 @@ class Liquidator {
private async liquidationStage(): Promise<boolean> {
const debug = getDebugLogger("liquidation-stage");
debug("Started liquidation stage");
const allAccounts = await this.client.getAllMarginfiAccounts();
const allAccounts = Array.from(this.accountInfos.values());
const targetAccounts = allAccounts.filter((account) => {
if (this.account_whitelist) {
return (
Expand Down
108 changes: 70 additions & 38 deletions apps/alpha-liquidator/src/utils/chunks.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { AccountInfo, Connection } from "@solana/web3.js";
import { AccountInfo, Connection, PublicKey } from "@solana/web3.js";
import { commitment } from "./connection";
import { deserializeAccountInfosMap } from "./accountInfos";

export function chunks<T>(array: T[], size: number): T[][] {
return Array.apply<number, T[], T[][]>(0, new Array(Math.ceil(array.length / size))).map((_, index) =>
Expand All @@ -20,46 +21,77 @@ export async function chunkedGetRawMultipleAccountInfos(
pks: string[],
batchChunkSize: number = 1000,
maxAccountsChunkSize: number = 100
): Promise<[number, Map<string, AccountInfo<string[]>>]> {
): Promise<[number, Map<string, AccountInfo<Buffer>>]> {
const accountInfoMap = new Map<string, AccountInfo<string[]>>();
let contextSlot = 0;
const debug = require("debug")("mfi:chunkedGetRawMultipleAccountInfos");
debug(`Starting chunkedGetRawMultipleAccountInfos with ${pks.length} public keys`);

const accountInfos: Array<AccountInfo<string[]> | null> = (
await Promise.all(
chunks(pks, batchChunkSize).map(async (batchPubkeys) => {
const batch = chunks(batchPubkeys, maxAccountsChunkSize).map((pubkeys) => ({
methodName: "getMultipleAccounts",
// @ts-expect-error solana web3.js doesnt type zstd but infact it is supported
// Using zstd instead of base64 because it is faster when fetching
// base64 was 3x slower than zstd fetching from rpc
args: connection._buildArgs([pubkeys], commitment, "base64+zstd"),
}));

return (
// getMultipleAccounts is quite slow, so we use fetch directly
connection
// @ts-ignore
._rpcBatchRequest(batch)
.then((batchResults: Result[]) => {
contextSlot = Math.max(...batchResults.map((res) => res.result.context.slot));

const accounts = batchResults.reduce((acc, res) => {
acc.push(...res.result.value);
return acc;
}, [] as Result["result"]["value"]);
return accounts;
})
);
})
)
).flat();

accountInfos.forEach((item, index) => {
const publicKey = pks[index];
if (item) {
accountInfoMap.set(publicKey, item);
const batches = chunkArray(pks, batchChunkSize);

for (let i = 0; i < batches.length; i++) {
const batch = batches[i];
debug(`Processing batch ${i + 1}/${batches.length} of ${batch.length} public keys`);

const batchRequest = chunkArray(batch, maxAccountsChunkSize).map((pubkeys) => ({
methodName: "getMultipleAccounts",
// @ts-expect-error solana web3.js doesnt type zstd but infact it is supported
// Using zstd instead of base64 because it is faster when fetching
// base64 was 3x slower than zstd fetching from rpc
args: connection._buildArgs([pubkeys], commitment, "base64+zstd"),
}));

let accountInfos: Array<AccountInfo<string[]> | null> = [];
let retries = 0;
const maxRetries = 3;

while (retries < maxRetries && accountInfos.length === 0) {
try {
accountInfos = await connection
// @ts-ignore
._rpcBatchRequest(batchRequest)
.then((batchResults: Result[]) => {
contextSlot = Math.max(...batchResults.map((res) => res.result.context.slot));

const accounts = batchResults.reduce((acc, res) => {
acc.push(...res.result.value);
return acc;
}, [] as Result["result"]["value"]);

debug(`Received batch result with ${accounts.length} accounts`);
return accounts;
});
} catch (error) {
debug(`Batch request failed on retry ${retries}: ${error}`);
retries++;
}
}

if (accountInfos.length === 0) {
throw new Error(`Failed to fetch account infos after ${maxRetries} retries`);
}
});

return [contextSlot, accountInfoMap];
accountInfos.forEach((item, index) => {
const publicKey = batch[index];
if (item) {
accountInfoMap.set(publicKey, item);
}
});
}

return [contextSlot, await deserializeAccountInfosMap(accountInfoMap)];
}

export function convertBase64StringArrayToBuffer(stringArray: string[]): Buffer {
return Buffer.concat(stringArray.map(s => Buffer.from(s, 'base64')));
}

export function chunkArray<T>(array: T[], chunkSize: number): T[][] {
const chunks: T[][] = [];
for (let i = 0; i < array.length; i += chunkSize) {
chunks.push(array.slice(i, i + chunkSize));
}
return chunks;
}


1 change: 1 addition & 0 deletions apps/alpha-liquidator/src/utils/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const createConnection = () =>
new Connection(env_config.RPC_ENDPOINT, {
commitment,
fetch: fetchWithRetry,
wsEndpoint: env_config.WS_ENDPOINT,
});

export let connection = createConnection();
Loading

3 comments on commit 9387587

@vercel
Copy link

@vercel vercel bot commented on 9387587 Dec 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

marginfi-landing-page – ./apps/marginfi-landing-page

marginfi-landing-page.vercel.app
marginfi-landing-page-mrgn.vercel.app
marginfi-landing-page-git-production-mrgn.vercel.app
marginfi.com
www.marginfi.com

@vercel
Copy link

@vercel vercel bot commented on 9387587 Dec 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

omni – ./apps/omni

omni-git-production-mrgn.vercel.app
omni-mrgn.vercel.app
omni-one.vercel.app
omni.marginfi.com

@vercel
Copy link

@vercel vercel bot commented on 9387587 Dec 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.