Skip to content

Commit

Permalink
Limit events after (#1876)
Browse files Browse the repository at this point in the history
* Add limit to eventsAfter

* Minor cleanup

* Convert circularClassList.js into a class

* Simplify streamEventsAfter

* Remove a bit of debugging in addEntry

* Handle read() error in streamEventsAfter

* No longer reverse entries in streamEventsAfter

* fix the double dialog box issue

* Remove pepper, read secret from file and minimise the number of secrets

* Signature verification

* Remove groupContractID in chatroom contract attributes (#1847)

* feat: removed groupContractID in chatroom attributes

* fix: recovered package.json

* chore: remove unnecessary getter

* chore: updated manifest

* Fix error to close CreatePoll modal by clicking sidebar (#1849)

* fix: error to close CreatePoll modal by clicking sidebar

* chore: added comment for better understanding

* chore: fixed typo

* fix: make notification after identity contract is fully synced (#1848)

* #1851 - Fix the broken VoterAvatars.vue component (#1852)

* add a fix for Prompt.vue not being closed properly

* fix the broken VoterAvatars.vue

* Replace checks to use _private instead of _private_

* Remove notifications while leaving group (#1859)

* fix: error in removing notifications after leaving group

* chore: random commit for running travis again

* Encrypted files (#1858)

* WIP upload files

* Files uploading (wip)

* Files uploading, HAPI integration (wip)

* File upload and download

* Encrypted files

* Changes to dowload API for ergonomics and manifest processing

* Avatar types

* File caching

* Documentation

* Update avatar types in Message* classes

* Fix flow errors

* Remove streams check for Node.js

* Fix tests & feedback

* Fix tests & feedback (again)

* Fix tests for Node 18

* Fix test

Co-authored-by: Greg Slepak <[email protected]>

---------

Co-authored-by: Greg Slepak <[email protected]>

* Fix streams support

* Fix error in getting updated with the user's profile after rejoining group (#1864)

* fix: sharing new PEKs

* chore: grunt pin:0.2.5

* chore: reverted grunt pin and removed manifest.json

---------

Co-authored-by: Greg Slepak <[email protected]>

* #1571 - Create vuex module for chat (#1854)

* add chatroom/vuexModule.js with some boilerplate code in there

* fix a linter error

* move chat&DM related getters in state.js to the chatroom vuexModule

* move all the chatroom-relateed mutations to the vuexModules

* add chatroom vuexModule to store in state.js

* a small bug-fix for re-login process

* typo fix for the cypress failure

* remove postUpgradeVerification code (resetting groups anyway)

---------

Co-authored-by: Greg Slepak <[email protected]>

* Fix issue #1857

* Fix incorrect badge while switching group (#1863)

* fix: incorrect badge from direct messages

* chore: grunt pin:0.2.4

* chore: reverted version 0.2.4

* chore: reverted the grunt pin:0.2.3 too

* chore: removed manifest.json

* fix the overflow bug

* Fix error to display profile picture for DMs with more than 2 people (#1869)

* fix: error to select avatar for direct messages

* chore: follow-up process for changes of getters

* fix: error in getting direct messages by groupID

* fix: syntax error

* 32 logging (#1870)

* pino enabled, still need to fix logging behavior though

* fix flow errors

* pino support seems to be working

* one minor fix for %s strings. Closes #32

* fix some error logging in pubsub + remove hapi-pino

* Convert eventsAfter to stream (missing: change usage in ChatMain)

* API improvements

* Endpoint consolidation

* Fix subscriptionSet issues

* Fix chatroom race condition

* Log instead of throw

* Cleaning up

* Linting

---------

Co-authored-by: snowteamer <[email protected]>
Co-authored-by: SebinSong <[email protected]>
Co-authored-by: Alex Jin <[email protected]>
Co-authored-by: Greg Slepak <[email protected]>
  • Loading branch information
5 people authored Mar 14, 2024
1 parent d7aa2d7 commit ef7f542
Show file tree
Hide file tree
Showing 12 changed files with 397 additions and 276 deletions.
4 changes: 3 additions & 1 deletion Gruntfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const packageJSON = require('./package.json')
const {
CI = '',
LIGHTWEIGHT_CLIENT = 'true',
MAX_EVENTS_AFTER = '',
NODE_ENV = 'development',
EXPOSE_SBP = '',
ENABLE_UNSAFE_NULL_CRYPTO = 'false'
Expand Down Expand Up @@ -217,6 +218,7 @@ module.exports = (grunt) => {
'process.env.CONTRACTS_VERSION': `'${CONTRACTS_VERSION}'`,
'process.env.GI_VERSION': `'${GI_VERSION}'`,
'process.env.LIGHTWEIGHT_CLIENT': `'${LIGHTWEIGHT_CLIENT}'`,
'process.env.MAX_EVENTS_AFTER': `'${MAX_EVENTS_AFTER}'`,
'process.env.NODE_ENV': `'${NODE_ENV}'`,
'process.env.EXPOSE_SBP': `'${EXPOSE_SBP}'`,
'process.env.ENABLE_UNSAFE_NULL_CRYPTO': `'${ENABLE_UNSAFE_NULL_CRYPTO}'`
Expand Down Expand Up @@ -697,7 +699,7 @@ module.exports = (grunt) => {
})

process.on('unhandledRejection', (reason, p) => {
console.error('[gruntfile] Unhandled promise rejection:', p, 'reason:', reason)
console.error('[gruntfile] Unhandled promise rejection:', p, 'reason:', reason.message, reason.stack)
process.exit(1)
})
}
117 changes: 30 additions & 87 deletions backend/database.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,107 +37,50 @@ if (!fs.existsSync(dataFolder)) {
fs.mkdirSync(dataFolder, { mode: 0o750 })
}

// Streams stored contract log entries since the given entry hash (inclusive!).
sbp('sbp/selectors/register', {
'backend/db/streamEntriesAfter': async function (contractID: string, hash: string): Promise<*> {
'backend/db/streamEntriesAfter': async function (contractID: string, height: string, requestedLimit: ?number): Promise<*> {
const limit = Math.min(requestedLimit ?? Number.POSITIVE_INFINITY, process.env.MAX_EVENTS_BATCH_SIZE ?? 500)
const latestHEADinfo = await sbp('chelonia/db/latestHEADinfo', contractID)
if (!latestHEADinfo) {
throw Boom.notFound(`contractID ${contractID} doesn't exist!`)
}
let { HEAD: currentHEAD } = latestHEADinfo
// Number of entries pushed.
let counter = 0
let currentHash = await sbp('chelonia/db/get', `_private_hidx=${contractID}#${height}`)
let prefix = '['
// NOTE: if this ever stops working you can also try Readable.from():
// https://nodejs.org/api/stream.html#stream_stream_readable_from_iterable_options
return new Readable({
async read (): any {
try {
const entry = await sbp('chelonia/db/getEntry', currentHEAD)
const json = `"${strToB64(entry.serialize())}"`
if (currentHEAD !== hash) {
this.push(prefix + json)
currentHEAD = entry.head().previousHEAD
prefix = ','
} else {
this.push(prefix + json + ']')
const stream = new Readable({
read (): void {
if (currentHash && counter < limit) {
sbp('chelonia/db/getEntry', currentHash).then(async entry => {
if (entry) {
this.push(`${prefix}"${strToB64(entry.serialize())}"`)
prefix = ','
counter++
currentHash = await sbp('chelonia/db/get', `_private_hidx=${contractID}#${entry.height() + 1}`)
} else {
this.push(counter > 0 ? ']' : '[]')
this.push(null)
}
}).catch(e => {
console.error(`[backend] streamEntriesAfter: read(): ${e.message}:`, e)
this.push(counter > 0 ? ']' : '[]')
this.push(null)
}
} catch (e) {
console.error(e, `read(): ${e.message}`)
this.push(']')
})
} else {
this.push(counter > 0 ? ']' : '[]')
this.push(null)
}
}
})
},
'backend/db/streamEntriesBefore': async function (before: string, limit: number): Promise<*> {
let prefix = '['
let currentHEAD = before
let entry = await sbp('chelonia/db/getEntry', currentHEAD)
if (!entry) {
throw Boom.notFound(`entry ${currentHEAD} doesn't exist!`)
// $FlowFixMe[prop-missing]
stream.headers = {
'shelter-headinfo-head': latestHEADinfo.HEAD,
'shelter-headinfo-height': latestHEADinfo.height
}
limit++ // to return `before` apart from the `limit` number of events
// NOTE: if this ever stops working you can also try Readable.from():
// https://nodejs.org/api/stream.html#stream_stream_readable_from_iterable_options
return new Readable({
async read (): any {
try {
if (!currentHEAD || !limit) {
this.push(']')
this.push(null)
} else {
entry = await sbp('chelonia/db/getEntry', currentHEAD)
const json = `"${strToB64(entry.serialize())}"`
this.push(prefix + json)
prefix = ','
limit--
currentHEAD = entry.head().previousHEAD
}
} catch (e) {
// TODO: properly return an error to caller, see https://nodejs.org/api/stream.html#errors-while-reading
console.error(e, `read(): ${e.message}:`)
this.push(']')
this.push(null)
}
}
})
},
'backend/db/streamEntriesBetween': async function (startHash: string, endHash: string, offset: number): Promise<*> {
let prefix = '['
let isMet = false
let currentHEAD = endHash
let entry = await sbp('chelonia/db/getEntry', currentHEAD)
if (!entry) {
throw Boom.notFound(`entry ${currentHEAD} doesn't exist!`)
}
// NOTE: if this ever stops working you can also try Readable.from():
// https://nodejs.org/api/stream.html#stream_stream_readable_from_iterable_options
return new Readable({
async read (): any {
try {
entry = await sbp('chelonia/db/getEntry', currentHEAD)
const json = `"${strToB64(entry.serialize())}"`
this.push(prefix + json)
prefix = ','

if (currentHEAD === startHash) {
isMet = true
} else if (isMet) {
offset--
}

currentHEAD = entry.head().previousHEAD
if (!currentHEAD || (isMet && !offset)) {
this.push(']')
this.push(null)
}
} catch (e) {
// TODO: properly return an error to caller, see https://nodejs.org/api/stream.html#errors-while-reading
console.error(e, `read(): ${e.message}:`)
this.push(']')
this.push(null)
}
}
})
return stream
},
// =======================
// wrapper methods to add / lookup names
Expand Down
42 changes: 3 additions & 39 deletions backend/routes.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,14 @@ route.POST('/event', {
}
})

route.GET('/eventsAfter/{contractID}/{since}', {}, async function (request, h) {
const { contractID, since } = request.params
route.GET('/eventsAfter/{contractID}/{since}/{limit?}', {}, async function (request, h) {
const { contractID, since, limit } = request.params
try {
if (contractID.startsWith('_private') || since.startsWith('_private')) {
return Boom.notFound()
}

const stream = await sbp('backend/db/streamEntriesAfter', contractID, since)
const stream = await sbp('backend/db/streamEntriesAfter', contractID, since, limit)
// "On an HTTP server, make sure to manually close your streams if a request is aborted."
// From: http://knexjs.org/#Interfaces-Streams
// https://github.com/tgriesser/knex/wiki/Manually-Closing-Streams
Expand All @@ -107,42 +107,6 @@ route.GET('/eventsAfter/{contractID}/{since}', {}, async function (request, h) {
}
})

route.GET('/eventsBefore/{before}/{limit}', {}, async function (request, h) {
const { before, limit } = request.params
try {
if (!before) return Boom.badRequest('missing before')
if (!limit) return Boom.badRequest('missing limit')
if (isNaN(parseInt(limit)) || parseInt(limit) <= 0) return Boom.badRequest('invalid limit')
if (before.startsWith('_private')) return Boom.notFound()

const stream = await sbp('backend/db/streamEntriesBefore', before, parseInt(limit))
request.events.once('disconnect', stream.destroy.bind(stream))
return stream
} catch (err) {
logger.error(err, `GET /eventsBefore/${before}/${limit}`, err.message)
return err
}
})

route.GET('/eventsBetween/{startHash}/{endHash}', {}, async function (request, h) {
const { startHash, endHash } = request.params
try {
const offset = parseInt(request.query.offset || '0')

if (!startHash) return Boom.badRequest('missing startHash')
if (!endHash) return Boom.badRequest('missing endHash')
if (isNaN(offset) || offset < 0) return Boom.badRequest('invalid offset')
if (startHash.startsWith('_private') || endHash.startsWith('_private')) return Boom.notFound()

const stream = await sbp('backend/db/streamEntriesBetween', startHash, endHash, offset)
request.events.once('disconnect', stream.destroy.bind(stream))
return stream
} catch (err) {
logger.error(err, `GET /eventsBetwene/${startHash}/${endHash}`, err.message)
return err
}
})

/*
// The following endpoint is disabled because name registrations are handled
// through the `shelter-namespace-registration` header when registering a
Expand Down
4 changes: 2 additions & 2 deletions frontend/model/captureLogs.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import sbp from '@sbp/sbp'
import { CAPTURED_LOGS, SET_APP_LOGS_FILTER } from '~/frontend/utils/events.js'
import { createCircularList } from '~/frontend/utils/circularList.js'
import CircularList from '~/shared/CircularList.js'

/*
- giConsole/[username]/entries - the stored log entries.
Expand Down Expand Up @@ -28,7 +28,7 @@ const setItem = (key: string, value: any): void => {
}

function createLogger (config: Object): Object {
const entries = createCircularList(config.maxEntries)
const entries = new CircularList(config.maxEntries)
const methods = loggingLevels.reduce(
(acc, name) => {
acc[name] = (...args) => {
Expand Down
38 changes: 0 additions & 38 deletions frontend/utils/circularList.js

This file was deleted.

Loading

0 comments on commit ef7f542

Please sign in to comment.