From d7ae5eabf2b4bb590c15f092bca34fdcce242a62 Mon Sep 17 00:00:00 2001 From: Alisue Date: Fri, 16 Aug 2024 20:48:45 +0900 Subject: [PATCH 01/12] feat: exclude `*_bench.ts` from JSR --- deno.jsonc | 1 + 1 file changed, 1 insertion(+) diff --git a/deno.jsonc b/deno.jsonc index ae1868d..67aa3d3 100644 --- a/deno.jsonc +++ b/deno.jsonc @@ -26,6 +26,7 @@ ], "exclude": [ "**/*_test.ts", + "**/*_bench.ts", ".*" ] }, From 2e72caf257b6792eedd401ce72fea5ffb1d64ae5 Mon Sep 17 00:00:00 2001 From: Alisue Date: Sat, 17 Aug 2024 01:17:31 +0900 Subject: [PATCH 02/12] feat: add benchmarks --- barrier_bench.ts | 23 +++++++++++++++ deno.lock | 7 +++++ lock_bench.ts | 23 +++++++++++++++ mutex_bench.ts | 33 +++++++++++++++++++++ notify_bench.ts | 70 +++++++++++++++++++++++++++++++++++++++++++++ queue_bench.ts | 29 +++++++++++++++++++ rw_lock_bench.ts | 42 +++++++++++++++++++++++++++ semaphore_bench.ts | 27 +++++++++++++++++ stack_bench.ts | 29 +++++++++++++++++++ wait_group_bench.ts | 29 +++++++++++++++++++ 10 files changed, 312 insertions(+) create mode 100644 barrier_bench.ts create mode 100644 lock_bench.ts create mode 100644 mutex_bench.ts create mode 100644 notify_bench.ts create mode 100644 queue_bench.ts create mode 100644 rw_lock_bench.ts create mode 100644 semaphore_bench.ts create mode 100644 stack_bench.ts create mode 100644 wait_group_bench.ts diff --git a/barrier_bench.ts b/barrier_bench.ts new file mode 100644 index 0000000..58ad239 --- /dev/null +++ b/barrier_bench.ts @@ -0,0 +1,23 @@ +import { Barrier as Barrier100 } from "jsr:@core/asyncutil@~1.0.0/barrier"; +import { Barrier } from "./barrier.ts"; + +const length = 1_000; + +Deno.bench({ + name: "current", + fn: async () => { + const barrier = new Barrier(length); + await Promise.all(Array.from({ length }).map(() => barrier.wait())); + }, + group: "Barrier#wait", + baseline: true, +}); + +Deno.bench({ + name: "v1.0.0", + fn: async () => { + const barrier = new Barrier100(length); + await Promise.all(Array.from({ length }).map(() => barrier.wait())); + }, + group: "Barrier#wait", +}); diff --git a/deno.lock b/deno.lock index 1b7cd6e..d1f004c 100644 --- a/deno.lock +++ b/deno.lock @@ -2,6 +2,7 @@ "version": "3", "packages": { "specifiers": { + "jsr:@core/asyncutil@~1.0.0": "jsr:@core/asyncutil@1.0.2", "jsr:@core/iterutil@^0.6.0": "jsr:@core/iterutil@0.6.0", "jsr:@cross/runtime@^1.0.0": "jsr:@cross/runtime@1.0.0", "jsr:@cross/test@^0.0.9": "jsr:@cross/test@0.0.9", @@ -11,6 +12,12 @@ "npm:@types/node": "npm:@types/node@18.16.19" }, "jsr": { + "@core/asyncutil@1.0.2": { + "integrity": "efb2c41ccf6d9ba481f1fedb87734813530ef64ca2193a89f595cd1483b71476", + "dependencies": [ + "jsr:@core/iterutil@^0.6.0" + ] + }, "@core/iterutil@0.6.0": { "integrity": "8de0d0062a515496ae744983941d7e379668c2ee2edf43f63423e8da753828b1" }, diff --git a/lock_bench.ts b/lock_bench.ts new file mode 100644 index 0000000..4a5c4bf --- /dev/null +++ b/lock_bench.ts @@ -0,0 +1,23 @@ +import { Lock as Lock100 } from "jsr:@core/asyncutil@~1.0.0/lock"; +import { Lock } from "./lock.ts"; + +const length = 1_000; + +Deno.bench({ + name: "current", + fn: async () => { + const lock = new Lock(0); + await Promise.all(Array.from({ length }).map(() => lock.lock(() => {}))); + }, + group: "Lock#lock", + baseline: true, +}); + +Deno.bench({ + name: "v1.0.0", + fn: async () => { + const lock = new Lock100(0); + await Promise.all(Array.from({ length }).map(() => lock.lock(() => {}))); + }, + group: "Lock#lock", +}); diff --git a/mutex_bench.ts b/mutex_bench.ts new file mode 100644 index 0000000..801d40a --- /dev/null +++ b/mutex_bench.ts @@ -0,0 +1,33 @@ +import { Mutex as Mutex100 } from "jsr:@core/asyncutil@~1.0.0/mutex"; +import { Mutex } from "./mutex.ts"; + +const length = 1_000; + +Deno.bench({ + name: "current", + fn: async () => { + const mutex = new Mutex(); + await Promise.all( + Array.from({ length }).map(async () => { + const lock = await mutex.acquire(); + lock[Symbol.dispose](); + }), + ); + }, + group: "Mutex#wait", + baseline: true, +}); + +Deno.bench({ + name: "v1.0.0", + fn: async () => { + const mutex = new Mutex100(); + await Promise.all( + Array.from({ length }).map(async () => { + const lock = await mutex.acquire(); + lock[Symbol.dispose](); + }), + ); + }, + group: "Mutex#wait", +}); diff --git a/notify_bench.ts b/notify_bench.ts new file mode 100644 index 0000000..02f50fc --- /dev/null +++ b/notify_bench.ts @@ -0,0 +1,70 @@ +import { Notify as Notify100 } from "jsr:@core/asyncutil@~1.0.0/notify"; +import { Notify } from "./notify.ts"; + +const length = 1_000; + +Deno.bench({ + name: "current", + fn: async () => { + const notify = new Notify(); + const waiter = Promise.all( + Array.from({ length }).map(async () => { + await notify.notified(); + }), + ); + notify.notifyAll(); + await waiter; + }, + group: "Notify#notifyAll", + baseline: true, +}); + +Deno.bench({ + name: "v1.0.0", + fn: async () => { + const notify = new Notify100(); + const waiter = Promise.all( + Array.from({ length }).map(async () => { + await notify.notified(); + }), + ); + notify.notifyAll(); + await waiter; + }, + group: "Notify#notifyAll", +}); + +Deno.bench({ + name: "current", + fn: async () => { + const notify = new Notify(); + const waiter = Promise.all( + Array.from({ length }).map(async () => { + await notify.notified(); + }), + ); + Array + .from({ length: length }, () => notify.notify()) + .forEach(() => notify.notify()); + await waiter; + }, + group: "Notify#notify", + baseline: true, +}); + +Deno.bench({ + name: "v1.0.0", + fn: async () => { + const notify = new Notify100(); + const waiter = Promise.all( + Array.from({ length }).map(async () => { + await notify.notified(); + }), + ); + Array + .from({ length: length }) + .forEach(() => notify.notify()); + await waiter; + }, + group: "Notify#notify", +}); diff --git a/queue_bench.ts b/queue_bench.ts new file mode 100644 index 0000000..990cb8d --- /dev/null +++ b/queue_bench.ts @@ -0,0 +1,29 @@ +import { Queue as Queue100 } from "jsr:@core/asyncutil@~1.0.0/queue"; +import { Queue } from "./queue.ts"; + +const length = 1_000; + +Deno.bench({ + name: "current", + fn: async () => { + const queue = new Queue(); + Array + .from({ length }) + .forEach(() => queue.push(1)); + await Promise.all(Array.from({ length }).map(() => queue.pop())); + }, + group: "Queue#push/pop", + baseline: true, +}); + +Deno.bench({ + name: "v1.0.0", + fn: async () => { + const queue = new Queue100(); + Array + .from({ length }) + .forEach(() => queue.push(1)); + await Promise.allSettled(Array.from({ length }).map(() => queue.pop())); + }, + group: "Queue#push/pop", +}); diff --git a/rw_lock_bench.ts b/rw_lock_bench.ts new file mode 100644 index 0000000..ee70529 --- /dev/null +++ b/rw_lock_bench.ts @@ -0,0 +1,42 @@ +import { RwLock as RwLock100 } from "jsr:@core/asyncutil@~1.0.0/rw-lock"; +import { RwLock } from "./rw_lock.ts"; + +const length = 1_000; + +Deno.bench({ + name: "current", + fn: async () => { + const rwLock = new RwLock(0); + await Promise.all(Array.from({ length }).map(() => rwLock.lock(() => {}))); + }, + group: "RwLock#lock", + baseline: true, +}); + +Deno.bench({ + name: "v1.0.0", + fn: async () => { + const rwLock = new RwLock100(0); + await Promise.all(Array.from({ length }).map(() => rwLock.lock(() => {}))); + }, + group: "RwLock#lock", +}); + +Deno.bench({ + name: "current", + fn: async () => { + const rwLock = new RwLock(0); + await Promise.all(Array.from({ length }).map(() => rwLock.rlock(() => {}))); + }, + group: "RwLock#rlock", + baseline: true, +}); + +Deno.bench({ + name: "v1.0.0", + fn: async () => { + const rwLock = new RwLock100(0); + await Promise.all(Array.from({ length }).map(() => rwLock.rlock(() => {}))); + }, + group: "RwLock#rlock", +}); diff --git a/semaphore_bench.ts b/semaphore_bench.ts new file mode 100644 index 0000000..0302a39 --- /dev/null +++ b/semaphore_bench.ts @@ -0,0 +1,27 @@ +import { Semaphore as Semaphore100 } from "jsr:@core/asyncutil@~1.0.0/semaphore"; +import { Semaphore } from "./semaphore.ts"; + +const length = 1_000; + +Deno.bench({ + name: "current", + fn: async () => { + const semaphore = new Semaphore(10); + await Promise.all( + Array.from({ length }).map(() => semaphore.lock(() => {})), + ); + }, + group: "Semaphore#lock", + baseline: true, +}); + +Deno.bench({ + name: "v1.0.0", + fn: async () => { + const semaphore = new Semaphore100(10); + await Promise.all( + Array.from({ length }).map(() => semaphore.lock(() => {})), + ); + }, + group: "Semaphore#lock", +}); diff --git a/stack_bench.ts b/stack_bench.ts new file mode 100644 index 0000000..0277c91 --- /dev/null +++ b/stack_bench.ts @@ -0,0 +1,29 @@ +import { Stack as Stack100 } from "jsr:@core/asyncutil@~1.0.0/stack"; +import { Stack } from "./stack.ts"; + +const length = 1_000; + +Deno.bench({ + name: "current", + fn: async () => { + const stack = new Stack(); + Array + .from({ length }) + .forEach(() => stack.push(1)); + await Promise.all(Array.from({ length }).map(() => stack.pop())); + }, + group: "Stack#push/pop", + baseline: true, +}); + +Deno.bench({ + name: "v1.0.0", + fn: async () => { + const stack = new Stack100(); + Array + .from({ length }) + .forEach(() => stack.push(1)); + await Promise.allSettled(Array.from({ length }).map(() => stack.pop())); + }, + group: "Stack#push/pop", +}); diff --git a/wait_group_bench.ts b/wait_group_bench.ts new file mode 100644 index 0000000..ff3724f --- /dev/null +++ b/wait_group_bench.ts @@ -0,0 +1,29 @@ +import { WaitGroup as WaitGroup100 } from "jsr:@core/asyncutil@~1.0.0/wait-group"; +import { WaitGroup } from "./wait_group.ts"; + +const length = 1_000; + +Deno.bench({ + name: "current", + fn: async () => { + const wg = new WaitGroup(); + const waiter = wg.wait(); + Array.from({ length }).forEach(() => wg.add(1)); + Array.from({ length }).forEach(() => wg.done()); + await waiter; + }, + group: "WaitGroup#wait", + baseline: true, +}); + +Deno.bench({ + name: "v1.0.0", + fn: async () => { + const wg = new WaitGroup100(); + const waiter = wg.wait(); + Array.from({ length }).forEach(() => wg.add(1)); + Array.from({ length }).forEach(() => wg.done()); + await waiter; + }, + group: "WaitGroup#wait", +}); From 320d2234438c953d75e3f1504cc480235791a2d0 Mon Sep 17 00:00:00 2001 From: Alisue Date: Sat, 17 Aug 2024 02:16:41 +0900 Subject: [PATCH 03/12] refactor: avoid using `using` due to performance concerns MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit It seems `using` in Node is not as performant as expected. Note that the performance in Deno seems not to be affected by this change. benchmark time (avg) iter/s (min … max) p75 p99 p995 --------------------------------------------------------------- ----------------------------- group Lock#lock v1.0.0 52.84 ms/iter 18.9 (45.86 ms … 57.58 ms) 55.68 ms 57.58 ms 57.58 ms main 53.55 ms/iter 18.7 (47.32 ms … 73.59 ms) 55.52 ms 73.59 ms 73.59 ms summary v1.0.0 1.01x faster than main See https://github.com/jsr-core/asyncutil/pull/31 for detail --- lock.ts | 8 ++++++-- rw_lock.ts | 30 +++++++++++++++++++++++------- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/lock.ts b/lock.ts index bf49c90..2b5d955 100644 --- a/lock.ts +++ b/lock.ts @@ -43,7 +43,11 @@ export class Lock { * @returns A Promise that resolves with the result of the function. */ async lock(fn: (value: T) => R | PromiseLike): Promise { - using _lock = await this.#mu.acquire(); - return await fn(this.#value); + const lock = await this.#mu.acquire(); + try { + return await fn(this.#value); + } finally { + lock[Symbol.dispose](); + } } } diff --git a/rw_lock.ts b/rw_lock.ts index 073c881..fc8eadb 100644 --- a/rw_lock.ts +++ b/rw_lock.ts @@ -50,9 +50,17 @@ export class RwLock { * @returns A promise that resolves to the return value of the specified function. */ async lock(fn: (value: T) => R | PromiseLike): Promise { - using _wlock = await this.#write.acquire(); - using _rlock = await this.#read.acquire(); - return await fn(this.#value); + const wlock = await this.#write.acquire(); + try { + const rlock = await this.#read.acquire(); + try { + return await fn(this.#value); + } finally { + rlock[Symbol.dispose](); + } + } finally { + wlock[Symbol.dispose](); + } } /** @@ -63,11 +71,19 @@ export class RwLock { * @returns A promise that resolves to the return value of the specified function. */ async rlock(fn: (value: T) => R | PromiseLike): Promise { - using _wlock = this.#write.locked + const wlock = this.#write.locked ? await this.#write.acquire() : { [Symbol.dispose]: () => {} }; - // Acquire the read lock without waiting to allow multiple readers to access the lock. - using _rlock = this.#read.acquire(); - return await fn(this.#value); + try { + // Acquire the read lock without waiting to allow multiple readers to access the lock. + const rlock = this.#read.acquire(); + try { + return await fn(this.#value); + } finally { + rlock[Symbol.dispose](); + } + } finally { + wlock[Symbol.dispose](); + } } } From c23756712f7a0d64841803efcdbcd632c9a458ce Mon Sep 17 00:00:00 2001 From: Alisue Date: Sat, 17 Aug 2024 03:30:40 +0900 Subject: [PATCH 04/12] feat(Semaphore): improve performance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit benchmark time (avg) iter/s (min … max) p75 p99 p995 --------------------------------------------------------------- ----------------------------- group Semaphore#lock current 231.08 µs/iter 4,327.5 (207 µs … 565.42 µs) 214.17 µs 491.58 µs 503.88 µs v1.0.0 21.07 ms/iter 47.5 (20.46 ms … 23.17 ms) 21.06 ms 23.17 ms 23.17 ms summary current 91.18x faster than v1.0.0 --- semaphore.ts | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/semaphore.ts b/semaphore.ts index 3aba954..d5da8dc 100644 --- a/semaphore.ts +++ b/semaphore.ts @@ -1,5 +1,3 @@ -import { Notify } from "./notify.ts"; - /** * A semaphore that allows a limited number of concurrent executions of an operation. * @@ -16,8 +14,9 @@ import { Notify } from "./notify.ts"; * ``` */ export class Semaphore { - #notify = new Notify(); - #rest: number; + #resolves: (() => void)[] = []; + #value: number; + #size: number; /** * Creates a new semaphore with the specified limit. @@ -31,14 +30,15 @@ export class Semaphore { `size must be a positive safe integer, got ${size}`, ); } - this.#rest = size + 1; + this.#value = size; + this.#size = size; } /** * Returns true if the semaphore is currently locked. */ get locked(): boolean { - return this.#rest === 0; + return this.#value === 0; } /** @@ -56,21 +56,23 @@ export class Semaphore { } } - async #acquire(): Promise { - if (this.#rest > 0) { - this.#rest -= 1; - } - if (this.#rest === 0) { - await this.#notify.notified(); + #acquire(): Promise { + if (this.#value > 0) { + this.#value -= 1; + return Promise.resolve(); + } else { + const { promise, resolve } = Promise.withResolvers(); + this.#resolves.push(resolve); + return promise; } } #release(): void { - if (this.#notify.waiterCount > 0) { - this.#notify.notify(); - } - if (this.#notify.waiterCount === 0) { - this.#rest += 1; + const resolve = this.#resolves.shift(); + if (resolve) { + resolve(); + } else if (this.#value < this.#size) { + this.#value += 1; } } } From 995fa8c99fb73ed4da83ef7f02d5b33216334820 Mon Sep 17 00:00:00 2001 From: Alisue Date: Sat, 17 Aug 2024 04:26:59 +0900 Subject: [PATCH 05/12] refactor(Semaphore): add internal `RawSemaphore` --- _raw_semaphore.ts | 57 +++++++++++++++++++++++++++++++++++++++++++++++ semaphore.ts | 40 ++++++--------------------------- 2 files changed, 64 insertions(+), 33 deletions(-) create mode 100644 _raw_semaphore.ts diff --git a/_raw_semaphore.ts b/_raw_semaphore.ts new file mode 100644 index 0000000..5833638 --- /dev/null +++ b/_raw_semaphore.ts @@ -0,0 +1,57 @@ +/** + * @internal + */ +export class RawSemaphore { + #resolves: (() => void)[] = []; + #value: number; + #size: number; + + /** + * Creates a new semaphore with the specified limit. + * + * @param size The maximum number of times the semaphore can be acquired before blocking. + * @throws {RangeError} if the size is not a positive safe integer. + */ + constructor(size: number) { + if (size <= 0 || !Number.isSafeInteger(size)) { + throw new RangeError( + `size must be a positive safe integer, got ${size}`, + ); + } + this.#value = size; + this.#size = size; + } + + /** + * Returns true if the semaphore is currently locked. + */ + get locked(): boolean { + return this.#value === 0; + } + + /** + * Acquires the semaphore, blocking until the semaphore is available. + */ + acquire(): Promise { + if (this.#value > 0) { + this.#value -= 1; + return Promise.resolve(); + } else { + const { promise, resolve } = Promise.withResolvers(); + this.#resolves.push(resolve); + return promise; + } + } + + /** + * Releases the semaphore, allowing the next waiting operation to proceed. + */ + release(): void { + const resolve = this.#resolves.shift(); + if (resolve) { + resolve(); + } else if (this.#value < this.#size) { + this.#value += 1; + } + } +} diff --git a/semaphore.ts b/semaphore.ts index d5da8dc..80fd18a 100644 --- a/semaphore.ts +++ b/semaphore.ts @@ -1,3 +1,5 @@ +import { RawSemaphore } from "./_raw_semaphore.ts"; + /** * A semaphore that allows a limited number of concurrent executions of an operation. * @@ -14,9 +16,7 @@ * ``` */ export class Semaphore { - #resolves: (() => void)[] = []; - #value: number; - #size: number; + #sem: RawSemaphore; /** * Creates a new semaphore with the specified limit. @@ -25,20 +25,14 @@ export class Semaphore { * @throws {RangeError} if the size is not a positive safe integer. */ constructor(size: number) { - if (size <= 0 || !Number.isSafeInteger(size)) { - throw new RangeError( - `size must be a positive safe integer, got ${size}`, - ); - } - this.#value = size; - this.#size = size; + this.#sem = new RawSemaphore(size); } /** * Returns true if the semaphore is currently locked. */ get locked(): boolean { - return this.#value === 0; + return this.#sem.locked; } /** @@ -48,31 +42,11 @@ export class Semaphore { * @returns A promise that resolves to the return value of the specified function. */ async lock(fn: () => R | PromiseLike): Promise { - await this.#acquire(); + await this.#sem.acquire(); try { return await fn(); } finally { - this.#release(); - } - } - - #acquire(): Promise { - if (this.#value > 0) { - this.#value -= 1; - return Promise.resolve(); - } else { - const { promise, resolve } = Promise.withResolvers(); - this.#resolves.push(resolve); - return promise; - } - } - - #release(): void { - const resolve = this.#resolves.shift(); - if (resolve) { - resolve(); - } else if (this.#value < this.#size) { - this.#value += 1; + this.#sem.release(); } } } From 6b093da5f96bd04691dcc10cdf0f2aa3f93c780b Mon Sep 17 00:00:00 2001 From: Alisue Date: Sat, 17 Aug 2024 04:29:46 +0900 Subject: [PATCH 06/12] feat(Mutex): use `RawSemaphore` to improve performance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit benchmark time (avg) iter/s (min … max) p75 p99 p995 --------------------------------------------------------------- ----------------------------- group Mutex#wait current 395.68 µs/iter 2,527.3 (361.25 µs … 1.31 ms) 374.62 µs 716.46 µs 733.62 µs v1.0.0 52.76 ms/iter 19.0 (46.67 ms … 56.59 ms) 55.51 ms 56.59 ms 56.59 ms summary current 133.33x faster than v1.0.0 --- mutex.ts | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/mutex.ts b/mutex.ts index 92c8133..e704d69 100644 --- a/mutex.ts +++ b/mutex.ts @@ -1,3 +1,5 @@ +import { RawSemaphore } from "./_raw_semaphore.ts"; + /** * A mutex (mutual exclusion) is a synchronization primitive that grants * exclusive access to a shared resource. @@ -26,13 +28,13 @@ * ``` */ export class Mutex { - #waiters: Set> = new Set(); + #sem: RawSemaphore = new RawSemaphore(1); /** * Returns true if the mutex is locked, false otherwise. */ get locked(): boolean { - return this.#waiters.size > 0; + return this.#sem.locked; } /** @@ -40,19 +42,11 @@ export class Mutex { * * @returns A Promise with Disposable that releases the mutex when disposed. */ - acquire(): Promise & Disposable { - const waiters = [...this.#waiters]; - const { promise, resolve } = Promise.withResolvers(); - this.#waiters.add(promise); - const disposable = { + acquire(): Promise { + return this.#sem.acquire().then(() => ({ [Symbol.dispose]: () => { - resolve(); - this.#waiters.delete(promise); + this.#sem.release(); }, - }; - return Object.assign( - Promise.all(waiters).then(() => disposable), - disposable, - ); + })); } } From 9f3b7eb53679db47e02c20e7b6406fd4f825c7fc Mon Sep 17 00:00:00 2001 From: Alisue Date: Sat, 17 Aug 2024 04:32:32 +0900 Subject: [PATCH 07/12] feat(Lock): use `RawSemaphore` to improve performance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit benchmark time (avg) iter/s (min … max) p75 p99 p995 --------------------------------------------------------------- ----------------------------- group Lock#lock current 231.74 µs/iter 4,315.2 (206.46 µs … 700.42 µs) 215.5 µs 488.58 µs 505.96 µs v1.0.0 52.28 ms/iter 19.1 (47.25 ms … 55.24 ms) 54.78 ms 55.24 ms 55.24 ms summary current 225.59x faster than v1.0.0 --- lock.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lock.ts b/lock.ts index 2b5d955..7afa1ee 100644 --- a/lock.ts +++ b/lock.ts @@ -1,4 +1,4 @@ -import { Mutex } from "./mutex.ts"; +import { RawSemaphore } from "./_raw_semaphore.ts"; /** * A mutual exclusion lock that provides safe concurrent access to a shared value. @@ -16,7 +16,7 @@ import { Mutex } from "./mutex.ts"; * ``` */ export class Lock { - #mu = new Mutex(); + #sem = new RawSemaphore(1); #value: T; /** @@ -32,7 +32,7 @@ export class Lock { * Returns true if the lock is currently locked, false otherwise. */ get locked(): boolean { - return this.#mu.locked; + return this.#sem.locked; } /** @@ -43,11 +43,11 @@ export class Lock { * @returns A Promise that resolves with the result of the function. */ async lock(fn: (value: T) => R | PromiseLike): Promise { - const lock = await this.#mu.acquire(); + await this.#sem.acquire(); try { return await fn(this.#value); } finally { - lock[Symbol.dispose](); + this.#sem.release(); } } } From badaec55720497b0caa5d13f9fde03ef80ffa62f Mon Sep 17 00:00:00 2001 From: Alisue Date: Sat, 17 Aug 2024 04:33:32 +0900 Subject: [PATCH 08/12] feat(RwLock): use `RawSemaphore` to improve performance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit benchmark time (avg) iter/s (min … max) p75 p99 p995 --------------------------------------------------------------- ----------------------------- group RwLock#lock current 270.64 µs/iter 3,695.0 (247.21 µs … 681.58 µs) 257.83 µs 508.83 µs 532.21 µs v1.0.0 52.87 ms/iter 18.9 (47.98 ms … 57.47 ms) 55.87 ms 57.47 ms 57.47 ms summary current 195.35x faster than v1.0.0 group RwLock#rlock current 201.44 µs/iter 4,964.4 (182.46 µs … 1.18 ms) 189.58 µs 445.92 µs 474.75 µs v1.0.0 57.84 ms/iter 17.3 (52.11 ms … 78.27 ms) 59.24 ms 78.27 ms 78.27 ms summary current 287.13x faster than v1.0.0 --- rw_lock.ts | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/rw_lock.ts b/rw_lock.ts index fc8eadb..7326f3d 100644 --- a/rw_lock.ts +++ b/rw_lock.ts @@ -1,4 +1,4 @@ -import { Mutex } from "./mutex.ts"; +import { RawSemaphore } from "./_raw_semaphore.ts"; /** * A reader-writer lock implementation that allows multiple concurrent reads but only one write at a time. @@ -29,8 +29,8 @@ import { Mutex } from "./mutex.ts"; * ``` */ export class RwLock { - #read = new Mutex(); - #write = new Mutex(); + #read = new RawSemaphore(1); + #write = new RawSemaphore(1); #value: T; /** @@ -50,16 +50,16 @@ export class RwLock { * @returns A promise that resolves to the return value of the specified function. */ async lock(fn: (value: T) => R | PromiseLike): Promise { - const wlock = await this.#write.acquire(); + await this.#write.acquire(); try { - const rlock = await this.#read.acquire(); + await this.#read.acquire(); try { return await fn(this.#value); } finally { - rlock[Symbol.dispose](); + this.#read.release(); } } finally { - wlock[Symbol.dispose](); + this.#write.release(); } } @@ -71,19 +71,19 @@ export class RwLock { * @returns A promise that resolves to the return value of the specified function. */ async rlock(fn: (value: T) => R | PromiseLike): Promise { - const wlock = this.#write.locked - ? await this.#write.acquire() - : { [Symbol.dispose]: () => {} }; + if (this.#write.locked) { + await this.#write.acquire(); + } try { // Acquire the read lock without waiting to allow multiple readers to access the lock. - const rlock = this.#read.acquire(); + this.#read.acquire(); try { return await fn(this.#value); } finally { - rlock[Symbol.dispose](); + this.#read.release(); } } finally { - wlock[Symbol.dispose](); + this.#write.release(); } } } From 6ee7534af42e35ebd2ac77cc19cef9a4d381f4fe Mon Sep 17 00:00:00 2001 From: Alisue Date: Sat, 17 Aug 2024 04:53:09 +0900 Subject: [PATCH 09/12] feat(Barrier): reimplement to improve performance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit benchmark time (avg) iter/s (min … max) p75 p99 p995 --------------------------------------------------------------- ----------------------------- group Barrier#wait current 61.42 µs/iter 16,280.8 (56.54 µs … 270.25 µs) 61.75 µs 129.17 µs 137.71 µs v1.0.0 251.46 µs/iter 3,976.7 (206.38 µs … 577.12 µs) 232.12 µs 518.33 µs 528.71 µs summary current 4.09x faster than v1.0.0 --- barrier.ts | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/barrier.ts b/barrier.ts index 8ffbd53..8f4f3b2 100644 --- a/barrier.ts +++ b/barrier.ts @@ -1,5 +1,3 @@ -import { Notify } from "./notify.ts"; - /** * A synchronization primitive that allows multiple tasks to wait until all of * them have reached a certain point of execution before continuing. @@ -26,8 +24,8 @@ import { Notify } from "./notify.ts"; * ``` */ export class Barrier { - #notify = new Notify(); - #rest: number; + #waiter: PromiseWithResolvers = Promise.withResolvers(); + #value: number; /** * Creates a new `Barrier` that blocks until `size` threads have called `wait`. @@ -41,23 +39,24 @@ export class Barrier { `size must be a positive safe integer, got ${size}`, ); } - this.#rest = size; + this.#value = size; } /** * Wait for all threads to reach the barrier. * Blocks until all threads reach the barrier. */ - async wait({ signal }: { signal?: AbortSignal } = {}): Promise { - signal?.throwIfAborted(); - this.#rest -= 1; - if (this.#rest === 0) { - await Promise.all([ - this.#notify.notified({ signal }), - this.#notify.notifyAll(), - ]); - } else { - await this.#notify.notified({ signal }); + wait({ signal }: { signal?: AbortSignal } = {}): Promise { + if (signal?.aborted) { + return Promise.reject(signal.reason); + } + const { promise, resolve, reject } = this.#waiter; + const abort = () => reject(signal!.reason); + signal?.addEventListener("abort", abort, { once: true }); + this.#value -= 1; + if (this.#value === 0) { + resolve(); } + return promise.finally(() => signal?.removeEventListener("abort", abort)); } } From 719394178bd9cd3acde5296a3a0c8c924e671d84 Mon Sep 17 00:00:00 2001 From: Alisue Date: Sat, 17 Aug 2024 05:05:04 +0900 Subject: [PATCH 10/12] feat(Notify): reimplement to improve performance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit benchmark time (avg) iter/s (min … max) p75 p99 p995 --------------------------------------------------------------- ----------------------------- group Notify#notifyAll current 293.47 µs/iter 3,407.5 (251.46 µs … 676.88 µs) 266.04 µs 592.96 µs 608.17 µs v1.0.0 223.93 µs/iter 4,465.6 (200.75 µs … 452.79 µs) 215.67 µs 356.75 µs 370.08 µs summary current 1.31x slower than v1.0.0 group Notify#notify current 395.51 µs/iter 2,528.4 (344.42 µs … 953.71 µs) 365.83 µs 700.67 µs 717.75 µs v1.0.0 21.54 ms/iter 46.4 (20.82 ms … 23.01 ms) 21.89 ms 23.01 ms 23.01 ms summary current 54.46x faster than v1.0.0 --- notify.ts | 38 ++++++++++++++++---------------------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/notify.ts b/notify.ts index 355c633..7732bca 100644 --- a/notify.ts +++ b/notify.ts @@ -1,6 +1,3 @@ -import { iter } from "@core/iterutil/iter"; -import { take } from "@core/iterutil/take"; - /** * Async notifier that allows one or more "waiters" to wait for a notification. * @@ -23,13 +20,13 @@ import { take } from "@core/iterutil/take"; * ``` */ export class Notify { - #waiters: Set> = new Set(); + #waiters: PromiseWithResolvers[] = []; /** * Returns the number of waiters that are waiting for notification. */ get waiterCount(): number { - return this.#waiters.size; + return this.#waiters.length; } /** @@ -43,21 +40,15 @@ export class Notify { if (n <= 0 || !Number.isSafeInteger(n)) { throw new RangeError(`n must be a positive safe integer, got ${n}`); } - const it = iter(this.#waiters); - for (const waiter of take(it, n)) { - waiter.resolve(); - } - this.#waiters = new Set(it); + this.#waiters.splice(0, n).forEach(({ resolve }) => resolve()); } /** * Notifies all waiters that are waiting for notification. Resolves each of the notified waiters. */ notifyAll(): void { - for (const waiter of this.#waiters) { - waiter.resolve(); - } - this.#waiters = new Set(); + this.#waiters.forEach(({ resolve }) => resolve()); + this.#waiters = []; } /** @@ -65,18 +56,21 @@ export class Notify { * the `notify` method is called. The method returns a Promise that resolves when the caller is notified. * Optionally takes an AbortSignal to abort the waiting if the signal is aborted. */ - async notified({ signal }: { signal?: AbortSignal } = {}): Promise { + notified({ signal }: { signal?: AbortSignal } = {}): Promise { if (signal?.aborted) { - throw signal.reason; + return Promise.reject(signal.reason); } - const waiter = Promise.withResolvers(); const abort = () => { - this.#waiters.delete(waiter); - waiter.reject(signal!.reason); + const waiter = this.#waiters.shift(); + if (waiter) { + waiter.reject(signal!.reason); + } }; signal?.addEventListener("abort", abort, { once: true }); - this.#waiters.add(waiter); - await waiter.promise; - signal?.removeEventListener("abort", abort); + const waiter = Promise.withResolvers(); + this.#waiters.push(waiter); + return waiter.promise.finally(() => { + signal?.removeEventListener("abort", abort); + }); } } From 7d46d4ae0a80a6884738b6b15f87fb8477f4f83c Mon Sep 17 00:00:00 2001 From: Alisue Date: Sat, 17 Aug 2024 05:22:36 +0900 Subject: [PATCH 11/12] feat(Queue): reimplement to improve performance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit benchmark time (avg) iter/s (min … max) p75 p99 p995 --------------------------------------------------------------- ----------------------------- group Queue#push/pop current 154.58 µs/iter 6,469.2 (146 µs … 425.71 µs) 151.83 µs 272.96 µs 297.25 µs v1.0.0 1.1 ms/iter 909.4 (917.17 µs … 5.96 ms) 993.5 µs 3.44 ms 3.58 ms summary current 7.11x faster than v1.0.0 --- queue.ts | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/queue.ts b/queue.ts index 59138cb..ab40158 100644 --- a/queue.ts +++ b/queue.ts @@ -1,5 +1,3 @@ -import { Notify } from "./notify.ts"; - /** * A queue implementation that allows for adding and removing elements, with optional waiting when * popping elements from an empty queue. @@ -18,7 +16,7 @@ import { Notify } from "./notify.ts"; * ``` */ export class Queue | null> { - #notify = new Notify(); + #resolves: (() => void)[] = []; #items: T[] = []; /** @@ -32,7 +30,7 @@ export class Queue | null> { * Returns true if the queue is currently locked. */ get locked(): boolean { - return this.#notify.waiterCount > 0; + return this.#resolves.length > 0; } /** @@ -40,7 +38,7 @@ export class Queue | null> { */ push(value: T): void { this.#items.push(value); - this.#notify.notify(); + this.#resolves.shift()?.(); } /** @@ -55,7 +53,12 @@ export class Queue | null> { if (value !== undefined) { return value; } - await this.#notify.notified({ signal }); + const { promise, resolve, reject } = Promise.withResolvers(); + signal?.addEventListener("abort", () => reject(signal.reason), { + once: true, + }); + this.#resolves.push(resolve); + await promise; } } } From 2ce4940ae01bf1b62d96aee163c1b5469b436146 Mon Sep 17 00:00:00 2001 From: Alisue Date: Sat, 17 Aug 2024 05:23:37 +0900 Subject: [PATCH 12/12] feat(Stack): reimplement to improve performance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit benchmark time (avg) iter/s (min … max) p75 p99 p995 --------------------------------------------------------------- ----------------------------- group Stack#push/pop current 112.37 µs/iter 8,898.9 (100.12 µs … 348.71 µs) 111.25 µs 217.75 µs 225.96 µs v1.0.0 1.07 ms/iter 938.4 (882.54 µs … 3.94 ms) 969.21 µs 3.35 ms 3.55 ms summary current 9.48x faster than v1.0.0 --- stack.ts | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/stack.ts b/stack.ts index 01bb33d..a5876c3 100644 --- a/stack.ts +++ b/stack.ts @@ -1,5 +1,3 @@ -import { Notify } from "./notify.ts"; - /** * A stack implementation that allows for adding and removing elements, with optional waiting when * popping elements from an empty stack. @@ -20,7 +18,7 @@ import { Notify } from "./notify.ts"; * @template T The type of items in the stack. */ export class Stack | null> { - #notify = new Notify(); + #resolves: (() => void)[] = []; #items: T[] = []; /** @@ -34,7 +32,7 @@ export class Stack | null> { * Returns true if the stack is currently locked. */ get locked(): boolean { - return this.#notify.waiterCount > 0; + return this.#resolves.length > 0; } /** @@ -44,7 +42,7 @@ export class Stack | null> { */ push(value: T): void { this.#items.push(value); - this.#notify.notify(); + this.#resolves.shift()?.(); } /** @@ -59,7 +57,12 @@ export class Stack | null> { if (value !== undefined) { return value; } - await this.#notify.notified({ signal }); + const { promise, resolve, reject } = Promise.withResolvers(); + signal?.addEventListener("abort", () => reject(signal.reason), { + once: true, + }); + this.#resolves.push(resolve); + await promise; } } }