Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deploy action #2882

Merged
merged 2 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 28 additions & 35 deletions packages/database/src/pg/impl/knex.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,31 @@
import Knex from 'knex';

export let knex: Knex.Knex | undefined = undefined;
export const knex: Knex.Knex = Knex({
client: 'pg',
connection: {
user: process.env.DB_USER,
password: process.env.DB_USER_PWD,
database: process.env.DB_NAME,
host: process.env.DB_HOST,
port: Number(process.env.DB_PORT),
},
pool: {
min: Number(process.env.DB_POOL_MIN || 0),
max: Number(process.env.DB_POOL_MAX || 20),
},
});

export const getKnex = () => {
if (!knex) {
knex = Knex({
client: 'pg',
connection: {
user: process.env.DB_USER,
password: process.env.DB_USER_PWD,
database: process.env.DB_NAME,
host: process.env.DB_HOST,
port: Number(process.env.DB_PORT),
},
pool: {
min: Number(process.env.DB_POOL_MIN || 0),
max: Number(process.env.DB_POOL_MAX || 10),
},
});
}
return knex;
};

export let knextran: Knex.Knex | undefined = undefined;

export const getKnexTran = () =>
Knex({
client: 'pg',
connection: {
user: process.env.DB_USER,
password: process.env.DB_USER_PWD,
database: process.env.DB_NAME,
host: process.env.DB_HOST,
port: Number(process.env.DB_PORT),
},
pool: { min: 0, max: 1 },
});
export const knextran: Knex.Knex = Knex({
client: 'pg',
connection: {
user: process.env.DB_USER,
password: process.env.DB_USER_PWD,
database: process.env.DB_NAME,
host: process.env.DB_HOST,
port: Number(process.env.DB_PORT),
},
pool: {
min: Number(process.env.DB_POOL_MIN || 0),
max: Number(process.env.DB_POOL_MAX || 20),
},
});
36 changes: 16 additions & 20 deletions packages/database/src/pg/impl/postgres.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,14 @@ import {
PgTokenVotesUpdate,
} from '../models/token_update';
import { PgTransactionUpdate } from '../models/transaction_update';
import { getKnex, getKnexTran, knex, knextran } from './knex';
import { knex, knextran } from './knex';
import { subscriptions } from './pubsub';
import { AirdropConverter, PgAirdropCollection } from './tables/airdrop';
import { AuctionConverter } from './tables/auction';
import { AwardConverter } from './tables/award';
import { AwardOwnerConverter } from './tables/award_owner';
import { AwardParticipantConverter } from './tables/award_participant';
import { CollectionConverter, PgCollectionCollection } from './tables/collection';
import { CollectionConverter } from './tables/collection';
import { CollectionRankConverter } from './tables/collection_rank';
import { CollectionStatsConverter } from './tables/collection_stats';
import { CollectionVotesConverter } from './tables/collection_vote';
Expand All @@ -159,7 +159,7 @@ import {
MilestoneTransactions,
} from './tables/milestone_transactions';
import { MnemonicConverter } from './tables/mnemonic';
import { NftConverter } from './tables/nft';
import { NftConverter, PgNftCollection } from './tables/nft';
import { NftStakeConverter } from './tables/nft_stake';
import { NotificationConverter } from './tables/notification';
import { ProjectConverter } from './tables/project';
Expand Down Expand Up @@ -195,8 +195,8 @@ export type IColType<T extends COL, S extends SUB_COL | undefined = undefined> =
T extends COL.MEMBER ? ICollection<Member, PgMember, PgMemberUpdate> :
T extends COL.SPACE ? ICollection<Space, PgSpace, PgSpaceUpdate> :
T extends COL.PROJECT ? ICollection<Project, PgProject, PgProjectUpdate> :
T extends COL.COLLECTION ? PgCollectionCollection :
T extends COL.NFT ? ICollection<Nft, PgNft, PgNftUpdate> :
T extends COL.COLLECTION ? ICollection<Collection, PgCollection, PgCollectionUpdate> :
T extends COL.NFT ? PgNftCollection :
T extends COL.NFT_STAKE ? ICollection<NftStake, PgNftStake, PgNftStakeUpdate> :
T extends COL.TRANSACTION ? ICollection<Transaction, PgTransaction, PgTransactionUpdate> :
T extends COL.AUCTION ? ICollection<Auction, PgAuction, PgAuctionUpdate> :
Expand Down Expand Up @@ -338,11 +338,7 @@ export type IDocType<T extends COL, S extends SUB_COL | undefined = undefined> =
undefined;

export class PgDatabase implements IDatabase {
private readonly con: Knex;

constructor() {
this.con = getKnex();
}
private readonly con: Knex = knex;

private getConverter = (col: COL, subCol?: SUB_COL) => {
if (!subCol) {
Expand Down Expand Up @@ -460,8 +456,8 @@ export class PgDatabase implements IDatabase {
if (col === COL.AIRDROP) {
return new PgAirdropCollection(this.con, col, converter) as IColType<C, S>;
}
if (col === COL.COLLECTION) {
return new PgCollectionCollection(this.con, col, converter) as IColType<C, S>;
if (col === COL.NFT) {
return new PgNftCollection(this.con, col, converter) as IColType<C, S>;
}
if (col === COL.STAKE) {
return new PgStakeCollection(this.con, col, converter) as unknown as IColType<C, S>;
Expand All @@ -485,10 +481,9 @@ export class PgDatabase implements IDatabase {

runTransaction = async <T>(func: (transaction: ITransaction) => Promise<T>) => {
for (let i = 0; i < 240; ++i) {
const con = getKnexTran();
let trx: Knex.Transaction | undefined = undefined;
try {
trx = await con.transaction();
trx = await knextran.transaction();
const transaction = new PgRunTransaction(trx);
const result = await func(transaction);
if (transaction.allLocksAquired) {
Expand All @@ -501,8 +496,6 @@ export class PgDatabase implements IDatabase {
} catch (err) {
await trx?.rollback();
throw err;
} finally {
await con.destroy();
}
}
throw { code: 500, key: 'Failed to execute transaction' };
Expand All @@ -515,7 +508,7 @@ export class PgDatabase implements IDatabase {
arrayRemove = <T>(value: T) => new ArrayRemove(value);

destroy = async () => {
const promises: any[] = [knex?.destroy(), knextran?.destroy()];
const promises: any[] = [knex.destroy(), knextran.destroy()];

for (const subs of Object.values(subscriptions)) {
promises.push((await subs).delete());
Expand Down Expand Up @@ -631,9 +624,12 @@ export class PgRunTransaction implements ITransaction {
}
try {
return await docRef.useTransaction(this.trx, (doc) => doc.get());
} catch {
this.allLocksAquired = false;
return await docRef.get();
} catch (err: any) {
if (err.name === 'KnexTimeoutError') {
this.allLocksAquired = false;
return await docRef.get();
}
throw err;
}
};

Expand Down
22 changes: 1 addition & 21 deletions packages/database/src/pg/impl/tables/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,11 @@ import {
UnsoldMintingOptions,
} from '@buildcore/interfaces';
import { get } from 'lodash';
import { ICollection } from '../../interfaces/collection';
import { Converter } from '../../interfaces/common';
import { PgCollection } from '../../models';
import { PgCollectionUpdate } from '../../models/collection_update';
import { removeNulls } from '../common';
import { pgDateToTimestamp } from '../postgres';

export class PgCollectionCollection extends ICollection<
Collection,
PgCollection,
PgCollectionUpdate
> {
updateFloorPrice = async () => {
await this.con(this.table).update({
floorPrice: this.con.raw(`(
SELECT MIN("availablePrice")
FROM nft
WHERE collection = collection.uid AND
nft."saleAccess" = 0 AND
available IN (1, 3)
)`),
});
};
}

export class CollectionConverter implements Converter<Collection, PgCollection> {
toPg = (collection: Collection): PgCollection => ({
uid: collection.uid,
Expand Down Expand Up @@ -124,7 +104,7 @@ export class CollectionConverter implements Converter<Collection, PgCollection>
tokenSymbol: get(d, 'tokenSymbol', ''),
tokenReward: get(d, 'tokenReward', 0),
amount: get(d, 'amount', 0),
}) as DiscountLine,
} as DiscountLine),
),
total: pg.total || 0,
totalTrades: pg.totalTrades || 0,
Expand Down
18 changes: 17 additions & 1 deletion packages/database/src/pg/impl/tables/nft.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,27 @@ import {
PropStats,
UnsoldMintingOptions,
} from '@buildcore/interfaces';
import { get, head } from 'lodash';
import { ICollection } from '../../interfaces/collection';
import { Converter } from '../../interfaces/common';
import { PgNft } from '../../models';
import { PgNft, PgNftUpdate } from '../../models';
import { removeNulls } from '../common';
import { pgDateToTimestamp } from '../postgres';

export class PgNftCollection extends ICollection<Nft, PgNft, PgNftUpdate> {
getFloorPrice = async (collection: string) => {
const result = await this.con.raw(
`SELECT MIN("availablePrice") as "floorPrice"
FROM ${this.table}
WHERE collection = ? AND nft."saleAccess" = ? AND available IN (?, ?)
`,
[collection, NftAccess.OPEN, NftAvailable.SALE, NftAvailable.AUCTION_AND_SALE],
);

return get(head(result.rows), 'floorPrice', 0);
};
}

export class NftConverter implements Converter<Nft, PgNft> {
toPg = (nft: Nft): PgNft => ({
uid: nft.uid,
Expand Down
2 changes: 2 additions & 0 deletions packages/database/src/pg/interfaces/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ export class ICollection<T, Q extends BaseRecord, U extends Update> {
whereIn = <F extends keyof Q>(fieldPath: F, value: Q[F][]): IQuery<T, Q> =>
this.createQuery().whereIn(fieldPath, value);

startAfter = (data: T | undefined): IQuery<T, Q> => this.createQuery().startAfter(data);

update = async <F extends keyof Q>(data: U, where: Record<F, Q[F]>) => {
await this.con(this.table).update(undefinedToNull(data)).where(where);
};
Expand Down
2 changes: 1 addition & 1 deletion packages/database/src/pg/interfaces/document/document.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export class IDocument<C, B extends BaseRecord, U extends Update> {
createQuery = () => {
let query = this.con(this.table).where(this.pKey);
if (this.con.isTransaction) {
query.forUpdate().noWait();
query.forUpdate().timeout(200);
}
return query;
};
Expand Down
7 changes: 2 additions & 5 deletions packages/functions/deploy.script.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,7 @@ const deployServices = () => {
...flattenObject(onTriggers),
...flattenObject(onScheduled),
...flattenObject(onStorage),
}).forEach(([name, value], index) => {
if (index % 50 === 0) {
fs.appendFileSync(file, 'wait\n\n');
}
}).forEach(([name, value]) => {
const options = (value as CloudFunctions).runtimeOptions;

let command = `gcloud run deploy ${name} \\
Expand Down Expand Up @@ -150,7 +147,7 @@ const deployCronTriggers = () => {
const setMaxAckDeadline = () => {
fs.appendFileSync(
file,
`for SUBSCRIPTION_NAME in $(gcloud pubsub subscriptions list --format="value(name)")\n`,
`for SUBSCRIPTION_NAME in $(gcloud pubsub subscriptions list --format="value(name)" | grep -v 'subs-onupsert')\n`,
);
fs.appendFileSync(file, `do\n`);
fs.appendFileSync(
Expand Down
23 changes: 20 additions & 3 deletions packages/functions/src/cron/collection.floor.price.cron.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,22 @@
import { database } from '@buildcore/database';
import { COL } from '@buildcore/interfaces';
import { COL, Collection } from '@buildcore/interfaces';
import { last } from 'lodash';

export const updateFloorPriceOnCollections = () =>
database().collection(COL.COLLECTION).updateFloorPrice();
const LIMIT = 500;
export const updateFloorPriceOnCollections = async () => {
let lastDoc: Collection | undefined = undefined;
do {
const snap: Collection[] = await database()
.collection(COL.COLLECTION)
.startAfter(lastDoc)
.limit(LIMIT)
.get();
lastDoc = last(snap);

const promises = snap.map(async (col) => {
const floorPrice = await database().collection(COL.NFT).getFloorPrice(col.uid);
await database().doc(COL.COLLECTION, col.uid).update({ floorPrice });
});
await Promise.all(promises);
} while (lastDoc);
};
Loading