diff --git a/_raw_semaphore.ts b/_raw_semaphore.ts new file mode 100644 index 0000000..5833638 --- /dev/null +++ b/_raw_semaphore.ts @@ -0,0 +1,57 @@ +/** + * @internal + */ +export class RawSemaphore { + #resolves: (() => void)[] = []; + #value: number; + #size: number; + + /** + * Creates a new semaphore with the specified limit. + * + * @param size The maximum number of times the semaphore can be acquired before blocking. + * @throws {RangeError} if the size is not a positive safe integer. + */ + constructor(size: number) { + if (size <= 0 || !Number.isSafeInteger(size)) { + throw new RangeError( + `size must be a positive safe integer, got ${size}`, + ); + } + this.#value = size; + this.#size = size; + } + + /** + * Returns true if the semaphore is currently locked. + */ + get locked(): boolean { + return this.#value === 0; + } + + /** + * Acquires the semaphore, blocking until the semaphore is available. + */ + acquire(): Promise { + if (this.#value > 0) { + this.#value -= 1; + return Promise.resolve(); + } else { + const { promise, resolve } = Promise.withResolvers(); + this.#resolves.push(resolve); + return promise; + } + } + + /** + * Releases the semaphore, allowing the next waiting operation to proceed. + */ + release(): void { + const resolve = this.#resolves.shift(); + if (resolve) { + resolve(); + } else if (this.#value < this.#size) { + this.#value += 1; + } + } +} diff --git a/barrier.ts b/barrier.ts index 8ffbd53..8f4f3b2 100644 --- a/barrier.ts +++ b/barrier.ts @@ -1,5 +1,3 @@ -import { Notify } from "./notify.ts"; - /** * A synchronization primitive that allows multiple tasks to wait until all of * them have reached a certain point of execution before continuing. @@ -26,8 +24,8 @@ import { Notify } from "./notify.ts"; * ``` */ export class Barrier { - #notify = new Notify(); - #rest: number; + #waiter: PromiseWithResolvers = Promise.withResolvers(); + #value: number; /** * Creates a new `Barrier` that blocks until `size` threads have called `wait`. @@ -41,23 +39,24 @@ export class Barrier { `size must be a positive safe integer, got ${size}`, ); } - this.#rest = size; + this.#value = size; } /** * Wait for all threads to reach the barrier. * Blocks until all threads reach the barrier. */ - async wait({ signal }: { signal?: AbortSignal } = {}): Promise { - signal?.throwIfAborted(); - this.#rest -= 1; - if (this.#rest === 0) { - await Promise.all([ - this.#notify.notified({ signal }), - this.#notify.notifyAll(), - ]); - } else { - await this.#notify.notified({ signal }); + wait({ signal }: { signal?: AbortSignal } = {}): Promise { + if (signal?.aborted) { + return Promise.reject(signal.reason); + } + const { promise, resolve, reject } = this.#waiter; + const abort = () => reject(signal!.reason); + signal?.addEventListener("abort", abort, { once: true }); + this.#value -= 1; + if (this.#value === 0) { + resolve(); } + return promise.finally(() => signal?.removeEventListener("abort", abort)); } } diff --git a/barrier_bench.ts b/barrier_bench.ts new file mode 100644 index 0000000..58ad239 --- /dev/null +++ b/barrier_bench.ts @@ -0,0 +1,23 @@ +import { Barrier as Barrier100 } from "jsr:@core/asyncutil@~1.0.0/barrier"; +import { Barrier } from "./barrier.ts"; + +const length = 1_000; + +Deno.bench({ + name: "current", + fn: async () => { + const barrier = new Barrier(length); + await Promise.all(Array.from({ length }).map(() => barrier.wait())); + }, + group: "Barrier#wait", + baseline: true, +}); + +Deno.bench({ + name: "v1.0.0", + fn: async () => { + const barrier = new Barrier100(length); + await Promise.all(Array.from({ length }).map(() => barrier.wait())); + }, + group: "Barrier#wait", +}); diff --git a/deno.jsonc b/deno.jsonc index ae1868d..67aa3d3 100644 --- a/deno.jsonc +++ b/deno.jsonc @@ -26,6 +26,7 @@ ], "exclude": [ "**/*_test.ts", + "**/*_bench.ts", ".*" ] }, diff --git a/deno.lock b/deno.lock index 1b7cd6e..d1f004c 100644 --- a/deno.lock +++ b/deno.lock @@ -2,6 +2,7 @@ "version": "3", "packages": { "specifiers": { + "jsr:@core/asyncutil@~1.0.0": "jsr:@core/asyncutil@1.0.2", "jsr:@core/iterutil@^0.6.0": "jsr:@core/iterutil@0.6.0", "jsr:@cross/runtime@^1.0.0": "jsr:@cross/runtime@1.0.0", "jsr:@cross/test@^0.0.9": "jsr:@cross/test@0.0.9", @@ -11,6 +12,12 @@ "npm:@types/node": "npm:@types/node@18.16.19" }, "jsr": { + "@core/asyncutil@1.0.2": { + "integrity": "efb2c41ccf6d9ba481f1fedb87734813530ef64ca2193a89f595cd1483b71476", + "dependencies": [ + "jsr:@core/iterutil@^0.6.0" + ] + }, "@core/iterutil@0.6.0": { "integrity": "8de0d0062a515496ae744983941d7e379668c2ee2edf43f63423e8da753828b1" }, diff --git a/lock.ts b/lock.ts index bf49c90..7afa1ee 100644 --- a/lock.ts +++ b/lock.ts @@ -1,4 +1,4 @@ -import { Mutex } from "./mutex.ts"; +import { RawSemaphore } from "./_raw_semaphore.ts"; /** * A mutual exclusion lock that provides safe concurrent access to a shared value. @@ -16,7 +16,7 @@ import { Mutex } from "./mutex.ts"; * ``` */ export class Lock { - #mu = new Mutex(); + #sem = new RawSemaphore(1); #value: T; /** @@ -32,7 +32,7 @@ export class Lock { * Returns true if the lock is currently locked, false otherwise. */ get locked(): boolean { - return this.#mu.locked; + return this.#sem.locked; } /** @@ -43,7 +43,11 @@ export class Lock { * @returns A Promise that resolves with the result of the function. */ async lock(fn: (value: T) => R | PromiseLike): Promise { - using _lock = await this.#mu.acquire(); - return await fn(this.#value); + await this.#sem.acquire(); + try { + return await fn(this.#value); + } finally { + this.#sem.release(); + } } } diff --git a/lock_bench.ts b/lock_bench.ts new file mode 100644 index 0000000..4a5c4bf --- /dev/null +++ b/lock_bench.ts @@ -0,0 +1,23 @@ +import { Lock as Lock100 } from "jsr:@core/asyncutil@~1.0.0/lock"; +import { Lock } from "./lock.ts"; + +const length = 1_000; + +Deno.bench({ + name: "current", + fn: async () => { + const lock = new Lock(0); + await Promise.all(Array.from({ length }).map(() => lock.lock(() => {}))); + }, + group: "Lock#lock", + baseline: true, +}); + +Deno.bench({ + name: "v1.0.0", + fn: async () => { + const lock = new Lock100(0); + await Promise.all(Array.from({ length }).map(() => lock.lock(() => {}))); + }, + group: "Lock#lock", +}); diff --git a/mutex.ts b/mutex.ts index 92c8133..e704d69 100644 --- a/mutex.ts +++ b/mutex.ts @@ -1,3 +1,5 @@ +import { RawSemaphore } from "./_raw_semaphore.ts"; + /** * A mutex (mutual exclusion) is a synchronization primitive that grants * exclusive access to a shared resource. @@ -26,13 +28,13 @@ * ``` */ export class Mutex { - #waiters: Set> = new Set(); + #sem: RawSemaphore = new RawSemaphore(1); /** * Returns true if the mutex is locked, false otherwise. */ get locked(): boolean { - return this.#waiters.size > 0; + return this.#sem.locked; } /** @@ -40,19 +42,11 @@ export class Mutex { * * @returns A Promise with Disposable that releases the mutex when disposed. */ - acquire(): Promise & Disposable { - const waiters = [...this.#waiters]; - const { promise, resolve } = Promise.withResolvers(); - this.#waiters.add(promise); - const disposable = { + acquire(): Promise { + return this.#sem.acquire().then(() => ({ [Symbol.dispose]: () => { - resolve(); - this.#waiters.delete(promise); + this.#sem.release(); }, - }; - return Object.assign( - Promise.all(waiters).then(() => disposable), - disposable, - ); + })); } } diff --git a/mutex_bench.ts b/mutex_bench.ts new file mode 100644 index 0000000..801d40a --- /dev/null +++ b/mutex_bench.ts @@ -0,0 +1,33 @@ +import { Mutex as Mutex100 } from "jsr:@core/asyncutil@~1.0.0/mutex"; +import { Mutex } from "./mutex.ts"; + +const length = 1_000; + +Deno.bench({ + name: "current", + fn: async () => { + const mutex = new Mutex(); + await Promise.all( + Array.from({ length }).map(async () => { + const lock = await mutex.acquire(); + lock[Symbol.dispose](); + }), + ); + }, + group: "Mutex#wait", + baseline: true, +}); + +Deno.bench({ + name: "v1.0.0", + fn: async () => { + const mutex = new Mutex100(); + await Promise.all( + Array.from({ length }).map(async () => { + const lock = await mutex.acquire(); + lock[Symbol.dispose](); + }), + ); + }, + group: "Mutex#wait", +}); diff --git a/notify.ts b/notify.ts index 355c633..7732bca 100644 --- a/notify.ts +++ b/notify.ts @@ -1,6 +1,3 @@ -import { iter } from "@core/iterutil/iter"; -import { take } from "@core/iterutil/take"; - /** * Async notifier that allows one or more "waiters" to wait for a notification. * @@ -23,13 +20,13 @@ import { take } from "@core/iterutil/take"; * ``` */ export class Notify { - #waiters: Set> = new Set(); + #waiters: PromiseWithResolvers[] = []; /** * Returns the number of waiters that are waiting for notification. */ get waiterCount(): number { - return this.#waiters.size; + return this.#waiters.length; } /** @@ -43,21 +40,15 @@ export class Notify { if (n <= 0 || !Number.isSafeInteger(n)) { throw new RangeError(`n must be a positive safe integer, got ${n}`); } - const it = iter(this.#waiters); - for (const waiter of take(it, n)) { - waiter.resolve(); - } - this.#waiters = new Set(it); + this.#waiters.splice(0, n).forEach(({ resolve }) => resolve()); } /** * Notifies all waiters that are waiting for notification. Resolves each of the notified waiters. */ notifyAll(): void { - for (const waiter of this.#waiters) { - waiter.resolve(); - } - this.#waiters = new Set(); + this.#waiters.forEach(({ resolve }) => resolve()); + this.#waiters = []; } /** @@ -65,18 +56,21 @@ export class Notify { * the `notify` method is called. The method returns a Promise that resolves when the caller is notified. * Optionally takes an AbortSignal to abort the waiting if the signal is aborted. */ - async notified({ signal }: { signal?: AbortSignal } = {}): Promise { + notified({ signal }: { signal?: AbortSignal } = {}): Promise { if (signal?.aborted) { - throw signal.reason; + return Promise.reject(signal.reason); } - const waiter = Promise.withResolvers(); const abort = () => { - this.#waiters.delete(waiter); - waiter.reject(signal!.reason); + const waiter = this.#waiters.shift(); + if (waiter) { + waiter.reject(signal!.reason); + } }; signal?.addEventListener("abort", abort, { once: true }); - this.#waiters.add(waiter); - await waiter.promise; - signal?.removeEventListener("abort", abort); + const waiter = Promise.withResolvers(); + this.#waiters.push(waiter); + return waiter.promise.finally(() => { + signal?.removeEventListener("abort", abort); + }); } } diff --git a/notify_bench.ts b/notify_bench.ts new file mode 100644 index 0000000..02f50fc --- /dev/null +++ b/notify_bench.ts @@ -0,0 +1,70 @@ +import { Notify as Notify100 } from "jsr:@core/asyncutil@~1.0.0/notify"; +import { Notify } from "./notify.ts"; + +const length = 1_000; + +Deno.bench({ + name: "current", + fn: async () => { + const notify = new Notify(); + const waiter = Promise.all( + Array.from({ length }).map(async () => { + await notify.notified(); + }), + ); + notify.notifyAll(); + await waiter; + }, + group: "Notify#notifyAll", + baseline: true, +}); + +Deno.bench({ + name: "v1.0.0", + fn: async () => { + const notify = new Notify100(); + const waiter = Promise.all( + Array.from({ length }).map(async () => { + await notify.notified(); + }), + ); + notify.notifyAll(); + await waiter; + }, + group: "Notify#notifyAll", +}); + +Deno.bench({ + name: "current", + fn: async () => { + const notify = new Notify(); + const waiter = Promise.all( + Array.from({ length }).map(async () => { + await notify.notified(); + }), + ); + Array + .from({ length: length }, () => notify.notify()) + .forEach(() => notify.notify()); + await waiter; + }, + group: "Notify#notify", + baseline: true, +}); + +Deno.bench({ + name: "v1.0.0", + fn: async () => { + const notify = new Notify100(); + const waiter = Promise.all( + Array.from({ length }).map(async () => { + await notify.notified(); + }), + ); + Array + .from({ length: length }) + .forEach(() => notify.notify()); + await waiter; + }, + group: "Notify#notify", +}); diff --git a/queue.ts b/queue.ts index 59138cb..ab40158 100644 --- a/queue.ts +++ b/queue.ts @@ -1,5 +1,3 @@ -import { Notify } from "./notify.ts"; - /** * A queue implementation that allows for adding and removing elements, with optional waiting when * popping elements from an empty queue. @@ -18,7 +16,7 @@ import { Notify } from "./notify.ts"; * ``` */ export class Queue | null> { - #notify = new Notify(); + #resolves: (() => void)[] = []; #items: T[] = []; /** @@ -32,7 +30,7 @@ export class Queue | null> { * Returns true if the queue is currently locked. */ get locked(): boolean { - return this.#notify.waiterCount > 0; + return this.#resolves.length > 0; } /** @@ -40,7 +38,7 @@ export class Queue | null> { */ push(value: T): void { this.#items.push(value); - this.#notify.notify(); + this.#resolves.shift()?.(); } /** @@ -55,7 +53,12 @@ export class Queue | null> { if (value !== undefined) { return value; } - await this.#notify.notified({ signal }); + const { promise, resolve, reject } = Promise.withResolvers(); + signal?.addEventListener("abort", () => reject(signal.reason), { + once: true, + }); + this.#resolves.push(resolve); + await promise; } } } diff --git a/queue_bench.ts b/queue_bench.ts new file mode 100644 index 0000000..990cb8d --- /dev/null +++ b/queue_bench.ts @@ -0,0 +1,29 @@ +import { Queue as Queue100 } from "jsr:@core/asyncutil@~1.0.0/queue"; +import { Queue } from "./queue.ts"; + +const length = 1_000; + +Deno.bench({ + name: "current", + fn: async () => { + const queue = new Queue(); + Array + .from({ length }) + .forEach(() => queue.push(1)); + await Promise.all(Array.from({ length }).map(() => queue.pop())); + }, + group: "Queue#push/pop", + baseline: true, +}); + +Deno.bench({ + name: "v1.0.0", + fn: async () => { + const queue = new Queue100(); + Array + .from({ length }) + .forEach(() => queue.push(1)); + await Promise.allSettled(Array.from({ length }).map(() => queue.pop())); + }, + group: "Queue#push/pop", +}); diff --git a/rw_lock.ts b/rw_lock.ts index 073c881..7326f3d 100644 --- a/rw_lock.ts +++ b/rw_lock.ts @@ -1,4 +1,4 @@ -import { Mutex } from "./mutex.ts"; +import { RawSemaphore } from "./_raw_semaphore.ts"; /** * A reader-writer lock implementation that allows multiple concurrent reads but only one write at a time. @@ -29,8 +29,8 @@ import { Mutex } from "./mutex.ts"; * ``` */ export class RwLock { - #read = new Mutex(); - #write = new Mutex(); + #read = new RawSemaphore(1); + #write = new RawSemaphore(1); #value: T; /** @@ -50,9 +50,17 @@ export class RwLock { * @returns A promise that resolves to the return value of the specified function. */ async lock(fn: (value: T) => R | PromiseLike): Promise { - using _wlock = await this.#write.acquire(); - using _rlock = await this.#read.acquire(); - return await fn(this.#value); + await this.#write.acquire(); + try { + await this.#read.acquire(); + try { + return await fn(this.#value); + } finally { + this.#read.release(); + } + } finally { + this.#write.release(); + } } /** @@ -63,11 +71,19 @@ export class RwLock { * @returns A promise that resolves to the return value of the specified function. */ async rlock(fn: (value: T) => R | PromiseLike): Promise { - using _wlock = this.#write.locked - ? await this.#write.acquire() - : { [Symbol.dispose]: () => {} }; - // Acquire the read lock without waiting to allow multiple readers to access the lock. - using _rlock = this.#read.acquire(); - return await fn(this.#value); + if (this.#write.locked) { + await this.#write.acquire(); + } + try { + // Acquire the read lock without waiting to allow multiple readers to access the lock. + this.#read.acquire(); + try { + return await fn(this.#value); + } finally { + this.#read.release(); + } + } finally { + this.#write.release(); + } } } diff --git a/rw_lock_bench.ts b/rw_lock_bench.ts new file mode 100644 index 0000000..ee70529 --- /dev/null +++ b/rw_lock_bench.ts @@ -0,0 +1,42 @@ +import { RwLock as RwLock100 } from "jsr:@core/asyncutil@~1.0.0/rw-lock"; +import { RwLock } from "./rw_lock.ts"; + +const length = 1_000; + +Deno.bench({ + name: "current", + fn: async () => { + const rwLock = new RwLock(0); + await Promise.all(Array.from({ length }).map(() => rwLock.lock(() => {}))); + }, + group: "RwLock#lock", + baseline: true, +}); + +Deno.bench({ + name: "v1.0.0", + fn: async () => { + const rwLock = new RwLock100(0); + await Promise.all(Array.from({ length }).map(() => rwLock.lock(() => {}))); + }, + group: "RwLock#lock", +}); + +Deno.bench({ + name: "current", + fn: async () => { + const rwLock = new RwLock(0); + await Promise.all(Array.from({ length }).map(() => rwLock.rlock(() => {}))); + }, + group: "RwLock#rlock", + baseline: true, +}); + +Deno.bench({ + name: "v1.0.0", + fn: async () => { + const rwLock = new RwLock100(0); + await Promise.all(Array.from({ length }).map(() => rwLock.rlock(() => {}))); + }, + group: "RwLock#rlock", +}); diff --git a/semaphore.ts b/semaphore.ts index 3aba954..80fd18a 100644 --- a/semaphore.ts +++ b/semaphore.ts @@ -1,4 +1,4 @@ -import { Notify } from "./notify.ts"; +import { RawSemaphore } from "./_raw_semaphore.ts"; /** * A semaphore that allows a limited number of concurrent executions of an operation. @@ -16,8 +16,7 @@ import { Notify } from "./notify.ts"; * ``` */ export class Semaphore { - #notify = new Notify(); - #rest: number; + #sem: RawSemaphore; /** * Creates a new semaphore with the specified limit. @@ -26,19 +25,14 @@ export class Semaphore { * @throws {RangeError} if the size is not a positive safe integer. */ constructor(size: number) { - if (size <= 0 || !Number.isSafeInteger(size)) { - throw new RangeError( - `size must be a positive safe integer, got ${size}`, - ); - } - this.#rest = size + 1; + this.#sem = new RawSemaphore(size); } /** * Returns true if the semaphore is currently locked. */ get locked(): boolean { - return this.#rest === 0; + return this.#sem.locked; } /** @@ -48,29 +42,11 @@ export class Semaphore { * @returns A promise that resolves to the return value of the specified function. */ async lock(fn: () => R | PromiseLike): Promise { - await this.#acquire(); + await this.#sem.acquire(); try { return await fn(); } finally { - this.#release(); - } - } - - async #acquire(): Promise { - if (this.#rest > 0) { - this.#rest -= 1; - } - if (this.#rest === 0) { - await this.#notify.notified(); - } - } - - #release(): void { - if (this.#notify.waiterCount > 0) { - this.#notify.notify(); - } - if (this.#notify.waiterCount === 0) { - this.#rest += 1; + this.#sem.release(); } } } diff --git a/semaphore_bench.ts b/semaphore_bench.ts new file mode 100644 index 0000000..0302a39 --- /dev/null +++ b/semaphore_bench.ts @@ -0,0 +1,27 @@ +import { Semaphore as Semaphore100 } from "jsr:@core/asyncutil@~1.0.0/semaphore"; +import { Semaphore } from "./semaphore.ts"; + +const length = 1_000; + +Deno.bench({ + name: "current", + fn: async () => { + const semaphore = new Semaphore(10); + await Promise.all( + Array.from({ length }).map(() => semaphore.lock(() => {})), + ); + }, + group: "Semaphore#lock", + baseline: true, +}); + +Deno.bench({ + name: "v1.0.0", + fn: async () => { + const semaphore = new Semaphore100(10); + await Promise.all( + Array.from({ length }).map(() => semaphore.lock(() => {})), + ); + }, + group: "Semaphore#lock", +}); diff --git a/stack.ts b/stack.ts index 01bb33d..a5876c3 100644 --- a/stack.ts +++ b/stack.ts @@ -1,5 +1,3 @@ -import { Notify } from "./notify.ts"; - /** * A stack implementation that allows for adding and removing elements, with optional waiting when * popping elements from an empty stack. @@ -20,7 +18,7 @@ import { Notify } from "./notify.ts"; * @template T The type of items in the stack. */ export class Stack | null> { - #notify = new Notify(); + #resolves: (() => void)[] = []; #items: T[] = []; /** @@ -34,7 +32,7 @@ export class Stack | null> { * Returns true if the stack is currently locked. */ get locked(): boolean { - return this.#notify.waiterCount > 0; + return this.#resolves.length > 0; } /** @@ -44,7 +42,7 @@ export class Stack | null> { */ push(value: T): void { this.#items.push(value); - this.#notify.notify(); + this.#resolves.shift()?.(); } /** @@ -59,7 +57,12 @@ export class Stack | null> { if (value !== undefined) { return value; } - await this.#notify.notified({ signal }); + const { promise, resolve, reject } = Promise.withResolvers(); + signal?.addEventListener("abort", () => reject(signal.reason), { + once: true, + }); + this.#resolves.push(resolve); + await promise; } } } diff --git a/stack_bench.ts b/stack_bench.ts new file mode 100644 index 0000000..0277c91 --- /dev/null +++ b/stack_bench.ts @@ -0,0 +1,29 @@ +import { Stack as Stack100 } from "jsr:@core/asyncutil@~1.0.0/stack"; +import { Stack } from "./stack.ts"; + +const length = 1_000; + +Deno.bench({ + name: "current", + fn: async () => { + const stack = new Stack(); + Array + .from({ length }) + .forEach(() => stack.push(1)); + await Promise.all(Array.from({ length }).map(() => stack.pop())); + }, + group: "Stack#push/pop", + baseline: true, +}); + +Deno.bench({ + name: "v1.0.0", + fn: async () => { + const stack = new Stack100(); + Array + .from({ length }) + .forEach(() => stack.push(1)); + await Promise.allSettled(Array.from({ length }).map(() => stack.pop())); + }, + group: "Stack#push/pop", +}); diff --git a/wait_group_bench.ts b/wait_group_bench.ts new file mode 100644 index 0000000..ff3724f --- /dev/null +++ b/wait_group_bench.ts @@ -0,0 +1,29 @@ +import { WaitGroup as WaitGroup100 } from "jsr:@core/asyncutil@~1.0.0/wait-group"; +import { WaitGroup } from "./wait_group.ts"; + +const length = 1_000; + +Deno.bench({ + name: "current", + fn: async () => { + const wg = new WaitGroup(); + const waiter = wg.wait(); + Array.from({ length }).forEach(() => wg.add(1)); + Array.from({ length }).forEach(() => wg.done()); + await waiter; + }, + group: "WaitGroup#wait", + baseline: true, +}); + +Deno.bench({ + name: "v1.0.0", + fn: async () => { + const wg = new WaitGroup100(); + const waiter = wg.wait(); + Array.from({ length }).forEach(() => wg.add(1)); + Array.from({ length }).forEach(() => wg.done()); + await waiter; + }, + group: "WaitGroup#wait", +});