From c2cac4686b91bc0280465b3f810e798cc3129d31 Mon Sep 17 00:00:00 2001 From: Jovan Gerodetti Date: Thu, 4 Feb 2021 00:13:05 +0100 Subject: [PATCH 1/4] [Threads] implement a toolset to work with threads #45 --- tests/Gulpfile.js | 1 + tests/main.js | 2 + tests/threading/CurrentThread.js | 47 ++++++ tests/threading/Thread.js | 236 ++++++++++++++++++++++++++++ tests/threading/index.js | 6 + threading/index.js | 3 + threading/io.js | 2 + threading/lib/CurrentThread.js | 172 ++++++++++++++++++++ threading/lib/CurrentThreadStore.js | 30 ++++ threading/lib/IOThread.js | 11 ++ threading/lib/MultiThreadEvent.js | 30 ++++ threading/lib/Thread.js | 209 ++++++++++++++++++++++++ threading/lib/messages.js | 5 + threading/traits.js | 4 + util/deepCopy.js | 23 +++ 15 files changed, 781 insertions(+) create mode 100644 tests/threading/CurrentThread.js create mode 100644 tests/threading/Thread.js create mode 100644 tests/threading/index.js create mode 100644 threading/index.js create mode 100644 threading/io.js create mode 100644 threading/lib/CurrentThread.js create mode 100644 threading/lib/CurrentThreadStore.js create mode 100644 threading/lib/IOThread.js create mode 100644 threading/lib/MultiThreadEvent.js create mode 100644 threading/lib/Thread.js create mode 100644 threading/lib/messages.js create mode 100644 threading/traits.js create mode 100644 util/deepCopy.js diff --git a/tests/Gulpfile.js b/tests/Gulpfile.js index 3416bff..c68d64c 100755 --- a/tests/Gulpfile.js +++ b/tests/Gulpfile.js @@ -29,6 +29,7 @@ const task_default = function() { 'web/**/*.js', 'ServiceWorker/**/*.js', 'traits/**/*.js', + 'threading/**/*.js', ], { base: './', }) .pipe(sourcemaps.init()) .pipe(babel(babelConfig)) diff --git a/tests/main.js b/tests/main.js index ee8df1b..ca47796 100755 --- a/tests/main.js +++ b/tests/main.js @@ -79,6 +79,8 @@ require('./util/array'); require('./rendering'); +require('./threading'); + require('./features'); describe('IndexedDB Driver', () => { diff --git a/tests/threading/CurrentThread.js b/tests/threading/CurrentThread.js new file mode 100644 index 0000000..5da38f1 --- /dev/null +++ b/tests/threading/CurrentThread.js @@ -0,0 +1,47 @@ +/* eslint-env mocha */ + +const expect = require('chai').expect; +const mochaVM = require('../../node/mochaVM'); + +module.exports = function() { + const vm = mochaVM({}); + + mochaVM.applyNodeEnv(vm); + + vm.updateContext({ + self: vm.getContext(), + + addEventListener() {}, + postMessage() {}, + + BroadcastChannel: function(name) { // eslint-disable-line object-shorthand + this.name = name; + }, + + Worker: function(sourcePath) { // eslint-disable-line object-shorthand + this.source = sourcePath; + + this.postMessage = function() {}; + }, + + MessagePort: function() { // eslint-disable-line object-shorthand + this.postMessage = function() {}; + + this.onmessage = {}; + } + }); + + vm.runModule('../../testable/threading/lib/CurrentThread.js'); + + it('should be able to bootstrap the current thread', () => { + const { testResult } = vm.apply(() => { + /* globals CurrentThread */ + + CurrentThread.bootstrap(); + + global.testResult = CurrentThread; + }); + + expect(testResult).to.have.property('mainThread'); + }); +}; diff --git a/tests/threading/Thread.js b/tests/threading/Thread.js new file mode 100644 index 0000000..e189e12 --- /dev/null +++ b/tests/threading/Thread.js @@ -0,0 +1,236 @@ +/* eslint-env mocha */ + +const expect = require('chai').expect; +const mochaVM = require('../../node/mochaVM'); + +module.exports = function() { + const vm = mochaVM({}); + + mochaVM.applyNodeEnv(vm); + + vm.updateContext({ + self: vm.getContext(), + + addEventListener() {}, + postMessage() {}, + + BroadcastChannel: function(name) { // eslint-disable-line object-shorthand + this.name = name; + }, + + Worker: function(sourcePath) { // eslint-disable-line object-shorthand + this.source = sourcePath; + + this.postMessage = function() {}; + }, + + MessagePort: function() { // eslint-disable-line object-shorthand + this.postMessage = function() {}; + + this.onmessage = {}; + }, + + setTimeout(...args) { + return setTimeout(...args); + } + }); + + vm.runModule('../../testable/threading/lib/Thread.js'); + + it('should create a new thread', () => { + const { testResult, testContext } = vm.apply(() => { + /* globals Thread */ + + const thread = Object.create(Thread).constructor('./test-thread.js'); + + global.testResult = thread; + global.testContext = { Thread }; + }); + + expect(testResult).to.have.property('__proto__', testContext.Thread); + }); + + it('should be able to create a thread from strings', () => { + const { testResult, testContext } = vm.apply(() => { + const thread = Thread.from('channels/shared'); + + global.testResult = thread; + global.testContext = { Thread }; + }); + + expect(testResult).to.have.property('__proto__', testContext.Thread); + }); + + it('should throw if no MessageChannel is provided', () => { + const { testResult } = vm.apply(() => { + global.testResult = () => Thread.from({ invalid: true }); + }); + + expect(testResult).to.throw(); + }); + + it('should create a thread from a message port', () => { + const { testResult, testContext } = vm.apply(() => { + const thread = Thread.from(new MessagePort()); + + global.testResult = thread; + global.testContext = { Thread }; + }); + + expect(testResult).to.have.property('__proto__', testContext.Thread); + }); + + it('should not return a then method, we are not a promise', () => { + const { testResult } = vm.apply(() => { + const thread = Object.create(Thread).constructor('./test-thread.js'); + + global.testResult = thread.then; + }); + + expect(testResult).to.be.null; + }); + + it('should return the original property value, if it exists', () => { + const { testResult, testContext } = vm.apply(() => { + const thread = Object.create(Thread).constructor('./test-thread.js'); + + global.testResult = thread.call; + global.testContext = { Thread }; + }); + + expect(testResult).to.be.equal(testContext.Thread.call); + }); + + it('should try to invoke the remote method', () => { + const { testResult } = vm.apply(() => { + const thread = Object.create(Thread).constructor('./test-thread.js'); + + global.testResult = thread.doSomething(); + }); + + expect(testResult).to.have.property('then'); + expect(testResult).to.have.property('catch'); + }); + + it('should not copy transfered arguments', () => { + const { testResult, testContext } = vm.apply(() => { + const arg1 = { content: 'test1' }; + const arg2 = { content: 'test2' }; + + global.testContext = { arg1, arg2 }; + + const worker = new Worker('./test-thread.js'); + + worker.postMessage = function(...args) { + global.testResult = args; + }; + + const thread = Object.create(Thread).constructor(worker); + + thread.call('test', [arg1, arg2], [arg2]); + }); + + expect(testResult).to.have.property('0').which.does.deep.include({ args: [testContext.arg1, testContext.arg2] }); + expect(testResult).to.have.property('1').which.does.deep.equal([testContext.arg2]); + }); + + it('should emit thread events', () => { + const { testResult } = vm.apply(() => { + const worker = new Worker('./test-thread.js'); + const thread = Object.create(Thread).constructor(worker); + + global.testResult = new Promise((resolve, reject) => { + thread.on('test-event', data => resolve(data)); + + worker.onmessage({ data: { type: 'THREAD_MESSAGE_EVENT', name: 'test-event', data: { a: 1, b: 2} } }); + + setTimeout(() => reject(), 500); + }); + }); + + expect(testResult).to.have.property('then'); + expect(testResult).to.have.property('catch'); + + return testResult + .then(data => expect(data).to.be.deep.equal({ a: 1, b: 2 })) + .catch(() => expect.fail('eventhandler timedout')); + }); + + it('should resolve an async remote call', () => { + const { testResult } = vm.apply(() => { + const worker = new Worker('./test-thread.js'); + const thread = Object.create(Thread).constructor(worker); + + worker.postMessage = function(event) { + worker.onmessage({ data: { type: 'THREAD_MESSAGE_RETURN_VALUE', data: { transaction: event.transaction, return: true }, } }); + }; + + global.testResult = thread.call('method', [1, 2, 3]); + }); + + expect(testResult).to.have.property('then'); + expect(testResult).to.have.property('catch'); + + const timeout = setTimeout(() => expect.fail('async resolve timedout'), 500); + + return testResult.then( + data => (clearTimeout(timeout), expect(data).to.be.true), + () => (clearTimeout(timeout), expect.fail('remote call failed')) + ); + }); + + it('should reject an async remote call that threw', () => { + const { testResult } = vm.apply(() => { + const worker = new Worker('./test-thread.js'); + const thread = Object.create(Thread).constructor(worker); + + worker.postMessage = function(event) { + worker.onmessage({ data: { type: 'THREAD_MESSAGE_RETURN_VALUE', data: { transaction: event.transaction, error: true }, } }); + }; + + global.testResult = thread.call('method', [1, 2, 3]); + }); + + expect(testResult).to.have.property('then'); + expect(testResult).to.have.property('catch'); + + const timeout = setTimeout(() => expect.fail('async resolve timedout'), 500); + + return testResult + .then(() => { + clearTimeout(timeout); + + return expect.fail('remote call should throw'); + }, (error) => { + clearTimeout(timeout); + + return expect(error).to.be.true; + }); + }); + + it('should do nothing for unkown events', (done) => { + const { testResult } = vm.apply(() => { + const worker = new Worker('./test-thread.js'); + const thread = Object.create(Thread).constructor(worker); + + worker.postMessage = function(event) { + worker.onmessage({ data: { type: 'THREAD_MESSAGE_UNKOWN', data: { transaction: event.transaction, return: true }, } }); + }; + + global.testResult = thread.call('method', [1, 2, 3]); + }); + + expect(testResult).to.have.property('then'); + expect(testResult).to.have.property('catch'); + + const timeout = setTimeout(() => { + expect(true).to.be.true; + done(); + }, 300); + + testResult.then( + () => (clearTimeout(timeout), expect.fail('async call should not resolve')), + () => (clearTimeout(timeout), expect.fail('event handler should not fail')) + ).catch(done); + }); +}; diff --git a/tests/threading/index.js b/tests/threading/index.js new file mode 100644 index 0000000..eb6e2c2 --- /dev/null +++ b/tests/threading/index.js @@ -0,0 +1,6 @@ +/* eslint-env mocha */ + +describe('Threading', () => { + require('./Thread')(); + require('./CurrentThread')(); +}); diff --git a/threading/index.js b/threading/index.js new file mode 100644 index 0000000..af8163a --- /dev/null +++ b/threading/index.js @@ -0,0 +1,3 @@ +export { default as CurrentThread } from './lib/CurrentThread'; +export { default as Thread } from './lib/Thread'; +export { MultiThreadEvent } from './lib/MultiThreadEvent'; diff --git a/threading/io.js b/threading/io.js new file mode 100644 index 0000000..bcab4e7 --- /dev/null +++ b/threading/io.js @@ -0,0 +1,2 @@ +export * from './index'; +export { default as IOThread } from './lib/IOThread'; diff --git a/threading/lib/CurrentThread.js b/threading/lib/CurrentThread.js new file mode 100644 index 0000000..1fecadc --- /dev/null +++ b/threading/lib/CurrentThread.js @@ -0,0 +1,172 @@ +import { + MESSAGE_TYPE_CALL, MESSAGE_TYPE_CALLBACK, MESSAGE_TYPE_EVENT, + MESSAGE_TYPE_RETURN_VALUE, MESSAGE_TYPE_PARENT_INJECT +} from './messages'; + +import uuid from 'uuid'; +import { Thread, pWorker, pPostMessage, Callbacks as ThreadCallbacks } from './Thread'; +import CurrentThreadStore from './CurrentThreadStore'; + +const IS_WORKER = (!!self.importScripts && !self.document); +const { create } = Object; + +const pBroadcastTargets = Symbol('CurrentThread.broadcastTargets'); +const pConfig = Symbol('CurrentThread.config'); +const pParent = Symbol('CurrentThread.parent'); +const pSetupInterfaces = Symbol('CurrentThread.setupInterfaces()'); +const pCallbacks = Symbol('CurrentThread.setupInterfaces()'); + +const Callbacks = { + onCallHandler: Symbol('CurrentThread.onCallHandler'), + onCallbackHandler: Symbol('CurrentThread.onCallbackHandler'), + onParentInjectHandler: Symbol('CurrentThread.onParentInjectHandler'), + + __proto__: ThreadCallbacks, +}; + +export const CurrentThread = { + + /** @type {Map.} */ + [pCallbacks]: null, + + [pParent]: null, + [pConfig]: null, + [pBroadcastTargets]: null, + + constructor: undefined, + + interfaces: [], + + /** @type {Thread} */ + mainThread: null, + + /** @type {Thread} */ + get parent() { + if (!this[pParent]) { + throw new Error('Thread has not been properly bootstrapped!'); + } + + return this[pParent]; + }, + + [pSetupInterfaces]() { + this.interfaces = this.interfaces.map(item => create(item)).reverse(); + }, + + [Callbacks.onProcessMessage](event) { + const { type } = event.data; + + if (type === MESSAGE_TYPE_CALL) { + const { name, args, transaction } = event.data; + + return this[Callbacks.onCallHandler](name, args, transaction); + } + + if (type === MESSAGE_TYPE_CALLBACK) { + const { callbackId, args } = event.data; + + return this[Callbacks.onCallbackHandler](callbackId, args); + } + + if (type === MESSAGE_TYPE_PARENT_INJECT) { + const { parent } = event.data; + + return this[Callbacks.onCallbackHandler](parent); + } + }, + + [Callbacks.onCallHandler](name, params, transaction) { + const responsibleInterface = this.interfaces.find(interfacce => !!interfacce[name]); + + if (!responsibleInterface) { + throw new Error(`no interface declared the method ${name}!`); + } + + return Promise.resolve(responsibleInterface[name](...params)) + .then(result => { + this[pPostMessage]({ type: MESSAGE_TYPE_RETURN_VALUE, return: result, transaction }); + }).catch(error => { + this[pPostMessage]({ type: MESSAGE_TYPE_RETURN_VALUE, error, transaction }); + }); + }, + + [Callbacks.onCallbackHandler](id, args) { + if (!this[pCallbacks].has(id)) { + throw `unable to invoke ${id}!`; + } + + this[pCallbacks].get(id).apply(null, args); + }, + + [Callbacks.onCallbackHandler](parent) { + this[pParent] = Thread.from(parent); + + if (this[pConfig].init) { + this[pConfig].init(); + } + }, + + [pPostMessage](message, transfers) { + this[pBroadcastTargets].forEach(target => target.port1.postMessage(message)); + + return this[pWorker].postMessage(message, transfers); + }, + + registerCallback(callback) { + const id = `Callback<${uuid()}>`; + + this[pCallbacks].set(id, callback); + + return id; + }, + + dispatchEvent(name, data) { + this[pPostMessage]({ + type: MESSAGE_TYPE_EVENT, + name, data + }); + }, + + bootstrap(...args) { + this[pSetupInterfaces](); + this[pCallbacks] = new Map(); + this[pBroadcastTargets] = []; + + if (!IS_WORKER) { + this[pWorker] = new BroadcastChannel('threads/io'); + this[pWorker].onmessage = this[Callbacks.onProcessMessage].bind(this); + this[pWorker].onerror = console.error.bind(console); + + CurrentThreadStore.set(this); + + const mainThread = create(Thread).constructor(args[0]); + + this.mainThread = mainThread; + + return mainThread; + } + + this[pWorker] = self; + this[pWorker].onmessage = this[Callbacks.onProcessMessage].bind(this); + this[pWorker].onerror = console.error.bind(console); + this[pConfig] = args[0] || {}; + + CurrentThreadStore.set(this); + this.emit(Thread.Events.bootstrapping); + }, + + publish(identifier) { + const channel = new BroadcastChannel(identifier); + const oldPostMessage = this[pPostMessage]; + + channel.onmessage = this[Callbacks.onProcessMessage].bind(this); + + this[pPostMessage] = function(...args) { + channel.postMessage(...args); + + return oldPostMessage.apply(this, args); + }; + }, +}; + +export default CurrentThread; diff --git a/threading/lib/CurrentThreadStore.js b/threading/lib/CurrentThreadStore.js new file mode 100644 index 0000000..50931e4 --- /dev/null +++ b/threading/lib/CurrentThreadStore.js @@ -0,0 +1,30 @@ + +/** @type {CurrentThread} **/ +let value = null; + +export const CurrentThreadStore = { + + /** + * @return {CurrentThread} + */ + get() { + if (!value) { + throw new Error('CurrentThread.bootstrap() has to be called before any thread can be created!'); + } + + return value; + }, + + /** + * @param {CurrentThread} ct + */ + set(ct) { + if (value) { + throw new Error('A Thread has already been bootstraped!'); + } + + value = ct; + } +}; + +export default CurrentThreadStore; diff --git a/threading/lib/IOThread.js b/threading/lib/IOThread.js new file mode 100644 index 0000000..ba8b525 --- /dev/null +++ b/threading/lib/IOThread.js @@ -0,0 +1,11 @@ +import Thread from './Thread'; + +const IS_WORKER = (!!self.importScripts && !self.document); + +if (!IS_WORKER) { + throw new Error('IOThread can\'t be used inside the io thread itself! Use CurrentThread instead!'); +} + +export const IOThread = Thread.from('threads/io'); + +export default IOThread; diff --git a/threading/lib/MultiThreadEvent.js b/threading/lib/MultiThreadEvent.js new file mode 100644 index 0000000..88da95a --- /dev/null +++ b/threading/lib/MultiThreadEvent.js @@ -0,0 +1,30 @@ +import Observable from '../../core/Observable'; + +const pThread = Symbol('MultiThreadEvent.thread'); + +export const MultiThreadEvent = { + + name: '', + [pThread]: null, + + constructor(name, thread) { + super.constructor(); + + this[pThread] = thread; + this.name = name; + + this[pThread].on(name, (data) => { + super.emit(name, data); + }); + + return this; + }, + + emit(data) { + this[pThread].emit(this.name, data); + }, + + __proto__: Observable, +}; + +export default MultiThreadEvent; diff --git a/threading/lib/Thread.js b/threading/lib/Thread.js new file mode 100644 index 0000000..8716cab --- /dev/null +++ b/threading/lib/Thread.js @@ -0,0 +1,209 @@ +import uuid from 'uuid'; +import deepCopy from '../../util/deepCopy'; +import { MessagePortTrait } from '../traits'; +import { + MESSAGE_TYPE_CALL, MESSAGE_TYPE_CALLBACK, MESSAGE_TYPE_EVENT, + MESSAGE_TYPE_RETURN_VALUE, MESSAGE_TYPE_PARENT_INJECT +} from './messages'; +import validateTrait from '../../core/validateTrait'; +import CurrentThreadStore from './CurrentThreadStore'; +import Observable from '../../core/Observable'; +import { pBroadcastTargets } from './CurrentThread'; + +export const pWorker = Symbol('Thread.worker'); +export const pPostMessage = Symbol('Thread.postMessage()'); + +const pTransactions = Symbol('Thread.transactions'); +const pCreateInterface = Symbol('Thread.createInterface()'); + +const getPropertyValue = function(source, property, target) { + do { + if (!source.hasOwnProperty(property)) { + continue; + } + + const desc = Object.getOwnPropertyDescriptor(source, property); + + if (desc.get) { + return desc.get.apply(target); + } + + return desc.value; + } while ((source = Object.getPrototypeOf(source))); +}; + +const constructThread = function(worker, instance) { + instance[pWorker] = worker; + instance[pTransactions] = new Map(); + instance.observe(instance); + instance.ready = new Promise(resolve => instance.on(Observer.onReady, resolve)); + + instance[pWorker].onmessage = instance[Callbacks.onProcessMessage].bind(instance); + instance[pWorker].onerror = console.error.bind(console); + + return instance[pCreateInterface](); +}; + +export const Events = { + bootstrapping: Symbol.for('Thread.Events.bootstrapping'), + ready: Symbol.for('Thread.Events.ready'), +}; + +export const Observer = { + onBootstrapping: Events.bootstrapping, + onReady: Events.ready, +}; + +export const Callbacks = { + onProcessMessage: Symbol('Thread.Callbacks.onProcessMessage'), +}; + +export const Thread = { + Observer, + Events, + + /** @type {Worker} **/ + [pWorker]: null, + + /** @type {Map.} **/ + [pTransactions]: null, + + [pCreateInterface]() { + const proxy = new Proxy (this, { + get(target, property, current) { + + // make it clear that we are not a promise + if (property === 'then') { + return null; + } + + // we have to call a potential getter on the current object and not the target, + // otherwise the `this` inside the getter will point to the + // target and not to our actual current object + if (property in target) { + return getPropertyValue(target, property, current); + } + + return (...args) => current.call(property, args); + } + }); + + return proxy; + }, + + [pPostMessage](message, transfers) { + return this[pWorker].postMessage(message, transfers); + }, + + [Callbacks.onProcessMessage](event) { + const { type, name, data } = event.data; + + if (type === MESSAGE_TYPE_RETURN_VALUE) { + if (!this[pTransactions].has(data.transaction)) { + return; + } + + const resolver = this[pTransactions].get(data.transaction); + + if (data.error) { + return resolver.reject(data.error); + } + + return resolver.resolve(data.return); + } + + if (type !== MESSAGE_TYPE_EVENT) { + return; + } + + this.emit(name, data); + }, + + [Observer.onBootstrapping]() { + /** @type {CurrentThread} */ + const parentThread = CurrentThreadStore.get(); + const channel = new MessageChannel(); + + channel.port1.onmessage = parentThread[Callbacks.onProcessMessage].bind(parentThread); + channel.port1.onerror = parentThread[Callbacks.onProcessMessage].bind(parentThread); + + parentThread[pBroadcastTargets].push(channel); + + this[pPostMessage]({ type: MESSAGE_TYPE_PARENT_INJECT, parent: channel.port2 }, [channel.port2]); + this.emit(Events.ready); + }, + + /** + * Calls a method on the remote thread. + * + * @param {string} name + * @param {object[]} args + * @param {object[]} transfers defines which arguments should be transfered instead of copied + * + * @return {Promise. { + const transaction = uuid(); + const mappedArgs = args.map(item => transfers.includes(item) ? item : deepCopy(item)); + + this[pTransactions].set(transaction, { resolve, reject }); + + this[pPostMessage]({ type: MESSAGE_TYPE_CALL, name, args: mappedArgs, transaction }, transfers); + }); + }, + + /** + * calls a callback function in the remote thread + * + * @param {string} callbackId + * @param {object[]} args + * + * @return {undefined} + */ + invokeCallback(callbackId, args) { + const mappedArgs = args.map(item => deepCopy(item)); + + this[pPostMessage]({ type: MESSAGE_TYPE_CALLBACK, callbackId, args: mappedArgs }); + }, + + /** + * creates a new thread interface from a script source URL + * + * @param {string|Worker} source + * + * @return {Proxy.} + */ + constructor(source = '') { + super.constructor(); + + const worker = typeof source === 'string' ? new Worker(source) : source; + + return constructThread(worker, this); + }, + + /** + * creates a local thread interface from an existing port + * + * @param {MessagePortTrait|string} port + * + * @return {Proxy.} + */ + from(port) { + let worker = null; + + if (typeof port === 'string') { + worker = new BroadcastChannel(port); + } else if (validateTrait(port, MessagePortTrait)) { + worker = port; + } else { + throw new Error(`unable to create Thread from ${port.toString()}`); + } + + return Object.create(this).constructor(worker); + }, + + __proto__: Observable, +}; + +export default Thread; diff --git a/threading/lib/messages.js b/threading/lib/messages.js new file mode 100644 index 0000000..6fd943f --- /dev/null +++ b/threading/lib/messages.js @@ -0,0 +1,5 @@ +export const MESSAGE_TYPE_CALL = 'THREAD_MESSAGE_CALL'; +export const MESSAGE_TYPE_RETURN_VALUE = 'THREAD_MESSAGE_RETURN_VALUE'; +export const MESSAGE_TYPE_CALLBACK = 'THREAD_MESSAGE_CALLBACK'; +export const MESSAGE_TYPE_EVENT = 'THREAD_MESSAGE_EVENT'; +export const MESSAGE_TYPE_PARENT_INJECT = 'THREAD_MESSAGE_PARENT_INJECT'; diff --git a/threading/traits.js b/threading/traits.js new file mode 100644 index 0000000..d3a8b75 --- /dev/null +++ b/threading/traits.js @@ -0,0 +1,4 @@ +export const MessagePortTrait = { + postMessage: 'function', + onmessage: 'object' +}; diff --git a/util/deepCopy.js b/util/deepCopy.js new file mode 100644 index 0000000..f75edb4 --- /dev/null +++ b/util/deepCopy.js @@ -0,0 +1,23 @@ +export const deepCopy = function(value) { + if (typeof value === 'function') { + return; + } + + if (!value || typeof value !== 'object') { + return value; + } + + if (Array.isArray(value)) { + return value.map((item) => deepCopy(item)); + } + + const newValue = {}; + + for (const key in value) { + newValue[key] = deepCopy(value[key]); + } + + return newValue; +}; + +export default deepCopy; From 1a658e6732efc6f2252fe2eeefe8d105a2f1febd Mon Sep 17 00:00:00 2001 From: Jovan Gerodetti Date: Mon, 13 Dec 2021 16:54:07 +0100 Subject: [PATCH 2/4] more testing --- tests/threading/Thread.js | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/tests/threading/Thread.js b/tests/threading/Thread.js index e189e12..20330b8 100644 --- a/tests/threading/Thread.js +++ b/tests/threading/Thread.js @@ -233,4 +233,35 @@ module.exports = function() { () => (clearTimeout(timeout), expect.fail('event handler should not fail')) ).catch(done); }); + + it('should ignore function return events for unknown transactions', () => { + const { testResult } = vm.apply(() => { + const worker = new Worker('./test-thread.js'); + + Object.create(Thread).constructor(worker); + + global.testResult = worker.onmessage({ data: { type: 'THREAD_MESSAGE_RETURN_VALUE', data: { transaction: 123 } } }); + }); + + expect(testResult).to.be.undefined; + }); + + it('should post a callback event when invoking a callback', () => { + const callbackId = 12343; + + const { testResult } = vm.apply(() => { + const callbackId = 12343; + const worker = new Worker('./test-thread.js'); + + const thread = Object.create(Thread).constructor(worker); + + worker.postMessage = function(message) { + global.testResult = message; + }; + + thread.invokeCallback(callbackId, [1, 'test', true]); + }); + + expect(testResult).to.be.deep.equal({ type: 'THREAD_MESSAGE_CALLBACK', callbackId, args: [1, 'test', true] }); + }); }; From c5a7ba0c3749374a61a4bb76298004810c72858a Mon Sep 17 00:00:00 2001 From: Jovan Gerodetti Date: Mon, 13 Dec 2021 17:22:39 +0100 Subject: [PATCH 3/4] eslint fixes --- threading/lib/CurrentThread.js | 5 +++-- threading/lib/CurrentThreadStore.js | 1 + threading/lib/Thread.js | 6 +++--- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/threading/lib/CurrentThread.js b/threading/lib/CurrentThread.js index 1fecadc..9886128 100644 --- a/threading/lib/CurrentThread.js +++ b/threading/lib/CurrentThread.js @@ -10,7 +10,6 @@ import CurrentThreadStore from './CurrentThreadStore'; const IS_WORKER = (!!self.importScripts && !self.document); const { create } = Object; -const pBroadcastTargets = Symbol('CurrentThread.broadcastTargets'); const pConfig = Symbol('CurrentThread.config'); const pParent = Symbol('CurrentThread.parent'); const pSetupInterfaces = Symbol('CurrentThread.setupInterfaces()'); @@ -24,9 +23,11 @@ const Callbacks = { __proto__: ThreadCallbacks, }; +export const pBroadcastTargets = Symbol('CurrentThread.broadcastTargets'); + export const CurrentThread = { - /** @type {Map.} */ + /** @type {Map.} */ [pCallbacks]: null, [pParent]: null, diff --git a/threading/lib/CurrentThreadStore.js b/threading/lib/CurrentThreadStore.js index 50931e4..6e64974 100644 --- a/threading/lib/CurrentThreadStore.js +++ b/threading/lib/CurrentThreadStore.js @@ -1,3 +1,4 @@ +import CurrentThread from './CurrentThread'; /** @type {CurrentThread} **/ let value = null; diff --git a/threading/lib/Thread.js b/threading/lib/Thread.js index 8716cab..38ae046 100644 --- a/threading/lib/Thread.js +++ b/threading/lib/Thread.js @@ -8,7 +8,7 @@ import { import validateTrait from '../../core/validateTrait'; import CurrentThreadStore from './CurrentThreadStore'; import Observable from '../../core/Observable'; -import { pBroadcastTargets } from './CurrentThread'; +import { pBroadcastTargets, CurrentThread } from './CurrentThread'; export const pWorker = Symbol('Thread.worker'); export const pPostMessage = Symbol('Thread.postMessage()'); @@ -65,7 +65,7 @@ export const Thread = { /** @type {Worker} **/ [pWorker]: null, - /** @type {Map.} **/ + /** @type {Map.} **/ [pTransactions]: null, [pCreateInterface]() { @@ -140,7 +140,7 @@ export const Thread = { * @param {object[]} args * @param {object[]} transfers defines which arguments should be transfered instead of copied * - * @return {Promise.} the return value of the function */ call(name, args, transfers = []) { return new Promise((resolve, reject) => { From 14d37bee5d2679c357775bcc7526a85580a15f4a Mon Sep 17 00:00:00 2001 From: Jovan Gerodetti Date: Fri, 17 Dec 2021 23:12:18 +0100 Subject: [PATCH 4/4] more tests --- tests/threading/CurrentThread.js | 80 +++++++++++++++++++++++++++++--- tests/threading/Thread.js | 38 ++++++++++++++- tests/threading/index.js | 9 +++- threading/lib/CurrentThread.js | 24 +++++----- threading/lib/Thread.js | 24 ++++++---- threading/lib/messages.js | 1 + threading/traits.js | 2 +- 7 files changed, 146 insertions(+), 32 deletions(-) diff --git a/tests/threading/CurrentThread.js b/tests/threading/CurrentThread.js index 5da38f1..937905e 100644 --- a/tests/threading/CurrentThread.js +++ b/tests/threading/CurrentThread.js @@ -3,7 +3,7 @@ const expect = require('chai').expect; const mochaVM = require('../../node/mochaVM'); -module.exports = function() { +const createVM = function() { const vm = mochaVM({}); mochaVM.applyNodeEnv(vm); @@ -27,21 +27,89 @@ module.exports = function() { MessagePort: function() { // eslint-disable-line object-shorthand this.postMessage = function() {}; - this.onmessage = {}; + this.onmessage = function() {}; } }); + return vm; +}; + +module.exports = function() { + const vm = createVM(); + vm.runModule('../../testable/threading/lib/CurrentThread.js'); it('should be able to bootstrap the current thread', () => { - const { testResult } = vm.apply(() => { - /* globals CurrentThread */ - + const { testResult } = vm.apply((CurrentThread) => { CurrentThread.bootstrap(); global.testResult = CurrentThread; - }); + }, ['CurrentThread']); expect(testResult).to.have.property('mainThread'); }); + + it('should register callback methods', () => { + const { testResult } = vm.apply((CurrentThread, pCallbacks) => { + const callback = () => {}; + const id = CurrentThread.registerCallback(callback); + + global.testResult = { callback, id, CurrentThread, pCallbacks }; + }, ['CurrentThread', 'pCallbacks']); + + expect(testResult.CurrentThread[testResult.pCallbacks].get(testResult.id)) + .to.be.equal(testResult.callback); + }); + + it('should throw if parent hasn\'t been set', () => { + vm.updateContext({ testResult: null, }); + + const { testResult } = vm.apply((CurrentThread) => { + try { + return CurrentThread.parent; + } catch (e) { + global.testResult = e; + } + }, ['CurrentThread']); + + expect(testResult).to.be.an('error'); + }); + + it('should set the parent when injection event is received', () => { + const vm = createVM(); + + vm.updateContext({ onmessage() {}, importScripts() {} }); + vm.runModule('../../testable/threading/lib/CurrentThread.js'); + + const { testResult } = vm.apply((CurrentThread) => { + const parent = new MessagePort(); + + CurrentThread.bootstrap(); + self.onmessage({ data: { type: 'THREAD_MESSAGE_PARENT_INJECT', parent } }); + + global.testResult = { parent: CurrentThread.parent }; + }, ['CurrentThread']); + + expect(testResult.parent) + .to.have.property('ready'); + expect(testResult.parent) + .to.have.property('call'); + }); + + it('should call a callback by it\'s id', () => { + const vm = createVM(); + + vm.updateContext({ onmessage() {}, importScripts() {}, testResult: false }); + vm.runModule('../../testable/threading/lib/CurrentThread.js'); + + const { testResult } = vm.apply((CurrentThread) => { + CurrentThread.bootstrap(); + + const callbackId = CurrentThread.registerCallback(() => { + global.testResult = true; + }); + + self.onmessage({ data: { type: 'THREAD_MESSAGE_CALLBACK', callbackId, args: [] } }); + }, ['CurrentThread']); + }); }; diff --git a/tests/threading/Thread.js b/tests/threading/Thread.js index 20330b8..1cbb279 100644 --- a/tests/threading/Thread.js +++ b/tests/threading/Thread.js @@ -27,12 +27,19 @@ module.exports = function() { MessagePort: function() { // eslint-disable-line object-shorthand this.postMessage = function() {}; - this.onmessage = {}; + this.onmessage = function() {}; }, setTimeout(...args) { return setTimeout(...args); - } + }, + + MessageChannel: function() { // eslint-disable-line object-shorthand + const global = vm.getContext(); + + this.port1 = new global.MessagePort(); + this.port2 = new global.MessagePort(); + }, }); vm.runModule('../../testable/threading/lib/Thread.js'); @@ -264,4 +271,31 @@ module.exports = function() { expect(testResult).to.be.deep.equal({ type: 'THREAD_MESSAGE_CALLBACK', callbackId, args: [1, 'test', true] }); }); + + it('should react to the bootstrapping event and handle it', () => { + const { scheduleTask } = require('../../testable/core/tasks.js'); + + const { testResult } = vm.apply((CurrentThreadStore, CurrentThread, pBroadcastTargets) => { + const worker = new Worker('./test-thread.js'); + const thread = Object.create(Thread).constructor(worker); + + global.testResult = { message: null, ready: false }; + + worker.postMessage = function(message) { + global.testResult.message = message; + }; + + CurrentThreadStore.set({ [pBroadcastTargets]: [], __proto__: CurrentThread }); + + thread.on(Thread.Events.ready, () => global.testResult.ready = true); + worker.onmessage({ data: { type: 'THREAD_MESSAGE_BOOTSTRAPING' } }); + }, ['_CurrentThreadStore.default', '_CurrentThread.default', '_CurrentThread.pBroadcastTargets']); + + return scheduleTask(() => { + expect(testResult).to.have.property('ready').which.is.true; + expect(testResult).to.have.property('message') + .which.has.property('type') + .which.is.equal('THREAD_MESSAGE_PARENT_INJECT'); + }); + }); }; diff --git a/tests/threading/index.js b/tests/threading/index.js index eb6e2c2..5f71e0c 100644 --- a/tests/threading/index.js +++ b/tests/threading/index.js @@ -1,6 +1,11 @@ /* eslint-env mocha */ describe('Threading', () => { - require('./Thread')(); - require('./CurrentThread')(); + describe('Thread', () => { + require('./Thread')(); + }); + + describe('CurrentThread', () => { + require('./CurrentThread')(); + }); }); diff --git a/threading/lib/CurrentThread.js b/threading/lib/CurrentThread.js index 9886128..0e59c29 100644 --- a/threading/lib/CurrentThread.js +++ b/threading/lib/CurrentThread.js @@ -1,10 +1,10 @@ import { MESSAGE_TYPE_CALL, MESSAGE_TYPE_CALLBACK, MESSAGE_TYPE_EVENT, - MESSAGE_TYPE_RETURN_VALUE, MESSAGE_TYPE_PARENT_INJECT + MESSAGE_TYPE_RETURN_VALUE, MESSAGE_TYPE_PARENT_INJECT, MESSAGE_TYPE_BOOTSTRAPING, } from './messages'; import uuid from 'uuid'; -import { Thread, pWorker, pPostMessage, Callbacks as ThreadCallbacks } from './Thread'; +import { Thread } from './Thread'; import CurrentThreadStore from './CurrentThreadStore'; const IS_WORKER = (!!self.importScripts && !self.document); @@ -14,22 +14,24 @@ const pConfig = Symbol('CurrentThread.config'); const pParent = Symbol('CurrentThread.parent'); const pSetupInterfaces = Symbol('CurrentThread.setupInterfaces()'); const pCallbacks = Symbol('CurrentThread.setupInterfaces()'); +const pPostMessage = Symbol('CurrentThread.postMessage()'); +const pWorker = Symbol('CurrentThread.worker'); -const Callbacks = { +export const Callbacks = { onCallHandler: Symbol('CurrentThread.onCallHandler'), onCallbackHandler: Symbol('CurrentThread.onCallbackHandler'), onParentInjectHandler: Symbol('CurrentThread.onParentInjectHandler'), - - __proto__: ThreadCallbacks, + onProcessMessage: Symbol('CurrentThread.onProcessMessage'), }; export const pBroadcastTargets = Symbol('CurrentThread.broadcastTargets'); export const CurrentThread = { - /** @type {Map.} */ + /** @type {Map.} **/ [pCallbacks]: null, + [pWorker]: null, [pParent]: null, [pConfig]: null, [pBroadcastTargets]: null, @@ -38,10 +40,10 @@ export const CurrentThread = { interfaces: [], - /** @type {Thread} */ + /** @type {Thread} **/ mainThread: null, - /** @type {Thread} */ + /** @type {Thread} **/ get parent() { if (!this[pParent]) { throw new Error('Thread has not been properly bootstrapped!'); @@ -72,7 +74,7 @@ export const CurrentThread = { if (type === MESSAGE_TYPE_PARENT_INJECT) { const { parent } = event.data; - return this[Callbacks.onCallbackHandler](parent); + return this[Callbacks.onParentInjectHandler](parent); } }, @@ -99,7 +101,7 @@ export const CurrentThread = { this[pCallbacks].get(id).apply(null, args); }, - [Callbacks.onCallbackHandler](parent) { + [Callbacks.onParentInjectHandler](parent) { this[pParent] = Thread.from(parent); if (this[pConfig].init) { @@ -153,7 +155,7 @@ export const CurrentThread = { this[pConfig] = args[0] || {}; CurrentThreadStore.set(this); - this.emit(Thread.Events.bootstrapping); + this[pPostMessage]({ type: MESSAGE_TYPE_BOOTSTRAPING }); }, publish(identifier) { diff --git a/threading/lib/Thread.js b/threading/lib/Thread.js index 38ae046..3c8cb92 100644 --- a/threading/lib/Thread.js +++ b/threading/lib/Thread.js @@ -3,12 +3,12 @@ import deepCopy from '../../util/deepCopy'; import { MessagePortTrait } from '../traits'; import { MESSAGE_TYPE_CALL, MESSAGE_TYPE_CALLBACK, MESSAGE_TYPE_EVENT, - MESSAGE_TYPE_RETURN_VALUE, MESSAGE_TYPE_PARENT_INJECT + MESSAGE_TYPE_RETURN_VALUE, MESSAGE_TYPE_PARENT_INJECT, MESSAGE_TYPE_BOOTSTRAPING, } from './messages'; import validateTrait from '../../core/validateTrait'; import CurrentThreadStore from './CurrentThreadStore'; import Observable from '../../core/Observable'; -import { pBroadcastTargets, CurrentThread } from './CurrentThread'; +import { pBroadcastTargets, CurrentThread, Callbacks as CurrentThreadCallbacks } from './CurrentThread'; export const pWorker = Symbol('Thread.worker'); export const pPostMessage = Symbol('Thread.postMessage()'); @@ -35,8 +35,7 @@ const getPropertyValue = function(source, property, target) { const constructThread = function(worker, instance) { instance[pWorker] = worker; instance[pTransactions] = new Map(); - instance.observe(instance); - instance.ready = new Promise(resolve => instance.on(Observer.onReady, resolve)); + instance.ready = new Promise(resolve => instance.on(Events.ready, resolve)); instance[pWorker].onmessage = instance[Callbacks.onProcessMessage].bind(instance); instance[pWorker].onerror = console.error.bind(console); @@ -45,17 +44,16 @@ const constructThread = function(worker, instance) { }; export const Events = { - bootstrapping: Symbol.for('Thread.Events.bootstrapping'), ready: Symbol.for('Thread.Events.ready'), }; export const Observer = { - onBootstrapping: Events.bootstrapping, onReady: Events.ready, }; export const Callbacks = { onProcessMessage: Symbol('Thread.Callbacks.onProcessMessage'), + onBootstrapping: Symbol('Thread.Callbacks.onBootstrapping'), }; export const Thread = { @@ -112,6 +110,12 @@ export const Thread = { return resolver.resolve(data.return); } + if (type === MESSAGE_TYPE_BOOTSTRAPING) { + this[Callbacks.onBootstrapping](); + + return; + } + if (type !== MESSAGE_TYPE_EVENT) { return; } @@ -119,13 +123,13 @@ export const Thread = { this.emit(name, data); }, - [Observer.onBootstrapping]() { - /** @type {CurrentThread} */ + [Callbacks.onBootstrapping]() { + /** @type {CurrentThread} **/ const parentThread = CurrentThreadStore.get(); const channel = new MessageChannel(); - channel.port1.onmessage = parentThread[Callbacks.onProcessMessage].bind(parentThread); - channel.port1.onerror = parentThread[Callbacks.onProcessMessage].bind(parentThread); + channel.port1.onmessage = parentThread[CurrentThreadCallbacks.onProcessMessage].bind(parentThread); + channel.port1.onerror = parentThread[CurrentThreadCallbacks.onProcessMessage].bind(parentThread); parentThread[pBroadcastTargets].push(channel); diff --git a/threading/lib/messages.js b/threading/lib/messages.js index 6fd943f..9d27ce1 100644 --- a/threading/lib/messages.js +++ b/threading/lib/messages.js @@ -3,3 +3,4 @@ export const MESSAGE_TYPE_RETURN_VALUE = 'THREAD_MESSAGE_RETURN_VALUE'; export const MESSAGE_TYPE_CALLBACK = 'THREAD_MESSAGE_CALLBACK'; export const MESSAGE_TYPE_EVENT = 'THREAD_MESSAGE_EVENT'; export const MESSAGE_TYPE_PARENT_INJECT = 'THREAD_MESSAGE_PARENT_INJECT'; +export const MESSAGE_TYPE_BOOTSTRAPING = 'THREAD_MESSAGE_BOOTSTRAPING'; diff --git a/threading/traits.js b/threading/traits.js index d3a8b75..f372e18 100644 --- a/threading/traits.js +++ b/threading/traits.js @@ -1,4 +1,4 @@ export const MessagePortTrait = { postMessage: 'function', - onmessage: 'object' + onmessage: 'function' };