From fd6f29b6e692a23c7fe5034cc4eee55af98ea55c Mon Sep 17 00:00:00 2001 From: cy Date: Fri, 21 May 2021 07:26:03 -0500 Subject: [PATCH] extend simple peer to handle buffered/packet transmission; add dependency with license --- src/SimplePeerExtended.js | 131 ++++++++++++++++++++++++++++++++++++++ src/int64-buffer.min.js | 25 ++++++++ src/y-webrtc.js | 4 +- 3 files changed, 159 insertions(+), 1 deletion(-) create mode 100644 src/SimplePeerExtended.js create mode 100644 src/int64-buffer.min.js diff --git a/src/SimplePeerExtended.js b/src/SimplePeerExtended.js new file mode 100644 index 0000000..125e599 --- /dev/null +++ b/src/SimplePeerExtended.js @@ -0,0 +1,131 @@ +import * as Y from 'yjs' // eslint-disable-line +import Peer from 'simple-peer/simplepeer.min.js' +const { Int64BE } = require('./int64-buffer.min.js') + +export const CHUNK_SIZE = (1024 * 16) - 512 // 16KB - data header +export const TX_SEND_TTL = 1000 * 30 // 30 seconds +export const MAX_BUFFERED_AMOUNT = 64 * 1024 // simple peer value + +function concatenate (Constructor, arrays) { + let totalLength = 0 + for (const arr of arrays) totalLength += arr.length + const result = new Constructor(totalLength) + let offset = 0 + for (const arr of arrays) { + result.set(arr, offset) + offset += arr.length + } + return result +} + +class SimplePeerExtended extends Peer { + constructor (opts) { + super(opts) + this._opts = opts + this._txOrdinal = 0 + this._rxPackets = [] + this._txPause = false + this.webRTCMessageQueue = [] + this.webRTCPaused = false + } + + encodePacket ({ chunk, txOrd, index, length, totalSize, chunkSize }) { + const encoded = concatenate(Uint8Array, [ + new Uint8Array(new Int64BE(txOrd).toArrayBuffer()), // 8 bytes + new Uint8Array(new Int64BE(index).toArrayBuffer()), // 8 bytes + new Uint8Array(new Int64BE(length).toArrayBuffer()), // 8 bytes + new Uint8Array(new Int64BE(totalSize).toArrayBuffer()), // 8 bytes + new Uint8Array(new Int64BE(chunkSize).toArrayBuffer()), // 8 bytes + chunk // CHUNK_SIZE + ]) + return encoded + } + + decodePacket (array) { + return { + txOrd: new Int64BE(array.slice(0, 8)).toNumber(), + index: new Int64BE(array.slice(8, 16)).toNumber(), + length: new Int64BE(array.slice(16, 24)).toNumber(), + totalSize: new Int64BE(array.slice(24, 32)).toNumber(), + chunkSize: new Int64BE(array.slice(32, 40)).toNumber(), + chunk: array.slice(40) + } + } + + packetArray (array, size) { + const txOrd = this._txOrdinal + this._txOrdinal++ + const chunkedArr = [] + const totalSize = array.length || array.byteLength + let index = 0 + while (index < totalSize) { + chunkedArr.push(array.slice(index, size + index)) + index += size + } + return chunkedArr.map((chunk, index) => { + return this.encodePacket({ + chunk, + txOrd, + index, + totalSize, + length: chunkedArr.length, + chunkSize: chunk.byteLength + }) + }) + } + + _onChannelMessage (event) { + const { data } = event + const packet = this.decodePacket(data) + if (packet.chunk instanceof ArrayBuffer) packet.chunk = new Uint8Array(packet.chunk) + if (packet.chunkSize === packet.totalSize) { + this.push(packet.chunk) + } else { + const data = this._rxPackets.filter((p) => p.txOrd === packet.txOrd) + data.push(packet) + const indices = data.map(p => p.index) + if (new Set(indices).size === packet.length) { + data.sort(this.sortPacketArray) + const chunks = concatenate(Uint8Array, data.map(p => p.chunk)) + this.push(chunks) + setTimeout(() => { this._rxPackets = this._rxPackets.filter((p) => p.txOrd !== packet.txOrd) }, TX_SEND_TTL) + } else { + this._rxPackets.push(packet) + } + } + } + + sortPacketArray (a, b) { return a.index > b.index ? 1 : -1 } + send (chunk) { + if (chunk instanceof ArrayBuffer) chunk = new Uint8Array(chunk) + const chunks = this.packetArray(chunk, CHUNK_SIZE) + this.webRTCMessageQueue = this.webRTCMessageQueue.concat(chunks) + if (this.webRTCPaused) return + this.sendMessageQueued() + } + + sendMessageQueued () { + this.webRTCPaused = false + let message = this.webRTCMessageQueue.shift() + while (message) { + if (this._channel.bufferedAmount && this._channel.bufferedAmount > MAX_BUFFERED_AMOUNT) { + this.webRTCPaused = true + this.webRTCMessageQueue.unshift(message) + const listener = () => { + this._channel.removeEventListener('bufferedamountlow', listener) + this.sendMessageQueued() + } + this._channel.addEventListener('bufferedamountlow', listener) + return + } + try { + super.send(message) + message = this.webRTCMessageQueue.shift() + } catch (error) { + console.warn(error) + } + } + } +} + +export default SimplePeerExtended diff --git a/src/int64-buffer.min.js b/src/int64-buffer.min.js new file mode 100644 index 0000000..81da6bf --- /dev/null +++ b/src/int64-buffer.min.js @@ -0,0 +1,25 @@ +/* +https://github.com/kawanet/int64-buffer +The MIT License (MIT) + +Copyright (c) 2015-2020 Yusuke Kawasaki + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ +var Uint64BE,Int64BE,Uint64LE,Int64LE;!function(t){var r,f="undefined",n=f!==typeof Buffer&&Buffer,e=f!==typeof Uint8Array&&Uint8Array,o=f!==typeof ArrayBuffer&&ArrayBuffer,i=[0,0,0,0,0,0,0,0],u=Array.isArray||function(t){return!!t&&"[object Array]"==Object.prototype.toString.call(t)},a=4294967296;function s(u,s,A){var U=s?0:4,I=s?4:0,L=s?0:3,d=s?1:2,w=s?2:1,m=s?3:0,S=s?b:E,j=s?B:g,x=k.prototype,M="is"+u,N="_"+M;return x.buffer=void 0,x.offset=0,x[N]=!0,x.toNumber=O,x.toString=function(t){var r=this.buffer,f=this.offset,n=_(r,f+U),e=_(r,f+I),o="",i=!A&&2147483648&n;i&&(n=~n,e=a-e);t=t||10;for(;;){var u=n%t*a+e;if(n=Math.floor(n/t),e=Math.floor(u/t),o=(u%t).toString(t)+o,!n&&!e)break}i&&(o="-"+o);return o},x.toJSON=O,x.toArray=c,n&&(x.toBuffer=y),e&&(x.toArrayBuffer=h),k[M]=function(t){return!(!t||!t[N])},t[u]=k,k;function k(t,u,s,c){return this instanceof k?function(t,u,s,c,y){e&&o&&(u instanceof o&&(u=new e(u)),c instanceof o&&(c=new e(c)));if(!(u||s||c||r))return void(t.buffer=p(i,0));if(!v(u,s)){var h=r||Array;y=s,c=u,s=0,u=r===n?n.alloc(8):new h(8)}if(t.buffer=u,t.offset=s|=0,f===typeof c)return;"string"==typeof c?function(t,r,f,n){var e=0,o=f.length,i=0,u=0;"-"===f[0]&&e++;var s=e;for(;e=0))break;u=u*n+c,i=i*n+Math.floor(u/a),u%=a}s&&(i=~i,u?u=a-u:i++);J(t,r+U,i),J(t,r+I,u)}(u,s,c,y||10):v(c,y)?l(u,s,c,y):"number"==typeof y?(J(u,s+U,c),J(u,s+I,y)):c>0?S(u,s,c):c<0?j(u,s,c):l(u,s,i,0)}(this,t,u,s,c):new k(t,u,s,c)}function O(){var t=this.buffer,r=this.offset,f=_(t,r+U),n=_(t,r+I);return A||(f|=0),f?f*a+n:n}function J(t,r,f){t[r+m]=255&f,f>>=8,t[r+w]=255&f,f>>=8,t[r+d]=255&f,f>>=8,t[r+L]=255&f}function _(t,r){return 16777216*t[r+L]+(t[r+d]<<16)+(t[r+w]<<8)+t[r+m]}}function c(t){var f=this.buffer,n=this.offset;return r=null,!1!==t&&u(f)?8===f.length?f:f.slice(n,n+8):p(f,n)}function y(t){var f=this.buffer,e=this.offset;return r=n,!1!==t&&n.isBuffer(f)?8===f.length?f:f.slice(e,e+8):n.from(h.call(this,t))}function h(t){var f=this.buffer,n=this.offset,i=f.buffer;if(r=e,!1!==t&&!f.offset&&i instanceof o)return 8===i.byteLength?i:i.slice(n,n+8);var u=new e(8);return l(u,0,f,n),u.buffer}function v(t,r){var f=t&&t.length;return r|=0,f&&r+8<=f&&"string"!=typeof t[r]}function l(t,r,f,n){r|=0,n|=0;for(var e=0;e<8;e++)t[r++]=255&f[n++]}function p(t,r){return Array.prototype.slice.call(t,r,r+8)}function b(t,r,f){for(var n=r+8;n>r;)t[--n]=255&f,f/=256}function B(t,r,f){var n=r+8;for(f++;n>r;)t[--n]=255&-f^255,f/=256}function E(t,r,f){for(var n=r+8;r