Skip to content

Commit

Permalink
feat: support PITR (time-machine) queries
Browse files Browse the repository at this point in the history
  • Loading branch information
kirillgroshkov committed Nov 3, 2024
1 parent bcb6753 commit e5106fc
Show file tree
Hide file tree
Showing 4 changed files with 444 additions and 456 deletions.
18 changes: 12 additions & 6 deletions src/DatastoreStreamReadable.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
import { Readable } from 'node:stream'
import { Query } from '@google-cloud/datastore'
import type { RunQueryInfo } from '@google-cloud/datastore/build/src/query'
import type { RunQueryInfo, RunQueryOptions } from '@google-cloud/datastore/build/src/query'
import { _ms, CommonLogger, pRetry, UnixTimestampMillisNumber } from '@naturalcycles/js-lib'
import type { ReadableTyped } from '@naturalcycles/nodejs-lib'
import type { DatastoreDBStreamOptions } from './datastore.model'

export class DatastoreStreamReadable<T = any> extends Readable implements ReadableTyped<T> {
private originalLimit: number
private readonly originalLimit: number
private rowsRetrieved = 0
private endCursor?: string
private running = false
private done = false
private lastQueryDone?: number
private totalWait = 0
private table: string
private readonly table: string
/**
* Used to support maxWait
*/
private lastReadTimestamp: UnixTimestampMillisNumber = 0
private maxWaitInterval: NodeJS.Timeout | undefined
private readonly maxWaitInterval: NodeJS.Timeout | undefined

private opt: DatastoreDBStreamOptions & { batchSize: number }
private readonly opt: DatastoreDBStreamOptions & { batchSize: number }
private dsOpt: RunQueryOptions

constructor(
private q: Query,
Expand All @@ -34,6 +35,11 @@ export class DatastoreStreamReadable<T = any> extends Readable implements Readab
batchSize: 1000,
...opt,
}
this.dsOpt = {}
if (opt.readAt) {
// Datastore expects UnixTimestamp in milliseconds
this.dsOpt.readTime = opt.readAt * 1000
}

this.originalLimit = q.limitVal
this.table = q.kinds[0]!
Expand Down Expand Up @@ -99,7 +105,7 @@ export class DatastoreStreamReadable<T = any> extends Readable implements Readab
try {
await pRetry(
async () => {
const res = await q.run()
const res = await q.run(this.dsOpt)
rows = res[0]
info = res[1]
},
Expand Down
65 changes: 43 additions & 22 deletions src/datastore.db.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import type { Datastore, Key, Query } from '@google-cloud/datastore'
import { PropertyFilter, Transaction } from '@google-cloud/datastore'
import { type RunQueryOptions } from '@google-cloud/datastore/build/src/query'
import {
BaseCommonDB,
CommonDB,
commonDBFullSupport,
CommonDBOptions,
CommonDBReadOptions,
CommonDBSaveMethod,
CommonDBSaveOptions,
CommonDBSupport,
Expand Down Expand Up @@ -38,6 +40,7 @@ import { boldWhite, ReadableTyped } from '@naturalcycles/nodejs-lib'
import {
DatastoreDBCfg,
DatastoreDBOptions,
DatastoreDBReadOptions,
DatastoreDBSaveOptions,
DatastoreDBStreamOptions,
DatastorePayload,
Expand Down Expand Up @@ -138,17 +141,19 @@ export class DatastoreDB extends BaseCommonDB implements CommonDB {
override async getByIds<ROW extends ObjectWithId>(
table: string,
ids: string[],
opt: DatastoreDBOptions = {},
opt: DatastoreDBReadOptions = {},
): Promise<ROW[]> {
if (!ids.length) return []
const keys = ids.map(id => this.key(table, id))
let rows: any[]

const dsOpt = this.getRunQueryOptions(opt)

if (this.cfg.timeout) {
// First try
try {
const r = await pTimeout(
() => ((opt.tx as DatastoreDBTransaction)?.tx || this.ds()).get(keys),
() => ((opt.tx as DatastoreDBTransaction)?.tx || this.ds()).get(keys, dsOpt),
{
timeout: this.cfg.timeout,
name: `datastore.getByIds(${table})`,
Expand All @@ -165,7 +170,7 @@ export class DatastoreDB extends BaseCommonDB implements CommonDB {

// Second try (will throw)
const r = await pRetry(
() => ((opt.tx as DatastoreDBTransaction)?.tx || this.ds()).get(keys),
() => ((opt.tx as DatastoreDBTransaction)?.tx || this.ds()).get(keys, dsOpt),
{
...this.getPRetryOptions(`datastore.getByIds(${table}) second try`),
maxAttempts: 3,
Expand All @@ -181,7 +186,7 @@ export class DatastoreDB extends BaseCommonDB implements CommonDB {
} else {
rows = await pRetry(
async () => {
return (await this.ds().get(keys))[0]
return (await this.ds().get(keys, dsOpt))[0]
},
this.getPRetryOptions(`datastore.getByIds(${table})`),
)
Expand All @@ -203,19 +208,20 @@ export class DatastoreDB extends BaseCommonDB implements CommonDB {

override async runQuery<ROW extends ObjectWithId>(
dbQuery: DBQuery<ROW>,
_opt?: DatastoreDBOptions,
opt: DatastoreDBReadOptions = {},
): Promise<RunQueryResult<ROW>> {
const idFilter = dbQuery._filters.find(f => f.name === 'id')
if (idFilter) {
const ids: string[] = idFilter.op === '==' ? [idFilter.val] : idFilter.val

return {
rows: await this.getByIds(dbQuery.table, ids),
rows: await this.getByIds(dbQuery.table, ids, opt),
}
}

const q = dbQueryToDatastoreQuery(dbQuery, this.ds().createQuery(dbQuery.table))
const qr = await this.runDatastoreQuery<ROW>(q)
const dsOpt = this.getRunQueryOptions(opt)
const qr = await this.runDatastoreQuery<ROW>(q, dsOpt)

// Special case when projection query didn't specify 'id'
if (dbQuery._selectedFieldNames && !dbQuery._selectedFieldNames.includes('id')) {
Expand All @@ -227,16 +233,20 @@ export class DatastoreDB extends BaseCommonDB implements CommonDB {

override async runQueryCount<ROW extends ObjectWithId>(
dbQuery: DBQuery<ROW>,
_opt?: DatastoreDBOptions,
opt: DatastoreDBReadOptions = {},
): Promise<number> {
const q = dbQueryToDatastoreQuery(dbQuery.select([]), this.ds().createQuery(dbQuery.table))
const aq = this.ds().createAggregationQuery(q).count('count')
const [entities] = await this.ds().runAggregationQuery(aq)
const dsOpt = this.getRunQueryOptions(opt)
const [entities] = await this.ds().runAggregationQuery(aq, dsOpt)
return entities[0]?.count
}

async runDatastoreQuery<ROW extends ObjectWithId>(q: Query): Promise<RunQueryResult<ROW>> {
const [entities, queryResult] = await this.ds().runQuery(q)
private async runDatastoreQuery<ROW extends ObjectWithId>(
q: Query,
dsOpt: RunQueryOptions,
): Promise<RunQueryResult<ROW>> {
const [entities, queryResult] = await this.ds().runQuery(q, dsOpt)

const rows = entities.map(e => this.mapId<ROW>(e))

Expand All @@ -254,6 +264,7 @@ export class DatastoreDB extends BaseCommonDB implements CommonDB {
...this.cfg.streamOptions,
..._opt,
}
const dsOpt = this.getRunQueryOptions(opt)

return (
opt.experimentalCursorStream
Expand All @@ -262,7 +273,7 @@ export class DatastoreDB extends BaseCommonDB implements CommonDB {
opt,
commonLoggerMinLevel(this.cfg.logger, opt.debug ? 'log' : 'warn'),
)
: (this.ds().runQueryStream(q) as ReadableTyped<ROW>)
: (this.ds().runQueryStream(q, dsOpt) as ReadableTyped<ROW>)
).map(chunk => this.mapId(chunk))
}

Expand Down Expand Up @@ -320,7 +331,7 @@ export class DatastoreDB extends BaseCommonDB implements CommonDB {

override async deleteByQuery<ROW extends ObjectWithId>(
q: DBQuery<ROW>,
opt?: DatastoreDBOptions,
opt: DatastoreDBReadOptions = {},
): Promise<number> {
const idFilter = q._filters.find(f => f.name === 'id')
if (idFilter) {
Expand All @@ -329,7 +340,8 @@ export class DatastoreDB extends BaseCommonDB implements CommonDB {
}

const datastoreQuery = dbQueryToDatastoreQuery(q.select([]), this.ds().createQuery(q.table))
const { rows } = await this.runDatastoreQuery<ROW>(datastoreQuery)
const dsOpt = this.getRunQueryOptions(opt)
const { rows } = await this.runDatastoreQuery<ROW>(datastoreQuery, dsOpt)
return await this.deleteByIds(
q.table,
rows.map(obj => obj.id),
Expand Down Expand Up @@ -413,7 +425,7 @@ export class DatastoreDB extends BaseCommonDB implements CommonDB {
return stats
}

mapId<T extends ObjectWithId>(o: any, preserveKey = false): T {
private mapId<T extends ObjectWithId>(o: any, preserveKey = false): T {
if (!o) return o
const r = {
...o,
Expand All @@ -424,12 +436,12 @@ export class DatastoreDB extends BaseCommonDB implements CommonDB {
}

// if key field exists on entity, it will be used as key (prevent to duplication of numeric keyed entities)
toDatastoreEntity<T = any>(
private toDatastoreEntity<T extends ObjectWithId>(
kind: string,
o: T & { id?: string | number },
o: T,
excludeFromIndexes: string[] = [],
): DatastorePayload<T> {
const key = this.getDsKey(o) || this.key(kind, o.id!)
const key = this.getDsKey(o) || this.key(kind, o.id)
const data = Object.assign({}, o) as any
delete data.id
delete data[this.KEY]
Expand All @@ -441,12 +453,12 @@ export class DatastoreDB extends BaseCommonDB implements CommonDB {
}
}

key(kind: string, id: string | number): Key {
key(kind: string, id: string): Key {
_assert(id, `Cannot save "${kind}" entity without "id"`)
return this.ds().key([kind, String(id)])
return this.ds().key([kind, id])
}

getDsKey(o: any): Key | undefined {
private getDsKey(o: any): Key | undefined {
return o?.[this.KEY]
}

Expand Down Expand Up @@ -562,6 +574,15 @@ export class DatastoreDB extends BaseCommonDB implements CommonDB {
this.cfg.logger.error(err)
}
}

private getRunQueryOptions(opt: DatastoreDBReadOptions): RunQueryOptions {
if (!opt.readAt) return {}

return {
// Datastore expects UnixTimestamp in milliseconds
readTime: opt.readAt * 1000,
}
}
}

/**
Expand All @@ -585,7 +606,7 @@ export class DatastoreDBTransaction implements DBTransaction {
async getByIds<ROW extends ObjectWithId>(
table: string,
ids: string[],
opt?: CommonDBOptions,
opt?: CommonDBReadOptions,
): Promise<ROW[]> {
return await this.db.getByIds(table, ids, { ...opt, tx: this })
}
Expand Down
6 changes: 4 additions & 2 deletions src/datastore.model.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { DatastoreOptions, Key } from '@google-cloud/datastore'
import { CommonDBOptions, CommonDBSaveOptions } from '@naturalcycles/db-lib'
import { CommonDBOptions, CommonDBReadOptions, CommonDBSaveOptions } from '@naturalcycles/db-lib'
import { CommonLogger, NumberOfSeconds, ObjectWithId } from '@naturalcycles/js-lib'

export interface DatastorePayload<T = any> {
Expand Down Expand Up @@ -56,7 +56,7 @@ export interface DatastoreCredentials {
refresh_token?: string
}

export interface DatastoreDBStreamOptions extends DatastoreDBOptions {
export interface DatastoreDBStreamOptions extends DatastoreDBReadOptions {
/**
* Set to `true` to stream via experimental "cursor-query based stream".
*
Expand Down Expand Up @@ -116,6 +116,8 @@ export interface DatastoreDBStreamOptions extends DatastoreDBOptions {

export interface DatastoreDBOptions extends CommonDBOptions {}

export interface DatastoreDBReadOptions extends CommonDBReadOptions {}

export interface DatastoreDBSaveOptions<ROW extends ObjectWithId>
extends CommonDBSaveOptions<ROW> {}

Expand Down
Loading

0 comments on commit e5106fc

Please sign in to comment.