Skip to content

Commit

Permalink
fix: throttle mark read API requests
Browse files Browse the repository at this point in the history
  • Loading branch information
szuperaz committed Aug 27, 2024
1 parent 164b676 commit 4873621
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 6 deletions.
85 changes: 84 additions & 1 deletion projects/stream-chat-angular/src/lib/channel.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { fakeAsync, TestBed, tick } from '@angular/core/testing';
import { fakeAsync, flush, TestBed, tick } from '@angular/core/testing';
import { Subject } from 'rxjs';
import { first, take } from 'rxjs/operators';
import {
Expand Down Expand Up @@ -208,6 +208,7 @@ describe('ChannelService', () => {
events$.next({ eventType: 'connection.recovered' } as ClientEvent);

tick();
flush();

expect(spy).toHaveBeenCalledWith(channels);
expect(activeChannelSpy).toHaveBeenCalledWith(channels[0]);
Expand Down Expand Up @@ -577,6 +578,10 @@ describe('ChannelService', () => {

it('should watch for new message events', async () => {
await init();
// wait for mark read throttle time
await new Promise((resolve) => {
setTimeout(resolve, service['markReadThrottleTime']);
});
const spy = jasmine.createSpy();
service.activeChannelMessages$.subscribe(spy);
const prevCount = (spy.calls.mostRecent().args[0] as Channel[]).length;
Expand Down Expand Up @@ -991,6 +996,7 @@ describe('ChannelService', () => {

it('should add the new channel to the top of the list, and start watching it, if user is added to a channel', fakeAsync(async () => {
await init();
flush();
const newChannel = generateMockChannels()[0];
newChannel.cid = 'newchannel';
newChannel.id = 'newchannel';
Expand Down Expand Up @@ -1030,6 +1036,7 @@ describe('ChannelService', () => {
event: { channel: channel } as any as Event<DefaultStreamChatGenerics>,
});
tick();
flush();

const channels = spy.calls.mostRecent().args[0] as Channel[];
const firstChannel = channels[0];
Expand Down Expand Up @@ -1059,6 +1066,7 @@ describe('ChannelService', () => {
event: { channel: channel } as any as Event<DefaultStreamChatGenerics>,
});
tick();
flush();

const channels = spy.calls.mostRecent().args[0] as Channel[];

Expand Down Expand Up @@ -2242,6 +2250,7 @@ describe('ChannelService', () => {

it('should relaod active channel if active channel is not present after state reconnect', fakeAsync(async () => {
await init();
flush();
let activeChannel!: Channel<DefaultStreamChatGenerics>;
service.activeChannel$.subscribe((c) => (activeChannel = c!));
let channels!: Channel<DefaultStreamChatGenerics>[];
Expand All @@ -2251,6 +2260,7 @@ describe('ChannelService', () => {
mockChatClient.queryChannels.and.resolveTo(channels);
events$.next({ eventType: 'connection.recovered' } as ClientEvent);
tick();
flush();
const spy = jasmine.createSpy();
service.activeChannel$.subscribe(spy);

Expand All @@ -2276,6 +2286,7 @@ describe('ChannelService', () => {
activeChannel.state.messages.push(newMessage);
events$.next({ eventType: 'connection.recovered' } as ClientEvent);
tick();
flush();

expect(spy).not.toHaveBeenCalled();
expect(service.deselectActiveChannel).not.toHaveBeenCalled();
Expand Down Expand Up @@ -2639,4 +2650,76 @@ describe('ChannelService', () => {
expect(customQuery).toHaveBeenCalledWith('next-page');
expect(hasMoreSpy).toHaveBeenCalledWith(false);
});

it('should throttle mark read API calls', async () => {
await init();
// wait for mark read throttle time
await new Promise((resolve) => {
setTimeout(resolve, service['markReadThrottleTime']);
});

const activeChannel = service.activeChannel!;
spyOn(activeChannel, 'markRead');

(activeChannel as MockChannel).handleEvent('message.new', mockMessage());

expect(activeChannel.markRead).toHaveBeenCalledTimes(1);

(activeChannel as MockChannel).handleEvent('message.new', mockMessage());

expect(activeChannel.markRead).toHaveBeenCalledTimes(1);

// wait for mark read throttle time
await new Promise((resolve) => {
setTimeout(resolve, service['markReadThrottleTime']);
});

expect(activeChannel.markRead).toHaveBeenCalledTimes(2);
});

it('should throttle mark read API calls - channel change', async () => {
await init();
// wait for mark read throttle time
await new Promise((resolve) => {
setTimeout(resolve, service['markReadThrottleTime']);
});

const activeChannel = service.activeChannel!;
spyOn(activeChannel, 'markRead');

(activeChannel as MockChannel).handleEvent('message.new', mockMessage());

expect(activeChannel.markRead).toHaveBeenCalledTimes(1);

(activeChannel as MockChannel).handleEvent('message.new', mockMessage());

expect(activeChannel.markRead).toHaveBeenCalledTimes(1);

service.setAsActiveChannel(service.channels[1]);

expect(activeChannel.markRead).toHaveBeenCalledTimes(2);
});

it('should throttle mark read API calls - reset', async () => {
await init();
// wait for mark read throttle time
await new Promise((resolve) => {
setTimeout(resolve, service['markReadThrottleTime']);
});

const activeChannel = service.activeChannel!;
spyOn(activeChannel, 'markRead');

(activeChannel as MockChannel).handleEvent('message.new', mockMessage());

expect(activeChannel.markRead).toHaveBeenCalledTimes(1);

(activeChannel as MockChannel).handleEvent('message.new', mockMessage());

expect(activeChannel.markRead).toHaveBeenCalledTimes(1);

service.reset();

expect(activeChannel.markRead).toHaveBeenCalledTimes(2);
});
});
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { fakeAsync, TestBed, tick } from '@angular/core/testing';
import { fakeAsync, flush, TestBed, tick } from '@angular/core/testing';
import { Subject } from 'rxjs';
import { first } from 'rxjs/operators';
import {
Expand Down Expand Up @@ -235,6 +235,7 @@ describe('ChannelService - threads', () => {
spy.calls.reset();
events$.next({ eventType: 'connection.recovered' } as ClientEvent);
tick();
flush();

expect(spy).toHaveBeenCalledWith(undefined);
}));
Expand Down Expand Up @@ -314,6 +315,10 @@ describe('ChannelService - threads', () => {

it('should watch for new message events', async () => {
await init();
// wait for mark read throttle time
await new Promise((resolve) => {
setTimeout(resolve, service['markReadThrottleTime']);
});
const spy = jasmine.createSpy();
const parentMessage = mockMessage();
await service.setAsActiveParentMessage(parentMessage);
Expand Down
38 changes: 35 additions & 3 deletions projects/stream-chat-angular/src/lib/channel.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,9 @@ export class ChannelService<
};
private dismissErrorNotification?: () => void;
private areReadEventsPaused = false;
private markReadThrottleTime = 1050;
private markReadTimeout?: ReturnType<typeof setTimeout>;
private scheduledMarkReadRequest?: () => void;

constructor(
private chatClientService: ChatClientService<T>,
Expand Down Expand Up @@ -563,6 +566,7 @@ export class ChannelService<
return;
}
this.stopWatchForActiveChannelEvents(prevActiveChannel);
this.flushMarkReadQueue();
this.areReadEventsPaused = false;
const readState =
channel.state.read[this.chatClientService.chatClient.user?.id || ''];
Expand Down Expand Up @@ -595,6 +599,7 @@ export class ChannelService<
return;
}
this.stopWatchForActiveChannelEvents(activeChannel);
this.flushMarkReadQueue();
this.activeChannelMessagesSubject.next([]);
this.activeChannelSubject.next(undefined);
this.activeParentMessageIdSubject.next(undefined);
Expand Down Expand Up @@ -2220,16 +2225,43 @@ export class ChannelService<
this.usersTypingInThreadSubject.next([]);
}

private markRead(channel: Channel<T>) {
private markRead(channel: Channel<T>, isThrottled = true) {
if (
this.canSendReadEvents &&
this.shouldMarkActiveChannelAsRead &&
!this.areReadEventsPaused
!this.areReadEventsPaused &&
channel.countUnread() > 0
) {
void channel.markRead();
if (isThrottled) {
this.markReadThrottled(channel);
} else {
void channel.markRead();
}
}
}

private markReadThrottled(channel: Channel<T>) {
if (!this.markReadTimeout) {
this.markRead(channel, false);
this.markReadTimeout = setTimeout(() => {
this.flushMarkReadQueue();
}, this.markReadThrottleTime);
} else {
clearTimeout(this.markReadTimeout);
this.scheduledMarkReadRequest = () => this.markRead(channel, false);
this.markReadTimeout = setTimeout(() => {
this.flushMarkReadQueue();
}, this.markReadThrottleTime);
}
}

private flushMarkReadQueue() {
this.scheduledMarkReadRequest?.();
this.scheduledMarkReadRequest = undefined;
clearTimeout(this.markReadTimeout);
this.markReadTimeout = undefined;
}

private async _init(settings: {
shouldSetActiveChannel: boolean;
messagePageSize: number;
Expand Down
2 changes: 1 addition & 1 deletion projects/stream-chat-angular/src/lib/mocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ export const generateMockChannels = (length = 25) => {
sendAction: () => {},
deleteImage: () => {},
deleteFile: () => {},
countUnread: () => {},
countUnread: () => 3,
markRead: () => {},
getReplies: () => {},
keystroke: () => {},
Expand Down

0 comments on commit 4873621

Please sign in to comment.