Skip to content

Commit

Permalink
chore(api): update protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
heapwolf committed Jul 24, 2024
1 parent 3c49816 commit 0bf46da
Showing 1 changed file with 42 additions and 12 deletions.
54 changes: 42 additions & 12 deletions api/latica/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ export class Peer {
natType = NAT.UNKNOWN
nextNatType = NAT.UNKNOWN
clusters = {}
syncs = {}
reflectionId = null
reflectionTimeout = null
reflectionStage = 0
Expand Down Expand Up @@ -283,7 +284,7 @@ export class Peer {
//
// The purpose of this.config is to seperate transitioned state from initial state.
//
this.config = {
this.config = { // TODO(@heapwolf): Object.freeze this maybe
keepalive: DEFAULT_KEEP_ALIVE,
...config
}
Expand Down Expand Up @@ -585,6 +586,7 @@ export class Peer {

return {
peers,
syncs: this.syncs,
config: this.config,
data: [...this.cache.data.entries()],
unpublished: this.unpublished
Expand Down Expand Up @@ -617,8 +619,11 @@ export class Peer {
}

async reconnect () {
this.lastUpdate = 0
this.requestReflection()
for (const cluster of Object.values(this.clusters)) {
for (const subcluster of Object.values(cluster)) {
this.join(subcluster.sharedKey, subcluster)
}
}
}

async disconnect () {
Expand Down Expand Up @@ -1145,7 +1150,7 @@ export class Peer {

if (this.gate.has(pid)) return
this.returnRoutes.set(p.usr3.toString('hex'), {})
this.gate.set(pid, 1) // don't accidentally spam
this.gate.set(pid, 1) // prevent accidental spam

this._onDebug(`-> QUERY (type=question, query=${query}, packet=${pid.slice(0, 8)})`)

Expand Down Expand Up @@ -1233,6 +1238,30 @@ export class Peer {

if (firstContact && this.onConnection) {
this.onConnection(packet, peer, port, address)

const now = Date.now()
const key = [peer.address, peer.port].join(':')
let first = false

//
// If you've never sync'd before, you can ask for 6 hours of data from
// other peers. If we have synced with a peer before we can just ask for
// data that they have seen since then, this will avoid the risk of
// spamming them and getting rate-limited.
//
if (!this.syncs[key]) {
this.syncs[key] = now - Packet.ttl
first = true
}

const lastSyncSeconds = (now - this.syncs[key]) / 1000
const syncWindow = this.config.syncWindow ?? 6000

if (first || now - this.syncs[key] > syncWindow) {
this.sync(peer.peerId, this.syncs[key])
this._onDebug(`-> SYNC SEND (peerId=${peer.peerId.slice(0, 6)}, address=${key}, since=${lastSyncSeconds} seconds ago)`)
this.syncs[key] = now
}
}
}

Expand Down Expand Up @@ -1332,11 +1361,8 @@ export class Peer {
this.metrics.i[packet.type]++

const pid = packet.packetId.toString('hex')
if (this.gate.has(pid)) return
this.gate.set(pid, 1)

const queryTimestamp = parseInt(packet.usr1.toString(), 10)
const queryId = packet.usr3.toString('hex')
const queryTimestamp = parseInt(packet.usr1.toString(), 10)
const queryType = parseInt(packet.usr4.toString(), 10)

// if the timestamp in usr1 is older than now - 2s, bail
Expand All @@ -1350,7 +1376,7 @@ export class Peer {
//
// receiving an answer
//
if (this.returnRoutes.has(queryId)) {
if (this.returnRoutes.has(queryId) && type === 'answer') {
rinfo = this.returnRoutes.get(queryId)

let p = packet.copy()
Expand All @@ -1365,7 +1391,8 @@ export class Peer {
}

if (!rinfo.address) return
} else {
} else if (type === 'question') {
if (this.gate.has(pid)) return
//
// receiving a query
//
Expand Down Expand Up @@ -1398,14 +1425,17 @@ export class Peer {
p.usr3 = packet.usr3 // ensure the packet has the queryId
p.usr4 = Buffer.from(String(2)) // mark it as an answer packet
this.send(await Packet.encode(p), rinfo.port, rinfo.address)
this.gate.set(pid, 1)
}
return
}
}

if (packet.hops >= this.maxHops) return
if (this.gate.has(pid)) return

this._onDebug('>> QUERY RELAY', port, address)
return await this.mcast(packet)
await this.mcast(packet)
}

/**
Expand Down Expand Up @@ -1570,7 +1600,7 @@ export class Peer {
})
}

this._mainLoop(Date.now())
this._setTimeout(() => this._mainLoop(Date.now()), 1024)

if (this.onNat) this.onNat(this.natType)

Expand Down

0 comments on commit 0bf46da

Please sign in to comment.