Skip to content

Commit

Permalink
chore: address review
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Nov 15, 2019
1 parent 4cc9736 commit 340edf5
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 15 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
"it-protocol-buffers": "^0.2.0",
"latency-monitor": "~0.2.1",
"libp2p-crypto": "^0.17.1",
"libp2p-interfaces": "^0.1.4",
"libp2p-interfaces": "^0.1.5",
"mafmt": "^7.0.0",
"merge-options": "^1.0.1",
"moving-average": "^1.0.0",
Expand Down
7 changes: 7 additions & 0 deletions src/registrar.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ const debug = require('debug')
const log = debug('libp2p:peer-store')
log.error = debug('libp2p:peer-store:error')

const Topology = require('libp2p-interfaces/src/topology')
const MulticodecTopology = require('libp2p-interfaces/src/topology/multicodec-topology')
const { Connection } = require('libp2p-interfaces/src/connection')
const PeerInfo = require('peer-info')

Expand Down Expand Up @@ -109,6 +111,11 @@ class Registrar {
* @return {string} registrar identifier
*/
register (topology) {
assert(
Topology.isTopology(topology) ||
MulticodecTopology.isMulticodecTopology(topology),
'topology must be an instance of interfaces/topology')

// Create topology
const id = (parseInt(Math.random() * 1e9)).toString(36) + Date.now()

Expand Down
6 changes: 3 additions & 3 deletions src/upgrader.js
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ class Upgrader {
const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys()))
log('%s: incoming stream opened on %s', direction, protocol)
connection.addStream(stream, protocol)
this._onStream({ connection, stream, protocol, remotePeer })
this._onStream({ connection, stream, protocol })
} catch (err) {
log.error(err)
}
Expand Down Expand Up @@ -254,9 +254,9 @@ class Upgrader {
* @param {Stream} options.stream
* @param {string} options.protocol
*/
_onStream ({ connection, stream, protocol, remotePeer }) {
_onStream ({ connection, stream, protocol }) {
const handler = this.protocols.get(protocol)
handler({ connection, stream, protocol, remotePeer })
handler({ connection, stream, protocol })
}

/**
Expand Down
32 changes: 22 additions & 10 deletions test/pubsub/operation.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ describe('Pubsub subsystem operates correctly', () => {
})
})

// TODO: Needs identify push
describe.skip('pubsub started after connect', () => {
describe('pubsub started after connect', () => {
beforeEach(async () => {
libp2p = await create(mergeOptions(subsystemOptions, {
peerInfo
Expand Down Expand Up @@ -132,7 +131,7 @@ describe('Pubsub subsystem operates correctly', () => {
sinon.restore()
})

it.skip('should get notified of connected peers after starting', async () => {
it('should get notified of connected peers after starting', async () => {
const connection = await libp2p.dial(remAddr)

expect(connection).to.exist()
Expand All @@ -141,30 +140,43 @@ describe('Pubsub subsystem operates correctly', () => {

remoteLibp2p.pubsub.start()

// Wait for
// Validate
await pWaitFor(() => libp2p.pubsub._pubsub.peers.size === 1)

expect(libp2p.pubsub._pubsub.peers.size).to.be.eql(1)
expect(remoteLibp2p.pubsub._pubsub.peers.size).to.be.eql(1)
})

it.skip('should receive pubsub messages', async () => {
it('should receive pubsub messages', async function () {
this.timeout(10e3)
const defer = pDefer()
const libp2pId = libp2p.peerInfo.id.toB58String()
const topic = 'test-topic'
const data = 'hey!'

await libp2p.dial(remAddr)

remoteLibp2p.pubsub.start()

// TODO: wait for
await pWaitFor(() => libp2p.pubsub._pubsub.peers.size === 1)

let subscribedTopics = libp2p.pubsub.getTopics()
expect(subscribedTopics).to.not.include(topic)

libp2p.pubsub.subscribe(topic)
libp2p.pubsub.once(topic, (msg) => {
libp2p.pubsub.subscribe(topic, (msg) => {
expect(msg.data.toString()).to.equal(data)
defer.resolve()
})

libp2p.pubsub.publish(topic, data)
subscribedTopics = libp2p.pubsub.getTopics()
expect(subscribedTopics).to.include(topic)

// wait for remoteLibp2p to know about libp2p subscription
await pWaitFor(() => {
const subscribedPeers = remoteLibp2p.pubsub.getPeersSubscribed(topic)
return subscribedPeers.includes(libp2pId)
})

remoteLibp2p.pubsub.publish(topic, data)

await defer.promise
})
Expand Down
13 changes: 12 additions & 1 deletion test/registrar/registrar.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,18 @@ describe('registrar', () => {
throw new Error('should fail to register a protocol if no multicodec is provided')
})

// TODO: not valid topology
it('should fail to register a protocol if an invalid topology is provided', () => {
const fakeTopology = {
random: 1
}
try {
registrar.register()
} catch (err) {
expect(err).to.exist(fakeTopology)
return
}
throw new Error('should fail to register a protocol if an invalid topology is provided')
})
})

describe('registration', () => {
Expand Down

0 comments on commit 340edf5

Please sign in to comment.