diff --git a/.gitignore b/.gitignore index 660471e..d55a721 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,3 @@ -*.swp -.DS_Store node_modules -examples/node_modules +dist +types \ No newline at end of file diff --git a/.jshintrc b/.jshintrc deleted file mode 100644 index 205790a..0000000 --- a/.jshintrc +++ /dev/null @@ -1,24 +0,0 @@ -{ - "curly": true, - "eqeqeq": true, - "forin": true, - "freeze": true, - "indent": 2, - "latedef": "nofunc", - "noarg": true, - "noempty": true, - "quotmark": "single", - "undef": true, - "strict": false, - "trailing": true, - "maxparams": 5, - "maxdepth": 5, - "maxstatements": 15, - "maxcomplexity": 7, - "maxlen": 80, - "asi": true, - "boss": true, - "eqnull": true, - "browser": true, - "node": true -} diff --git a/.travis.yml b/.travis.yml index 1932865..3f308e8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,3 +1,5 @@ language: node_js node_js: - - "0.11" + - "4" + - "5" + - "6" diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..3c9e73a --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,28 @@ +{ + // Use IntelliSense to learn about possible Node.js debug attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "type": "node", + "request": "launch", + "name": "Tests", + "program": "${workspaceRoot}/node_modules/.bin/_mocha", + "args": [ + "--require", + "source-map-support/register", + "--no-timeouts", + "--colors", + "${workspaceRoot}/dist/**/*.test.js" + ], + "preLaunchTask": "build", + "cwd": "${workspaceRoot}", + "sourceMaps": true, + "outFiles": [ + "${workspaceRoot}/dist/**/*.js" + ], + "internalConsoleOptions": "openOnSessionStart" + } + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..11c5516 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,6 @@ +// Place your settings in this file to overwrite default and user settings. +{ + "typescript.tsdk": "./node_modules/typescript/lib", + "editor.tabSize": 2, + "vsicons.presets.angular": false + } \ No newline at end of file diff --git a/.vscode/tasks.json b/.vscode/tasks.json new file mode 100644 index 0000000..c5c00ea --- /dev/null +++ b/.vscode/tasks.json @@ -0,0 +1,27 @@ +{ + // See https://go.microsoft.com/fwlink/?LinkId=733558 + // for the documentation about the tasks.json format + "version": "0.1.0", + "command": "npm", + "isShellCommand": true, + "showOutput": "always", + "suppressTaskName": true, + "tasks": [ + { + "taskName": "install", + "args": ["install"] + }, + { + "taskName": "update", + "args": ["update"] + }, + { + "taskName": "test", + "args": ["run", "test"] + }, + { + "taskName": "build", + "args": ["run", "build"] + } + ] +} \ No newline at end of file diff --git a/index.js b/index.js deleted file mode 100644 index 9a26a95..0000000 --- a/index.js +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Module dependencies. - */ -var make = require('./lib/make') -var select = require('./lib/select') -var timeout = require('./lib/timeout') -var interval = require('./lib/interval') - -/** - * Expose `make`. - */ -module.exports = make - -/** - * Expose `select`. - */ -module.exports.select = select - -/** - * Expose `interval`. - */ -module.exports.interval = interval - -/** - * Expose `timeout`. - */ -module.exports.timeout = timeout diff --git a/lib/async.js b/lib/async.js deleted file mode 100644 index b64e4e5..0000000 --- a/lib/async.js +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Module dependencies. - */ -var Receiver = require('./receiver') - -/** - * Expose `async`. - */ -module.exports = async - -/** - * Add value to channel via node-style async function. - * - * @param {Function} channel - * @param {Function|Object} fn async function or object with async method - * @param {String} method name only if fn is an object - * @param {mixed} args async function arguments without callback - * @return {Function} thunk - */ -function async(ch, fn/*, args...*/) { - var args = [].slice.call(arguments, 2) - var receiver = new Receiver() - var context = null - - if (typeof fn === 'object') { - context = fn - fn = fn[args.shift()] - } - - args.push(function (err, val) { - if (arguments.length > 2) { - val = [].slice.call(arguments, 1) - } - ch(err, val)(function (err) { - receiver[err ? 'error' : 'add'](err) - }) - }) - - fn.apply(context, args) - - return function (cb) { - receiver.callback(cb) - } -} diff --git a/lib/channel.js b/lib/channel.js deleted file mode 100644 index 4998690..0000000 --- a/lib/channel.js +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Module dependencies. - */ -var Receiver = require('./receiver') - -/** - * Expose `Channel`. - */ -module.exports = Channel - -/** - * Constants. - */ -var CLOSED_ERROR_MSG = 'Cannot add to closed channel' - -/** - * Initialize a `Channel`. - * - * @param {Function|Object} [empty=Object] - * @api private - */ -function Channel(bufferSize) { - this.pendingAdds = [] - this.pendingGets = [] - this.items = [] - this.bufferSize = parseInt(bufferSize, 10) || 0 - this.isClosed = false - this.isDone = false - this.empty = {} -} - -/** - * Static reference to the most recently called callback - */ -Channel.lastCalled = null - -/** - * Get an item with `cb`. - * - * @param {Function} cb - * @api private - */ -Channel.prototype.get = function (cb){ - if (this.done()) { - this.callEmpty(cb) - } else if (this.items.length > 0 || this.pendingAdds.length > 0) { - this.call(cb, this.nextItem()) - } else { - this.pendingGets.push(cb) - } -} - -/** - * Remove `cb` from the queue. - * - * @param {Function} cb - * @api private - */ -Channel.prototype.removeGet = function (cb) { - var idx = this.pendingGets.indexOf(cb) - if (idx > -1) { - this.pendingGets.splice(idx, 1) - } -} - -/** - * Get the next item and pull from pendingAdds to fill the buffer. - * - * @return {Mixed} - * @api private - */ -Channel.prototype.nextItem = function () { - if (this.pendingAdds.length > 0) { - this.items.push(this.pendingAdds.shift().add()) - } - return this.items.shift() -} - -/** - * Add `val` to the channel. - * - * @param {Mixed} val - * @return {Function} thunk - * @api private - */ -Channel.prototype.add = function (val){ - var receiver = new Receiver(val) - - if (this.isClosed) { - receiver.error(Error(CLOSED_ERROR_MSG)) - } else if (this.pendingGets.length > 0) { - this.call(this.pendingGets.shift(), receiver.add()) - } else if (this.items.length < this.bufferSize) { - this.items.push(receiver.add()) - } else { - this.pendingAdds.push(receiver) - } - - return function (cb) { - receiver.callback(cb) - } -} - -/** - * Invoke `cb` with `val` facilitate both - * `chan(value)` and the `chan(error, value)` - * use-cases. - * - * @param {Function} cb - * @param {Mixed} val - * @api private - */ -Channel.prototype.call = function (cb, val) { - Channel.lastCalled = this.func - if (val instanceof Error) { - cb(val) - } else { - cb(null, val) - } - this.done() -} - -/** - * Invoke `cb` callback with the empty value. - * - * @param {Function} cb - * @api private - */ -Channel.prototype.callEmpty = function (cb) { - this.call(cb, this.empty) -} - -/** - * Prevennt future values from being added to - * the channel. - * - * @return {Boolean} - * @api public - */ -Channel.prototype.close = function () { - this.isClosed = true - var receiver - while (receiver = this.pendingAdds.shift()) { - receiver.error(Error(CLOSED_ERROR_MSG)) - } - return this.done() -} - -/** - * Check to see if the channel is done and - * call pending callbacks if necessary. - * - * @return {Boolean} - * @api private - */ -Channel.prototype.done = function () { - if (!this.isDone && this.isClosed && this.items.length === 0) { - this.isDone = true - // call each pending callback with the empty value - this.pendingGets.forEach(function (cb) { this.callEmpty(cb) }, this) - } - return this.isDone -} diff --git a/lib/interval.js b/lib/interval.js deleted file mode 100644 index 828997a..0000000 --- a/lib/interval.js +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Module dependencies. - */ -var make = require('./make') - -/** - * Expose `intervalChan`. - */ -module.exports = intervalChan - -/** - * Make a interval channel that receives a count every number of milliseconds. - * - * @param {Number} ms - * @returns {Function} channel - * @api public - */ -function intervalChan(ms) { - var ch = make() - var count = 0; - - var int = setInterval(function () { - try { - ch(++count) - } catch (err) { - clearInterval(int) - } - }, ms) - - return ch -} diff --git a/lib/make.js b/lib/make.js deleted file mode 100644 index c82cb16..0000000 --- a/lib/make.js +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Module dependencies. - */ -var Channel = require('./channel') -var async = require('./async') - -/** - * Expose `make`. - */ -module.exports = make - -/** - * Make a channel. - * - * @param {Number} bufferSize optional default=0 - * @return {Function} - * @api public - */ -function make(bufferSize) { - var chan = new Channel(bufferSize) - - var func = function (a, b) { - // yielded - if (typeof a === 'function') { - return chan.get(a) - } - - // (err, res) - if (a === null && typeof b !== 'undefined') { - a = b - } - - // value - return chan.add(a) - } - - // expose public channel methods - func.close = chan.close.bind(chan) - func.done = chan.done.bind(chan) - - // bind async helper - func.async = async.bind(null, func) - - // expose empty value - func.empty = chan.empty - - // cross reference the channel object and function for internal use - func.__chan = chan - chan.func = func - - return func -} diff --git a/lib/receiver.js b/lib/receiver.js deleted file mode 100644 index 0ee6fb8..0000000 --- a/lib/receiver.js +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Expose `Receiver`. - */ -module.exports = Receiver - -/** - * Initialize a `Receiver`. - * - * @param {Mixed} val - * @api private - */ -function Receiver(val) { - this.val = val - this.isAdded = false - this.err = null - this.cb = null - this.isDone = false -} - -/** - * Call the callback if the pending add is complete. - * - * @api private - */ -Receiver.prototype.attemptNotify = function () { - if ((this.isAdded || this.err) && this.cb && !this.isDone) { - this.isDone = true - setImmediate(function () { this.cb(this.err) }.bind(this)) - } -} - -/** - * Reject the pending add with an error. - * - * @param {Error} err - * @api private - */ -Receiver.prototype.error = function (err) { - this.err = err - this.attemptNotify() -} - -/** - * Get the `val` and set the state of the value to added - * - * @return {Mixed} val - * @api private - */ -Receiver.prototype.add = function () { - this.isAdded = true - this.attemptNotify() - return this.val -} - -/** - * Register the callback. - * - * @api private - */ -Receiver.prototype.callback = function (cb) { - this.cb = cb - this.attemptNotify() -} diff --git a/lib/select.js b/lib/select.js deleted file mode 100644 index 2bd8f59..0000000 --- a/lib/select.js +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Module dependencies. - */ -var make = require('./make') -var Channel = require('./channel') - -/** - * Expose `select`. - */ -module.exports = select - -/** - * Return the first of the given channels with a value. - * - * @param {Function} channels... - * @return {Function} - * @api public - */ -function select(/*channels...*/) { - var selectCh = make(arguments.length) - var chans = [].slice.call(arguments, 0) - var remaining = chans.length - - // get all channels with values waiting - var full = chans.filter(function (ch) { - return ch.__chan.items.length + ch.__chan.pendingAdds.length > 0 - }) - - // define get callback - var get = function (err, value) { - var args = arguments - var ch = Channel.lastCalled - - // don't select an channel returning an empty value, unless it is last - if (value === ch.empty && --remaining > 0) { - return - } - - // remove get callback from all selected channels - chans.forEach(function (ch) { ch.__chan.removeGet(get) }) - - // add temporary selected yieldable function - ch.selected = function (cb) { - delete ch.selected - cb.apply(null, args) - } - - // added the selected channel to the select channel - selectCh(null, ch) - selectCh.close() - } - - if (full.length > 1) { - // multiple channels with waiting values, pick one at random - full[Math.floor(Math.random() * full.length)](get) - } else { - // add get callback to all channels - chans.forEach(function (ch) { ch(get) }) - } - - return selectCh -} diff --git a/lib/timeout.js b/lib/timeout.js deleted file mode 100644 index cf3d960..0000000 --- a/lib/timeout.js +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Module dependencies. - */ -var make = require('./make') - -/** - * Expose `timeoutChan`. - */ -module.exports = timeoutChan - -/** - * Make a timeout channel that receives `true` after a number of milliseconds. - * - * @param {Number} ms - * @returns {Function} channel - * @api public - */ -function timeoutChan(ms) { - var ch = make() - - setTimeout(function () { - try { - ch(true) - ch.close() - } catch(err) {} - }, ms) - - return ch -} diff --git a/package.json b/package.json index c255406..0d8797e 100644 --- a/package.json +++ b/package.json @@ -1,10 +1,14 @@ { "name": "chan", - "version": "0.6.1", - "description": "A go style channel implementation that works nicely with co", - "main": "index.js", + "version": "1.0.0", + "description": "Go-like channels for TypeScript and JavaScript.", + "main": "out/src/index.js", "scripts": { - "test": "mocha" + "build": "tsc", + "prepublish": "npm run build", + "test": "npm run lint && npm run build && mocha --require source-map-support/register 'dist/**/*.test.js'", + "debug": "npm run build && mocha debug --require source-map-support/register --no-timeouts 'dist/**/*.test.js'", + "lint": "tslint 'src/**/*.ts'" }, "repository": { "type": "git", @@ -12,10 +16,12 @@ }, "keywords": [ "async", - "go", + "await", "channel", - "co", - "generator" + "csp", + "buffer", + "golang", + "go" ], "author": "Brent Burgoyne", "contributors": [ @@ -37,12 +43,13 @@ }, "homepage": "https://github.com/brentburgoyne/chan", "devDependencies": { - "co": "^3.0.6", - "expect.js": "^0.3.1", - "mocha": "^1.20.1", - "should": "^4.0.4", - "sinon": "^1.10.3", - "split": "^0.3.0", - "superagent": "^0.18.0" + "@types/node": "^6.0.52", + "mocha": "^3.1.0", + "mochon": "^1.0.0", + "sinon": "^1.17.4", + "source-map-support": "^0.4.7", + "tslint": "^4.1.1", + "tslint-config-standard": "^2.0.0", + "typescript": "^2.1.4" } } diff --git a/src/buffer.test.ts b/src/buffer.test.ts new file mode 100644 index 0000000..ee16c43 --- /dev/null +++ b/src/buffer.test.ts @@ -0,0 +1,97 @@ +import {ok, equal, notEqual} from 'assert' +import {describe, it, beforeEach, afterEach} from 'mocha' +import * as buffer from './buffer' + +describe('Buffer', () => { + function common (Buffer) { + describe(`${Buffer.name} common`, () => { + describe('constructor', () => { + it('sets size from argument', () => { + const size = 47 + const b = new Buffer(size) + equal(b.size, size) + }) + }) + + describe('shift', () => { + it('removes the first value from the buffer', () => { + const b = new Buffer(47) + b.push(() => 1) + b.push(() => 2) + equal(b.shift(), 1) + }) + }) + + describe('hasValues', () => { + it('returns true if the buffer has values', () => { + const b = new Buffer(47) + b.push(() => 1) + ok(b.hasValues()) + }) + + it('returns false if the buffer has no values', () => { + const b = new Buffer(47) + ok(!b.hasValues()) + }) + }) + }) + } + + common(buffer.BufferBlocking) + common(buffer.BufferSliding) + common(buffer.BufferDropping) + + describe('BufferBlocking push', () => { + it('puts getValue() on buffer and returns true if there is room', () => { + const b = new buffer.BufferBlocking(1) + const val = {} + equal(b.push(() => val), true) + equal(b.shift(), val) + }) + + it('returns false if there is not room', () => { + const b = new buffer.BufferBlocking(1) + const val = {} + b.push(() => ({})) + ok(!b.push(() => val)) + b.shift() + notEqual(b.shift(), val) + }) + }) + + describe('BufferSliding push', () => { + it('puts getValue() on buffer and returns true if there is room', () => { + const b = new buffer.BufferSliding(1) + const val = {} + equal(b.push(() => val), true) + equal(b.shift(), val) + }) + + it('puts getValue() on buffer and removes first value if no room', () => { + const b = new buffer.BufferSliding(2) + b.push(() => 1) + b.push(() => 2) + equal(b.push(() => 3), true) + equal(b.shift(), 2) + equal(b.shift(), 3) + }) + }) + + describe('BufferDropping push', () => { + it('puts getValue() on buffer and returns true if there is room', () => { + const b = new buffer.BufferDropping(1) + const val = {} + equal(b.push(() => val), true) + equal(b.shift(), val) + }) + + it('does not put value on buffer and returns true if no room', () => { + const b = new buffer.BufferDropping(2) + b.push(() => 1) + b.push(() => 2) + equal(b.push(() => 3), true) + equal(b.shift(), 1) + equal(b.shift(), 2) + }) + }) +}) diff --git a/src/buffer.ts b/src/buffer.ts new file mode 100644 index 0000000..6780856 --- /dev/null +++ b/src/buffer.ts @@ -0,0 +1,54 @@ +import {Queue} from './queue' + +export interface IBuffer { + shift (): T | undefined + hasValues (): boolean + push (getValue: () => T): boolean +} + +export abstract class BufferBase implements IBuffer { + // TODO: Use ring buffer instead of queue + protected values = new Queue() + + constructor (protected size: number) {} + + public shift (): T | undefined { + return this.values.shift() + } + + public hasValues (): boolean { + return this.values.notEmpty() + } + + abstract push (getValue: () => T): boolean +} + +export class BufferBlocking extends BufferBase { + public push (getValue: () => T): boolean { + if (this.values.size() < this.size) { + this.values.push(getValue()) + return true + } + return false + } +} + +export class BufferDropping extends BufferBase { + public push (getValue: () => T): boolean { + const value = getValue() + if (this.values.size() < this.size) { + this.values.push(value) + } + return true + } +} + +export class BufferSliding extends BufferBase { + public push (getValue: () => T): boolean { + this.values.push(getValue()) + if (this.values.size() > this.size) { + this.values.shift() + } + return true + } +} diff --git a/src/channel.test.ts b/src/channel.test.ts new file mode 100644 index 0000000..2328aa5 --- /dev/null +++ b/src/channel.test.ts @@ -0,0 +1,201 @@ +import {equal, ok} from 'assert' +import * as sinon from 'sinon' +import {describe, it, beforeEach, afterEach} from 'mocha' +import {Channel} from './channel' +import {BufferBlocking} from './buffer' +import * as Deferred from './deferred' + +describe('Channel', () => { + let buffer + let ch + let deferred + let deferredStub + + beforeEach(() => { + deferred = new Deferred.Deferred() + buffer = new BufferBlocking(5) + deferredStub = sinon.stub(Deferred, 'Deferred').returns(deferred) + ch = new Channel(buffer) + }) + + afterEach(() => { + deferredStub.restore() + }) + + describe('take', () => { + it('returns deferred promise', () => { + equal(ch.take(), deferred.promise) + }) + + it('resolves with the first value in the buffer', async () => { + const val = {} + ch.put(val) + ch.put({}) + equal(await ch.take(), val) + }) + + it('resolves when a value is added after the take', async () => { + const val = {} + setImmediate(() => ch.put(val)) + equal(await ch.take(), val) + }) + + it('rejects when called on a closed channel', async () => { + ch.close() + let err + try { + await ch.take() + } catch (e) { + err = e + } + ok(err) + }) + }) + + describe('then', () => { + it('proxies call to then of promise returned by take', () => { + const ret = {} + const then = sinon.stub().returns(ret) + sinon.stub(ch, 'take').returns({then}) + const onFullfilled = {} + const onRejected = {} + equal(ch.then(onFullfilled, onRejected), ret) + sinon.assert.calledWithExactly(then, onFullfilled, onRejected) + }) + }) + + describe('cancelableTake', () => { + it('returns a promise and a cancel function', () => { + const [promise, cancel] = ch.cancelableTake() + ok(promise instanceof Promise) + equal(typeof cancel, 'function') + }) + + it('promise comes from call to take', () => { + const expected = {} + sinon.stub(ch, 'take').returns(expected) + const [promise] = ch.cancelableTake() + equal(promise, expected) + }) + + it('cancel function removes pending take', async () => { + deferredStub.restore() + const [first, cancel] = ch.cancelableTake() + first.catch(() => null) + const second = ch.take() + cancel() + const val = {} + ch.put(val) + equal(await second, val) + }) + + it('promise for pending take is rejected', async () => { + let rejectedErr + const [promise, cancel] = ch.cancelableTake() + cancel() + try { + await promise + } catch (err) { + rejectedErr = err + } + ok(rejectedErr) + }) + }) + + describe('hasValues', () => { + it('returns true if the buffer has values', () => { + ch.buffer = {hasValues: () => true} + ch.pendingPuts = {notEmpty: () => false} + ok(ch.hasValues()) + }) + + it('returns true if the channel has pending puts', () => { + ch.buffer = {hasValues: () => false} + ch.pendingPuts = {notEmpty: () => true} + ok(ch.hasValues()) + }) + + it('returns false if buffer and pending puts are empty', () => { + ch.buffer = {hasValues: () => false} + ch.pendingPuts = {notEmpty: () => false} + ok(!ch.hasValues()) + }) + }) + + describe('put', () => { + it('resolves the first pending take', async () => { + const take1 = ch.take() + ch.take() + const val = {} + await ch.put(val) + equal(await take1, val) + }) + + it('puts deferred put in the buffer', () => { + sinon.stub(ch.buffer, 'push').returns(true) + const val = {} + ch.put(val) + equal(ch.buffer.push.firstCall.args[0](), val) + }) + + it('puts deferred in pending puts if buffer is full', () => { + sinon.stub(ch.buffer, 'push').returns(false) + const deferredPut = {} + const stub = sinon.stub(Deferred, 'DeferredPut').returns(deferredPut) + ch.put({}) + equal(ch.pendingPuts.peek(), deferredPut) + stub.restore() + }) + + it('is rejected if called on a closed channel', async () => { + ch.close() + let rejectedErr + try { + await ch.put({}) + } catch (err) { + rejectedErr = err + } + ok(rejectedErr) + }) + }) + + describe('close', () => { + beforeEach(() => { + deferredStub.restore() + }) + + it('sets isClosed to true', () => { + ch.close() + ok(ch.isClosed) + }) + + it('sets isDone to true', () => { + ch.close() + ok(ch.isDone) + }) + + it('rejects promises for any pending puts', async () => { + ch = new Channel(new BufferBlocking(0)) + const promise = ch.put({}) + ch.close() + let rejectedErr + try { + await promise + } catch (err) { + rejectedErr = err + } + ok(rejectedErr) + }) + + it('rejects any pending takes', async () => { + setImmediate(() => ch.close()) + let takeErr + try { + await ch.take() + } catch (err) { + takeErr = err + } + ok(takeErr) + }) + }) +}) diff --git a/src/channel.ts b/src/channel.ts new file mode 100644 index 0000000..33bb64a --- /dev/null +++ b/src/channel.ts @@ -0,0 +1,150 @@ +import {IBuffer} from './buffer' +import {Deferred, DeferredPut, PromiseWithDeferred} from './deferred' +import {CanceledTakeError, ClosedTakeError, ClosedPutError} from './error' +import {Queue} from './queue' + +export interface OutChannel extends PromiseLike { + take (): PromiseWithDeferred + cancelableTake (): [PromiseWithDeferred, () => void] + hasValues (): boolean +} + +export interface InChannel { + put (T): Promise + close (): void +} + +/** + * A buffered asynchronous queue of values which can be used to coordinate + * between multiple async functions. + */ +export class Channel implements InChannel, OutChannel { + private pendingPuts = new Queue>() + private pendingTakes = new Queue>() + private isClosed = false + private isDone = false + + /** + * Creates a new Channel using any buffer that satisfies the required + * interface. + */ + constructor (private buffer: IBuffer) {} + + /** + * A shortcut for calling `then` on a take, allowing a channel to be awated + * directly. If called multiple times each call with trigger a new take + * from the channel. + */ + public then (onFulfilled, onRejected) { + return this.take().then(onFulfilled, onRejected) + } + + /** + * Return a promise that will be resolved with the next value put on the + * channel. If there are existing values in the channel's buffer, the promise + * will be resolved with the first value. + * + * If called on a channel that is closed and has an empty buffer the promise + * will be rejected with a `ClosedTakeError`. + */ + public take (): PromiseWithDeferred { + const deferred = new Deferred() + if (this.done()) { + deferred.reject(new ClosedTakeError()) + } else if (this.hasValues()) { + this.resolve(deferred, this.nextValue()) + } else { + this.pendingTakes.push(deferred) + } + return deferred.promise + } + + /** + * Initial a new take from the channel returning a promise/cancel function + * pair. If the cancel function is called before the promise resolves, the + * take will be canceled and the next value put on the channel with be + * handled by the next take instead. + */ + public cancelableTake (): [PromiseWithDeferred, () => void] { + const promise = this.take() + return [ + promise, + () => this.removePendingTake(promise.deferred) + ] + } + + /** + * Return a boolean indicating if the channel's buffer has values available + * to be taken. + */ + public hasValues (): boolean { + return this.buffer.hasValues() || this.pendingPuts.notEmpty() + } + + /** + * Put a new value on the channel returning a promise that will be resolved + * once the value has be added to the buffer. + * + * If the channel is closed before the value is in the buffer, the returned + * promise will be rejected with a `ClosedPutError`. + */ + public put (value: T): Promise { + const deferred = new DeferredPut(value) + if (this.isClosed) { + deferred.reject(new ClosedPutError()) + } else if (this.pendingTakes.notEmpty()) { + this.resolve(this.pendingTakes.shift(), deferred.put()) + } else if (!this.buffer.push(deferred.put)) { + this.pendingPuts.push(deferred) + } + return deferred.promise + } + + /** + * Mark a channel as closed. All pending puts on the channel will be rejected + * immediately. Pending takes will be rejected once all values in the buffer + * have been taken. + */ + public close (): void { + this.isClosed = true + const err = new ClosedPutError() + let pendingPut + while (pendingPut = this.pendingPuts.shift()) { + pendingPut.reject(err) + } + this.done() + } + + private removePendingTake (deferred: Deferred): void { + if (this.pendingTakes.remove(deferred)) { + deferred.reject(new CanceledTakeError()) + } + } + + private nextValue (): T | undefined { + const pendingPut = this.pendingPuts.shift() + if (pendingPut) { + this.buffer.push(pendingPut.put) + } + return this.buffer.shift() + } + + private resolve (deferred: Deferred | undefined, value: T | undefined): void { + if (deferred && value) { + deferred.resolve(value) + } + this.done() + } + + private done (): boolean { + if (!this.isDone && this.isClosed && !this.buffer.hasValues()) { + this.isDone = true + const err = new ClosedTakeError() + let pendingTake: Deferred | undefined + while (pendingTake = this.pendingTakes.shift()) { + pendingTake.reject(err) + } + } + return this.isDone + } +} diff --git a/src/deferred.test.ts b/src/deferred.test.ts new file mode 100644 index 0000000..c03fa65 --- /dev/null +++ b/src/deferred.test.ts @@ -0,0 +1,56 @@ +import {equal, ok} from 'assert' +import * as sinon from 'sinon' +import {describe, it, beforeEach, afterEach} from 'mocha' +import {Deferred, DeferredPut} from './deferred' + +describe('Deferred', () => { + + describe('constructor', () => { + let deferred + + beforeEach(() => { + deferred = new Deferred() + }) + + it('creates a new promise', () => { + ok(deferred.promise instanceof Promise) + }) + + it('resolve fulfills the promise with value', async () => { + const expected = 'value' + deferred.resolve(expected) + equal(await deferred.promise, expected) + }) + + it('reject rejects the promise with error', async () => { + const expected = {} + deferred.reject(expected) + let err + try { + await deferred.promise + } catch (e) { + err = e + } + equal(err, expected) + }) + }) + + describe('DeferredPut', () => { + let value + let deferred + + beforeEach(() => { + value = 'something' + deferred = new DeferredPut(value) + }) + + it('resolves the promise with undefined', async () => { + deferred.put() + equal(await deferred.promise, undefined) + }) + + it('returns the value', async () => { + equal(deferred.put(), value) + }) + }) +}) diff --git a/src/deferred.ts b/src/deferred.ts new file mode 100644 index 0000000..f3708ba --- /dev/null +++ b/src/deferred.ts @@ -0,0 +1,29 @@ +export class PromiseWithDeferred extends Promise { + public deferred: Deferred +} + +export class Deferred { + public promise: PromiseWithDeferred + public resolve: (value: T) => void + public reject: (err: Error) => void + + constructor () { + this.promise = new PromiseWithDeferred((resolve, reject) => { + this.resolve = resolve + this.reject = reject + }) + this.promise.deferred = this + } +} + +export class DeferredPut extends Deferred { + public put: () => T + + constructor (private value: T) { + super() + this.put = () => { + this.resolve(undefined) + return this.value + } + } +} diff --git a/src/error.test.ts b/src/error.test.ts new file mode 100644 index 0000000..9bcff01 --- /dev/null +++ b/src/error.test.ts @@ -0,0 +1,36 @@ +import {equal, ok} from 'assert' +import * as sinon from 'sinon' +import {describe, it, beforeEach, afterEach} from 'mocha' +import {CanceledTakeError, ClosedTakeError, ClosedPutError} from './error' + +describe('Errors', () => { + function errorTests (ErrorClass, name) { + describe(name, () => { + let error + + beforeEach(() => { + error = new ErrorClass() + }) + + it('is an instance of Error', () => { + ok(error instanceof Error) + }) + + it('is an instance of its own class', () => { + ok(error instanceof ErrorClass) + }) + + it('has a stack trace', () => { + ok(/beforeEach/.test(error.stack)) + }) + + it('has the correct name', () => { + equal(error.name, name) + }) + }) + } + + errorTests(CanceledTakeError, 'CanceledTakeError') + errorTests(ClosedTakeError, 'ClosedTakeError') + errorTests(ClosedPutError, 'ClosedPutError') +}) diff --git a/src/error.ts b/src/error.ts new file mode 100644 index 0000000..9ae1423 --- /dev/null +++ b/src/error.ts @@ -0,0 +1,44 @@ +/** + * A custom error type used when rejecting the promise for a canceled take. + */ +export class CanceledTakeError extends Error { + constructor () { + super() + const err = Object.create(CanceledTakeError.prototype) + err.message = 'Pending take from channel was canceled.' + err.name = 'CanceledTakeError' + err.stack = this.stack + return err + } +} + +/** + * A custom error type used when rejecting the promise for a take when the + * channel is closed and there are not any values left in the buffer to be + * taken. + */ +export class ClosedTakeError extends Error { + constructor () { + super() + const err = Object.create(ClosedTakeError.prototype) + err.message = 'Cannot take a value from an empty closed channel.' + err.name = 'ClosedTakeError' + err.stack = this.stack + return err + } +} + +/** + * A custom error type used when rejecting the promise for a put when the + * channel is closed before the value can be put in the buffer. + */ +export class ClosedPutError extends Error { + constructor () { + super() + const err = Object.create(ClosedPutError.prototype) + err.message = 'Cannot put a value on a closed channel.' + err.name = 'ClosedPutError' + err.stack = this.stack + return err + } +} diff --git a/src/factory.test.ts b/src/factory.test.ts new file mode 100644 index 0000000..fddab72 --- /dev/null +++ b/src/factory.test.ts @@ -0,0 +1,61 @@ +import {equal, ok} from 'assert' +import * as sinon from 'sinon' +import {describe, it, beforeEach, afterEach} from 'mocha' +import {blockingChannel, slidingChannel, droppingChannel} from './factory' +import * as bufferModule from './buffer' +import * as channelModule from './channel' + +describe('Factory functions', () => { + let channel + let channelStub + let buffer + let bufferStub + + beforeEach(() => { + channel = {} + buffer = {} + channelStub = sinon.stub(channelModule, 'Channel').returns(channel) + }) + + afterEach(() => { + channelStub.restore() + }) + + function testFactory (factory, bufferType, defaultSize) { + describe(factory.name, () => { + beforeEach(() => { + bufferStub = sinon.stub(bufferModule, bufferType).returns(buffer) + }) + + afterEach(() => { + bufferStub.restore() + }) + + it('constructs a new buffer with size', () => { + const size = 5 + factory(size) + sinon.assert.calledWithExactly(bufferStub, size) + }) + + if (defaultSize != null) { + it(`has a default buffer size of ${defaultSize}`, () => { + factory() + sinon.assert.calledWithExactly(bufferStub, defaultSize) + }) + } + + it('constructs a new channel with buffer', () => { + factory() + sinon.assert.calledWithExactly(channelStub, buffer) + }) + + it('returns the new channel', () => { + equal(factory(), channel) + }) + }) + } + + testFactory(blockingChannel, 'BufferBlocking', 0) + testFactory(slidingChannel, 'BufferSliding', null) + testFactory(blockingChannel, 'BufferBlocking', null) +}) diff --git a/src/factory.ts b/src/factory.ts new file mode 100644 index 0000000..bdd38c0 --- /dev/null +++ b/src/factory.ts @@ -0,0 +1,35 @@ +import {BufferBlocking, BufferSliding, BufferDropping} from './buffer' +import {Channel} from './channel' + +/** + * Create a channel with a blocking buffer of a given size. If the buffer is + * full, calls to `put` will block until there is room in the buffer. Once + * space if available the value will be added to the end of the buffer and + * the returned promise will resolve allowing execution of the calling async + * function to continue. + * + * If called without a buffer size, it will default to `0` and will function as + * an unbuffered channel. Calls to `put` will block until another async + * function calls `take` on the channel. + */ +export function blockingChannel (size: number = 0): Channel { + return new Channel(new BufferBlocking(size)) +} + +/** + * Creata a channel with a sliding buffer of a given size. If the buffer is + * full, calls to `put` will cause the first value in the buffer to drop and + * the new value will be added to the end of the buffer. + */ +export function slidingChannel (size: number): Channel { + return new Channel(new BufferSliding(size)) +} + +/** + * Create a channel with a dropping buffer of a given size. If the buffer is + * full, calls to `put` will be ignored and the value will not be added to the + * channel. + */ +export function droppingChannel (size: number): Channel { + return new Channel(new BufferDropping(size)) +} diff --git a/src/index.ts b/src/index.ts new file mode 100644 index 0000000..66cff27 --- /dev/null +++ b/src/index.ts @@ -0,0 +1,13 @@ +import {Channel} from './channel' +import {blockingChannel, slidingChannel, droppingChannel} from './factory' +import {select, Selector} from './select' +import {timeout} from './timeout' + +export = Object.assign(blockingChannel, { + sliding: slidingChannel, + dropping: droppingChannel, + Channel, + timeout, + select, + Selector +}) diff --git a/src/queue.test.ts b/src/queue.test.ts new file mode 100644 index 0000000..4f5dfb5 --- /dev/null +++ b/src/queue.test.ts @@ -0,0 +1,104 @@ +import {equal, ok} from 'assert' +import * as sinon from 'sinon' +import {describe, it, beforeEach, afterEach} from 'mocha' +import {Queue} from './queue' + +describe('Queue', () => { + let queue: Queue + + beforeEach(() => { + queue = new Queue() + }) + + describe('push/shift', () => { + it('is first in first out', () => { + queue.push(1) + queue.push(2) + queue.push(3) + equal(queue.shift(), 1) + equal(queue.shift(), 2) + equal(queue.shift(), 3) + }) + }) + + describe('peek', () => { + it('returns the first value', () => { + queue.push(1) + equal(queue.peek(), 1) + }) + + it('does not remove the first value from the queue', () => { + queue.push(1) + queue.peek() + equal(queue.shift(), 1) + }) + }) + + describe('size', () => { + it('returns the number of values push on the queue', () => { + queue.push(1) + queue.push(2) + equal(queue.size(), 2) + }) + + it('returns zero for empty queues', () => { + equal(queue.size(), 0) + }) + + it('returns the size reduced by the number of values shifted', () => { + queue.push(1) + queue.push(2) + queue.shift() + equal(queue.size(), 1) + }) + }) + + describe('empty', () => { + it('returns true when no values are in the queue', () => { + equal(queue.empty(), true) + queue.push(1) + queue.shift() + equal(queue.empty(), true) + }) + + it('returns false when values are in the queue', () => { + queue.push(1) + equal(queue.empty(), false) + }) + }) + + describe('notEmpty', () => { + it('returns false when no values are in the queue', () => { + equal(queue.notEmpty(), false) + queue.push(1) + queue.shift() + equal(queue.notEmpty(), false) + }) + + it('returns true when values are in the queue', () => { + queue.push(1) + equal(queue.notEmpty(), true) + }) + }) + + describe('remove', () => { + it('removes value from the queue', () => { + queue.push(1) + queue.push(2) + queue.push(3) + queue.remove(2) + equal(queue.shift(), 1) + equal(queue.shift(), 3) + }) + + it('returns true if value was removed', () => { + queue.push(1) + equal(queue.remove(1), true) + }) + + it('returns false if value was not found', () => { + queue.push(1) + equal(queue.remove(2), false) + }) + }) +}) diff --git a/src/queue.ts b/src/queue.ts new file mode 100644 index 0000000..eef98aa --- /dev/null +++ b/src/queue.ts @@ -0,0 +1,38 @@ +// TODO: implement proper queue with O(1) push and shift +// Array based queue has O(n) shifts +export class Queue { + private items: Array = [] + + public push (value: T): void { + this.items.push(value) + } + + public shift (): T | undefined { + return this.items.shift() + } + + public peek (): T | undefined { + return this.items[0] + } + + public size (): number { + return this.items.length + } + + public empty (): boolean { + return this.size() === 0 + } + + public notEmpty (): boolean { + return this.size() > 0 + } + + public remove (value: T): boolean { + const idx = this.items.indexOf(value) + if (idx > -1) { + this.items.splice(idx, 1) + return true + } + return false + } +} diff --git a/src/select.ts b/src/select.ts new file mode 100644 index 0000000..7b5c495 --- /dev/null +++ b/src/select.ts @@ -0,0 +1,103 @@ +import {OutChannel} from './channel' +import {Deferred} from './deferred' +import {ClosedTakeError} from './error' +import {timeout} from './timeout' + +/** + * Create a new thenable select with support for chaning `case`, `timeout` and + * `default` methods. + */ +export function select (): Selector { + return new Selector() +} + +/** + * A thenable builder that allows adding handlers for multiple channels as well + * as a timeout or default handler. The thenable resolves once on of the + * handlers has been called. If the handler is asynchronous (returns a promise) + * the selector will resolve once the handler is resolved. + * + * If a default is specified, it will be called immediatly if non of the + * channels have values ready to take. If multiple channels have a pending + * value, one channel will be chosen at random. + */ +export class Selector implements PromiseLike { + private handlers = new Map() + private defaultFn: (() => Promise | void) | undefined + private cancels: Array<() => void> = [] + private deferred = new Deferred() + private remaining = 0 + private executed = false + + /** + * Add a channel to the select along with the function to be called with its + * value if the channel is selected. If the function is an async function or + * any function that returns a promise. The select will not resolve until + * the returned promise resolves. + */ + public case (ch: OutChannel, fn: (T) => Promise | void): this { + this.handlers.set(ch, (value: any) => fn(value as T)) + return this + } + + /** + * A shortcut for adding a case to the select with a timeout channel. If + * another channel in the select does not have a value to take within the + * given number of milliseconds, the timeout handler will be called instead. + */ + public timeout (ms: number, fn: () => Promise | undefined): this { + this.case(timeout(ms), fn) + return this + } + + /** + * Register a handler function to be called if none of the channels have a + * value to take at the time of the select. If the select has a default it + * will always resolve immediatly. + */ + public default (fn: () => Promise | void): this { + this.defaultFn = fn + return this + } + + /** + * Executes the select by taking from one or more channels, or by calling the + * default handler. If a take is initiated from more than one channel, all + * other takes will be canceled after the first one resolves. + */ + public then (onFulfilled, onRejected) { + if (!this.executed) { + this.execute() + } + return this.deferred.promise.then(onFulfilled, onRejected) + } + + private execute (): void { + const channels = Array.from(this.handlers.keys()) + const nonEmpty = channels.filter(c => c.hasValues()) + if (this.defaultFn && nonEmpty.length === 0) { + const done = Promise.resolve(this.defaultFn()) + done.then(this.deferred.resolve, this.deferred.reject) + } else if (nonEmpty.length > 0) { + this.take(nonEmpty[Math.random() * nonEmpty.length | 0]) + } else { + this.remaining = channels.length + channels.forEach(c => this.take(c)) + } + this.executed = true + } + + private take (channel: OutChannel): void { + const [promise, cancel] = channel.cancelableTake() + this.cancels.push(cancel) + promise.then((value) => { + this.cancels.forEach(fn => fn()) + const done = Promise.resolve(this.handlers.get(promise)(value)) + done.then(this.deferred.resolve, this.deferred.reject) + }).catch((err) => { + if (err instanceof ClosedTakeError === false || --this.remaining === 0) { + this.deferred.reject(err) + } + }) + } +} diff --git a/src/timeout.ts b/src/timeout.ts new file mode 100644 index 0000000..9ca4fe0 --- /dev/null +++ b/src/timeout.ts @@ -0,0 +1,21 @@ +import {blockingChannel} from './factory' +import {Channel} from './channel' + +/** + * Create a new channel that will receive a single value after a given number + * of milliseconds. The channel will be closed and cannot be reused. + */ +export function timeout (ms: number): Channel { + const ch = blockingChannel() + setTimeout( + () => { + try { + ch.put(null) + ch.close() + // tslint:disable-next-line:no-empty + } catch (err) {} + }, + ms + ) + return ch +} diff --git a/test/async.js b/test/async.js deleted file mode 100644 index 0ccf741..0000000 --- a/test/async.js +++ /dev/null @@ -1,71 +0,0 @@ -/* jshint expr:true */ -/* global describe:true, beforeEach:true, afterEach:true, it:true */ - -var async = require('../lib/async') -var should = require('should') -var sinon = require('sinon') - -describe('Async helper', function () { - - var err = {} - var val = {} - var ch - var fn - - beforeEach(function () { - ch = sinon.stub().returns(function (cb) { cb() }) - fn = sinon.stub().yields(err, val) - }) - - it( - 'should return a function with an arity of 1', - function () { - var thunk = async(ch, fn) - thunk.should.be.a.Function - thunk.length.should.be.exactly(1) - } - ) - - it( - 'should call fn with args plus a callback', - function () { - async(ch, fn, 1, 2, 3, 'foo') - var argsWithoutCb = fn.firstCall.args.slice(0, -1) - argsWithoutCb.should.eql([1, 2, 3, 'foo']) - } - ) - - it( - 'should call a method of an object with the third argument as the name', - function () { - var ob = { foo: fn } - async(ch, ob, 'foo', 1, 2, 3) - var argsWithoutCb = fn.firstCall.args.slice(0, -1) - argsWithoutCb.should.eql([1, 2, 3]) - fn.firstCall.calledOn(ob).should.be.true - } - ) - - it( - 'should call channel with arguments of the async function callback', - function () { - async(ch, fn) - ch.firstCall.args.length.should.be.exactly(2) - ch.firstCall.args[0].should.be.exactly(err) - ch.firstCall.args[1].should.be.exactly(val) - } - ) - - it( - 'should call callback given to returned function', - function (done) { - var cb = sinon.spy() - async(ch, fn)(cb) - setImmediate(function () { - cb.callCount.should.be.exactly(1) - done() - }) - } - ) - -}) diff --git a/test/buffered.js b/test/buffered.js deleted file mode 100644 index 8f1e8de..0000000 --- a/test/buffered.js +++ /dev/null @@ -1,116 +0,0 @@ -/* jshint loopfunc: true */ -/* global describe:true, beforeEach:true, it:true */ - -var chan = require('..') -var expect = require('expect.js') - -describe('A unbuffered channel', function () { - - it( - 'should not call the added callback until the value is removed', - function (done) { - var ch = chan(0) // unbuffered - var cbCalled = false - ch('foo')(function () { - cbCalled = true - }) - setImmediate(function () { - expect(cbCalled).to.not.be.ok() - ch(function (err, val) { - setImmediate(function () { - expect(cbCalled).to.be.ok() - done() - }) - }) - }) - } - ) - -}) - -describe('A buffered channel', function () { - - it( - 'should pull values from the buffer when yielded', - function (done) { - var ch = chan(1) - var cbCalled = false - var testValue = 'foo' - ch(testValue) - ch(function (err, val) { - cbCalled = true - expect(val).to.be(testValue) - }) - setImmediate(function () { - expect(cbCalled).to.be.ok() - done() - }) - } - ) - - describe('with a non-full buffer', function () { - - it( - 'should call added callback as soon as it is given to the returned thunk', - function (done) { - var buffer = 3 - var ch = chan(buffer) - var called = 0 - var added = 0 - while (++added <= buffer + 10) { - ch(added)(function (err) { - called++ - }) - } - setImmediate(function () { - expect(called).to.be(buffer) - done() - }) - } - ) - - }) - - describe('with a full buffer', function () { - - it( - 'should not add another value untill a value has been removed', - function (done) { - var ch = chan(1) - var cbCalled = false - ch('foo') - ch('bar')(function () { - cbCalled = true - }) - setImmediate(function () { - expect(cbCalled).to.not.be.ok() - ch(function (err, val) { - setImmediate(function () { - expect(cbCalled).to.be.ok() - done() - }) - }) - }) - } - ) - - it( - 'should call cb with an error when the channel is closed before adding', - function (done) { - var ch = chan(0) - var cbCalled = false - ch('foo')(function (err) { - cbCalled = true - expect(err).to.be.an(Error) - }) - ch.close() - setImmediate(function () { - expect(cbCalled).to.be.ok() - done() - }) - } - ) - - }) - -}) diff --git a/test/chan.js b/test/chan.js deleted file mode 100644 index 010306e..0000000 --- a/test/chan.js +++ /dev/null @@ -1,112 +0,0 @@ -/* global describe:true, beforeEach:true, it:true */ - -var chan = require('..') -var expect = require('expect.js') -var fs = require('fs') - -describe('Channel make', function () { - - it( - 'should return a channel function', - function () { - var ch = chan() - expect(ch).to.be.a(Function) - } - ) - -}) - -describe('A channel', function () { - - var ch - - beforeEach(function () { - ch = chan() - }) - - it( - 'should receive a value of any non-function type as the first argument', - function () { - var typeCases = [ - 1, - 'foo', - [1, 2 , 3], - {foo: 'bar'}, - true, - false, - null, - void 0 - ] - typeCases.forEach(function (val) { - ch(val) - ch(function (err, result) { - expect(result).to.be(val) - }) - }) - } - ) - - it( - 'should receive a function value as a second argument if the first is null', - function () { - ch(null, function () {}) - ch(function (err, result) { - expect(result).to.be.a(Function) - }) - } - ) - - it( - 'should queue values until they are yielded/removed', - function () { - var values = [1, 2, 3, 4, 5] - values.forEach(function (value) { - ch(value) - }) - values.forEach(function (value) { - ch(function (err, result) { - expect(result).to.be(value) - }) - }) - } - ) - - it( - 'should queue callbacks until values are added', - function () { - var values = [1, 2, 3, 4, 5] - values.forEach(function (value) { - ch(function (err, result) { - expect(result).to.be(value) - }) - }) - values.forEach(function (value) { - ch(value) - }) - } - ) - - it( - 'should pass errors as the first argument to callbacks', - function () { - var e = new Error('Foo') - ch(e) - ch(function (err) { - expect(err).to.be(e) - }) - } - ) - - it( - 'should be useable directly as a callback for node style async functions', - function (done) { - ch(function (err, contents) { - expect(err).to.be(null) - expect(contents).to.be.a(Buffer) - done() - }) - fs.readFile(__filename, ch) - } - ) - -}) diff --git a/test/channel.test.js b/test/channel.test.js new file mode 100644 index 0000000..ebd67ff --- /dev/null +++ b/test/channel.test.js @@ -0,0 +1,191 @@ +import {ok, equal, deepEqual} from 'assert' +import mochon from 'mochon' +import Channel from '../src/channel' +import Deferred, * as allDeferred from '../src/deferred' +import {BufferBlocking} from '../src/buffer' + +describe('Channel', () => { + const sinon = mochon() + + let ch, buffer, deferred + + beforeEach(() => { + deferred = new Deferred() + buffer = new BufferBlocking(5) + ch = new Channel(buffer) + sinon.stub(allDeferred, 'default').returns(deferred) + }) + + describe('constructor', () => { + it('sets buffer prop from arg', () => { + equal(ch.buffer, buffer) + }) + + it('sets initial properties', () => { + deepEqual(ch.pendingPuts, []) + deepEqual(ch.pendingTakes, []) + equal(ch.isClosed, false) + equal(ch.isDone, false) + deepEqual(ch.empty, {}) + }) + }) + + describe('take', () => { + it('returns deferred promise', () => { + equal(ch.take(), deferred.promise) + }) + + it('resolves with the first value in the buffer', async () => { + const val = {} + ch.put(val) + ch.put({}) + equal(await ch.take(), val) + }) + + it('resolves when a value is added after the take', async () => { + const val = {} + setImmediate(() => ch.put(val)) + equal(await ch.take(), val) + }) + + it('resolve with empty when called on a closed channel', async () => { + ch.close() + equal(await ch.take(), ch.empty) + }) + }) + + describe('then', () => { + it('proxies call to then of promise returned by take', () => { + const ret = {} + const then = sinon.stub().returns(ret) + sinon.stub(ch, 'take').returns({then}) + const onFullfilled = {} + const onRejected = {} + equal(ch.then(onFullfilled, onRejected), ret) + sinon.assert.calledWithExactly(then, onFullfilled, onRejected) + }) + }) + + describe('cancelableTake', () => { + it('returns a promise and a cancel function', () => { + const [promise, cancel] = ch.cancelableTake() + ok(promise instanceof Promise) + equal(typeof cancel, 'function') + }) + + it('promise comes from call to take', () => { + const expected = {} + sinon.stub(ch, 'take').returns(expected) + const [promise] = ch.cancelableTake() + equal(promise, expected) + }) + + it('cancel function removes pending take', async () => { + Deferred.restore() + const [, cancel] = ch.cancelableTake() + const second = ch.take() + cancel() + const val = {} + ch.put(val) + equal(await second, val) + }) + + it('promise for pending take is rejected', async () => { + let rejectedErr + const [promise, cancel] = ch.cancelableTake() + cancel() + try { + await promise + } catch (err) { + rejectedErr = err + } + ok(rejectedErr) + }) + }) + + describe('hasValues', () => { + it('returns true if the buffer has values', () => { + ch.buffer = {hasValues: () => true} + ch.pendingPuts = {length: 0} + ok(ch.hasValues()) + }) + + it('returns true if the channel has pending puts', () => { + ch.buffer = {hasValues: () => false} + ch.pendingPuts = {length: 1} + ok(ch.hasValues()) + }) + + it('returns false if buffer and pending puts are empty', () => { + ch.buffer = {hasValues: () => false} + ch.pendingPuts = {length: 0} + ok(!ch.hasValues()) + }) + }) + + describe('put', () => { + it('resolves the first pending take', async () => { + const take1 = ch.take() + ch.take() + const val = {} + await ch.put(val) + equal(await take1, val) + }) + + it('puts deferred put in the buffer', () => { + sinon.stub(ch.buffer, 'push').returns(true) + const val = {} + ch.put(val) + equal(ch.buffer.push.firstCall.args[0](), val) + }) + + it('puts deferred in pending puts if buffer is full', () => { + sinon.stub(ch.buffer, 'push').returns(false) + const deferredPut = {} + sinon.stub(allDeferred, 'DeferredPut').returns(deferredPut) + ch.put({}) + equal(ch.pendingPuts[0], deferredPut) + }) + + it('is rejected if called on a closed channel', async () => { + ch.close() + let rejectedErr + try { + await ch.put({}) + } catch (err) { + rejectedErr = err + } + ok(rejectedErr) + }) + }) + + describe('close', () => { + it('sets isClosed to true', () => { + ch.close() + ok(ch.isClosed) + }) + + it('sets isDone to true', () => { + ch.close() + ok(ch.isDone) + }) + + it('rejects promises for any pending puts', async () => { + ch = new Channel(new BufferBlocking(0)) + const promise = ch.put({}) + ch.close() + let rejectedErr + try { + await promise + } catch (err) { + rejectedErr = err + } + ok(rejectedErr) + }) + + it('resolves any pending takes with empty value', async () => { + setImmediate(() => ch.close()) + equal(await ch.take(), ch.empty) + }) + }) +}) diff --git a/test/close.js b/test/close.js deleted file mode 100644 index 40dbcb3..0000000 --- a/test/close.js +++ /dev/null @@ -1,68 +0,0 @@ -/* global describe:true, beforeEach:true, it:true */ - -var chan = require('..') -var expect = require('expect.js') - -describe('A closed channel', function () { - - it( - 'should yield an error when attempting to add a value', - function () { - var ch = chan() - ch.close() - ch('foo')(function (err) { - expect(err).to.be.an(Error) - }) - } - ) - - describe('that is has items in the buffer', function () { - - it( - 'should return `false` when the `done()` method is called', - function () { - var ch = chan(1) - ch('foo') - ch.close() - expect(ch.done()).to.be(false) - } - ) - - }) - - describe('that is empty', function () { - - it( - 'should invoke peding callbacks with empty value', - function () { - var ch = chan() - ch(function (err, value) { - expect(value).to.be(ch.empty) - }) - ch.close() - } - ) - - it( - 'should return `true` when the `done()` method is called', - function () { - var ch = chan() - ch.close() - expect(ch.done()).to.be(true) - } - ) - - it( - 'should immediately invoke any callback added with the empty value', - function () { - var ch = chan() - ch.close() - ch(function (err, value) { - expect(value).to.be(ch.empty) - }) - } - ) - - }) - -}) diff --git a/test/interval.js b/test/interval.js deleted file mode 100644 index a836ede..0000000 --- a/test/interval.js +++ /dev/null @@ -1,40 +0,0 @@ -/* jshint expr:true */ -/* global describe:true, beforeEach:true, afterEach:true, it:true */ - -var interval = require('../lib/interval') -var should = require('should') -var sinon = require('sinon') - -describe('Interval channel make', function () { - - it('should return a function', function () { - var int = interval(500) - int.should.be.a.Function - }) - - it('should should call the callback after a number of ms', function () { - var clock = sinon.useFakeTimers() - var cb = sinon.spy() - var ms = 500 - var int = interval(ms) - int(cb) - clock.tick(ms - 1) - cb.called.should.be.false - clock.tick(1) - cb.called.should.be.true - }) - - it('should call the callback after number of ms', function () { - var clock = sinon.useFakeTimers() - var cb = sinon.spy() - var ms = 500 - var int = interval(ms) - int(cb) - clock.tick(ms - 1) - cb.called.should.be.false - clock.tick(1) - cb.called.should.be.true - }) - -}) - diff --git a/test/select.js b/test/select.js deleted file mode 100644 index bbcc251..0000000 --- a/test/select.js +++ /dev/null @@ -1,143 +0,0 @@ -/* jshint loopfunc:true */ -/* global describe:true, beforeEach:true, afterEach:true, it:true */ - -var chan = require('..') -var expect = require('expect.js') - -describe('Channel select', function () { - var random - beforeEach(function (done) { - // save Math.random - random = Math.random - done() - }) - - afterEach(function (done) { - // restore Math.random - Math.random = random - done() - }) - - it( - 'should be able to select on channels', - function (done) { - var ch1 = chan() - var ch2 = chan() - chan.select(ch1, ch2)(function (err, ch) { - expect(ch).to.equal(ch2) - ch2.selected(function (err, val) { - expect(val).to.equal(42) - done() - }) - }) - ch2(42) - } - ) - - it( - 'should be able to select on multiple channels', - function (done) { - var chs = [chan(), chan()] - var remaining = chs.length - chs.forEach(function (needle, i) { - chan.select.apply(null, chs)(function (err, ch) { - expect(ch).to.equal(needle) - ch.selected(function (err, val) { - expect(val).to.equal(i*10) - if (--remaining === 0) { - done() - } - }) - }) - }) - chs.forEach(function (ch, i) { - ch(i*10) - }) - } - ) - - it( - 'should be able to select with queued messages', - function (done) { - var chs = [chan(), chan()] - var remaining = chs.length - var i = -1 - while (++i < 10) { - (function (i) { - chan.select.apply(null, chs)(function (err, ch) { - expect(ch).to.equal(chs[0]) - ch.selected(function (err, val) { - expect(val).to.equal(i * 10) - if (--remaining === 0) { - done() - } - }) - }) - })(i) - } - var j = -1 - while (++j < 10) { - chs[0](j * 10) - } - } - ) - - it( - 'should be able to select with existing messages on the channels', - function (done) { - var ch1 = chan() - var ch2 = chan() - ch2(42) - chan.select(ch1, ch2)(function (err, ch) { - expect(ch).to.equal(ch2) - ch2.selected(function (err, val) { - expect(val).to.equal(42) - done() - }) - }) - } - ) - - it( - 'should randomly choose a channel to return with multiple full channels', - function (done) { - var ch1 = chan() - var ch2 = chan() - - // force the random selection to be the second channel - Math.random = function () { return 0.5 } - - // fill up both the channels - ch1(21) - ch2(42) - - // random selection should choose the second channel "randomly" - chan.select(ch1, ch2)(function (err, ch) { - expect(ch).to.equal(ch2) - ch2.selected(function (err, val) { - expect(val).to.equal(42) - done() - }) - }) - } - ) - - it ( - 'should wait for previously queued callbacks before selecting', - function (done) { - var ch1 = chan() - var ch2 = chan() - - // queue a callback for ch1 - ch1(function () {}) - - chan.select(ch1, ch2)(function (err, ch) { - expect(ch).to.be(ch2) - done() - }) - - ch1(74) - ch2(47) - } - ) -}) diff --git a/test/timeout.js b/test/timeout.js deleted file mode 100644 index 8723ccf..0000000 --- a/test/timeout.js +++ /dev/null @@ -1,27 +0,0 @@ -/* jshint expr:true */ -/* global describe:true, beforeEach:true, afterEach:true, it:true */ - -var timeout = require('../lib/timeout') -var should = require('should') -var sinon = require('sinon') - -describe('Timeout channel make', function () { - - it('should return a function', function () { - var to = timeout(500) - to.should.be.a.Function - }) - - it('should should call the callback after a number of ms', function () { - var clock = sinon.useFakeTimers() - var cb = sinon.spy() - var ms = 500 - var to = timeout(ms) - to(cb) - clock.tick(ms - 1) - cb.called.should.be.false - clock.tick(1) - cb.called.should.be.true - }) - -}) diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..207b277 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,15 @@ +{ + "compileOnSave": false, + "compilerOptions": { + "strictNullChecks": true, + "module": "commonjs", + "outDir": "dist", + "sourceMap": true, + "declaration": true, + "declarationDir": "types", + "target": "es6" + }, + "include": [ + "src/**/*.ts" + ] +} \ No newline at end of file diff --git a/tslint.json b/tslint.json new file mode 100644 index 0000000..e9b8434 --- /dev/null +++ b/tslint.json @@ -0,0 +1,8 @@ +{ + "extends": [ + "tslint-config-standard" + ], + "rules": { + "no-conditional-assignment": false + } +} \ No newline at end of file