diff --git a/packages/database/src/pg/impl/pubsub.ts b/packages/database/src/pg/impl/pubsub.ts index e9853d922..4f60263cb 100644 --- a/packages/database/src/pg/impl/pubsub.ts +++ b/packages/database/src/pg/impl/pubsub.ts @@ -8,14 +8,15 @@ const topic = pubSub.topic('onupsert'); export const subscriptions: { [key: string]: Promise } = {}; export const getSubscription = (table: string) => { - const subsName = Math.random().toString().replace('0.', 'subs'); if (!subscriptions[table]) { + const subsName = Math.random().toString().replace('0.', 'subs-onupsert-'); subscriptions[table] = new Promise(async (res) => { const [subscription] = await topic.createSubscription(subsName, { filter: `attributes.table="${table}"`, ackDeadlineSeconds: 600, expirationPolicy: { ttl: { seconds: 86400 } }, }); + subscription.setMaxListeners(0); res(subscription); }); } diff --git a/packages/database/src/pg/impl/tables/milestone.ts b/packages/database/src/pg/impl/tables/milestone.ts index 549dbe865..e37b37835 100644 --- a/packages/database/src/pg/impl/tables/milestone.ts +++ b/packages/database/src/pg/impl/tables/milestone.ts @@ -9,10 +9,10 @@ export class MilestoneConverter implements Converter { uid: get(milestone, 'uid', ''), createdOn: milestone.createdOn?.toDate(), completed: milestone.completed, - completedOn: milestone.completedOn.toDate(), + completedOn: milestone.completedOn?.toDate(), listenerNodeId: milestone.listenerNodeId, milestone: milestone.milestone, - milestoneTimestamp: milestone.milestoneTimestamp.toDate(), + milestoneTimestamp: milestone.milestoneTimestamp?.toDate(), trxConflictCount: milestone.trxConflictCount, trxFailedCount: milestone.trxFailedCount, trxValidCount: milestone.trxValidCount, diff --git a/packages/database/src/pg/impl/tables/space.ts b/packages/database/src/pg/impl/tables/space.ts index 6db89ad01..65cfe845a 100644 --- a/packages/database/src/pg/impl/tables/space.ts +++ b/packages/database/src/pg/impl/tables/space.ts @@ -40,7 +40,7 @@ export class SpaceConverter implements Converter { alias_address: space.alias?.address, alias_aliasId: space.alias?.aliasId, alias_blockId: space.alias?.blockId, - alias_mintedOn: space.alias?.mintedOn.toDate(), + alias_mintedOn: space.alias?.mintedOn?.toDate(), alias_mintedBy: space.alias?.mintedBy, }); diff --git a/packages/database/src/pg/impl/tables/stake_reward.ts b/packages/database/src/pg/impl/tables/stake_reward.ts index b8267899a..fbee44499 100644 --- a/packages/database/src/pg/impl/tables/stake_reward.ts +++ b/packages/database/src/pg/impl/tables/stake_reward.ts @@ -12,9 +12,9 @@ export class StakeRewardConverter implements Converter { onSnapshot = ( callback: (data: C | undefined) => Promise | void, - _error?: (err: any) => void, + onError?: (err: any) => void, ) => { const onMemssage = (message: Message) => { message.ack(); @@ -51,16 +51,20 @@ export class IDocument { callback(this.converter.fromPg(update)); } }; - let subscription: Subscription | undefined = undefined; - this.getQuery().then(async (raw) => { - callback(raw ? this.converter.fromPg(raw) : undefined); - subscription = await getSubscription(this.table); - subscription.on('message', onMemssage); - }); + const subsPromise = getSubscription(this.table); + + this.getQuery() + .then(async (raw) => { + callback(raw ? this.converter.fromPg(raw) : undefined); + (await subsPromise).on('message', onMemssage); + }) + .catch(onError); return () => { - subscription?.removeListener('message', onMemssage); + subsPromise.then((subs) => { + subs.removeListener('message', onMemssage); + }); }; }; diff --git a/packages/database/src/pg/interfaces/query.ts b/packages/database/src/pg/interfaces/query.ts index ec98d2fa9..fbcff8f75 100644 --- a/packages/database/src/pg/interfaces/query.ts +++ b/packages/database/src/pg/interfaces/query.ts @@ -1,5 +1,5 @@ import { COL, SUB_COL } from '@build-5/interfaces'; -import { Message, Subscription } from '@google-cloud/pubsub'; +import { Message } from '@google-cloud/pubsub'; import { Knex } from 'knex'; import { get, gt, gte, isEqual, lt, lte, orderBy, take } from 'lodash'; import { getPgData, getSubscription } from '../impl/pubsub'; @@ -24,7 +24,7 @@ export class IQuery { this.table = getTableName(col); } - private getQuery = () => { + private getQuery = async () => { let query = this.con(this.table).select('*'); for (const ins of this.whereIns) { @@ -78,7 +78,14 @@ export class IQuery { query = query.orderBy(orderBy.key, orderBy.dir); } - return query; + const sql = query.toQuery(); + const queryPlan = await this.con.raw(`EXPLAIN ${sql}`); + const usesIndex = queryPlan.rows.some((row: any) => row['QUERY PLAN'].includes('Index')); + if (!usesIndex) { + throw new Error('Query requires and index.\n' + sql); + } + + return await query; }; get = async (): Promise => { @@ -147,7 +154,7 @@ export class IQuery { return act; }; - onSnapshot = (callback: (data: C[]) => Promise | void, error?: (err: any) => void) => { + onSnapshot = (callback: (data: C[]) => Promise | void, onError?: (err: any) => void) => { let pgData: Q[] = []; const onMemssage = (message: Message) => { @@ -161,29 +168,24 @@ export class IQuery { callback(callbackData); } } catch (err) { - console.error('onSnapshot-onMemssage', this.table, this.getQuery().toSQL(), err); - error?.({ code: 500, message: 'Internal server error' }); + onError?.(err); } }; - let subscription: Subscription | undefined = undefined; + const subsPromise = getSubscription(this.table); this.getQuery() .then(async (raw: Q[]) => { pgData = raw; - callback(raw.map(this.converter.fromPg)); - - subscription = await getSubscription(this.table); - subscription.on('message', onMemssage); + (await subsPromise).on('message', onMemssage); }) - .catch((err) => { - console.error('onSnapshot-getQuery', this.table, this.getQuery().toSQL(), err); - error?.({ code: 500, message: 'Internal server error' }); - }); + .catch(onError); return () => { - subscription?.removeListener('message', onMemssage); + subsPromise.then((subs) => { + subs.removeListener('message', onMemssage); + }); }; }; @@ -206,7 +208,7 @@ export class IQuery { return this; }; - startAfter = (data: C) => { + startAfter = (data: C | undefined) => { this.startAfterData = data; return this; }; diff --git a/packages/functions/src/controls/collection/collection-mint.control.ts b/packages/functions/src/controls/collection/collection-mint.control.ts index 5c753423d..ffc6a5142 100644 --- a/packages/functions/src/controls/collection/collection-mint.control.ts +++ b/packages/functions/src/controls/collection/collection-mint.control.ts @@ -6,6 +6,7 @@ import { CollectionStatus, CollectionType, Network, + Nft, TRANSACTION_AUTO_EXPIRY_MS, Transaction, TransactionPayloadType, @@ -137,17 +138,16 @@ const getNftsTotalStorageDeposit = async ( ) => { let storageDeposit = 0; let nftsToMint = 0; - let lastUid = ''; + let lastDoc: Nft | undefined = undefined; do { - const nfts = await build5Db() + const nfts: Nft[] = await build5Db() .collection(COL.NFT) .where('collection', '==', collection.uid) .where('placeholderNft', '==', false) - .where('uid', '>', lastUid) - .orderBy('uid') + .startAfter(lastDoc) .limit(500) .get(); - lastUid = last(nfts)?.uid || ''; + lastDoc = last(nfts); const promises = nfts.map(async (nft) => { if (unsoldMintingOptions === UnsoldMintingOptions.BURN_UNSOLD && !nft.sold) { @@ -162,7 +162,7 @@ const getNftsTotalStorageDeposit = async ( const amounts = await Promise.all(promises); storageDeposit += amounts.reduce((acc, act) => acc + act, 0); nftsToMint += amounts.filter((a) => a !== 0).length; - } while (lastUid); + } while (lastDoc); return { storageDeposit, nftsToMint }; }; diff --git a/packages/search/src/common.ts b/packages/search/src/common.ts index ca4f894de..8ec325dff 100644 --- a/packages/search/src/common.ts +++ b/packages/search/src/common.ts @@ -28,40 +28,61 @@ const queryStrToParams = (url: string) => { export const isHiddenNft = (dataset: Dataset, data?: any) => dataset === Dataset.NFT && data?.hidden === true; -export const queryToObservable = (query: IQuery) => - new Observable((observer) => { - const unsubscribe = query.onSnapshot( - (data) => { - observer.next(data); - }, - (error) => { - observer.error(error); - }, - ); - return () => { - unsubscribe(); - }; +export const queryToObservable = (query: IQuery, isLive: boolean) => { + if (isLive) { + return new Observable((obs) => { + const unsubscribe = query.onSnapshot( + (data) => { + obs.next(data); + }, + (error) => { + obs.error(error); + }, + ); + return () => { + unsubscribe(); + }; + }); + } + + return new Observable((obs) => { + query + .get() + .then((r) => obs.next(r)) + .catch((err) => obs.error(err)); }); +}; export const documentToObservable = ( doc: IDocument, -) => - new Observable((observer) => { - const unsubscribe = doc.onSnapshot( - (data) => { - observer.next(data); - }, - (error) => { - observer.error(error); - }, - ); - return () => { - unsubscribe(); - }; + isLive: boolean, +): Observable => { + if (isLive) { + return new Observable((observer) => { + const unsubscribe = doc.onSnapshot( + (data) => { + observer.next(data); + }, + (error) => { + observer.error(error); + }, + ); + return () => { + unsubscribe(); + }; + }); + } + + return new Observable((obs) => { + doc + .get() + .then((r) => obs.next(r)) + .catch((err) => obs.error(err)); }); +}; -export const getHeadPriceObs = (query: IQuery) => - queryToObservable(query).pipe(map((r) => head(r)?.price || 0)); +export const getHeadPriceObs = (query: IQuery, isLive: boolean) => + queryToObservable(query, isLive).pipe(map((r) => head(r)?.price || 0)); // Used to be 42, changed to 5 to support milestone get and transactions subset export const minAddressLength = 5; diff --git a/packages/search/src/getAddresses.ts b/packages/search/src/getAddresses.ts index 098809970..5af69cfe0 100644 --- a/packages/search/src/getAddresses.ts +++ b/packages/search/src/getAddresses.ts @@ -13,7 +13,7 @@ const getAddressesSchema = Joi.object({ createdAfter: Joi.number().min(0).max(MAX_MILLISECONDS).integer().required(), }); -export const getAddresses = async (url: string) => { +export const getAddresses = async (url: string, isLive: boolean) => { const body = getQueryParams(url, getAddressesSchema); const query = build5Db() @@ -23,7 +23,7 @@ export const getAddresses = async (url: string) => { .orderBy('createdOn') .limit(1000); - return queryToObservable(query).pipe(map((mnemonics) => mnemonics.map(sanitizeMnemonic))); + return queryToObservable(query, isLive).pipe(map((mnemonics) => mnemonics.map(sanitizeMnemonic))); }; const sanitizeMnemonic = (mnemonic: Mnemonic) => ({ diff --git a/packages/search/src/getAvgPrice.ts b/packages/search/src/getAvgPrice.ts index f932da095..fad0b54a8 100644 --- a/packages/search/src/getAvgPrice.ts +++ b/packages/search/src/getAvgPrice.ts @@ -13,20 +13,20 @@ const getAvgPriceSchema = Joi.object({ .required(), }); -export const getAvgPrice = async (url: string) => { +export const getAvgPrice = async (url: string, isLive: boolean) => { const body = getQueryParams(url, getAvgPriceSchema); const tokens = Array.isArray(body.token) ? body.token : [body.token]; - const changes = tokens.map(getAvgLive); + const changes = tokens.map((t) => getAvgLive(t, isLive)); const observable = combineLatest(changes).pipe(map((r) => (r.length === 1 ? r[0] : r))); return observable; }; -const getAvgLive = (token: string) => { - const lowestPurchaseObs = getHeadPriceObs(purchaseQuery(token, true)); - const highestPurchaseObs = getHeadPriceObs(purchaseQuery(token, false)); - const lastPurchaseObs = getHeadPriceObs(purchaseQuery(token)); +const getAvgLive = (token: string, isLive: boolean) => { + const lowestPurchaseObs = getHeadPriceObs(purchaseQuery(token, true), isLive); + const highestPurchaseObs = getHeadPriceObs(purchaseQuery(token, false), isLive); + const lastPurchaseObs = getHeadPriceObs(purchaseQuery(token), isLive); return combineLatest([lowestPurchaseObs, highestPurchaseObs, lastPurchaseObs]).pipe( map(([lowest, highest, last]) => (highest + lowest + last) / 3), map((avg) => ({ id: token, avg })), diff --git a/packages/search/src/getById.ts b/packages/search/src/getById.ts index d9306509c..715390df1 100644 --- a/packages/search/src/getById.ts +++ b/packages/search/src/getById.ts @@ -21,7 +21,7 @@ const getByIdSchema = Joi.object({ subsetId: CommonJoi.uid(false, 7), }); -export const getById = async (url: string) => { +export const getById = async (url: string, isLive: boolean) => { const body = getQueryParams(url, getByIdSchema); const docRef = build5Db().doc( @@ -31,7 +31,7 @@ export const getById = async (url: string) => { body.subsetId, )! as unknown as IDocument; - const observable = documentToObservable(docRef).pipe( + const observable = documentToObservable(docRef, isLive).pipe( map((data) => { if (!data || isHiddenNft(body.dataset, data)) { return {}; diff --git a/packages/search/src/getMany.ts b/packages/search/src/getMany.ts index d0008d6fa..53268a1eb 100644 --- a/packages/search/src/getMany.ts +++ b/packages/search/src/getMany.ts @@ -48,7 +48,7 @@ const getManySchema = Joi.object({ startAfter: CommonJoi.uid(false), }); -export const getMany = async (project: string, url: string) => { +export const getMany = async (project: string, url: string, isLive: boolean) => { const body = getQueryParams(url, getManySchema); let query = build5Db() @@ -96,7 +96,7 @@ export const getMany = async (project: string, url: string) => { } } - const observable = queryToObservable(query).pipe( + const observable = queryToObservable(query, isLive).pipe( map((snap) => snap.filter((d) => !isEmpty(d)).map((d) => ({ id: d.uid, ...d }))), ); return observable; diff --git a/packages/search/src/getManyAdvanced.ts b/packages/search/src/getManyAdvanced.ts index 83320dd07..b61d00e86 100644 --- a/packages/search/src/getManyAdvanced.ts +++ b/packages/search/src/getManyAdvanced.ts @@ -61,7 +61,7 @@ const getManyAdvancedSchema = Joi.object({ startAfter: CommonJoi.uid(false), }); -export const getManyAdvanced = async (project: string, url: string) => { +export const getManyAdvanced = async (project: string, url: string, isLive: boolean) => { const body = getQueryParams(url, getManyAdvancedSchema); const { dataset, subset, setId } = body; @@ -120,7 +120,7 @@ export const getManyAdvanced = async (project: string, url: string) => { } } - return queryToObservable(query).pipe( + return queryToObservable(query, isLive).pipe( map((snap) => snap.filter((d) => !isEmpty(d)).map((d) => ({ id: d.uid, ...d }))), ); }; diff --git a/packages/search/src/getManyById.ts b/packages/search/src/getManyById.ts index 24857dd1b..15b7bab82 100644 --- a/packages/search/src/getManyById.ts +++ b/packages/search/src/getManyById.ts @@ -30,10 +30,10 @@ const getManyByIdSchema = Joi.object({ subsetIds: Joi.array().items(uidSchema).min(1).max(QUERY_MAX_LENGTH).optional(), }); -export const getManyById = async (url: string) => { +export const getManyById = async (url: string, isLive: boolean) => { const body = getQueryParams(url, getManyByIdSchema); - const observables = getQueries(body).map(documentToObservable); + const observables = getQueries(body).map((b) => documentToObservable(b, isLive)); return combineLatest(observables).pipe( map((all) => all.flat().filter((record) => record && !isHiddenNft(body.dataset, record))), ); diff --git a/packages/search/src/getPriceChange.ts b/packages/search/src/getPriceChange.ts index 2f9f60cb4..68740bed7 100644 --- a/packages/search/src/getPriceChange.ts +++ b/packages/search/src/getPriceChange.ts @@ -20,18 +20,18 @@ const getAvgPriceSchema = Joi.object({ .required(), }); -export const getPriceChange = async (url: string) => { +export const getPriceChange = async (url: string, isLive: boolean) => { const body = getQueryParams(url, getAvgPriceSchema); const tokens = Array.isArray(body.token) ? body.token : [body.token]; - const changes = tokens.map(getPriceChangeLive); + const changes = tokens.map((t) => getPriceChangeLive(t, isLive)); return combineLatest(changes).pipe(map((r) => (r.length === 1 ? r[0] : r))); }; -const getPriceChangeLive = (token: string) => { - const today = getVWAPForDates(token, purchaseQueryToday); - const yesterday = getVWAPForDates(token, purchaseQueryYesterday); +const getPriceChangeLive = (token: string, isLive: boolean) => { + const today = getVWAPForDates(token, purchaseQueryToday, isLive); + const yesterday = getVWAPForDates(token, purchaseQueryYesterday, isLive); return combineLatest([today, yesterday]).pipe( map(([last, secondToLast]) => { if (!secondToLast) { @@ -45,10 +45,11 @@ const getPriceChangeLive = (token: string) => { const getVWAPForDates = ( token: string, queryBuilder: (token: string, lowest?: boolean) => IQuery, + isLive: boolean, ) => { - const lowestPurchaseObs = getHeadPriceObs(queryBuilder(token, true)); - const highestPurchaseObs = getHeadPriceObs(queryBuilder(token, false)); - const lastPurchaseObs = getHeadPriceObs(queryBuilder(token)); + const lowestPurchaseObs = getHeadPriceObs(queryBuilder(token, true), isLive); + const highestPurchaseObs = getHeadPriceObs(queryBuilder(token, false), isLive); + const lastPurchaseObs = getHeadPriceObs(queryBuilder(token), isLive); return combineLatest([lowestPurchaseObs, highestPurchaseObs, lastPurchaseObs]).pipe( map(([lowest, highest, last]) => (highest + lowest + last) / 3), ); diff --git a/packages/search/src/getTokenPrice.ts b/packages/search/src/getTokenPrice.ts index fabb4bb06..7ad56eac8 100644 --- a/packages/search/src/getTokenPrice.ts +++ b/packages/search/src/getTokenPrice.ts @@ -27,12 +27,12 @@ const getTokenPriceSchema = Joi.object({ const tickerDocRef = build5Db().doc(COL.TICKER, TICKERS.SMRUSD); -export const getTokenPrice = async (url: string) => { +export const getTokenPrice = async (url: string, isLive: boolean) => { const body = getQueryParams(url, getTokenPriceSchema); - const ticker = documentToObservable(tickerDocRef); + const ticker = documentToObservable(tickerDocRef, isLive); const tokens = Array.isArray(body.token) ? body.token : [body.token]; - const observables = tokens.map((token) => getPriceForTokenLive(token, ticker)); + const observables = tokens.map((token) => getPriceForTokenLive(token, ticker, isLive)); const combined = combineLatest(observables).pipe( map((result) => (result.length === 1 ? result[0] : result)), ); @@ -40,9 +40,9 @@ export const getTokenPrice = async (url: string) => { return combined; }; -const getPriceForTokenLive = (token: string, ticker: Observable) => { - const lowestSell = queryToObservable(lowestSellQuery(token)); - const highestBuy = queryToObservable(highestBuyQuery(token)); +const getPriceForTokenLive = (token: string, ticker: Observable, isLive: boolean) => { + const lowestSell = queryToObservable(lowestSellQuery(token), isLive); + const highestBuy = queryToObservable(highestBuyQuery(token), isLive); const combined = combineLatest([lowestSell, highestBuy, ticker]).pipe( map(([lowestSell, highestBuy, ticker]) => { const price = calculatePrice(lowestSell, highestBuy); diff --git a/packages/search/src/getTopMilestones.ts b/packages/search/src/getTopMilestones.ts index 78c9ddd78..66536db64 100644 --- a/packages/search/src/getTopMilestones.ts +++ b/packages/search/src/getTopMilestones.ts @@ -3,9 +3,9 @@ import { Network, getMilestoneCol } from '@build-5/interfaces'; import { combineLatest, map } from 'rxjs'; import { queryToObservable } from './common'; -export const getTopMilestones = async (_: string) => { +export const getTopMilestones = async (_: string, isLive: boolean) => { const observables = Object.values(Network).map((network) => - queryToObservable(networkToQuery(network)).pipe(map((r) => ({ [network]: r[0] }))), + queryToObservable(networkToQuery(network), isLive).pipe(map((r) => ({ [network]: r[0] }))), ); return combineLatest(observables).pipe(map((r) => r.reduce((acc, act) => ({ ...acc, ...act })))); }; diff --git a/packages/search/src/getUpdatedAfter.ts b/packages/search/src/getUpdatedAfter.ts index 9e4e9196d..ddfb635d9 100644 --- a/packages/search/src/getUpdatedAfter.ts +++ b/packages/search/src/getUpdatedAfter.ts @@ -31,7 +31,7 @@ const getUpdatedAfterSchema = Joi.object({ startAfter: CommonJoi.uid(false), }); -export const getUpdatedAfter = async (project: string, url: string) => { +export const getUpdatedAfter = async (project: string, url: string, isLive: boolean) => { const body = getQueryParams(url, getUpdatedAfterSchema); const updatedAfter = body.updatedAfter ? dayjs.unix(body.updatedAfter) : dayjs().subtract(1, 'h'); @@ -71,7 +71,7 @@ export const getUpdatedAfter = async (project: string, url: string) => { } } - return queryToObservable>(query).pipe( + return queryToObservable>(query, isLive).pipe( map((snap) => snap.filter((d) => !isEmpty(d)).map((d) => ({ id: d.uid, ...d }))), ); }; diff --git a/packages/search/src/index.ts b/packages/search/src/index.ts index e74d975be..9031d0271 100644 --- a/packages/search/src/index.ts +++ b/packages/search/src/index.ts @@ -62,11 +62,14 @@ const onConnection = async (jwtToken: string, url: URL, res: express.Response | try { const project = getProjectId(jwtToken); - const observable = await getObservable(project, url); - if (res instanceof ws.WebSocket) { + const isLive = res instanceof ws.WebSocket; + const observable = await getObservable(project, url, isLive); + + if (isLive) { sendLiveUpdates(res, observable); return; } + observable.pipe(first()).subscribe({ next: (r) => { res.send(r); @@ -80,28 +83,32 @@ const onConnection = async (jwtToken: string, url: URL, res: express.Response | } }; -const getObservable = (project: string, url: URL): Promise> => { +const getObservable = ( + project: string, + url: URL, + isLive: boolean, +): Promise> => { switch (url.pathname) { case ApiRoutes.GET_BY_ID: - return getById(url.href); + return getById(url.href, isLive); case ApiRoutes.GET_MANY_BY_ID: - return getManyById(url.href); + return getManyById(url.href, isLive); case ApiRoutes.GET_MANY: - return getMany(project, url.href); + return getMany(project, url.href, isLive); case ApiRoutes.GET_MANY_ADVANCED: - return getManyAdvanced(project, url.href); + return getManyAdvanced(project, url.href, isLive); case ApiRoutes.GET_UPDATED_AFTER: - return getUpdatedAfter(project, url.href); + return getUpdatedAfter(project, url.href, isLive); case ApiRoutes.GET_TOKEN_PRICE: - return getTokenPrice(url.href); + return getTokenPrice(url.href, isLive); case ApiRoutes.GET_AVG_PRICE: - return getAvgPrice(url.href); + return getAvgPrice(url.href, isLive); case ApiRoutes.GET_PRICE_CHANGE: - return getPriceChange(url.href); + return getPriceChange(url.href, isLive); case ApiRoutes.GET_ADDRESSES: - return getAddresses(url.href); + return getAddresses(url.href, isLive); case ApiRoutes.GET_TOP_MILESTONES: - return getTopMilestones(url.href); + return getTopMilestones(url.href, isLive); case ApiRoutes.GET_NFT_MUTABLE_METADATA: return getNftMutableMetadata(url.href); case ApiRoutes.GET_NFT_IDS: