Skip to content

Commit

Permalink
filter ops by origin again - reducing back-and-forth of messages
Browse files Browse the repository at this point in the history
  • Loading branch information
dmonad committed May 26, 2024
1 parent 642b4ad commit 7542ed8
Show file tree
Hide file tree
Showing 11 changed files with 122 additions and 69 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"url": "https://github.com/sponsors/dmonad"
},
"scripts": {
"demo": "npm run clean && rollup -c && concurrently '0serve -o ./demo/index.html' 'rollup -wc' 'npm run ws-server'",
"demo": "rollup -c && concurrently -r '0serve -o ./demo/index.html' 'rollup -wc' 'npm run ws-server'",
"clean": "rm -rf dist .ystream .test_dbs tmp coverage",
"types": "tsc",
"gentesthtml": "npx 0gentesthtml --script ./tests/index.js > test.html",
Expand Down
27 changes: 18 additions & 9 deletions src/api/actions.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { emitOpsEvent } from '../ystream.js'
import * as authorization from '../api/authorization.js'
import * as protocol from '../protocol.js'
import * as isodb from 'isodb'
import * as wsUtils from '../comms/websocket-utils.js'

/**
* @typedef {import('../ystream.js').Ystream} Ystream
Expand All @@ -28,11 +29,11 @@ const opsPerMessage = 300
* @param {number} startClock
* @param {Uint8Array?} owner
* @param {string?} collection
* @param {number} remoteClientId
* @param {import('../comm.js').Comm} comm
*
* @return {ReadableStream<{ messages: Array<dbtypes.OpValue|Uint8Array>, origin: any }>}
*/
export const createOpsReader = (ystream, startClock, owner, collection, remoteClientId) => {
export const createOpsReader = (ystream, startClock, owner, collection, comm) => {
let nextClock = startClock
/**
* @type {((ops: Array<dbtypes.OpValue>, origin: any) => void) | null}
Expand All @@ -45,10 +46,11 @@ export const createOpsReader = (ystream, startClock, owner, collection, remoteCl
const stream = new ReadableStream({
start (controller) {
listener = (ops, origin) => {
if (origin === comm && origin !== null) return
if (collection != null) {
ops = ops.filter(op => op.client !== remoteClientId && op.localClock >= nextClock && op.collection === collection && array.equalFlat(op.owner, /** @type {Uint8Array} */ (owner)))
ops = ops.filter(op => op.client !== comm.clientid && op.localClock >= nextClock && op.collection === collection && array.equalFlat(op.owner, /** @type {Uint8Array} */ (owner)))
} else {
ops = ops.filter(op => op.client !== remoteClientId && op.localClock >= nextClock)
ops = ops.filter(op => op.client !== comm.clientid && op.localClock >= nextClock)
}
while (ops.length > 0) {
const opsToSend = ops.splice(0, opsPerMessage)
Expand Down Expand Up @@ -86,7 +88,7 @@ export const createOpsReader = (ystream, startClock, owner, collection, remoteCl
}
return update.value
}))
ops = ops.filter(op => op.client !== remoteClientId)
ops = ops.filter(op => op.client !== comm.clientid)
if (ops.length === 0) {
nextClock = math.max(ystream._eclock || 0, nextClock)
console.log('sending synced step')
Expand Down Expand Up @@ -465,7 +467,6 @@ export const getNoPerms = async (tr, ystream, owner, collection) =>
export const getClock = async (tr, ystream, clientid, owner, collection) => {
if (ystream.clientid === clientid) {
const latestEntry = await tr.tables.oplog.getKeys({
end: number.HIGHEST_UINT32, // @todo change to uint
reverse: true,
limit: 1
})
Expand All @@ -478,7 +479,7 @@ export const getClock = async (tr, ystream, clientid, owner, collection) => {
owner != null && queries.push(clocksTable.get(new dbtypes.ClocksKey(clientid, owner, null)))
owner != null && collection != null && queries.push(clocksTable.get(new dbtypes.ClocksKey(clientid, owner, collection)))
const clocks = await promise.all(queries)
return array.fold(clocks.map(c => c ? c.clock : 0), 0, math.max)
return array.fold(clocks.map(c => c ? c.clock : -1), -1, math.max)
}

/**
Expand Down Expand Up @@ -581,7 +582,7 @@ export const applyRemoteOps = async (ystream, comm, ops, user, origin, startCloc
await ystream.transact(async tr => {
if (comm.nextClock < startClock) {
console.error('some operations seem to be missing. Reconnecting!', { commNextClock: comm.nextClock, startClock, endClock })
comm.close(1002, 'some operations seem to be missing')
comm.close(wsUtils.statusConsistencyError, 'some operations seem to be missing')
// process.exit(1) // @todo remove - this just exists to catch bugs
throw new Error('some operations seem to be missing') // instead of return, to cancel everything
}
Expand Down Expand Up @@ -645,6 +646,13 @@ export const applyRemoteOps = async (ystream, comm, ops, user, origin, startCloc
console.log('Not applying op because of missing permission', op, ystream.syncsEverything, user.hash, user.isTrusted)
}
}
if (ops.length > 0) {
// we know that we received all ops from the remote user up until endClock
const lastOp = ops[ops.length - 1]
// @todo this is only meant for single-collection syncs. When syncing all collections, we need a
// better mechanism.
clientClockEntries.set(encodeClocksKey(comm.clientid, lastOp.owner, lastOp.collection), new dbtypes.ClientClockValue(endClock, lastOp.localClock))
}
clientClockEntries.forEach((clockValue, encClocksKey) => {
const clocksKey = dbtypes.ClocksKey.decode(decoding.createDecoder(buffer.fromBase64(encClocksKey)))
tr.tables.clocks.set(clocksKey, clockValue)
Expand Down Expand Up @@ -673,7 +681,8 @@ export const applyRemoteOps = async (ystream, comm, ops, user, origin, startCloc
if (docupdates.length > 0 && ((docset && docset.size > 0) || env.isBrowser)) {
const mergedUpdate = Y.mergeUpdatesV2(docupdates.map(op => op.op.update))
if (docset && docset.size > 0) {
docset.forEach(doc => Y.applyUpdateV2(doc, mergedUpdate))
// @todo listen to the 'ops' event instead (but more efficiently)
docset.forEach(doc => Y.applyUpdateV2(doc, mergedUpdate, ystream))
}
/* c8 ignore start */
if (env.isBrowser) {
Expand Down
8 changes: 4 additions & 4 deletions src/comm.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import * as observable from 'lib0/observable'
* Interface that describes a communication channel.
*
* @interface
* @extends observable.ObservableV2<{ authenticated: (comm:Comm) => void }>
* @extends {observable.ObservableV2<{ authenticated: (comm:Comm) => void, "requested-ops": (comm: Comm, sub: { collection: { owner: Uint8Array?, name: string? }, clock: number }) => void }>}
*/
export class Comm extends observable.ObservableV2 {
get clientid () { return -1 }
Expand Down Expand Up @@ -73,10 +73,10 @@ export class Comm extends observable.ObservableV2 {
}

/**
* @param {number} [code]
* @param {string} [reason]
* @param {number} [_code]
* @param {string} [_reason]
*/
close (code, reason) {
close (_code, _reason) {
error.methodUnimplemented()
}
}
Expand Down
53 changes: 41 additions & 12 deletions src/comms/websocket-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ import * as utils from '../utils.js'
import * as logging from 'lib0/logging'
import * as observable from 'lib0/observable'
import * as actions from '../api/actions.js'
import * as buffer from 'lib0/buffer'
import * as array from 'lib0/array'
import * as wsUtils from './websocket-utils.js'
import * as url from 'lib0/url'

const expectedBufferedAmount = 512 * 1024 // 512kb

Expand All @@ -45,17 +49,21 @@ const log = (comm, type, ...args) => _log(logging.PURPLE, `(local=${comm.ystream
const maxBufferedAmount = 3000_000

/**
* @implements comm.Comm
* @extends {observable.ObservableV2<{ authenticated: (comm: WSClient) => void }>}
* @implements {comm.Comm}
* @extends {observable.ObservableV2<{ authenticated: (comm:comm.Comm) => void, "requested-ops": (comm: comm.Comm, sub: { collection: { owner: Uint8Array?, name: string? }, clock: number }) => void }>}
*/
class WSClient extends observable.ObservableV2 {
/**
* @param {uws.WebSocket<{ client: WSClient }>} ws
* @param {Ystream.Ystream} ystream
* @param {object} collection
* @param {Uint8Array} collection.owner
* @param {string} collection.name
*/
constructor (ws, ystream) {
constructor (ws, ystream, collection) {
super()
this.ystream = ystream
this.collection = collection
this.clientid = -1
/**
* @type {import('../api/dbtypes.js').UserIdentity|null}
Expand Down Expand Up @@ -84,7 +92,7 @@ class WSClient extends observable.ObservableV2 {
*/
this.writer = new WritableStream({
write: ({ messages, origin }) => {
log(this, 'sending ops', () => { return `number of ops=${messages.length}` })
log(this, 'sending messages', () => { return `number of messages=${messages.length}` })
for (let i = 0; i < messages.length; i++) {
this.send(messages[i])
}
Expand All @@ -95,11 +103,16 @@ class WSClient extends observable.ObservableV2 {
}
})
this.on('authenticated', async () => {
const encoder = encoding.createEncoder()
const clock = await ystream.transact(tr => actions.getClock(tr, ystream, this.clientid, null, null))
this.nextClock = clock
protocol.writeRequestAllOps(encoder, clock)
this.send(encoding.toUint8Array(encoder))
const clock = await ystream.transact(tr => actions.getClock(tr, ystream, this.clientid, this.collection.owner, this.collection.name))
this.nextClock = clock + 1
this.send(encoding.encode(encoder => {
protocol.writeRequestOps(encoder, this.collection.owner, this.collection.name, this.nextClock)
}))
})
this.on('requested-ops', (_comm, { collection }) => {
if (collection.name !== this.collection.name || collection.owner == null || !array.equalFlat(collection.owner, this.collection.owner)) {
this.close(wsUtils.statusUnauthenticated, 'cannot request ops from other collections')
}
})
}

Expand Down Expand Up @@ -144,6 +157,9 @@ class WSClient extends observable.ObservableV2 {
}

/**
* Close the connection.
* Use a status code from websocket-utils.js
*
* @param {number} [code]
* @param {string} [reason]
*/
Expand Down Expand Up @@ -200,18 +216,31 @@ export class WSServer {
this.ystream = ystream
this.ready = ystream.whenAuthenticated.then(() => promise.create((resolve, reject) => {
console.log('starting websocket server')
uws.App({}).ws('/*', /** @type {uws.WebSocketBehavior<{ client: WSClient }>} */ ({
uws.App({}).ws('/:owner/:cname', /** @type {uws.WebSocketBehavior<{ client: WSClient, collection: { owner: Uint8Array, name: string } }>} */ ({
/* Options */
compression: uws.SHARED_COMPRESSOR,
maxPayloadLength: 70 * 1024 * 1024 * 1024,
// @todo use the "dropped" timeout to create a new reader that reads directly form the
// database without consuming much memory.
maxBackpressure: 70 * 1024 * 1024 * 1024 * 100,
// closeOnBackpressureLimit: true, // @todo reenable once types are fixed
idleTimeout: 960,
upgrade: (res, req, context) => {
const owner = buffer.fromBase64UrlEncoded(req.getParameter(0))
const name = decodeURIComponent(req.getParameter(1))
res.upgrade(
{ client: null, collection: { owner, name } },
req.getHeader('sec-websocket-key'),
req.getHeader('sec-websocket-protocol'),
req.getHeader('sec-websocket-extensions'),
context
)
},
/* Handlers */
open: (ws) => {
const client = new WSClient(ws, ystream)
ws.getUserData().client = client
const userData = ws.getUserData()
const client = new WSClient(ws, ystream, userData.collection)
userData.client = client
client.send(encoding.encode(encoder => {
protocol.writeInfo(encoder, ystream, client)
}))
Expand Down
11 changes: 11 additions & 0 deletions src/comms/websocket-utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/**
* Status codes for closing websocket connections
*
* We may specify status codes in the range of 3000-3999.
* Ref: https://www.rfc-editor.org/rfc/rfc6455.html#section-7.4
*/

export const statusNormal = 1000
export const statusUnauthenticated = 3000
export const statusParseError = 3100
export const statusConsistencyError = 3200
29 changes: 19 additions & 10 deletions src/comms/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,11 @@ class WebSocketCommInstance extends ObservableV2 {
this.on('authenticated', async () => {
const encoder = encoding.createEncoder()
await ystream.transact(tr =>
actions.getClock(tr, ystream, this.clientid, handler.collection.owner, handler.collection.name).then(clock => {
this.nextClock = clock
return protocol.writeRequestOps(encoder, handler.collection.owner, handler.collection.name, clock)
actions.getClock(tr, ystream, this.clientid, handler.collection.owner, handler.collection.name).then(async clock => {
const sv = await actions.getStateVector(tr, ystream, handler.collection.owner, handler.collection.name)
log(this, 'requesting ops', { clock, clientid: this.clientid, sv })
this.nextClock = clock + 1
return protocol.writeRequestOps(encoder, handler.collection.owner, handler.collection.name, this.nextClock)
})
)
this.send(encoding.toUint8Array(encoder))
Expand All @@ -166,6 +168,9 @@ class WebSocketCommInstance extends ObservableV2 {
}

/**
* Close the connection.
* Use a status code from websocket-utils.js
*
* @param {number} [code]
* @param {string} [reason]
*/
Expand Down Expand Up @@ -202,13 +207,13 @@ class WebSocketCommInstance extends ObservableV2 {
class WebSocketHandlerInstance extends ObservableV2 {
/**
* @param {import('../ystream.js').Ystream} ystream
* @param {string} url
* @param {string} serverUrl
* @param {{ owner: Uint8Array, name: string }} collection
*/
constructor (ystream, url, collection) {
constructor (ystream, serverUrl, collection) {
super()
this.ystream = ystream
this.url = url
this.serverUrl = serverUrl
this.collection = collection
this.shouldConnect = true
this.wsUnsuccessfulReconnects = 0
Expand All @@ -222,6 +227,10 @@ class WebSocketHandlerInstance extends ObservableV2 {
}
}

get url () {
return `${this.serverUrl}/${buffer.toBase64UrlEncoded(this.collection.owner)}/${encodeURIComponent(this.collection.name)}`
}

_setupNewComm () {
if (!this.shouldConnect) return
const prevComm = this.comm
Expand Down Expand Up @@ -259,18 +268,18 @@ class WebSocketHandlerInstance extends ObservableV2 {
*/
export class WebSocketComm {
/**
* @param {string} url
* @param {string} serverUrl
* @param {{ owner: string, name: string }} collection
*/
constructor (url, collection) {
this.url = url
constructor (serverUrl, collection) {
this.serverUrl = serverUrl
this.collection = collection
}

/**
* @param {Ystream} ystream
*/
init (ystream) {
return new WebSocketHandlerInstance(ystream, this.url, { owner: buffer.fromBase64(this.collection.owner), name: this.collection.name })
return new WebSocketHandlerInstance(ystream, this.serverUrl, { owner: buffer.fromBase64(this.collection.owner), name: this.collection.name })
}
}
3 changes: 2 additions & 1 deletion src/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ export const createDb = dbname =>
*/
let deviceClaim = null
if (version == null) {
clientid = random.uint53()
// @todo derive clientid from deviceid
clientid = random.uint32()
// init
tr.objects.db.set('version', 0)
const dguid = new Uint8Array(64)
Expand Down
4 changes: 1 addition & 3 deletions src/extensions/fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ const _eventsToCompute = []
*/
const _computeEvents = async yfs => {
const ycollection = yfs.ycollection
console.log('all events to compute', _eventsToCompute)
// console.log('all events to compute', _eventsToCompute)
while (_eventsToCompute.length > 0) {
await yfs.ystream.transact(async tr => {
for (let iterations = 0; _eventsToCompute.length > 0 && iterations < 300; iterations++) {
Expand Down Expand Up @@ -242,15 +242,13 @@ const _computeEvents = async yfs => {
}
case 'addDir': {
const { docid, isNew } = await mkPath(tr, ycollection.ystream, ycollection.ownerBin, ycollection.collection, 'root', arrPath)
console.log('addDir', { isNew })
if (isNew) {
await ycollection.setLww(tr, docid, {}) // regarding await: make sure that this document exists before continuing
} else {
const currContent = await ycollection.getLww(tr, docid)
if (currContent?.constructor === Object) { // exists and is already a directory
// nop
} else {
console.log('overwriting current content')
await ycollection.setLww(tr, docid, {})
}
}
Expand Down
Loading

0 comments on commit 7542ed8

Please sign in to comment.