From c0c6dafd17a28cddb85cf317969f0f37f85efa5c Mon Sep 17 00:00:00 2001 From: Keita Watanabe Date: Thu, 25 Jul 2024 17:28:51 +0900 Subject: [PATCH] replace to async method for URLSessionWebSocketTask.receive() --- IceCubesApp/App/Main/IceCubesApp+Scene.swift | 4 +- IceCubesApp/App/Main/IceCubesApp.swift | 10 +-- Packages/Env/Sources/Env/StreamWatcher.swift | 80 +++++++++----------- 3 files changed, 45 insertions(+), 49 deletions(-) diff --git a/IceCubesApp/App/Main/IceCubesApp+Scene.swift b/IceCubesApp/App/Main/IceCubesApp+Scene.swift index 799d9b291..0e18b61fa 100644 --- a/IceCubesApp/App/Main/IceCubesApp+Scene.swift +++ b/IceCubesApp/App/Main/IceCubesApp+Scene.swift @@ -71,7 +71,9 @@ extension IceCubesApp { .onChange(of: appAccountsManager.currentClient) { _, newValue in setNewClientsInEnv(client: newValue) if newValue.isAuth { - watcher.watch(streams: [.user, .direct]) + Task { + await watcher.watch(streams: [.user, .direct]) + } } } } diff --git a/IceCubesApp/App/Main/IceCubesApp.swift b/IceCubesApp/App/Main/IceCubesApp.swift index cc64b64d8..bf31bed62 100644 --- a/IceCubesApp/App/Main/IceCubesApp.swift +++ b/IceCubesApp/App/Main/IceCubesApp.swift @@ -44,8 +44,8 @@ struct IceCubesApp: App { userPreferences.setClient(client: client) Task { await currentInstance.fetchCurrentInstance() - watcher.setClient(client: client, instanceStreamingURL: currentInstance.instance?.urls?.streamingApi) - watcher.watch(streams: [.user, .direct]) + await watcher.setClient(client: client, instanceStreamingURL: currentInstance.instance?.urls?.streamingApi) + await watcher.watch(streams: [.user, .direct]) } } @@ -54,10 +54,10 @@ struct IceCubesApp: App { case .background: watcher.stopWatching() case .active: - watcher.watch(streams: [.user, .direct]) - UNUserNotificationCenter.current().setBadgeCount(0) - userPreferences.reloadNotificationsCount(tokens: appAccountsManager.availableAccounts.compactMap(\.oauthToken)) Task { + await watcher.watch(streams: [.user, .direct]) + try? await UNUserNotificationCenter.current().setBadgeCount(0) + userPreferences.reloadNotificationsCount(tokens: appAccountsManager.availableAccounts.compactMap(\.oauthToken)) await userPreferences.refreshServerPreferences() } default: diff --git a/Packages/Env/Sources/Env/StreamWatcher.swift b/Packages/Env/Sources/Env/StreamWatcher.swift index f8690bab5..15e8f33c5 100644 --- a/Packages/Env/Sources/Env/StreamWatcher.swift +++ b/Packages/Env/Sources/Env/StreamWatcher.swift @@ -36,16 +36,16 @@ import OSLog decoder.keyDecodingStrategy = .convertFromSnakeCase } - public func setClient(client: Client, instanceStreamingURL: URL?) { + public func setClient(client: Client, instanceStreamingURL: URL?) async { if self.client != nil { stopWatching() } self.client = client self.instanceStreamingURL = instanceStreamingURL - connect() + await connect() } - private func connect() { + private func connect() async { guard let task = try? client?.makeWebSocketTask( endpoint: Streaming.streaming, instanceStreamingURL: instanceStreamingURL @@ -54,15 +54,15 @@ import OSLog } self.task = task self.task?.resume() - receiveMessage() + await receiveMessage() } - public func watch(streams: [Stream]) { + public func watch(streams: [Stream]) async { if client?.isAuth == false { return } if task == nil { - connect() + await connect() } watchedStreams = streams for stream in streams { @@ -83,48 +83,42 @@ import OSLog } } - private func receiveMessage() { - task?.receive(completionHandler: { [weak self] result in - guard let self else { return } - switch result { - case let .success(message): - switch message { - case let .string(string): - do { - guard let data = string.data(using: .utf8) else { - logger.error("Error decoding streaming event string") - return - } - let rawEvent = try decoder.decode(RawStreamEvent.self, from: data) - logger.info("Stream update: \(rawEvent.event)") - if let event = rawEventToEvent(rawEvent: rawEvent) { - Task { @MainActor in - self.events.append(event) - self.latestEvent = event - if let event = event as? StreamEventNotification, event.notification.status?.visibility != .direct { - self.unreadNotificationsCount += 1 - } - } + private func receiveMessage() async { + do { + guard let message = try await task?.receive() else { return } + + switch message { + case let .string(string): + do { + guard let data = string.data(using: .utf8) else { + logger.error("Error decoding streaming event string") + return + } + let rawEvent = try decoder.decode(RawStreamEvent.self, from: data) + logger.info("Stream update: \(rawEvent.event)") + if let event = rawEventToEvent(rawEvent: rawEvent) { + events.append(event) + latestEvent = event + if let event = event as? StreamEventNotification, event.notification.status?.visibility != .direct { + unreadNotificationsCount += 1 } - } catch { - logger.error("Error decoding streaming event: \(error.localizedDescription)") } - - default: - break + } catch { + logger.error("Error decoding streaming event: \(error.localizedDescription)") } - receiveMessage() - - case .failure: - DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(retryDelay)) { - self.retryDelay += 30 - self.stopWatching() - self.connect() - self.watch(streams: self.watchedStreams) - } + default: + break } - }) + + await receiveMessage() + } catch { + try? await Task.sleep(nanoseconds: UInt64(retryDelay * 1000 * 1000 * 1000)) + retryDelay += 30 + stopWatching() + await connect() + await watch(streams: watchedStreams) + } } private func rawEventToEvent(rawEvent: RawStreamEvent) -> (any StreamEvent)? {