From 4be6f1cf288667857c287c1f7aba06a49e3efc29 Mon Sep 17 00:00:00 2001 From: Byron Duvall Date: Mon, 20 Nov 2023 15:20:17 -0500 Subject: [PATCH 1/8] abstract signaling server client --- src/y-webrtc.js | 191 +++++++++++++++++++++++++++++------------------- 1 file changed, 114 insertions(+), 77 deletions(-) diff --git a/src/y-webrtc.js b/src/y-webrtc.js index 91a77c3..f6b473b 100644 --- a/src/y-webrtc.js +++ b/src/y-webrtc.js @@ -269,8 +269,8 @@ const broadcastRoomMessage = (room, m) => { const announceSignalingInfo = room => { signalingConns.forEach(conn => { // only subscribe if connection is established, otherwise the conn automatically subscribes to all rooms - if (conn.connected) { - conn.send({ type: 'subscribe', topics: [room.name] }) + if (conn.connected()) { + conn.subscribe(room.name) if (room.webrtcConns.size < room.provider.maxConns) { publishSignalingMessage(conn, room, { type: 'announce', from: room.peerId }) } @@ -412,8 +412,8 @@ export class Room { disconnect () { // signal through all available signaling connections signalingConns.forEach(conn => { - if (conn.connected) { - conn.send({ type: 'unsubscribe', topics: [this.name] }) + if (conn.connected()) { + conn.unsubscribe(this.name) } }) awarenessProtocol.removeAwarenessStates(this.awareness, [this.doc.clientID], 'disconnect') @@ -466,96 +466,133 @@ const openRoom = (doc, provider, name, key) => { const publishSignalingMessage = (conn, room, data) => { if (room.key) { cryptoutils.encryptJson(data, room.key).then(data => { - conn.send({ type: 'publish', topic: room.name, data: buffer.toBase64(data) }) + conn.publish(room.name, buffer.toBase64(data)) }) } else { - conn.send({ type: 'publish', topic: room.name, data }) + conn.publish(room.name, data) } } -export class SignalingConn extends ws.WebsocketClient { +/** + * @param {SignalingConn} conn + * @param {Room} room + * @param {any} data + */ +const handleMessage = (conn, roomName, data) => { + const room = rooms.get(roomName) + if (room == null || typeof roomName !== 'string') { + return + } + const execMessage = data => { + const webrtcConns = room.webrtcConns + const peerId = room.peerId + if (data == null || data.from === peerId || (data.to !== undefined && data.to !== peerId) || room.bcConns.has(data.from)) { + // ignore messages that are not addressed to this conn, or from clients that are connected via broadcastchannel + return + } + const emitPeerChange = webrtcConns.has(data.from) + ? () => {} + : () => + room.provider.emit('peers', [{ + removed: [], + added: [data.from], + webrtcPeers: Array.from(room.webrtcConns.keys()), + bcPeers: Array.from(room.bcConns) + }]) + switch (data.type) { + case 'announce': + if (webrtcConns.size < room.provider.maxConns) { + map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(conn, true, data.from, room)) + emitPeerChange() + } + break + case 'signal': + if (data.signal.type === 'offer') { + const existingConn = webrtcConns.get(data.from) + if (existingConn) { + const remoteToken = data.token + const localToken = existingConn.glareToken + if (localToken && localToken > remoteToken) { + log('offer rejected: ', data.from) + return + } + // if we don't reject the offer, we will be accepting it and answering it + existingConn.glareToken = undefined + } + } + if (data.signal.type === 'answer') { + log('offer answered by: ', data.from) + const existingConn = webrtcConns.get(data.from) + existingConn.glareToken = undefined + } + if (data.to === peerId) { + map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(conn, false, data.from, room)).peer.signal(data.signal) + emitPeerChange() + } + break + } + } + if (room.key) { + if (typeof data === 'string') { + cryptoutils.decryptJson(buffer.fromBase64(data), room.key).then(execMessage) + } + } else { + execMessage(data) + } +} + +export class SignalingConn { constructor (url) { - super(url) + this.url = url /** * @type {Set} */ this.providers = new Set() - this.on('connect', () => { - log(`connected (${url})`) + this.setupClient() + } + + setupClient() { + this.client = new ws.WebsocketClient(this.url) + this.client.on('connect', () => { + log(`connected (${this.url})`) const topics = Array.from(rooms.keys()) - this.send({ type: 'subscribe', topics }) - rooms.forEach(room => - publishSignalingMessage(this, room, { type: 'announce', from: room.peerId }) - ) + this.client.send({ type: 'subscribe', topics }) + this.handleConnect() }) - this.on('message', m => { + this.client.on('message', m => { switch (m.type) { case 'publish': { - const roomName = m.topic - const room = rooms.get(roomName) - if (room == null || typeof roomName !== 'string') { - return - } - const execMessage = data => { - const webrtcConns = room.webrtcConns - const peerId = room.peerId - if (data == null || data.from === peerId || (data.to !== undefined && data.to !== peerId) || room.bcConns.has(data.from)) { - // ignore messages that are not addressed to this conn, or from clients that are connected via broadcastchannel - return - } - const emitPeerChange = webrtcConns.has(data.from) - ? () => {} - : () => - room.provider.emit('peers', [{ - removed: [], - added: [data.from], - webrtcPeers: Array.from(room.webrtcConns.keys()), - bcPeers: Array.from(room.bcConns) - }]) - switch (data.type) { - case 'announce': - if (webrtcConns.size < room.provider.maxConns) { - map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(this, true, data.from, room)) - emitPeerChange() - } - break - case 'signal': - if (data.signal.type === 'offer') { - const existingConn = webrtcConns.get(data.from) - if (existingConn) { - const remoteToken = data.token - const localToken = existingConn.glareToken - if (localToken && localToken > remoteToken) { - log('offer rejected: ', data.from) - return - } - // if we don't reject the offer, we will be accepting it and answering it - existingConn.glareToken = undefined - } - } - if (data.signal.type === 'answer') { - log('offer answered by: ', data.from) - const existingConn = webrtcConns.get(data.from) - existingConn.glareToken = undefined - } - if (data.to === peerId) { - map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(this, false, data.from, room)).peer.signal(data.signal) - emitPeerChange() - } - break - } - } - if (room.key) { - if (typeof m.data === 'string') { - cryptoutils.decryptJson(buffer.fromBase64(m.data), room.key).then(execMessage) - } - } else { - execMessage(m.data) - } + handleMessage(this, m.topic, m.data) } } }) - this.on('disconnect', () => log(`disconnect (${url})`)) + this.client.on('disconnect', () => log(`disconnect (${this.url})`)) + } + + handleConnect() { + rooms.forEach(room => + publishSignalingMessage(this, room, { type: 'announce', from: room.peerId }) + ) + } + + connected () { + return this.client.connected + } + + subscribe (roomName) { + this.client.send({ type: 'subscribe', topics: [roomName] }) + } + + unsubscribe (roomName) { + this.client.send({ type: 'unsubscribe', topics: [roomName] }) + } + + publish (roomName, message) { + this.client.send({ type: 'publish', topic: roomName, data: message }) + } + + destroy () { + this.client.destroy() } } From e4ab973220992616f2ad0eb14bf64ac69122f747 Mon Sep 17 00:00:00 2001 From: Byron Duvall Date: Tue, 21 Nov 2023 11:23:00 -0500 Subject: [PATCH 2/8] export publishSignalingMessage for use by derived classes --- src/y-webrtc.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/y-webrtc.js b/src/y-webrtc.js index f6b473b..1250bdc 100644 --- a/src/y-webrtc.js +++ b/src/y-webrtc.js @@ -463,7 +463,7 @@ const openRoom = (doc, provider, name, key) => { * @param {Room} room * @param {any} data */ -const publishSignalingMessage = (conn, room, data) => { +export const publishSignalingMessage = (conn, room, data) => { if (room.key) { cryptoutils.encryptJson(data, room.key).then(data => { conn.publish(room.name, buffer.toBase64(data)) From 4725aeb69e74d0544c92f1f58730b4f6bbbffc23 Mon Sep 17 00:00:00 2001 From: Byron Duvall Date: Tue, 21 Nov 2023 11:23:32 -0500 Subject: [PATCH 3/8] make handleMessage a method of SignalingConn --- src/y-webrtc.js | 133 +++++++++++++++++++++++------------------------- 1 file changed, 64 insertions(+), 69 deletions(-) diff --git a/src/y-webrtc.js b/src/y-webrtc.js index 1250bdc..5b6ed9f 100644 --- a/src/y-webrtc.js +++ b/src/y-webrtc.js @@ -473,74 +473,6 @@ export const publishSignalingMessage = (conn, room, data) => { } } -/** - * @param {SignalingConn} conn - * @param {Room} room - * @param {any} data - */ -const handleMessage = (conn, roomName, data) => { - const room = rooms.get(roomName) - if (room == null || typeof roomName !== 'string') { - return - } - const execMessage = data => { - const webrtcConns = room.webrtcConns - const peerId = room.peerId - if (data == null || data.from === peerId || (data.to !== undefined && data.to !== peerId) || room.bcConns.has(data.from)) { - // ignore messages that are not addressed to this conn, or from clients that are connected via broadcastchannel - return - } - const emitPeerChange = webrtcConns.has(data.from) - ? () => {} - : () => - room.provider.emit('peers', [{ - removed: [], - added: [data.from], - webrtcPeers: Array.from(room.webrtcConns.keys()), - bcPeers: Array.from(room.bcConns) - }]) - switch (data.type) { - case 'announce': - if (webrtcConns.size < room.provider.maxConns) { - map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(conn, true, data.from, room)) - emitPeerChange() - } - break - case 'signal': - if (data.signal.type === 'offer') { - const existingConn = webrtcConns.get(data.from) - if (existingConn) { - const remoteToken = data.token - const localToken = existingConn.glareToken - if (localToken && localToken > remoteToken) { - log('offer rejected: ', data.from) - return - } - // if we don't reject the offer, we will be accepting it and answering it - existingConn.glareToken = undefined - } - } - if (data.signal.type === 'answer') { - log('offer answered by: ', data.from) - const existingConn = webrtcConns.get(data.from) - existingConn.glareToken = undefined - } - if (data.to === peerId) { - map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(conn, false, data.from, room)).peer.signal(data.signal) - emitPeerChange() - } - break - } - } - if (room.key) { - if (typeof data === 'string') { - cryptoutils.decryptJson(buffer.fromBase64(data), room.key).then(execMessage) - } - } else { - execMessage(data) - } -} - export class SignalingConn { constructor (url) { this.url = url @@ -562,7 +494,7 @@ export class SignalingConn { this.client.on('message', m => { switch (m.type) { case 'publish': { - handleMessage(this, m.topic, m.data) + this.handleMessage(m.topic, m.data) } } }) @@ -575,6 +507,69 @@ export class SignalingConn { ) } + handleMessage (roomName, data) { + const room = rooms.get(roomName) + if (room == null || typeof roomName !== 'string') { + return + } + const execMessage = data => { + const webrtcConns = room.webrtcConns + const peerId = room.peerId + if (data == null || data.from === peerId || (data.to !== undefined && data.to !== peerId) || room.bcConns.has(data.from)) { + // ignore messages that are not addressed to this conn, or from clients that are connected via broadcastchannel + return + } + const emitPeerChange = webrtcConns.has(data.from) + ? () => {} + : () => + room.provider.emit('peers', [{ + removed: [], + added: [data.from], + webrtcPeers: Array.from(room.webrtcConns.keys()), + bcPeers: Array.from(room.bcConns) + }]) + switch (data.type) { + case 'announce': + if (webrtcConns.size < room.provider.maxConns) { + map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(this, true, data.from, room)) + emitPeerChange() + } + break + case 'signal': + if (data.signal.type === 'offer') { + const existingConn = webrtcConns.get(data.from) + if (existingConn) { + const remoteToken = data.token + const localToken = existingConn.glareToken + if (localToken && localToken > remoteToken) { + log('offer rejected: ', data.from) + return + } + // if we don't reject the offer, we will be accepting it and answering it + existingConn.glareToken = undefined + } + } + if (data.signal.type === 'answer') { + log('offer answered by: ', data.from) + const existingConn = webrtcConns.get(data.from) + existingConn.glareToken = undefined + } + if (data.to === peerId) { + map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(this, false, data.from, room)).peer.signal(data.signal) + emitPeerChange() + } + break + } + } + if (room.key) { + if (typeof data === 'string') { + cryptoutils.decryptJson(buffer.fromBase64(data), room.key).then(execMessage) + } + } else { + execMessage(data) + } + } + connected () { return this.client.connected } From f2c18471b75e2fed18f3ddb41964597f35042f71 Mon Sep 17 00:00:00 2001 From: Byron Duvall Date: Tue, 28 Nov 2023 15:34:50 -0500 Subject: [PATCH 4/8] remove publishSignalingMessage export --- src/y-webrtc.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/y-webrtc.js b/src/y-webrtc.js index 5b6ed9f..dd7c246 100644 --- a/src/y-webrtc.js +++ b/src/y-webrtc.js @@ -463,7 +463,7 @@ const openRoom = (doc, provider, name, key) => { * @param {Room} room * @param {any} data */ -export const publishSignalingMessage = (conn, room, data) => { +const publishSignalingMessage = (conn, room, data) => { if (room.key) { cryptoutils.encryptJson(data, room.key).then(data => { conn.publish(room.name, buffer.toBase64(data)) From 77360196bd67343b51775a7c2341701116a961a7 Mon Sep 17 00:00:00 2001 From: Byron Duvall Date: Tue, 28 Nov 2023 15:37:42 -0500 Subject: [PATCH 5/8] only setup client if undefined --- src/y-webrtc.js | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/src/y-webrtc.js b/src/y-webrtc.js index dd7c246..0c70336 100644 --- a/src/y-webrtc.js +++ b/src/y-webrtc.js @@ -484,21 +484,23 @@ export class SignalingConn { } setupClient() { - this.client = new ws.WebsocketClient(this.url) - this.client.on('connect', () => { - log(`connected (${this.url})`) - const topics = Array.from(rooms.keys()) - this.client.send({ type: 'subscribe', topics }) - this.handleConnect() - }) - this.client.on('message', m => { - switch (m.type) { - case 'publish': { - this.handleMessage(m.topic, m.data) + if (this.client === undefined) { + this.client = new ws.WebsocketClient(this.url) + this.client.on('connect', () => { + log(`connected (${this.url})`) + const topics = Array.from(rooms.keys()) + this.client.send({ type: 'subscribe', topics }) + this.handleConnect() + }) + this.client.on('message', m => { + switch (m.type) { + case 'publish': { + this.handleMessage(m.topic, m.data) + } } - } - }) - this.client.on('disconnect', () => log(`disconnect (${this.url})`)) + }) + this.client.on('disconnect', () => log(`disconnect (${this.url})`)) + } } handleConnect() { @@ -569,7 +571,7 @@ export class SignalingConn { execMessage(data) } } - + connected () { return this.client.connected } From dec3d276c375592d2c76f5ccc29fa9e91acb5a6d Mon Sep 17 00:00:00 2001 From: Byron Duvall Date: Tue, 5 Dec 2023 13:51:48 -0500 Subject: [PATCH 6/8] add joinAllRooms and addSignalingConn methods --- src/y-webrtc.js | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/src/y-webrtc.js b/src/y-webrtc.js index 0c70336..c09fdc9 100644 --- a/src/y-webrtc.js +++ b/src/y-webrtc.js @@ -488,8 +488,7 @@ export class SignalingConn { this.client = new ws.WebsocketClient(this.url) this.client.on('connect', () => { log(`connected (${this.url})`) - const topics = Array.from(rooms.keys()) - this.client.send({ type: 'subscribe', topics }) + this.joinAllRooms() this.handleConnect() }) this.client.on('message', m => { @@ -591,6 +590,13 @@ export class SignalingConn { destroy () { this.client.destroy() } + + joinAllRooms() { + const topics = Array.from(rooms.keys()) + topics.forEach(topic => + this.subscribe(topic) + ) + } } /** @@ -667,11 +673,7 @@ export class WebrtcProvider extends Observable { connect () { this.shouldConnect = true - this.signalingUrls.forEach(url => { - const signalingConn = map.setIfUndefined(signalingConns, url, () => new SignalingConn(url)) - this.signalingConns.push(signalingConn) - signalingConn.providers.add(this) - }) + this.signalingUrls.forEach(url => this.addSignalingConn(url, () => new SignalingConn(url))) if (this.room) { this.room.connect() } @@ -700,4 +702,10 @@ export class WebrtcProvider extends Observable { }) super.destroy() } + + addSignalingConn(url, conn) { + const signalingConn = map.setIfUndefined(signalingConns, url, conn) + this.signalingConns.push(signalingConn) + signalingConn.providers.add(this) + } } From a31abed7867e523b5374ad74b76048b92dc40115 Mon Sep 17 00:00:00 2001 From: Byron Duvall Date: Wed, 6 Dec 2023 15:45:53 -0500 Subject: [PATCH 7/8] combine joinAllRooms and handleConnect --- src/y-webrtc.js | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/src/y-webrtc.js b/src/y-webrtc.js index c09fdc9..af77de7 100644 --- a/src/y-webrtc.js +++ b/src/y-webrtc.js @@ -484,11 +484,10 @@ export class SignalingConn { } setupClient() { - if (this.client === undefined) { + // if (this.client === undefined) { this.client = new ws.WebsocketClient(this.url) this.client.on('connect', () => { log(`connected (${this.url})`) - this.joinAllRooms() this.handleConnect() }) this.client.on('message', m => { @@ -499,10 +498,14 @@ export class SignalingConn { } }) this.client.on('disconnect', () => log(`disconnect (${this.url})`)) - } + // } } handleConnect() { + const topics = Array.from(rooms.keys()) + topics.forEach(topic => + this.subscribe(topic) + ) rooms.forEach(room => publishSignalingMessage(this, room, { type: 'announce', from: room.peerId }) ) @@ -590,13 +593,6 @@ export class SignalingConn { destroy () { this.client.destroy() } - - joinAllRooms() { - const topics = Array.from(rooms.keys()) - topics.forEach(topic => - this.subscribe(topic) - ) - } } /** From 19bf460d5afff2245778cc3375df4b84d90dc003 Mon Sep 17 00:00:00 2001 From: Byron Duvall Date: Wed, 6 Dec 2023 15:46:56 -0500 Subject: [PATCH 8/8] remove commented conditional --- src/y-webrtc.js | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/src/y-webrtc.js b/src/y-webrtc.js index af77de7..76e50ae 100644 --- a/src/y-webrtc.js +++ b/src/y-webrtc.js @@ -484,21 +484,19 @@ export class SignalingConn { } setupClient() { - // if (this.client === undefined) { - this.client = new ws.WebsocketClient(this.url) - this.client.on('connect', () => { - log(`connected (${this.url})`) - this.handleConnect() - }) - this.client.on('message', m => { - switch (m.type) { - case 'publish': { - this.handleMessage(m.topic, m.data) - } + this.client = new ws.WebsocketClient(this.url) + this.client.on('connect', () => { + log(`connected (${this.url})`) + this.handleConnect() + }) + this.client.on('message', m => { + switch (m.type) { + case 'publish': { + this.handleMessage(m.topic, m.data) } - }) - this.client.on('disconnect', () => log(`disconnect (${this.url})`)) - // } + } + }) + this.client.on('disconnect', () => log(`disconnect (${this.url})`)) } handleConnect() {