diff --git a/Alchemy/Queue/QueueWorker.swift b/Alchemy/Queue/QueueWorker.swift index 81c1de1d..d5e09983 100644 --- a/Alchemy/Queue/QueueWorker.swift +++ b/Alchemy/Queue/QueueWorker.swift @@ -1,10 +1,17 @@ import AsyncAlgorithms -struct QueueWorker: Service, @unchecked Sendable { +actor QueueWorker: Service { let queue: Queue - var channels: [String] = [Queue.defaultChannel] - var pollRate: Duration = .seconds(1) - var untilEmpty: Bool = false + var channels: [String] + var pollRate: Duration + var untilEmpty: Bool + + init(queue: Queue, channels: [String], pollRate: Duration, untilEmpty: Bool) { + self.queue = queue + self.channels = channels + self.pollRate = pollRate + self.untilEmpty = untilEmpty + } private var timer: some AsyncSequence { AsyncTimerSequence(interval: pollRate, clock: ContinuousClock()) diff --git a/Alchemy/Scheduler/Frequency.swift b/Alchemy/Scheduler/Frequency.swift index 29604bc2..618e0de7 100644 --- a/Alchemy/Scheduler/Frequency.swift +++ b/Alchemy/Scheduler/Frequency.swift @@ -2,7 +2,7 @@ import Cron import Foundation /// Used to help build schedule frequencies for scheduled tasks. -public final class Frequency { +public final class Frequency: AsyncSequence { /// A day of the week. public enum DayOfWeek: Int, ExpressibleByIntegerLiteral { /// Sunday @@ -71,7 +71,7 @@ public final class Frequency { var cron = try! DatePattern("* * * * * * *") /// The time amount until the next interval, if there is one. - func timeUntilNext() -> TimeAmount? { + func timeUntilNext() -> Duration? { guard let next = cron.next(), let nextDate = next.date else { return nil } @@ -170,4 +170,25 @@ public final class Frequency { preconditionFailure("Error parsing cron expression '\(expression)': \(error).") } } + + // MARK: AsyncSequence + + public typealias Element = Foundation.Date + + public func makeAsyncIterator() -> Iterator { + Iterator(frequency: self) + } + + public struct Iterator: AsyncIteratorProtocol { + let frequency: Frequency + + public mutating func next() async throws -> Foundation.Date? { + guard let delay = frequency.timeUntilNext() else { + return nil + } + + try await Task.sleep(for: delay) + return Date() + } + } } diff --git a/Alchemy/Scheduler/Plugins/Schedules.swift b/Alchemy/Scheduler/Plugins/Schedules.swift index f2674833..6bb5f5d6 100644 --- a/Alchemy/Scheduler/Plugins/Schedules.swift +++ b/Alchemy/Scheduler/Plugins/Schedules.swift @@ -5,8 +5,4 @@ struct Schedules: Plugin { app.container.register(scheduler).singleton() app.registerCommand(ScheduleCommand.self) } - - func shutdown(app: Application) async throws { - try await scheduler.shutdown() - } } diff --git a/Alchemy/Scheduler/Scheduler.swift b/Alchemy/Scheduler/Scheduler.swift index 3a6a5d36..1d994373 100644 --- a/Alchemy/Scheduler/Scheduler.swift +++ b/Alchemy/Scheduler/Scheduler.swift @@ -1,70 +1,41 @@ +import AsyncAlgorithms import NIOCore import NIOConcurrencyHelpers +import Foundation +import ServiceLifecycle /// A service for scheduling recurring work, in lieu of a separate cron task /// running apart from your server. public final class Scheduler { - private struct ScheduledTask { + private struct Task: Service, @unchecked Sendable { let name: String let frequency: Frequency - let work: () async throws -> Void - } - - public private(set) var isStarted: Bool = false - private var tasks: [ScheduledTask] = [] - private var scheduled: [Scheduled] = [] - private let lock = NIOLock() - - /// Start scheduling with the given loop. - /// - /// - Parameter scheduleLoop: A loop to run all tasks on. Defaults to the - /// next available `EventLoop`. - public func start(on scheduleLoop: EventLoop = LoopGroup.next()) { - guard lock.withLock({ - guard !isStarted else { return false } - isStarted = true - return true - }) else { - Log.warning("This scheduler has already been started.") - return - } + let task: () async throws -> Void - Log.info("Scheduling \(tasks.count) tasks.") - for task in tasks { - schedule(task: task, on: scheduleLoop) - } - } + func run() async throws { + for try await _ in frequency.cancelOnGracefulShutdown() { + do { + Log.info("Scheduling \(name) (\(frequency.cron.string))") + try await task() + } catch { + // log an error but don't throw - we don't want to stop all + // scheduling if a single instance of a task results in + // an error. + Log.error("Error scheduling \(name): \(error)") + } + } - public func shutdown() async throws { - lock.withLock { - isStarted = false - scheduled.forEach { $0.cancel() } - scheduled = [] + Log.info("Scheduling \(name) complete; there are no future times in the frequency.") } } - private func schedule(task: ScheduledTask, on loop: EventLoop) { - guard let delay = task.frequency.timeUntilNext() else { - Log.info("Scheduling \(task.name) complete; there are no future times in the frequency.") - return - } - - lock.withLock { - guard isStarted else { - Log.debug("Not scheduling task \(task.name), this Scheduler is not started.") - return - } + private var tasks: [Task] = [] - let scheduledTask = loop.flatScheduleTask(in: delay) { - loop.asyncSubmit { - // Schedule next and run - self.schedule(task: task, on: loop) - - try await task.work() - } - } - - scheduled.append(scheduledTask) + /// Start scheduling. + public func start() { + Log.info("Scheduling \(tasks.count) tasks.") + for task in tasks { + Life.addService(task) } } @@ -78,18 +49,11 @@ public final class Scheduler { /// - Returns: A builder for customizing the scheduling frequency. public func task(_ name: String? = nil, _ task: @escaping () async throws -> Void) -> Frequency { let frequency = Frequency() - let name = name ?? "task_\(tasks.count)" - let task = ScheduledTask(name: name, frequency: frequency) { - do { - Log.info("Scheduling \(name) (\(frequency.cron.string))") - try await task() - } catch { - Log.error("Error scheduling \(name): \(error)") - throw error - } - } - - tasks.append(task) + tasks.append( + Task(name: name ?? "task_\(tasks.count)", + frequency: frequency, + task: task) + ) return frequency } @@ -100,7 +64,9 @@ public final class Scheduler { /// - queue: The queue to schedule it on. /// - channel: The queue channel to schedule it on. /// - Returns: A builder for customizing the scheduling frequency. - public func job(_ job: @escaping @autoclosure () -> J, queue: Queue = Q, channel: String = Queue.defaultChannel) -> Frequency { + public func job(_ job: @escaping @autoclosure () -> J, + queue: Queue = Q, + channel: String = Queue.defaultChannel) -> Frequency { // Register the job, just in case the user forgot. Jobs.register(J.self)