From 2d56245038070a42ee9c1b2c159d2c6bd0e7bb6b Mon Sep 17 00:00:00 2001 From: Nikita Konashenko Date: Mon, 14 Nov 2022 00:42:17 +0300 Subject: [PATCH 1/3] Add Canceling Previous Subscription in Sink --- Sources/Common/DemandBuffer.swift | 12 +++++++++++- Sources/Common/Sink.swift | 8 +++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/Sources/Common/DemandBuffer.swift b/Sources/Common/DemandBuffer.swift index 04be08a..b84d684 100644 --- a/Sources/Common/DemandBuffer.swift +++ b/Sources/Common/DemandBuffer.swift @@ -33,7 +33,17 @@ class DemandBuffer { init(subscriber: S) { self.subscriber = subscriber } - + + /// Signal to the buffer that the upstream has changed + /// - Returns: A demand that has already been requested from the previous upstream, but has not yet been processed + func attachToNewUpstream() -> Subscribers.Demand { + lock.lock() + defer { lock.unlock() } + + demandState.sent = demandState.requested + return demandState.requested - demandState.processed + } + /// Buffer an upstream value to later be forwarded to /// the downstream subscriber, once it demands it /// diff --git a/Sources/Common/Sink.swift b/Sources/Common/Sink.swift index 813ec5c..a2c4f2d 100644 --- a/Sources/Common/Sink.swift +++ b/Sources/Common/Sink.swift @@ -62,7 +62,13 @@ class Sink: Subscriber { } func receive(subscription: Subscription) { - upstreamSubscription = subscription + defer { upstreamSubscription = subscription } + + if let upstreamSubscription { + upstreamSubscription.cancel() + let newDemand = buffer.attachToNewUpstream() + subscription.requestIfNeeded(newDemand) + } } func receive(_ input: Upstream.Output) -> Subscribers.Demand { From 9724a18aaa88617edff1b04d85aabd0861ee4b22 Mon Sep 17 00:00:00 2001 From: Nikita Konashenko Date: Mon, 14 Nov 2022 00:43:34 +0300 Subject: [PATCH 2/3] Add Test for Cancelling Upstream and Custom Demand --- Tests/RetryWhenTests.swift | 86 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/Tests/RetryWhenTests.swift b/Tests/RetryWhenTests.swift index 3f3fc65..f2287a4 100644 --- a/Tests/RetryWhenTests.swift +++ b/Tests/RetryWhenTests.swift @@ -71,7 +71,93 @@ class RetryWhenTests: XCTestCase { XCTAssertEqual(completion, .finished) XCTAssertEqual(times, 2) } + + func testSuccessfulRetryWithManyRetries() { + var times = 0 + var retriesCount = 0 + var subscriptionsCount = 0 + var cancelCount = 0 + var resultOutput: [Int] = [] + var completion: Subscribers.Completion? + + subscription = Deferred(createPublisher: { () -> AnyPublisher in + defer { times += 1 } + if times == 0 { + return Fail(error: MyError.someError).eraseToAnyPublisher() + } else { + return Just(times) + .setFailureType(to: MyError.self) + .handleEvents( + receiveSubscription: { _ in subscriptionsCount += 1 }, + receiveCancel: { cancelCount += 1 } + ) + .eraseToAnyPublisher() + } + }) + .retryWhen { error in + return error + .handleEvents(receiveOutput: { _ in + retriesCount += 1 + }) + .flatMapLatest { _ in [1, 2].publisher } + } + .sink( + receiveCompletion: { completion = $0 }, + receiveValue: { resultOutput.append($0) } + ) + + XCTAssertEqual(resultOutput, [2]) + XCTAssertEqual(completion, .finished) + XCTAssertEqual(times, 3) + XCTAssertEqual(retriesCount, 1) + XCTAssertEqual(subscriptionsCount, 2) + XCTAssertEqual(cancelCount, 1) + } + + func testSuccessfulRetryWithCustomDemand() { + var times = 0 + var retriesCount = 0 + var resultOutput: [Int] = [] + var completion: Subscribers.Completion? + + AnyPublisher.init { subscriber in + defer { times += 1 } + if times < 2 { + subscriber.send(times) + subscriber.send(completion: .failure(MyError.someError)) + } + else { + subscriber.send(times) + subscriber.send(completion: .finished) + } + + return AnyCancellable({}) + } + .retryWhen { error in + error + .handleEvents(receiveOutput: { _ in retriesCount += 1}) + .map { _ in } + } + .subscribe( + AnySubscriber( + receiveSubscription: { subscription in + self.subscription = AnyCancellable { subscription.cancel() } + subscription.request(.max(3)) + }, + receiveValue: { + resultOutput.append($0) + return .none + }, + receiveCompletion: { completion = $0 } + ) + ) + XCTAssertEqual(resultOutput, [0, 1, 2]) + XCTAssertEqual(completion, .finished) + XCTAssertEqual(times, 3) + XCTAssertEqual(retriesCount, 2) + } + func testRetryFailure() { var expectedOutput: Int? From dac00ed9f6dd70a28bdb526c56b7fe3011cfe67c Mon Sep 17 00:00:00 2001 From: Nikita Konashenko Date: Mon, 14 Nov 2022 00:50:50 +0300 Subject: [PATCH 3/3] Add Manual Cancellation for Internal Subscriptions --- Sources/Operators/RetryWhen.swift | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Sources/Operators/RetryWhen.swift b/Sources/Operators/RetryWhen.swift index 5fae26f..9a9496d 100644 --- a/Sources/Operators/RetryWhen.swift +++ b/Sources/Operators/RetryWhen.swift @@ -89,6 +89,9 @@ extension Publishers.RetryWhen { } func cancel() { + cancellable?.cancel() + cancellable = nil + sink?.cancelUpstream() sink = nil } }