Skip to content

Commit

Permalink
feat: peer-store v0
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Nov 2, 2019
1 parent a23d4d2 commit 3468fca
Show file tree
Hide file tree
Showing 8 changed files with 440 additions and 15 deletions.
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
"once": "^1.4.0",
"p-queue": "^6.1.1",
"p-settle": "^3.1.0",
"peer-book": "^0.9.1",
"peer-id": "^0.13.3",
"peer-info": "^0.17.0",
"promisify-es6": "^1.0.3",
Expand Down
30 changes: 16 additions & 14 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ const promisify = require('promisify-es6')
const each = require('async/each')
const nextTick = require('async/nextTick')

const PeerBook = require('peer-book')
const PeerInfo = require('peer-info')
const multiaddr = require('multiaddr')
const Switch = require('./switch')
Expand All @@ -29,6 +28,7 @@ const { codes } = require('./errors')
const Dialer = require('./dialer')
const TransportManager = require('./transport-manager')
const Upgrader = require('./upgrader')
const PeerStore = require('./peer-store')

const notStarted = (action, state) => {
return errCode(
Expand All @@ -54,21 +54,23 @@ class Libp2p extends EventEmitter {

this.datastore = this._options.datastore
this.peerInfo = this._options.peerInfo
this.peerBook = this._options.peerBook || new PeerBook()
this.peerStore = new PeerStore()

this._modules = this._options.modules
this._config = this._options.config
this._transport = [] // Transport instances/references
this._discovery = [] // Discovery service instances/references

// create the switch, and listen for errors
this._switch = new Switch(this.peerInfo, this.peerBook, this._options.switch)
this._switch = new Switch(this.peerInfo, this.peerStore, this._options.switch)

// Setup the Upgrader
this.upgrader = new Upgrader({
localPeer: this.peerInfo.id,
onConnection: (connection) => {
const peerInfo = getPeerInfo(connection.remotePeer)

this.peerStore.put(peerInfo)
this.emit('peer:connect', peerInfo)
},
onConnectionEnd: (connection) => {
Expand Down Expand Up @@ -179,7 +181,7 @@ class Libp2p extends EventEmitter {

// Once we start, emit and dial any peers we may have already discovered
this.state.on('STARTED', () => {
this.peerBook.getAllArray().forEach((peerInfo) => {
this.peerStore.getAllArray().forEach((peerInfo) => {
this.emit('peer:discovery', peerInfo)
this._maybeConnect(peerInfo)
})
Expand Down Expand Up @@ -244,7 +246,7 @@ class Libp2p extends EventEmitter {

/**
* Dials to the provided peer. If successful, the `PeerInfo` of the
* peer will be added to the nodes `PeerBook`
* peer will be added to the nodes `peerStore`
*
* @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to dial
* @param {object} options
Expand All @@ -257,7 +259,7 @@ class Libp2p extends EventEmitter {

/**
* Dials to the provided peer and handshakes with the given protocol.
* If successful, the `PeerInfo` of the peer will be added to the nodes `PeerBook`,
* If successful, the `PeerInfo` of the peer will be added to the nodes `peerStore`,
* and the `Connection` will be sent in the callback
*
* @async
Expand All @@ -278,7 +280,13 @@ class Libp2p extends EventEmitter {

// If a protocol was provided, create a new stream
if (protocols) {
return connection.newStream(protocols)
const stream = await connection.newStream(protocols)
const peerInfo = getPeerInfo(connection.remotePeer)

peerInfo.protocols.add(stream.protocol)
this.peerStore.put(peerInfo)

return stream
}

return connection
Expand Down Expand Up @@ -368,12 +376,6 @@ class Libp2p extends EventEmitter {
* the `peer:discovery` event. If auto dial is enabled for libp2p
* and the current connection count is under the low watermark, the
* peer will be dialed.
*
* TODO: If `peerBook.put` becomes centralized, https://github.com/libp2p/js-libp2p/issues/345,
* it would be ideal if only new peers were emitted. Currently, with
* other modules adding peers to the `PeerBook` we have no way of knowing
* if a peer is new or not, so it has to be emitted.
*
* @private
* @param {PeerInfo} peerInfo
*/
Expand All @@ -382,7 +384,7 @@ class Libp2p extends EventEmitter {
log.error(new Error(codes.ERR_DISCOVERED_SELF))
return
}
peerInfo = this.peerBook.put(peerInfo)
peerInfo = this.peerStore.put(peerInfo)

if (!this.isStarted()) return

Expand Down
3 changes: 3 additions & 0 deletions src/peer-store/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Peerstore

WIP
155 changes: 155 additions & 0 deletions src/peer-store/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
'use strict'

const assert = require('assert')
const debug = require('debug')
const log = debug('libp2p:peer-store')
log.error = debug('libp2p:peer-store:error')
const errCode = require('err-code')

const { EventEmitter } = require('events')

const PeerInfo = require('peer-info')

/**
* Responsible for managing known peers, as well as their addresses and metadata
* @fires PeerStore#peer Emitted when a peer is connected to this node
* @fires PeerStore#change:protocols
* @fires PeerStore#change:multiaddrs
*/
class PeerStore extends EventEmitter {
constructor () {
super()

/**
* Map of peers
*
* @type {Map<string, PeerInfo>}
*/
this.peers = new Map()

// TODO: Track ourselves. We should split `peerInfo` up into its pieces so we get better
// control and observability. This will be the initial step for removing PeerInfo
// https://github.com/libp2p/go-libp2p-core/blob/master/peerstore/peerstore.go
// this.addressBook = new Map()
// this.protoBook = new Map()
}

/**
* Stores the peerInfo of a new peer.
* If already exist, its info is updated.
* @param {PeerInfo} peerInfo
*/
put (peerInfo) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')

// Already know the peer?
if (this.peers.has(peerInfo.id.toB58String())) {
this.update(peerInfo)
} else {
this.add(peerInfo)

// Emit the new peer found
this.emit('peer', peerInfo)
}
}

/**
* Add a new peer to the store.
* @param {PeerInfo} peerInfo
*/
add (peerInfo) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')

this.peers.set(peerInfo.id.toB58String(), peerInfo)
}

/**
* Updates an already known peer.
* If already exist, updates ids info if outdated.
* @param {PeerInfo} peerInfo
*/
update (peerInfo) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')

const recorded = this.peers.get(peerInfo.id.toB58String())

// pass active connection state
const ma = peerInfo.isConnected()
if (ma) {
recorded.connect(ma)
}

// Verify new multiaddrs
// TODO: better track added and removed multiaddrs
if (peerInfo.multiaddrs.size || recorded.multiaddrs.size) {
recorded.multiaddrs = peerInfo.multiaddrs

this.emit('change:multiaddrs', {
peerInfo: recorded,
multiaddrs: Array.from(recorded.multiaddrs)
})
}

// Update protocols
// TODO: better track added and removed protocols
if (peerInfo.protocols.size || recorded.protocols.size) {
recorded.protocols = new Set(peerInfo.protocols)

this.emit('change:protocols', {
peerInfo: recorded,
protocols: Array.from(recorded.protocols)
})
}

// Add the public key if missing
if (!recorded.id.pubKey && peerInfo.id.pubKey) {
recorded.id.pubKey = peerInfo.id.pubKey
}
}

/**
* Get the info to the given id.
* @param {string} peerId b58str id
* @returns {PeerInfo}
*/
get (peerId) {
const peerInfo = this.peers.get(peerId)

if (peerInfo) {
return peerInfo
}

throw errCode(new Error('PeerInfo was not found'), 'ERR_NO_PEER_INFO')
}

/**
* Get an array with all peers known.
* @returns {Array<PeerInfo>}
*/
getAllArray () {
return Array.from(this.peers.values())
}

/**
* Remove the info of the peer with the given id.
* @param {string} peerId b58str id
* @returns {boolean} true if found and removed
*/
remove (peerId) {
return this.peers.delete(peerId)
}

/**
* Replace the info stored of the given peer.
* @param {PeerInfo} peerInfo
* @returns {void}
*/
replace (peerInfo) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')

this.remove(peerInfo.id.toB58String())
this.add(peerInfo)
}
}

module.exports = PeerStore
68 changes: 68 additions & 0 deletions test/peer-store/peer-store.node.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
'use strict'
/* eslint-env mocha */

const chai = require('chai')
chai.use(require('dirty-chai'))
const { expect } = chai
const sinon = require('sinon')

const mergeOptions = require('merge-options')

const multiaddr = require('multiaddr')
const Libp2p = require('../../src')

const baseOptions = require('../utils/base-options')
const peerUtils = require('../utils/creators/peer')
const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0')

describe('peer-store on dial', () => {
let peerInfo
let remotePeerInfo
let libp2p
let remoteLibp2p
let remoteAddr

before(async () => {
[peerInfo, remotePeerInfo] = await peerUtils.createPeerInfoFromFixture(2)
remoteLibp2p = new Libp2p(mergeOptions(baseOptions, {
peerInfo: remotePeerInfo
}))

await remoteLibp2p.transportManager.listen([listenAddr])
remoteAddr = remoteLibp2p.transportManager.getAddrs()[0]
})

after(async () => {
sinon.restore()
await remoteLibp2p.stop()
libp2p && await libp2p.stop()
})

it('should put the remote peerInfo after dial and emit event', async () => {
// TODO: needs crypto PR fix
// const remoteId = remotePeerInfo.id.toB58String()
const remoteId = peerInfo.id.toB58String()

libp2p = new Libp2p(mergeOptions(baseOptions, {
peerInfo
}))

sinon.spy(libp2p.peerStore, 'put')
sinon.spy(libp2p.peerStore, 'add')
sinon.spy(libp2p.peerStore, 'update')

const connection = await libp2p.dial(remoteAddr)
await connection.close()

expect(libp2p.peerStore.put.callCount).to.equal(1)
expect(libp2p.peerStore.add.callCount).to.equal(1)
expect(libp2p.peerStore.update.callCount).to.equal(0)

const storedPeer = libp2p.peerStore.get(remoteId)
expect(storedPeer).to.exist()
})
})

describe('peer-store on discovery', () => {
// TODO: implement with discovery
})
Loading

0 comments on commit 3468fca

Please sign in to comment.