diff --git a/packages/stream_chat/CHANGELOG.md b/packages/stream_chat/CHANGELOG.md index 6b73d0abc..c1809c1cf 100644 --- a/packages/stream_chat/CHANGELOG.md +++ b/packages/stream_chat/CHANGELOG.md @@ -1,3 +1,9 @@ +## Upcoming + +🐞 Fixed + +- Only listen to client events when the user is connected to the websocket. + ## 4.5.0 🐞 Fixed diff --git a/packages/stream_chat/lib/src/client/client.dart b/packages/stream_chat/lib/src/client/client.dart index c29503771..79443394d 100644 --- a/packages/stream_chat/lib/src/client/client.dart +++ b/packages/stream_chat/lib/src/client/client.dart @@ -397,6 +397,10 @@ class StreamChatClient { user, includeUserDetails: includeUserDetailsInConnectCall, ); + + // Start listening to events + state.subscribeToEvents(); + return user.merge(event.me); } catch (e, stk) { logger.severe('error connecting ws', e, stk); @@ -418,6 +422,9 @@ class StreamChatClient { _connectionStatusSubscription?.cancel(); _connectionStatusSubscription = null; + // Stop listening to events + state.cancelEventSubscription(); + _ws.disconnect(); } @@ -1471,29 +1478,40 @@ class StreamChatClient { /// The class that handles the state of the channel listening to the events class ClientState { /// Creates a new instance listening to events and updating the state - ClientState(this._client) { - _subscriptions.addAll([ - _client + ClientState(this._client); + + CompositeSubscription? _eventsSubscription; + + /// Starts listening to the client events. + void subscribeToEvents() { + if (_eventsSubscription != null) { + cancelEventSubscription(); + } + + _eventsSubscription = CompositeSubscription(); + _eventsSubscription! + ..add(_client .on() .where((event) => event.me != null && event.type != EventType.healthCheck) .map((e) => e.me!) - .listen((user) => currentUser = currentUser?.merge(user) ?? user), - _client + .listen((user) { + currentUser = currentUser?.merge(user) ?? user; + })) + ..add(_client .on() .map((event) => event.unreadChannels) .whereType() .listen((count) { currentUser = currentUser?.copyWith(unreadChannels: count); - }), - _client + })) + ..add(_client .on() .map((event) => event.totalUnreadCount) .whereType() .listen((count) { currentUser = currentUser?.copyWith(totalUnreadCount: count); - }), - ]); + })); _listenChannelDeleted(); @@ -1504,56 +1522,73 @@ class ClientState { _listenAllChannelsRead(); } - final _subscriptions = []; + /// Stops listening to the client events. + void cancelEventSubscription() { + if (_eventsSubscription != null) { + _eventsSubscription!.cancel(); + _eventsSubscription = null; + } + } + + /// Pauses listening to the client events. + void pauseEventSubscription([Future? resumeSignal]) { + _eventsSubscription?.pause(resumeSignal); + } - /// Used internally for optimistic update of unread count - set totalUnreadCount(int unreadCount) { - _totalUnreadCountController.add(unreadCount); + /// Resumes listening to the client events. + void resumeEventSubscription() { + _eventsSubscription?.resume(); } void _listenChannelHidden() { - _subscriptions - .add(_client.on(EventType.channelHidden).listen((event) async { - final eventChannel = event.channel!; - await _client.chatPersistenceClient?.deleteChannels([eventChannel.cid]); - channels[eventChannel.cid]?.dispose(); - channels = channels..remove(eventChannel.cid); - })); + _eventsSubscription?.add( + _client.on(EventType.channelHidden).listen((event) async { + final eventChannel = event.channel!; + await _client.chatPersistenceClient?.deleteChannels([eventChannel.cid]); + channels[eventChannel.cid]?.dispose(); + channels = channels..remove(eventChannel.cid); + }), + ); } void _listenUserUpdated() { - _subscriptions.add(_client.on(EventType.userUpdated).listen((event) { - if (event.user!.id == currentUser!.id) { - currentUser = OwnUser.fromJson(event.user!.toJson()); - } - updateUser(event.user); - })); + _eventsSubscription?.add( + _client.on(EventType.userUpdated).listen((event) { + if (event.user!.id == currentUser!.id) { + currentUser = OwnUser.fromJson(event.user!.toJson()); + } + updateUser(event.user); + }), + ); } void _listenAllChannelsRead() { - _subscriptions - .add(_client.on(EventType.notificationMarkRead).listen((event) { - if (event.cid == null) { - channels.forEach((key, value) { - value.state?.unreadCount = 0; - }); - } - })); + _eventsSubscription?.add( + _client.on(EventType.notificationMarkRead).listen((event) { + if (event.cid == null) { + channels.forEach((key, value) { + value.state?.unreadCount = 0; + }); + } + }), + ); } void _listenChannelDeleted() { - _subscriptions.add(_client - .on( - EventType.channelDeleted, - EventType.notificationRemovedFromChannel, - EventType.notificationChannelDeleted, - ) - .listen((Event event) async { - final eventChannel = event.channel!; - await _client.chatPersistenceClient?.deleteChannels([eventChannel.cid]); - channels[eventChannel.cid]?.dispose(); - channels = channels..remove(eventChannel.cid); - })); + _eventsSubscription?.add( + _client + .on( + EventType.channelDeleted, + EventType.notificationRemovedFromChannel, + EventType.notificationChannelDeleted, + ) + .listen((Event event) async { + final eventChannel = event.channel!; + await _client.chatPersistenceClient?.deleteChannels([eventChannel.cid]); + channels[eventChannel.cid]?.dispose(); + channels = channels..remove(eventChannel.cid); + }), + ); } final StreamChatClient _client; @@ -1615,6 +1650,11 @@ class ClientState { _channelsController.add(newChannels); } + /// Used internally for optimistic update of unread count + set totalUnreadCount(int unreadCount) { + _totalUnreadCountController.add(unreadCount); + } + void _computeUnreadCounts(OwnUser? user) { final totalUnreadCount = user?.totalUnreadCount; if (totalUnreadCount != null) { @@ -1635,7 +1675,7 @@ class ClientState { /// Call this method to dispose this object void dispose() { - _subscriptions.forEach((s) => s.cancel()); + cancelEventSubscription(); _currentUserController.close(); _unreadChannelsController.close(); _totalUnreadCountController.close();