Skip to content

Commit

Permalink
Publishes values of subsequent publisher after emptying publisher queue
Browse files Browse the repository at this point in the history
  • Loading branch information
ohitsdaniel committed Nov 22, 2020
1 parent ec32fa7 commit 0cfb46d
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 13 deletions.
11 changes: 5 additions & 6 deletions Sources/Operators/ConcatMap.swift
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,12 @@ private extension Publishers.ConcatMap {

override func receive(_ input: Upstream.Output) -> Subscribers.Demand {
let mapped = transform(input)

lock.lock()
defer { lock.unlock() }

if activePublisher == nil {
lock.unlock()
setActivePublisher(mapped)
} else {
lock.unlock()
bufferedPublishers.append(mapped)
}

Expand All @@ -132,15 +131,15 @@ private extension Publishers.ConcatMap {

publisher.sink(
receiveCompletion: { completion in
self.lock.lock()
defer { self.lock.unlock() }
switch completion {
case .finished:
self.lock.lock()
guard let next = self.bufferedPublishers.first else {
self.lock.unlock()
self.activePublisher = nil
return
}
self.bufferedPublishers.removeFirst()
self.lock.unlock()
self.setActivePublisher(next)
case .failure(let error):
self.receive(completion: .failure(error))
Expand Down
84 changes: 77 additions & 7 deletions Tests/ConcatMapTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ final class ConcatMapTests: XCTestCase {
cancellables = []
}

func test_publishes_values_inOrder() {
func test_publishes_values_in_order() {
var receivedValues = [Int]()
let expectedValues = [1, 2, 4, 5, 6]
let expectedValues = [1, 2, 3]

let firstPublisher = P()
let secondPublisher = P()
Expand All @@ -43,19 +43,89 @@ final class ConcatMapTests: XCTestCase {

sut.send(firstPublisher)
sut.send(secondPublisher)

firstPublisher.send(1)
firstPublisher.send(completion: .finished)

secondPublisher.send(2)
sut.send(thirdPublisher)
secondPublisher.send(completion: .finished)

thirdPublisher.send(3)

XCTAssertEqual(expectedValues, receivedValues)
}

func test_ignores_values_of_subsequent_while_previous_hasNot_completed() {
var receivedValues = [Int]()
let expectedValues = [1, 3]

let firstPublisher = P()
let secondPublisher = P()

let sut = PassthroughSubject<P, TestError>()

sut.concatMap { $0 }
.sink(
receiveCompletion: { _ in },
receiveValue: { value in receivedValues.append(value) }
)
.store(in: &cancellables)

sut.send(firstPublisher)
sut.send(secondPublisher)

firstPublisher.send(1)
firstPublisher.send(2)
// values sent onto the second publisher will be ignored as long as the first publisher hasn't completed
secondPublisher.send(2)
firstPublisher.send(completion: .finished)

secondPublisher.send(3)
secondPublisher.send(completion: .finished)

XCTAssertEqual(expectedValues, receivedValues)
}

func test_publishes_values_of_subsequent_publisher_after_emptying_publisher_queue() {
var receivedValues = [Int]()
let expectedValues = [1, 2]

let firstPublisher = P()
let secondPublisher = P()

let sut = PassthroughSubject<P, TestError>()

sut.concatMap { $0 }
.sink(
receiveCompletion: { _ in },
receiveValue: { value in receivedValues.append(value) }
)
.store(in: &cancellables)

sut.send(firstPublisher)
firstPublisher.send(1)
firstPublisher.send(completion: .finished)

secondPublisher.send(4)
secondPublisher.send(5)
sut.send(secondPublisher)
secondPublisher.send(2)
secondPublisher.send(completion: .finished)

thirdPublisher.send(6)
XCTAssertEqual(expectedValues, receivedValues)
}

func test_synchronous_completion() {
var receivedValues = [Int]()
let expectedValues = [1, 2]
let firstPublisher = Just<Int>(1)
let secondPublisher = Just<Int>(2)

let sut = PassthroughSubject<Just<Int>, Never>()

sut.concatMap { $0 }
.sink { value in receivedValues.append(value) }
.store(in: &cancellables)

sut.send(firstPublisher)
sut.send(secondPublisher)

XCTAssertEqual(expectedValues, receivedValues)
}
Expand Down

0 comments on commit 0cfb46d

Please sign in to comment.