Skip to content

Commit

Permalink
Simplify streamEventsAfter
Browse files Browse the repository at this point in the history
  • Loading branch information
snowteamer committed Feb 8, 2024
1 parent 575e97c commit b7c43dc
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 40 deletions.
63 changes: 23 additions & 40 deletions backend/database.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ import path from 'node:path'
import '@sbp/okturtles.data'
import { checkKey, parsePrefixableKey, prefixHandlers } from '~/shared/domains/chelonia/db.js'
import LRU from 'lru-cache'
import CircularList from '../shared/CircularList.js'

const Boom = require('@hapi/boom')

const MAX_EVENTS_AFTER = Number.parseInt(process.env.MAX_EVENTS_AFTER, 10)
const MAX_EVENTS_AFTER = Number.parseInt(process.env.MAX_EVENTS_AFTER, 10) || Infinity
const production = process.env.NODE_ENV === 'production'
// Defaults to `fs` in production.
const persistence = process.env.GI_PERSIST || (production ? 'fs' : undefined)
Expand Down Expand Up @@ -45,58 +44,42 @@ sbp('sbp/selectors/register', {
if (!latestHEADinfo) {
throw Boom.notFound(`contractID ${contractID} doesn't exist!`)
}
let { HEAD: currentHEAD } = latestHEADinfo
const entries = []
// Number of entries pushed.
let counter = 0
let currentHash = hash
let prefix = '['
if (MAX_EVENTS_AFTER) {
const circularList = new CircularList(MAX_EVENTS_AFTER, undefined)
while (currentHEAD !== hash) {
const entry = await sbp('chelonia/db/getEntry', currentHEAD)
currentHEAD = entry.message().previousHEAD
circularList.add(entry)
}
const entry = await sbp('chelonia/db/getEntry', currentHEAD)
circularList.add(entry)

const entries = circularList.toArray()
let i = 0
return new Readable({
read (): void {
try {
const entry = entries[i++]
if (entry) {
const json = `"${strToB64(entry.serialize())}"`
this.push(prefix + json)
prefix = ','
} else {
this.push(prefix === ',' ? ']' : '[]')
this.push(null)
}
} catch (e) {
console.error(`read(): ${e.message}:`, e)
this.push(prefix === ',' ? ']' : '[]')
this.push(null)
}
try {
while (currentHash && (entries.length < MAX_EVENTS_AFTER)) {
const entry = await sbp('chelonia/db/getEntry', currentHash)
if (entry) {
entries.push(entry)
currentHash = await sbp('chelonia/db/get', `next=${currentHash}`)
}
})
}
} catch (e) {
console.error(`read(): ${e.message}:`, e)
}
entries.reverse()
// NOTE: if this ever stops working you can also try Readable.from():
// https://nodejs.org/api/stream.html#stream_stream_readable_from_iterable_options
return new Readable({
async read (): Promise<void> {
read (): void {
try {
const entry = await sbp('chelonia/db/getEntry', currentHEAD)
const json = `"${strToB64(entry.serialize())}"`
if (currentHEAD !== hash) {
const entry = entries[counter]
if (entry) {
const json = `"${strToB64(entry.serialize())}"`
this.push(prefix + json)
currentHEAD = entry.head().previousHEAD
prefix = ','
counter++
} else {
this.push(prefix + json + ']')
this.push(prefix === ',' ? ']' : '[]')
this.push(null)
console.debug(`streamEntriesAfter: ${counter} entries pushed`)
}
} catch (e) {
console.error(`read(): ${e.message}:`, e)
this.push(']')
this.push(prefix === ',' ? ']' : '[]')
this.push(null)
}
}
Expand Down
4 changes: 4 additions & 0 deletions shared/domains/chelonia/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ export default (sbp('sbp/selectors/register', {
await sbp('chelonia/db/set', entry.hash(), entry.serialize())
await sbp('chelonia/db/set', getLogHead(contractID), JSON.stringify({ HEAD: entry.hash(), height: entry.height() }))
console.debug(`[chelonia.db] HEAD for ${contractID} updated to:`, entry.hash())
if (entryPreviousHEAD) {
await sbp('chelonia/db/set', `next=${entryPreviousHEAD}`, entry.hash())
console.debug(`[chelonia.db] next hash for ${entryPreviousHEAD} updated to:`, entry.hash())
}
return entry.hash()
} catch (e) {
if (e.name.includes('ErrorDB')) {
Expand Down

0 comments on commit b7c43dc

Please sign in to comment.