Skip to content

Commit

Permalink
improve performance of concurrent actions, don't fetch when already f…
Browse files Browse the repository at this point in the history
…etching, reduce amount of data fetched, logging
  • Loading branch information
dmonad committed Feb 7, 2020
1 parent f1864aa commit 144fb94
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 41 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
"types": "./dist/src/y-redis.d.ts",
"scripts": {
"dist": "rm -rf dist && rollup -c && tsc",
"test": "rollup -c && node ./dist/test.js",
"test": "rollup -c && LOG=y-redis node ./dist/test.cjs",
"debug": "rollup -c && LOG=y-redis node --unhandled-rejections=strict --inspect-brk ./dist/test.cjs",
"lint": "standard && tsc",
"preversion": "npm run lint && npm test && npm run dist"
},
Expand Down
2 changes: 1 addition & 1 deletion rollup.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const paths = path => {
export default [{
input: './tests/index.js',
output: {
file: './dist/test.js',
file: './dist/test.cjs',
format: 'cjs',
sourcemap: true,
paths
Expand Down
58 changes: 38 additions & 20 deletions src/y-redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,11 @@ import * as Y from 'yjs'
import * as mutex from 'lib0/mutex.js'
import { Observable } from 'lib0/observable.js'
import * as promise from 'lib0/promise.js'
import * as number from 'lib0/number.js'
import * as error from 'lib0/error.js'
import * as logging from 'lib0/logging.js'
import Redis from 'ioredis'

/**
* @param {RedisPersistence} rp
*/
const getUpdates = rp => {
rp.docs.forEach(doc =>
doc.getUpdates()
)
}
const logger = logging.createModuleLogger('y-redis')

/**
* Handles persistence of a sinle doc.
Expand All @@ -34,6 +27,7 @@ export class PersistenceDoc {
* @type {number}
*/
this._clock = 0
this._fetchingClock = 0
/**
* @param {Uint8Array} update
*/
Expand All @@ -43,6 +37,9 @@ export class PersistenceDoc {
rp.redis.rpushBuffer(name + ':updates', Buffer.from(update)).then(len => {
if (len === this._clock + 1) {
this._clock++
if (this._fetchingClock < this._clock) {
this._fetchingClock = this._clock
}
}
// @ts-ignore
rp.redis.publish(this.name, len.toString())
Expand Down Expand Up @@ -73,6 +70,7 @@ export class PersistenceDoc {
getUpdates () {
const startClock = this._clock
return this.rp.redis.lrangeBuffer(this.name + ':updates', startClock, -1).then(/** @type {function(Array<Buffer>)} */ updates => {
logger('Fetched ', logging.BOLD, logging.PURPLE, (updates.length).toString().padEnd(2), logging.UNBOLD, logging.UNCOLOR, ' updates')
this.mux(() => {
this.doc.transact(() => {
updates.forEach(update => {
Expand All @@ -82,9 +80,23 @@ export class PersistenceDoc {
if (this._clock < nextClock) {
this._clock = nextClock
}
if (this._fetchingClock < this._clock) {
this._fetchingClock = this._clock
}
})
})
return this
if (this._fetchingClock <= this._clock) {
return this
} else {
// there is still something missing. new updates came in. fetch again.
if (updates.length === 0) {
// Calling getUpdates recursively has the potential to be an infinite fetch-call.
// In case no new updates came in, reset _fetching clock (in case the pubsub lied / send an invalid message).
// Being overly protective here..
this._fetchingClock = this._clock
}
return this.getUpdates()
}
})
}
}
Expand Down Expand Up @@ -115,18 +127,20 @@ export class RedisPersistence extends Observable {
* @type {Map<string,PersistenceDoc>}
*/
this.docs = new Map()
if (/** @type {any} */ (this.redis).status === 'ready') {
getUpdates(this)
}
this.redis.on('ready', () => {
getUpdates(this)
})
this.sub.on('message', (channel, sclock) => {
// console.log('message', channel, sclock)
const pdoc = this.docs.get(channel)
if (pdoc) {
const clock = Number(sclock)
if (pdoc._clock < clock || number.isNaN(clock)) {
pdoc.getUpdates()
const clock = Number(sclock) || Number.POSITIVE_INFINITY // case of null
if (pdoc._fetchingClock < clock) {
// do not query doc updates if this document is currently already fetching
const isCurrentlyFetching = pdoc._fetchingClock !== pdoc._clock
if (pdoc._fetchingClock < clock) {
pdoc._fetchingClock = clock
}
if (!isCurrentlyFetching) {
pdoc.getUpdates()
}
}
} else {
this.sub.unsubscribe(channel)
Expand All @@ -141,7 +155,7 @@ export class RedisPersistence extends Observable {
*/
bindState (name, ydoc) {
if (this.docs.has(name)) {
throw error.create('This document name is already bound to this RedisPersistence instance')
throw error.create(`"${name}" is already bound to this RedisPersistence instance`)
}
const pd = new PersistenceDoc(this, name, ydoc)
this.docs.set(name, pd)
Expand All @@ -154,6 +168,10 @@ export class RedisPersistence extends Observable {
return promise.all(Array.from(docs.values()).map(doc => doc.destroy())).then(() => {
this.redis.quit()
this.sub.quit()
// @ts-ignore
this.redis = null
// @ts-ignore
this.sub = null
})
}

Expand Down
139 changes: 120 additions & 19 deletions tests/y-redis.tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ export const testPubsub = async tc => {

const redisPersistence1 = new RedisPersistence()
const doc1 = new Y.Doc()
const persistedDoc1 = redisPersistence1.bindState('test', doc1)
const persistedDoc1 = redisPersistence1.bindState(tc.testName, doc1)
await persistedDoc1.synced

const redisPersistence2 = new RedisPersistence()
const doc2 = new Y.Doc()
const persistedDoc2 = redisPersistence2.bindState('test', doc2)
const persistedDoc2 = redisPersistence2.bindState(tc.testName, doc2)
await persistedDoc2.synced

doc1.getArray('test').push([1])
Expand All @@ -32,8 +32,8 @@ export const testPubsub = async tc => {
t.assert(doc1.getArray('test').length === 2)
t.assert(doc2.getArray('test').length === 2)

redisPersistence1.destroy()
redisPersistence2.destroy()
await redisPersistence1.destroy()
await redisPersistence2.destroy()
}

/**
Expand All @@ -45,17 +45,17 @@ export const testStoreAndReload = async tc => {
{
const redisPersistence = new RedisPersistence()
const doc = new Y.Doc()
await redisPersistence.bindState('test', doc).synced
await redisPersistence.bindState(tc.testName, doc).synced
doc.getArray('test').push([1])
await promise.wait(50)
redisPersistence.destroy()
}
{
const redisPersistence = new RedisPersistence()
const doc = new Y.Doc()
await redisPersistence.bindState('test', doc).synced
await redisPersistence.bindState(tc.testName, doc).synced
t.assert(doc.getArray('test').length === 1)
redisPersistence.destroy()
await redisPersistence.destroy()
}
}

Expand All @@ -68,18 +68,18 @@ export const testClearDocument = async tc => {
{
const redisPersistence = new RedisPersistence()
const doc = new Y.Doc()
await redisPersistence.bindState('test', doc).synced
await redisPersistence.bindState(tc.testName, doc).synced
doc.getArray('test').push([1])
await promise.wait(50)
await redisPersistence.clearDocument('test')
redisPersistence.destroy()
await redisPersistence.clearDocument(tc.testName)
await redisPersistence.destroy()
}
{
const redisPersistence = new RedisPersistence()
const doc = new Y.Doc()
await redisPersistence.bindState('test', doc).synced
await redisPersistence.bindState(tc.testName, doc).synced
t.assert(doc.getArray('test').length === 0)
redisPersistence.destroy()
await redisPersistence.destroy()
}
}

Expand All @@ -92,21 +92,23 @@ export const testClearAllDocument = async tc => {
{
const redisPersistence = new RedisPersistence()
const doc = new Y.Doc()
await redisPersistence.bindState('test', doc).synced
await redisPersistence.bindState(tc.testName, doc).synced
doc.getArray('test').push([1])
await promise.wait(50)
await redisPersistence.clearAllDocuments()
}
{
const redisPersistence = new RedisPersistence()
const doc = new Y.Doc()
await redisPersistence.bindState('test', doc).synced
await redisPersistence.bindState(tc.testName, doc).synced
t.assert(doc.getArray('test').length === 0)
redisPersistence.destroy()
await redisPersistence.destroy()
}
}

/**
* Test time until N updates are written to redis + time to receive and apply updates.
*
* @param {t.TestCase} tc
*/
export const testPerformance = async tc => {
Expand All @@ -116,7 +118,7 @@ export const testPerformance = async tc => {
{
const redisPersistence = new RedisPersistence()
const doc = new Y.Doc()
const persistenceDoc = redisPersistence.bindState('test', doc)
const persistenceDoc = redisPersistence.bindState(tc.testName, doc)
await persistenceDoc.synced
await t.measureTime(`write ${N / 1000}k updates`, async () => {
const testarray = doc.getArray('test')
Expand All @@ -125,21 +127,120 @@ export const testPerformance = async tc => {
}
await promise.until(0, () => persistenceDoc._clock >= N)
t.assert(testarray.length === N)
t.assert(persistenceDoc._clock === N)
return undefined
})
await t.measureTime('destroy persistence with pending pubsub messages', async () => {
await redisPersistence.destroy()
await redisPersistence.destroy()
}
{
const redisPersistence = new RedisPersistence()
await t.measureTime(`read ${N / 1000}k updates`, async () => {
const doc = new Y.Doc()
await redisPersistence.bindState(tc.testName, doc).synced
t.assert(doc.getArray('test').length === N)
return undefined
})
await redisPersistence.destroy()
}
}

/**
* Two clients concurrently adding a lot of updates. Syncing after every 10 updates.
*
* @param {t.TestCase} tc
*/
export const testPerformanceConcurrent = async tc => {
const redis = new Redis()
await redis.flushall()
const N = 100
{
const redisPersistence1 = new RedisPersistence()
const doc1 = new Y.Doc()
const persistenceDoc1 = redisPersistence1.bindState(tc.testName, doc1)
await persistenceDoc1.synced
const redisPersistence2 = new RedisPersistence()
const doc2 = new Y.Doc()
const persistenceDoc2 = redisPersistence2.bindState(tc.testName, doc2)
await persistenceDoc2.synced
await t.measureTime(`write ${N / 1000}k updates`, async () => {
const testarray1 = doc1.getArray('test')
const testarray2 = doc2.getArray('test')
for (let i = 0; i < N; i++) {
if (i % 2) {
testarray1.insert(0, [i])
} else {
testarray2.insert(0, [i])
}
if (i % 10 === 0) {
await promise.until(0, () => persistenceDoc1._clock > i && persistenceDoc2._clock >= i)
t.assert(persistenceDoc1._clock === i + 1)
t.assert(persistenceDoc2._clock === i + 1)
}
}
await promise.until(0, () => persistenceDoc1._clock >= N && persistenceDoc2._clock >= N)
t.assert(testarray1.length === N)
t.assert(testarray2.length === N)
t.assert(persistenceDoc1._clock === N)
t.assert(persistenceDoc2._clock === N)
return undefined
})
await redisPersistence1.destroy()
}
{
const redisPersistence = new RedisPersistence()
await t.measureTime(`read ${N / 1000}k updates`, async () => {
const doc = new Y.Doc()
await redisPersistence.bindState('test', doc).synced
await redisPersistence.bindState(tc.testName, doc).synced
t.assert(doc.getArray('test').length === N)
return undefined
})
await redisPersistence.destroy()
}
const updateslen = await redis.llen(`${tc.testName}:updates`)
t.assert(updateslen === N)
}

/**
* Test the time until another client received all updates.
*
* @param {t.TestCase} tc
*/
export const testPerformanceReceive = async tc => {
const redis = new Redis()
await redis.flushall()
const N = 10000
{
const redisPersistence1 = new RedisPersistence()
const doc1 = new Y.Doc()
const persistenceDoc1 = redisPersistence1.bindState(tc.testName, doc1)
await persistenceDoc1.synced
const redisPersistence2 = new RedisPersistence()
const doc2 = new Y.Doc()
const persistenceDoc2 = redisPersistence2.bindState(tc.testName, doc2)
await persistenceDoc2.synced
await t.measureTime(`write ${N / 1000}k updates`, async () => {
const testarray1 = doc1.getArray('test')
const testarray2 = doc1.getArray('test')
for (let i = 0; i < N; i++) {
testarray1.insert(0, [i])
}
await promise.until(0, () => persistenceDoc1._clock >= N && persistenceDoc2._clock >= N)
t.assert(testarray1.length === N)
t.assert(testarray2.length === N)
t.assert(persistenceDoc1._clock === N)
t.assert(persistenceDoc2._clock === N)
return undefined
})
await redisPersistence1.destroy()
}
await t.measureTime(`read ${N / 1000}k updates`, async () => {
const doc = new Y.Doc()
const redisPersistence = new RedisPersistence()
await redisPersistence.bindState(tc.testName, doc).synced
t.assert(doc.getArray('test').length === N)
redisPersistence.destroy()
return undefined
})
const updateslen = await redis.llen(`${tc.testName}:updates`)
t.assert(updateslen === N)
}

0 comments on commit 144fb94

Please sign in to comment.