Skip to content

Commit

Permalink
Merge pull request #1321 from GetStream/fix/client-subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
xsahil03x authored Aug 29, 2022
2 parents ceab917 + 31f9292 commit 4532a0d
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 47 deletions.
6 changes: 6 additions & 0 deletions packages/stream_chat/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## Upcoming

🐞 Fixed

- Only listen to client events when the user is connected to the websocket.

## 4.5.0

🐞 Fixed
Expand Down
134 changes: 87 additions & 47 deletions packages/stream_chat/lib/src/client/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,10 @@ class StreamChatClient {
user,
includeUserDetails: includeUserDetailsInConnectCall,
);

// Start listening to events
state.subscribeToEvents();

return user.merge(event.me);
} catch (e, stk) {
logger.severe('error connecting ws', e, stk);
Expand All @@ -418,6 +422,9 @@ class StreamChatClient {
_connectionStatusSubscription?.cancel();
_connectionStatusSubscription = null;

// Stop listening to events
state.cancelEventSubscription();

_ws.disconnect();
}

Expand Down Expand Up @@ -1471,29 +1478,40 @@ class StreamChatClient {
/// The class that handles the state of the channel listening to the events
class ClientState {
/// Creates a new instance listening to events and updating the state
ClientState(this._client) {
_subscriptions.addAll([
_client
ClientState(this._client);

CompositeSubscription? _eventsSubscription;

/// Starts listening to the client events.
void subscribeToEvents() {
if (_eventsSubscription != null) {
cancelEventSubscription();
}

_eventsSubscription = CompositeSubscription();
_eventsSubscription!
..add(_client
.on()
.where((event) =>
event.me != null && event.type != EventType.healthCheck)
.map((e) => e.me!)
.listen((user) => currentUser = currentUser?.merge(user) ?? user),
_client
.listen((user) {
currentUser = currentUser?.merge(user) ?? user;
}))
..add(_client
.on()
.map((event) => event.unreadChannels)
.whereType<int>()
.listen((count) {
currentUser = currentUser?.copyWith(unreadChannels: count);
}),
_client
}))
..add(_client
.on()
.map((event) => event.totalUnreadCount)
.whereType<int>()
.listen((count) {
currentUser = currentUser?.copyWith(totalUnreadCount: count);
}),
]);
}));

_listenChannelDeleted();

Expand All @@ -1504,56 +1522,73 @@ class ClientState {
_listenAllChannelsRead();
}

final _subscriptions = <StreamSubscription>[];
/// Stops listening to the client events.
void cancelEventSubscription() {
if (_eventsSubscription != null) {
_eventsSubscription!.cancel();
_eventsSubscription = null;
}
}

/// Pauses listening to the client events.
void pauseEventSubscription([Future<void>? resumeSignal]) {
_eventsSubscription?.pause(resumeSignal);
}

/// Used internally for optimistic update of unread count
set totalUnreadCount(int unreadCount) {
_totalUnreadCountController.add(unreadCount);
/// Resumes listening to the client events.
void resumeEventSubscription() {
_eventsSubscription?.resume();
}

void _listenChannelHidden() {
_subscriptions
.add(_client.on(EventType.channelHidden).listen((event) async {
final eventChannel = event.channel!;
await _client.chatPersistenceClient?.deleteChannels([eventChannel.cid]);
channels[eventChannel.cid]?.dispose();
channels = channels..remove(eventChannel.cid);
}));
_eventsSubscription?.add(
_client.on(EventType.channelHidden).listen((event) async {
final eventChannel = event.channel!;
await _client.chatPersistenceClient?.deleteChannels([eventChannel.cid]);
channels[eventChannel.cid]?.dispose();
channels = channels..remove(eventChannel.cid);
}),
);
}

void _listenUserUpdated() {
_subscriptions.add(_client.on(EventType.userUpdated).listen((event) {
if (event.user!.id == currentUser!.id) {
currentUser = OwnUser.fromJson(event.user!.toJson());
}
updateUser(event.user);
}));
_eventsSubscription?.add(
_client.on(EventType.userUpdated).listen((event) {
if (event.user!.id == currentUser!.id) {
currentUser = OwnUser.fromJson(event.user!.toJson());
}
updateUser(event.user);
}),
);
}

void _listenAllChannelsRead() {
_subscriptions
.add(_client.on(EventType.notificationMarkRead).listen((event) {
if (event.cid == null) {
channels.forEach((key, value) {
value.state?.unreadCount = 0;
});
}
}));
_eventsSubscription?.add(
_client.on(EventType.notificationMarkRead).listen((event) {
if (event.cid == null) {
channels.forEach((key, value) {
value.state?.unreadCount = 0;
});
}
}),
);
}

void _listenChannelDeleted() {
_subscriptions.add(_client
.on(
EventType.channelDeleted,
EventType.notificationRemovedFromChannel,
EventType.notificationChannelDeleted,
)
.listen((Event event) async {
final eventChannel = event.channel!;
await _client.chatPersistenceClient?.deleteChannels([eventChannel.cid]);
channels[eventChannel.cid]?.dispose();
channels = channels..remove(eventChannel.cid);
}));
_eventsSubscription?.add(
_client
.on(
EventType.channelDeleted,
EventType.notificationRemovedFromChannel,
EventType.notificationChannelDeleted,
)
.listen((Event event) async {
final eventChannel = event.channel!;
await _client.chatPersistenceClient?.deleteChannels([eventChannel.cid]);
channels[eventChannel.cid]?.dispose();
channels = channels..remove(eventChannel.cid);
}),
);
}

final StreamChatClient _client;
Expand Down Expand Up @@ -1615,6 +1650,11 @@ class ClientState {
_channelsController.add(newChannels);
}

/// Used internally for optimistic update of unread count
set totalUnreadCount(int unreadCount) {
_totalUnreadCountController.add(unreadCount);
}

void _computeUnreadCounts(OwnUser? user) {
final totalUnreadCount = user?.totalUnreadCount;
if (totalUnreadCount != null) {
Expand All @@ -1635,7 +1675,7 @@ class ClientState {

/// Call this method to dispose this object
void dispose() {
_subscriptions.forEach((s) => s.cancel());
cancelEventSubscription();
_currentUserController.close();
_unreadChannelsController.close();
_totalUnreadCountController.close();
Expand Down

0 comments on commit 4532a0d

Please sign in to comment.