From 7542ed80cd349b27daa40f00ac0e38615b165a76 Mon Sep 17 00:00:00 2001 From: Kevin Jahns Date: Sun, 26 May 2024 01:56:58 +0200 Subject: [PATCH] filter ops by origin again - reducing back-and-forth of messages --- package.json | 2 +- src/api/actions.js | 27 ++++++++++++------ src/comm.js | 8 +++--- src/comms/websocket-server.js | 53 +++++++++++++++++++++++++++-------- src/comms/websocket-utils.js | 11 ++++++++ src/comms/websocket.js | 29 ++++++++++++------- src/db.js | 3 +- src/extensions/fs.js | 4 +-- src/protocol.js | 19 +++++++------ src/ystream.js | 12 ++++++-- tests/helpers.js | 23 ++++----------- 11 files changed, 122 insertions(+), 69 deletions(-) create mode 100644 src/comms/websocket-utils.js diff --git a/package.json b/package.json index b086004..d5050b4 100644 --- a/package.json +++ b/package.json @@ -10,7 +10,7 @@ "url": "https://github.com/sponsors/dmonad" }, "scripts": { - "demo": "npm run clean && rollup -c && concurrently '0serve -o ./demo/index.html' 'rollup -wc' 'npm run ws-server'", + "demo": "rollup -c && concurrently -r '0serve -o ./demo/index.html' 'rollup -wc' 'npm run ws-server'", "clean": "rm -rf dist .ystream .test_dbs tmp coverage", "types": "tsc", "gentesthtml": "npx 0gentesthtml --script ./tests/index.js > test.html", diff --git a/src/api/actions.js b/src/api/actions.js index 3709033..95e9ed2 100644 --- a/src/api/actions.js +++ b/src/api/actions.js @@ -16,6 +16,7 @@ import { emitOpsEvent } from '../ystream.js' import * as authorization from '../api/authorization.js' import * as protocol from '../protocol.js' import * as isodb from 'isodb' +import * as wsUtils from '../comms/websocket-utils.js' /** * @typedef {import('../ystream.js').Ystream} Ystream @@ -28,11 +29,11 @@ const opsPerMessage = 300 * @param {number} startClock * @param {Uint8Array?} owner * @param {string?} collection - * @param {number} remoteClientId + * @param {import('../comm.js').Comm} comm * * @return {ReadableStream<{ messages: Array, origin: any }>} */ -export const createOpsReader = (ystream, startClock, owner, collection, remoteClientId) => { +export const createOpsReader = (ystream, startClock, owner, collection, comm) => { let nextClock = startClock /** * @type {((ops: Array, origin: any) => void) | null} @@ -45,10 +46,11 @@ export const createOpsReader = (ystream, startClock, owner, collection, remoteCl const stream = new ReadableStream({ start (controller) { listener = (ops, origin) => { + if (origin === comm && origin !== null) return if (collection != null) { - ops = ops.filter(op => op.client !== remoteClientId && op.localClock >= nextClock && op.collection === collection && array.equalFlat(op.owner, /** @type {Uint8Array} */ (owner))) + ops = ops.filter(op => op.client !== comm.clientid && op.localClock >= nextClock && op.collection === collection && array.equalFlat(op.owner, /** @type {Uint8Array} */ (owner))) } else { - ops = ops.filter(op => op.client !== remoteClientId && op.localClock >= nextClock) + ops = ops.filter(op => op.client !== comm.clientid && op.localClock >= nextClock) } while (ops.length > 0) { const opsToSend = ops.splice(0, opsPerMessage) @@ -86,7 +88,7 @@ export const createOpsReader = (ystream, startClock, owner, collection, remoteCl } return update.value })) - ops = ops.filter(op => op.client !== remoteClientId) + ops = ops.filter(op => op.client !== comm.clientid) if (ops.length === 0) { nextClock = math.max(ystream._eclock || 0, nextClock) console.log('sending synced step') @@ -465,7 +467,6 @@ export const getNoPerms = async (tr, ystream, owner, collection) => export const getClock = async (tr, ystream, clientid, owner, collection) => { if (ystream.clientid === clientid) { const latestEntry = await tr.tables.oplog.getKeys({ - end: number.HIGHEST_UINT32, // @todo change to uint reverse: true, limit: 1 }) @@ -478,7 +479,7 @@ export const getClock = async (tr, ystream, clientid, owner, collection) => { owner != null && queries.push(clocksTable.get(new dbtypes.ClocksKey(clientid, owner, null))) owner != null && collection != null && queries.push(clocksTable.get(new dbtypes.ClocksKey(clientid, owner, collection))) const clocks = await promise.all(queries) - return array.fold(clocks.map(c => c ? c.clock : 0), 0, math.max) + return array.fold(clocks.map(c => c ? c.clock : -1), -1, math.max) } /** @@ -581,7 +582,7 @@ export const applyRemoteOps = async (ystream, comm, ops, user, origin, startCloc await ystream.transact(async tr => { if (comm.nextClock < startClock) { console.error('some operations seem to be missing. Reconnecting!', { commNextClock: comm.nextClock, startClock, endClock }) - comm.close(1002, 'some operations seem to be missing') + comm.close(wsUtils.statusConsistencyError, 'some operations seem to be missing') // process.exit(1) // @todo remove - this just exists to catch bugs throw new Error('some operations seem to be missing') // instead of return, to cancel everything } @@ -645,6 +646,13 @@ export const applyRemoteOps = async (ystream, comm, ops, user, origin, startCloc console.log('Not applying op because of missing permission', op, ystream.syncsEverything, user.hash, user.isTrusted) } } + if (ops.length > 0) { + // we know that we received all ops from the remote user up until endClock + const lastOp = ops[ops.length - 1] + // @todo this is only meant for single-collection syncs. When syncing all collections, we need a + // better mechanism. + clientClockEntries.set(encodeClocksKey(comm.clientid, lastOp.owner, lastOp.collection), new dbtypes.ClientClockValue(endClock, lastOp.localClock)) + } clientClockEntries.forEach((clockValue, encClocksKey) => { const clocksKey = dbtypes.ClocksKey.decode(decoding.createDecoder(buffer.fromBase64(encClocksKey))) tr.tables.clocks.set(clocksKey, clockValue) @@ -673,7 +681,8 @@ export const applyRemoteOps = async (ystream, comm, ops, user, origin, startCloc if (docupdates.length > 0 && ((docset && docset.size > 0) || env.isBrowser)) { const mergedUpdate = Y.mergeUpdatesV2(docupdates.map(op => op.op.update)) if (docset && docset.size > 0) { - docset.forEach(doc => Y.applyUpdateV2(doc, mergedUpdate)) + // @todo listen to the 'ops' event instead (but more efficiently) + docset.forEach(doc => Y.applyUpdateV2(doc, mergedUpdate, ystream)) } /* c8 ignore start */ if (env.isBrowser) { diff --git a/src/comm.js b/src/comm.js index 93b34e0..edf8bf2 100644 --- a/src/comm.js +++ b/src/comm.js @@ -21,7 +21,7 @@ import * as observable from 'lib0/observable' * Interface that describes a communication channel. * * @interface - * @extends observable.ObservableV2<{ authenticated: (comm:Comm) => void }> + * @extends {observable.ObservableV2<{ authenticated: (comm:Comm) => void, "requested-ops": (comm: Comm, sub: { collection: { owner: Uint8Array?, name: string? }, clock: number }) => void }>} */ export class Comm extends observable.ObservableV2 { get clientid () { return -1 } @@ -73,10 +73,10 @@ export class Comm extends observable.ObservableV2 { } /** - * @param {number} [code] - * @param {string} [reason] + * @param {number} [_code] + * @param {string} [_reason] */ - close (code, reason) { + close (_code, _reason) { error.methodUnimplemented() } } diff --git a/src/comms/websocket-server.js b/src/comms/websocket-server.js index c1d2a43..81d0020 100644 --- a/src/comms/websocket-server.js +++ b/src/comms/websocket-server.js @@ -31,6 +31,10 @@ import * as utils from '../utils.js' import * as logging from 'lib0/logging' import * as observable from 'lib0/observable' import * as actions from '../api/actions.js' +import * as buffer from 'lib0/buffer' +import * as array from 'lib0/array' +import * as wsUtils from './websocket-utils.js' +import * as url from 'lib0/url' const expectedBufferedAmount = 512 * 1024 // 512kb @@ -45,17 +49,21 @@ const log = (comm, type, ...args) => _log(logging.PURPLE, `(local=${comm.ystream const maxBufferedAmount = 3000_000 /** - * @implements comm.Comm - * @extends {observable.ObservableV2<{ authenticated: (comm: WSClient) => void }>} + * @implements {comm.Comm} + * @extends {observable.ObservableV2<{ authenticated: (comm:comm.Comm) => void, "requested-ops": (comm: comm.Comm, sub: { collection: { owner: Uint8Array?, name: string? }, clock: number }) => void }>} */ class WSClient extends observable.ObservableV2 { /** * @param {uws.WebSocket<{ client: WSClient }>} ws * @param {Ystream.Ystream} ystream + * @param {object} collection + * @param {Uint8Array} collection.owner + * @param {string} collection.name */ - constructor (ws, ystream) { + constructor (ws, ystream, collection) { super() this.ystream = ystream + this.collection = collection this.clientid = -1 /** * @type {import('../api/dbtypes.js').UserIdentity|null} @@ -84,7 +92,7 @@ class WSClient extends observable.ObservableV2 { */ this.writer = new WritableStream({ write: ({ messages, origin }) => { - log(this, 'sending ops', () => { return `number of ops=${messages.length}` }) + log(this, 'sending messages', () => { return `number of messages=${messages.length}` }) for (let i = 0; i < messages.length; i++) { this.send(messages[i]) } @@ -95,11 +103,16 @@ class WSClient extends observable.ObservableV2 { } }) this.on('authenticated', async () => { - const encoder = encoding.createEncoder() - const clock = await ystream.transact(tr => actions.getClock(tr, ystream, this.clientid, null, null)) - this.nextClock = clock - protocol.writeRequestAllOps(encoder, clock) - this.send(encoding.toUint8Array(encoder)) + const clock = await ystream.transact(tr => actions.getClock(tr, ystream, this.clientid, this.collection.owner, this.collection.name)) + this.nextClock = clock + 1 + this.send(encoding.encode(encoder => { + protocol.writeRequestOps(encoder, this.collection.owner, this.collection.name, this.nextClock) + })) + }) + this.on('requested-ops', (_comm, { collection }) => { + if (collection.name !== this.collection.name || collection.owner == null || !array.equalFlat(collection.owner, this.collection.owner)) { + this.close(wsUtils.statusUnauthenticated, 'cannot request ops from other collections') + } }) } @@ -144,6 +157,9 @@ class WSClient extends observable.ObservableV2 { } /** + * Close the connection. + * Use a status code from websocket-utils.js + * * @param {number} [code] * @param {string} [reason] */ @@ -200,18 +216,31 @@ export class WSServer { this.ystream = ystream this.ready = ystream.whenAuthenticated.then(() => promise.create((resolve, reject) => { console.log('starting websocket server') - uws.App({}).ws('/*', /** @type {uws.WebSocketBehavior<{ client: WSClient }>} */ ({ + uws.App({}).ws('/:owner/:cname', /** @type {uws.WebSocketBehavior<{ client: WSClient, collection: { owner: Uint8Array, name: string } }>} */ ({ /* Options */ compression: uws.SHARED_COMPRESSOR, maxPayloadLength: 70 * 1024 * 1024 * 1024, // @todo use the "dropped" timeout to create a new reader that reads directly form the // database without consuming much memory. maxBackpressure: 70 * 1024 * 1024 * 1024 * 100, + // closeOnBackpressureLimit: true, // @todo reenable once types are fixed idleTimeout: 960, + upgrade: (res, req, context) => { + const owner = buffer.fromBase64UrlEncoded(req.getParameter(0)) + const name = decodeURIComponent(req.getParameter(1)) + res.upgrade( + { client: null, collection: { owner, name } }, + req.getHeader('sec-websocket-key'), + req.getHeader('sec-websocket-protocol'), + req.getHeader('sec-websocket-extensions'), + context + ) + }, /* Handlers */ open: (ws) => { - const client = new WSClient(ws, ystream) - ws.getUserData().client = client + const userData = ws.getUserData() + const client = new WSClient(ws, ystream, userData.collection) + userData.client = client client.send(encoding.encode(encoder => { protocol.writeInfo(encoder, ystream, client) })) diff --git a/src/comms/websocket-utils.js b/src/comms/websocket-utils.js new file mode 100644 index 0000000..6109c6b --- /dev/null +++ b/src/comms/websocket-utils.js @@ -0,0 +1,11 @@ +/** + * Status codes for closing websocket connections + * + * We may specify status codes in the range of 3000-3999. + * Ref: https://www.rfc-editor.org/rfc/rfc6455.html#section-7.4 + */ + +export const statusNormal = 1000 +export const statusUnauthenticated = 3000 +export const statusParseError = 3100 +export const statusConsistencyError = 3200 diff --git a/src/comms/websocket.js b/src/comms/websocket.js index c203a2e..faa1e6e 100644 --- a/src/comms/websocket.js +++ b/src/comms/websocket.js @@ -141,9 +141,11 @@ class WebSocketCommInstance extends ObservableV2 { this.on('authenticated', async () => { const encoder = encoding.createEncoder() await ystream.transact(tr => - actions.getClock(tr, ystream, this.clientid, handler.collection.owner, handler.collection.name).then(clock => { - this.nextClock = clock - return protocol.writeRequestOps(encoder, handler.collection.owner, handler.collection.name, clock) + actions.getClock(tr, ystream, this.clientid, handler.collection.owner, handler.collection.name).then(async clock => { + const sv = await actions.getStateVector(tr, ystream, handler.collection.owner, handler.collection.name) + log(this, 'requesting ops', { clock, clientid: this.clientid, sv }) + this.nextClock = clock + 1 + return protocol.writeRequestOps(encoder, handler.collection.owner, handler.collection.name, this.nextClock) }) ) this.send(encoding.toUint8Array(encoder)) @@ -166,6 +168,9 @@ class WebSocketCommInstance extends ObservableV2 { } /** + * Close the connection. + * Use a status code from websocket-utils.js + * * @param {number} [code] * @param {string} [reason] */ @@ -202,13 +207,13 @@ class WebSocketCommInstance extends ObservableV2 { class WebSocketHandlerInstance extends ObservableV2 { /** * @param {import('../ystream.js').Ystream} ystream - * @param {string} url + * @param {string} serverUrl * @param {{ owner: Uint8Array, name: string }} collection */ - constructor (ystream, url, collection) { + constructor (ystream, serverUrl, collection) { super() this.ystream = ystream - this.url = url + this.serverUrl = serverUrl this.collection = collection this.shouldConnect = true this.wsUnsuccessfulReconnects = 0 @@ -222,6 +227,10 @@ class WebSocketHandlerInstance extends ObservableV2 { } } + get url () { + return `${this.serverUrl}/${buffer.toBase64UrlEncoded(this.collection.owner)}/${encodeURIComponent(this.collection.name)}` + } + _setupNewComm () { if (!this.shouldConnect) return const prevComm = this.comm @@ -259,11 +268,11 @@ class WebSocketHandlerInstance extends ObservableV2 { */ export class WebSocketComm { /** - * @param {string} url + * @param {string} serverUrl * @param {{ owner: string, name: string }} collection */ - constructor (url, collection) { - this.url = url + constructor (serverUrl, collection) { + this.serverUrl = serverUrl this.collection = collection } @@ -271,6 +280,6 @@ export class WebSocketComm { * @param {Ystream} ystream */ init (ystream) { - return new WebSocketHandlerInstance(ystream, this.url, { owner: buffer.fromBase64(this.collection.owner), name: this.collection.name }) + return new WebSocketHandlerInstance(ystream, this.serverUrl, { owner: buffer.fromBase64(this.collection.owner), name: this.collection.name }) } } diff --git a/src/db.js b/src/db.js index 86473bb..34a4295 100644 --- a/src/db.js +++ b/src/db.js @@ -204,7 +204,8 @@ export const createDb = dbname => */ let deviceClaim = null if (version == null) { - clientid = random.uint53() + // @todo derive clientid from deviceid + clientid = random.uint32() // init tr.objects.db.set('version', 0) const dguid = new Uint8Array(64) diff --git a/src/extensions/fs.js b/src/extensions/fs.js index 25eafbc..9a4f9aa 100644 --- a/src/extensions/fs.js +++ b/src/extensions/fs.js @@ -200,7 +200,7 @@ const _eventsToCompute = [] */ const _computeEvents = async yfs => { const ycollection = yfs.ycollection - console.log('all events to compute', _eventsToCompute) + // console.log('all events to compute', _eventsToCompute) while (_eventsToCompute.length > 0) { await yfs.ystream.transact(async tr => { for (let iterations = 0; _eventsToCompute.length > 0 && iterations < 300; iterations++) { @@ -242,7 +242,6 @@ const _computeEvents = async yfs => { } case 'addDir': { const { docid, isNew } = await mkPath(tr, ycollection.ystream, ycollection.ownerBin, ycollection.collection, 'root', arrPath) - console.log('addDir', { isNew }) if (isNew) { await ycollection.setLww(tr, docid, {}) // regarding await: make sure that this document exists before continuing } else { @@ -250,7 +249,6 @@ const _computeEvents = async yfs => { if (currContent?.constructor === Object) { // exists and is already a directory // nop } else { - console.log('overwriting current content') await ycollection.setLww(tr, docid, {}) } } diff --git a/src/protocol.js b/src/protocol.js index 788f776..3891393 100644 --- a/src/protocol.js +++ b/src/protocol.js @@ -11,6 +11,7 @@ import * as buffer from 'lib0/buffer' import * as jose from 'lib0/crypto/jwt' import * as sha256 from 'lib0/hash/sha256' import * as string from 'lib0/string' +import * as wsUtils from './comms/websocket-utils.js' const _log = logging.createModuleLogger('@y/stream/protocol') /** @@ -163,18 +164,19 @@ const readRequestOps = async (decoder, ystream, comm) => { let nextClock = 0 if (requestedAllOps) { nextClock = decoding.readVarUint(decoder) - log(ystream, comm, 'RequestOps', 'requested all ops') + log(ystream, comm, 'RequestOps', 'requested all ops', () => ({ nextClock, remoteClientId: comm.clientid })) } else { // requested only a single collection owner = decoding.readVarUint8Array(decoder) collection = decoding.readVarString(decoder) nextClock = decoding.readVarUint(decoder) - log(ystream, comm, 'RequestOps', `requested "${collection}"`) + log(ystream, comm, 'RequestOps', `requested "${collection}" `, () => ({ nextClock, remoteClientId: comm.clientid })) } + comm.emit('requested-ops', [comm, { collection: { owner, name: collection }, clock: nextClock }]) console.log(ystream.clientid, 'subscribing conn to ops', { fcid: comm.clientid, collection, owner }) // @todo add method to filter by owner & collection - actions.createOpsReader(ystream, nextClock, owner, collection, comm.clientid).pipeTo(comm.writer, { signal: comm.streamController.signal }).catch((reason) => { - comm.close(1007, 'unexpected error reading ops stream') + actions.createOpsReader(ystream, nextClock, owner, collection, comm).pipeTo(comm.writer, { signal: comm.streamController.signal }).catch((reason) => { + comm.close(wsUtils.statusParseError, 'unexpected error reading ops stream') console.log('ended pipe', { reason, isDestroyed: comm.isDestroyed }) }) } @@ -225,13 +227,13 @@ const readInfo = async (encoder, decoder, ystream, comm) => { await authentication.registerUser(ystream, user) } else { log(ystream, comm, 'destroying', 'User not registered') - comm.close(1002, 'User not registered') + comm.close(wsUtils.statusUnauthenticated, 'User not registered') return } } const parsedClaim = await deviceClaim.verify(await user.publicKey) if (parsedClaim.payload.iss !== user.ekey) { - comm.close(1002, 'invalid user claim') + comm.close(wsUtils.statusUnauthenticated, 'invalid user claim') error.unexpectedCase() } await ystream.transact(async tr => { @@ -302,7 +304,7 @@ export const readMessage = async (encoder, decoder, ystream, comm) => { } else { if (comm.deviceClaim == null || comm.user == null || !comm.isAuthenticated) { log(ystream, comm, 'closing unauthenticated connection') - comm.close(1002, 'closing unauthenticated connection') + comm.close(wsUtils.statusUnauthenticated, 'closing unauthenticated connection') } switch (messageType) { case messageOps: { @@ -333,8 +335,7 @@ export const readMessage = async (encoder, decoder, ystream, comm) => { } return null } catch (err) { - debugger log(ystream, comm, 'Info rejection', 'Closing connection because of unexpected error', /** @type {Error} */ (err).stack) - comm.close(1007, 'Unexpected error when parsing message') + comm.close(wsUtils.statusParseError, 'Unexpected error when parsing message') } } diff --git a/src/ystream.js b/src/ystream.js index b15a719..d9d94b5 100644 --- a/src/ystream.js +++ b/src/ystream.js @@ -5,7 +5,6 @@ import * as promise from 'lib0/promise' import * as isodb from 'isodb' // eslint-disable-line import * as db from './db.js' // eslint-disable-line import { ObservableV2 } from 'lib0/observable' -import * as random from 'lib0/random' import * as actions from './api/actions.js' import * as dbtypes from './api/dbtypes.js' // eslint-disable-line import * as bc from 'lib0/broadcastchannel' @@ -85,6 +84,7 @@ const _emitOpsEvent = (ystream, ops, origin) => { return } ystream._nextEmitOps = _joinOpArrays(ystream._nextEmitOps, ops, ystream._eclock ?? -1) + ystream._nextEmitOpsOrigins.push(origin) if ( ystream._emitTimeout === null && ystream._nextEmitOps.length > 0 // && (ystream._eclock == null || ystream._nextEmitOps[0].localClock === ystream._eclock) @@ -101,7 +101,10 @@ const _emitOpsEvent = (ystream, ops, origin) => { const emitNow = nextEmitOps.splice(0, emitNowRange) ystream._eclock = eclock if (emitNow.length > 0) { - ystream.emit('ops', [emitNow, origin, true]) + ystream.emit('ops', [emitNow, ystream._nextEmitOpsOrigins.length === 1 ? ystream._nextEmitOpsOrigins[0] : null, true]) + } + if (nextEmitOps.length === 0) { + ystream._nextEmitOpsOrigins.length = 0 } }) } @@ -171,6 +174,11 @@ export class Ystream extends ObservableV2 { * @type {Array} */ this._nextEmitOps = [] + /** + * Ops that will be emitted next. + * @type {Array} + */ + this._nextEmitOpsOrigins = [] /** * @type {eventloop.TimeoutObject?} */ diff --git a/tests/helpers.js b/tests/helpers.js index 20437c6..7ed0988 100644 --- a/tests/helpers.js +++ b/tests/helpers.js @@ -167,23 +167,10 @@ export const waitDocsSynced = (ydoc1, ydoc2) => */ export const waitCollectionsSynced = (ycollection1, ycollection2) => promise.untilAsync(async () => { - const sv1 = await ycollection1.ystream.transact(tr => actions.getStateVector(tr, ycollection1.ystream, ycollection1.ownerBin, ycollection1.collection)) - const sv2 = await ycollection2.ystream.transact(tr => actions.getStateVector(tr, ycollection2.ystream, ycollection2.ownerBin, ycollection2.collection)) - /** - * @param {Ystream} ystream - * @return {(entries:any)=>any} - */ - const updateClocks = (ystream) => colEntries => colEntries.map(update => { - update.value.localClock = update.key.v - if (update.value.client === ystream.clientid) { - update.value.clock = update.key.v - } - return update.value - }) - const ops1 = await ycollection1.ystream.transact(tr => tr.tables.oplog.getEntries({ start: 0 })).then(updateClocks(ycollection1.ystream)) - const ops2 = await ycollection2.ystream.transact(tr => tr.tables.oplog.getEntries({ start: 0 })).then(updateClocks(ycollection2.ystream)) - const ops1sv = ops1.reduce((sv, op) => { sv[op.client] = op.clock; return sv }, /** @type {Object} */ ({})) - const ops2sv = ops2.reduce((sv, op) => { sv[op.client] = op.clock; return sv }, /** @type {Object} */ ({})) - console.log({ sv1, sv2, ops1: ops1.length, ops2: ops2.length, ops1sv, ops2sv, clientid1: ycollection1.ystream.clientid, clientid2: ycollection2.ystream.clientid }) + let sv1 = await ycollection1.ystream.transact(tr => actions.getStateVector(tr, ycollection1.ystream, ycollection1.ownerBin, ycollection1.collection)) + let sv2 = await ycollection2.ystream.transact(tr => actions.getStateVector(tr, ycollection2.ystream, ycollection2.ownerBin, ycollection2.collection)) + sv1 = sv1.filter(s => s.client !== server?.ystream.clientid) + sv2 = sv2.filter(s => s.client !== server?.ystream.clientid) + console.log({ sv1, sv2, clientid1: ycollection1.ystream.clientid, clientid2: ycollection2.ystream.clientid, serverClientId: server?.ystream.clientid }) return fun.equalityDeep(sv1, sv2) }, 0, 100)