Skip to content

Commit

Permalink
Optimize load (#401)
Browse files Browse the repository at this point in the history
* optimize load a bit more

* versions

---------

Co-authored-by: Kelvin Lau <[email protected]>
  • Loading branch information
kelvinlau20100 and Kelvin Lau authored May 29, 2024
1 parent 6c8f733 commit c6ff937
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 116 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
All notable changes to this project will be documented in this file.
Version changes are pinned to SDK releases.

## [1.34.0]

- reduce number of fetches on startup and speed up load. ([#401](https://github.com/zetamarkets/sdk/pull/401))

## [1.31.3]

- Update subscribePricing to return mark prices ([#399](https://github.com/zetamarkets/sdk/pull/399))
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@zetamarkets/sdk",
"repository": "https://github.com/zetamarkets/sdk/",
"version": "1.32.0",
"version": "1.34.0",
"description": "Zeta SDK",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand Down
180 changes: 99 additions & 81 deletions src/exchange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
import * as utils from "./utils";
import * as constants from "./constants";
import * as assets from "./assets";
import { PerpSyncQueue, ProductGreeks, State, Pricing } from "./program-types";
import { PerpSyncQueue, State, Pricing } from "./program-types";
import { Market, ZetaGroupMarkets } from "./market";
import { RiskCalculator } from "./risk";
import { EventType } from "./events";
Expand All @@ -30,9 +30,10 @@ import { assetToIndex, toProgramAsset } from "./assets";
import { Asset } from "./constants";
import { SubExchange } from "./subexchange";
import * as instructions from "./program-instructions";
import { Orderbook } from "./serum/market";
import fetch from "cross-fetch";
import { HttpProvider } from "@bloxroute/solana-trader-client-ts";
import { getDecodedMarket } from "./serum/generate-decoded";
import { MARKET_STATE_LAYOUT_V3 } from "./serum/market";
import { BlockhashCache } from "./blockhash-cache";

export class Exchange {
Expand Down Expand Up @@ -662,96 +663,102 @@ export class Exchange {
this.connection as unknown as Connection
);

const subExchangeToFetchAddrs: PublicKey[] = this.assets
const srmMarketAddresses = this.assets.map((a) => {
return this.pricing.products[assetToIndex(a)].market;
});

const perpSyncQueueAddresses: PublicKey[] = this.assets
.map((a) => {
const se = this.getSubExchange(a);
return [se.perpSyncQueueAddress];
})
.flat()
.concat([SYSVAR_CLOCK_PUBKEY]);
.flat();

const accFetchPromises: Promise<any>[] = subExchangeToFetchAddrs.map(
(addr) => {
return this.provider.connection.getAccountInfo(addr);
}
);
const allPromises: Promise<any>[] = accFetchPromises.concat([
const allAddrsToFetch: PublicKey[] = perpSyncQueueAddresses;

let srmBidAddrs: PublicKey[] = [];
let srmAskAddrs: PublicKey[] = [];
let decodedSrmMarkets: any[] = [];
if (!loadConfig.loadFromStore) {
allAddrsToFetch.push(...srmMarketAddresses);
}

if (this.network != Network.LOCALNET) {
this.assets.forEach((a) => {
let decodedSrmMarket = getDecodedMarket(
this.network,
a,
constants.PERP_INDEX
);
decodedSrmMarkets.push(decodedSrmMarket);
srmBidAddrs.push(decodedSrmMarket.bids);
srmAskAddrs.push(decodedSrmMarket.asks);
});
}

allAddrsToFetch.push(...srmBidAddrs);
allAddrsToFetch.push(...srmAskAddrs);
allAddrsToFetch.push(SYSVAR_CLOCK_PUBKEY);

const accFetchPromise: Promise<AccountInfo<Buffer>[]> =
this.connection.getMultipleAccountsInfo(allAddrsToFetch);

const allPromises: Promise<any>[] = [accFetchPromise as any].concat([
this.subscribeOracle(this.assets, callback),
]);

// If throttleMs is passed, do each promise slowly with a delay, else load everything at once
let accFetches = [];
if (loadConfig.throttleMs > 0) {
console.log(
`Fetching ${allPromises.length} core accounts with ${loadConfig.throttleMs}ms sleep inbetween each fetch...`
);
for (var prom of allPromises) {
await utils.sleep(loadConfig.throttleMs);
accFetches.push(await Promise.resolve(prom));
}
accFetches = accFetches.slice(0, this.assets.length + 1);
} else {
accFetches = (await Promise.all(allPromises)).slice(
0,
this.assets.length + 1
);
}
const accFetches: AccountInfo<Buffer>[] = (
await Promise.all(allPromises)
)[0];

let perpSyncQueueAccs: PerpSyncQueue[] = [];
for (let i = 0; i < accFetches.length - 1; i++) {
perpSyncQueueAccs.push(
this.program.account.perpSyncQueue.coder.accounts.decode(
// First 0 to Assets.length-1 will always be PerpSyncQueues
const perpSyncQueueAccs = accFetches
.slice(0, this.assets.length)
.map((accInfo) => {
return this.program.account.perpSyncQueue.coder.accounts.decode(
types.ProgramAccountType.PerpSyncQueue,
accFetches[i].data
) as PerpSyncQueue
);
}
accInfo.data
) as PerpSyncQueue;
});

// If throttleMs is passed, load sequentially with some delay, else load as fast as possible
if (loadConfig.throttleMs > 0) {
for (var asset of this.assets) {
await utils.sleep(loadConfig.throttleMs);
await this.getSubExchange(asset).load(
asset,
this.opts,
[perpSyncQueueAccs[this.assets.indexOf(asset)]],
loadConfig.loadFromStore,
callback
);
}
} else {
await Promise.all(
this.assets.map(async (asset, i) => {
return this.getSubExchange(asset).load(
asset,
this.opts,
[perpSyncQueueAccs[i]],
loadConfig.loadFromStore,
callback
const clockData = utils.getClockData(accFetches.at(-1));

// Assets.length to Assets.length + Assets.length-1 will sometimes be SerumMarket data
await Promise.all(
this.assets.map(async (asset, i) => {
let decodedSrmMarket = undefined;
let bidAccInfo: AccountInfo<Buffer> = undefined;
let askAccInfo: AccountInfo<Buffer> = undefined;

if (this.network != Network.LOCALNET) {
if (loadConfig.loadFromStore) {
decodedSrmMarket = decodedSrmMarkets[i];
bidAccInfo = accFetches[this.assets.length + i];
askAccInfo = accFetches[this.assets.length * 2 + i];
} else {
decodedSrmMarket = MARKET_STATE_LAYOUT_V3.decode(
accFetches[this.assets.length + i].data
);
bidAccInfo = accFetches[this.assets.length * 2 + i];
askAccInfo = accFetches[this.assets.length * 3 + i];
}
} else {
decodedSrmMarket = MARKET_STATE_LAYOUT_V3.decode(
accFetches[this.assets.length + i].data
);
})
);
}
}

if (loadConfig.throttleMs > 0) {
console.log(
`Updating ${this.assets.length} markets with ${loadConfig.throttleMs}ms sleep inbetween each update...`
);
for (var asset of this.assets) {
await utils.sleep(loadConfig.throttleMs);
await this.getPerpMarket(asset).serumMarket.updateDecoded(
this.connection as unknown as ConnectionZstd
return this.getSubExchange(asset).load(
asset,
this.opts,
perpSyncQueueAccs[i],
decodedSrmMarket,
bidAccInfo,
askAccInfo,
clockData
);
}
} else {
await Promise.all(
this._assets.map(async (a) => {
await this.getPerpMarket(a).serumMarket.updateDecoded(
this.connection as unknown as ConnectionZstd
);
})
);
}
})
);

for (var se of this.getAllSubExchanges()) {
// Only subscribe to the orderbook for assets provided in the override
Expand All @@ -772,7 +779,6 @@ export class Exchange {
this._zetaGroupPubkeyToAsset.set(se.zetaGroupAddress, se.asset);
}

const clockData = utils.getClockData(accFetches.at(-1));
this.subscribeClock(clockData, callback);
this.subscribePricing(callback);
this.subscribeState(callback);
Expand Down Expand Up @@ -922,8 +928,20 @@ export class Exchange {
}

public async updateExchangeState() {
await this.updateState();
await this.updateZetaPricing();
let accInfos = await this.connection.getMultipleAccountsInfo([
this.stateAddress,
this.pricingAddress,
]);

this._state = this.program.account.state.coder.accounts.decode(
types.ProgramAccountType.State,
accInfos[0].data
);

this._pricing = this.program.account.pricing.coder.accounts.decode(
types.ProgramAccountType.Pricing,
accInfos[1].data
);
}

/**
Expand Down
58 changes: 29 additions & 29 deletions src/market.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,39 +73,26 @@ export class ZetaGroupMarkets {
public static async load(
asset: Asset,
opts: ConfirmOptions,
loadFromStore: boolean
decodedSrmMarket: any,
bidAccInfo: AccountInfo<Buffer> | undefined,
askAccInfo: AccountInfo<Buffer> | undefined,
clockData: types.ClockData
): Promise<ZetaGroupMarkets> {
let instance = new ZetaGroupMarkets(asset);
let subExchange = Exchange.getSubExchange(asset);

// Perps product/market is separate
let marketAddr = Exchange.pricing.products[assetToIndex(asset)].market;
let serumMarket: SerumMarket;
if (loadFromStore) {
const decoded = getDecodedMarket(
Exchange.network,
asset,
constants.PERP_INDEX
);
serumMarket = SerumMarket.loadFromDecoded(
decoded,
{
commitment: opts.commitment,
skipPreflight: opts.skipPreflight,
},
constants.DEX_PID[Exchange.network] as PublicKeyZstd
);
} else {
serumMarket = await SerumMarket.load(
Exchange.connection,
marketAddr as PublicKeyZstd,
{
commitment: opts.commitment,
skipPreflight: opts.skipPreflight,
},
constants.DEX_PID[Exchange.network] as PublicKeyZstd
);
}

serumMarket = SerumMarket.loadFromDecoded(
decodedSrmMarket,
{
commitment: opts.commitment,
skipPreflight: opts.skipPreflight,
},
constants.DEX_PID[Exchange.network] as PublicKeyZstd
);

let [baseVaultAddr, _baseVaultNonce] = getZetaVault(
Exchange.programId,
Expand All @@ -125,9 +112,22 @@ export class ZetaGroupMarkets {
serumMarket
);

let book = await serumMarket.loadBidsAndAsks(
Exchange.provider.connection as unknown as ConnectionZstd
);
let book = undefined;
if (bidAccInfo && askAccInfo) {
book = serumMarket.loadBidsAndAsksFromData(
clockData,
bidAccInfo,
askAccInfo
);
} else {
book = await serumMarket.loadBidsAndAsks(
Exchange.provider.connection as unknown as ConnectionZstd
);
}

instance._market.bids = book.bids;
instance._market.asks = book.asks;

instance._market.bids = book.bids;
instance._market.asks = book.asks;
instance._market.updateOrderbook();
Expand Down
9 changes: 9 additions & 0 deletions src/serum/market.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { decodeEventQueue, decodeRequestQueue } from "./queue";
import { Buffer } from "buffer";
import { exchange as Exchange } from "../exchange";
import { getClockData } from "../utils";
import { ClockData } from "../types";

export const MARKET_STATE_LAYOUT_V3 = struct([
blob(5),
Expand Down Expand Up @@ -259,6 +260,14 @@ export class Market {
};
}

loadBidsAndAsksFromData(clockInfo: ClockData, bidsInfo, asksInfo): any {
return {
slot: clockInfo.slot,
bids: Orderbook.decode(this, throwIfNull(bidsInfo).data),
asks: Orderbook.decode(this, throwIfNull(asksInfo).data),
};
}

async loadRequestQueue(connection: Connection) {
const { data } = throwIfNull(
await connection.getAccountInfo(this._decoded.requestQueue)
Expand Down
Loading

0 comments on commit c6ff937

Please sign in to comment.