Distributed WIP
utkinis committed Sep 13, 2023
1 parent a94d177 commit 5c79fdd
Showing 9 changed files with 238 additions and 18 deletions.
14 changes: 14 additions & 0 deletions ext/AMDGPUExt/AMDGPUExt.jl
@@ -0,0 +1,14 @@
module AMDGPUExt

using AMDGPU
using KernelAbstractions

using FastIce.Architecture
import FastIce.Architecture: set_device!, heuristic_groupsize

set_device!(dev::HIPDevice) = device!(dev)

heuristic_groupsize(::ROCBackend, ::Val{1}) = (256, )
heuristic_groupsize(::ROCBackend, ::Val{2}) = (128, 2, )
heuristic_groupsize(::ROCBackend, ::Val{3}) = (128, 2, 1, )

end
14 changes: 14 additions & 0 deletions ext/CUDAExt/CUDAExt.jl
@@ -0,0 +1,14 @@
module CUDAExt

using CUDA
using KernelAbstractions

using FastIce.Architecture
import FastIce.Architecture: set_device!, heuristic_groupsize

set_device!(dev::CuDevice) = device!(dev)

heuristic_groupsize(::CUDABackend, ::Val{1}) = (256, )
heuristic_groupsize(::CUDABackend, ::Val{2}) = (32, 8, )
heuristic_groupsize(::CUDABackend, ::Val{3}) = (32, 8, 1, )

end
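These two extensions specialize the launch heuristics per backend; the `Val(N)` argument selects a workgroup shape for an N-dimensional kernel. A sketch of a call site (hypothetical; note that `heuristic_groupsize` in src/architecture.jl below dispatches on the device rather than the backend, so this wiring is presumably still in flux):

    using CUDA, KernelAbstractions

    # hypothetical: pick a 3D workgroup shape for a CUDA launch
    groupsize = heuristic_groupsize(CUDABackend(), Val(3))  # (32, 8, 1)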
2 changes: 1 addition & 1 deletion scripts_future_API/tm_stokes_wip.jl
@@ -94,7 +94,7 @@ Isothermal._apply_bcs!(model.backend, model.grid, model.fields, model.boundary_c
set!(model.fields.η, other_fields.A)
extrapolate!(model.fields.η)

for it in 1:10
for it in 1:100
advance_iteration!(model, 0.0, 1.0; async = false)
if it % 10 == 0
plt.Pr[3][] = interior(model.fields.Pr)[:, :, size(grid,3)÷2]
47 changes: 47 additions & 0 deletions src/Distributed/distributed.jl
@@ -0,0 +1,47 @@
module Distributed

using FastIce.Architecture
import FastIce.Architecture: device, launch!
using FastIce.Grids

export CartesianTopology

export global_rank, shared_rank, node_name, cartesian_communicator, shared_communicator

export dimensions, global_size, node_size

export global_grid_size, local_grid

export split_ndrange

using MPI

include("topology.jl")

include("split_ndrange.jl")

struct DistributedArchitecture{C,T,R} <: AbstractArchitecture
child_arch::C
topology::T
ranges::R
end

device(arch::DistributedArchitecture) = device(arch.child_arch)

function launch!(arch::DistributedArchitecture, grid::CartesianGrid, kernel::Pair{F,Args}; boundary_conditions=nothing, async=true) where {F,Args}
fun, args = kernel

worksize = size(grid, Vertex())
groupsize = heuristic_groupsize(arch.child_arch)

# WIP: launch only the interior block (ranges[end]); the boundary slabs still need separate launches overlapped with halo exchange
fun(arch.child_arch.backend, groupsize)(args...; ndrange=size(arch.ranges[end]), offset=first(arch.ranges[end]))

isnothing(boundary_conditions) || apply_boundary_conditions!(boundary_conditions)

async || synchronize(arch.child_arch.backend)
return
end

end
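The `ranges` field is presumably the output of `split_ndrange` (see src/Distributed/split_ndrange.jl below): 2N boundary slabs plus one interior block, so boundary work can later overlap halo exchange. A sketch of the intended complete launch loop inside `launch!` (an assumption; this commit only launches the interior block):

    # hypothetical completion: launch the 2N boundary slabs first, interior last
    for r in arch.ranges[1:end-1]
        fun(arch.child_arch.backend, groupsize)(args...; ndrange=size(r), offset=first(r))
    end
    fun(arch.child_arch.backend, groupsize)(args...; ndrange=size(arch.ranges[end]), offset=first(arch.ranges[end]))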
72 changes: 72 additions & 0 deletions src/Distributed/exchanger.jl
@@ -0,0 +1,72 @@
mutable struct Exchanger
@atomic done::Bool
top::Base.Event
bottom::Base.Event
task::Task
@atomic err

function Exchanger(f::F, arch::AbstractArchitecture, comm, rank, halo, border) where F
top = Base.Event(true)
bottom = Base.Event(true)

send_buf = similar(border)
recv_buf = similar(halo)
this = new(false, top, bottom)

has_neighbor = rank != MPI.PROC_NULL
compute_bc = !has_neighbor

this.task = Threads.@spawn begin
set_device!(device(arch))
KernelAbstractions.priority!(backend(arch), :high)
try
while !(@atomic this.done)
wait(top)
if has_neighbor
recv = MPI.Irecv!(recv_buf, comm; source=rank)
end
f(compute_bc)
if has_neighbor
copyto!(send_buf, border)
send = MPI.Isend(send_buf, comm; dest=rank)
cooperative_test!(recv)
copyto!(halo, recv_buf)
cooperative_test!(send)
end
notify(bottom)
end
catch err
@show err
@atomic this.done = true
@atomic this.err = err
end
end
errormonitor(this.task)
return this
end
end

setdone!(exc::Exchanger) = @atomic exc.done = true

Base.isdone(exc::Exchanger) = @atomic exc.done

function Base.notify(exc::Exchanger)
if !(@atomic exc.done)
notify(exc.top)
else
error("notify: Exchanger is not running")
end
end
function Base.wait(exc::Exchanger)
if !(@atomic exc.done)
wait(exc.bottom)
else
error("wait: Exchanger is not running")
end
end

get_recv_view(::Val{1}, ::Val{D}, A) where D = view(A, ntuple(I -> I == D ? 1 : Colon(), Val(ndims(A)))...)
get_recv_view(::Val{2}, ::Val{D}, A) where D = view(A, ntuple(I -> I == D ? size(A, D) : Colon(), Val(ndims(A)))...)

get_send_view(::Val{1}, ::Val{D}, A) where D = view(A, ntuple(I -> I == D ? 2 : Colon(), Val(ndims(A)))...)
get_send_view(::Val{2}, ::Val{D}, A) where D = view(A, ntuple(I -> I == D ? size(A, D) - 1 : Colon(), Val(ndims(A)))...)
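`cooperative_test!` is called above but not defined in this commit. A plausible definition (an assumption, following the common MPI.jl pattern of polling a request while yielding so other Julia tasks can make progress):

    # assumed helper: poll the MPI request, yielding to the scheduler between tests
    function cooperative_test!(req)
        while !MPI.Test(req)
            yield()
        end
    end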
27 changes: 27 additions & 0 deletions src/Distributed/split_ndrange.jl
@@ -0,0 +1,27 @@
@inline subrange(nr,bw,I,::Val{1}) = 1:bw[I]
@inline subrange(nr,bw,I,::Val{2}) = (size(nr,I)-bw[I]+1):size(nr,I)
@inline subrange(nr,bw,I,::Val{3}) = (bw[I]+1):(size(nr,I)-bw[I])

@inline split_ndrange(ndrange,ndwidth) = split_ndrange(CartesianIndices(ndrange),ndwidth)

function split_ndrange(ndrange::CartesianIndices{N},ndwidth::NTuple{N,<:Integer}) where N
@assert all(size(ndrange) .> ndwidth.*2)
@inline ndsubrange(I,::Val{J}) where J = ntuple(Val(N)) do idim
if idim < I
1:size(ndrange,idim)
elseif idim == I
subrange(ndrange,ndwidth,idim,Val(J))
else
subrange(ndrange,ndwidth,idim,Val(3))
end
end
ndinner = ntuple(idim -> subrange(ndrange,ndwidth,idim,Val(3)), Val(N))
return ntuple(Val(2N+1)) do i
if i == 2N+1
ndrange[ndinner...]
else
idim,idir = divrem(i-1,2) .+ 1
ndrange[ndsubrange(idim,Val(idir))...]
end
end
end
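A worked example of the decomposition (a sketch): for an 8×8 range with overlap width 2 in each dimension, `split_ndrange` returns 2N boundary slabs followed by the interior block, covering the range exactly once:

    rngs = split_ndrange((8, 8), (2, 2))
    # rngs[1] == CartesianIndices((1:2, 3:6))  # low slab along dim 1
    # rngs[2] == CartesianIndices((7:8, 3:6))  # high slab along dim 1
    # rngs[3] == CartesianIndices((1:8, 1:2))  # low slab along dim 2, full extent in dim 1
    # rngs[4] == CartesianIndices((1:8, 7:8))  # high slab along dim 2
    # rngs[5] == CartesianIndices((3:6, 3:6))  # interior block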
27 changes: 10 additions & 17 deletions src/distributed.jl → src/Distributed/topology.jl
@@ -1,23 +1,10 @@
module Distributed

export CartesianTopology

export global_rank, shared_rank, node_name, cartesian_communicator, shared_communicator

export dimensions, global_size, node_size

export global_grid_size, local_grid

using FastIce.Grids

using MPI

struct CartesianTopology{N}
nprocs::Int
dims::NTuple{N,Int}
global_rank::Int
shared_rank::Int
cart_coords::NTuple{N,Int}
neighbors::NTuple{N,NTuple{2,Int}}
comm::MPI.Comm
cart_comm::MPI.Comm
shared_comm::MPI.Comm
@@ -34,7 +21,11 @@ function CartesianTopology(dims::NTuple{N,Int}; comm = MPI.COMM_WORLD) where {N}
node_name = MPI.Get_processor_name()
cart_coords = Tuple(MPI.Cart_coords(cart_comm))

return CartesianTopology{N}(nprocs, dims, global_rank, shared_rank, cart_coords, comm, cart_comm, shared_comm, node_name)
neighbors = ntuple(Val(N)) do dim
MPI.Cart_shift(cart_comm, dim-1, 1)
end

return CartesianTopology{N}(nprocs, dims, global_rank, shared_rank, cart_coords, neighbors, comm, cart_comm, shared_comm, node_name)
end

global_rank(t::CartesianTopology) = t.global_rank
@@ -49,6 +40,10 @@ shared_communicator(t::CartesianTopology) = t.shared_comm

dimensions(t::CartesianTopology) = t.dims

coordinates(t::CartesianTopology) = t.cart_coords

neighbors(t::CartesianTopology) = t.neighbors

global_size(t::CartesianTopology) = MPI.Comm_size(t.cart_comm)
node_size(t::CartesianTopology) = MPI.Comm_size(t.shared_comm)

@@ -60,6 +55,4 @@ function local_grid(g::CartesianGrid, t::CartesianTopology)
local_origin = origin(g) .+ local_extent .* t.cart_coords

return CartesianGrid(local_origin, local_extent, local_size)
end

end
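For reference, `MPI.Cart_shift(cart_comm, dim-1, 1)` returns the `(source, dest)` rank pair for a unit shift along each dimension, with `MPI.PROC_NULL` where no neighbor exists. A sketch of what the new `neighbors` field holds (assuming 4 ranks in a non-periodic (2, 2) topology and MPI's row-major rank ordering):

    # on rank 0, at Cartesian coordinates (0, 0):
    # neighbors(t)[1] == (MPI.PROC_NULL, 2)  # neighbors along dim 1
    # neighbors(t)[2] == (MPI.PROC_NULL, 1)  # neighbors along dim 2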
5 changes: 5 additions & 0 deletions src/Models/full_stokes/isothermal/isothermal.jl
@@ -85,9 +85,14 @@ function advance_iteration!(model::IsothermalFullStokesModel, t, Δt; async = tr
set_bcs!(bcs) = _apply_bcs!(model.backend, model.grid, model.fields, bcs)

# stress

# launch!(arch, grid, update_σ!, Pr, τ, V, η, Δτ, Δ)

update_σ!(backend, 256, (nx+1, ny+1, nz+1))(Pr, τ, V, η, Δτ, Δ)
set_bcs!(model.boundary_conditions.stress)
# velocity

# launch!(arch, grid, (update_res_V! => (rV, V, Pr, τ, η, Δτ, Δ), update_V! => (V, rV, dt)); exchangers = exchangers.velocity, boundary_conditions = boundary_conditions.velocity)
update_V!(backend, 256, (nx+1, ny+1, nz+1))(V, Pr, τ, η, Δτ, Δ)
set_bcs!(model.boundary_conditions.velocity)
# rheology
48 changes: 48 additions & 0 deletions src/architecture.jl
@@ -0,0 +1,48 @@
module Architecture

export AbstractArchitecture

export SingleDeviceArchitecture

export device, backend, launch!, set_device!, heuristic_groupsize

using FastIce.Grids

using KernelAbstractions
import KernelAbstractions.Kernel

abstract type AbstractArchitecture end

set_device!(arch::AbstractArchitecture) = set_device!(device(arch))

heuristic_groupsize(arch::AbstractArchitecture) = heuristic_groupsize(device(arch))

struct SingleDeviceArchitecture{B,D} <: AbstractArchitecture
backend::B
device::D
end

set_device!(::SingleDeviceArchitecture{CPU}) = nothing

heuristic_groupsize(::SingleDeviceArchitecture{CPU}) = 256

device(arch::SingleDeviceArchitecture) = arch.device
backend(arch::SingleDeviceArchitecture) = arch.backend

function launch!(arch::SingleDeviceArchitecture, grid::CartesianGrid, kernel::Pair{F,Args}; kwargs...) where {F,Args}
worksize = size(grid, Vertex())
launch!(arch, worksize, kernel; kwargs...)
end

function launch!(arch::SingleDeviceArchitecture, worksize::NTuple{N,Int}, kernel::Pair{F,Args}; boundary_conditions=nothing, async=true) where {N,F,Args}
fun, args = kernel

groupsize = heuristic_groupsize(arch) # dispatch through the architecture so the CPU specialization above applies

fun(arch.backend, groupsize, worksize)(args...)
isnothing(boundary_conditions) || apply_boundary_conditions!(boundary_conditions)

async || synchronize(arch.backend)
return
end

end
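A minimal usage sketch of the single-device path (the kernel and array are hypothetical, not part of this commit):

    using KernelAbstractions

    @kernel function fill_twos!(A)
        I = @index(Global, Cartesian)
        A[I] = 2.0
    end

    arch = SingleDeviceArchitecture(CPU(), nothing)  # device slot is unused on the CPU path
    A = zeros(16, 16)
    launch!(arch, (16, 16), fill_twos! => (A,); async=false)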
