Skip to content

Commit

Permalink
fix: thread manager bugs (#1372)
Browse files Browse the repository at this point in the history
Co-authored-by: Anton Arnautov <[email protected]>
  • Loading branch information
isekovanic and arnautov-anton authored Nov 11, 2024
1 parent 767d9d7 commit 7435a58
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 17 deletions.
2 changes: 2 additions & 0 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,8 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
this.activeChannels = {};
// reset client state
this.state = new ClientState();
// reset thread manager
this.threads.resetState();
// reset token manager
setTimeout(this.tokenManager.reset); // delay reseting to use token for disconnect calls

Expand Down
43 changes: 29 additions & 14 deletions src/thread_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,20 @@ import type { DefaultGenerics, Event, ExtendableGenerics, OwnUserResponse, Query

const DEFAULT_CONNECTION_RECOVERY_THROTTLE_DURATION = 1000;
const MAX_QUERY_THREADS_LIMIT = 25;
export const THREAD_MANAGER_INITIAL_STATE = {
active: false,
isThreadOrderStale: false,
threads: [],
unreadThreadCount: 0,
unseenThreadIds: [],
lastConnectionDropAt: null,
pagination: {
isLoading: false,
isLoadingNext: false,
nextCursor: null,
},
ready: false,
};

export type ThreadManagerState<SCG extends ExtendableGenerics = DefaultGenerics> = {
active: boolean;
Expand Down Expand Up @@ -40,20 +54,7 @@ export class ThreadManager<SCG extends ExtendableGenerics = DefaultGenerics> {

constructor({ client }: { client: StreamChat<SCG> }) {
this.client = client;
this.state = new StateStore<ThreadManagerState<SCG>>({
active: false,
isThreadOrderStale: false,
threads: [],
unreadThreadCount: 0,
unseenThreadIds: [],
lastConnectionDropAt: null,
pagination: {
isLoading: false,
isLoadingNext: false,
nextCursor: null,
},
ready: false,
});
this.state = new StateStore<ThreadManagerState<SCG>>(THREAD_MANAGER_INITIAL_STATE);

this.threadsByIdGetterCache = { threads: [], threadsById: {} };
}
Expand All @@ -76,6 +77,10 @@ export class ThreadManager<SCG extends ExtendableGenerics = DefaultGenerics> {
return threadsById;
}

public resetState = () => {
this.state.next(THREAD_MANAGER_INITIAL_STATE);
};

public activate = () => {
this.state.partialNext({ active: true });
};
Expand All @@ -92,6 +97,7 @@ export class ThreadManager<SCG extends ExtendableGenerics = DefaultGenerics> {
this.unsubscribeFunctions.add(this.subscribeReloadOnActivation());
this.unsubscribeFunctions.add(this.subscribeNewReplies());
this.unsubscribeFunctions.add(this.subscribeRecoverAfterConnectionDrop());
this.unsubscribeFunctions.add(this.subscribeChannelDeleted());
};

private subscribeUnreadThreadsCountChange = () => {
Expand All @@ -117,6 +123,15 @@ export class ThreadManager<SCG extends ExtendableGenerics = DefaultGenerics> {
return () => unsubscribeFunctions.forEach((unsubscribe) => unsubscribe());
};

private subscribeChannelDeleted = () =>
this.client.on('notification.channel_deleted', (event) => {
const { cid } = event;
const { threads } = this.state.getLatestValue();

const newThreads = threads.filter((thread) => thread.channel.cid !== cid);
this.state.partialNext({ threads: newThreads });
}).unsubscribe;

private subscribeManageThreadSubscriptions = () =>
this.state.subscribeWithSelector(
(nextValue) => ({ threads: nextValue.threads }),
Expand Down
6 changes: 3 additions & 3 deletions test/unit/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ describe('connection', function () {
const device = { id: 'device_id', push_provider: 'firebase' };
const client = newStreamChat();
client.options.device = device;
client.wsBaseURL = 'https://url.com';
client.wsBaseURL = 'https://stream-dummy-test.com';
const ws = new StableWSConnection({ client });

it('should create the correct url', function () {
const { host, pathname, query } = url.parse(ws._buildUrl(), true);

expect(host).to.be.eq('url.com');
expect(host).to.be.eq('stream-dummy-test.com');
expect(pathname).to.be.eq('/connect');
expect(query['api_key']).to.be.eq('key');
expect(query['stream-auth-type']).to.be.eq('jwt');
Expand Down Expand Up @@ -145,7 +145,7 @@ describe('connection', function () {

it('should set and unset the flag correctly without opening WS', async () => {
const client = newStreamChat();
client.wsBaseURL = 'https://url.com';
client.wsBaseURL = 'https://stream-dummy-test.com';
const c = new StableWSConnection({ client });

expect(c.isConnecting).to.be.false;
Expand Down
57 changes: 57 additions & 0 deletions test/unit/threads.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { v4 as uuidv4 } from 'uuid';
import { generateChannel } from './test-utils/generateChannel';
import { generateMsg } from './test-utils/generateMessage';
import { generateThreadResponse } from './test-utils/generateThreadResponse';
import { getClientWithUser } from './test-utils/getClient';

import sinon from 'sinon';
import {
Expand All @@ -14,6 +15,7 @@ import {
Thread,
ThreadManager,
ThreadResponse,
THREAD_MANAGER_INITIAL_STATE,
} from '../../src';

const TEST_USER_ID = 'observer';
Expand Down Expand Up @@ -927,6 +929,37 @@ describe('Threads 2.0', () => {
expect(state.pagination.nextCursor).to.be.null;
});

describe('resetState', () => {
it('resets the state properly', async () => {
threadManager.state.partialNext({
threads: [createTestThread(), createTestThread()],
unseenThreadIds: ['1', '2'],
});
threadManager.registerSubscriptions();
expect(threadManager.state.getLatestValue().threads).to.have.lengthOf(2);
expect(threadManager.state.getLatestValue().unseenThreadIds).to.have.lengthOf(2);
threadManager.resetState();
expect(threadManager.state.getLatestValue()).to.be.deep.equal(THREAD_MANAGER_INITIAL_STATE);
});
});

it('resets the thread state on disconnect', async () => {
const clientWithUser = await getClientWithUser({ id: 'user1' });
const thread = createTestThread();
clientWithUser.threads.state.partialNext({ ready: true, threads: [thread] });
clientWithUser.threads.registerSubscriptions();

const { threads, unseenThreadIds } = clientWithUser.threads.state.getLatestValue();

expect(threads).to.deep.equal([thread]);
expect(unseenThreadIds.length).to.equal(0);

await clientWithUser.disconnectUser();

expect(clientWithUser.threads.state.getLatestValue().threads).to.have.lengthOf(0);
expect(clientWithUser.threads.state.getLatestValue().unseenThreadIds).to.have.lengthOf(0);
});

describe('Subscription and Event Handlers', () => {
beforeEach(() => {
threadManager.registerSubscriptions();
Expand Down Expand Up @@ -954,6 +987,30 @@ describe('Threads 2.0', () => {
});
});

it('removes threads from the state if their channel got deleted', () => {
const thread = createTestThread();
const toBeRemoved = [
createTestThread({ channelOverrides: { id: 'channel1' } }),
createTestThread({ channelOverrides: { id: 'channel1' } }),
createTestThread({ channelOverrides: { id: 'channel2' } }),
];
threadManager.state.partialNext({ threads: [thread, ...toBeRemoved] });

expect(threadManager.state.getLatestValue().threads).to.have.lengthOf(4);

client.dispatchEvent({
type: 'notification.channel_deleted',
cid: 'messaging:channel1',
});

client.dispatchEvent({
type: 'notification.channel_deleted',
cid: 'messaging:channel2',
});

expect(threadManager.state.getLatestValue().threads).to.deep.equal([thread]);
});

describe('Event: notification.thread_message_new', () => {
it('ignores notification.thread_message_new before anything was loaded', () => {
client.dispatchEvent({
Expand Down

0 comments on commit 7435a58

Please sign in to comment.