diff --git a/src/FileTrees.jl b/src/FileTrees.jl index d1ed758..808a26e 100644 --- a/src/FileTrees.jl +++ b/src/FileTrees.jl @@ -32,7 +32,7 @@ include("values.jl") import Dagger -import Dagger: compute, delayed, Chunk, Thunk +import Dagger: Chunk export lazy, exec, compute diff --git a/src/parallelism.jl b/src/parallelism.jl index 5dac1e0..be0ce88 100644 --- a/src/parallelism.jl +++ b/src/parallelism.jl @@ -1,46 +1,17 @@ -lazy(f; kw...) = delayed(f; kw...) -# If any input is lazy, make the output lazy -maybe_lazy(f, x) = any(x->x isa Union{Thunk, Chunk}, x) ? lazy(f)(x...) : f(x...) - -maybe_lazy(f) = (x...) -> maybe_lazy(f, x) - -function mapcompute(ctx, xs; - cache=false, - collect_results=identity, - map=map, kw...) - thunks = [] - map(xs) do x - if x isa Thunk - if cache - x.cache = true - end - push!(thunks, x) - end - x - end - - vals = collect_results(compute(ctx, delayed((xs...)->[xs...]; meta=true)(thunks...); kw...)) - - i = 0 - map(xs) do x - # Expression returns vals[i] or x - if x isa Thunk - i += 1 - vals[i] - else - x - end - end +# Types left off since this is what Dagger does, probably for inference reasons. Not sure if this is the best performance here... +struct Thunk + f + args + kwargs end -function mapexec(ctx, xs; cache=false, map=map) - mapcompute(ctx, xs; - map=map, - cache=cache, - collect_results=xs -> asyncmap(d -> exec(ctx, d), xs)) -end +lazy(f) = (args...; kwargs...) -> Thunk(f, args, kwargs) +# If any input is lazy, make the output lazy +maybe_lazy(f, x) = any(x->x isa Thunk, x) ? lazy(f)(x...) : f(x...) + +maybe_lazy(f) = (x...) -> maybe_lazy(f, x) """ compute(tree::FileTree; cache=true) @@ -49,10 +20,7 @@ Compute any lazy values (Thunks) in `tree` and return a new tree where the value the computed values (maybe on remote processes). The tree still behaves as a Lazy tree. `exec` on it will fetch the values from remote processes. """ compute(d::FileTree; cache=true, kw...) = compute(Dagger.Context(), d; cache=cache, kw...) - -function compute(ctx, d::FileTree; cache=true, kw...) - mapcompute(ctx, d, map=((f,t) -> mapvalues(f, t; lazy=false)), cache=cache; kw...) -end +compute(ctx, d::FileTree; cache=true, kw...) = mapvalues(v -> exec(ctx, v, identity), d; lazy=false) """ exec(x) @@ -69,9 +37,23 @@ exec(x) = exec(Dagger.Context(), x) Same as `exec(x)` with a ctx being passed to `Dagger` when computing any `Thunks`. """ -exec(ctx, x) = x +exec(ctx, x, args...) = x -exec(ctx, d::FileTree) = mapexec(ctx, d, map=(f,t) -> mapvalues(f, t; lazy=false)) -exec(ctx, f::File) = setvalue(f, exec(ctx, f[])) +exec(ctx, d::FileTree, args...) = mapvalues(fetch, compute(ctx, d)) +exec(ctx, f::File, collect_results=fetch) = setvalue(f, exec(ctx, f[], collect_results)) -exec(ctx, d::Union{Thunk, Chunk}) = collect(ctx, compute(ctx, d)) +# TODO: Probably need to rework this since there does not seem to be a (safe) way to set the context for a spawned task (context seems to be global since scheduler is global)? +exec(ctx::Dagger.Context, t::Thunk, collect_results=fetch) = collect_results(Dagger.spawn(t.f, map(a -> exec(ctx, a, identity), t.args)...; (k => exec(ctx, v, identity) for (k,v) in t.kwargs)...)) + +# TODO: Not sure if these are worth keeping. Added mostly for benchmarking reasons +struct SingleTreadedContext end +exec(ctx::SingleTreadedContext, t::Thunk, args...) = t.f(map(a -> exec(ctx, a), t.args)...; (k => exec(ctx, v) for (k,v) in t.kwargs)...) + +struct ThreadContext end +function exec(ctx::ThreadContext, t::Thunk, collect_results=fetch) + args = map(a -> exec(ctx, a, identity), t.args) + kwargs = [k => exec(ctx, v, identity) for (k,v) in t.kwargs] + res = Threads.@spawn t.f(map(fetch, args)...; (k => fetch(v) for (k,v) in kwargs)...) + collect_results(res) +end + \ No newline at end of file diff --git a/src/values.jl b/src/values.jl index 286028a..569cada 100644 --- a/src/values.jl +++ b/src/values.jl @@ -118,7 +118,8 @@ function save(f, t::Node; lazy=nothing, exec=true) # workers though... t_nothunks = map(t) do n n[] isa Thunk && return setvalue(n, NoValue()) - n[] isa Chunk && return setvalue(n, NoValue()) + # TODO: Not sure if trees can have Chunks after switch to Daggers eager API + n[] isa Chunk && return setvalue(n, NoValue()) n end |> setparent diff --git a/test/basics.jl b/test/basics.jl index d04342e..b48413d 100644 --- a/test/basics.jl +++ b/test/basics.jl @@ -230,7 +230,8 @@ end end end -@testset "exec with context" begin +# TODO: Just remove if we anyways can't select which context to use in Daggers eager/standard API? +#= @testset "exec with context" begin import Dagger struct SpecialContext end @@ -251,7 +252,7 @@ end @test exec(SpecialContext(), t) |> values == [1,2,3] @test computespecial[] == true # Dummy compare to make failed test outprint a little less confusing @test ncollectspecial[] == 3 # All values are collected with SpecialContext -end +end =# @testset "iterators" begin