Skip to content

Commit

Permalink
fix: queue channel WS events until the channel is initialized (#1179)
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinCupela authored Oct 3, 2023
1 parent e50701b commit 2073579
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 2 deletions.
25 changes: 25 additions & 0 deletions src/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ export class Channel<StreamChatGenerics extends ExtendableGenerics = DefaultGene
lastTypingEvent: Date | null;
isTyping: boolean;
disconnected: boolean;
/**
* Collects the incoming WS events before the channel is marked as initialized.
* This prevents executing procedures that depend on channel being initialized.
* Once the channel is marked as initialized the queue is flushed.
*/
wsEventQueue: Event<StreamChatGenerics>[];

/**
* constructor - Create a channel
Expand Down Expand Up @@ -132,6 +138,7 @@ export class Channel<StreamChatGenerics extends ExtendableGenerics = DefaultGene
this.lastTypingEvent = null;
this.isTyping = false;
this.disconnected = false;
this.wsEventQueue = [];
}

/**
Expand Down Expand Up @@ -780,6 +787,7 @@ export class Channel<StreamChatGenerics extends ExtendableGenerics = DefaultGene
const combined = { ...defaultOptions, ...options };
const state = await this.query(combined, 'latest');
this.initialized = true;
this._flushWsEventQueue();
this.data = state.channel;

this._client.logger('info', `channel:watch() - started watching channel ${this.cid}`, {
Expand Down Expand Up @@ -1207,6 +1215,12 @@ export class Channel<StreamChatGenerics extends ExtendableGenerics = DefaultGene
// eslint-disable-next-line sonarjs/cognitive-complexity
_handleChannelEvent(event: Event<StreamChatGenerics>) {
const channel = this;

if (!this._isInitialized()) {
this.wsEventQueue.push(event);
return;
}

this._client.logger(
'info',
`channel:_handleChannelEvent - Received event of type { ${event.type} } on ${this.cid}`,
Expand Down Expand Up @@ -1442,6 +1456,10 @@ export class Channel<StreamChatGenerics extends ExtendableGenerics = DefaultGene
return `${this.getClient().baseURL}/channels/${this.type}/${this.id}`;
};

_isInitialized() {
return this.initialized || this.offlineMode || this.getClient()._isUsingServerAuth();
}

_checkInitialized() {
if (!this.initialized && !this.offlineMode && !this.getClient()._isUsingServerAuth()) {
throw Error(
Expand Down Expand Up @@ -1555,4 +1573,11 @@ export class Channel<StreamChatGenerics extends ExtendableGenerics = DefaultGene
this.disconnected = true;
this.state.setIsUpToDate(false);
}

_flushWsEventQueue() {
while (this.wsEventQueue.length) {
const event = this.wsEventQueue.shift();
if (event) this.getClient().dispatchEvent(event);
}
}
}
25 changes: 25 additions & 0 deletions src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,28 @@ export const EVENT_MAP = {
'connection.recovered': true,
'transport.changed': true,
};

// events handled by channel._handleChannelEvent
export const CHANNEL_HANDLED_EVENTS = {
'typing.start': true,
'typing.stop': true,
'message.read': true,
'user.watching.start': true,
'user.updated': true,
'user.watching.stop': true,
'message.deleted': true,
'message.new': true,
'message.updated': true,
'channel.truncated': true,
'member.added': true,
'member.updated': true,
'member.removed': true,
'channel.updated': true,
'reaction.new': true,
'reaction.deleted': true,
'reaction.updated': true,
'channel.hidden': true,
'channel.visible': true,
'user.banned': true,
'user.unbanned': true,
};
74 changes: 72 additions & 2 deletions test/unit/channel.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import chai from 'chai';
import sinon from 'sinon';
import { v4 as uuidv4 } from 'uuid';

import { generateChannel } from './test-utils/generateChannel';
Expand All @@ -8,8 +9,7 @@ import { generateUser } from './test-utils/generateUser';
import { getClientWithUser } from './test-utils/getClient';
import { getOrCreateChannelApi } from './test-utils/getOrCreateChannelApi';

import { StreamChat } from '../../src/client';
import { ChannelState } from '../../src';
import { CHANNEL_HANDLED_EVENTS, ChannelState, StreamChat } from '../../src';

const expect = chai.expect;

Expand Down Expand Up @@ -497,6 +497,76 @@ describe('Channel _handleChannelEvent', function () {
expect(channel.state.members[user.id].shadow_banned).eq(expectAfterSecond.shadow_banned);
});
});

const eventTypes = Object.keys(CHANNEL_HANDLED_EVENTS);
const receiveAllChannelEvents = (channel) => {
eventTypes.forEach((type) => {
channel._handleChannelEvent({ type });
});
};

it('buffers WS events when uninitialized', () => {
channel.initialized = false;
channel.offlineMode = false;

receiveAllChannelEvents(channel);

expect(channel.wsEventQueue).to.have.length(eventTypes.length);
});

it('does not buffer WS events when in offline mode', () => {
channel.initialized = false;
channel.offlineMode = true;

receiveAllChannelEvents(channel);

expect(channel.wsEventQueue).to.be.empty;
});

it('does not buffer WS events with server-side client', () => {
client = new StreamChat('apiKey', 'secret');
client.user = user;
client.userID = user.id;
channel = client.channel('messaging', 'id');
channel.initialized = false;
channel.offlineMode = false;

receiveAllChannelEvents(channel);

expect(channel.wsEventQueue).to.be.empty;
});

it('does not buffer WS events on initialized channel', () => {
channel.initialized = true;

receiveAllChannelEvents(channel);

expect(channel.wsEventQueue).to.be.empty;
});

it('buffers WS events and channel.watch() flushes upon channel initialization', async () => {
channel.initialized = false;
sinon.stub(client, 'doAxiosRequest').resolves({ channel: generateChannel(), members: [] });

receiveAllChannelEvents(channel);

expect(channel.wsEventQueue).to.have.length(eventTypes.length);
await channel.watch();
expect(channel.wsEventQueue).to.be.empty;
client.doAxiosRequest.restore();
});

it('buffers WS events and channel.query() does not flush the queue', async () => {
channel.initialized = false;
sinon.stub(client, 'doAxiosRequest').resolves({ channel: generateChannel(), members: [] });

receiveAllChannelEvents(channel);

expect(channel.wsEventQueue).to.have.length(eventTypes.length);
await channel.query();
expect(channel.wsEventQueue).to.have.length(eventTypes.length);
client.doAxiosRequest.restore();
});
});

describe('Channels - Constructor', function () {
Expand Down
1 change: 1 addition & 0 deletions test/unit/channel_state.js
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,7 @@ describe('ChannelState clean', () => {
client.userID = 'observer';
channel = new Channel(client, 'live', 'stream', {});
client.activeChannels[channel.cid] = channel;
channel.initialized = true;
});

it('should remove any stale typing events with either string or Date received_at', async () => {
Expand Down

0 comments on commit 2073579

Please sign in to comment.