Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: thread manager bugs #1372

Merged
merged 8 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,8 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
this.activeChannels = {};
// reset client state
this.state = new ClientState();
// reset thread manager
this.threads.resetState();
// reset token manager
setTimeout(this.tokenManager.reset); // delay reseting to use token for disconnect calls

Expand Down
44 changes: 30 additions & 14 deletions src/thread_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,20 @@ import type { DefaultGenerics, Event, ExtendableGenerics, OwnUserResponse, Query

const DEFAULT_CONNECTION_RECOVERY_THROTTLE_DURATION = 1000;
const MAX_QUERY_THREADS_LIMIT = 25;
export const INITIAL_STATE = {
isekovanic marked this conversation as resolved.
Show resolved Hide resolved
active: false,
isThreadOrderStale: false,
threads: [],
unreadThreadCount: 0,
unseenThreadIds: [],
lastConnectionDropAt: null,
pagination: {
isLoading: false,
isLoadingNext: false,
nextCursor: null,
},
ready: false,
};

export type ThreadManagerState<SCG extends ExtendableGenerics = DefaultGenerics> = {
active: boolean;
Expand Down Expand Up @@ -40,20 +54,7 @@ export class ThreadManager<SCG extends ExtendableGenerics = DefaultGenerics> {

constructor({ client }: { client: StreamChat<SCG> }) {
this.client = client;
this.state = new StateStore<ThreadManagerState<SCG>>({
active: false,
isThreadOrderStale: false,
threads: [],
unreadThreadCount: 0,
unseenThreadIds: [],
lastConnectionDropAt: null,
pagination: {
isLoading: false,
isLoadingNext: false,
nextCursor: null,
},
ready: false,
});
this.state = new StateStore<ThreadManagerState<SCG>>(INITIAL_STATE);

this.threadsByIdGetterCache = { threads: [], threadsById: {} };
}
Expand All @@ -76,6 +77,11 @@ export class ThreadManager<SCG extends ExtendableGenerics = DefaultGenerics> {
return threadsById;
}

public resetState = () => {
this.unregisterSubscriptions();
isekovanic marked this conversation as resolved.
Show resolved Hide resolved
this.state.next(INITIAL_STATE);
};

public activate = () => {
this.state.partialNext({ active: true });
};
Expand All @@ -92,6 +98,7 @@ export class ThreadManager<SCG extends ExtendableGenerics = DefaultGenerics> {
this.unsubscribeFunctions.add(this.subscribeReloadOnActivation());
this.unsubscribeFunctions.add(this.subscribeNewReplies());
this.unsubscribeFunctions.add(this.subscribeRecoverAfterConnectionDrop());
this.unsubscribeFunctions.add(this.subscribeChannelDeleted());
};

private subscribeUnreadThreadsCountChange = () => {
Expand All @@ -117,6 +124,15 @@ export class ThreadManager<SCG extends ExtendableGenerics = DefaultGenerics> {
return () => unsubscribeFunctions.forEach((unsubscribe) => unsubscribe());
};

private subscribeChannelDeleted = () =>
this.client.on('notification.channel_deleted', (event) => {
const { cid } = event;
const { threads } = this.state.getLatestValue();

const newThreads = threads.filter((thread) => thread.channel.cid !== cid);
this.state.partialNext({ threads: newThreads });
}).unsubscribe;

private subscribeManageThreadSubscriptions = () =>
this.state.subscribeWithSelector(
(nextValue) => [nextValue.threads] as const,
Expand Down
6 changes: 3 additions & 3 deletions test/unit/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ describe('connection', function () {
const device = { id: 'device_id', push_provider: 'firebase' };
const client = newStreamChat();
client.options.device = device;
client.wsBaseURL = 'https://url.com';
client.wsBaseURL = 'https://stream-dummy-test.com';
const ws = new StableWSConnection({ client });

it('should create the correct url', function () {
const { host, pathname, query } = url.parse(ws._buildUrl(), true);

expect(host).to.be.eq('url.com');
expect(host).to.be.eq('stream-dummy-test.com');
expect(pathname).to.be.eq('/connect');
expect(query['api_key']).to.be.eq('key');
expect(query['stream-auth-type']).to.be.eq('jwt');
Expand Down Expand Up @@ -145,7 +145,7 @@ describe('connection', function () {

it('should set and unset the flag correctly without opening WS', async () => {
const client = newStreamChat();
client.wsBaseURL = 'https://url.com';
client.wsBaseURL = 'https://stream-dummy-test.com';
const c = new StableWSConnection({ client });

expect(c.isConnecting).to.be.false;
Expand Down
59 changes: 59 additions & 0 deletions test/unit/threads.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { v4 as uuidv4 } from 'uuid';
import { generateChannel } from './test-utils/generateChannel';
import { generateMsg } from './test-utils/generateMessage';
import { generateThreadResponse } from './test-utils/generateThreadResponse';
import { getClientWithUser } from './test-utils/getClient';

import sinon from 'sinon';
import {
Expand All @@ -14,6 +15,7 @@ import {
Thread,
ThreadManager,
ThreadResponse,
INITIAL_STATE as THREAD_MANAGER_INITIAL_STATE,
} from '../../src';

const TEST_USER_ID = 'observer';
Expand Down Expand Up @@ -927,6 +929,39 @@ describe('Threads 2.0', () => {
expect(state.pagination.nextCursor).to.be.null;
});

describe('resetState', () => {
it('resets the state properly', async () => {
threadManager.state.partialNext({
threads: [createTestThread(), createTestThread()],
unseenThreadIds: ['1', '2'],
});
threadManager.registerSubscriptions();
expect(threadManager.state.getLatestValue().threads).to.have.lengthOf(2);
expect(threadManager.state.getLatestValue().unseenThreadIds).to.have.lengthOf(2);
threadManager.resetState();
expect(threadManager.state.getLatestValue()).to.be.deep.equal(THREAD_MANAGER_INITIAL_STATE);
});
});

it('resets the thread state on disconnect', async () => {
const clientWithUser = await getClientWithUser({ id: 'user1' });
const thread = createTestThread();
clientWithUser.threads.state.partialNext({ ready: true, threads: [thread] });
clientWithUser.threads.registerSubscriptions();

const { threads, unseenThreadIds } = clientWithUser.threads.state.getLatestValue();

expect(threads).to.deep.equal([thread]);
expect(unseenThreadIds.length).to.equal(0);
expect(clientWithUser.threads.unsubscribeFunctions.size).to.be.greaterThan(0);

await clientWithUser.disconnectUser();

expect(clientWithUser.threads.state.getLatestValue().threads).to.have.lengthOf(0);
expect(clientWithUser.threads.state.getLatestValue().unseenThreadIds).to.have.lengthOf(0);
expect(clientWithUser.threads.unsubscribeFunctions.size).to.be.equal(0);
});

describe('Subscription and Event Handlers', () => {
beforeEach(() => {
threadManager.registerSubscriptions();
Expand Down Expand Up @@ -954,6 +989,30 @@ describe('Threads 2.0', () => {
});
});

it('removes threads from the state if their channel got deleted', () => {
const thread = createTestThread();
const toBeRemoved = [
createTestThread({ channelOverrides: { id: 'channel1' } }),
createTestThread({ channelOverrides: { id: 'channel1' } }),
createTestThread({ channelOverrides: { id: 'channel2' } }),
];
threadManager.state.partialNext({ threads: [thread, ...toBeRemoved] });

expect(threadManager.state.getLatestValue().threads).to.have.lengthOf(4);

client.dispatchEvent({
type: 'notification.channel_deleted',
cid: 'messaging:channel1',
});

client.dispatchEvent({
type: 'notification.channel_deleted',
cid: 'messaging:channel2',
});

expect(threadManager.state.getLatestValue().threads).to.deep.equal([thread]);
});

describe('Event: notification.thread_message_new', () => {
it('ignores notification.thread_message_new before anything was loaded', () => {
client.dispatchEvent({
Expand Down
Loading