diff --git a/packages/tests/tests/filter/peer_management.spec.ts b/packages/tests/tests/filter/peer_management.spec.ts index 1d4354aeba..758817d29d 100644 --- a/packages/tests/tests/filter/peer_management.spec.ts +++ b/packages/tests/tests/filter/peer_management.spec.ts @@ -1,10 +1,15 @@ -import { DefaultPubsubTopic, LightNode } from "@waku/interfaces"; +import { + DefaultPubsubTopic, + ISubscriptionSDK, + LightNode +} from "@waku/interfaces"; import { createDecoder, createEncoder, DecodedMessage, utf8ToBytes } from "@waku/sdk"; +import { delay } from "@waku/utils"; import { expect } from "chai"; import { describe } from "mocha"; @@ -21,9 +26,20 @@ import { //TODO: add unit tests, describe("Waku Filter: Peer Management: E2E", function () { - this.timeout(15000); + this.timeout(30000); let waku: LightNode; let serviceNodes: ServiceNodesFleet; + let subscription: ISubscriptionSDK; + + const pubsubTopic = DefaultPubsubTopic; + const contentTopic = "/test"; + + const encoder = createEncoder({ + pubsubTopic, + contentTopic + }); + + const decoder = createDecoder(contentTopic, pubsubTopic); beforeEachCustom(this, async () => { [serviceNodes, waku] = await runMultipleNodes( @@ -32,29 +48,19 @@ describe("Waku Filter: Peer Management: E2E", function () { undefined, 5 ); + const { error, subscription: sub } = + await waku.filter.createSubscription(pubsubTopic); + if (!sub || error) { + throw new Error("Could not create subscription"); + } + subscription = sub; }); afterEachCustom(this, async () => { await teardownNodesWithRedundancy(serviceNodes, waku); }); - const pubsubTopic = DefaultPubsubTopic; - const contentTopic = "/test"; - - const encoder = createEncoder({ - pubsubTopic, - contentTopic - }); - - const decoder = createDecoder(contentTopic, pubsubTopic); - it("Number of peers are maintained correctly", async function () { - const { error, subscription } = - await waku.filter.createSubscription(pubsubTopic); - if (!subscription || error) { - expect.fail("Could not create subscription"); - } - const messages: DecodedMessage[] = []; const { failures, successes } = await subscription.subscribe( [decoder], @@ -74,4 +80,67 @@ describe("Waku Filter: Peer Management: E2E", function () { expect(failures.length).to.equal(0); } }); + + it("Ping succeeds for all connected peers", async function () { + await subscription.subscribe([decoder], () => {}); + const pingResult = await subscription.ping(); + expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse); + expect(pingResult.failures.length).to.equal(0); + }); + + it("Ping fails for unsubscribed peers", async function () { + const pingResult = await subscription.ping(); + expect(pingResult.successes.length).to.equal(0); + expect(pingResult.failures.length).to.be.greaterThan(0); + }); + + it("Keep-alive pings maintain the connection", async function () { + await subscription.subscribe([decoder], () => {}, { keepAlive: 100 }); + + await delay(1000); + + const pingResult = await subscription.ping(); + expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse); + expect(pingResult.failures.length).to.equal(0); + }); + + it("Renews peer on consistent ping failures", async function () { + await subscription.subscribe([decoder], () => {}, { keepAlive: 100 }); + + const disconnectedNodePeerId = waku.filter.connectedPeers[0].id; + await waku.connectionManager.dropConnection(disconnectedNodePeerId); + + await delay(1000); + + const pingResult = await subscription.ping(); + expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse); + expect(pingResult.failures.length).to.equal(0); + + expect(waku.filter.connectedPeers.length).to.equal( + waku.filter.numPeersToUse + ); + expect( + waku.filter.connectedPeers.some((peer) => + peer.id.equals(disconnectedNodePeerId) + ) + ).to.eq(false); + }); + + it("Maintains correct number of peers after multiple subscribe/unsubscribe cycles", async function () { + for (let i = 0; i < 3; i++) { + await subscription.subscribe([decoder], () => {}); + let pingResult = await subscription.ping(); + expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse); + + await subscription.unsubscribe([contentTopic]); + pingResult = await subscription.ping(); + expect(pingResult.failures.length).to.be.greaterThan(0); + } + + await subscription.subscribe([decoder], () => {}); + const finalPingResult = await subscription.ping(); + expect(finalPingResult.successes.length).to.equal( + waku.filter.numPeersToUse + ); + }); });