diff --git a/packages/backend/package-lock.json b/packages/backend/package-lock.json index 665ee2ce67..3672afa985 100644 --- a/packages/backend/package-lock.json +++ b/packages/backend/package-lock.json @@ -58,6 +58,7 @@ "https-proxy-agent": "^7.0.5", "image-size": "^1.0.1", "it-drain": "^3.0.7", + "it-first": "^3.0.6", "it-pipe": "^3.0.1", "it-ws": "^6.1.5", "joi": "^17.8.1", @@ -1898,11 +1899,6 @@ "it-peekable": "^3.0.0" } }, - "node_modules/@helia/unixfs/node_modules/it-first": { - "version": "3.0.6", - "resolved": "https://registry.npmjs.org/it-first/-/it-first-3.0.6.tgz", - "integrity": "sha512-ExIewyK9kXKNAplg2GMeWfgjUcfC1FnUXz/RPfAvIXby+w7U4b3//5Lic0NV03gXT8O/isj5Nmp6KiY0d45pIQ==" - }, "node_modules/@helia/unixfs/node_modules/it-glob": { "version": "2.0.7", "resolved": "https://registry.npmjs.org/it-glob/-/it-glob-2.0.7.tgz", @@ -5017,11 +5013,6 @@ "node": ">=12.0.0" } }, - "node_modules/@libp2p/ping/node_modules/it-first": { - "version": "3.0.6", - "resolved": "https://registry.npmjs.org/it-first/-/it-first-3.0.6.tgz", - "integrity": "sha512-ExIewyK9kXKNAplg2GMeWfgjUcfC1FnUXz/RPfAvIXby+w7U4b3//5Lic0NV03gXT8O/isj5Nmp6KiY0d45pIQ==" - }, "node_modules/@libp2p/ping/node_modules/protons-runtime": { "version": "5.5.0", "resolved": "https://registry.npmjs.org/protons-runtime/-/protons-runtime-5.5.0.tgz", @@ -19931,11 +19922,6 @@ "it-peekable": "^3.0.0" } }, - "node_modules/helia/node_modules/it-first": { - "version": "3.0.6", - "resolved": "https://registry.npmjs.org/it-first/-/it-first-3.0.6.tgz", - "integrity": "sha512-ExIewyK9kXKNAplg2GMeWfgjUcfC1FnUXz/RPfAvIXby+w7U4b3//5Lic0NV03gXT8O/isj5Nmp6KiY0d45pIQ==" - }, "node_modules/helia/node_modules/it-foreach": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/it-foreach/-/it-foreach-2.1.1.tgz", @@ -23869,6 +23855,11 @@ "resolved": "https://registry.npmjs.org/it-drain/-/it-drain-3.0.7.tgz", "integrity": "sha512-vy6S1JKjjHSIFHgBpLpD1zhkCRl3z1zYWUxE14+kAYf+BL9ssWSFImJfhl361IIcwr0ofw8etzg11VqqB+ntUA==" }, + "node_modules/it-first": { + "version": "3.0.6", + "resolved": "https://registry.npmjs.org/it-first/-/it-first-3.0.6.tgz", + "integrity": "sha512-ExIewyK9kXKNAplg2GMeWfgjUcfC1FnUXz/RPfAvIXby+w7U4b3//5Lic0NV03gXT8O/isj5Nmp6KiY0d45pIQ==" + }, "node_modules/it-length-prefixed-stream": { "version": "1.2.0", "resolved": "https://registry.npmjs.org/it-length-prefixed-stream/-/it-length-prefixed-stream-1.2.0.tgz", @@ -33300,11 +33291,6 @@ "it-peekable": "^3.0.0" } }, - "it-first": { - "version": "3.0.6", - "resolved": "https://registry.npmjs.org/it-first/-/it-first-3.0.6.tgz", - "integrity": "sha512-ExIewyK9kXKNAplg2GMeWfgjUcfC1FnUXz/RPfAvIXby+w7U4b3//5Lic0NV03gXT8O/isj5Nmp6KiY0d45pIQ==" - }, "it-glob": { "version": "2.0.7", "resolved": "https://registry.npmjs.org/it-glob/-/it-glob-2.0.7.tgz", @@ -35843,11 +35829,6 @@ "tslib": "^2.4.0" } }, - "it-first": { - "version": "3.0.6", - "resolved": "https://registry.npmjs.org/it-first/-/it-first-3.0.6.tgz", - "integrity": "sha512-ExIewyK9kXKNAplg2GMeWfgjUcfC1FnUXz/RPfAvIXby+w7U4b3//5Lic0NV03gXT8O/isj5Nmp6KiY0d45pIQ==" - }, "protons-runtime": { "version": "5.5.0", "resolved": "https://registry.npmjs.org/protons-runtime/-/protons-runtime-5.5.0.tgz", @@ -47069,11 +47050,6 @@ "it-peekable": "^3.0.0" } }, - "it-first": { - "version": "3.0.6", - "resolved": "https://registry.npmjs.org/it-first/-/it-first-3.0.6.tgz", - "integrity": "sha512-ExIewyK9kXKNAplg2GMeWfgjUcfC1FnUXz/RPfAvIXby+w7U4b3//5Lic0NV03gXT8O/isj5Nmp6KiY0d45pIQ==" - }, "it-foreach": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/it-foreach/-/it-foreach-2.1.1.tgz", @@ -50082,6 +50058,11 @@ "resolved": "https://registry.npmjs.org/it-drain/-/it-drain-3.0.7.tgz", "integrity": "sha512-vy6S1JKjjHSIFHgBpLpD1zhkCRl3z1zYWUxE14+kAYf+BL9ssWSFImJfhl361IIcwr0ofw8etzg11VqqB+ntUA==" }, + "it-first": { + "version": "3.0.6", + "resolved": "https://registry.npmjs.org/it-first/-/it-first-3.0.6.tgz", + "integrity": "sha512-ExIewyK9kXKNAplg2GMeWfgjUcfC1FnUXz/RPfAvIXby+w7U4b3//5Lic0NV03gXT8O/isj5Nmp6KiY0d45pIQ==" + }, "it-length-prefixed-stream": { "version": "1.2.0", "resolved": "https://registry.npmjs.org/it-length-prefixed-stream/-/it-length-prefixed-stream-1.2.0.tgz", diff --git a/packages/backend/package.json b/packages/backend/package.json index 63a1eaf94e..5cba25a5db 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -126,6 +126,7 @@ "@ipld/dag-cbor": "^9.2.1", "@ipld/dag-pb": "^4.1.2", "it-drain": "^3.0.7", + "it-first": "^3.0.6", "it-pipe": "^3.0.1", "it-ws": "^6.1.5", "joi": "^17.8.1", diff --git a/packages/backend/src/nest/libp2p/libp2p.service.ts b/packages/backend/src/nest/libp2p/libp2p.service.ts index 9711c2a4ee..28fbd0a4d1 100644 --- a/packages/backend/src/nest/libp2p/libp2p.service.ts +++ b/packages/backend/src/nest/libp2p/libp2p.service.ts @@ -7,7 +7,6 @@ import { PeerId, type Libp2p } from '@libp2p/interface' import { kadDHT } from '@libp2p/kad-dht' import { keychain } from '@libp2p/keychain' import { peerIdFromString } from '@libp2p/peer-id' -import { ping } from '@libp2p/ping' import { preSharedKey } from '@libp2p/pnet' import * as filters from '@libp2p/websockets/filters' import { createLibp2p } from 'libp2p' @@ -34,6 +33,8 @@ import { ServerIoProviderTypes } from '../types' import { webSockets } from '../websocketOverTor' import { Libp2pConnectedPeer, Libp2pEvents, Libp2pNodeParams } from './libp2p.types' import { createLogger } from '../common/logger' +// import { ping } from './ping.service' +import { ping } from '@libp2p/ping' const KEY_LENGTH = 32 export const LIBP2P_PSK_METADATA = '/key/swarm/psk/1.0.0/\n/base16/\n' diff --git a/packages/backend/src/nest/libp2p/ping.service.ts b/packages/backend/src/nest/libp2p/ping.service.ts index 60728f103d..b8ce41c185 100644 --- a/packages/backend/src/nest/libp2p/ping.service.ts +++ b/packages/backend/src/nest/libp2p/ping.service.ts @@ -1,126 +1,158 @@ -// import { randomBytes } from '@libp2p/crypto'; -// import { CodeError, ERR_INVALID_MESSAGE, ERR_TIMEOUT } from '@libp2p/interface'; -// import first from 'it-first'; -// import { pipe } from 'it-pipe'; -// import { Components } from 'libp2p/dist/src/components.js'; -// import { equals as uint8ArrayEquals } from 'uint8arrays/equals'; -// import { PROTOCOL_PREFIX, PROTOCOL_NAME, PING_LENGTH, PROTOCOL_VERSION, TIMEOUT, MAX_INBOUND_STREAMS, MAX_OUTBOUND_STREAMS, ERR_WRONG_PING_ACK } from './constants.js'; -// export class PingService { -// protocol; -// components; -// started; -// timeout; -// maxInboundStreams; -// maxOutboundStreams; -// runOnTransientConnection; -// log; -// constructor(components: Components, init = {}) { -// this.components = components; -// this.log = components.logger.forComponent('libp2p:ping'); -// this.started = false; -// this.protocol = `/${init.protocolPrefix ?? PROTOCOL_PREFIX}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}`; -// this.timeout = init.timeout ?? TIMEOUT; -// this.maxInboundStreams = init.maxInboundStreams ?? MAX_INBOUND_STREAMS; -// this.maxOutboundStreams = init.maxOutboundStreams ?? MAX_OUTBOUND_STREAMS; -// this.runOnTransientConnection = init.runOnTransientConnection ?? true; -// this.handleMessage = this.handleMessage.bind(this); -// } -// [Symbol.toStringTag] = '@libp2p/ping'; -// async start() { -// await this.components.registrar.handle(this.protocol, this.handleMessage, { -// maxInboundStreams: this.maxInboundStreams, -// maxOutboundStreams: this.maxOutboundStreams, -// runOnTransientConnection: this.runOnTransientConnection -// }); -// this.started = true; -// } -// async stop() { -// await this.components.registrar.unhandle(this.protocol); -// this.started = false; -// } -// isStarted() { -// return this.started; -// } -// /** -// * A handler to register with Libp2p to process ping messages -// */ -// handleMessage(data) { -// this.log('incoming ping from %p', data.connection.remotePeer); -// const { stream } = data; -// const start = Date.now(); -// const signal = AbortSignal.timeout(this.timeout); -// signal.addEventListener('abort', () => { -// stream?.abort(new CodeError('ping timeout', ERR_TIMEOUT)); -// }); -// void pipe(stream, async function* (source) { -// let received = 0; -// for await (const buf of source) { -// received += buf.byteLength; -// if (received > PING_LENGTH) { -// stream?.abort(new CodeError('Too much data received', ERR_INVALID_MESSAGE)); -// return; -// } -// yield buf; -// } -// }, stream) -// .catch(err => { -// this.log.error('incoming ping from %p failed with error', data.connection.remotePeer, err); -// stream?.abort(err); -// }) -// .finally(() => { -// const ms = Date.now() - start; -// this.log('incoming ping from %p complete in %dms', data.connection.remotePeer, ms); -// }); -// } -// /** -// * Ping a given peer and wait for its response, getting the operation latency. -// */ -// async ping(peer, options = {}) { -// this.log('pinging %p', peer); -// const start = Date.now(); -// const data = randomBytes(PING_LENGTH); -// const connection = await this.components.connectionManager.openConnection(peer, options); -// let stream; -// let onAbort = () => { }; -// if (options.signal == null) { -// const signal = AbortSignal.timeout(this.timeout); -// options = { -// ...options, -// signal -// }; -// } -// try { -// stream = await connection.newStream(this.protocol, { -// ...options, -// runOnTransientConnection: this.runOnTransientConnection -// }); -// onAbort = () => { -// stream?.abort(new CodeError('ping timeout', ERR_TIMEOUT)); -// }; -// // make stream abortable -// options.signal?.addEventListener('abort', onAbort, { once: true }); -// const result = await pipe([data], stream, async (source) => first(source)); -// const ms = Date.now() - start; -// if (result == null) { -// throw new CodeError(`Did not receive a ping ack after ${ms}ms`, ERR_WRONG_PING_ACK); -// } -// if (!uint8ArrayEquals(data, result.subarray())) { -// throw new CodeError(`Received wrong ping ack after ${ms}ms`, ERR_WRONG_PING_ACK); -// } -// this.log('ping %p complete in %dms', connection.remotePeer, ms); -// return ms; -// } -// catch (err) { -// this.log.error('error while pinging %p', connection.remotePeer, err); -// stream?.abort(err); -// throw err; -// } -// finally { -// options.signal?.removeEventListener('abort', onAbort); -// if (stream != null) { -// await stream.close(); -// } -// } -// } -// } -// //# sourceMappingURL=ping.js.map +import { randomBytes } from '@libp2p/crypto' +import { + AbortOptions, + CodeError, + ERR_INVALID_MESSAGE, + ERR_TIMEOUT, + IncomingStreamData, + PeerId, + Stream, +} from '@libp2p/interface' +import { PingServiceComponents, PingServiceInit } from '@libp2p/ping' +import first from 'it-first' +import { pipe } from 'it-pipe' +import { equals as uint8ArrayEquals } from 'uint8arrays/equals' + +// errors +const ERR_WRONG_PING_ACK = 'ERR_WRONG_PING_ACK' + +// init constants +const PROTOCOL_PREFIX = 'ipfs' +const PROTOCOL_NAME = 'ping' +const PROTOCOL_VERSION = '1.0.0' +const MAX_INBOUND_STREAMS = 2 +const MAX_OUTBOUND_STREAMS = 1 +const TIMEOUT = 10_000 +const PING_LENGTH = 32 + +export class PingService { + protocol + components + started + timeout + maxInboundStreams + maxOutboundStreams + runOnTransientConnection + log + constructor(components: PingServiceComponents, init: PingServiceInit = {}) { + this.components = components + this.log = components.logger.forComponent('libp2p:ping') + this.started = false + this.protocol = `/${init.protocolPrefix ?? PROTOCOL_PREFIX}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}` + this.timeout = init.timeout ?? TIMEOUT + this.maxInboundStreams = init.maxInboundStreams ?? MAX_INBOUND_STREAMS + this.maxOutboundStreams = init.maxOutboundStreams ?? MAX_OUTBOUND_STREAMS + this.runOnTransientConnection = init.runOnTransientConnection ?? true + this.handleMessage = this.handleMessage.bind(this) + } + + [Symbol.toStringTag] = '@libp2p/ping' + + async start() { + this.log(`Starting ping service`) + await this.components.registrar.handle(this.protocol, this.handleMessage, { + maxInboundStreams: this.maxInboundStreams, + maxOutboundStreams: this.maxOutboundStreams, + runOnTransientConnection: this.runOnTransientConnection, + }) + this.started = true + } + + async stop() { + await this.components.registrar.unhandle(this.protocol) + this.started = false + } + + isStarted() { + return this.started + } + + /** + * A handler to register with Libp2p to process ping messages + */ + handleMessage(data: IncomingStreamData) { + this.log('incoming ping from %p', data.connection.remotePeer) + const { stream } = data + const start = Date.now() + const signal = AbortSignal.timeout(this.timeout) + signal.addEventListener('abort', () => { + stream?.abort(new CodeError('ping timeout', ERR_TIMEOUT)) + }) + void pipe( + stream, + async function* (source) { + let received = 0 + for await (const buf of source) { + received += buf.byteLength + if (received > PING_LENGTH) { + stream?.abort(new CodeError('Too much data received', ERR_INVALID_MESSAGE)) + return + } + yield buf + } + }, + stream + ) + .catch((err: Error) => { + this.log.error('incoming ping from %p failed with error', data.connection.remotePeer, err) + stream?.abort(err) + }) + .finally(() => { + const ms = Date.now() - start + this.log('incoming ping from %p complete in %dms', data.connection.remotePeer, ms) + }) + } + + /** + * Ping a given peer and wait for its response, getting the operation latency. + */ + async ping(peer: PeerId, options: AbortOptions = {}) { + this.log('pinging %p', peer) + const start = Date.now() + const data = randomBytes(PING_LENGTH) + const connection = await this.components.connectionManager.openConnection(peer, options) + let stream: Stream | undefined + let onAbort = () => {} + if (options.signal == null) { + const signal = AbortSignal.timeout(this.timeout) + options = { + ...options, + signal, + } + } + try { + stream = await connection.newStream(this.protocol, { + ...options, + runOnTransientConnection: this.runOnTransientConnection, + }) + onAbort = () => { + stream?.abort(new CodeError('ping timeout', ERR_TIMEOUT)) + } + // make stream abortable + options.signal?.addEventListener('abort', onAbort, { once: true }) + const result = await pipe([data], stream, async source => first(source)) + const ms = Date.now() - start + if (result == null) { + throw new CodeError(`Did not receive a ping ack after ${ms}ms`, ERR_WRONG_PING_ACK) + } + if (!uint8ArrayEquals(data, result.subarray())) { + throw new CodeError(`Received wrong ping ack after ${ms}ms`, ERR_WRONG_PING_ACK) + } + this.log('ping %p complete in %dms', connection.remotePeer, ms) + return ms + } catch (err) { + this.log.error('error while pinging %p', connection.remotePeer, err) + stream?.abort(err) + throw err + } finally { + options.signal?.removeEventListener('abort', onAbort) + if (stream != null) { + await stream.close() + } + } + } +} + +export function ping(init: PingServiceInit = {}): (components: PingServiceComponents) => PingService { + return components => new PingService(components, init) +} diff --git a/packages/e2e-tests/src/selectors.ts b/packages/e2e-tests/src/selectors.ts index 8038fe6bc8..988d041af7 100644 --- a/packages/e2e-tests/src/selectors.ts +++ b/packages/e2e-tests/src/selectors.ts @@ -404,7 +404,7 @@ export class Channel { return labelText === label }) return properLabels.length > 0 - }) + }, 20_000) } async waitForLabelsNotPresent(username: string) { diff --git a/packages/e2e-tests/src/tests/multipleClients.test.ts b/packages/e2e-tests/src/tests/multipleClients.test.ts index 193b5556ee..f9038f8f36 100644 --- a/packages/e2e-tests/src/tests/multipleClients.test.ts +++ b/packages/e2e-tests/src/tests/multipleClients.test.ts @@ -254,13 +254,10 @@ describe('Multiple Clients', () => { it('Second user can send a message, they see their message tagged as "unregistered"', async () => { logger.info('Second guest FETCHING CHANNEL MESSAGES!') - await new Promise(resolve => - setTimeout(() => { - resolve() - }, 15000) - ) + await sleep(10_000) await generalChannelUser3.sendMessage(users.user3.messages[0]) generalChannelUser3 = new Channel(users.user3.app.driver, 'general') + await sleep(4_000) await generalChannelUser3.waitForLabel(users.user3.username, 'Unregistered') })