Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Daggers eager API with (vendored) lazy trees #80

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/FileTrees.jl
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ include("values.jl")


import Dagger
import Dagger: compute, delayed, Chunk, Thunk
import Dagger: Chunk

export lazy, exec, compute

Expand Down
76 changes: 29 additions & 47 deletions src/parallelism.jl
Original file line number Diff line number Diff line change
@@ -1,46 +1,17 @@
lazy(f; kw...) = delayed(f; kw...)

# If any input is lazy, make the output lazy
maybe_lazy(f, x) = any(x->x isa Union{Thunk, Chunk}, x) ? lazy(f)(x...) : f(x...)

maybe_lazy(f) = (x...) -> maybe_lazy(f, x)

function mapcompute(ctx, xs;
cache=false,
collect_results=identity,
map=map, kw...)
thunks = []
map(xs) do x
if x isa Thunk
if cache
x.cache = true
end
push!(thunks, x)
end
x
end

vals = collect_results(compute(ctx, delayed((xs...)->[xs...]; meta=true)(thunks...); kw...))

i = 0
map(xs) do x
# Expression returns vals[i] or x
if x isa Thunk
i += 1
vals[i]
else
x
end
end
# Types left off since this is what Dagger does, probably for inference reasons. Not sure if this is the best performance here...
struct Thunk
f
args
kwargs
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this contain Thunks?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, all chaining of lazy operations works as on current main.

end

function mapexec(ctx, xs; cache=false, map=map)
mapcompute(ctx, xs;
map=map,
cache=cache,
collect_results=xs -> asyncmap(d -> exec(ctx, d), xs))
end
lazy(f) = (args...; kwargs...) -> Thunk(f, args, kwargs)

# If any input is lazy, make the output lazy
maybe_lazy(f, x) = any(x->x isa Thunk, x) ? lazy(f)(x...) : f(x...)

maybe_lazy(f) = (x...) -> maybe_lazy(f, x)

"""
compute(tree::FileTree; cache=true)
Expand All @@ -49,10 +20,7 @@ Compute any lazy values (Thunks) in `tree` and return a new tree where the value
the computed values (maybe on remote processes). The tree still behaves as a Lazy tree. `exec` on it will fetch the values from remote processes.
"""
compute(d::FileTree; cache=true, kw...) = compute(Dagger.Context(), d; cache=cache, kw...)

function compute(ctx, d::FileTree; cache=true, kw...)
mapcompute(ctx, d, map=((f,t) -> mapvalues(f, t; lazy=false)), cache=cache; kw...)
end
compute(ctx, d::FileTree; cache=true, kw...) = mapvalues(v -> exec(ctx, v, identity), d; lazy=false)

"""
exec(x)
Expand All @@ -69,9 +37,23 @@ exec(x) = exec(Dagger.Context(), x)

Same as `exec(x)` with a ctx being passed to `Dagger` when computing any `Thunks`.
"""
exec(ctx, x) = x
exec(ctx, x, args...) = x

exec(ctx, d::FileTree) = mapexec(ctx, d, map=(f,t) -> mapvalues(f, t; lazy=false))
exec(ctx, f::File) = setvalue(f, exec(ctx, f[]))
exec(ctx, d::FileTree, args...) = mapvalues(fetch, compute(ctx, d))
exec(ctx, f::File, collect_results=fetch) = setvalue(f, exec(ctx, f[], collect_results))

exec(ctx, d::Union{Thunk, Chunk}) = collect(ctx, compute(ctx, d))
# TODO: Probably need to rework this since there does not seem to be a (safe) way to set the context for a spawned task (context seems to be global since scheduler is global)?
exec(ctx::Dagger.Context, t::Thunk, collect_results=fetch) = collect_results(Dagger.spawn(t.f, map(a -> exec(ctx, a, identity), t.args)...; (k => exec(ctx, v, identity) for (k,v) in t.kwargs)...))

# TODO: Not sure if these are worth keeping. Added mostly for benchmarking reasons
struct SingleTreadedContext end
exec(ctx::SingleTreadedContext, t::Thunk, args...) = t.f(map(a -> exec(ctx, a), t.args)...; (k => exec(ctx, v) for (k,v) in t.kwargs)...)

struct ThreadContext end
function exec(ctx::ThreadContext, t::Thunk, collect_results=fetch)
args = map(a -> exec(ctx, a, identity), t.args)
kwargs = [k => exec(ctx, v, identity) for (k,v) in t.kwargs]
res = Threads.@spawn t.f(map(fetch, args)...; (k => fetch(v) for (k,v) in kwargs)...)
collect_results(res)
end

3 changes: 2 additions & 1 deletion src/values.jl
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ function save(f, t::Node; lazy=nothing, exec=true)
# workers though...
t_nothunks = map(t) do n
n[] isa Thunk && return setvalue(n, NoValue())
n[] isa Chunk && return setvalue(n, NoValue())
# TODO: Not sure if trees can have Chunks after switch to Daggers eager API
n[] isa Chunk && return setvalue(n, NoValue())
n
end |> setparent

Expand Down
5 changes: 3 additions & 2 deletions test/basics.jl
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ end
end
end

@testset "exec with context" begin
# TODO: Just remove if we anyways can't select which context to use in Daggers eager/standard API?
#= @testset "exec with context" begin
import Dagger

struct SpecialContext end
Expand All @@ -251,7 +252,7 @@ end
@test exec(SpecialContext(), t) |> values == [1,2,3]
@test computespecial[] == true # Dummy compare to make failed test outprint a little less confusing
@test ncollectspecial[] == 3 # All values are collected with SpecialContext
end
end =#


@testset "iterators" begin
Expand Down
Loading