Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use assocreduce in mapcompute instead of splatting #73

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
Manifest.toml
.vscode
25 changes: 22 additions & 3 deletions src/parallelism.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,18 @@ function mapcompute(ctx, xs;
cache=false,
collect_results=identity,
map=map, kw...)
thunks = []

# Dagger does not have a way to just say "here is a bunch of Thunks, just compute them for me please"
# To make Dagger do this for us, we collect all Thunks in xs and ask Dagger to concatenate them into an array
# Once we have that array we just put the computed values back at the same place we found them
# Note that we rely on map visiting all elements in the same order each time it is called

# Creating all these intermediate arrys do have a little bit of overhead and I doubt this
# is the most efficient way of doing it. Hopefully if people end up here it is because they
# actually have heavy computations where the extra overhead is not significant

# Step 1: Collect all thunks in xs
thunks = Thunk[]
map(xs) do x
if x isa Thunk
if cache
Expand All @@ -20,8 +31,14 @@ function mapcompute(ctx, xs;
x
end

vals = collect_results(compute(ctx, delayed((xs...)->[xs...]; meta=true)(thunks...); kw...))
# Step 2: Ask Dagger to concatenate the results into vals
# We do assocreduce here mainly to prevent inference issues when xs is heterogenous
# Drawback vs e.g. splatting the whole array into a single delayed call to vcat is
# that we end up creating a fair bit of intermediate arrays (about log2(length(thunks))).
vals = collect_results(compute(ctx, assocreduce(delayed(vcat; meta=true), thunks); kw...))

# Step 3: Put the computed results back at the same places we found them
# This is where we rely on map visiting all elements in the same order.
i = 0
map(xs) do x
# Expression returns vals[i] or x
Expand All @@ -38,9 +55,11 @@ function mapexec(ctx, xs; cache=false, map=map)
mapcompute(ctx, xs;
map=map,
cache=cache,
collect_results=xs -> asyncmap(d -> exec(ctx, d), xs))
collect_results=xs -> collect_chunks(ctx, xs))
end

collect_chunks(ctx, x) = [exec(ctx,x)]
collect_chunks(ctx, xs::AbstractArray) = asyncmap(d -> exec(ctx,d), xs)

"""
compute(tree::FileTree; cache=true)
Expand Down
2 changes: 1 addition & 1 deletion src/values.jl
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ function assocreduce(f, xs; init=no_init)
length(xs) == 1 && return xs[1]
l = length(xs)
m = div(l, 2)
f(assocreduce(f, xs[1:m]), assocreduce(f, xs[m+1:end]))
f(assocreduce(f, @view(xs[1:m])), assocreduce(f, @view(xs[m+1:end])))
end

"""
Expand Down
140 changes: 76 additions & 64 deletions test/basics.jl
Original file line number Diff line number Diff line change
Expand Up @@ -140,100 +140,112 @@ import FileTrees: attach
end

@testset "values" begin
t1 = FileTrees.load(path, t)
if isdir("test_dir")
rm("test_dir", recursive=true)
end

@test get(t1["a/b/a"]) == string(p"." / "a" / "b" / "a")
mktempdir() do tmproot
basedir = joinpath(tmproot, "test_dir_values")
t1 = FileTrees.load(path, t)

@test get(t1["a/b/a"]) == string(p"." / "a" / "b" / "a")

@test reducevalues(*, mapvalues(lowercase, t1)) == lowercase(reducevalues(*, t1))
@test reducevalues(*, mapvalues(lowercase, t1)) == lowercase(reducevalues(*, t1))

FileTrees.save(maketree("test_dir" => [t1])) do f
@test f isa File
open(path(f), "w") do io
print(io, get(f))
FileTrees.save(maketree(basedir=> [t1])) do f
@test f isa File
open(path(f), "w") do io
print(io, get(f))
end
end
end

t2 = FileTree("test_dir")
t3 = FileTrees.load(t2) do f
open(path(f), "r") do io
String(read(io))
t2 = FileTree(basedir)
t3 = FileTrees.load(t2) do f
open(path(f), "r") do io
String(read(io))
end
end
end

t4 = filter(!isempty, t1)
t4 = filter(!isempty, t1)

@test isequal(t3, FileTrees.rename(t4, "test_dir"))
if isdir("test_dir")
rm("test_dir", recursive=true)
end
@test isequal(t3, FileTrees.rename(t4, basedir))

x1 = maketree("a"=>[(name="b", value=1)])
x2 = mapvalues(x->NoValue(), x1, lazy=true)
@test !isempty(values(x2))
@test isempty(values(exec(x2)))
x3 = mapvalues(x->rand(), x2)
@test !isempty(values(x3))
@test isempty(values(exec(x3)))
x1 = maketree("a"=>[(name="b", value=1)])
x2 = mapvalues(x->NoValue(), x1, lazy=true)
@test !isempty(values(x2))
@test isempty(values(exec(x2)))
x3 = mapvalues(x->rand(), x2)
@test !isempty(values(x3))
@test isempty(values(exec(x3)))

# issue 16
@test_throws ArgumentError reducevalues(+, maketree("." => []))
@test reducevalues(+, maketree("." => []), init=0) === 0
# issue 16
@test_throws ArgumentError reducevalues(+, maketree("." => []))
@test reducevalues(+, maketree("." => []), init=0) === 0

@test_throws Union{ArgumentError,MethodError} reducevalues(+, maketree("." => []), associative=false)
@test reducevalues(+, maketree("." => []), init=0, associative=false) === 0
@test_throws Union{ArgumentError,MethodError} reducevalues(+, maketree("." => []), associative=false)
@test reducevalues(+, maketree("." => []), init=0, associative=false) === 0

# issue 23
@test FileTrees.save(identity, maketree([])) == nothing
# issue 23
@test FileTrees.save(identity, maketree([])) == nothing
end
end

@testset "lazy-exec" begin
mktempdir() do tmproot
basedir = joinpath(tmproot, "test_dir_lazy")

if isdir("test_dir_lazy")
rm("test_dir_lazy", recursive=true)
end
t1 = FileTrees.load(uppercase∘path, t, lazy=true)

@test get(t1["a/b/a"]) isa Thunk
@test get(exec(t1)["a/b/a"]) == string(p"."/"A"/"B"/"A")
# Exec a single File
@test get(exec(t1["a/b/a"])) == string(p"."/"A"/"B"/"A")

t1 = FileTrees.load(uppercase∘path, t, lazy=true)
@test exec(reducevalues(*, mapvalues(lowercase, t1))) == lowercase(exec(reducevalues(*, t1)))

@test get(t1["a/b/a"]) isa Thunk
@test get(exec(t1)["a/b/a"]) == string(p"."/"A"/"B"/"A")
# Exec a single File
@test get(exec(t1["a/b/a"])) == string(p"."/"A"/"B"/"A")
s = FileTrees.save(maketree(basedir => [t1])) do f
open(path(f), "w") do io
print(io, get(f))
end
end
@test isdir(basedir)
@test isfile(joinpath(basedir, "a", "b", "a"))

@test exec(reducevalues(*, mapvalues(lowercase, t1))) == lowercase(exec(reducevalues(*, t1)))

s = FileTrees.save(maketree("test_dir_lazy" => [t1])) do f
open(path(f), "w") do io
print(io, get(f))
t2 = FileTree(basedir)
t3 = FileTrees.load(t2; lazy=true) do f
open(path(f), "r") do io
(String(read(io)), now())
end
end
end
toc = now()
sleep(0.01)
tic = exec(reducevalues((x,y)->x, mapvalues(last, t3)))

@test isdir("test_dir_lazy")
@test isfile("test_dir_lazy/a/b/a")
@test tic > toc

t4 = filter(!isempty, t1) |> exec

t2 = FileTree("test_dir_lazy")
t3 = FileTrees.load(t2; lazy=true) do f
open(path(f), "r") do io
(String(read(io)), now())
end
t5 = mapvalues(first, t3) |> exec
@test isequal(t5, FileTrees.rename(t4, basedir))
end
toc = now()
sleep(0.01)
tic = exec(reducevalues((x,y)->x, mapvalues(last, t3)))
end

@testset "lazy-exec heterogenous" begin
mktempdir() do tmproot
basedir = joinpath(tmproot, "test_dir_lazy")

@test tic > toc
smalltree = exec(FileTrees.load(maketree(basedir => [string(i) for i in 1:3]); lazy=true) do file
isodd(parse(Int, name(file))) ? "AAA" : 13
end)

t4 = filter(!isempty, t1) |> exec
@test unique(values(smalltree)[1:2:end]) == ["AAA"]
@test unique(values(smalltree)[2:2:end]) == [13]

t5 = mapvalues(first, t3) |> exec
@test isequal(t5, FileTrees.rename(t4, "test_dir_lazy"))
# This used to create inference problems due to array of all values being splatted into a function
largetree = exec(FileTrees.load(maketree(basedir => [string(i) for i in 1:1200]); lazy=true) do file
isodd(parse(Int, name(file))) ? "AAA" : 13
end)

if isdir("test_dir_lazy")
rm("test_dir_lazy", recursive=true)
@test unique(values(largetree)[1:2:end]) == ["AAA"]
@test unique(values(largetree)[2:2:end]) == [13]
end
end

Expand Down