Skip to content

Commit

Permalink
Add implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
philipahlberg committed Aug 4, 2020
1 parent dd99f4c commit 6306a50
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 5 deletions.
77 changes: 75 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,76 @@
export function add(a: number, b: number): number {
return a + b;
/**
* Create an AsyncIterable that yields the values
* of the given Promises in the order they resolve.
*
* @param promises An iterable of Promises.
*/
export const createRaceIterable = <T>(promises: Iterable<Promise<T>>): AsyncIterable<T> => {
return new RaceIterable(promises);
};

/**
* A simple wrapper to go from AsyncIterable to AsyncIterator.
*/
class RaceIterable<T> implements AsyncIterable<T> {
private iterable: Iterable<Promise<T>>;

constructor(iterable: Iterable<Promise<T>>) {
this.iterable = iterable;
}

[Symbol.asyncIterator](): AsyncIterator<T> {
return new RaceIterator(this.iterable);
}
}

class RaceIterator<T> implements AsyncIterator<T> {
private promises: Map<number, Promise<[number, T]>>;

constructor(iterable: Iterable<Promise<T>>) {
// Create a Map from key to Promise
const map = new Map<number, Promise<[number, T]>>();
// Assign a unique `key` to every Promise
for (const [key, promise] of enumerate(iterable)) {
// Create a wrapped Promise that resolves to [key, value]
const keyedPromise = createKeyedPromise(key, promise);
// Insert the wrapped Promise by its key
map.set(key, keyedPromise);
}
this.promises = map;
}

async next(): Promise<IteratorResult<T>> {
// If the pool is empty, so is the iterator.
if (this.promises.size === 0) {
return {
done: true,
value: undefined,
};
}

// Wait for the first Promise to resolve.
// Note: `Promise.race` creates a *new* Promise; it is not
// actually the Promise that resolved first.
const [key, value] = await Promise.race(this.promises.values());
// Remove the resolved Promise from the pool.
this.promises.delete(key);
// Resolve with the resolved value of the original Promise.
return {
done: false,
value,
};
}
}

function* enumerate<T>(iterable: Iterable<T>): Iterable<[number, T]> {
let i = 0;
for (const value of iterable) {
yield [i, value];
i = i + 1;
}
}

const createKeyedPromise = async <T>(key: number, promise: Promise<T>): Promise<[number, T]> => {
const value = await promise;
return [key, value];
};
32 changes: 29 additions & 3 deletions tests/index.mjs
Original file line number Diff line number Diff line change
@@ -1,6 +1,32 @@
import { assertEqual } from '@windtunnel/assert';
import { add } from '../dist/index.mjs';
import { createRaceIterable } from '../dist/index.mjs';

export function testAdd() {
assertEqual(add(1, 2), 3, '1 + 2 = 3');
const timeout = (ms) => new Promise((resolve) => {
setTimeout(resolve, ms);
});

const collect = async (iterable) => {
const values = [];
for await (const value of iterable) {
values.push(value);
}
return values;
};

export async function testRaceIterable() {
const d = timeout(400).then(() => 'd');
const c = timeout(300).then(() => 'c');
const b = timeout(200).then(() => 'b');
const a = timeout(100).then(() => 'a');

const iterable = createRaceIterable([
c,
a,
d,
b,
]);

const values = await collect(iterable);

assertEqual(values, ['a', 'b', 'c', 'd'], 'should resolve in order from fastest to slowest');
}

0 comments on commit 6306a50

Please sign in to comment.