Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: queue channel WS events until the channel is initialized #1179

Merged
merged 3 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ export class Channel<StreamChatGenerics extends ExtendableGenerics = DefaultGene
lastTypingEvent: Date | null;
isTyping: boolean;
disconnected: boolean;
/**
* Collects the incoming WS events before the channel is marked as initialized.
* This prevents executing procedures that depend on channel being initialized.
* Once the channel is marked as initialized the queue is flushed.
*/
wsEventQueue: Event<StreamChatGenerics>[];

/**
* constructor - Create a channel
Expand Down Expand Up @@ -132,6 +138,7 @@ export class Channel<StreamChatGenerics extends ExtendableGenerics = DefaultGene
this.lastTypingEvent = null;
this.isTyping = false;
this.disconnected = false;
this.wsEventQueue = [];
}

/**
Expand Down Expand Up @@ -780,6 +787,7 @@ export class Channel<StreamChatGenerics extends ExtendableGenerics = DefaultGene
const combined = { ...defaultOptions, ...options };
const state = await this.query(combined, 'latest');
this.initialized = true;
this._flushWsEventQueue();
this.data = state.channel;

this._client.logger('info', `channel:watch() - started watching channel ${this.cid}`, {
Expand Down Expand Up @@ -1207,6 +1215,12 @@ export class Channel<StreamChatGenerics extends ExtendableGenerics = DefaultGene
// eslint-disable-next-line sonarjs/cognitive-complexity
_handleChannelEvent(event: Event<StreamChatGenerics>) {
const channel = this;

if (!this._isInitialized()) {
this.wsEventQueue.push(event);
return;
}

this._client.logger(
'info',
`channel:_handleChannelEvent - Received event of type { ${event.type} } on ${this.cid}`,
Expand Down Expand Up @@ -1442,6 +1456,10 @@ export class Channel<StreamChatGenerics extends ExtendableGenerics = DefaultGene
return `${this.getClient().baseURL}/channels/${this.type}/${this.id}`;
};

_isInitialized() {
return this.initialized || this.offlineMode || this.getClient()._isUsingServerAuth();
}

_checkInitialized() {
if (!this.initialized && !this.offlineMode && !this.getClient()._isUsingServerAuth()) {
throw Error(
Expand Down Expand Up @@ -1555,4 +1573,11 @@ export class Channel<StreamChatGenerics extends ExtendableGenerics = DefaultGene
this.disconnected = true;
this.state.setIsUpToDate(false);
}

_flushWsEventQueue() {
while (this.wsEventQueue.length) {
const event = this.wsEventQueue.shift();
if (event) this.getClient().dispatchEvent(event);
}
}
}
25 changes: 25 additions & 0 deletions src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,28 @@ export const EVENT_MAP = {
'connection.recovered': true,
'transport.changed': true,
};

// events handled by channel._handleChannelEvent
export const CHANNEL_HANDLED_EVENTS = {
'typing.start': true,
'typing.stop': true,
'message.read': true,
'user.watching.start': true,
'user.updated': true,
'user.watching.stop': true,
'message.deleted': true,
'message.new': true,
'message.updated': true,
'channel.truncated': true,
'member.added': true,
'member.updated': true,
'member.removed': true,
'channel.updated': true,
'reaction.new': true,
'reaction.deleted': true,
'reaction.updated': true,
'channel.hidden': true,
'channel.visible': true,
'user.banned': true,
'user.unbanned': true,
};
74 changes: 72 additions & 2 deletions test/unit/channel.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import chai from 'chai';
import sinon from 'sinon';
import { v4 as uuidv4 } from 'uuid';

import { generateChannel } from './test-utils/generateChannel';
Expand All @@ -8,8 +9,7 @@ import { generateUser } from './test-utils/generateUser';
import { getClientWithUser } from './test-utils/getClient';
import { getOrCreateChannelApi } from './test-utils/getOrCreateChannelApi';

import { StreamChat } from '../../src/client';
import { ChannelState } from '../../src';
import { CHANNEL_HANDLED_EVENTS, ChannelState, StreamChat } from '../../src';

const expect = chai.expect;

Expand Down Expand Up @@ -497,6 +497,76 @@ describe('Channel _handleChannelEvent', function () {
expect(channel.state.members[user.id].shadow_banned).eq(expectAfterSecond.shadow_banned);
});
});

const eventTypes = Object.keys(CHANNEL_HANDLED_EVENTS);
const receiveAllChannelEvents = (channel) => {
eventTypes.forEach((type) => {
channel._handleChannelEvent({ type });
});
};

it('buffers WS events when uninitialized', () => {
channel.initialized = false;
channel.offlineMode = false;

receiveAllChannelEvents(channel);

expect(channel.wsEventQueue).to.have.length(eventTypes.length);
});

it('does not buffer WS events when in offline mode', () => {
channel.initialized = false;
channel.offlineMode = true;

receiveAllChannelEvents(channel);

expect(channel.wsEventQueue).to.be.empty;
});

it('does not buffer WS events with server-side client', () => {
client = new StreamChat('apiKey', 'secret');
client.user = user;
client.userID = user.id;
channel = client.channel('messaging', 'id');
channel.initialized = false;
channel.offlineMode = false;

receiveAllChannelEvents(channel);

expect(channel.wsEventQueue).to.be.empty;
});

it('does not buffer WS events on initialized channel', () => {
channel.initialized = true;

receiveAllChannelEvents(channel);

expect(channel.wsEventQueue).to.be.empty;
});

it('buffers WS events and channel.watch() flushes upon channel initialization', async () => {
channel.initialized = false;
sinon.stub(client, 'doAxiosRequest').resolves({ channel: generateChannel(), members: [] });

receiveAllChannelEvents(channel);

expect(channel.wsEventQueue).to.have.length(eventTypes.length);
await channel.watch();
expect(channel.wsEventQueue).to.be.empty;
client.doAxiosRequest.restore();
});

it('buffers WS events and channel.query() does not flush the queue', async () => {
channel.initialized = false;
sinon.stub(client, 'doAxiosRequest').resolves({ channel: generateChannel(), members: [] });

receiveAllChannelEvents(channel);

expect(channel.wsEventQueue).to.have.length(eventTypes.length);
await channel.query();
expect(channel.wsEventQueue).to.have.length(eventTypes.length);
client.doAxiosRequest.restore();
});
});

describe('Channels - Constructor', function () {
Expand Down
1 change: 1 addition & 0 deletions test/unit/channel_state.js
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,7 @@ describe('ChannelState clean', () => {
client.userID = 'observer';
channel = new Channel(client, 'live', 'stream', {});
client.activeChannels[channel.cid] = channel;
channel.initialized = true;
});

it('should remove any stale typing events with either string or Date received_at', async () => {
Expand Down
Loading