From 9ce6678e516356b577cb2c064d56b6e2f68b43ca Mon Sep 17 00:00:00 2001 From: AkiraFukushima Date: Sun, 7 Oct 2018 00:25:57 +0900 Subject: [PATCH] Add reconnection method for websocket --- example/javascript/web_socket.js | 4 ++ src/web_socket.ts | 74 ++++++++++++++++++++++++-------- 2 files changed, 61 insertions(+), 17 deletions(-) diff --git a/example/javascript/web_socket.js b/example/javascript/web_socket.js index 1d73e6414..bc8840dcf 100644 --- a/example/javascript/web_socket.js +++ b/example/javascript/web_socket.js @@ -44,3 +44,7 @@ stream.on('close', () => { stream.on('parser-error', (err) => { console.error(err) }) + +setTimeout(() => { + stream.stop() +}, 10000) diff --git a/src/web_socket.ts b/src/web_socket.ts index 8f3b4324a..52176a10f 100644 --- a/src/web_socket.ts +++ b/src/web_socket.ts @@ -14,6 +14,9 @@ export default class WebSocket extends EventEmitter { public parser: Parser private _accessToken: string private _socketConnection: connection | null + private _reconnectInterval: number + private _reconnectMaxAttempts: number + private _reconnectCurrentAttempts: number /** * @param url Full url of websocket: e.g. https://pleroma.io/api/v1/streaming @@ -27,12 +30,16 @@ export default class WebSocket extends EventEmitter { this.parser = new Parser() this._accessToken = accessToken this._socketConnection = null + this._reconnectInterval = 1000 + this._reconnectMaxAttempts = Infinity + this._reconnectCurrentAttempts = 0 } /** * Start websocket connection. */ public start() { + this._resetRetryParams() this._startWebSocketConnection() } @@ -42,7 +49,8 @@ export default class WebSocket extends EventEmitter { private _startWebSocketConnection() { this._resetConnection() this._setupParser() - this._getStream(this.url, this.stream, this._accessToken) + const cli = this._getClient() + this._connect(cli, this.url, this.stream, this._accessToken) } /** @@ -52,6 +60,7 @@ export default class WebSocket extends EventEmitter { if (this._socketConnection) { this._socketConnection.close() } + this._resetRetryParams() } /** @@ -69,39 +78,70 @@ export default class WebSocket extends EventEmitter { } /** - * Prepare websocket connection. + * Resets the parameters used in reconnect. + */ + private _resetRetryParams() { + this._reconnectCurrentAttempts = 0 + } + + /** + * Reconnects to the same endpoint. + */ + private _reconnect(cli: client) { + setTimeout(() => { + if (this._reconnectCurrentAttempts < this._reconnectMaxAttempts) { + this._reconnectCurrentAttempts++ + // Call connect methods + console.log('Reconnecting') + this._connect(cli, this.url, this.stream, this._accessToken) + } + }, this._reconnectInterval) + } + + private _connect(cli: client, url: string, stream: string, accessToken: string) { + const params = [`stream=${stream}`] + + if (accessToken !== null) { + params.push(`access_token=${accessToken}`) + } + const req_url: string = `${url}/?${params.join('&')}` + cli.connect(req_url) + } + + /** + * Prepare websocket client. * @param url Full url of websocket: e.g. https://pleroma.io/api/v1/streaming * @param stream Stream name, please refer: https://git.pleroma.social/pleroma/pleroma/blob/develop/lib/pleroma/web/mastodon_api/mastodon_socket.ex#L19-28 * @param accessToken The access token. * @returns A Client instance of websocket. */ - private _getStream(url: string, stream: string, accessToken: string): client { - const connection = new client() - connection.on('connectFailed', (err) => { - this.emit('error', err) + private _getClient(): client { + const cli = new client() + cli.on('connectFailed', (err) => { + console.error(err) + this._reconnect(cli) }) - connection.on('connect', (conn) => { + cli.on('connect', (conn) => { this._socketConnection = conn this.emit('connect', {}) conn.on('error', (err) => { this.emit('error', err) }) - conn.on('close', () => { - this.emit('close', {}) + conn.on('close', (code) => { + // Refer the code: https://tools.ietf.org/html/rfc6455#section-7.4 + if (code === 1000) { + this.emit('close', {}) + } else { + console.log(`Closed connection with ${code}`) + this._reconnect(cli) + } }) conn.on('message', (message: IMessage) => { this.parser.parser(message) }) }) - const params = [`stream=${stream}`] - - if (accessToken !== null) { - params.push(`access_token=${accessToken}`) - } - const req_url: string = `${url}/?${params.join('&')}` - connection.connect(req_url) - return connection + return cli } /**