Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

extend simple peer (not peerjs, oops) to handle buffered/packet transmission; add raw dependency w/MIT license #25

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 131 additions & 0 deletions src/SimplePeerExtended.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import * as Y from 'yjs' // eslint-disable-line
import Peer from 'simple-peer/simplepeer.min.js'
const { Int64BE } = require('./int64-buffer.min.js')

export const CHUNK_SIZE = (1024 * 16) - 512 // 16KB - data header
export const TX_SEND_TTL = 1000 * 30 // 30 seconds
export const MAX_BUFFERED_AMOUNT = 64 * 1024 // simple peer value

function concatenate (Constructor, arrays) {
let totalLength = 0
for (const arr of arrays) totalLength += arr.length
const result = new Constructor(totalLength)
let offset = 0
for (const arr of arrays) {
result.set(arr, offset)
offset += arr.length
}
return result
}

class SimplePeerExtended extends Peer {
constructor (opts) {
super(opts)
this._opts = opts
this._txOrdinal = 0
this._rxPackets = []
this._txPause = false
this.webRTCMessageQueue = []
this.webRTCPaused = false
}

encodePacket ({ chunk, txOrd, index, length, totalSize, chunkSize }) {
const encoded = concatenate(Uint8Array, [
new Uint8Array(new Int64BE(txOrd).toArrayBuffer()), // 8 bytes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the reason why you don't use BigUint64Array is that it is not yet supported in Safari.

I developed an efficient encoder exactly for this problem: https://github.com/dmonad/lib0/blob/main/encoding.js

import * as encoding from 'lib0/encoding'
import * as decoding from 'lib0/decoding'

// example of encoding unsigned integers and strings efficiently

const encoder = new encoding.createEncoder()
encoding.writeVarUint(encoder, 256) 
encoding.writeVarString(encoder, 'Hello world!')
const buf = encoding.toUint8Array(encoder)

const decoder = new decoding.createDecoder(buf) 
decoding.readVarUint(decoder) // => 256 
decoding.readVarString(decoder) // => 'Hello world!' 
decoding.hasContent(decoder) // => false - all data is read

The documentation for other encoding techniques is here: https://github.com/dmonad/lib0

But I realize that this is already working in the current state. But maybe we can avoid pulling in more dependencies that are superflous.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a header, we need to pad the values. Note the // 8 bytes comments I've mocked up a replacement class like:

class Int64 {
  constuctor (int64) {
    this.encoder = new encoding.createEncoder()
    encoding.writeVarUint(encoder, int64) 
  }
  toArrayBuffer () {
    return encoding.toUint8Array(this.encoder)
  }
}

However, when I test the recommended dependency, we get:

...
encoding.writeVarUint(encoder, 1)
encoding.toUint8Array(encoder)
> Uint8Array(1) [ 1 ]

its not obvious to me how to do pad/unpad safely, so the whole decode/encode would need a rewrite

new Uint8Array(new Int64BE(index).toArrayBuffer()), // 8 bytes
new Uint8Array(new Int64BE(length).toArrayBuffer()), // 8 bytes
new Uint8Array(new Int64BE(totalSize).toArrayBuffer()), // 8 bytes
new Uint8Array(new Int64BE(chunkSize).toArrayBuffer()), // 8 bytes
chunk // CHUNK_SIZE
])
return encoded
}

decodePacket (array) {
return {
txOrd: new Int64BE(array.slice(0, 8)).toNumber(),
index: new Int64BE(array.slice(8, 16)).toNumber(),
length: new Int64BE(array.slice(16, 24)).toNumber(),
totalSize: new Int64BE(array.slice(24, 32)).toNumber(),
chunkSize: new Int64BE(array.slice(32, 40)).toNumber(),
chunk: array.slice(40)
}
}

packetArray (array, size) {
const txOrd = this._txOrdinal
this._txOrdinal++
const chunkedArr = []
const totalSize = array.length || array.byteLength
let index = 0
while (index < totalSize) {
chunkedArr.push(array.slice(index, size + index))
index += size
}
return chunkedArr.map((chunk, index) => {
return this.encodePacket({
chunk,
txOrd,
index,
totalSize,
length: chunkedArr.length,
chunkSize: chunk.byteLength
})
})
}

_onChannelMessage (event) {
const { data } = event
const packet = this.decodePacket(data)
if (packet.chunk instanceof ArrayBuffer) packet.chunk = new Uint8Array(packet.chunk)
if (packet.chunkSize === packet.totalSize) {
this.push(packet.chunk)
} else {
const data = this._rxPackets.filter((p) => p.txOrd === packet.txOrd)
data.push(packet)
const indices = data.map(p => p.index)
if (new Set(indices).size === packet.length) {
data.sort(this.sortPacketArray)
const chunks = concatenate(Uint8Array, data.map(p => p.chunk))
this.push(chunks)
setTimeout(() => { this._rxPackets = this._rxPackets.filter((p) => p.txOrd !== packet.txOrd) }, TX_SEND_TTL)
} else {
this._rxPackets.push(packet)
}
}
}

sortPacketArray (a, b) { return a.index > b.index ? 1 : -1 }
send (chunk) {
if (chunk instanceof ArrayBuffer) chunk = new Uint8Array(chunk)
const chunks = this.packetArray(chunk, CHUNK_SIZE)
this.webRTCMessageQueue = this.webRTCMessageQueue.concat(chunks)
if (this.webRTCPaused) return
this.sendMessageQueued()
}

sendMessageQueued () {
this.webRTCPaused = false
let message = this.webRTCMessageQueue.shift()
while (message) {
if (this._channel.bufferedAmount && this._channel.bufferedAmount > MAX_BUFFERED_AMOUNT) {
this.webRTCPaused = true
this.webRTCMessageQueue.unshift(message)
const listener = () => {
this._channel.removeEventListener('bufferedamountlow', listener)
this.sendMessageQueued()
}
this._channel.addEventListener('bufferedamountlow', listener)
return
}
try {
super.send(message)
message = this.webRTCMessageQueue.shift()
} catch (error) {
console.warn(error)
}
}
}
}

export default SimplePeerExtended
25 changes: 25 additions & 0 deletions src/int64-buffer.min.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion src/y-webrtc.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import * as math from 'lib0/math.js'
import { createMutex } from 'lib0/mutex.js'

import * as Y from 'yjs' // eslint-disable-line
import Peer from 'simple-peer/simplepeer.min.js'

// import Peer from 'simple-peer/simplepeer.min.js'
import Peer from './SimplePeerExtended'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is interesting. So you implemented a layer around simple-peer that handles this issue.

Would it be possible that you publish a separate package that we can include as a polyfill for the default implementation? This is already done in y-websocket where we define the WebSocket as an argument.

new WebsocketProvider(URL, room, { WebSocket: MyCustomWebsocketPolyfill })

We could do something similar to y-webrtc without breaking the existing API.

new WebrtcProvider(room, yjs, { SimplePeer: SimplePeerExtended })

I'd prefer that approach because it allows us to test out your implementation before breaking everyone else's existing deployments.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i can publish a module and put something up on github, but it seems like you'd need move SimplePeer to a peerDependency to avoid duplicating code, otherwise your library is pulling in Peer while your user is also pulling in a modified peer.

Also, it looks like WebrtcConn is what's using the Peer and that's being called from SignalingConn...etc...

I can't figure out how you'd thread that option into the proper slot because it call comes from a global import Peer from 'simple-peer/simplepeer.min.js'


import * as syncProtocol from 'y-protocols/sync.js'
import * as awarenessProtocol from 'y-protocols/awareness.js'
Expand Down