Skip to content

Commit

Permalink
async streamify event bus
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuawright11 committed Jul 27, 2024
1 parent 00ae0de commit 2a0c2a5
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 114 deletions.
80 changes: 40 additions & 40 deletions Alchemy/Database/Rune/Model/Model+CRUD.swift
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,9 @@ extension Array where Element: Model {

/// Inserts each element in this array to a database.
public func insertAll(on db: Database = Element.database) async throws {
try await Element.willCreate(self)
Element.willCreate(self)
try await Element.query(on: db).insert(try insertableFields(on: db))
try await Element.didCreate(self)
Element.didCreate(self)
}

/// Inserts and returns each element in this array to a database.
Expand All @@ -232,11 +232,11 @@ extension Array where Element: Model {

func _insertReturnAll(on db: Database = Element.database, fieldOverrides: SQLFields = [:]) async throws -> Self {
let fields = try insertableFields(on: db).map { $0 + fieldOverrides }
try await Element.willCreate(self)
Element.willCreate(self)
let results = try await Element.query(on: db)
.insertReturn(fields)
.map { try $0.decodeModel(Element.self) }
try await Element.didCreate(results)
Element.didCreate(results)
return results
}

Expand All @@ -250,27 +250,27 @@ extension Array where Element: Model {
public func updateAll(on db: Database = Element.database, _ fields: SQLFields) async throws {
let ids = map(\.id)
let fields = touchUpdatedAt(on: db, fields)
try await Element.willUpdate(self)
Element.willUpdate(self)
try await Element.query(on: db)
.where(Element.idKey, in: ids)
.update(fields)
try await Element.didUpdate(self)
Element.didUpdate(self)
}

// MARK: UPSERT

public func upsertAll(on db: Database = Element.database, conflicts: [String] = Element.upsertConflictKeys) async throws {
try await Element.willUpsert(self)
Element.willUpsert(self)
try await Element.query(on: db).upsert(try insertableFields(on: db), conflicts: conflicts)
try await Element.didUpsert(self)
Element.didUpsert(self)
}

public func upsertReturnAll(on db: Database = Element.database, conflicts: [String] = Element.upsertConflictKeys) async throws -> Self {
try await Element.willUpsert(self)
Element.willUpsert(self)
let results = try await Element.query(on: db)
.upsertReturn(try insertableFields(on: db), conflicts: conflicts)
.map { try $0.decodeModel(Element.self) }
try await Element.didUpsert(results)
Element.didUpsert(results)
return results
}

Expand All @@ -280,14 +280,14 @@ extension Array where Element: Model {
/// array isn't actually in the database, it will be ignored.
public func deleteAll(on db: Database = Element.database) async throws {
let ids = map(\.id)
try await Element.willDelete(self)
Element.willDelete(self)
try await Element.query(on: db)
.where(Element.idKey, in: ids)
.delete()

forEach { ($0 as? any Model & SoftDeletes)?.deletedAt = Date() }

try await Element.didDelete(self)
Element.didDelete(self)
}

// MARK: Refresh
Expand Down Expand Up @@ -339,53 +339,53 @@ extension Array where Element: Model {
// MARK: Model Events

extension Model {
static func didFetch(_ models: [Self]) async throws {
try await ModelDidFetch(models: models).fire()
static func didFetch(_ models: [Self]) {
ModelDidFetch(models: models).fire()
}

static func willDelete(_ models: [Self]) async throws {
try await ModelWillDelete(models: models).fire()
static func willDelete(_ models: [Self]) {
ModelWillDelete(models: models).fire()
}

static func didDelete(_ models: [Self]) async throws {
try await ModelDidDelete(models: models).fire()
static func didDelete(_ models: [Self]) {
ModelDidDelete(models: models).fire()
}

fileprivate static func willCreate(_ models: [Self]) async throws {
try await ModelWillCreate(models: models).fire()
try await willSave(models)
fileprivate static func willCreate(_ models: [Self]) {
ModelWillCreate(models: models).fire()
willSave(models)
}

fileprivate static func didCreate(_ models: [Self]) async throws {
try await ModelDidCreate(models: models).fire()
try await didSave(models)
fileprivate static func didCreate(_ models: [Self]) {
ModelDidCreate(models: models).fire()
didSave(models)
}

fileprivate static func willUpsert(_ models: [Self]) async throws {
try await ModelWillUpsert(models: models).fire()
try await willSave(models)
fileprivate static func willUpsert(_ models: [Self]) {
ModelWillUpsert(models: models).fire()
willSave(models)
}

fileprivate static func didUpsert(_ models: [Self]) async throws {
try await ModelDidUpsert(models: models).fire()
try await didSave(models)
fileprivate static func didUpsert(_ models: [Self]) {
ModelDidUpsert(models: models).fire()
didSave(models)
}

fileprivate static func willUpdate(_ models: [Self]) async throws {
try await ModelWillUpdate(models: models).fire()
try await willSave(models)
fileprivate static func willUpdate(_ models: [Self]) {
ModelWillUpdate(models: models).fire()
willSave(models)
}

fileprivate static func didUpdate(_ models: [Self]) async throws {
try await ModelDidUpdate(models: models).fire()
try await didSave(models)
fileprivate static func didUpdate(_ models: [Self]) {
ModelDidUpdate(models: models).fire()
didSave(models)
}

private static func willSave(_ models: [Self]) async throws {
try await ModelWillSave(models: models).fire()
private static func willSave(_ models: [Self]) {
ModelWillSave(models: models).fire()
}

private static func didSave(_ models: [Self]) async throws {
try await ModelDidSave(models: models).fire()
private static func didSave(_ models: [Self]) {
ModelDidSave(models: models).fire()
}
}
2 changes: 1 addition & 1 deletion Alchemy/Database/Rune/Model/Model.swift
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ extension Database {
public func table<M: Model>(_ model: M.Type, as alias: String? = nil) -> Query<M> {
let tableName = alias.map { "\(model.table) AS \($0)" } ?? model.table
return Query(db: self, table: tableName)
.didLoad { try await M.didFetch($0) }
.didLoad { M.didFetch($0) }
}
}

Expand Down
22 changes: 11 additions & 11 deletions Alchemy/Database/Rune/Model/ModelEvents.swift
Original file line number Diff line number Diff line change
Expand Up @@ -44,46 +44,46 @@ public struct ModelDidSave<M: Model>: Event {

extension EventBus {
public func onDidFetch<M: Model>(_ type: M.Type, action: @escaping (ModelDidFetch<M>) async throws -> Void) {
on(ModelDidFetch<M>.self, handler: action)
listen(ModelDidFetch<M>.self, handler: action)
}

public func onWillCreate<M: Model>(_ type: M.Type, action: @escaping (ModelWillCreate<M>) async throws -> Void) {
on(ModelWillCreate<M>.self, handler: action)
listen(ModelWillCreate<M>.self, handler: action)
}

public func onDidCreate<M: Model>(_ type: M.Type, action: @escaping (ModelDidCreate<M>) async throws -> Void) {
on(ModelDidCreate<M>.self, handler: action)
listen(ModelDidCreate<M>.self, handler: action)
}

public func onWillUpsert<M: Model>(_ type: M.Type, action: @escaping (ModelWillUpsert<M>) async throws -> Void) {
on(ModelWillUpsert<M>.self, handler: action)
listen(ModelWillUpsert<M>.self, handler: action)
}

public func onDidUpsert<M: Model>(_ type: M.Type, action: @escaping (ModelDidUpsert<M>) async throws -> Void) {
on(ModelDidUpsert<M>.self, handler: action)
listen(ModelDidUpsert<M>.self, handler: action)
}

public func onWillUpdate<M: Model>(_ type: M.Type, action: @escaping (ModelWillUpdate<M>) async throws -> Void) {
on(ModelWillUpdate<M>.self, handler: action)
listen(ModelWillUpdate<M>.self, handler: action)
}

public func onDidUpdate<M: Model>(_ type: M.Type, action: @escaping (ModelDidUpdate<M>) async throws -> Void) {
on(ModelDidUpdate<M>.self, handler: action)
listen(ModelDidUpdate<M>.self, handler: action)
}

public func onWillSave<M: Model>(_ type: M.Type, action: @escaping (ModelWillSave<M>) async throws -> Void) {
on(ModelWillSave<M>.self, handler: action)
listen(ModelWillSave<M>.self, handler: action)
}

public func onDidSave<M: Model>(_ type: M.Type, action: @escaping (ModelDidSave<M>) async throws -> Void) {
on(ModelDidSave<M>.self, handler: action)
listen(ModelDidSave<M>.self, handler: action)
}

public func onWillDelete<M: Model>(_ type: M.Type, action: @escaping (ModelWillDelete<M>) async throws -> Void) {
on(ModelWillDelete<M>.self, handler: action)
listen(ModelWillDelete<M>.self, handler: action)
}

public func onDidDelete<M: Model>(_ type: M.Type, action: @escaping (ModelDidDelete<M>) async throws -> Void) {
on(ModelDidDelete<M>.self, handler: action)
listen(ModelDidDelete<M>.self, handler: action)
}
}
4 changes: 2 additions & 2 deletions Alchemy/Events/Event.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ extension Event {
public static var registrationKey: String { name(of: Self.self) }

/// Fire this event on an `EventBus`.
public func fire(on bus: EventBus = Events) async throws {
try await bus.fire(self)
public func fire(on bus: EventBus = Events) {
bus.fire(self)
}
}
101 changes: 44 additions & 57 deletions Alchemy/Events/EventBus.swift
Original file line number Diff line number Diff line change
@@ -1,81 +1,68 @@
import NIOConcurrencyHelpers
import AsyncAlgorithms

public final class EventBus: IdentifiedService {
public typealias Identifier = ServiceIdentifier<EventBus>

public typealias Handler<E: Event> = (E) async throws -> Void
private typealias AnyHandler = (Event) async throws -> Void
private struct EventJob<E: Event & Codable>: Job, Codable {
let event: E
let listenerId: String

private var listeners: [String: any Listener] = [:]
private var handlers: [String: [AnyHandler]] = [:]
private let lock = NIOLock()
func handle(context: JobContext) async throws {
guard let listener = Events.listeners[listenerId] as? any QueueableListener<E> else {
throw JobError("Unable to find registered listener of type `\(listenerId)` to handle a queued event.")
}

public func on<E: Event>(_ event: E.Type, handler: @escaping Handler<E>) {
lock.withLock {
handlers[E.registrationKey, default: []] += [convertHandler(handler)]
try await listener.handle(event: event)
}
}

public func register<L: Listener>(listener: L) {
lock.withLock {
handlers[L.ObservedEvent.registrationKey, default: []] += [convertHandler(listener.handle)]
listeners[L.registryId] = listener
}
}
fileprivate var listeners: [String: any Listener] = [:]
private let channel = AsyncChannel<Event>()

public func register<L: QueueableListener>(listener: L) {
lock.withLock {
Jobs.register(EventJob<L.ObservedEvent>.self)
handlers[L.ObservedEvent.registrationKey, default: []] += [convertHandler(listener.dispatch)]
listeners[L.registryId] = listener
}
}

public func fire<E: Event>(_ event: E) async throws {
let handlers = lock.withLock { self.handlers[E.registrationKey] ?? [] }
for handle in handlers {
try await handle(event)
}
public func stream<E: Event>(of: E.Type) -> AsyncStream<E> {
channel
.compactMap { $0 as? E }
.stream
}

fileprivate func lookupListener<E: Event>(_ id: String, eventType: E.Type = E.self) throws -> any Listener<E> {
guard let listener = Events.listeners[id] as? any Listener<E> else {
throw JobError("Unable to find registered listener of type `\(id)` to handle a queued event.")
@discardableResult
public func listen<E: Event>(_ event: E.Type, handler: @escaping (E) async throws -> Void) -> Task<Void, Error> {
Task {
for await event in stream(of: event) {
try await handler(event)
}
}

return listener
}

private func convertHandler<E: Event>(_ handler: @escaping Handler<E>) -> AnyHandler {
return { event in
guard let event = event as? E else {
Log.error("Event handler type mismatch for \(E.registrationKey)!")
return
@discardableResult
public func register<L: Listener>(listener: L) -> Task<Void, Error> {
listeners[L.registryId] = listener
return Task {
for await event in stream(of: L.ObservedEvent.self) {
try await listener.handle(event: event)
}

try await handler(event)
}
}
}

private struct EventJob<E: Event & Codable>: Job, Codable {
let event: E
let listenerId: String

func handle(context: JobContext) async throws {
try await Events.lookupListener(listenerId, eventType: E.self).handle(event: event)
@discardableResult
public func register<L: QueueableListener>(listener: L) -> Task<Void, Error> {
listeners[L.registryId] = listener
return Task {
for await event in stream(of: L.ObservedEvent.self) {
if listener.shouldQueue(event: event) {
try await listener.handle(event: event)
} else {
try await EventJob(event: event, listenerId: L.registryId)
.dispatch(on: listener.queue, channel: listener.channel)
}
}
}
}
}

extension QueueableListener {
fileprivate func dispatch(event: ObservedEvent) async throws {
guard shouldQueue(event: event) else {
try await handle(event: event)
return

public func fire<E: Event>(_ event: E) {
Task {
await channel.send(event)
}

try await EventJob(event: event, listenerId: Self.registryId)
.dispatch(on: queue, channel: channel)
}
}

Expand Down
2 changes: 1 addition & 1 deletion Alchemy/Events/QueueableListener.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/// A listener that handles its events on a background `Queue`.
public protocol QueueableListener: Listener where ObservedEvent: Codable {
public protocol QueueableListener<ObservedEvent>: Listener where ObservedEvent: Codable {
/// The queue where events will be dispatched.
var queue: Queue { get }

Expand Down
7 changes: 7 additions & 0 deletions Alchemy/Utilities/Extensions/AsyncSequence+Utilities.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import ConcurrencyExtras

extension AsyncSequence {
public var stream: AsyncStream<Element> {
eraseToStream()
}
}
2 changes: 1 addition & 1 deletion Example/App.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import Alchemy
@main
@Application
struct App {
func boot() throws {
func boot() {
use(UserController())
}

Expand Down
Loading

0 comments on commit 2a0c2a5

Please sign in to comment.