diff --git a/Package.swift b/Package.swift index 021ebf8e50..1b63eae193 100644 --- a/Package.swift +++ b/Package.swift @@ -105,7 +105,8 @@ let package = Package( "_NIODataStructures", swiftAtomics, swiftCollections, - ] + ], + swiftSettings: strictConcurrencySettings ), .target( name: "NIOPosix", @@ -429,7 +430,8 @@ let package = Package( "NIOConcurrencyHelpers", "NIOCore", "NIOEmbedded", - ] + ], + swiftSettings: strictConcurrencySettings ), .testTarget( name: "NIOPosixTests", diff --git a/Sources/NIOEmbedded/AsyncTestingChannel.swift b/Sources/NIOEmbedded/AsyncTestingChannel.swift index 75889e5c37..3a51514ee8 100644 --- a/Sources/NIOEmbedded/AsyncTestingChannel.swift +++ b/Sources/NIOEmbedded/AsyncTestingChannel.swift @@ -199,24 +199,23 @@ public final class NIOAsyncTestingChannel: Channel { /// `nil` because ``NIOAsyncTestingChannel``s don't have parents. public let parent: Channel? = nil - // This is only written once, from a single thread, and never written again, so it's _technically_ thread-safe. Most methods cannot safely + // These two variables are only written once, from a single thread, and never written again, so they're _technically_ thread-safe. Most methods cannot safely // be used from multiple threads, but `isActive`, `isOpen`, `eventLoop`, and `closeFuture` can all safely be used from any thread. Just. @usableFromInline - var channelcore: EmbeddedChannelCore! + nonisolated(unsafe) var channelcore: EmbeddedChannelCore! - /// Guards any of the getters/setters that can be accessed from any thread. - private let stateLock: NIOLock = NIOLock() - - // Guarded by `stateLock` - private var _isWritable: Bool = true - - // Guarded by `stateLock` - private var _localAddress: SocketAddress? = nil + nonisolated(unsafe) private var _pipeline: ChannelPipeline! - // Guarded by `stateLock` - private var _remoteAddress: SocketAddress? = nil + private struct State { + var isWritable: Bool + var localAddress: SocketAddress? + var remoteAddress: SocketAddress? + } - private var _pipeline: ChannelPipeline! + /// Guards any of the getters/setters that can be accessed from any thread. + private let stateLock = NIOLockedValueBox( + State(isWritable: true, localAddress: nil, remoteAddress: nil) + ) /// - see: `Channel._channelCore` public var _channelCore: ChannelCore { @@ -231,11 +230,11 @@ public final class NIOAsyncTestingChannel: Channel { /// - see: `Channel.isWritable` public var isWritable: Bool { get { - self.stateLock.withLock { self._isWritable } + self.stateLock.withLockedValue { $0.isWritable } } set { - self.stateLock.withLock { () -> Void in - self._isWritable = newValue + self.stateLock.withLockedValue { + $0.isWritable = newValue } } } @@ -243,11 +242,11 @@ public final class NIOAsyncTestingChannel: Channel { /// - see: `Channel.localAddress` public var localAddress: SocketAddress? { get { - self.stateLock.withLock { self._localAddress } + self.stateLock.withLockedValue { $0.localAddress } } set { - self.stateLock.withLock { () -> Void in - self._localAddress = newValue + self.stateLock.withLockedValue { + $0.localAddress = newValue } } } @@ -255,11 +254,11 @@ public final class NIOAsyncTestingChannel: Channel { /// - see: `Channel.remoteAddress` public var remoteAddress: SocketAddress? { get { - self.stateLock.withLock { self._remoteAddress } + self.stateLock.withLockedValue { $0.remoteAddress } } set { - self.stateLock.withLock { () -> Void in - self._remoteAddress = newValue + self.stateLock.withLockedValue { + $0.remoteAddress = newValue } } } @@ -283,7 +282,8 @@ public final class NIOAsyncTestingChannel: Channel { /// - Parameters: /// - handler: The `ChannelHandler` to add to the `ChannelPipeline` before register. /// - loop: The ``NIOAsyncTestingEventLoop`` to use. - public convenience init(handler: ChannelHandler, loop: NIOAsyncTestingEventLoop = NIOAsyncTestingEventLoop()) async + @preconcurrency + public convenience init(handler: ChannelHandler & Sendable, loop: NIOAsyncTestingEventLoop = NIOAsyncTestingEventLoop()) async { await self.init(handlers: [handler], loop: loop) } @@ -295,8 +295,9 @@ public final class NIOAsyncTestingChannel: Channel { /// - Parameters: /// - handlers: The `ChannelHandler`s to add to the `ChannelPipeline` before register. /// - loop: The ``NIOAsyncTestingEventLoop`` to use. + @preconcurrency public convenience init( - handlers: [ChannelHandler], + handlers: [ChannelHandler & Sendable], loop: NIOAsyncTestingEventLoop = NIOAsyncTestingEventLoop() ) async { self.init(loop: loop) @@ -671,3 +672,7 @@ extension NIOAsyncTestingChannel.LeftOverState: @unchecked Sendable {} @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) extension NIOAsyncTestingChannel.BufferState: @unchecked Sendable {} #endif + +// Synchronous options are never Sendable. +@available(*, unavailable) +extension NIOAsyncTestingChannel.SynchronousOptions: Sendable { } diff --git a/Sources/NIOEmbedded/AsyncTestingEventLoop.swift b/Sources/NIOEmbedded/AsyncTestingEventLoop.swift index 7997d38e36..c312e23002 100644 --- a/Sources/NIOEmbedded/AsyncTestingEventLoop.swift +++ b/Sources/NIOEmbedded/AsyncTestingEventLoop.swift @@ -125,7 +125,7 @@ public final class NIOAsyncTestingEventLoop: EventLoop, @unchecked Sendable { self.scheduledTasks.removeFirst { $0.id == taskID } } - private func insertTask( + private func insertTask( taskID: UInt64, deadline: NIODeadline, promise: EventLoopPromise, @@ -152,7 +152,8 @@ public final class NIOAsyncTestingEventLoop: EventLoop, @unchecked Sendable { /// - see: `EventLoop.scheduleTask(deadline:_:)` @discardableResult - public func scheduleTask(deadline: NIODeadline, _ task: @escaping () throws -> T) -> Scheduled { + @preconcurrency + public func scheduleTask(deadline: NIODeadline, _ task: @escaping @Sendable () throws -> T) -> Scheduled { let promise: EventLoopPromise = self.makePromise() let taskID = self.scheduledTaskCounter.loadThenWrappingIncrement(ordering: .relaxed) @@ -190,7 +191,8 @@ public final class NIOAsyncTestingEventLoop: EventLoop, @unchecked Sendable { /// - see: `EventLoop.scheduleTask(in:_:)` @discardableResult - public func scheduleTask(in: TimeAmount, _ task: @escaping () throws -> T) -> Scheduled { + @preconcurrency + public func scheduleTask(in: TimeAmount, _ task: @escaping @Sendable () throws -> T) -> Scheduled { self.scheduleTask(deadline: self.now + `in`, task) } @@ -230,7 +232,8 @@ public final class NIOAsyncTestingEventLoop: EventLoop, @unchecked Sendable { /// On an `NIOAsyncTestingEventLoop`, `execute` will simply use `scheduleTask` with a deadline of _now_. Unlike with the other operations, this will /// immediately execute, to eliminate a common class of bugs. - public func execute(_ task: @escaping () -> Void) { + @preconcurrency + public func execute(_ task: @escaping @Sendable () -> Void) { if self.inEventLoop { self.scheduleTask(deadline: self.now, task) } else { @@ -359,7 +362,8 @@ public final class NIOAsyncTestingEventLoop: EventLoop, @unchecked Sendable { } /// - see: `EventLoop.shutdownGracefully` - public func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) { + @preconcurrency + public func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping @Sendable (Error?) -> Void) { self.queue.async { self._shutdownGracefully() queue.async { diff --git a/Sources/NIOEmbedded/Embedded.swift b/Sources/NIOEmbedded/Embedded.swift index fed89fc1ae..50a1898681 100644 --- a/Sources/NIOEmbedded/Embedded.swift +++ b/Sources/NIOEmbedded/Embedded.swift @@ -185,7 +185,7 @@ public final class EmbeddedEventLoop: EventLoop, CustomStringConvertible { insertOrder: self.nextTaskNumber(), task: { do { - promise.succeed(try task()) + promise.assumeIsolated().succeed(try task()) } catch let err { promise.fail(err) } @@ -365,6 +365,11 @@ public final class EmbeddedEventLoop: EventLoop, CustomStringConvertible { }() } +// EmbeddedEventLoop is extremely _not_ Sendable. However, the EventLoop protocol +// requires it to be. We are doing some runtime enforcement of correct use, but +// ultimately we can't have the compiler validating this usage. +extension EmbeddedEventLoop: @unchecked Sendable { } + @usableFromInline class EmbeddedChannelCore: ChannelCore { var isOpen: Bool { @@ -484,8 +489,11 @@ class EmbeddedChannelCore: ChannelCore { self.pipeline.syncOperations.fireChannelInactive() self.pipeline.syncOperations.fireChannelUnregistered() + let loopBoundSelf = NIOLoopBound(self, eventLoop: self.eventLoop) + eventLoop.execute { // ensure this is executed in a delayed fashion as the users code may still traverse the pipeline + let `self` = loopBoundSelf.value self.removeHandlers(pipeline: self.pipeline) self.closePromise.succeed(()) } @@ -583,6 +591,10 @@ class EmbeddedChannelCore: ChannelCore { } } +// ChannelCores are basically never Sendable. +@available(*, unavailable) +extension EmbeddedChannelCore: Sendable { } + /// `EmbeddedChannel` is a `Channel` implementation that does neither any /// actual IO nor has a proper eventing mechanism. The prime use-case for /// `EmbeddedChannel` is in unit tests when you want to feed the inbound events @@ -867,8 +879,8 @@ public final class EmbeddedChannel: Channel { @inlinable @discardableResult public func writeInbound(_ data: T) throws -> BufferState { self.embeddedEventLoop.checkCorrectThread() - self.pipeline.fireChannelRead(data) - self.pipeline.fireChannelReadComplete() + self.pipeline.syncOperations.fireChannelRead(NIOAny(data)) + self.pipeline.syncOperations.fireChannelReadComplete() try self.throwIfErrorCaught() return self.channelcore.inboundBuffer.isEmpty ? .empty : .full(Array(self.channelcore.inboundBuffer)) } @@ -1086,5 +1098,16 @@ extension EmbeddedChannel { } } +// EmbeddedChannel is extremely _not_ Sendable. However, the Channel protocol +// requires it to be. We are doing some runtime enforcement of correct use, but +// ultimately we can't have the compiler validating this usage. +extension EmbeddedChannel: @unchecked Sendable { } + +@available(*, unavailable) +extension EmbeddedChannel.LeftOverState: @unchecked Sendable {} + +@available(*, unavailable) +extension EmbeddedChannel.BufferState: @unchecked Sendable {} + @available(*, unavailable) extension EmbeddedChannel.SynchronousOptions: Sendable {} diff --git a/Tests/NIOEmbeddedTests/AsyncTestingChannelTests.swift b/Tests/NIOEmbeddedTests/AsyncTestingChannelTests.swift index 5fa112f6d1..5022dcd3ca 100644 --- a/Tests/NIOEmbeddedTests/AsyncTestingChannelTests.swift +++ b/Tests/NIOEmbeddedTests/AsyncTestingChannelTests.swift @@ -36,7 +36,7 @@ class AsyncTestingChannelTests: XCTestCase { } let channel = NIOAsyncTestingChannel() - XCTAssertThrowsError(try channel.pipeline.handler(type: Handler.self).wait()) { e in + XCTAssertThrowsError(try channel.pipeline.handler(type: Handler.self).map { _ in }.wait()) { e in XCTAssertEqual(e as? ChannelPipelineError, .notFound) } diff --git a/Tests/NIOEmbeddedTests/AsyncTestingEventLoopTests.swift b/Tests/NIOEmbeddedTests/AsyncTestingEventLoopTests.swift index 66c40c7414..d5eb57266f 100644 --- a/Tests/NIOEmbeddedTests/AsyncTestingEventLoopTests.swift +++ b/Tests/NIOEmbeddedTests/AsyncTestingEventLoopTests.swift @@ -19,7 +19,7 @@ import XCTest @testable import NIOEmbedded -private class EmbeddedTestError: Error {} +private final class EmbeddedTestError: Error {} @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) final class NIOAsyncTestingEventLoopTests: XCTestCase { @@ -336,10 +336,12 @@ final class NIOAsyncTestingEventLoopTests: XCTestCase { // advanceTime(by:) is the same as on MultiThreadedEventLoopGroup: specifically, that tasks run via // schedule that expire "now" all run at the same time, and that any work they schedule is run // after all such tasks expire. + struct TestState { + var firstScheduled: Scheduled? + var secondScheduled: Scheduled? + } let loop = NIOAsyncTestingEventLoop() - let lock = NIOLock() - var firstScheduled: Scheduled? = nil - var secondScheduled: Scheduled? = nil + let lock = NIOLockedValueBox(TestState()) let orderingCounter = ManagedAtomic(0) // Here's the setup. First, we'll set up two scheduled tasks to fire in 5 nanoseconds. Each of these @@ -356,13 +358,13 @@ final class NIOAsyncTestingEventLoopTests: XCTestCase { // // To validate the ordering, we'll use a counter. - lock.withLock { () -> Void in - firstScheduled = loop.scheduleTask(in: .nanoseconds(5)) { - let second = lock.withLock { () -> Scheduled? in - XCTAssertNotNil(firstScheduled) - firstScheduled = nil - XCTAssertNotNil(secondScheduled) - return secondScheduled + lock.withLockedValue { + $0.firstScheduled = loop.scheduleTask(in: .nanoseconds(5)) { + let second = lock.withLockedValue { + XCTAssertNotNil($0.firstScheduled) + $0.firstScheduled = nil + XCTAssertNotNil($0.secondScheduled) + return $0.secondScheduled } if let partner = second { @@ -379,11 +381,11 @@ final class NIOAsyncTestingEventLoopTests: XCTestCase { } } - secondScheduled = loop.scheduleTask(in: .nanoseconds(5)) { - lock.withLock { () -> Void in - secondScheduled = nil - XCTAssertNil(firstScheduled) - XCTAssertNil(secondScheduled) + $0.secondScheduled = loop.scheduleTask(in: .nanoseconds(5)) { + lock.withLockedValue { + $0.secondScheduled = nil + XCTAssertNil($0.firstScheduled) + XCTAssertNil($0.secondScheduled) } XCTAssertCompareAndSwapSucceeds(storage: orderingCounter, expected: 2, desired: 3) @@ -482,6 +484,7 @@ final class NIOAsyncTestingEventLoopTests: XCTestCase { let eventLoop = NIOAsyncTestingEventLoop() let tasksRun = ManagedAtomic(0) + @Sendable func scheduleRecursiveTask( at taskStartTime: NIODeadline, andChildTaskAfter childTaskStartDelay: TimeAmount @@ -514,29 +517,29 @@ final class NIOAsyncTestingEventLoopTests: XCTestCase { func testShutdownCancelsRemainingScheduledTasks() async { let eventLoop = NIOAsyncTestingEventLoop() - var tasksRun = 0 + let tasksRun = ManagedAtomic(0) - let a = eventLoop.scheduleTask(in: .seconds(1)) { tasksRun += 1 } - let b = eventLoop.scheduleTask(in: .seconds(2)) { tasksRun += 1 } + let a = eventLoop.scheduleTask(in: .seconds(1)) { tasksRun.wrappingIncrement(ordering: .sequentiallyConsistent) } + let b = eventLoop.scheduleTask(in: .seconds(2)) { tasksRun.wrappingIncrement(ordering: .sequentiallyConsistent) } - XCTAssertEqual(tasksRun, 0) + XCTAssertEqual(tasksRun.load(ordering: .sequentiallyConsistent), 0) await eventLoop.advanceTime(by: .seconds(1)) - XCTAssertEqual(tasksRun, 1) + XCTAssertEqual(tasksRun.load(ordering: .sequentiallyConsistent), 1) XCTAssertNoThrow(try eventLoop.syncShutdownGracefully()) - XCTAssertEqual(tasksRun, 1) + XCTAssertEqual(tasksRun.load(ordering: .sequentiallyConsistent), 1) await eventLoop.advanceTime(by: .seconds(1)) - XCTAssertEqual(tasksRun, 1) + XCTAssertEqual(tasksRun.load(ordering: .sequentiallyConsistent), 1) await eventLoop.advanceTime(to: .distantFuture) - XCTAssertEqual(tasksRun, 1) + XCTAssertEqual(tasksRun.load(ordering: .sequentiallyConsistent), 1) XCTAssertNoThrow(try a.futureResult.wait()) await XCTAssertThrowsError(try await b.futureResult.get()) { error in XCTAssertEqual(error as? EventLoopError, .cancelled) - XCTAssertEqual(tasksRun, 1) + XCTAssertEqual(tasksRun.load(ordering: .sequentiallyConsistent), 1) } } diff --git a/Tests/NIOEmbeddedTests/EmbeddedChannelTest.swift b/Tests/NIOEmbeddedTests/EmbeddedChannelTest.swift index 3c77ca624a..a13386e900 100644 --- a/Tests/NIOEmbeddedTests/EmbeddedChannelTest.swift +++ b/Tests/NIOEmbeddedTests/EmbeddedChannelTest.swift @@ -12,12 +12,14 @@ // //===----------------------------------------------------------------------===// +import Atomics +import NIOConcurrencyHelpers import NIOCore import XCTest @testable import NIOEmbedded -class ChannelLifecycleHandler: ChannelInboundHandler { +final class ChannelLifecycleHandler: ChannelInboundHandler, Sendable { public typealias InboundIn = Any public enum ChannelState { @@ -27,17 +29,28 @@ class ChannelLifecycleHandler: ChannelInboundHandler { case active } - public var currentState: ChannelState - public var stateHistory: [ChannelState] + public var currentState: ChannelState { + get { + self._state.withLockedValue { $0.currentState } + } + } + public var stateHistory: [ChannelState] { + get { + self._state.withLockedValue { $0.stateHistory } + } + } + + private let _state: NIOLockedValueBox<(currentState: ChannelState, stateHistory: [ChannelState])> public init() { - currentState = .unregistered - stateHistory = [.unregistered] + self._state = NIOLockedValueBox((currentState: .unregistered, stateHistory: [.unregistered])) } private func updateState(_ state: ChannelState) { - currentState = state - stateHistory.append(state) + self._state.withLockedValue { + $0.currentState = state + $0.stateHistory.append(state) + } } public func channelRegistered(context: ChannelHandlerContext) { @@ -77,7 +90,7 @@ class EmbeddedChannelTest: XCTestCase { } let channel = EmbeddedChannel(handler: Handler()) - XCTAssertNoThrow(try channel.pipeline.handler(type: Handler.self).wait()) + XCTAssertNoThrow(try channel.pipeline.handler(type: Handler.self).map { _ in }.wait()) } func testSingleHandlerInitNil() { @@ -86,7 +99,7 @@ class EmbeddedChannelTest: XCTestCase { } let channel = EmbeddedChannel(handler: nil) - XCTAssertThrowsError(try channel.pipeline.handler(type: Handler.self).wait()) { e in + XCTAssertThrowsError(try channel.pipeline.handler(type: Handler.self).map { _ in }.wait()) { e in XCTAssertEqual(e as? ChannelPipelineError, .notFound) } } @@ -104,13 +117,13 @@ class EmbeddedChannelTest: XCTestCase { let channel = EmbeddedChannel( handlers: [Handler(identifier: "0"), Handler(identifier: "1"), Handler(identifier: "2")] ) - XCTAssertNoThrow(XCTAssertEqual(try channel.pipeline.handler(type: Handler.self).wait().identifier, "0")) + XCTAssertNoThrow(XCTAssertEqual(try channel.pipeline.handler(type: Handler.self).map { $0.identifier }.wait(), "0")) XCTAssertNoThrow(try channel.pipeline.removeHandler(name: "handler0").wait()) - XCTAssertNoThrow(XCTAssertEqual(try channel.pipeline.handler(type: Handler.self).wait().identifier, "1")) + XCTAssertNoThrow(XCTAssertEqual(try channel.pipeline.handler(type: Handler.self).map { $0.identifier }.wait(), "1")) XCTAssertNoThrow(try channel.pipeline.removeHandler(name: "handler1").wait()) - XCTAssertNoThrow(XCTAssertEqual(try channel.pipeline.handler(type: Handler.self).wait().identifier, "2")) + XCTAssertNoThrow(XCTAssertEqual(try channel.pipeline.handler(type: Handler.self).map { $0.identifier }.wait(), "2")) XCTAssertNoThrow(try channel.pipeline.removeHandler(name: "handler2").wait()) } @@ -180,7 +193,7 @@ class EmbeddedChannelTest: XCTestCase { func testWriteInboundByteBufferReThrow() { let channel = EmbeddedChannel() - XCTAssertNoThrow(try channel.pipeline.addHandler(ExceptionThrowingInboundHandler()).wait()) + XCTAssertNoThrow(try channel.pipeline.syncOperations.addHandler(ExceptionThrowingInboundHandler())) XCTAssertThrowsError(try channel.writeInbound("msg")) { error in XCTAssertEqual(ChannelError.operationUnsupported, error as? ChannelError) } @@ -189,7 +202,7 @@ class EmbeddedChannelTest: XCTestCase { func testWriteOutboundByteBufferReThrow() { let channel = EmbeddedChannel() - XCTAssertNoThrow(try channel.pipeline.addHandler(ExceptionThrowingOutboundHandler()).wait()) + XCTAssertNoThrow(try channel.pipeline.syncOperations.addHandler(ExceptionThrowingOutboundHandler())) XCTAssertThrowsError(try channel.writeOutbound("msg")) { error in XCTAssertEqual(ChannelError.operationUnsupported, error as? ChannelError) } @@ -323,7 +336,7 @@ class EmbeddedChannelTest: XCTestCase { func testCloseOnInactiveIsOk() throws { let channel = EmbeddedChannel() let inactiveHandler = CloseInChannelInactiveHandler() - XCTAssertNoThrow(try channel.pipeline.addHandler(inactiveHandler).wait()) + XCTAssertNoThrow(try channel.pipeline.syncOperations.addHandler(inactiveHandler)) XCTAssertTrue(try channel.finish().isClean) // channelInactive should fire only once. @@ -479,11 +492,12 @@ class EmbeddedChannelTest: XCTestCase { func testFinishWithRecursivelyScheduledTasks() throws { let channel = EmbeddedChannel() let tasks: NIOLoopBoundBox<[Scheduled]> = NIOLoopBoundBox([], eventLoop: channel.eventLoop) - var invocations = 0 + let invocations = ManagedAtomic(0) + @Sendable func recursivelyScheduleAndIncrement() { let task = channel.pipeline.eventLoop.scheduleTask(deadline: .distantFuture) { - invocations += 1 + invocations.wrappingIncrement(ordering: .sequentiallyConsistent) recursivelyScheduleAndIncrement() } tasks.value.append(task) @@ -494,7 +508,7 @@ class EmbeddedChannelTest: XCTestCase { try XCTAssertNoThrow(channel.finish()) // None of the tasks should have been executed, they were scheduled for distant future. - XCTAssertEqual(invocations, 0) + XCTAssertEqual(invocations.load(ordering: .sequentiallyConsistent), 0) // Because the root task didn't run, it should be the onnly one scheduled. XCTAssertEqual(tasks.value.count, 1) diff --git a/Tests/NIOEmbeddedTests/EmbeddedEventLoopTest.swift b/Tests/NIOEmbeddedTests/EmbeddedEventLoopTest.swift index b1690f7f86..5e3ce67362 100644 --- a/Tests/NIOEmbeddedTests/EmbeddedEventLoopTest.swift +++ b/Tests/NIOEmbeddedTests/EmbeddedEventLoopTest.swift @@ -12,13 +12,14 @@ // //===----------------------------------------------------------------------===// +import Atomics import NIOConcurrencyHelpers import NIOCore import XCTest @testable import NIOEmbedded -private class EmbeddedTestError: Error {} +private final class EmbeddedTestError: Error {} public final class EmbeddedEventLoopTest: XCTestCase { func testExecuteDoesNotImmediatelyRunTasks() throws { @@ -195,37 +196,38 @@ public final class EmbeddedEventLoopTest: XCTestCase { } func testScheduledTasksFuturesFire() throws { - var fired = false + let fired = ManagedAtomic(false) let loop = EmbeddedEventLoop() let task = loop.scheduleTask(in: .nanoseconds(5)) { true } - task.futureResult.whenSuccess { fired = $0 } + task.futureResult.whenSuccess { fired.store($0, ordering: .sequentiallyConsistent) } loop.advanceTime(by: .nanoseconds(4)) - XCTAssertFalse(fired) + XCTAssertFalse(fired.load(ordering: .sequentiallyConsistent)) loop.advanceTime(by: .nanoseconds(1)) - XCTAssertTrue(fired) + XCTAssertTrue(fired.load(ordering: .sequentiallyConsistent)) } func testScheduledTasksFuturesError() throws { - var err: EmbeddedTestError? = nil - var fired = false + let err: NIOLockedValueBox = NIOLockedValueBox(nil) + let fired = ManagedAtomic(false) let loop = EmbeddedEventLoop() let task = loop.scheduleTask(in: .nanoseconds(5)) { - err = EmbeddedTestError() - throw err! + let localErr = EmbeddedTestError() + err.withLockedValue { $0 = localErr } + throw localErr } task.futureResult.map { XCTFail("Scheduled future completed") }.recover { caughtErr in - XCTAssertTrue(err === caughtErr as? EmbeddedTestError) + XCTAssertTrue(err.withLockedValue { $0 === caughtErr as? EmbeddedTestError }) }.whenComplete { (_: Result) in - fired = true + fired.store(true, ordering: .sequentiallyConsistent) } loop.advanceTime(by: .nanoseconds(4)) - XCTAssertFalse(fired) + XCTAssertFalse(fired.load(ordering: .sequentiallyConsistent)) loop.advanceTime(by: .nanoseconds(1)) - XCTAssertTrue(fired) + XCTAssertTrue(fired.load(ordering: .sequentiallyConsistent)) } func testTaskOrdering() { @@ -511,21 +513,24 @@ public final class EmbeddedEventLoopTest: XCTestCase { func testScheduleRepeatedTask() { let eventLoop = EmbeddedEventLoop() - var counter = 0 + let counter = ManagedAtomic(0) eventLoop.scheduleRepeatedTask(initialDelay: .seconds(1), delay: .seconds(1)) { repeatedTask in - guard counter < 10 else { + guard counter.load(ordering: .sequentiallyConsistent) < 10 else { repeatedTask.cancel() return } - counter += 1 + counter.wrappingIncrement(ordering: .sequentiallyConsistent) } - XCTAssertEqual(counter, 0) + XCTAssertEqual(counter.load(ordering: .sequentiallyConsistent), 0) eventLoop.advanceTime(by: .seconds(1)) - XCTAssertEqual(counter, 1) + XCTAssertEqual(counter.load(ordering: .sequentiallyConsistent), 1) eventLoop.advanceTime(by: .seconds(9)) - XCTAssertEqual(counter, 10) + XCTAssertEqual(counter.load(ordering: .sequentiallyConsistent), 10) } } + +@available(*, unavailable) +extension EmbeddedEventLoopTest: Sendable { } diff --git a/Tests/NIOEmbeddedTests/TestUtils.swift b/Tests/NIOEmbeddedTests/TestUtils.swift index c25bc44a1b..3cefa81064 100644 --- a/Tests/NIOEmbeddedTests/TestUtils.swift +++ b/Tests/NIOEmbeddedTests/TestUtils.swift @@ -48,26 +48,25 @@ extension EventLoopFuture { if self.eventLoop.inEventLoop { // Easy, we're on the EventLoop. Let's just use our knowledge that we run completed future callbacks // immediately. - var fulfilled = false + let fulfilled = NIOLoopBoundBox(false, eventLoop: self.eventLoop) self.whenComplete { _ in - fulfilled = true + fulfilled.value = true } - return fulfilled + return fulfilled.value } else { - let lock = NIOLock() + let fulfilled = NIOLockedValueBox(false) let group = DispatchGroup() - var fulfilled = false // protected by lock group.enter() self.eventLoop.execute { let isFulfilled = self.isFulfilled // This will now enter the above branch. - lock.withLock { - fulfilled = isFulfilled + fulfilled.withLockedValue { + $0 = isFulfilled } group.leave() } group.wait() // this is very nasty but this is for tests only, so... - return lock.withLock { fulfilled } + return fulfilled.withLockedValue { $0 } } } }