Skip to content

Commit

Permalink
Merge pull request #15 from h3poteto/fix/websocket/reconnect
Browse files Browse the repository at this point in the history
Add reconnection method for websocket
  • Loading branch information
h3poteto authored Oct 6, 2018
2 parents 3e6ef50 + 9ce6678 commit 0a4a650
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 17 deletions.
4 changes: 4 additions & 0 deletions example/javascript/web_socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,7 @@ stream.on('close', () => {
stream.on('parser-error', (err) => {
console.error(err)
})

setTimeout(() => {
stream.stop()
}, 10000)
74 changes: 57 additions & 17 deletions src/web_socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ export default class WebSocket extends EventEmitter {
public parser: Parser
private _accessToken: string
private _socketConnection: connection | null
private _reconnectInterval: number
private _reconnectMaxAttempts: number
private _reconnectCurrentAttempts: number

/**
* @param url Full url of websocket: e.g. https://pleroma.io/api/v1/streaming
Expand All @@ -27,12 +30,16 @@ export default class WebSocket extends EventEmitter {
this.parser = new Parser()
this._accessToken = accessToken
this._socketConnection = null
this._reconnectInterval = 1000
this._reconnectMaxAttempts = Infinity
this._reconnectCurrentAttempts = 0
}

/**
* Start websocket connection.
*/
public start() {
this._resetRetryParams()
this._startWebSocketConnection()
}

Expand All @@ -42,7 +49,8 @@ export default class WebSocket extends EventEmitter {
private _startWebSocketConnection() {
this._resetConnection()
this._setupParser()
this._getStream(this.url, this.stream, this._accessToken)
const cli = this._getClient()
this._connect(cli, this.url, this.stream, this._accessToken)
}

/**
Expand All @@ -52,6 +60,7 @@ export default class WebSocket extends EventEmitter {
if (this._socketConnection) {
this._socketConnection.close()
}
this._resetRetryParams()
}

/**
Expand All @@ -69,39 +78,70 @@ export default class WebSocket extends EventEmitter {
}

/**
* Prepare websocket connection.
* Resets the parameters used in reconnect.
*/
private _resetRetryParams() {
this._reconnectCurrentAttempts = 0
}

/**
* Reconnects to the same endpoint.
*/
private _reconnect(cli: client) {
setTimeout(() => {
if (this._reconnectCurrentAttempts < this._reconnectMaxAttempts) {
this._reconnectCurrentAttempts++
// Call connect methods
console.log('Reconnecting')
this._connect(cli, this.url, this.stream, this._accessToken)
}
}, this._reconnectInterval)
}

private _connect(cli: client, url: string, stream: string, accessToken: string) {
const params = [`stream=${stream}`]

if (accessToken !== null) {
params.push(`access_token=${accessToken}`)
}
const req_url: string = `${url}/?${params.join('&')}`
cli.connect(req_url)
}

/**
* Prepare websocket client.
* @param url Full url of websocket: e.g. https://pleroma.io/api/v1/streaming
* @param stream Stream name, please refer: https://git.pleroma.social/pleroma/pleroma/blob/develop/lib/pleroma/web/mastodon_api/mastodon_socket.ex#L19-28
* @param accessToken The access token.
* @returns A Client instance of websocket.
*/
private _getStream(url: string, stream: string, accessToken: string): client {
const connection = new client()
connection.on('connectFailed', (err) => {
this.emit('error', err)
private _getClient(): client {
const cli = new client()
cli.on('connectFailed', (err) => {
console.error(err)
this._reconnect(cli)
})
connection.on('connect', (conn) => {
cli.on('connect', (conn) => {
this._socketConnection = conn
this.emit('connect', {})
conn.on('error', (err) => {
this.emit('error', err)
})
conn.on('close', () => {
this.emit('close', {})
conn.on('close', (code) => {
// Refer the code: https://tools.ietf.org/html/rfc6455#section-7.4
if (code === 1000) {
this.emit('close', {})
} else {
console.log(`Closed connection with ${code}`)
this._reconnect(cli)
}
})
conn.on('message', (message: IMessage) => {
this.parser.parser(message)
})
})
const params = [`stream=${stream}`]

if (accessToken !== null) {
params.push(`access_token=${accessToken}`)
}
const req_url: string = `${url}/?${params.join('&')}`
connection.connect(req_url)

return connection
return cli
}

/**
Expand Down

0 comments on commit 0a4a650

Please sign in to comment.