Skip to content

Commit

Permalink
feat: use pRetry around commitTransaction
Browse files Browse the repository at this point in the history
Not only around saveBatch.
  • Loading branch information
kirillgroshkov committed Oct 16, 2022
1 parent 65718bc commit 32bcbd9
Showing 1 changed file with 40 additions and 35 deletions.
75 changes: 40 additions & 35 deletions src/datastore.db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
CommonDBSaveMethod,
DBQuery,
DBTransaction,
mergeDBOperations,
RunQueryResult,
} from '@naturalcycles/db-lib'
import {
Expand All @@ -25,6 +26,8 @@ import {
commonLoggerMinLevel,
pTimeout,
pRetryFn,
pRetry,
PRetryOptions,
} from '@naturalcycles/js-lib'
import { ReadableTyped } from '@naturalcycles/nodejs-lib'
import { boldWhite } from '@naturalcycles/nodejs-lib/dist/colors'
Expand Down Expand Up @@ -255,24 +258,9 @@ export class DatastoreDB extends BaseCommonDB implements CommonDB {

const method = methodMap[opt.saveMethod || 'upsert'] || 'save'

const save = pRetryFn(
async (batch: DatastorePayload<ROW>[]) => {
await (opt.tx || this.ds())[method](batch)
},
{
// Here we retry the GOAWAY errors that are somewhat common for Datastore
// Currently only retrying them here in .saveBatch(), cause probably they're only thrown when saving
predicate: err => RETRY_ON.some(s => err?.message?.includes(s)),
name: `DatastoreLib.saveBatch(${table})`,
maxAttempts: 5,
delay: 5000,
delayMultiplier: 2,
logFirstAttempt: false,
logFailures: true,
// logAll: true,
logger: this.cfg.logger,
},
)
const save = pRetryFn(async (batch: DatastorePayload<ROW>[]) => {
await (opt.tx || this.ds())[method](batch)
}, this.getPRetryOptions(`DatastoreLib.saveBatch(${table})`))

try {
const chunks = _chunk(entities, MAX_ITEMS)
Expand Down Expand Up @@ -328,28 +316,31 @@ export class DatastoreDB extends BaseCommonDB implements CommonDB {
_tx: DBTransaction,
opt?: DatastoreDBSaveOptions,
): Promise<void> {
const tx = this.ds().transaction()
// Using Retry, because Datastore can throw errors like "too much contention" here
await pRetry(async () => {
const tx = this.ds().transaction()

try {
await tx.run()
try {
await tx.run()

// const ops = mergeDBOperations(_tx.ops)
const ops = mergeDBOperations(_tx.ops)

for await (const op of _tx.ops) {
if (op.type === 'saveBatch') {
await this.saveBatch(op.table, op.rows, { ...opt, tx })
} else if (op.type === 'deleteByIds') {
await this.deleteByIds(op.table, op.ids, { ...opt, tx })
} else {
throw new Error(`DBOperation not supported: ${(op as any).type}`)
for await (const op of ops) {
if (op.type === 'saveBatch') {
await this.saveBatch(op.table, op.rows, { ...opt, tx })
} else if (op.type === 'deleteByIds') {
await this.deleteByIds(op.table, op.ids, { ...opt, tx })
} else {
throw new Error(`DBOperation not supported: ${(op as any).type}`)
}
}
}

await tx.commit()
} catch (err) {
void tx.rollback()
throw err // rethrow
}
await tx.commit()
} catch (err) {
await tx.rollback()
throw err // rethrow
}
}, this.getPRetryOptions(`DatastoreLib.commitTransaction`))
}

async getAllStats(): Promise<DatastoreStats[]> {
Expand Down Expand Up @@ -496,4 +487,18 @@ export class DatastoreDB extends BaseCommonDB implements CommonDB {

return s
}

private getPRetryOptions(name: string): PRetryOptions {
return {
predicate: err => RETRY_ON.some(s => err?.message?.includes(s)),
name,
maxAttempts: 5,
delay: 5000,
delayMultiplier: 2,
logFirstAttempt: false,
logFailures: true,
// logAll: true,
logger: this.cfg.logger,
}
}
}

0 comments on commit 32bcbd9

Please sign in to comment.