Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: bootstrap and pubsub replicators #87

Merged
merged 69 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
375b846
Fork the live replicator and use content routing to find peers.
saul-jb Jul 12, 2023
775641e
Parse the heads fetched from peers.
saul-jb Jul 12, 2023
ee83d98
Send the heads in a stream when requested.
saul-jb Jul 12, 2023
2847d50
Add handler for replcia events.
saul-jb Jul 12, 2023
0e618d0
Publish and parse heads over pubsub.
saul-jb Jul 12, 2023
c20a531
Remove redundant imports.
saul-jb Jul 12, 2023
951a2fe
Subscribe to pubsub channel and fix problems in bootstrapping.
saul-jb Jul 12, 2023
258ac1a
Split live into pubsub and bootstrap replicators.
saul-jb Jul 12, 2023
67a5a32
Export new replicators.
saul-jb Jul 12, 2023
61b433d
Add replicator helpers.
saul-jb Jul 12, 2023
4d6a7d2
Simplify bootstrap replicator with utils.
saul-jb Jul 12, 2023
07b88bc
Simplify pubsub replicator with utils.
saul-jb Jul 12, 2023
fafe19f
Cleanup new replicators.
saul-jb Jul 12, 2023
9014243
More cleanup and make bootstrapping more efficient.
saul-jb Jul 12, 2023
35b6d3b
Fix decode heads util.
saul-jb Jul 12, 2023
e24a28f
Simplify replicator utils.
saul-jb Jul 12, 2023
188fe26
More cleanup.
saul-jb Jul 12, 2023
38576c3
Add options to bootstrap.
saul-jb Jul 13, 2023
9183031
Don't wait for bootstrap to complete.
saul-jb Jul 13, 2023
d8dcf87
Make the bootstrap options optional.
saul-jb Jul 13, 2023
aa83652
Linting.
saul-jb Jul 13, 2023
2387e5f
Increase default bootstrap timeout.
saul-jb Jul 13, 2023
b25e576
Don't broadcast on updates.
saul-jb Jul 13, 2023
d1948e2
Fix bootstrap handle.
saul-jb Jul 13, 2023
7e2f106
fix: don't overwrite peer store data.
saul-jb Jul 13, 2023
39e66d1
refactor: replicator test.
saul-jb Jul 13, 2023
d297cde
test: add pubsub replicator test
saul-jb Jul 14, 2023
d33dc94
fix: expose topic in pubsub replicator
saul-jb Jul 14, 2023
95f688a
test: wait to join pubsub channel
saul-jb Jul 14, 2023
4edcab6
test: add bootstrap replicator test
saul-jb Jul 14, 2023
c40506e
fix: add the entry to the replica write event
saul-jb Jul 14, 2023
fefbb87
refactor: make pubsub replication more efficient
saul-jb Jul 14, 2023
f3b007e
wip: add protobuf for encoding heads
saul-jb Jul 14, 2023
313be42
perf: use protobuf to encode heads
saul-jb Jul 14, 2023
d6cb537
wip: add heads request method
saul-jb Jul 14, 2023
c986245
fix: add seed parameter to request method
saul-jb Jul 14, 2023
e350234
wip: add create filter and get difference methods
saul-jb Jul 14, 2023
7595eb3
wip: add hash heads method
saul-jb Jul 14, 2023
528a752
wip: remove encode/decode methods from replicator utils
saul-jb Jul 15, 2023
a30b51e
wip: add head exchange protocol
saul-jb Jul 15, 2023
13576e0
refactor: remove filter utils from replicator
saul-jb Jul 15, 2023
8d1da0b
wip: add match parameter to heads protobuf
saul-jb Jul 16, 2023
e479e8b
wip: add message type enum and method
saul-jb Jul 16, 2023
742f41f
refactor: rework protcol logic to class
saul-jb Jul 16, 2023
62924ec
refactor: use both peers in seed calculation
saul-jb Jul 17, 2023
e11ed67
fix: throw error if stream is closed
saul-jb Jul 17, 2023
ed65d2c
fix: add new heads to cache
saul-jb Jul 17, 2023
333abc8
fix: filter creation and piping
saul-jb Jul 17, 2023
3c6867c
feat: use head exchange protocol on bootstrap
saul-jb Jul 17, 2023
72e679e
fix: end the writer and ignore errors in the exchange
saul-jb Jul 17, 2023
2cda5be
fix: add missing packages
saul-jb Jul 17, 2023
01f5414
feat: add config options to bootstrap replicator
saul-jb Jul 17, 2023
7975e12
fix: change seed every round
saul-jb Jul 17, 2023
6262885
test: disable reverse sync in bootstrap test
saul-jb Jul 17, 2023
751a4e6
style: linting
saul-jb Jul 17, 2023
b5a6bfb
fix: change protocol name
saul-jb Jul 17, 2023
0f5b72b
chore: update package lock
saul-jb Jul 17, 2023
570956d
chore: regenerate package lock
saul-jb Jul 17, 2023
c8fc3fc
Merge branch 'master' into test
tabcat Jul 18, 2023
7deb468
test: skip bootstrap replicator tests for browser
tabcat Sep 17, 2023
d5c25a1
test: skip zzzync replicator tests for browser
tabcat Sep 17, 2023
3aa50ad
remove console logs from zzzync replicator
tabcat Sep 17, 2023
fd410eb
chore: commit package-lock file
tabcat Sep 17, 2023
f06204a
test: remove advertise option so tests exit?
tabcat Sep 17, 2023
ce5e79c
Merge branch 'master' into feat/replicators
tabcat Sep 17, 2023
c01f508
style: fix linter errors
tabcat Sep 18, 2023
5586eb7
test: fix w3 token env variable logic
tabcat Sep 18, 2023
6844533
chore: build before tests to make .aegir.js
tabcat Sep 18, 2023
6bcc17d
test: fix noToken logic
tabcat Sep 18, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3,159 changes: 1,990 additions & 1,169 deletions package-lock.json

Large diffs are not rendered by default.

11 changes: 10 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
},
"prettier": "prettier-config-standard",
"scripts": {
"generate": "protons src/message/*.proto",
"prepublishOnly": "npm run build",
"reset": "rm -rf node_modules package-lock.json && npm install",
"lint": "aegir lint",
Expand All @@ -48,7 +49,7 @@
"build": "aegir build && tsc-alias && ln -sf ./dist/test/.aegir.js ./.aegir.js",
"docs": "NODE_OPTIONS=--max_old_space_size=4096 aegir docs",
"test:chrome": "aegir test -b -t browser -f ./dist/test/**/test-*.js",
"test:node": "npm run clean && aegir test -b -t node -f ./dist/test/**/test-*.js -- --exit && npm run clean",
"test:node": "npm run build && npm run clean && aegir test -b -t node -f ./dist/test/**/test-*.js -- --exit && npm run clean",
"test": "npm run test:node && npm run test:chrome",
"clean": "git clean -fq test/fixtures && git clean -fqX test/temp .polendina/ && git checkout -- test/fixtures"
},
Expand Down Expand Up @@ -104,6 +105,7 @@
"path-browserify": "^1.0.1",
"prettier": "^2.7.1",
"prettier-config-standard": "^5.0.0",
"protons": "^7.0.4",
"tsc-alias": "^1.8.7",
"typescript": "^5.1.6",
"wherearewe": "^2.0.1"
Expand All @@ -116,14 +118,21 @@
"@libp2p/interfaces": "^3.3.2",
"@libp2p/peer-id": "^2.0.3",
"@libp2p/peer-id-factory": "^2.0.3",
"@open-draft/deferred-promise": "^2.1.0",
"@tabcat/zzzync": "^2.0.0",
"datastore-core": "^9.1.1",
"fission-bloom-filters": "^1.7.1",
"helia": "^1.1.2",
"interface-blockstore": "5.2",
"it-all": "^2.0.0",
"it-concat": "^3.0.1",
"it-drain": "3.0",
"it-length-prefixed": "^9.0.1",
"it-pushable": "^3.2.1",
"multiformats": "^11.0.1",
"p-queue": "^7.3.0",
"protons-runtime": "^5.0.1",
"streaming-iterables": "^7.1.0",
"uint8arrays": "^4.0.3",
"w3name": "^1.0.8",
"web3.storage": "^4.5.4"
Expand Down
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ export { Entry, basalEntry } from '@/entry/basal/index.js'
export { Identity, basalIdentity } from '@/identity/basal/index.js'
export { Keyvalue, keyvalueStore } from '@/store/keyvalue/index.js'
export { liveReplicator } from '@/replicator/live/index.js'
export { pubsubReplicator } from '@/replicator/pubsub/index.js'
export { bootstrapReplicator } from '@/replicator/bootstrap/index.js'
export { Address } from '@/manifest/index.js'
export type { Manifest } from '@/manifest/index.js'
export type { Playable } from '@/utils/playable.js'
Expand Down
14 changes: 14 additions & 0 deletions src/message/heads.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
syntax = "proto3";

message Message {
message Filter {
bytes data = 1;
uint32 hashes = 2;
optional uint32 seed = 3;
}

repeated bytes heads = 1;
optional Filter filter = 2;
optional bytes hash = 3;
optional bool match = 4;
}
173 changes: 173 additions & 0 deletions src/message/heads.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/* eslint-disable import/export */
/* eslint-disable complexity */
/* eslint-disable @typescript-eslint/no-namespace */
/* eslint-disable @typescript-eslint/no-unnecessary-boolean-literal-compare */
/* eslint-disable @typescript-eslint/no-empty-interface */

import { encodeMessage, decodeMessage, message } from 'protons-runtime'
import type { Codec } from 'protons-runtime'
import type { Uint8ArrayList } from 'uint8arraylist'

export interface Message {
heads: Uint8Array[]
filter?: Message.Filter
hash?: Uint8Array
match?: boolean
}

export namespace Message {
export interface Filter {
data: Uint8Array
hashes: number
seed?: number
}

export namespace Filter {
let _codec: Codec<Filter>

export const codec = (): Codec<Filter> => {
if (_codec == null) {
_codec = message<Filter>((obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork()
}

if ((obj.data != null && obj.data.byteLength > 0)) {
w.uint32(10)
w.bytes(obj.data)
}

if ((obj.hashes != null && obj.hashes !== 0)) {
w.uint32(16)
w.uint32(obj.hashes)
}

if (obj.seed != null) {
w.uint32(24)
w.uint32(obj.seed)
}

if (opts.lengthDelimited !== false) {
w.ldelim()
}
}, (reader, length) => {
const obj: any = {
data: new Uint8Array(0),
hashes: 0
}

const end = length == null ? reader.len : reader.pos + length

while (reader.pos < end) {
const tag = reader.uint32()

switch (tag >>> 3) {
case 1:
obj.data = reader.bytes()
break
case 2:
obj.hashes = reader.uint32()
break
case 3:
obj.seed = reader.uint32()
break
default:
reader.skipType(tag & 7)
break
}
}

return obj
})
}

return _codec
}

export const encode = (obj: Partial<Filter>): Uint8Array => {
return encodeMessage(obj, Filter.codec())
}

export const decode = (buf: Uint8Array | Uint8ArrayList): Filter => {
return decodeMessage(buf, Filter.codec())
}
}

let _codec: Codec<Message>

export const codec = (): Codec<Message> => {
if (_codec == null) {
_codec = message<Message>((obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork()
}

if (obj.heads != null) {
for (const value of obj.heads) {
w.uint32(10)
w.bytes(value)
}
}

if (obj.filter != null) {
w.uint32(18)
Message.Filter.codec().encode(obj.filter, w)
}

if (obj.hash != null) {
w.uint32(26)
w.bytes(obj.hash)
}

if (obj.match != null) {
w.uint32(32)
w.bool(obj.match)
}

if (opts.lengthDelimited !== false) {
w.ldelim()
}
}, (reader, length) => {
const obj: any = {
heads: []
}

const end = length == null ? reader.len : reader.pos + length

while (reader.pos < end) {
const tag = reader.uint32()

switch (tag >>> 3) {
case 1:
obj.heads.push(reader.bytes())
break
case 2:
obj.filter = Message.Filter.codec().decode(reader, reader.uint32())
break
case 3:
obj.hash = reader.bytes()
break
case 4:
obj.match = reader.bool()
break
default:
reader.skipType(tag & 7)
break
}
}

return obj
})
}

return _codec
}

export const encode = (obj: Partial<Message>): Uint8Array => {
return encodeMessage(obj, Message.codec())
}

export const decode = (buf: Uint8Array | Uint8ArrayList): Message => {
return decodeMessage(buf, Message.codec())
}
}
26 changes: 13 additions & 13 deletions src/replica/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import { decodeCbor, encodeCbor } from '@/utils/block.js'

const rootHashKey = new Key('rootHash')

interface ReplicaEvents {
write: CustomEvent<undefined>
export interface ReplicaEvents {
write: CustomEvent<EntryInstance<any>>
update: CustomEvent<undefined>
}

Expand All @@ -44,7 +44,7 @@ export class Replica extends Playable {
readonly components: Pick<DbComponents, 'entry' | 'identity'>

#datastore: Datastore
#blockstore: Blockstore
blockstore: Blockstore
#graph: Graph | null

_queue: PQueue
Expand All @@ -69,10 +69,10 @@ export class Replica extends Playable {
const starting = async (): Promise<void> => {
const root: BlockView<GraphRoot> | null = await getRoot(
this.#datastore,
this.#blockstore
this.blockstore
).catch(() => null)

this.#graph = new Graph(this.#blockstore, root?.value)
this.#graph = new Graph(this.blockstore, root?.value)
await start(this.#graph)

if (root?.cid == null) {
Expand All @@ -95,7 +95,7 @@ export class Replica extends Playable {
this.components = components

this.#datastore = datastore
this.#blockstore = blockstore
this.blockstore = blockstore
this.#graph = null
this._queue = new PQueue({})

Expand Down Expand Up @@ -133,7 +133,7 @@ export class Replica extends Playable {
direction: 'descend'
}
): Promise<Array<EntryInstance<any>>> {
const blockstore = this.#blockstore
const blockstore = this.blockstore
const entry = this.components.entry
const identity = this.components.identity

Expand Down Expand Up @@ -180,7 +180,7 @@ export class Replica extends Playable {
}

async add (entries: Array<EntryInstance<any>>): Promise<void> {
if (this.#datastore == null || this.#blockstore == null) {
if (this.#datastore == null || this.blockstore == null) {
throw new Error('replica not started')
}

Expand All @@ -193,8 +193,8 @@ export class Replica extends Playable {
continue
}

await this.#blockstore.put(entry.cid, entry.block.bytes)
await this.#blockstore.put(entry.identity.auth, entry.identity.block.bytes)
await this.blockstore.put(entry.cid, entry.block.bytes)
await this.blockstore.put(entry.identity.auth, entry.identity.block.bytes)

if (await this.access.canAppend(entry)) {
await this.graph.add(entry.cid, entry.next)
Expand All @@ -221,10 +221,10 @@ export class Replica extends Playable {
refs: [] // refs are empty for now
})

await this.#blockstore.put(entry.cid, entry.block.bytes)
await this.blockstore.put(entry.cid, entry.block.bytes)

return await this.add([entry]).then(() => {
this.events.dispatchEvent(new CustomEvent<undefined>('write'))
this.events.dispatchEvent(new CustomEvent<EntryInstance<any>>('write', { detail: entry }))
return entry
})
}
Expand Down Expand Up @@ -278,7 +278,7 @@ export class Replica extends Playable {

async #updateRoot (): Promise<void> {
const block = await encodeRoot(this.graph.root)
await setRoot(this.#datastore, this.#blockstore, block)
await setRoot(this.#datastore, this.blockstore, block)
this.root = block.cid
this.events.dispatchEvent(new CustomEvent<undefined>('update'))
}
Expand Down
Loading
Loading