From ef7f5421e1a9f9844fd7db6e58e4903ec172ee33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Iv=C3=A1n=20Vieitez=20Parra?= <3857362+corrideat@users.noreply.github.com> Date: Thu, 14 Mar 2024 09:33:39 +0100 Subject: [PATCH] Limit events after (#1876) * Add limit to eventsAfter * Minor cleanup * Convert circularClassList.js into a class * Simplify streamEventsAfter * Remove a bit of debugging in addEntry * Handle read() error in streamEventsAfter * No longer reverse entries in streamEventsAfter * fix the double dialog box issue * Remove pepper, read secret from file and minimise the number of secrets * Signature verification * Remove groupContractID in chatroom contract attributes (#1847) * feat: removed groupContractID in chatroom attributes * fix: recovered package.json * chore: remove unnecessary getter * chore: updated manifest * Fix error to close CreatePoll modal by clicking sidebar (#1849) * fix: error to close CreatePoll modal by clicking sidebar * chore: added comment for better understanding * chore: fixed typo * fix: make notification after identity contract is fully synced (#1848) * #1851 - Fix the broken VoterAvatars.vue component (#1852) * add a fix for Prompt.vue not being closed properly * fix the broken VoterAvatars.vue * Replace checks to use _private instead of _private_ * Remove notifications while leaving group (#1859) * fix: error in removing notifications after leaving group * chore: random commit for running travis again * Encrypted files (#1858) * WIP upload files * Files uploading (wip) * Files uploading, HAPI integration (wip) * File upload and download * Encrypted files * Changes to dowload API for ergonomics and manifest processing * Avatar types * File caching * Documentation * Update avatar types in Message* classes * Fix flow errors * Remove streams check for Node.js * Fix tests & feedback * Fix tests & feedback (again) * Fix tests for Node 18 * Fix test Co-authored-by: Greg Slepak --------- Co-authored-by: Greg Slepak * Fix streams support * Fix error in getting updated with the user's profile after rejoining group (#1864) * fix: sharing new PEKs * chore: grunt pin:0.2.5 * chore: reverted grunt pin and removed manifest.json --------- Co-authored-by: Greg Slepak * #1571 - Create vuex module for chat (#1854) * add chatroom/vuexModule.js with some boilerplate code in there * fix a linter error * move chat&DM related getters in state.js to the chatroom vuexModule * move all the chatroom-relateed mutations to the vuexModules * add chatroom vuexModule to store in state.js * a small bug-fix for re-login process * typo fix for the cypress failure * remove postUpgradeVerification code (resetting groups anyway) --------- Co-authored-by: Greg Slepak * Fix issue #1857 * Fix incorrect badge while switching group (#1863) * fix: incorrect badge from direct messages * chore: grunt pin:0.2.4 * chore: reverted version 0.2.4 * chore: reverted the grunt pin:0.2.3 too * chore: removed manifest.json * fix the overflow bug * Fix error to display profile picture for DMs with more than 2 people (#1869) * fix: error to select avatar for direct messages * chore: follow-up process for changes of getters * fix: error in getting direct messages by groupID * fix: syntax error * 32 logging (#1870) * pino enabled, still need to fix logging behavior though * fix flow errors * pino support seems to be working * one minor fix for %s strings. Closes #32 * fix some error logging in pubsub + remove hapi-pino * Convert eventsAfter to stream (missing: change usage in ChatMain) * API improvements * Endpoint consolidation * Fix subscriptionSet issues * Fix chatroom race condition * Log instead of throw * Cleaning up * Linting --------- Co-authored-by: snowteamer <64228468+snowteamer@users.noreply.github.com> Co-authored-by: SebinSong Co-authored-by: Alex Jin <57976479+Silver-IT@users.noreply.github.com> Co-authored-by: Greg Slepak --- Gruntfile.js | 4 +- backend/database.js | 117 +++--------- backend/routes.js | 42 +---- frontend/model/captureLogs.js | 4 +- frontend/utils/circularList.js | 38 ---- .../views/containers/chatroom/ChatMain.vue | 96 ++++++---- .../chatroom/EditChannelNameModal.vue | 9 +- shared/CircularList.js | 48 +++++ shared/domains/chelonia/chelonia.js | 80 ++++---- shared/domains/chelonia/db.js | 1 + shared/domains/chelonia/internals.js | 57 +++--- shared/domains/chelonia/utils.js | 177 +++++++++++++++++- 12 files changed, 397 insertions(+), 276 deletions(-) delete mode 100644 frontend/utils/circularList.js create mode 100644 shared/CircularList.js diff --git a/Gruntfile.js b/Gruntfile.js index 92cfa7771..2846749c1 100644 --- a/Gruntfile.js +++ b/Gruntfile.js @@ -41,6 +41,7 @@ const packageJSON = require('./package.json') const { CI = '', LIGHTWEIGHT_CLIENT = 'true', + MAX_EVENTS_AFTER = '', NODE_ENV = 'development', EXPOSE_SBP = '', ENABLE_UNSAFE_NULL_CRYPTO = 'false' @@ -217,6 +218,7 @@ module.exports = (grunt) => { 'process.env.CONTRACTS_VERSION': `'${CONTRACTS_VERSION}'`, 'process.env.GI_VERSION': `'${GI_VERSION}'`, 'process.env.LIGHTWEIGHT_CLIENT': `'${LIGHTWEIGHT_CLIENT}'`, + 'process.env.MAX_EVENTS_AFTER': `'${MAX_EVENTS_AFTER}'`, 'process.env.NODE_ENV': `'${NODE_ENV}'`, 'process.env.EXPOSE_SBP': `'${EXPOSE_SBP}'`, 'process.env.ENABLE_UNSAFE_NULL_CRYPTO': `'${ENABLE_UNSAFE_NULL_CRYPTO}'` @@ -697,7 +699,7 @@ module.exports = (grunt) => { }) process.on('unhandledRejection', (reason, p) => { - console.error('[gruntfile] Unhandled promise rejection:', p, 'reason:', reason) + console.error('[gruntfile] Unhandled promise rejection:', p, 'reason:', reason.message, reason.stack) process.exit(1) }) } diff --git a/backend/database.js b/backend/database.js index d9846f37b..b1a6678b7 100644 --- a/backend/database.js +++ b/backend/database.js @@ -37,107 +37,50 @@ if (!fs.existsSync(dataFolder)) { fs.mkdirSync(dataFolder, { mode: 0o750 }) } +// Streams stored contract log entries since the given entry hash (inclusive!). sbp('sbp/selectors/register', { - 'backend/db/streamEntriesAfter': async function (contractID: string, hash: string): Promise<*> { + 'backend/db/streamEntriesAfter': async function (contractID: string, height: string, requestedLimit: ?number): Promise<*> { + const limit = Math.min(requestedLimit ?? Number.POSITIVE_INFINITY, process.env.MAX_EVENTS_BATCH_SIZE ?? 500) const latestHEADinfo = await sbp('chelonia/db/latestHEADinfo', contractID) if (!latestHEADinfo) { throw Boom.notFound(`contractID ${contractID} doesn't exist!`) } - let { HEAD: currentHEAD } = latestHEADinfo + // Number of entries pushed. + let counter = 0 + let currentHash = await sbp('chelonia/db/get', `_private_hidx=${contractID}#${height}`) let prefix = '[' // NOTE: if this ever stops working you can also try Readable.from(): // https://nodejs.org/api/stream.html#stream_stream_readable_from_iterable_options - return new Readable({ - async read (): any { - try { - const entry = await sbp('chelonia/db/getEntry', currentHEAD) - const json = `"${strToB64(entry.serialize())}"` - if (currentHEAD !== hash) { - this.push(prefix + json) - currentHEAD = entry.head().previousHEAD - prefix = ',' - } else { - this.push(prefix + json + ']') + const stream = new Readable({ + read (): void { + if (currentHash && counter < limit) { + sbp('chelonia/db/getEntry', currentHash).then(async entry => { + if (entry) { + this.push(`${prefix}"${strToB64(entry.serialize())}"`) + prefix = ',' + counter++ + currentHash = await sbp('chelonia/db/get', `_private_hidx=${contractID}#${entry.height() + 1}`) + } else { + this.push(counter > 0 ? ']' : '[]') + this.push(null) + } + }).catch(e => { + console.error(`[backend] streamEntriesAfter: read(): ${e.message}:`, e) + this.push(counter > 0 ? ']' : '[]') this.push(null) - } - } catch (e) { - console.error(e, `read(): ${e.message}`) - this.push(']') + }) + } else { + this.push(counter > 0 ? ']' : '[]') this.push(null) } } }) - }, - 'backend/db/streamEntriesBefore': async function (before: string, limit: number): Promise<*> { - let prefix = '[' - let currentHEAD = before - let entry = await sbp('chelonia/db/getEntry', currentHEAD) - if (!entry) { - throw Boom.notFound(`entry ${currentHEAD} doesn't exist!`) + // $FlowFixMe[prop-missing] + stream.headers = { + 'shelter-headinfo-head': latestHEADinfo.HEAD, + 'shelter-headinfo-height': latestHEADinfo.height } - limit++ // to return `before` apart from the `limit` number of events - // NOTE: if this ever stops working you can also try Readable.from(): - // https://nodejs.org/api/stream.html#stream_stream_readable_from_iterable_options - return new Readable({ - async read (): any { - try { - if (!currentHEAD || !limit) { - this.push(']') - this.push(null) - } else { - entry = await sbp('chelonia/db/getEntry', currentHEAD) - const json = `"${strToB64(entry.serialize())}"` - this.push(prefix + json) - prefix = ',' - limit-- - currentHEAD = entry.head().previousHEAD - } - } catch (e) { - // TODO: properly return an error to caller, see https://nodejs.org/api/stream.html#errors-while-reading - console.error(e, `read(): ${e.message}:`) - this.push(']') - this.push(null) - } - } - }) - }, - 'backend/db/streamEntriesBetween': async function (startHash: string, endHash: string, offset: number): Promise<*> { - let prefix = '[' - let isMet = false - let currentHEAD = endHash - let entry = await sbp('chelonia/db/getEntry', currentHEAD) - if (!entry) { - throw Boom.notFound(`entry ${currentHEAD} doesn't exist!`) - } - // NOTE: if this ever stops working you can also try Readable.from(): - // https://nodejs.org/api/stream.html#stream_stream_readable_from_iterable_options - return new Readable({ - async read (): any { - try { - entry = await sbp('chelonia/db/getEntry', currentHEAD) - const json = `"${strToB64(entry.serialize())}"` - this.push(prefix + json) - prefix = ',' - - if (currentHEAD === startHash) { - isMet = true - } else if (isMet) { - offset-- - } - - currentHEAD = entry.head().previousHEAD - if (!currentHEAD || (isMet && !offset)) { - this.push(']') - this.push(null) - } - } catch (e) { - // TODO: properly return an error to caller, see https://nodejs.org/api/stream.html#errors-while-reading - console.error(e, `read(): ${e.message}:`) - this.push(']') - this.push(null) - } - } - }) + return stream }, // ======================= // wrapper methods to add / lookup names diff --git a/backend/routes.js b/backend/routes.js index 950de537b..634db587c 100644 --- a/backend/routes.js +++ b/backend/routes.js @@ -82,14 +82,14 @@ route.POST('/event', { } }) -route.GET('/eventsAfter/{contractID}/{since}', {}, async function (request, h) { - const { contractID, since } = request.params +route.GET('/eventsAfter/{contractID}/{since}/{limit?}', {}, async function (request, h) { + const { contractID, since, limit } = request.params try { if (contractID.startsWith('_private') || since.startsWith('_private')) { return Boom.notFound() } - const stream = await sbp('backend/db/streamEntriesAfter', contractID, since) + const stream = await sbp('backend/db/streamEntriesAfter', contractID, since, limit) // "On an HTTP server, make sure to manually close your streams if a request is aborted." // From: http://knexjs.org/#Interfaces-Streams // https://github.com/tgriesser/knex/wiki/Manually-Closing-Streams @@ -107,42 +107,6 @@ route.GET('/eventsAfter/{contractID}/{since}', {}, async function (request, h) { } }) -route.GET('/eventsBefore/{before}/{limit}', {}, async function (request, h) { - const { before, limit } = request.params - try { - if (!before) return Boom.badRequest('missing before') - if (!limit) return Boom.badRequest('missing limit') - if (isNaN(parseInt(limit)) || parseInt(limit) <= 0) return Boom.badRequest('invalid limit') - if (before.startsWith('_private')) return Boom.notFound() - - const stream = await sbp('backend/db/streamEntriesBefore', before, parseInt(limit)) - request.events.once('disconnect', stream.destroy.bind(stream)) - return stream - } catch (err) { - logger.error(err, `GET /eventsBefore/${before}/${limit}`, err.message) - return err - } -}) - -route.GET('/eventsBetween/{startHash}/{endHash}', {}, async function (request, h) { - const { startHash, endHash } = request.params - try { - const offset = parseInt(request.query.offset || '0') - - if (!startHash) return Boom.badRequest('missing startHash') - if (!endHash) return Boom.badRequest('missing endHash') - if (isNaN(offset) || offset < 0) return Boom.badRequest('invalid offset') - if (startHash.startsWith('_private') || endHash.startsWith('_private')) return Boom.notFound() - - const stream = await sbp('backend/db/streamEntriesBetween', startHash, endHash, offset) - request.events.once('disconnect', stream.destroy.bind(stream)) - return stream - } catch (err) { - logger.error(err, `GET /eventsBetwene/${startHash}/${endHash}`, err.message) - return err - } -}) - /* // The following endpoint is disabled because name registrations are handled // through the `shelter-namespace-registration` header when registering a diff --git a/frontend/model/captureLogs.js b/frontend/model/captureLogs.js index 64f634dab..277c42a4e 100644 --- a/frontend/model/captureLogs.js +++ b/frontend/model/captureLogs.js @@ -1,6 +1,6 @@ import sbp from '@sbp/sbp' import { CAPTURED_LOGS, SET_APP_LOGS_FILTER } from '~/frontend/utils/events.js' -import { createCircularList } from '~/frontend/utils/circularList.js' +import CircularList from '~/shared/CircularList.js' /* - giConsole/[username]/entries - the stored log entries. @@ -28,7 +28,7 @@ const setItem = (key: string, value: any): void => { } function createLogger (config: Object): Object { - const entries = createCircularList(config.maxEntries) + const entries = new CircularList(config.maxEntries) const methods = loggingLevels.reduce( (acc, name) => { acc[name] = (...args) => { diff --git a/frontend/utils/circularList.js b/frontend/utils/circularList.js deleted file mode 100644 index 9836653f2..000000000 --- a/frontend/utils/circularList.js +++ /dev/null @@ -1,38 +0,0 @@ -'use strict' - -// A list with fixed capacity and constant-time `add()`. -export function createCircularList (capacity: number, defaultValue: any = ''): Object { - const buffer: string[] = new Array(capacity).fill(defaultValue) - let isFull = false - let offset = 0 - - // NOTE: this code doesn't let distinct instances share their method objects, - // which would be bad for memory usage if many instances were created. - // But that's fine since we're only using one so far. - return { - add (entry) { - buffer[offset] = entry - if (offset === capacity - 1) { - isFull = true - } - offset = (offset + 1) % capacity - }, - addAll (entries: Array<*>) { - for (const entry of entries) { - this.add(entry) - } - }, - clear () { - buffer.fill(defaultValue) - isFull = false - offset = 0 - }, - toArray (): Array<*> { - return ( - isFull - ? [...buffer.slice(offset), ...buffer.slice(0, offset)] - : buffer.slice(0, offset) - ) - } - } -} diff --git a/frontend/views/containers/chatroom/ChatMain.vue b/frontend/views/containers/chatroom/ChatMain.vue index e09369439..7490d0d3d 100644 --- a/frontend/views/containers/chatroom/ChatMain.vue +++ b/frontend/views/containers/chatroom/ChatMain.vue @@ -138,6 +138,17 @@ import { proximityDate, MINS_MILLIS } from '@model/contracts/shared/time.js' import { cloneDeep, debounce, throttle } from '@model/contracts/shared/giLodash.js' import { EVENT_HANDLED } from '~/shared/domains/chelonia/events.js' +const collectEventStream = async (s: ReadableStream) => { + const reader = s.getReader() + const r = [] + for (;;) { + const { done, value } = await reader.read() + if (done) break + r.push(value) + } + return r +} + const ignorableScrollDistanceInPixel = 500 // The following methods are wrapped inside `debounce`, which requires calling @@ -468,7 +479,9 @@ export default ({ } else { const contractID = this.summary.chatRoomId const limit = this.chatRoomSettings?.actionsPerPage || CHATROOM_ACTIONS_PER_PAGE - const events = await sbp('chelonia/out/eventsBetween', messageHash, this.messages[0].hash, limit / 2) + const events = await collectEventStream(sbp('chelonia/out/eventsBetween', contractID, messageHash, this.messages[0].height, limit / 2)).catch((e) => { + console.debug(`Error fetching events or message ${messageHash} doesn't belong to ${contractID}`) + }) if (!this.checkEventSourceConsistency(contractID)) return if (events && events.length) { await this.rerenderEvents(events) @@ -486,10 +499,12 @@ export default ({ } }, updateScroll (scrollTargetMessage = null, effect = false) { - if (this.summary.chatRoomId) { + const contractID = this.summary.chatRoomId + if (contractID) { // force conversation viewport to be at the bottom (most recent messages) setTimeout(() => { if (scrollTargetMessage) { + if (!this.checkEventSourceConsistency(contractID)) return this.scrollToMessage(scrollTargetMessage, effect) } else { this.jumpToLatest() @@ -621,38 +636,47 @@ export default ({ let events = [] const isLoadedFromStorage = !this.ephemeral.messagesInitiated && this.latestEvents.length if (isLoadedFromStorage) { - const prevLastEventHash = this.messageState.prevTo // NOTE: check loadMessagesFromStorage function - const newEvents = await sbp('chelonia/out/eventsAfter', chatRoomId, prevLastEventHash) - newEvents.shift() // NOTE: already exists in this.latestEvents - if (newEvents.length > 0) { - // This ensures that `this.latestEvents.push(message.serialize())` - // below happens in order - await sbp('okTurtles.eventQueue/queueEvent', 'chatroom-events', async () => { - if (!this.checkEventSourceConsistency(chatRoomId)) return + const prevLastEvent = this.messageState.prevTo // NOTE: check loadMessagesFromStorage function + const newEventsStream = sbp('chelonia/out/eventsAfter', chatRoomId, prevLastEvent.height, undefined, prevLastEvent.hash) + const newEventsStreamReader = newEventsStream.getReader() + await sbp('okTurtles.eventQueue/queueEvent', 'chatroom-events', async () => { + // NOTE: discard the first event, since it already exists in + // this.latestEvents + const { done } = await newEventsStreamReader.read() + if (done) return + if (!this.checkEventSourceConsistency(chatRoomId)) return - for (const event of newEvents) { - const state = this.messageState.contract - const newState = await sbp('chelonia/in/processMessage', event, state) + for (;;) { + const { done, value: event } = await newEventsStreamReader.read() + if (done) break - if (!this.checkEventSourceConsistency(chatRoomId)) return + const state = this.messageState.contract + const newState = await sbp('chelonia/in/processMessage', event, state) - Vue.set(this.messageState, 'contract', newState) - this.latestEvents.push(event) - } + if (!this.checkEventSourceConsistency(chatRoomId)) return - this.$forceUpdate() - }) - } + Vue.set(this.messageState, 'contract', newState) + this.latestEvents.push(event) + } + + this.$forceUpdate() + }).catch(e => { + console.error('[ChatMain.vue] Error processing events at renderMoreMessages', e) + }).finally(() => { + newEventsStreamReader.releaseLock() + }) } else { if (!this.ephemeral.messagesInitiated && messageHashToScroll) { - const { HEAD: latestHash } = await sbp('chelonia/out/latestHEADInfo', chatRoomId) - events = await sbp('chelonia/out/eventsBetween', messageHashToScroll, latestHash, limit) + const { height: latestHeight } = await sbp('chelonia/out/latestHEADInfo', chatRoomId) + if (!this.checkEventSourceConsistency(chatRoomId)) return + events = await collectEventStream(sbp('chelonia/out/eventsBetween', chatRoomId, messageHashToScroll, latestHeight, limit)) } else if (!this.ephemeral.messagesInitiated || !this.latestEvents.length) { - const { HEAD: latestHash } = await sbp('chelonia/out/latestHEADInfo', chatRoomId) - events = await sbp('chelonia/out/eventsBefore', latestHash, limit) + const { height: latestHeight } = await sbp('chelonia/out/latestHEADInfo', chatRoomId) + if (!this.checkEventSourceConsistency(chatRoomId)) return + events = await collectEventStream(sbp('chelonia/out/eventsBefore', chatRoomId, latestHeight, limit)) } else { - const before = GIMessage.deserializeHEAD(this.latestEvents[0]).hash - events = await sbp('chelonia/out/eventsBefore', before, limit) + const beforeHeight = GIMessage.deserializeHEAD(this.latestEvents[0]).head.height + events = await collectEventStream(sbp('chelonia/out/eventsBefore', chatRoomId, beforeHeight, limit)) } } @@ -684,8 +708,7 @@ export default ({ this.initializeState() - // This ensures that `this.latestEvents.push(message.serialize())` below - // happens in order + // This ensures that `this.latestEvents.push(event)` below happens in order return sbp('okTurtles.eventQueue/queueEvent', 'chatroom-events', async () => { if (!this.checkEventSourceConsistency(contractID)) return @@ -708,10 +731,15 @@ export default ({ if (!this.checkEventSourceConsistency(chatRoomId)) return const latestEvents = prevState ? JSON.parse(prevState) : [] - this.messageState.prevFrom = latestEvents.length ? GIMessage.deserializeHEAD(latestEvents[0]).hash : null - this.messageState.prevTo = latestEvents.length - ? GIMessage.deserializeHEAD(latestEvents[latestEvents.length - 1]).hash - : null + if (latestEvents.length) { + const deserializedHEADfirst = GIMessage.deserializeHEAD(latestEvents[0]) + const deserializedHEADlast = GIMessage.deserializeHEAD(latestEvents[latestEvents.length - 1]) + this.messageState.prevFrom = { hash: deserializedHEADfirst.hash, height: deserializedHEADfirst.head.height } + this.messageState.prevTo = { hash: deserializedHEADlast.hash, height: deserializedHEADlast.head.height } + } else { + this.messageState.prevFrom = null + this.messageState.prevTo = null + } await this.rerenderEvents(latestEvents) }, @@ -794,7 +822,7 @@ export default ({ // NOTE: while syncing the chatroom contract, we should ignore all the events const { addedOrDeleted } = isMessageAddedOrDeleted(message) - // This ensures that `this.latestEvents.push(message.serialize())` below + // This ensures that `this.latestEvents.push(serializedMessage)` below // happens in order sbp('okTurtles.eventQueue/queueEvent', 'chatroom-events', async () => { if (!this.checkEventSourceConsistency(contractID)) return @@ -939,7 +967,7 @@ export default ({ // NOTE: save messages in the browser storage, but not more than CHATROOM_MAX_ARCHIVE_ACTION_PAGES pages of events if (latestEvents.length >= CHATROOM_MAX_ARCHIVE_ACTION_PAGES * unit) { sbp('gi.db/archive/delete', this.archiveKeyFromChatRoomId(chatRoomId)) - } else if (to !== this.messageState.prevTo || from !== this.messageState.prevFrom) { + } else if (to !== this.messageState.prevTo?.hash || from !== this.messageState.prevFrom?.hash) { // this.currentChatRoomId could be wrong when the channels are switched very fast // so it's good to initiate using input parameter chatRoomId sbp('gi.db/archive/save', this.archiveKeyFromChatRoomId(chatRoomId), JSON.stringify(latestEvents)) diff --git a/frontend/views/containers/chatroom/EditChannelNameModal.vue b/frontend/views/containers/chatroom/EditChannelNameModal.vue index 9b7e8dded..9206775e9 100644 --- a/frontend/views/containers/chatroom/EditChannelNameModal.vue +++ b/frontend/views/containers/chatroom/EditChannelNameModal.vue @@ -31,7 +31,7 @@ i18n.is-success( tag='button' @click='submit' - :disabled='$v.form.$invalid' + :disabled='submitting || $v.form.$invalid' data-test='updateChannelNameSubmit' ) Save @@ -63,6 +63,7 @@ export default ({ data () { return { channelId: this.$route.query.channel, + submitting: false, form: { name: null, existingNames: [] @@ -86,6 +87,8 @@ export default ({ }, async submit () { try { + if (this.submitting) return + this.submitting = true if (this.currentChatRoomState.attributes.name === this.form.name) { // TODO: No need to update chatroom name. Display message box or toast or sth else console.log('TODO: Channel name is not changed') @@ -101,11 +104,13 @@ export default ({ } }) } + this.close() } catch (e) { console.error('RenameChannelModal submit() error:', e) this.$refs.formMsg.danger(e.message) + } finally { + this.submitting = false } - this.close() } }, validations: { diff --git a/shared/CircularList.js b/shared/CircularList.js new file mode 100644 index 000000000..62fd61f3c --- /dev/null +++ b/shared/CircularList.js @@ -0,0 +1,48 @@ +'use strict' + +// A list with fixed capacity and constant-time `add()`. +export default class CircularList { + #buffer: string[] + #capacity = 0 + #defaultValue = '' + #isFull = false + #offset = 0 + + constructor (capacity: number, defaultValue: any = '') { + this.#buffer = new Array(capacity).fill(defaultValue) + this.#capacity = capacity + this.#defaultValue = defaultValue + } + + add (entry: any) { + const capacity = this.#capacity + const offset = this.#offset + this.#buffer[offset] = entry + if (offset === capacity - 1) { + this.#isFull = true + } + this.#offset = (offset + 1) % capacity + } + + addAll (entries: Array<*>) { + for (const entry of entries) { + this.add(entry) + } + } + + clear () { + this.#buffer.fill(this.#defaultValue) + this.#isFull = false + this.#offset = 0 + } + + toArray (): Array<*> { + const buffer = this.#buffer + const offset = this.#offset + return ( + this.#isFull + ? [...buffer.slice(offset), ...buffer.slice(0, offset)] + : buffer.slice(0, offset) + ) + } +} diff --git a/shared/domains/chelonia/chelonia.js b/shared/domains/chelonia/chelonia.js index 8a6041e78..03f070b98 100644 --- a/shared/domains/chelonia/chelonia.js +++ b/shared/domains/chelonia/chelonia.js @@ -5,7 +5,6 @@ import '@sbp/okturtles.events' import sbp from '@sbp/sbp' import { handleFetchResult } from '~/frontend/controller/utils/misc.js' import { cloneDeep, difference, has, intersection, merge, randomHexString } from '~/frontend/model/contracts/shared/giLodash.js' -import { b64ToStr } from '~/shared/functions.js' import { NOTIFICATION_TYPE, createClient } from '~/shared/pubsub.js' import type { GIKey, GIOpActionUnencrypted, GIOpContract, GIOpKeyAdd, GIOpKeyDel, GIOpKeyRequest, GIOpKeyRequestSeen, GIOpKeyShare, GIOpKeyUpdate } from './GIMessage.js' import type { Key } from './crypto.js' @@ -19,7 +18,7 @@ import type { EncryptedData } from './encryptedData.js' import { isSignedData, signedIncomingData, signedOutgoingData, signedOutgoingDataWithRawKey } from './signedData.js' import './internals.js' import './files.js' -import { findForeignKeysByContractID, findKeyIdByName, findRevokedKeyIdsByName, findSuitableSecretKeyId, getContractIDfromKeyId } from './utils.js' +import { eventsAfter, findForeignKeysByContractID, findKeyIdByName, findRevokedKeyIdsByName, findSuitableSecretKeyId, getContractIDfromKeyId } from './utils.js' // TODO: define ChelContractType for /defineContract @@ -827,45 +826,52 @@ export default (sbp('sbp/selectors/register', { return state }) }, - // TODO: r.body is a stream.Transform, should we use a callback to process - // the events one-by-one instead of converting to giant json object? - // however, note if we do that they would be processed in reverse... - 'chelonia/out/eventsAfter': async function (contractID: string, since: string) { - const events = await fetch(`${this.config.connectionURL}/eventsAfter/${contractID}/${since}`, { signal: this.abortController.signal }) - .then(handleFetchResult('json')) - if (Array.isArray(events)) { - return events.reverse().map(b64ToStr) - } - }, + 'chelonia/out/eventsAfter': eventsAfter, 'chelonia/out/latestHEADInfo': function (contractID: string) { return fetch(`${this.config.connectionURL}/latestHEADinfo/${contractID}`, { cache: 'no-store', signal: this.abortController.signal }).then(handleFetchResult('json')) }, - 'chelonia/out/eventsBefore': async function (before: string, limit: number) { + 'chelonia/out/eventsBefore': function (contractID: string, beforeHeight: number, limit: number) { if (limit <= 0) { console.error('[chelonia] invalid params error: "limit" needs to be positive integer') - return - } - - const events = await fetch(`${this.config.connectionURL}/eventsBefore/${before}/${limit}`, { signal: this.abortController.signal }) - .then(handleFetchResult('json')) - if (Array.isArray(events)) { - return events.reverse().map(b64ToStr) } + const offset = Math.max(0, beforeHeight - limit + 1) + const eventsAfterLimit = Math.min(beforeHeight + 1, limit) + return sbp('chelonia/out/eventsAfter', contractID, offset, eventsAfterLimit) }, - 'chelonia/out/eventsBetween': async function (startHash: string, endHash: string, offset: number = 0) { + 'chelonia/out/eventsBetween': function (contractID: string, startHash: string, endHeight: number, offset: number = 0) { if (offset < 0) { console.error('[chelonia] invalid params error: "offset" needs to be positive integer or zero') return } - - const events = await fetch(`${this.config.connectionURL}/eventsBetween/${startHash}/${endHash}?offset=${offset}`, { signal: this.abortController.signal }) - .then(handleFetchResult('json')) - if (Array.isArray(events)) { - return events.reverse().map(b64ToStr) - } + let reader: ReadableStreamReader + return new ReadableStream({ + start: async (controller) => { + const first = await fetch(`${this.config.connectionURL}/file/${startHash}`, { signal: this.abortController.signal }).then(handleFetchResult('text')) + const deserializedHEAD = GIMessage.deserializeHEAD(first) + if (deserializedHEAD.contractID !== contractID) { + controller.error(new Error('Mismatched contract ID')) + return + } + const startOffset = Math.max(0, deserializedHEAD.head.height - offset) + const limit = endHeight - startOffset + 1 + if (limit < 1) { + controller.close() + return + } + reader = sbp('chelonia/out/eventsAfter', contractID, startOffset, limit).getReader() + }, + async pull (controller) { + const { done, value } = await reader.read() + if (done) { + controller.close() + } else { + controller.enqueue(value) + } + } + }) }, 'chelonia/rootState': function () { return sbp(this.config.stateSelector) }, 'chelonia/latestContractState': async function (contractID: string, options = { forceSync: false }) { @@ -875,22 +881,13 @@ export default (sbp('sbp/selectors/register', { if (!options.forceSync && rootState[contractID] && Object.keys(rootState[contractID]).some((x) => x !== '_volatile')) { return cloneDeep(rootState[contractID]) } - const events = await sbp('chelonia/private/out/eventsAfter', contractID, contractID) let state = Object.create(null) + const eventsStream = sbp('chelonia/out/eventsAfter', contractID, 0, undefined, contractID) + const eventsStreamReader = eventsStream.getReader() if (rootState[contractID]) state._volatile = rootState[contractID]._volatile - // fast-path - try { - for (const event of events) { - await sbp('chelonia/private/in/processMessage', GIMessage.deserialize(event, this.transientSecretKeys, state), state) - } - return state - } catch (e) { - console.warn(`[chelonia] latestContractState(${contractID}): fast-path failed due to ${e.name}: ${e.message}`, e.stack) - state = Object.create(null) - if (rootState[contractID]) state._volatile = rootState[contractID]._volatile - } - // more error-tolerant but slower due to cloning state on each message - for (const event of events) { + for (;;) { + const { value: event, done } = await eventsStreamReader.read() + if (done) return state const stateCopy = cloneDeep(state) try { await sbp('chelonia/private/in/processMessage', GIMessage.deserialize(event, this.transientSecretKeys, state), state) @@ -900,7 +897,6 @@ export default (sbp('sbp/selectors/register', { state = stateCopy } } - return state }, // 'chelonia/out' - selectors that send data out to the server 'chelonia/out/registerContract': async function (params: ChelRegParams) { diff --git a/shared/domains/chelonia/db.js b/shared/domains/chelonia/db.js index 8143a8ddf..556b92203 100644 --- a/shared/domains/chelonia/db.js +++ b/shared/domains/chelonia/db.js @@ -149,6 +149,7 @@ export default (sbp('sbp/selectors/register', { await sbp('chelonia/db/set', entry.hash(), entry.serialize()) await sbp('chelonia/db/set', getLogHead(contractID), JSON.stringify({ HEAD: entry.hash(), height: entry.height() })) console.debug(`[chelonia.db] HEAD for ${contractID} updated to:`, entry.hash()) + await sbp('chelonia/db/set', `_private_hidx=${contractID}#${entryHeight}`, entry.hash()) return entry.hash() } catch (e) { if (e.name.includes('ErrorDB')) { diff --git a/shared/domains/chelonia/internals.js b/shared/domains/chelonia/internals.js index 7a897cbb4..abfe6c955 100644 --- a/shared/domains/chelonia/internals.js +++ b/shared/domains/chelonia/internals.js @@ -3,7 +3,7 @@ import sbp, { domainFromSelector } from '@sbp/sbp' import { handleFetchResult } from '~/frontend/controller/utils/misc.js' import { cloneDeep, debounce, delay, has, pick, randomIntFromRange } from '~/frontend/model/contracts/shared/giLodash.js' -import { b64ToStr, createCID } from '~/shared/functions.js' +import { createCID } from '~/shared/functions.js' import type { GIKey, GIOpActionEncrypted, GIOpActionUnencrypted, GIOpAtomic, GIOpContract, GIOpKeyAdd, GIOpKeyDel, GIOpKeyRequest, GIOpKeyRequestSeen, GIOpKeyShare, GIOpKeyUpdate, GIOpPropSet, GIOpType, ProtoGIOpKeyRequestSeen, ProtoGIOpKeyShare } from './GIMessage.js' import { GIMessage } from './GIMessage.js' import { INVITE_STATUS } from './constants.js' @@ -541,16 +541,6 @@ export default (sbp('sbp/selectors/register', { signal: this.abortController.signal }).then(handleFetchResult('json')) }, - // TODO: r.body is a stream.Transform, should we use a callback to process - // the events one-by-one instead of converting to giant json object? - // however, note if we do that they would be processed in reverse... - 'chelonia/private/out/eventsAfter': async function (contractID: string, since: string) { - const events = await fetch(`${this.config.connectionURL}/eventsAfter/${contractID}/${since}`, { signal: this.abortController.signal }) - .then(handleFetchResult('json')) - if (Array.isArray(events)) { - return events.reverse().map(b64ToStr) - } - }, 'chelonia/private/postKeyShare': function (contractID, previousVolatileState, signingKey) { const cheloniaState = sbp(this.config.stateSelector) const targetState = cheloniaState[contractID] @@ -621,9 +611,13 @@ export default (sbp('sbp/selectors/register', { keyAdditionProcessor.call(self, hash, v.keys, state, contractID, signingKey, internalSideEffectStack) }, [GIMessage.OP_ACTION_ENCRYPTED] (v: GIOpActionEncrypted) { - if (!config.skipActionProcessing) { - opFns[GIMessage.OP_ACTION_UNENCRYPTED](v.valueOf()) + if (config.skipActionProcessing) { + if (process.env.BUILD === 'web') { + console.log('OP_ACTION_ENCRYPTED: skipped action processing') + } + return } + opFns[GIMessage.OP_ACTION_UNENCRYPTED](v.valueOf()) }, [GIMessage.OP_ACTION_UNENCRYPTED] (v: GIOpActionUnencrypted) { if (!config.skipActionProcessing) { @@ -1099,10 +1093,10 @@ export default (sbp('sbp/selectors/register', { this.config.reactiveSet(state, contractID, Object.create(null)) this.config.reactiveSet(state[contractID], '_volatile', currentVolatileState) } - const { HEAD: latest } = await sbp('chelonia/out/latestHEADInfo', contractID) - console.debug(`[chelonia] syncContract: ${contractID} latestHash is: ${latest}`) + const { HEAD: latestHEAD } = await sbp('chelonia/out/latestHEADInfo', contractID) + console.debug(`[chelonia] syncContract: ${contractID} latestHash is: ${latestHEAD}`) // there is a chance two users are logged in to the same machine and must check their contracts before syncing - const recent = state.contracts[contractID]?.HEAD + const { HEAD: recentHEAD, height: recentHeight } = state.contracts[contractID] || {} const isSubcribed = this.subscriptionSet.has(contractID) if (isSubcribed) { if (params?.deferredRemove) { @@ -1122,29 +1116,32 @@ export default (sbp('sbp/selectors/register', { this.currentSyncs[contractID] = { firstSync: !state.contracts[contractID] } this.postSyncOperations[contractID] = this.postSyncOperations[contractID] ?? Object.create(null) try { - if (latest !== recent) { - console.debug(`[chelonia] Synchronizing Contract ${contractID}: our recent was ${recent || 'undefined'} but the latest is ${latest}`) + if (latestHEAD !== recentHEAD) { + console.debug(`[chelonia] Synchronizing Contract ${contractID}: our recent was ${recentHEAD || 'undefined'} but the latest is ${latestHEAD}`) // TODO: fetch events from localStorage instead of server if we have them - const events = await sbp('chelonia/out/eventsAfter', contractID, recent || contractID) + const eventsStream = sbp('chelonia/out/eventsAfter', contractID, recentHeight ?? 0, undefined, recentHEAD ?? contractID) // Sanity check: verify event with latest hash exists in list of events // TODO: using findLastIndex, it will be more clean but it needs Cypress 9.7+ which has bad performance // https://docs.cypress.io/guides/references/changelog#9-7-0 // https://github.com/cypress-io/cypress/issues/22868 let latestHashFound = false - for (let i = events.length - 1; i >= 0; i--) { - if (GIMessage.deserializeHEAD(events[i]).hash === latest) { - latestHashFound = true + // state.contracts[contractID] && events.shift() + const eventReader = eventsStream.getReader() + // remove the first element in cases where we are not getting the contract for the first time + for (let skip = !!state.contracts[contractID]; ; skip = false) { + const { done, value: event } = await eventReader.read() + if (done) { + if (!latestHashFound) { + throw new ChelErrorUnrecoverable(`expected hash ${latestHEAD} in list of events for contract ${contractID}`) + } break } - } - if (!latestHashFound) { - throw new ChelErrorUnrecoverable(`expected hash ${latest} in list of events for contract ${contractID}`) - } - // remove the first element in cases where we are not getting the contract for the first time - state.contracts[contractID] && events.shift() - for (let i = 0; i < events.length; i++) { + if (!latestHashFound) { + latestHashFound = GIMessage.deserializeHEAD(event).hash === latestHEAD + } + if (skip) continue // this must be called directly, instead of via enqueueHandleEvent - await sbp('chelonia/private/in/handleEvent', contractID, events[i]) + await sbp('chelonia/private/in/handleEvent', contractID, event) } } else if (!isSubcribed) { this.subscriptionSet.add(contractID) diff --git a/shared/domains/chelonia/utils.js b/shared/domains/chelonia/utils.js index 7743af0c7..6b880ce5c 100644 --- a/shared/domains/chelonia/utils.js +++ b/shared/domains/chelonia/utils.js @@ -1,16 +1,19 @@ import sbp from '@sbp/sbp' import { has } from '~/frontend/model/contracts/shared/giLodash.js' +import { b64ToStr } from '~/shared/functions.js' import type { GIKey, GIKeyPurpose, GIKeyUpdate, GIOpActionUnencrypted, GIOpAtomic, GIOpKeyAdd, GIOpKeyUpdate, GIOpValue, ProtoGIOpActionUnencrypted } from './GIMessage.js' import { GIMessage } from './GIMessage.js' import { INVITE_STATUS } from './constants.js' import { deserializeKey, serializeKey } from './crypto.js' import type { EncryptedData } from './encryptedData.js' import { unwrapMaybeEncryptedData } from './encryptedData.js' -import { CONTRACT_IS_PENDING_KEY_REQUESTS } from './events.js' import { ChelErrorWarning } from './errors.js' +import { CONTRACT_IS_PENDING_KEY_REQUESTS } from './events.js' import type { SignedData } from './signedData.js' import { isSignedData } from './signedData.js' +const MAX_EVENTS_AFTER = Number.parseInt(process.env.MAX_EVENTS_AFTER, 10) || Infinity + export const findKeyIdByName = (state: Object, name: string): ?string => state._vm?.authorizedKeys && ((Object.values((state._vm.authorizedKeys: any)): any): GIKey[]).find((k) => k.name === name && k._notAfterHeight == null)?.id export const findForeignKeysByContractID = (state: Object, contractID: string): ?string[] => state._vm?.authorizedKeys && ((Object.values((state._vm.authorizedKeys: any)): any): GIKey[]).filter((k) => k._notAfterHeight == null && k.foreignKey?.includes(contractID)).map(k => k.id) @@ -530,3 +533,175 @@ export const getContractIDfromKeyId = (contractID: string, signingKeyId: ?string ? new URL(state._vm.authorizedKeys[signingKeyId].foreignKey).pathname : contractID } + +export function eventsAfter (contractID: string, sinceHeight: number, limit?: number, sinceHash?: string): ReadableStream { + const fetchEventsStreamReader = async () => { + requestLimit = Math.min(limit ?? MAX_EVENTS_AFTER, remainingEvents) + const eventsResponse = await fetch(`${this.config.connectionURL}/eventsAfter/${contractID}/${sinceHeight}${Number.isInteger(requestLimit) ? `/${requestLimit}` : ''}`, { signal }) + if (!eventsResponse.ok) throw new Error('Unexpected status code') + if (!eventsResponse.body) throw new Error('Missing body') + latestHeight = parseInt(eventsResponse.headers.get('shelter-headinfo-height'), 10) + if (!Number.isSafeInteger(latestHeight)) throw new Error('Invalid latest height') + requestCount++ + // $FlowFixMe[incompatible-use] + return eventsResponse.body.getReader() + } + if (!Number.isSafeInteger(sinceHeight) || sinceHeight < 0) { + throw new TypeError('Invalid since height value. Expected positive integer.') + } + const signal = this.abortController.signal + let requestCount = 0 + let remainingEvents = limit ?? Number.POSITIVE_INFINITY + let eventsStreamReader + let latestHeight + let state: 'fetch' | 'read-eos' | 'read-new-response' | 'read' | 'events' = 'fetch' + let requestLimit: number + let count: number + let buffer: string = '' + let currentEvent: string + // return ReadableStream with a custom pull function to handle streamed data + return new ReadableStream({ + // The pull function is called whenever the internal buffer of the stream + // becomes empty and needs more data. + async pull (controller) { + for (;;) { + // Handle different states of the stream reading process. + switch (state) { + // When in 'fetch' state, initiate a new fetch request to obtain a + // stream reader for events. + case 'fetch': { + eventsStreamReader = await fetchEventsStreamReader() + // Transition to reading the new response and reset the processed + // events counter + state = 'read-new-response' + count = 0 + break + } + case 'read-eos': // End of stream case + case 'read-new-response': // Just started reading a new response + case 'read': { // Reading from the response stream + const { done, value } = await eventsStreamReader.read() + // If done, determine if the stream should close or fetch more + // data by making a new request + if (done) { + // No more events to process or reached the latest event + if (remainingEvents === 0 || sinceHeight === latestHeight) { + controller.close() + return + } else if (state === 'read-new-response' || buffer) { + // If done prematurely, throw an error + controller.error(new Error('Invalid response: done too early')) + return + } else { + // If there are still events to fetch, switch state to fetch + state = 'fetch' + break + } + } + if (!value) { + // If there's no value (e.g., empty response), throw an error + controller.error(new Error('Invalid response: missing body')) + return + } + // Concatenate new data to the buffer, trimming any + // leading/trailing whitespace (the response is a JSON array of + // base64-encoded data, meaning that whitespace is not significant) + buffer = buffer + Buffer.from(value).toString().trim() + // If there was only whitespace, try reading again + if (!buffer) break + if (state === 'read-new-response') { + // Response is in JSON format, so we look for the start of an + // array (`[`) + if (buffer[0] !== '[') { + controller.error(new Error('Invalid response: no array start delimiter')) + return + } + // Trim the array start delimiter from the buffer + buffer = buffer.slice(1) + } else if (state === 'read-eos') { + // If in 'read-eos' state and still reading data, it's an error + // because the response isn't valid JSON (there should be + // nothing other than whitespace after `]`) + controller.error(new Error('Invalid data at the end of response')) + return + } + // If not handling new response or end-of-stream, switch to + // processing events + state = 'events' + break + } + case 'events': { + // Process events by looking for a comma or closing bracket that + // indicates the end of an event + const nextIdx = buffer.search(/(?<=\s*)[,\]]/) + // If the end of the event isn't found, go back to reading more + // data + if (nextIdx < 0) { + state = 'read' + break + } + let enqueued = false + try { + // Extract the current event's value and trim whitespace + const eventValue = buffer.slice(0, nextIdx).trim() + if (eventValue) { + // Check if the event limit is reached; if so, throw an error + if (count === requestLimit) { + controller.error(new Error('Received too many events')) + return + } + currentEvent = b64ToStr(JSON.parse(eventValue)) + if (count === 0) { + const hash = GIMessage.deserializeHEAD(currentEvent).hash + const height = GIMessage.deserializeHEAD(currentEvent).head.height + if (height !== sinceHeight || (sinceHash && sinceHash !== hash)) { + controller.error(new Error('hash() !== since')) + return + } + } + // If this is the first event in a second or later request, + // drop the event because it's already been included in + // a previous response + if (count++ !== 0 || requestCount !== 0) { + controller.enqueue(currentEvent) + enqueued = true + remainingEvents-- + } + } + // If the stream is finished (indicated by a closing bracket), + // update `since` (to make the next request if needed) and + // switch to 'read-eos'. + if (buffer[nextIdx] === ']') { + if (currentEvent) { + const deserialized = GIMessage.deserializeHEAD(currentEvent) + sinceHeight = deserialized.head.height + sinceHash = deserialized.hash + } + state = 'read-eos' + // This should be an empty string now + buffer = buffer.slice(nextIdx + 1).trim() + } else if (currentEvent) { + // Otherwise, move the buffer pointer to the next event + buffer = buffer.slice(nextIdx + 1).trimStart() + } else { + // If the end delimiter (`]`) is missing, throw an error + controller.error(new Error('Missing end delimiter')) + return + } + // If an event was successfully enqueued, exit the loop to wait + // for the next pull request + if (enqueued) { + return + } + } catch (e) { + console.error('[chelonia] Error during event parsing', e) + controller.error(e) + return + } + break + } + } + } + } + }) +}