Skip to content

Commit

Permalink
convert task scheduling to service
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuawright11 committed Jul 26, 2024
1 parent 6044e05 commit e78ec9c
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 76 deletions.
15 changes: 11 additions & 4 deletions Alchemy/Queue/QueueWorker.swift
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
import AsyncAlgorithms

struct QueueWorker: Service, @unchecked Sendable {
actor QueueWorker: Service {
let queue: Queue
var channels: [String] = [Queue.defaultChannel]
var pollRate: Duration = .seconds(1)
var untilEmpty: Bool = false
var channels: [String]
var pollRate: Duration
var untilEmpty: Bool

init(queue: Queue, channels: [String], pollRate: Duration, untilEmpty: Bool) {
self.queue = queue
self.channels = channels
self.pollRate = pollRate
self.untilEmpty = untilEmpty
}

private var timer: some AsyncSequence {
AsyncTimerSequence(interval: pollRate, clock: ContinuousClock())
Expand Down
25 changes: 23 additions & 2 deletions Alchemy/Scheduler/Frequency.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import Cron
import Foundation

/// Used to help build schedule frequencies for scheduled tasks.
public final class Frequency {
public final class Frequency: AsyncSequence {
/// A day of the week.
public enum DayOfWeek: Int, ExpressibleByIntegerLiteral {
/// Sunday
Expand Down Expand Up @@ -71,7 +71,7 @@ public final class Frequency {
var cron = try! DatePattern("* * * * * * *")

/// The time amount until the next interval, if there is one.
func timeUntilNext() -> TimeAmount? {
func timeUntilNext() -> Duration? {
guard let next = cron.next(), let nextDate = next.date else {
return nil
}
Expand Down Expand Up @@ -170,4 +170,25 @@ public final class Frequency {
preconditionFailure("Error parsing cron expression '\(expression)': \(error).")
}
}

// MARK: AsyncSequence

public typealias Element = Foundation.Date

public func makeAsyncIterator() -> Iterator {
Iterator(frequency: self)
}

public struct Iterator: AsyncIteratorProtocol {
let frequency: Frequency

public mutating func next() async throws -> Foundation.Date? {
guard let delay = frequency.timeUntilNext() else {
return nil
}

try await Task.sleep(for: delay)
return Date()
}
}
}
4 changes: 0 additions & 4 deletions Alchemy/Scheduler/Plugins/Schedules.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,4 @@ struct Schedules: Plugin {
app.container.register(scheduler).singleton()
app.registerCommand(ScheduleCommand.self)
}

func shutdown(app: Application) async throws {
try await scheduler.shutdown()
}
}
98 changes: 32 additions & 66 deletions Alchemy/Scheduler/Scheduler.swift
Original file line number Diff line number Diff line change
@@ -1,70 +1,41 @@
import AsyncAlgorithms
import NIOCore
import NIOConcurrencyHelpers
import Foundation
import ServiceLifecycle

/// A service for scheduling recurring work, in lieu of a separate cron task
/// running apart from your server.
public final class Scheduler {
private struct ScheduledTask {
private struct Task: Service, @unchecked Sendable {
let name: String
let frequency: Frequency
let work: () async throws -> Void
}

public private(set) var isStarted: Bool = false
private var tasks: [ScheduledTask] = []
private var scheduled: [Scheduled<Void>] = []
private let lock = NIOLock()

/// Start scheduling with the given loop.
///
/// - Parameter scheduleLoop: A loop to run all tasks on. Defaults to the
/// next available `EventLoop`.
public func start(on scheduleLoop: EventLoop = LoopGroup.next()) {
guard lock.withLock({
guard !isStarted else { return false }
isStarted = true
return true
}) else {
Log.warning("This scheduler has already been started.")
return
}
let task: () async throws -> Void

Log.info("Scheduling \(tasks.count) tasks.")
for task in tasks {
schedule(task: task, on: scheduleLoop)
}
}
func run() async throws {
for try await _ in frequency.cancelOnGracefulShutdown() {
do {
Log.info("Scheduling \(name) (\(frequency.cron.string))")
try await task()
} catch {
// log an error but don't throw - we don't want to stop all
// scheduling if a single instance of a task results in
// an error.
Log.error("Error scheduling \(name): \(error)")
}
}

public func shutdown() async throws {
lock.withLock {
isStarted = false
scheduled.forEach { $0.cancel() }
scheduled = []
Log.info("Scheduling \(name) complete; there are no future times in the frequency.")
}
}

private func schedule(task: ScheduledTask, on loop: EventLoop) {
guard let delay = task.frequency.timeUntilNext() else {
Log.info("Scheduling \(task.name) complete; there are no future times in the frequency.")
return
}

lock.withLock {
guard isStarted else {
Log.debug("Not scheduling task \(task.name), this Scheduler is not started.")
return
}
private var tasks: [Task] = []

let scheduledTask = loop.flatScheduleTask(in: delay) {
loop.asyncSubmit {
// Schedule next and run
self.schedule(task: task, on: loop)

try await task.work()
}
}

scheduled.append(scheduledTask)
/// Start scheduling.
public func start() {
Log.info("Scheduling \(tasks.count) tasks.")
for task in tasks {
Life.addService(task)
}
}

Expand All @@ -78,18 +49,11 @@ public final class Scheduler {
/// - Returns: A builder for customizing the scheduling frequency.
public func task(_ name: String? = nil, _ task: @escaping () async throws -> Void) -> Frequency {
let frequency = Frequency()
let name = name ?? "task_\(tasks.count)"
let task = ScheduledTask(name: name, frequency: frequency) {
do {
Log.info("Scheduling \(name) (\(frequency.cron.string))")
try await task()
} catch {
Log.error("Error scheduling \(name): \(error)")
throw error
}
}

tasks.append(task)
tasks.append(
Task(name: name ?? "task_\(tasks.count)",
frequency: frequency,
task: task)
)
return frequency
}

Expand All @@ -100,7 +64,9 @@ public final class Scheduler {
/// - queue: The queue to schedule it on.
/// - channel: The queue channel to schedule it on.
/// - Returns: A builder for customizing the scheduling frequency.
public func job<J: Job>(_ job: @escaping @autoclosure () -> J, queue: Queue = Q, channel: String = Queue.defaultChannel) -> Frequency {
public func job<J: Job>(_ job: @escaping @autoclosure () -> J,
queue: Queue = Q,
channel: String = Queue.defaultChannel) -> Frequency {

// Register the job, just in case the user forgot.
Jobs.register(J.self)
Expand Down

0 comments on commit e78ec9c

Please sign in to comment.