Skip to content

Commit

Permalink
tests: use new API
Browse files Browse the repository at this point in the history
  • Loading branch information
danisharora099 committed Aug 1, 2024
1 parent 3f14bba commit 3d698e6
Show file tree
Hide file tree
Showing 21 changed files with 448 additions and 352 deletions.
10 changes: 6 additions & 4 deletions packages/interfaces/src/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ export interface IReceiver {
toSubscriptionIterator: <T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
) => Promise<IAsyncIterator<T>>;
subscribeWithUnsubscribe: <T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
) => Unsubscribe | Promise<Unsubscribe>;
subscribeWithUnsubscribe: SubscribeWithUnsubscribe;
}

type SubscribeWithUnsubscribe = <T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
) => Unsubscribe | Promise<Unsubscribe>;
2 changes: 2 additions & 0 deletions packages/relay/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ class Relay implements IRelay {
};
}

public subscribe = this.subscribeWithUnsubscribe;

private removeObservers<T extends IDecodedMessage>(
observers: Array<[PubsubTopic, Observer<T>]>
): void {
Expand Down
15 changes: 4 additions & 11 deletions packages/tests/tests/ephemeral.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
DecodedMessage,
waitForRemotePeer
} from "@waku/core";
import { ISubscriptionSDK, Protocols } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import type { LightNode } from "@waku/interfaces";
import {
generatePrivateKey,
Expand Down Expand Up @@ -83,8 +83,6 @@ describe("Waku Message Ephemeral field", function () {
let waku: LightNode;
let nwaku: ServiceNode;

let subscription: ISubscriptionSDK;

afterEachCustom(this, async () => {
await tearDownNodes(nwaku, waku);
});
Expand Down Expand Up @@ -122,11 +120,6 @@ describe("Waku Message Ephemeral field", function () {
Protocols.LightPush,
Protocols.Store
]);

const { error, subscription: _subscription } =
await waku.filter.createSubscription(TestEncoder.pubsubTopic);
if (error) throw error;
subscription = _subscription;
});

it("Ephemeral messages are not stored", async function () {
Expand Down Expand Up @@ -218,7 +211,7 @@ describe("Waku Message Ephemeral field", function () {
const callback = (msg: DecodedMessage): void => {
messages.push(msg);
};
await subscription.subscribe([TestDecoder], callback);
await waku.filter.subscribe([TestDecoder], callback);

await delay(200);
const normalTxt = "Normal message";
Expand Down Expand Up @@ -265,7 +258,7 @@ describe("Waku Message Ephemeral field", function () {
const callback = (msg: DecodedMessage): void => {
messages.push(msg);
};
await subscription.subscribe([decoder], callback);
await waku.filter.subscribe([decoder], callback);

await delay(200);
const normalTxt = "Normal message";
Expand Down Expand Up @@ -316,7 +309,7 @@ describe("Waku Message Ephemeral field", function () {
const callback = (msg: DecodedMessage): void => {
messages.push(msg);
};
await subscription.subscribe([decoder], callback);
await waku.filter.subscribe([decoder], callback);

await delay(200);
const normalTxt = "Normal message";
Expand Down
101 changes: 68 additions & 33 deletions packages/tests/tests/filter/peer_management.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ describe("Waku Filter: Peer Management: E2E", function () {
this.timeout(15000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
let subscription: ISubscriptionSDK;

const contentTopic = "/test";

Expand All @@ -47,13 +46,6 @@ describe("Waku Filter: Peer Management: E2E", function () {
undefined,
5
);
const { error, subscription: sub } = await waku.filter.createSubscription(
DefaultTestPubsubTopic
);
if (!sub || error) {
throw new Error("Could not create subscription");
}
subscription = sub;
});

afterEachCustom(this, async () => {
Expand All @@ -62,12 +54,15 @@ describe("Waku Filter: Peer Management: E2E", function () {

it("Number of peers are maintained correctly", async function () {
const messages: DecodedMessage[] = [];
const { failures, successes } = await subscription.subscribe(
[decoder],
(msg) => {
messages.push(msg);
}
);
const { error, results } = await waku.filter.subscribe([decoder], (msg) => {
messages.push(msg);
});

if (error) {
throw error;
}

const { successes, failures } = results;

await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello_World")
Expand All @@ -82,20 +77,41 @@ describe("Waku Filter: Peer Management: E2E", function () {
});

it("Ping succeeds for all connected peers", async function () {
await subscription.subscribe([decoder], () => {});
const { error, subscription } = await waku.filter.subscribe(
[decoder],
() => {}
);
if (error) {
throw error;
}
const pingResult = await subscription.ping();
expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse);
expect(pingResult.failures.length).to.equal(0);
});

it("Ping fails for unsubscribed peers", async function () {
const { error, subscription } = await waku.filter.subscribe(
[decoder],
() => {}
);
if (error) {
throw error;
}
const pingResult = await subscription.ping();
expect(pingResult.successes.length).to.equal(0);
expect(pingResult.failures.length).to.be.greaterThan(0);
});

it("Keep-alive pings maintain the connection", async function () {
await subscription.subscribe([decoder], () => {}, { keepAlive: 100 });
const { error, subscription } = await waku.filter.subscribe(
[decoder],
() => {},
undefined,
{ keepAlive: 100 }
);
if (error) {
throw error;
}

await delay(1000);

Expand All @@ -106,9 +122,17 @@ describe("Waku Filter: Peer Management: E2E", function () {

it("Renews peer on consistent ping failures", async function () {
const maxPingFailures = 3;
await subscription.subscribe([decoder], () => {}, {
pingsBeforePeerRenewed: maxPingFailures
});
const { error, subscription } = await waku.filter.subscribe(
[decoder],
() => {},
undefined,
{
pingsBeforePeerRenewed: maxPingFailures
}
);
if (error) {
throw error;
}

const disconnectedNodePeerId = waku.filter.connectedPeers[0].id;
await waku.connectionManager.dropConnection(disconnectedNodePeerId);
Expand All @@ -135,9 +159,17 @@ describe("Waku Filter: Peer Management: E2E", function () {

it("Tracks peer failures correctly", async function () {
const maxPingFailures = 3;
await subscription.subscribe([decoder], () => {}, {
pingsBeforePeerRenewed: maxPingFailures
});
const { error, subscription } = await waku.filter.subscribe(
[decoder],
() => {},
undefined,
{
pingsBeforePeerRenewed: maxPingFailures
}
);
if (error) {
throw error;
}

const targetPeer = waku.filter.connectedPeers[0];
await waku.connectionManager.dropConnection(targetPeer.id);
Expand All @@ -163,8 +195,14 @@ describe("Waku Filter: Peer Management: E2E", function () {
});

it("Maintains correct number of peers after multiple subscribe/unsubscribe cycles", async function () {
let subscription: ISubscriptionSDK;
for (let i = 0; i < 3; i++) {
await subscription.subscribe([decoder], () => {});
const { error, subscription: _subscription } =
await waku.filter.subscribe([decoder], () => {});
if (error) {
throw error;
}
subscription = _subscription;
let pingResult = await subscription.ping();
expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse);

Expand All @@ -173,8 +211,7 @@ describe("Waku Filter: Peer Management: E2E", function () {
expect(pingResult.failures.length).to.be.greaterThan(0);
}

await subscription.subscribe([decoder], () => {});
const finalPingResult = await subscription.ping();
const finalPingResult = await subscription!.ping();
expect(finalPingResult.successes.length).to.equal(
waku.filter.numPeersToUse
);
Expand All @@ -200,17 +237,15 @@ describe("Waku Filter: Peer Management: E2E", function () {
).toString();
await waku.dial(await nodeWithoutDiscovery.getMultiaddrWithId());

const { error, subscription: sub } = await waku.filter.createSubscription(
DefaultTestPubsubTopic
);
if (!sub || error) {
throw new Error("Could not create subscription");
}

const messages: DecodedMessage[] = [];
const { successes } = await sub.subscribe([decoder], (msg) => {
const { error, results } = await waku.filter.subscribe([decoder], (msg) => {
messages.push(msg);
});
if (error) {
throw error;
}

const { successes } = results;

expect(successes.length).to.be.greaterThan(0);
expect(successes.length).to.be.equal(waku.filter.numPeersToUse);
Expand Down
36 changes: 25 additions & 11 deletions packages/tests/tests/filter/ping.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,23 @@ const runTests = (strictCheckNodes: boolean): void => {
this.timeout(10000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
let subscription: ISubscriptionSDK;

beforeEachCustom(this, async () => {
[serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo);
const { error, subscription: _subscription } =
await waku.filter.createSubscription(TestShardInfo);
if (error) throw error;
subscription = _subscription;
});

afterEachCustom(this, async () => {
await teardownNodesWithRedundancy(serviceNodes, waku);
});

it("Ping on subscribed peer", async function () {
await subscription.subscribe(
const { error, subscription } = await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
if (error) {
throw error;
}
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
Expand All @@ -60,14 +58,24 @@ const runTests = (strictCheckNodes: boolean): void => {
});

it("Ping on peer without subscriptions", async function () {
const { subscription, error } = await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
if (error) {
throw error;
}
await validatePingError(subscription);
});

it("Ping on unsubscribed peer", async function () {
await subscription.subscribe(
const { error, subscription } = await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
if (error) {
throw error;
}
await subscription.ping();
await subscription.unsubscribe([TestContentTopic]);

Expand All @@ -76,11 +84,17 @@ const runTests = (strictCheckNodes: boolean): void => {
});

it("Reopen subscription with peer with lost subscription", async function () {
let subscription: ISubscriptionSDK;
const openSubscription = async (): Promise<void> => {
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
const { error, subscription: _subscription } =
await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
if (error) {
throw error;
}
subscription = _subscription;
};

const unsubscribe = async (): Promise<void> => {
Expand Down
Loading

0 comments on commit 3d698e6

Please sign in to comment.