Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: multi purpose websocket client #2160

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 197 additions & 0 deletions src/lib/WebSocketClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/* eslint-disable class-methods-use-this */
import { v4 as genUuid } from 'uuid';
import camelcaseKeysDeep from 'camelcase-keys-deep';
import {
AE_NETWORK_ADDITIONAL_SETTINGS,
WEB_SOCKET_SOURCE,
WEB_SOCKET_CHANNELS,
WEB_SOCKET_SUBSCRIBE,
WEB_SOCKET_UNSUBSCRIBE,
WEB_SOCKET_RECONNECT_TIMEOUT,
} from '@/protocols/aeternity/config';
import { handleUnknownError } from '@/utils';
import { NETWORK_TYPE_MAINNET } from '@/constants';
import type {
IMiddlewareWebSocketSubscriptionMessage,
ITopHeader,
ITransaction,
WebSocketChannelName,
} from '../types';

class WebSocketClient {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add middleware to the name because it is aware of it

// eslint-disable-next-line no-use-before-define
private static instance: WebSocketClient;

wsClient: WebSocket = new WebSocket(
AE_NETWORK_ADDITIONAL_SETTINGS[NETWORK_TYPE_MAINNET].websocketUrl,
);

isWsConnected: boolean = false;

subscribersQueue: IMiddlewareWebSocketSubscriptionMessage[] = [];

subscribers: Record<WebSocketChannelName, Record<string, (
payload: ITransaction | ITopHeader
) => void>> = {
[WEB_SOCKET_CHANNELS.Transactions]: {},
[WEB_SOCKET_CHANNELS.MicroBlocks]: {},
[WEB_SOCKET_CHANNELS.KeyBlocks]: {},
[WEB_SOCKET_CHANNELS.Object]: {},
};

handleWebsocketOpen() {
this.isWsConnected = true;
try {
this.subscribersQueue.forEach((message) => {
this.wsClient.send(JSON.stringify(message));
});
} catch (error) {
handleUnknownError(error);
setTimeout(() => {
this.handleWebsocketOpen();
}, WEB_SOCKET_RECONNECT_TIMEOUT);
}
Comment on lines +49 to +53
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe introduce a maximum tries check here to avoid the infinite loop when there is no internet

}

private handleWebsocketClose() {
this.isWsConnected = false;
}

isConnected(): boolean {
return this.isWsConnected;
}

subscribeForChannel(
message: IMiddlewareWebSocketSubscriptionMessage,
callback: (payload: any) => void,
) {
if (this.isWsConnected) {
Object.keys(WEB_SOCKET_SOURCE).forEach((source) => {
this.wsClient.send(JSON.stringify({
...message,
source,
}));
});
}

this.subscribersQueue.push(message);

const uuid = genUuid();
this.subscribers[message.payload][uuid] = callback;
return () => {
delete this.subscribers[message.payload][uuid];
if (Object.keys(this.subscribers[message.payload]).length === 0) {
// should remove the message from the queue if there are no subscribers
this.subscribersQueue = this.subscribersQueue.filter(
(msg) => msg.payload !== message.payload,
);

// should unsubscribe from the channel if there are no subscribers
Object.keys(WEB_SOCKET_SOURCE).forEach((source) => {
this.wsClient.send(JSON.stringify({
...message,
op: WEB_SOCKET_UNSUBSCRIBE,
source,
}));
});
}
};
}

subscribeForAccountUpdates(address: string, callback: (payload: ITransaction) => void) {
return this.subscribeForChannel(
{
op: WEB_SOCKET_SUBSCRIBE,
payload: WEB_SOCKET_CHANNELS.Object,
target: address,
},
callback,
);
}

subscribeForTransactionsUpdates(callback: (payload: ITransaction) => void) {
return this.subscribeForChannel(
{
op: WEB_SOCKET_SUBSCRIBE,
payload: WEB_SOCKET_CHANNELS.Transactions,
},
callback,
);
}

subscribeForMicroBlocksUpdates(callback: (payload: ITopHeader) => void) {
return this.subscribeForChannel(
{
op: WEB_SOCKET_SUBSCRIBE,
payload: WEB_SOCKET_CHANNELS.MicroBlocks,
},
callback,
);
}

subscribeForKeyBlocksUpdates(callback: (payload: ITopHeader) => void) {
return this.subscribeForChannel(
{
op: WEB_SOCKET_SUBSCRIBE,
payload: WEB_SOCKET_CHANNELS.KeyBlocks,
},
callback,
);
}

private handleWebsocketMessage(message: MessageEvent) {
if (!message.data) {
return;
}
try {
const data = camelcaseKeysDeep(JSON.parse(message.data));

if (!data.payload) {
return;
}

// Call all subscribers for the channel
Object.values(this.subscribers[data.subscription as WebSocketChannelName]).forEach(
(subscriberCb) => subscriberCb(data.payload),
);
} catch (error) {
handleUnknownError(error);
}
}

disconnect() {
this.subscribersQueue.forEach((message) => {
Object.keys(WEB_SOCKET_SOURCE).forEach((source) => {
this.wsClient.send(JSON.stringify({
...message,
source,
op: WEB_SOCKET_UNSUBSCRIBE,
}));
});
});
this.wsClient.close();
this.wsClient.removeEventListener('open', this.handleWebsocketOpen);
this.wsClient.removeEventListener('close', this.handleWebsocketClose);
this.wsClient.removeEventListener('message', this.handleWebsocketClose);
}

connect(url: string) {
if (this.wsClient) {
this.disconnect();
}

this.wsClient = new WebSocket(url);
this.wsClient.addEventListener('open', () => this.handleWebsocketOpen());
this.wsClient.addEventListener('close', () => this.handleWebsocketClose());
this.wsClient.addEventListener('message', (message) => this.handleWebsocketMessage(message));
}

static getInstance(): WebSocketClient {
if (!WebSocketClient.instance) {
WebSocketClient.instance = new WebSocketClient();
}
return WebSocketClient.instance;
}
}

export default WebSocketClient.getInstance();
9 changes: 9 additions & 0 deletions src/popup/App.vue
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ import {
useNotifications,
useUi,
} from '@/composables';
import { useAeNetworkSettings } from '@/protocols/aeternity/composables';
import { useTransferSendHandler } from '@/composables/transferSendHandler';
import WebSocketClient from '@/lib/WebSocketClient';

import Header from '@/popup/components/Header.vue';
import NodeConnectionStatus from '@/popup/components/NodeConnectionStatus.vue';
Expand All @@ -120,6 +122,7 @@ export default defineComponent({
const router = useRouter();
const { t } = useI18n();

const { aeActiveNetworkPredefinedSettings } = useAeNetworkSettings();
const { watchConnectionStatus } = useConnection();
const {
isSeedBackedUp,
Expand Down Expand Up @@ -229,6 +232,12 @@ export default defineComponent({
{ immediate: true },
);

watch(aeActiveNetworkPredefinedSettings, (network, prevNetwork) => {
if (network?.websocketUrl !== prevNetwork?.websocketUrl) {
WebSocketClient.connect(network.websocketUrl);
}
}, { immediate: true });

initVisibilityListeners();

onBeforeMount(async () => {
Expand Down
20 changes: 20 additions & 0 deletions src/protocols/aeternity/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@ export const AE_NETWORK_ADDITIONAL_SETTINGS: IDefaultNetworkTypeData<
[NETWORK_TYPE_MAINNET]: {
explorerUrl: 'https://aescan.io',
multisigBackendUrl: 'https://ga-multisig-backend-mainnet.prd.service.aepps.com',
websocketUrl: 'wss://mainnet.aeternity.io/mdw/v2/websocket',
},
[NETWORK_TYPE_TESTNET]: {
explorerUrl: 'https://testnet.aescan.io',
multisigBackendUrl: 'https://ga-multisig-backend-testnet.prd.service.aepps.com',
websocketUrl: 'wss://testnet.aeternity.io/mdw/v2/websocket',
},
};

Expand All @@ -71,6 +73,24 @@ export const AE_AENS_NAME_MAX_LENGTH = 63 + AE_AENS_DOMAIN.length;
export const AE_AENS_NAME_AUCTION_MAX_LENGTH = 12 + AE_AENS_DOMAIN.length;
export const AE_AENS_BID_MIN_RATIO = 1.05;

export const WEB_SOCKET_CHANNELS = {
Transactions: 'Transactions',
MicroBlocks: 'MicroBlocks',
KeyBlocks: 'KeyBlocks',
Object: 'Object',
};

export const WEB_SOCKET_SOURCE = {
mdw: 'mdw',
node: 'node',
};

export const WEB_SOCKET_SUBSCRIBE = 'Subscribe';
export const WEB_SOCKET_UNSUBSCRIBE = 'Unsubscribe';
export const WEB_SOCKET_RECONNECT_TIMEOUT = 1000;

export const PUSH_NOTIFICATION_AUTO_CLOSE_TIMEOUT = 10000;

/**
* Estimated time we need to wait for the middleware to sync it's state
* with the node. There is a high risk that in some cases this won't be enough
Expand Down
3 changes: 2 additions & 1 deletion src/protocols/aeternity/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ export type AeNetworkProtocolSettings =
*/
export type AeNetworkProtocolPredefinedSettings =
| 'explorerUrl'
| 'multisigBackendUrl';
| 'multisigBackendUrl'
| 'websocketUrl';

export type IAeNetworkSettings = INetworkProtocolSettings<AeNetworkProtocolSettings>;

Expand Down
13 changes: 13 additions & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import {
TX_FUNCTIONS,
TX_FUNCTIONS_MULTISIG,
TX_RETURN_TYPES,
WEB_SOCKET_CHANNELS,
WEB_SOCKET_SOURCE,
} from '@/protocols/aeternity/config';
import { BTC_CONTRACT_ID } from '@/protocols/bitcoin/config';
import { ETH_CONTRACT_ID } from '@/protocols/ethereum/config';
Expand Down Expand Up @@ -858,3 +860,14 @@ export interface IAmountDecimalPlaces {
highPrecision?: boolean;
amount?: number;
}

export type WebSocketChannelName = ObjectValues<typeof WEB_SOCKET_CHANNELS>;
export type WebSocketSourceName = ObjectValues<typeof WEB_SOCKET_SOURCE>;

// https://github.com/aeternity/ae_mdw#websocket-interface
export interface IMiddlewareWebSocketSubscriptionMessage {
op: 'Subscribe' | 'Unsubscribe';
payload: WebSocketChannelName;
target?: string;
source?: WebSocketSourceName;
}
Loading
Loading