diff --git a/Sources/Operators/DelaySubscription.swift b/Sources/Operators/DelaySubscription.swift new file mode 100644 index 0000000..3f745db --- /dev/null +++ b/Sources/Operators/DelaySubscription.swift @@ -0,0 +1,126 @@ +// +// DelaySubscription.swift +// CombineExt +// +// Created by Jack Stone on 06/03/2021. +// Copyright © 2021 Combine Community. All rights reserved. +// + +#if canImport(Combine) +import Combine + +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +public extension Publisher { + + /// Time shifts the delivery of all output to the downstream receiver by delaying + /// the time a subscriber starts receiving elements from its subscription. + /// + /// Note that delaying a subscription may result in skipped elements for "hot" publishers. + /// However, this won't make a difference for "cold" publishers. + /// + /// - Parameter interval: The amount of delay time. + /// - Parameter tolerance: The allowed tolerance in the firing of the delayed subscription. + /// - Parameter scheduler: The scheduler to schedule the subscription delay on. + /// - Parameter options: Any additional scheduler options. + /// + /// - Returns: A publisher with its subscription delayed. + /// + func delaySubscription(for interval: S.SchedulerTimeType.Stride, + tolerance: S.SchedulerTimeType.Stride? = nil, + scheduler: S, + options: S.SchedulerOptions? = nil) -> Publishers.DelaySubscription { + return Publishers.DelaySubscription(upstream: self, + interval: interval, + tolerance: tolerance ?? scheduler.minimumTolerance, + scheduler: scheduler, + options: options) + } +} + +// MARK: - Publisher +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +public extension Publishers { + + /// A publisher that delays the upstream subscription. + struct DelaySubscription: Publisher { + + public typealias Output = U.Output // Upstream output + public typealias Failure = U.Failure // Upstream failure + + /// The publisher that this publisher receives signals from. + public let upstream: U + + /// The amount of delay time. + public let interval: S.SchedulerTimeType.Stride + + /// The allowed tolerance in the firing of the delayed subscription. + public let tolerance: S.SchedulerTimeType.Stride + + /// The scheduler to run the subscription delay timer on. + public let scheduler: S + + /// Any additional scheduler options. + public let options: S.SchedulerOptions? + + init(upstream: U, + interval: S.SchedulerTimeType.Stride, + tolerance: S.SchedulerTimeType.Stride, + scheduler: S, + options: S.SchedulerOptions?) { + self.upstream = upstream + self.interval = interval + self.tolerance = tolerance + self.scheduler = scheduler + self.options = options + } + + public func receive(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input { + self.upstream.subscribe(DelayedSubscription(publisher: self, downstream: subscriber)) + } + } +} + +// MARK: - Subscription +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +private extension Publishers.DelaySubscription { + + /// The delayed subscription where the scheduler advancing takes place. + final class DelayedSubscription: Subscriber where D.Input == Output, D.Failure == U.Failure { + + typealias Input = U.Output // Upstream output + typealias Failure = U.Failure // Upstream failure + + private let interval: S.SchedulerTimeType.Stride + private let tolerance: S.SchedulerTimeType.Stride + private let scheduler: S + private let options: S.SchedulerOptions? + + private let downstream: D + + init(publisher: Publishers.DelaySubscription, + downstream: D) { + self.interval = publisher.interval + self.tolerance = publisher.tolerance + self.scheduler = publisher.scheduler + self.options = publisher.options + self.downstream = downstream + } + + func receive(subscription: Subscription) { + scheduler.schedule(after: scheduler.now.advanced(by: interval), + tolerance: tolerance, + options: options) { [weak self] in + self?.downstream.receive(subscription: subscription) + } + } + + func receive(_ input: U.Output) -> Subscribers.Demand { + return downstream.receive(input) + } + + func receive(completion: Subscribers.Completion) { + downstream.receive(completion: completion) + } + } +} +#endif diff --git a/Tests/DelaySubscriptionTests.swift b/Tests/DelaySubscriptionTests.swift new file mode 100644 index 0000000..7d4938b --- /dev/null +++ b/Tests/DelaySubscriptionTests.swift @@ -0,0 +1,555 @@ +// +// DelaySubscriptionTests.swift +// CombineExt +// +// Created by Jack Stone on 06/03/2021. +// Copyright © 2021 Combine Community. All rights reserved. +// + +#if !os(watchOS) +import XCTest +import Combine +import CombineSchedulers +import CombineExt + +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +final class DelaySubscriptionTests: XCTestCase { + + private var subscriptions = Set() + private var scheduler: TestScheduler! + + override func setUp() { + super.setUp() + subscriptions = Set() + scheduler = DispatchQueue.testScheduler + } + + // MARK: - Timespan tests + func testDelaySubscriptionDropsElementsWhileSubscriptionIsDelayed() { + + var output = [Int]() + let subscriptionDelaySeconds = 10_000 + + let subject = CurrentValueSubject(0) + + subject + .receive(on: scheduler) + .delaySubscription(for: .seconds(subscriptionDelaySeconds), scheduler: scheduler) + .sink { completion in + XCTFail() + } receiveValue: { value in + output.append(value) + } + .store(in: &subscriptions) + + subject.send(1) + subject.send(2) + subject.send(3) + subject.send(4) + + XCTAssertTrue(output.isEmpty) + + scheduler.advance(by: .seconds(subscriptionDelaySeconds)) + XCTAssertEqual(output, [4]) + } + + func testDelaySubscriptionTimeSpanSecondsSimple() { + + var output = [Int]() + let subscriptionDelaySeconds = 100_000_000 + var delayRemaining = subscriptionDelaySeconds + + Just(1) + .receive(on: scheduler) + .delaySubscription(for: .seconds(subscriptionDelaySeconds), scheduler: scheduler) + .sink { value in + output.append(value) + } + .store(in: &subscriptions) + + XCTAssertTrue(output.isEmpty) + + scheduler.advance(by: .seconds(50_000_000)) + delayRemaining -= 50_000_000 + XCTAssertTrue(output.isEmpty) + + scheduler.advance(by: .seconds(49_999_999)) + delayRemaining -= 49_999_999 + XCTAssertTrue(output.isEmpty) + + scheduler.advance(by: .seconds(1)) + delayRemaining -= 1 + + XCTAssertEqual(delayRemaining, 0) + XCTAssertFalse(output.isEmpty) + XCTAssertEqual(output, [1]) + } + + func testDelaySubscriptionTimeSpanMillisecondsSimple() { + + var output = [Int]() + let subscriptionDelayMilliseconds = 100_000_000 + var delayRemaining = subscriptionDelayMilliseconds + + Just(1) + .receive(on: scheduler) + .delaySubscription(for: .milliseconds(subscriptionDelayMilliseconds), scheduler: scheduler) + .sink { value in + output.append(value) + } + .store(in: &subscriptions) + + XCTAssertTrue(output.isEmpty) + + scheduler.advance(by: .milliseconds(50_000_000)) + delayRemaining -= 50_000_000 + XCTAssertTrue(output.isEmpty) + + scheduler.advance(by: .milliseconds(49_999_999)) + delayRemaining -= 49_999_999 + XCTAssertTrue(output.isEmpty) + + scheduler.advance(by: .milliseconds(1)) + delayRemaining -= 1 + + XCTAssertEqual(delayRemaining, 0) + XCTAssertFalse(output.isEmpty) + XCTAssertEqual(output, [1]) + } + + func testDelaySubscriptionTimeSpanMicrosecondsSimple() { + + var output = [Int]() + let subscriptionDelayMicroseconds = 100_000_000 + var delayRemaining = subscriptionDelayMicroseconds + + Just(1) + .receive(on: scheduler) + .delaySubscription(for: .microseconds(subscriptionDelayMicroseconds), scheduler: scheduler) + .sink { value in + output.append(value) + } + .store(in: &subscriptions) + + XCTAssertTrue(output.isEmpty) + + scheduler.advance(by: .microseconds(50_000_000)) + delayRemaining -= 50_000_000 + XCTAssertTrue(output.isEmpty) + + scheduler.advance(by: .microseconds(49_999_999)) + delayRemaining -= 49_999_999 + XCTAssertTrue(output.isEmpty) + + scheduler.advance(by: .microseconds(1)) + delayRemaining -= 1 + + XCTAssertEqual(delayRemaining, 0) + XCTAssertFalse(output.isEmpty) + XCTAssertEqual(output, [1]) + } + + func testDelaySubscriptionTimeSpanNanosecondsSimple() { + + var output = [Int]() + let subscriptionDelayNanoseconds = 100_000_000 + var delayRemaining = subscriptionDelayNanoseconds + + Just(1) + .receive(on: scheduler) + .delaySubscription(for: .nanoseconds(subscriptionDelayNanoseconds), scheduler: scheduler) + .sink { value in + output.append(value) + } + .store(in: &subscriptions) + + XCTAssertTrue(output.isEmpty) + + scheduler.advance(by: .nanoseconds(50_000_000)) + delayRemaining -= 50_000_000 + XCTAssertTrue(output.isEmpty) + + scheduler.advance(by: .nanoseconds(49_999_999)) + delayRemaining -= 49_999_999 + XCTAssertTrue(output.isEmpty) + + scheduler.advance(by: .nanoseconds(1)) + delayRemaining -= 1 + + XCTAssertEqual(delayRemaining, 0) + XCTAssertFalse(output.isEmpty) + XCTAssertEqual(output, [1]) + } + + func testDelaySubscriptionTimeSpanZeroSimple() { + + var output = [Int]() + + Just(1) + .receive(on: scheduler) + .delaySubscription(for: .zero, scheduler: scheduler) + .sink { value in + output.append(value) + } + .store(in: &subscriptions) + + XCTAssertTrue(output.isEmpty) + + scheduler.advance() + XCTAssertFalse(output.isEmpty) + XCTAssertEqual(output, [1]) + } + + // MARK: - Value propagation tests + func testDelaySubscriptionTimeSpanCurrentValueSubject() { + + let subscriptionDelaySeconds = 10_000 + var output = [Int]() + var hasFinished = false + + let subject = CurrentValueSubject(-1) + + subject + .receive(on: scheduler) + .delaySubscription(for: .seconds(subscriptionDelaySeconds), scheduler: scheduler) + .sink { completion in + guard case .finished = completion else { + XCTFail() + return + } + + hasFinished = true + + } receiveValue: { value in + output.append(value) + } + .store(in: &subscriptions) + + XCTAssertTrue(output.isEmpty) + + subject.send(0) + + scheduler.advance(by: .seconds(subscriptionDelaySeconds)) + XCTAssertEqual(output, [0]) + XCTAssertFalse(hasFinished) + + subject.send(1) + + scheduler.advance(by: .seconds(subscriptionDelaySeconds)) + XCTAssertEqual(output, [0, 1]) + XCTAssertFalse(hasFinished) + + subject.send(2) + subject.send(3) + + scheduler.advance(by: .seconds(subscriptionDelaySeconds)) + XCTAssertEqual(output, [0, 1, 2, 3]) + XCTAssertFalse(hasFinished) + + subject.send(4) + subject.send(5) + subject.send(6) + + scheduler.advance(by: .seconds(subscriptionDelaySeconds)) + XCTAssertEqual(output, [0, 1, 2, 3, 4, 5, 6]) + XCTAssertFalse(hasFinished) + + subject.send(completion: .finished) + + scheduler.advance() + XCTAssertTrue(hasFinished) + } + + func testDelaySubscriptionTimeSpanPassthroughSubject() { + + let subscriptionDelaySeconds = 10_000 + var output = [Int]() + var hasFinished = false + + let subject = PassthroughSubject() + + subject + .receive(on: scheduler) + .delaySubscription(for: .seconds(subscriptionDelaySeconds), scheduler: scheduler) + .sink { completion in + guard case .finished = completion else { + XCTFail() + return + } + + hasFinished = true + + } receiveValue: { value in + output.append(value) + } + .store(in: &subscriptions) + + XCTAssertTrue(output.isEmpty) + + scheduler.advance(by: .seconds(subscriptionDelaySeconds)) + XCTAssertEqual(output, []) + XCTAssertFalse(hasFinished) + + subject.send(1) + + scheduler.advance(by: .seconds(subscriptionDelaySeconds)) + XCTAssertEqual(output, [1]) + XCTAssertFalse(hasFinished) + + subject.send(2) + subject.send(3) + + scheduler.advance(by: .seconds(subscriptionDelaySeconds)) + XCTAssertEqual(output, [1, 2, 3]) + XCTAssertFalse(hasFinished) + + subject.send(4) + subject.send(5) + subject.send(6) + + scheduler.advance(by: .seconds(subscriptionDelaySeconds)) + XCTAssertEqual(output, [1, 2, 3, 4, 5, 6]) + XCTAssertFalse(hasFinished) + + subject.send(completion: .finished) + + scheduler.advance() + XCTAssertTrue(hasFinished) + } + + func testDelaySubscriptionTimeSpanSequence() { + + let subscriptionDelaySeconds = 10_000 + var output = [Int]() + var hasFinished = false + + Array(0...100).publisher + .receive(on: scheduler) + .delaySubscription(for: .seconds(subscriptionDelaySeconds), scheduler: scheduler) + .sink { completion in + guard case .finished = completion else { + XCTFail() + return + } + + hasFinished = true + + } receiveValue: { value in + output.append(value) + } + .store(in: &subscriptions) + + XCTAssertTrue(output.isEmpty) + XCTAssertFalse(hasFinished) + + scheduler.advance(by: .seconds(subscriptionDelaySeconds)) + XCTAssertFalse(output.isEmpty) + XCTAssertEqual(output, Array(0...100)) + XCTAssertTrue(hasFinished) + } + + // MARK: - Error tests + func testDelaySubscriptionTimespanErrorPropogationBeforeDelay() { + + let subscriptionDelaySeconds = 10_000 + var output = [Int]() + var testError: Error? + + Fail(error: TestError.generic) + .setOutputType(to: Int.self) + .receive(on: scheduler) + .delaySubscription(for: .seconds(subscriptionDelaySeconds), scheduler: scheduler) + .sink { completion in + guard case .failure(let error) = completion else { + XCTFail() + return + } + + testError = error + + } receiveValue: { value in + output.append(value) + } + .store(in: &subscriptions) + + scheduler.advance() + + // Delay is bypassed for upstream failures + XCTAssertTrue(output.isEmpty) + XCTAssertNotNil(testError) + } + + func testDelaySubscriptionTimespanErrorPropogationDuringDelay() { + + let subject = PassthroughSubject() + + var output = [Int]() + let subscriptionDelaySeconds = 10_000 + var testError: Error? + + subject + .receive(on: scheduler) + .delaySubscription(for: .seconds(subscriptionDelaySeconds), scheduler: scheduler) + .sink { completion in + guard case .failure(let error) = completion else { + XCTFail() + return + } + + testError = error + + } receiveValue: { value in + output.append(value) + } + .store(in: &subscriptions) + + subject.send(42) + subject.send(43) + subject.send(completion: .failure(.generic)) + + scheduler.advance(by: .seconds(subscriptionDelaySeconds)) + XCTAssertTrue(output.isEmpty) // As the error was emitted while subscription was being delayed, no elements were emitted. + XCTAssertNotNil(testError) + } + + func testDelaySubscriptionTimespanErrorPropogationAfterDelay() { + + let subject = PassthroughSubject() + + var output = [Int]() + let subscriptionDelaySeconds = 10_000 + var testError: Error? + + subject + .receive(on: scheduler) + .delaySubscription(for: .seconds(subscriptionDelaySeconds), scheduler: scheduler) + .sink { completion in + guard case .failure(let error) = completion else { + XCTFail() + return + } + + testError = error + + } receiveValue: { value in + output.append(value) + } + .store(in: &subscriptions) + + subject.send(42) + subject.send(43) + + scheduler.advance(by: .seconds(subscriptionDelaySeconds)) + XCTAssertTrue(output.isEmpty) + XCTAssertNil(testError) + + subject.send(completion: .failure(.generic)) + scheduler.advance() + XCTAssertNotNil(testError) + } + + // MARK: - Completion tests + func testDelaySubscriptionTimespanCompletedDuringDelay() { + + let subject = PassthroughSubject() + + var output = [Int]() + let subscriptionDelaySeconds = 10_000 + var hasFinished = false + + subject + .receive(on: scheduler) + .delaySubscription(for: .seconds(subscriptionDelaySeconds), scheduler: scheduler) + .sink { completion in + guard case .finished = completion else { + XCTFail() + return + } + + hasFinished = true + + } receiveValue: { value in + output.append(value) + } + .store(in: &subscriptions) + + subject.send(42) + subject.send(43) + subject.send(completion: .finished) + + scheduler.advance(by: .seconds(subscriptionDelaySeconds)) + XCTAssertTrue(output.isEmpty) // As the sequence was completed while subscription was being delayed, no elements were emitted. + XCTAssertTrue(hasFinished) + } + + func testDelaySubscriptionTimespanCompletedAfterDelay() { + + let subscriptionDelaySeconds = 10_000 + var hasFinished = false + + let subject = PassthroughSubject() + + subject + .receive(on: scheduler) + .delaySubscription(for: .seconds(subscriptionDelaySeconds), scheduler: scheduler) + .sink { completion in + guard case .finished = completion else { + XCTFail() + return + } + + hasFinished = true + + } receiveValue: { _ in } + .store(in: &subscriptions) + + subject.send(42) + + scheduler.advance(by: .seconds(subscriptionDelaySeconds)) + XCTAssertFalse(hasFinished) + + subject.send(completion: .finished) + scheduler.advance() + XCTAssertTrue(hasFinished) + } + + // MARK: - Empty tests + func testDelaySubscriptionTimespanEmpty() { + + var output = [Int]() + let subscriptionDelaySeconds = 10_000 + var hasFinished = false + + Empty() + .setOutputType(to: Int.self) + .setFailureType(to: Error.self) + .receive(on: scheduler) + .delaySubscription(for: .seconds(subscriptionDelaySeconds), scheduler: scheduler) + .sink { completion in + guard case .finished = completion else { + XCTFail() + return + } + + hasFinished = true + + } receiveValue: { value in + output.append(value) + } + .store(in: &subscriptions) + + scheduler.advance() + XCTAssertTrue(output.isEmpty) + XCTAssertTrue(hasFinished) + } +} + +// MARK: - Test helpers +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +private extension DelaySubscriptionTests { + + private enum TestError: Error { + case generic + } +} +#endif