From bba901650487b208fb5b5bed9e1228d503911bd9 Mon Sep 17 00:00:00 2001 From: Snowteamer <64228468+snowteamer@users.noreply.github.com> Date: Thu, 7 Dec 2023 21:29:04 +0100 Subject: [PATCH] Support retriable Chelonia actions (#1768) * Fix return value in 'chelonia/db/set' * Add chelonia/persistent-actions.js * Apply review * Make '/load' call '/retryAll' * Make trySBP async * Emit events on action success and failure * Use .checkDatabaseKey rather than .ready * Pause and unload persistent actions on logout * Add persistent-actions.test.js * Improve persistent action tests --- frontend/main.js | 9 +- shared/domains/chelonia/db.js | 2 +- shared/domains/chelonia/events.js | 3 + shared/domains/chelonia/persistent-actions.js | 238 ++++++++++++++++++ .../chelonia/persistent-actions.test.js | 224 +++++++++++++++++ 5 files changed, 474 insertions(+), 2 deletions(-) create mode 100644 shared/domains/chelonia/persistent-actions.js create mode 100644 shared/domains/chelonia/persistent-actions.test.js diff --git a/frontend/main.js b/frontend/main.js index d9b7e69c9..0a8a3ffa5 100644 --- a/frontend/main.js +++ b/frontend/main.js @@ -17,6 +17,7 @@ import { LOGIN, LOGOUT, SWITCH_GROUP } from './utils/events.js' import './controller/namespace.js' import './controller/actions/index.js' import './controller/backend.js' +import '~/shared/domains/chelonia/persistent-actions.js' import manifests from './model/contracts/manifests.json' import router from './controller/router.js' import { PUBSUB_INSTANCE } from './controller/instance-keys.js' @@ -43,6 +44,7 @@ const { Vue, L } = Common console.info('GI_VERSION:', process.env.GI_VERSION) console.info('CONTRACTS_VERSION:', process.env.CONTRACTS_VERSION) +console.info('LIGHTWEIGHT_CLIENT:', process.env.LIGHTWEIGHT_CLIENT) console.info('NODE_ENV:', process.env.NODE_ENV) Vue.config.errorHandler = function (err, vm, info) { @@ -240,18 +242,23 @@ async function startApp () { } sbp('okTurtles.events/off', CONTRACT_IS_SYNCING, initialSyncFn) sbp('okTurtles.events/on', CONTRACT_IS_SYNCING, syncFn.bind(this)) - sbp('okTurtles.events/on', LOGIN, () => { + sbp('okTurtles.events/on', LOGIN, async () => { this.ephemeral.finishedLogin = 'yes' if (this.$store.state.currentGroupId) { this.initOrResetPeriodicNotifications() this.checkAndEmitOneTimeNotifications() } + const databaseKey = `chelonia/persistentActions/${sbp('state/vuex/getters').ourIdentityContractId}` + sbp('chelonia.persistentActions/configure', { databaseKey }) + await sbp('chelonia.persistentActions/load') }) sbp('okTurtles.events/on', LOGOUT, () => { this.ephemeral.finishedLogin = 'no' router.currentRoute.path !== '/' && router.push({ path: '/' }).catch(console.error) + // Stop timers related to periodic notifications or persistent actions. sbp('gi.periodicNotifications/clearStatesAndStopTimers') + sbp('chelonia.persistentActions/unload') }) sbp('okTurtles.events/on', SWITCH_GROUP, () => { this.initOrResetPeriodicNotifications() diff --git a/shared/domains/chelonia/db.js b/shared/domains/chelonia/db.js index 46ca54b67..2f5b941f6 100644 --- a/shared/domains/chelonia/db.js +++ b/shared/domains/chelonia/db.js @@ -77,7 +77,7 @@ const dbPrimitiveSelectors = process.env.LIGHTWEIGHT_CLIENT === 'true' // eslint-disable-next-line require-await 'chelonia/db/set': async function (key: string, value: Buffer | string): Promise { checkKey(key) - sbp('okTurtles.data/set', key, value) + return sbp('okTurtles.data/set', key, value) }, // eslint-disable-next-line require-await 'chelonia/db/delete': async function (key: string): Promise { diff --git a/shared/domains/chelonia/events.js b/shared/domains/chelonia/events.js index 0141d6290..75b4cdac5 100644 --- a/shared/domains/chelonia/events.js +++ b/shared/domains/chelonia/events.js @@ -5,3 +5,6 @@ export const CONTRACTS_MODIFIED = 'contracts-modified' export const EVENT_HANDLED = 'event-handled' export const CONTRACT_REGISTERED = 'contract-registered' export const CONTRACT_UNREGISTERED = 'contract-unregistered' +export const PERSISTENT_ACTION_FAILURE = 'persistent-action-failure' +export const PERSISTENT_ACTION_SUCCESS = 'persistent-action-success' +export const PERSISTENT_ACTION_TOTAL_FAILURE = 'persistent-action-total_failure' diff --git a/shared/domains/chelonia/persistent-actions.js b/shared/domains/chelonia/persistent-actions.js new file mode 100644 index 000000000..946f38787 --- /dev/null +++ b/shared/domains/chelonia/persistent-actions.js @@ -0,0 +1,238 @@ +'use strict' + +import sbp from '@sbp/sbp' +import '@sbp/okturtles.events' +import { PERSISTENT_ACTION_FAILURE, PERSISTENT_ACTION_SUCCESS, PERSISTENT_ACTION_TOTAL_FAILURE } from './events.js' + +type SbpInvocation = any[] +type UUIDV4 = string + +type PersistentActionOptions = { + errorInvocation?: SbpInvocation, + // Maximum number of tries, default: Infinity. + maxAttempts: number, + // How many seconds to wait between retries. + retrySeconds: number, + skipCondition?: SbpInvocation, + totalFailureInvocation?: SbpInvocation +} + +type PersistentActionStatus = {| + attempting: boolean, + failedAttemptsSoFar: number, + lastError: string, + nextRetry: string, + resolved: boolean +|} + +const coerceToError = (arg: any): Error => { + if (arg && arg instanceof Error) return arg + console.warn(tag, 'Please use Error objects when throwing or rejecting') + return new Error((typeof arg === 'string' ? arg : JSON.stringify(arg)) ?? 'undefined') +} + +const defaultOptions = { + maxAttempts: Number.POSITIVE_INFINITY, + retrySeconds: 30 +} +const tag = '[chelonia.persistentActions]' + +class PersistentAction { + id: UUIDV4 + invocation: SbpInvocation + options: PersistentActionOptions + status: PersistentActionStatus + timer: TimeoutID | void + + constructor (invocation: SbpInvocation, options: PersistentActionOptions = {}) { + // $FlowFixMe: Cannot resolve name `crypto`. + this.id = crypto.randomUUID() + this.invocation = invocation + this.options = { ...defaultOptions, ...options } + this.status = { + attempting: false, + failedAttemptsSoFar: 0, + lastError: '', + nextRetry: '', + resolved: false + } + } + + async attempt (): Promise { + // Bail out if the action is already attempting or resolved. + // TODO: should we also check whether the skipCondition call is pending? + if (this.status.attempting || this.status.resolved) return + if (await this.trySBP(this.options.skipCondition)) this.cancel() + // We need to check this again because cancel() could have been called while awaiting the trySBP call. + if (this.status.resolved) return + try { + this.status.attempting = true + const result = await sbp(...this.invocation) + this.status.attempting = false + this.handleSuccess(result) + } catch (error) { + this.status.attempting = false + await this.handleError(coerceToError(error)) + } + } + + cancel (): void { + this.timer && clearTimeout(this.timer) + this.status.nextRetry = '' + this.status.resolved = true + } + + async handleError (error: Error): Promise { + const { id, options, status } = this + // Update relevant status fields before calling any optional code. + status.failedAttemptsSoFar++ + status.lastError = error.message + const anyAttemptLeft = options.maxAttempts > status.failedAttemptsSoFar + if (!anyAttemptLeft) status.resolved = true + status.nextRetry = anyAttemptLeft && !status.resolved + ? new Date(Date.now() + options.retrySeconds * 1e3).toISOString() + : '' + // Perform any optional SBP invocation. + // The event has to be fired first for the action to be immediately removed from the list. + sbp('okTurtles.events/emit', PERSISTENT_ACTION_FAILURE, { error, id }) + await this.trySBP(options.errorInvocation) + if (!anyAttemptLeft) { + sbp('okTurtles.events/emit', PERSISTENT_ACTION_TOTAL_FAILURE, { error, id }) + await this.trySBP(options.totalFailureInvocation) + } + // Schedule a retry if appropriate. + if (status.nextRetry) { + // Note: there should be no older active timeout to clear. + this.timer = setTimeout(() => this.attempt(), this.options.retrySeconds * 1e3) + } + } + + handleSuccess (result: any): void { + const { id, status } = this + status.lastError = '' + status.nextRetry = '' + status.resolved = true + sbp('okTurtles.events/emit', PERSISTENT_ACTION_SUCCESS, { id, result }) + } + + async trySBP (invocation: SbpInvocation | void): Promise { + try { + return invocation ? await sbp(...invocation) : undefined + } catch (error) { + console.error(tag, coerceToError(error).message) + } + } +} + +// SBP API + +sbp('sbp/selectors/register', { + 'chelonia.persistentActions/_init' (): void { + this.actionsByID = Object.create(null) + this.checkDatabaseKey = () => { + if (!this.databaseKey) throw new TypeError(`${tag} No database key configured`) + } + sbp('okTurtles.events/on', PERSISTENT_ACTION_SUCCESS, ({ id }) => { + sbp('chelonia.persistentActions/cancel', id) + }) + sbp('okTurtles.events/on', PERSISTENT_ACTION_TOTAL_FAILURE, ({ id }) => { + sbp('chelonia.persistentActions/cancel', id) + }) + }, + + // Cancels a specific action by its ID. + // The action won't be retried again, but an async action cannot be aborted if its promise is stil attempting. + 'chelonia.persistentActions/cancel' (id: UUIDV4): void { + if (id in this.actionsByID) { + this.actionsByID[id].cancel() + delete this.actionsByID[id] + // Likely no need to await this call. + sbp('chelonia.persistentActions/save') + } + }, + + // TODO: validation + 'chelonia.persistentActions/configure' ({ databaseKey, options = {} }: { databaseKey: string; options: Object }): void { + this.databaseKey = databaseKey + for (const key in options) { + if (key in defaultOptions) { + defaultOptions[key] = options[key] + } else { + throw new TypeError(`${tag} Unknown option: ${key}`) + } + } + }, + + 'chelonia.persistentActions/enqueue' (...args): UUIDV4[] { + const ids: UUIDV4[] = [] + for (const arg of args) { + const action = Array.isArray(arg) + ? new PersistentAction(arg) + : new PersistentAction(arg.invocation, arg) + this.actionsByID[action.id] = action + ids.push(action.id) + } + // Likely no need to await this call. + sbp('chelonia.persistentActions/save') + for (const id of ids) this.actionsByID[id].attempt() + return ids + }, + + // Forces retrying a given persisted action immediately, rather than waiting for the scheduled retry. + // - 'status.failedAttemptsSoFar' will still be increased upon failure. + // - Does nothing if a retry is already running. + // - Does nothing if the action has already been resolved, rejected or cancelled. + 'chelonia.persistentActions/forceRetry' (id: UUIDV4): void { + if (id in this.actionsByID) { + this.actionsByID[id].attempt() + } + }, + + // Loads and tries every stored persistent action under the configured database key. + async 'chelonia.persistentActions/load' (): Promise { + this.checkDatabaseKey() + const storedActions = JSON.parse((await sbp('chelonia/db/get', this.databaseKey)) ?? '[]') + for (const { id, invocation, options } of storedActions) { + this.actionsByID[id] = new PersistentAction(invocation, options) + // Use the stored ID instead of the autogenerated one. + // TODO: find a cleaner alternative. + this.actionsByID[id].id = id + } + sbp('chelonia.persistentActions/retryAll') + }, + + // Retry all existing persisted actions. + // TODO: add some delay between actions so as not to spam the server, + // or have a way to issue them all at once in a single network call. + 'chelonia.persistentActions/retryAll' (): void { + for (const id in this.actionsByID) { + sbp('chelonia.persistentActions/forceRetry', id) + } + }, + + // Updates the database version of the attempting action list. + 'chelonia.persistentActions/save' (): Promise { + this.checkDatabaseKey() + return sbp( + 'chelonia/db/set', + this.databaseKey, + JSON.stringify(Object.values(this.actionsByID)) + ) + }, + + 'chelonia.persistentActions/status' () { + return Object.values(this.actionsByID) + // $FlowFixMe: `PersistentAction` is incompatible with mixed + .map((action: PersistentAction) => ({ id: action.id, invocation: action.invocation, ...action.status })) + }, + + // Pauses every currently loaded action, and removes them from memory. + // Note: persistent storage is not affected, so that these actions can be later loaded again and retried. + 'chelonia.persistentActions/unload' (): void { + for (const id in this.actionsByID) { + // Clear the action's timeout, but don't cancel it so that it can later resumed. + this.actionsByID[id].timer && clearTimeout(this.actionsByID[id].timer) + delete this.actionsByID[id] + } + } +}) diff --git a/shared/domains/chelonia/persistent-actions.test.js b/shared/domains/chelonia/persistent-actions.test.js new file mode 100644 index 000000000..da37f5d65 --- /dev/null +++ b/shared/domains/chelonia/persistent-actions.test.js @@ -0,0 +1,224 @@ +/* eslint-env mocha */ + +// Can run directly with: +// ./node_modules/.bin/mocha --require Gruntfile.js --require @babel/register shared/domains/chelonia/persistent-actions.test.js + +// FIXME: `Error: unsafe must be called before registering selector` when Mocha reloads the file. + +import assert from 'node:assert' +import crypto from 'node:crypto' +import sbp from '@sbp/sbp' +import sinon from 'sinon' + +import '~/shared/domains/chelonia/db.js' + +import './persistent-actions.js' +import { PERSISTENT_ACTION_FAILURE, PERSISTENT_ACTION_TOTAL_FAILURE, PERSISTENT_ACTION_SUCCESS } from './events.js' + +// Provides the 'crypto' global in the Nodejs environment. +globalThis.crypto = crypto +// Necessary to avoid 'JSON.stringify' errors since Node timeouts are circular objects, whereas browser timeouts are just integers. +setTimeout(() => {}).constructor.prototype.toJSON = () => undefined + +sbp('sbp/selectors/register', { + call (fn, ...args) { + return fn(...args) + }, + log (msg) { + console.log(msg) + }, + rejectAfter100ms (arg) { + return new Promise((resolve, reject) => { + setTimeout(() => reject(arg), 100) + }) + }, + resolveAfter100ms (arg) { + return new Promise((resolve, reject) => { + setTimeout(() => resolve(arg), 100) + }) + }, + returnImmediately (arg) { + return arg + }, + throwImmediately (arg) { + throw arg + } +}) + +const createRandomError = () => new Error(`Bad number: ${String(Math.random())}`) +const getActionStatus = (id) => sbp('chelonia.persistentActions/status').find(obj => obj.id === id) +const isActionRemoved = (id) => !sbp('chelonia.persistentActions/status').find(obj => obj.id === id) + +const spies = { + returnImmediately: sinon.spy(sbp('sbp/selectors/fn', 'returnImmediately')) +} +// Custom `configure` options for tests. +// Mocha has a default 2000ms test timeout, therefore we'll use short delays. +const testOptions = { + maxAttempts: 3, + retrySeconds: 0.5 +} + +describe('Test persistent actions', function () { + it('should configure', function () { + sbp('chelonia.persistentActions/configure', { + databaseKey: 'test-key', + options: testOptions + }) + }) + + it('should enqueue without immediately attempting', function () { + // Prepare actions to enqueue. Random numbers are used to make invocations different. + const args = [ + // Basic syntax. + ['returnImmediately', Math.random()], + // Minimal option syntax. + { + invocation: ['returnImmediately', Math.random()] + }, + // Full option syntax. + { + errorInvocation: ['log', 'Action n°3 failed'], + invocation: ['returnImmediately', Math.random()], + maxAttempts: 4, + retrySeconds: 5, + skipCondition: ['test'], + totalFailureInvocation: ['log', 'Action n°3 totally failed'] + } + ] + const ids = sbp('chelonia.persistentActions/enqueue', ...args) + assert(Array.isArray(ids)) + assert(ids.length === args.length) + // Check the actions have been correctly queued. + ids.forEach((id, index) => { + const arg = args[index] + const status = getActionStatus(id) + assert.strictEqual(status.id, id) + assert.deepEqual(status.invocation, arg.invocation ?? arg) + assert.strictEqual(status.attempting, false) + assert.strictEqual(status.failedAttemptsSoFar, 0) + assert.strictEqual(status.lastError, '') + assert.strictEqual(status.nextRetry, '') + assert.strictEqual(status.resolved, false) + }) + // Check the actions have NOT been tried yet. + assert.strictEqual(spies.returnImmediately.called, false) + }) + + it('should emit a success event and remove the action', function () { + // Prepare actions using both sync and async invocations. + // TODO: maybe the async case is enough, which would make the code simpler. + const randomNumbers = [Math.random(), Math.random()] + const invocations = [ + ['resolveAfter100ms', randomNumbers[0]], + ['returnImmediately', randomNumbers[1]] + ] + const ids = sbp('chelonia.persistentActions/enqueue', ...invocations) + return Promise.all(ids.map((id, index) => new Promise((resolve, reject) => { + // Registers a success handler for each received id. + sbp('okTurtles.events/on', PERSISTENT_ACTION_SUCCESS, function handler (details) { + if (details.id !== id) return + try { + // Check the action has actually been called and its result is correct. + assert.strictEqual(details.result, randomNumbers[index]) + // Check the action has been correctly removed. + assert(isActionRemoved(id)) + // Wait a little to make sure the action isn't going to be retried. + setTimeout(resolve, (testOptions.retrySeconds + 1) * 1e3) + } catch (err) { + reject(err) + } finally { + sbp('okTurtles.events/off', PERSISTENT_ACTION_SUCCESS, handler) + } + }) + }))) + }) + + it('should emit a failure event and schedule a retry', function () { + const ourError = createRandomError() + const invocation = ['rejectAfter100ms', ourError] + const [id] = sbp('chelonia.persistentActions/enqueue', invocation) + return new Promise((resolve, reject) => { + sbp('okTurtles.events/once', PERSISTENT_ACTION_FAILURE, (details) => { + try { + assert.strictEqual(details.id, id) + assert.strictEqual(details.error, ourError) + // Check the action status. + const status = getActionStatus(id) + assert.strictEqual(status.failedAttemptsSoFar, 1) + assert.strictEqual(status.lastError, ourError.message) + assert.strictEqual(status.resolved, false) + // Check a retry has been scheduled. + assert(new Date(status.nextRetry) - Date.now() <= testOptions.retrySeconds * 1e3) + resolve() + } catch (err) { + reject(err) + } + }) + }) + }) + + it('should emit N failure events, then a total failure event and remove the action (sync)', function () { + const ourError = createRandomError() + const invocation = ['throwImmediately', ourError] + return e2eFailureTest(invocation, ourError) + }) + + it('should emit N failure events, then a total failure event and remove the action (async)', function () { + const ourError = createRandomError() + const invocation = ['rejectAfter100ms', ourError] + return e2eFailureTest(invocation, ourError) + }) + + it('should handle non-Error failures gracefully', function () { + const ourError = 'not a real error' + const invocation = ['rejectAfter100ms', ourError] + return e2eFailureTest(invocation, ourError) + }) + + function e2eFailureTest (invocation, ourError) { + const errorInvocationSpy = sinon.spy() + const errorInvocation = ['call', errorInvocationSpy] + + const [id] = sbp('chelonia.persistentActions/enqueue', { invocation, errorInvocation }) + + return new Promise((resolve, reject) => { + let failureEventCounter = 0 + sbp('okTurtles.events/on', PERSISTENT_ACTION_FAILURE, (details) => { + if (details.id !== id) return + failureEventCounter++ + try { + assert(failureEventCounter <= testOptions.maxAttempts, 1) + // Check the event handler was called before the corresponding SBP invocation. + assert.strictEqual(failureEventCounter, errorInvocationSpy.callCount + 1, 2) + assert.strictEqual(details.error.message, ourError?.message ?? ourError, 3) + } catch (err) { + reject(err) + } + }) + sbp('okTurtles.events/on', PERSISTENT_ACTION_TOTAL_FAILURE, (details) => { + if (details.id !== id) return + try { + assert.strictEqual(failureEventCounter, testOptions.maxAttempts, 3) + assert.strictEqual(errorInvocationSpy.callCount, testOptions.maxAttempts, 4) + assert.strictEqual(details.error.message, ourError?.message ?? ourError, 5) + assert(isActionRemoved(id), 6) + resolve() + } catch (err) { + reject(err) + } + }) + }) + } + + it('should cancel and remove the given action', function () { + return new Promise((resolve, reject) => { + // This action will reject the promise and fail the test if it ever gets tried. + const [id] = sbp('chelonia.persistentActions/enqueue', ['call', reject]) + sbp('chelonia.persistentActions/cancel', id) + assert(isActionRemoved(id)) + // Wait half a second to be sure the action isn't going to be tried despite being removed. + setTimeout(resolve, 500) + }) + }) +})