Skip to content

Commit

Permalink
Merge pull request #29 from jsr-core/refactor
Browse files Browse the repository at this point in the history
feat: throw `RangeError` on invalid arguments and refactor internal impl of `Notify`
  • Loading branch information
lambdalisue authored Aug 13, 2024
2 parents b000238 + f3db52a commit 6eaad95
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 29 deletions.
8 changes: 5 additions & 3 deletions barrier.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ export class Barrier {
* Creates a new `Barrier` that blocks until `size` threads have called `wait`.
*
* @param size The number of threads that must reach the barrier before it unblocks.
* @throws Error if the size is negative.
* @throws {RangeError} if the size is not a positive safe integer.
*/
constructor(size: number) {
if (size < 0) {
throw new Error("The size must be greater than 0");
if (size <= 0 || !Number.isSafeInteger(size)) {
throw new RangeError(
`size must be a positive safe integer, got ${size}`,
);
}
this.#rest = size;
}
Expand Down
14 changes: 13 additions & 1 deletion barrier_test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { assertEquals, assertRejects } from "@std/assert";
import { assertEquals, assertRejects, assertThrows } from "@std/assert";
import { deadline, delay } from "@std/async";
import { Barrier } from "./barrier.ts";

Expand Down Expand Up @@ -79,4 +79,16 @@ Deno.test("Barrier", async (t) => {
);
},
);

await t.step(
"throws RangeError if size is not a positive safe integer",
() => {
assertThrows(() => new Barrier(NaN), RangeError);
assertThrows(() => new Barrier(Infinity), RangeError);
assertThrows(() => new Barrier(-Infinity), RangeError);
assertThrows(() => new Barrier(-1), RangeError);
assertThrows(() => new Barrier(1.1), RangeError);
assertThrows(() => new Barrier(0), RangeError);
},
);
});
1 change: 1 addition & 0 deletions deno.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"@core/asyncutil/semaphore": "./semaphore.ts",
"@core/asyncutil/stack": "./stack.ts",
"@core/asyncutil/wait-group": "./wait_group.ts",
"@core/iterutil": "jsr:@core/iterutil@^0.6.0",
"@std/assert": "jsr:@std/assert@^1.0.2",
"@std/async": "jsr:@std/async@^1.0.2"
},
Expand Down
5 changes: 5 additions & 0 deletions deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 17 additions & 18 deletions notify.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import { iter } from "@core/iterutil/iter";
import { take } from "@core/iterutil/take";

/**
* Async notifier that allows one or more "waiters" to wait for a notification.
*
Expand All @@ -20,30 +23,31 @@
* ```
*/
export class Notify {
#waiters: {
promise: Promise<void>;
resolve: () => void;
reject: (reason?: unknown) => void;
}[] = [];
#waiters: Set<PromiseWithResolvers<void>> = new Set();

/**
* Returns the number of waiters that are waiting for notification.
*/
get waiterCount(): number {
return this.#waiters.length;
return this.#waiters.size;
}

/**
* Notifies `n` waiters that are waiting for notification. Resolves each of the notified waiters.
* If there are fewer than `n` waiters, all waiters are notified.
*
* @param n The number of waiters to notify.
* @throws {RangeError} if `n` is not a positive safe integer.
*/
notify(n = 1): void {
const head = this.#waiters.slice(0, n);
const tail = this.#waiters.slice(n);
for (const waiter of head) {
if (n <= 0 || !Number.isSafeInteger(n)) {
throw new RangeError(`n must be a positive safe integer, got ${n}`);
}
const it = iter(this.#waiters);
for (const waiter of take(it, n)) {
waiter.resolve();
}
this.#waiters = tail;
this.#waiters = new Set(it);
}

/**
Expand All @@ -53,7 +57,7 @@ export class Notify {
for (const waiter of this.#waiters) {
waiter.resolve();
}
this.#waiters = [];
this.#waiters = new Set();
}

/**
Expand All @@ -67,17 +71,12 @@ export class Notify {
}
const waiter = Promise.withResolvers<void>();
const abort = () => {
removeItem(this.#waiters, waiter);
this.#waiters.delete(waiter);
waiter.reject(signal!.reason);
};
signal?.addEventListener("abort", abort, { once: true });
this.#waiters.push(waiter);
this.#waiters.add(waiter);
await waiter.promise;
signal?.removeEventListener("abort", abort);
}
}

function removeItem<T>(array: T[], item: T): void {
const index = array.indexOf(item);
array.splice(index, 1);
}
55 changes: 54 additions & 1 deletion notify_test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { delay } from "@std/async/delay";
import { assertEquals, assertRejects } from "@std/assert";
import { assertEquals, assertRejects, assertThrows } from "@std/assert";
import { promiseState } from "./promise_state.ts";
import { Notify } from "./notify.ts";

Expand All @@ -8,21 +8,61 @@ Deno.test("Notify", async (t) => {
const notify = new Notify();
const waiter1 = notify.notified();
const waiter2 = notify.notified();
assertEquals(notify.waiterCount, 2);

notify.notify();
assertEquals(notify.waiterCount, 1);
assertEquals(await promiseState(waiter1), "fulfilled");
assertEquals(await promiseState(waiter2), "pending");

notify.notify();
assertEquals(notify.waiterCount, 0);
assertEquals(await promiseState(waiter1), "fulfilled");
assertEquals(await promiseState(waiter2), "fulfilled");
});

await t.step("'notify' wakes up a multiple waiters", async () => {
const notify = new Notify();
const waiter1 = notify.notified();
const waiter2 = notify.notified();
const waiter3 = notify.notified();
const waiter4 = notify.notified();
const waiter5 = notify.notified();
assertEquals(notify.waiterCount, 5);

notify.notify(2);
assertEquals(notify.waiterCount, 3);
assertEquals(await promiseState(waiter1), "fulfilled");
assertEquals(await promiseState(waiter2), "fulfilled");
assertEquals(await promiseState(waiter3), "pending");
assertEquals(await promiseState(waiter4), "pending");
assertEquals(await promiseState(waiter5), "pending");

notify.notify(2);
assertEquals(notify.waiterCount, 1);
assertEquals(await promiseState(waiter1), "fulfilled");
assertEquals(await promiseState(waiter2), "fulfilled");
assertEquals(await promiseState(waiter3), "fulfilled");
assertEquals(await promiseState(waiter4), "fulfilled");
assertEquals(await promiseState(waiter5), "pending");

notify.notify(2);
assertEquals(notify.waiterCount, 0);
assertEquals(await promiseState(waiter1), "fulfilled");
assertEquals(await promiseState(waiter2), "fulfilled");
assertEquals(await promiseState(waiter3), "fulfilled");
assertEquals(await promiseState(waiter4), "fulfilled");
assertEquals(await promiseState(waiter5), "fulfilled");
});

await t.step("'notifyAll' wakes up all waiters", async () => {
const notify = new Notify();
const waiter1 = notify.notified();
const waiter2 = notify.notified();
assertEquals(notify.waiterCount, 2);

notify.notifyAll();
assertEquals(notify.waiterCount, 0);
assertEquals(await promiseState(waiter1), "fulfilled");
assertEquals(await promiseState(waiter2), "fulfilled");
});
Expand Down Expand Up @@ -69,4 +109,17 @@ Deno.test("Notify", async (t) => {
);
},
);

await t.step(
"'notify' throws RangeError if size is not a positive safe integer",
() => {
const notify = new Notify();
assertThrows(() => notify.notify(NaN), RangeError);
assertThrows(() => notify.notify(Infinity), RangeError);
assertThrows(() => notify.notify(-Infinity), RangeError);
assertThrows(() => notify.notify(-1), RangeError);
assertThrows(() => notify.notify(1.1), RangeError);
assertThrows(() => notify.notify(0), RangeError);
},
);
});
8 changes: 5 additions & 3 deletions semaphore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ export class Semaphore {
* Creates a new semaphore with the specified limit.
*
* @param size The maximum number of times the semaphore can be acquired before blocking.
* @throws Error if the size is less than 1.
* @throws {RangeError} if the size is not a positive safe integer.
*/
constructor(size: number) {
if (size < 0) {
throw new Error("The size must be greater than 0");
if (size <= 0 || !Number.isSafeInteger(size)) {
throw new RangeError(
`size must be a positive safe integer, got ${size}`,
);
}
this.#rest = size + 1;
}
Expand Down
78 changes: 76 additions & 2 deletions semaphore_test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { assertEquals } from "@std/assert";
import { assertEquals, assertThrows } from "@std/assert";
import { Semaphore } from "./semaphore.ts";

Deno.test("Semaphore", async (t) => {
await t.step(
"regulates the number of workers concurrently running",
"regulates the number of workers concurrently running (n=5)",
async () => {
let nworkers = 0;
const results: number[] = [];
Expand Down Expand Up @@ -32,4 +32,78 @@ Deno.test("Semaphore", async (t) => {
]);
},
);

await t.step(
"regulates the number of workers concurrently running (n=1)",
async () => {
let nworkers = 0;
const results: number[] = [];
const sem = new Semaphore(1);
const worker = () => {
return sem.lock(async () => {
nworkers++;
results.push(nworkers);
await new Promise((resolve) => setTimeout(resolve, 10));
nworkers--;
});
};
await Promise.all([...Array(10)].map(() => worker()));
assertEquals(nworkers, 0);
assertEquals(results, [
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
]);
},
);

await t.step(
"regulates the number of workers concurrently running (n=10)",
async () => {
let nworkers = 0;
const results: number[] = [];
const sem = new Semaphore(10);
const worker = () => {
return sem.lock(async () => {
nworkers++;
results.push(nworkers);
await new Promise((resolve) => setTimeout(resolve, 10));
nworkers--;
});
};
await Promise.all([...Array(10)].map(() => worker()));
assertEquals(nworkers, 0);
assertEquals(results, [
1,
2,
3,
4,
5,
6,
7,
8,
9,
10,
]);
},
);

await t.step(
"throws RangeError if size is not a positive safe integer",
() => {
assertThrows(() => new Semaphore(NaN), RangeError);
assertThrows(() => new Semaphore(Infinity), RangeError);
assertThrows(() => new Semaphore(-Infinity), RangeError);
assertThrows(() => new Semaphore(-1), RangeError);
assertThrows(() => new Semaphore(1.1), RangeError);
assertThrows(() => new Semaphore(0), RangeError);
},
);
});
3 changes: 3 additions & 0 deletions wait_group.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ export class WaitGroup {
* @param delta The number to add to the counter. It can be positive or negative.
*/
add(delta: number): void {
if (!Number.isSafeInteger(delta)) {
throw new RangeError(`delta must be a safe integer, got ${delta}`);
}
this.#count += delta;
if (this.#count === 0) {
this.#notify.notifyAll();
Expand Down
13 changes: 12 additions & 1 deletion wait_group_test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { assertEquals, assertRejects } from "@std/assert";
import { assertEquals, assertRejects, assertThrows } from "@std/assert";
import { deadline, delay } from "@std/async";
import { WaitGroup } from "./wait_group.ts";

Expand Down Expand Up @@ -83,4 +83,15 @@ Deno.test("WaitGroup", async (t) => {
);
},
);

await t.step(
"'add' throws RangeError if delta is not a safe integer",
() => {
const wg = new WaitGroup();
assertThrows(() => wg.add(NaN), RangeError);
assertThrows(() => wg.add(Infinity), RangeError);
assertThrows(() => wg.add(-Infinity), RangeError);
assertThrows(() => wg.add(1.1), RangeError);
},
);
});

0 comments on commit 6eaad95

Please sign in to comment.