Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose more ThreadPool functionality through Future.withEventualValue(). #1828

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
265 changes: 136 additions & 129 deletions src/lime/app/Future.hx
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,24 @@ import lime.utils.Log;
@:noCompletion private var __progressListeners:Array<Int->Int->Void>;

/**
@param work Deprecated; use `Future.withEventualValue()` instead.
@param doWork Deprecated; use `Future.withEventualValue()` instead.
@param useThreads Deprecated; use `Future.withEventualValue()` instead.
**/
public function new(work:WorkFunction<Void->T> = null, useThreads:Bool = false)
public function new(doWork:WorkFunction<Void->T> = null, useThreads:Bool = false)
{
if (work != null)
if (doWork != null)
{
var promise = new Promise<T>();
promise.future = this;

#if (lime_threads && html5)
if (useThreads)
{
work.makePortable();
doWork.makePortable();
}
#end

FutureWork.run(dispatchWorkFunction, work, promise, useThreads ? MULTI_THREADED : SINGLE_THREADED, true);
FutureWork.forMode(useThreads ? MULTI_THREADED : SINGLE_THREADED).run(dispatchWorkFunction, doWork, promise);
}
}

Expand Down Expand Up @@ -201,15 +201,15 @@ import lime.utils.Log;

while (!isComplete && !isError && time <= end)
{
if (FutureWork.activeJobs < 1)
if (FutureWork.totalActiveJobs < 1)
{
Log.error('Cannot block for a Future without a "work" function.');
Log.error('Cannot block for a Future without a "doWork" function.');
return this;
}

if (FutureWork.singleThreadPool != null && FutureWork.singleThreadPool.activeJobs > 0)
if (FutureWork.singleThread.activeJobs > 0)
{
@:privateAccess FutureWork.singleThreadPool.__update(time - prevTime);
@:privateAccess FutureWork.singleThread.threadPool.__update(time - prevTime);
}
else
{
Expand Down Expand Up @@ -309,192 +309,199 @@ import lime.utils.Log;
/**
Creates a `Future` instance which will asynchronously compute a value.

Once `work()` returns a non-null value, the `Future` will finish with that value.
If `work()` throws an error, the `Future` will finish with that error instead.
@param work A function that computes a value of type `T`.
@param state An argument to pass to `work()`. As this may be used on another thread, the
main thread must not access or modify `state` until the `Future` finishes.
@param mode Whether to use real threads as opposed to green threads. Green threads rely
on cooperative multitasking, meaning `work()` must return periodically to allow other code
enough time to run. In these cases, `work()` should return null to signal that it isn't finished.
The provided `doWork` function is the same as as a `ThreadPool` work function.
It will run repeatedly until it calls `sendComplete()` or `sendError()`, and
should aim to do a small fraction of the work each time. This is important in
single-threaded mode to avoid blocking the main thread.

The three output functions:

- `sendComplete()` requires a value of type `T` and resolves the `Future` as complete.
Passing the wrong value causes unspecified behavior.
- `sendError()` takes any value and resolves the `Future` with that error.
- `sendProgress()` requires a `{progress:Int, total:Int}` value. This information will
be sent to any `onProgress` listeners. Any other values will be ignored.
@param doWork The function that performs the work.
@param state An argument to pass to `doWork`. Defaults to `{}`. The same instance will
be passed each time `doWork` is called, allowing it to store data between calls. To avoid
race conditions, the main thread should not access or modify `state` until all work finishes.
@param mode Whether to use real threads (`MULTI_THREADED`) as opposed to green threads (`SINGLE_THREADED`).
In single-threaded mode, it's especially important for `doWork` to return often.
@return A new `Future` instance.
@see https://en.wikipedia.org/wiki/Cooperative_multitasking
@see lime.system.ThreadPool
**/
public static function withEventualValue<T>(work:WorkFunction<State -> Null<T>>, state:State, mode:ThreadMode = #if html5 SINGLE_THREADED #else MULTI_THREADED #end):Future<T>
public static function withEventualValue<T>(doWork:WorkFunction<State->WorkOutput->Void>, ?state:State,
mode:ThreadMode = #if html5 SINGLE_THREADED #else MULTI_THREADED #end):Future<T>
{
var future = new Future<T>();
var promise = new Promise<T>();
promise.future = future;

FutureWork.run(work, state, promise, mode);
FutureWork.forMode(mode).run(doWork, state, promise);

return future;
}

/**
(For backwards compatibility.) Dispatches the given zero-argument function.
**/
@:noCompletion private static function dispatchWorkFunction<T>(work:WorkFunction<Void -> T>):Null<T>
@:noCompletion private static function dispatchWorkFunction<T>(doWork:WorkFunction<Void->T>, output:WorkOutput):Void
{
return work.dispatch();
output.sendComplete(doWork.dispatch());
}
}

/**
The class that handles asynchronous `work` functions passed to `new Future()`.
Return values for work functions used with `Future.withEventualValue()`,
used to describe the state of the work.
**/
enum FutureStatus<T>
{
/**
Resolves the `Future` with a completion state. The work function won't be called again.
**/
Complete(value:T);

/**
Resolves the `Future` with an error state. The work function won't be called again.
**/
Error(error:Dynamic);

/**
Re-runs the work function without dispatching an event. This is particularly important
in single-threaded mode, to avoid blocking the main thread.
**/
Incomplete;

/**
Dispatches a progress event before re-running the work function.
**/
Progress(progress:Int, total:Int);
}

/**
The class that handles asynchronous `doWork` functions passed to `Future.withEventualValue()`.
**/
#if !lime_debug
@:fileXml('tags="haxe,release"')
@:noDebug
#end
@:dox(hide) class FutureWork
{
@:allow(lime.app.Future)
private static var singleThreadPool:ThreadPool;
#if lime_threads
private static var multiThreadPool:ThreadPool;
// It isn't safe to pass a promise object to a web worker, but since it's
// `@:generic` we can't store it as `Promise<Dynamic>`. Instead, we'll store
// the two methods we need.
private static var promises:Map<Int, {complete:Dynamic -> Dynamic, error:Dynamic -> Dynamic}> = new Map();
#end
public static var minThreads(default, set):Int = 0;
public static var maxThreads(default, set):Int = 1;
public static var activeJobs(get, never):Int;
public static var singleThread(get, null):FutureWork;

private static function getPool(mode:ThreadMode):ThreadPool
private static inline function get_singleThread():FutureWork
{
#if lime_threads
if (mode == MULTI_THREADED) {
if(multiThreadPool == null) {
multiThreadPool = new ThreadPool(minThreads, maxThreads, MULTI_THREADED);
multiThreadPool.onComplete.add(multiThreadPool_onComplete);
multiThreadPool.onError.add(multiThreadPool_onError);
}
return multiThreadPool;
}
#end
if(singleThreadPool == null) {
singleThreadPool = new ThreadPool(minThreads, maxThreads, SINGLE_THREADED);
singleThreadPool.onComplete.add(singleThreadPool_onComplete);
singleThreadPool.onError.add(singleThreadPool_onError);
if (singleThread == null)
{
singleThread = new FutureWork(SINGLE_THREADED);
}
return singleThreadPool;
return singleThread;
}

@:allow(lime.app.Future)
private static function run<T>(work:WorkFunction<State->Null<T>>, state:State, promise:Promise<T>, mode:ThreadMode = MULTI_THREADED, legacyCode:Bool = false):Void
{
var bundle = {work: work, state: state, promise: promise, legacyCode: legacyCode};
#if lime_threads
public static var multiThread(get, null):FutureWork;

#if lime_threads
if (mode == MULTI_THREADED)
private static inline function get_multiThread():FutureWork
{
if (multiThread == null)
{
#if html5
work.makePortable();
#end

bundle.promise = null;
multiThread = new FutureWork(MULTI_THREADED);
}
#end
return multiThread;
}
#end

public static var totalActiveJobs(get, never):Int;

var jobID:Int = getPool(mode).run(threadPool_doWork, bundle);
private static inline function get_totalActiveJobs():Int
{
return singleThread.activeJobs #if lime_threads + multiThread.activeJobs #end;
}

@:allow(lime.app.Future)
private static function forMode(mode:ThreadMode):FutureWork
{
#if lime_threads
if (mode == MULTI_THREADED)
{
promises[jobID] = {complete: promise.complete, error: promise.error};
return multiThread;
}
#end
return singleThread;
}

// Event Handlers
private static function threadPool_doWork(bundle:{work:WorkFunction<State->Dynamic>, state:State, legacyCode:Bool}, output:WorkOutput):Void
{
try
{
var result = bundle.work.dispatch(bundle.state);
if (result != null || bundle.legacyCode)
{
output.sendComplete(result);
}
}
catch (e:Dynamic)
{
output.sendError(e);
}
}
private var threadPool:ThreadPool;

// Because `Promise` is `@:generic`, we can't always store it as `Promise<Dynamic>`.
// Instead, we'll store the specific methods we need.
private var promises:Map<Int, {complete:Dynamic->Dynamic, error:Dynamic->Dynamic, progress:Int->Int->Dynamic}> = new Map();

private static function singleThreadPool_onComplete(result:Dynamic):Void
public var minThreads(get, set):Int;
public var maxThreads(get, set):Int;
public var activeJobs(get, never):Int;

private function new(mode:ThreadMode)
{
singleThreadPool.activeJob.state.promise.complete(result);
threadPool = new ThreadPool(mode);
threadPool.onComplete.add(threadPool_onComplete);
threadPool.onError.add(threadPool_onError);
threadPool.onProgress.add(threadPool_onProgress);
}

private static function singleThreadPool_onError(error:Dynamic):Void
@:allow(lime.app.Future)
private function run<T>(doWork:WorkFunction<State->WorkOutput->Void>, state:State, promise:Promise<T>):Void
{
singleThreadPool.activeJob.state.promise.error(error);
var jobID:Int = threadPool.run(doWork, state);
promises[jobID] = {complete: promise.complete, error: promise.error, progress: promise.progress};
}

#if lime_threads
private static function multiThreadPool_onComplete(result:Dynamic):Void
// Event Handlers
private function threadPool_onComplete(result:Dynamic):Void
{
var promise = promises[multiThreadPool.activeJob.id];
promises.remove(multiThreadPool.activeJob.id);
var promise = promises[threadPool.activeJob.id];
promises.remove(threadPool.activeJob.id);
promise.complete(result);
}

private static function multiThreadPool_onError(error:Dynamic):Void
private function threadPool_onError(error:Dynamic):Void
{
var promise = promises[multiThreadPool.activeJob.id];
promises.remove(multiThreadPool.activeJob.id);
var promise = promises[threadPool.activeJob.id];
promises.remove(threadPool.activeJob.id);
promise.error(error);
}
#end

// Getters & Setters
@:noCompletion private static inline function set_minThreads(value:Int):Int
private function threadPool_onProgress(progress:Dynamic):Void
{
if (singleThreadPool != null)
{
singleThreadPool.minThreads = value;
}
#if lime_threads
if (multiThreadPool != null)
if (Type.typeof(progress) == TObject && Type.typeof(progress.progress) == TInt && Type.typeof(progress.total) == TInt)
{
multiThreadPool.minThreads = value;
promises[threadPool.activeJob.id].progress(progress.progress, progress.total);
}
#end
return minThreads = value;
}

@:noCompletion private static inline function set_maxThreads(value:Int):Int
// Getters & Setters
private inline function get_minThreads():Int
{
if (singleThreadPool != null)
{
singleThreadPool.maxThreads = value;
}
#if lime_threads
if (multiThreadPool != null)
{
multiThreadPool.maxThreads = value;
}
#end
return maxThreads = value;
return threadPool.minThreads;
}

@:noCompletion private static function get_activeJobs():Int
private inline function set_minThreads(value:Int):Int
{
var sum:Int = 0;
if (singleThreadPool != null)
{
sum += singleThreadPool.activeJobs;
}
#if lime_threads
if (multiThreadPool != null)
{
sum += multiThreadPool.activeJobs;
}
#end
return sum;
return threadPool.minThreads = value;
}

private inline function get_maxThreads():Int
{
return threadPool.maxThreads;
}

private inline function set_maxThreads(value:Int):Int
{
return threadPool.maxThreads = value;
}

private inline function get_activeJobs():Int
{
return threadPool.activeJobs;
}
}
2 changes: 1 addition & 1 deletion src/lime/graphics/Image.hx
Original file line number Diff line number Diff line change
Expand Up @@ -1002,7 +1002,7 @@ class Image

return promise.future;
#else
return Future.withEventualValue(fromBytes, bytes, MULTI_THREADED);
return new Future(fromBytes.bind(bytes), true);
#end
}

Expand Down
2 changes: 1 addition & 1 deletion src/lime/media/AudioBuffer.hx
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ class AudioBuffer

return promise.future;
#else
return Future.withEventualValue(fromFiles, paths, MULTI_THREADED);
return new Future(fromFiles.bind(paths), true);
#end
}

Expand Down
Loading
Loading