From c3efb2e45fe64b14908e6af1cd2f991fa73b1472 Mon Sep 17 00:00:00 2001 From: DrChainsaw Date: Sun, 2 Oct 2022 13:37:11 +0200 Subject: [PATCH] Use assocreduce in mapcompute instead of splatting --- .gitignore | 1 + src/parallelism.jl | 25 +++++++- src/values.jl | 2 +- test/basics.jl | 140 ++++++++++++++++++++++++--------------------- 4 files changed, 100 insertions(+), 68 deletions(-) diff --git a/.gitignore b/.gitignore index ba39cc5..2ba67bd 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ Manifest.toml +.vscode \ No newline at end of file diff --git a/src/parallelism.jl b/src/parallelism.jl index 5dac1e0..8246704 100644 --- a/src/parallelism.jl +++ b/src/parallelism.jl @@ -9,7 +9,18 @@ function mapcompute(ctx, xs; cache=false, collect_results=identity, map=map, kw...) - thunks = [] + + # Dagger does not have a way to just say "here is a bunch of Thunks, just compute them for me please" + # To make Dagger do this for us, we collect all Thunks in xs and ask Dagger to concatenate them into an array + # Once we have that array we just put the computed values back at the same place we found them + # Note that we rely on map visiting all elements in the same order each time it is called + + # Creating all these intermediate arrys do have a little bit of overhead and I doubt this + # is the most efficient way of doing it. Hopefully if people end up here it is because they + # actually have heavy computations where the extra overhead is not significant + + # Step 1: Collect all thunks in xs + thunks = Thunk[] map(xs) do x if x isa Thunk if cache @@ -20,8 +31,14 @@ function mapcompute(ctx, xs; x end - vals = collect_results(compute(ctx, delayed((xs...)->[xs...]; meta=true)(thunks...); kw...)) + # Step 2: Ask Dagger to concatenate the results into vals + # We do assocreduce here mainly to prevent inference issues when xs is heterogenous + # Drawback vs e.g. splatting the whole array into a single delayed call to vcat is + # that we end up creating a fair bit of intermediate arrays (about log2(length(thunks))). + vals = collect_results(compute(ctx, assocreduce(delayed(vcat; meta=true), thunks); kw...)) + # Step 3: Put the computed results back at the same places we found them + # This is where we rely on map visiting all elements in the same order. i = 0 map(xs) do x # Expression returns vals[i] or x @@ -38,9 +55,11 @@ function mapexec(ctx, xs; cache=false, map=map) mapcompute(ctx, xs; map=map, cache=cache, - collect_results=xs -> asyncmap(d -> exec(ctx, d), xs)) + collect_results=xs -> collect_chunks(ctx, xs)) end +collect_chunks(ctx, x) = [exec(ctx,x)] +collect_chunks(ctx, xs::AbstractArray) = asyncmap(d -> exec(ctx,d), xs) """ compute(tree::FileTree; cache=true) diff --git a/src/values.jl b/src/values.jl index d287b43..866af0e 100644 --- a/src/values.jl +++ b/src/values.jl @@ -95,7 +95,7 @@ function assocreduce(f, xs; init=no_init) length(xs) == 1 && return xs[1] l = length(xs) m = div(l, 2) - f(assocreduce(f, xs[1:m]), assocreduce(f, xs[m+1:end])) + f(assocreduce(f, @view(xs[1:m])), assocreduce(f, @view(xs[m+1:end]))) end """ diff --git a/test/basics.jl b/test/basics.jl index 7be1c35..103cbfb 100644 --- a/test/basics.jl +++ b/test/basics.jl @@ -140,100 +140,112 @@ import FileTrees: attach end @testset "values" begin - t1 = FileTrees.load(path, t) - if isdir("test_dir") - rm("test_dir", recursive=true) - end - @test get(t1["a/b/a"]) == string(p"." / "a" / "b" / "a") + mktempdir() do tmproot + basedir = joinpath(tmproot, "test_dir_values") + t1 = FileTrees.load(path, t) + + @test get(t1["a/b/a"]) == string(p"." / "a" / "b" / "a") - @test reducevalues(*, mapvalues(lowercase, t1)) == lowercase(reducevalues(*, t1)) + @test reducevalues(*, mapvalues(lowercase, t1)) == lowercase(reducevalues(*, t1)) - FileTrees.save(maketree("test_dir" => [t1])) do f - @test f isa File - open(path(f), "w") do io - print(io, get(f)) + FileTrees.save(maketree(basedir=> [t1])) do f + @test f isa File + open(path(f), "w") do io + print(io, get(f)) + end end - end - t2 = FileTree("test_dir") - t3 = FileTrees.load(t2) do f - open(path(f), "r") do io - String(read(io)) + t2 = FileTree(basedir) + t3 = FileTrees.load(t2) do f + open(path(f), "r") do io + String(read(io)) + end end - end - t4 = filter(!isempty, t1) + t4 = filter(!isempty, t1) - @test isequal(t3, FileTrees.rename(t4, "test_dir")) - if isdir("test_dir") - rm("test_dir", recursive=true) - end + @test isequal(t3, FileTrees.rename(t4, basedir)) - x1 = maketree("a"=>[(name="b", value=1)]) - x2 = mapvalues(x->NoValue(), x1, lazy=true) - @test !isempty(values(x2)) - @test isempty(values(exec(x2))) - x3 = mapvalues(x->rand(), x2) - @test !isempty(values(x3)) - @test isempty(values(exec(x3))) + x1 = maketree("a"=>[(name="b", value=1)]) + x2 = mapvalues(x->NoValue(), x1, lazy=true) + @test !isempty(values(x2)) + @test isempty(values(exec(x2))) + x3 = mapvalues(x->rand(), x2) + @test !isempty(values(x3)) + @test isempty(values(exec(x3))) - # issue 16 - @test_throws ArgumentError reducevalues(+, maketree("." => [])) - @test reducevalues(+, maketree("." => []), init=0) === 0 + # issue 16 + @test_throws ArgumentError reducevalues(+, maketree("." => [])) + @test reducevalues(+, maketree("." => []), init=0) === 0 - @test_throws Union{ArgumentError,MethodError} reducevalues(+, maketree("." => []), associative=false) - @test reducevalues(+, maketree("." => []), init=0, associative=false) === 0 + @test_throws Union{ArgumentError,MethodError} reducevalues(+, maketree("." => []), associative=false) + @test reducevalues(+, maketree("." => []), init=0, associative=false) === 0 - # issue 23 - @test FileTrees.save(identity, maketree([])) == nothing + # issue 23 + @test FileTrees.save(identity, maketree([])) == nothing + end end @testset "lazy-exec" begin + mktempdir() do tmproot + basedir = joinpath(tmproot, "test_dir_lazy") - if isdir("test_dir_lazy") - rm("test_dir_lazy", recursive=true) - end + t1 = FileTrees.load(uppercase∘path, t, lazy=true) + @test get(t1["a/b/a"]) isa Thunk + @test get(exec(t1)["a/b/a"]) == string(p"."/"A"/"B"/"A") + # Exec a single File + @test get(exec(t1["a/b/a"])) == string(p"."/"A"/"B"/"A") - t1 = FileTrees.load(uppercase∘path, t, lazy=true) + @test exec(reducevalues(*, mapvalues(lowercase, t1))) == lowercase(exec(reducevalues(*, t1))) - @test get(t1["a/b/a"]) isa Thunk - @test get(exec(t1)["a/b/a"]) == string(p"."/"A"/"B"/"A") - # Exec a single File - @test get(exec(t1["a/b/a"])) == string(p"."/"A"/"B"/"A") + s = FileTrees.save(maketree(basedir => [t1])) do f + open(path(f), "w") do io + print(io, get(f)) + end + end + @test isdir(basedir) + @test isfile(joinpath(basedir, "a", "b", "a")) - @test exec(reducevalues(*, mapvalues(lowercase, t1))) == lowercase(exec(reducevalues(*, t1))) - s = FileTrees.save(maketree("test_dir_lazy" => [t1])) do f - open(path(f), "w") do io - print(io, get(f)) + t2 = FileTree(basedir) + t3 = FileTrees.load(t2; lazy=true) do f + open(path(f), "r") do io + (String(read(io)), now()) + end end - end + toc = now() + sleep(0.01) + tic = exec(reducevalues((x,y)->x, mapvalues(last, t3))) - @test isdir("test_dir_lazy") - @test isfile("test_dir_lazy/a/b/a") + @test tic > toc + t4 = filter(!isempty, t1) |> exec - t2 = FileTree("test_dir_lazy") - t3 = FileTrees.load(t2; lazy=true) do f - open(path(f), "r") do io - (String(read(io)), now()) - end + t5 = mapvalues(first, t3) |> exec + @test isequal(t5, FileTrees.rename(t4, basedir)) end - toc = now() - sleep(0.01) - tic = exec(reducevalues((x,y)->x, mapvalues(last, t3))) +end + +@testset "lazy-exec heterogenous" begin + mktempdir() do tmproot + basedir = joinpath(tmproot, "test_dir_lazy") - @test tic > toc + smalltree = exec(FileTrees.load(maketree(basedir => [string(i) for i in 1:3]); lazy=true) do file + isodd(parse(Int, name(file))) ? "AAA" : 13 + end) - t4 = filter(!isempty, t1) |> exec + @test unique(values(smalltree)[1:2:end]) == ["AAA"] + @test unique(values(smalltree)[2:2:end]) == [13] - t5 = mapvalues(first, t3) |> exec - @test isequal(t5, FileTrees.rename(t4, "test_dir_lazy")) + # This used to create inference problems due to array of all values being splatted into a function + largetree = exec(FileTrees.load(maketree(basedir => [string(i) for i in 1:1200]); lazy=true) do file + isodd(parse(Int, name(file))) ? "AAA" : 13 + end) - if isdir("test_dir_lazy") - rm("test_dir_lazy", recursive=true) + @test unique(values(largetree)[1:2:end]) == ["AAA"] + @test unique(values(largetree)[2:2:end]) == [13] end end