Skip to content

Commit

Permalink
fix: don't deadlock when lots of async reactors are sharing a concurr…
Browse files Browse the repository at this point in the history
…ency pool. (#36)
  • Loading branch information
jimsynz authored Sep 27, 2023
1 parent 9220ba3 commit 4a57a92
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 60 deletions.
52 changes: 48 additions & 4 deletions lib/reactor/executor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ defmodule Reactor.Executor do
{:continue, ready_steps} <- find_ready_steps(reactor),
{:continue, reactor, state} <- start_ready_async_steps(reactor, state, ready_steps),
{:continue, reactor, state} <- run_ready_sync_step(reactor, state, ready_steps),
{:continue, reactor, state} <- maybe_run_any_step_sync(reactor, state, ready_steps),
{:continue, reactor} <- all_done(reactor) do
execute(reactor, subtract_iteration(state))
else
Expand Down Expand Up @@ -132,10 +133,6 @@ defmodule Reactor.Executor do

defp start_ready_async_steps(reactor, state, []), do: {:continue, reactor, state}

defp start_ready_async_steps(reactor, state, _steps)
when map_size(state.current_tasks) == state.max_concurrency,
do: {:continue, reactor, state}

defp start_ready_async_steps(reactor, state, steps) do
steps = Enum.filter(steps, &(&1.async? == true))

Expand All @@ -154,6 +151,53 @@ defmodule Reactor.Executor do
Executor.Sync.run(reactor, state, step)
end

# This seems a little unintuitive, but this is what allows reactors who are
# sharing a concurrency pool to move forward even then there's no concurrency
# left without deadlocking.
#
# It's a complicated scenario, so let's lay out the pieces:
#
# 1. When a new reactor is started it allocates a concurrency pool using
# `Reactor.Executor.ConcurrencyTracker` **unless** it is explicitly passed
# a `concurrency_key` option.
# 2. Every time a reactor runs an async step it starts a `Task` and consumes a
# space in the concurrency pool (if possible).
# 3. Every task that is started has it's concurrency key stored in it's
# process dictionary (actually a stack of them because we may be multiple
# nested reactors deep).
# 4. If that async step then turns around and runs a new reactor with shared
# concurrency then that reactor is already consuming a concurrency slot and
# may not be able to allocate any more slots for it's tasks.
#
# This situation can lead to a deadlock where we have multiple reactors all in
# a tight loop trying to start tasks but none of them able to proceed.
#
# We detect this situation by:
#
# 1. We are unable to start any async steps (`start_ready_async_steps/3`
# returns `:continue`).
# 2. We are unable to start any sync steps (`run_ready_sync_step/3` returns
# `:continue`).
# 3. We have any steps which can be run (ie async ones which we couldn't
# start).
# 4. Our concurrency key is in the process dictionary.
#
# If all four of these conditions are met we pick the first step and run it
# synchronously. This is fine because the reactor process itself is a task in
# another reactor so in effect is still running asynchronously.
defp maybe_run_any_step_sync(reactor, state, []), do: {:continue, reactor, state}

defp maybe_run_any_step_sync(reactor, state, [step | _]) do
:__reactor__
|> Process.get([])
|> Enum.any?(&(&1.concurrency_key == state.concurrency_key))
|> if do
Executor.Sync.run(reactor, state, step)
else
{:continue, reactor, state}
end
end

defp subtract_iteration(state) when state.max_iterations == :infinity, do: state

defp subtract_iteration(state) when state.max_iterations > 0,
Expand Down
27 changes: 9 additions & 18 deletions lib/reactor/executor/async.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,14 @@ defmodule Reactor.Executor.Async do
def start_steps(reactor, state, [], _supervisor), do: {:continue, reactor, state}

def start_steps(reactor, state, steps, supervisor) do
available_concurrency = state.max_concurrency - map_size(state.current_tasks)
available_steps = length(steps)

start_steps(reactor, state, steps, supervisor, available_concurrency)
end

defp start_steps(reactor, state, _steps, _supervisor, 0), do: {:continue, reactor, state}
locked_concurrency =
acquire_concurrency_resource_from_pool(state.concurrency_key, available_steps)

defp start_steps(reactor, state, steps, supervisor, available_concurrency) do
started =
steps
|> Enum.take(available_concurrency)
|> Enum.take_while(&acquire_concurrency_resource_from_pool(state.concurrency_key, &1))
|> Enum.take(locked_concurrency)
|> Enum.reduce_while(%{}, fn step, started ->
case start_task_for_step(reactor, state, step, supervisor, state.concurrency_key) do
{:ok, task} -> {:cont, Map.put(started, task, step)}
Expand Down Expand Up @@ -362,17 +358,12 @@ defmodule Reactor.Executor.Async do
%{reactor | steps: Enum.concat(steps, reactor.steps)}
end

defp release_concurrency_resources_to_pool(_pool_key, 0), do: :ok

defp release_concurrency_resources_to_pool(pool_key, n) when n > 0 do
ConcurrencyTracker.release(pool_key)
release_concurrency_resources_to_pool(pool_key, n - 1)
defp release_concurrency_resources_to_pool(pool_key, how_many) do
ConcurrencyTracker.release(pool_key, how_many)
end

defp acquire_concurrency_resource_from_pool(pool_key, _) do
case ConcurrencyTracker.acquire(pool_key) do
:ok -> true
:error -> false
end
defp acquire_concurrency_resource_from_pool(pool_key, requested) do
{:ok, actual} = ConcurrencyTracker.acquire(pool_key, requested)
actual
end
end
84 changes: 60 additions & 24 deletions lib/reactor/executor/concurrency_tracker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,20 @@ defmodule Reactor.Executor.ConcurrencyTracker do
This avoids nested Reactors spawning too many workers and thrashing the
system.
The process calling `allocate_pool/1` is monitored, and when it terminates
it's allocation is removed. Any processes which are using that pool will
not be able to allocate any new resources.
"""

use GenServer

@type pool_key :: reference()

@type record ::
{pool_key, concurrency_limit :: pos_integer, available_slots :: non_neg_integer(),
allocator :: pid}

@doc false
@spec start_link(any) :: GenServer.on_start()
def start_link(_), do: GenServer.start_link(__MODULE__, [], name: __MODULE__)
Expand Down Expand Up @@ -65,35 +73,63 @@ defmodule Reactor.Executor.ConcurrencyTracker do
end

@doc """
Release a concurrency allocation back to the pool.
Release concurrency allocation back to the pool.
"""
@spec release(pool_key) :: :ok
def release(key) do
:ets.select_replace(__MODULE__, [
{{:"$1", :"$2", :"$3", :"$4"},
[{:andalso, {:"=<", {:+, :"$2", 1}, :"$3"}, {:==, :"$1", key}}],
[{{:"$1", {:+, :"$2", 1}, :"$3", :"$4"}}]}
])

:ok
@spec release(pool_key, how_many :: pos_integer) :: :ok | :error
def release(key, how_many \\ 1) do
# generated using:
#
# :ets.fun2ms(fn {key, concurrency_limit, concurrency_available, owner}
# when key == :key and concurrency_available + 1 <= concurrency_limit ->
# {key, concurrency_limit, concurrency_available + 1, owner}
# end)
#
# and replacing `:key` with the provided key.

Enum.reduce_while(1..how_many, :ok, fn _, :ok ->
case :ets.select_replace(__MODULE__, [
{{:"$1", :"$2", :"$3", :"$4"},
[{:andalso, {:==, :"$1", key}, {:"=<", {:+, :"$3", 1}, :"$2"}}],
[{{:"$1", :"$2", {:+, :"$3", 1}, :"$4"}}]}
]) do
0 -> {:halt, :error}
1 -> {:cont, :ok}
end
end)
end

@doc """
Attempt to acquire a concurrency allocation from the pool.
Attempt to acquire a number of concurrency allocations from the pool.
Returns `:ok` if the allocation was successful, otherwise `:error`.
Returns `{:ok, n}` where `n` was the number of slots that were actually
allocated. It's important to note that whilst you may request `16` slots, if
there is only `3` available, then this function will return `{:ok, 3}` and you
must abide by it.
It is possible for this function to return `{:ok, 0}` if there is no slots
available.
"""
@spec acquire(pool_key) :: :ok | :error
def acquire(key) do
__MODULE__
|> :ets.select_replace([
{{:"$1", :"$2", :"$3", :"$4"}, [{:andalso, {:>=, {:-, :"$2", 1}, 0}, {:==, :"$1", key}}],
[{{:"$1", {:-, :"$2", 1}, :"$3", :"$4"}}]}
])
|> case do
0 -> :error
1 -> :ok
end
@spec acquire(pool_key, how_many :: pos_integer()) :: {:ok, non_neg_integer()}
def acquire(key, how_many \\ 1) do
# generated using:
#
# :ets.fun2ms(fn {key, concurrency_limit, concurrency_available, owner}
# when key == :key and concurrency_available - 1 >= 0 ->
# {key, concurrency_limit, concurrency_available - 1, owner}
# end)
#
# and replacing `:key` with the provided key.

Enum.reduce_while(1..how_many, {:ok, 0}, fn _, {:ok, n} ->
case :ets.select_replace(__MODULE__, [
{{:"$1", :"$2", :"$3", :"$4"},
[{:andalso, {:==, :"$1", key}, {:>=, {:-, :"$3", 1}, 0}}],
[{{:"$1", :"$2", {:-, :"$3", 1}, :"$4"}}]}
]) do
0 -> {:halt, {:ok, n}}
1 -> {:cont, {:ok, n + 1}}
end
end)
end

@doc """
Expand All @@ -105,7 +141,7 @@ defmodule Reactor.Executor.ConcurrencyTracker do
__MODULE__
|> :ets.lookup(key)
|> case do
[{_, available, limit, _}] -> {:ok, available, limit}
[{_, limit, available, _}] -> {:ok, available, limit}
[] -> {:error, "Unknown concurrency pool"}
end
end
Expand Down
14 changes: 13 additions & 1 deletion lib/reactor/executor/step_runner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,20 @@ defmodule Reactor.Executor.StepRunner do
{module, options} <- module_and_opts(step),
{:ok, context} <- build_context(reactor, state, step, concurrency_key),
{:ok, arguments} <- maybe_replace_arguments(arguments, context) do
do_run(module, options, arguments, context)
metadata = %{
current_step: step,
pid: self(),
reactor: reactor,
concurrency_key: concurrency_key
}

metadata_stack = Process.get(:__reactor__, [])
Process.put(:__reactor__, [metadata | metadata_stack])
result = do_run(module, options, arguments, context)
Process.put(:__reactor__, metadata_stack)
result
end
after
end

@doc """
Expand Down
7 changes: 0 additions & 7 deletions test/reactor/executor/async_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,6 @@ defmodule Reactor.Executor.AsyncTest do
assert {:continue, ^reactor, ^state} = start_steps(reactor, state, [])
end

test "when there is no available concurrency slots, it tells the reactor to continue",
%{reactor: reactor, state: state, doable: doable} do
state = %{state | max_concurrency: 1, current_tasks: %{nil => nil}}

assert {:continue, ^reactor, ^state} = start_steps(reactor, state, [doable])
end

test "when steps are started, it stores them in the state",
%{reactor: reactor, state: state, doable: doable, supervisor: supervisor} do
assert {_, _reactor, state} = start_steps(reactor, state, [doable], supervisor)
Expand Down
12 changes: 6 additions & 6 deletions test/reactor/executor/concurrency_tracker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,26 @@ defmodule Reactor.Executor.ConcurrencyTrackerTest do
describe "acquire/1" do
test "when there is available concurrency in the pool, it returns ok" do
pool = allocate_pool(16)
assert :ok = acquire(pool)
assert {:ok, 1} = acquire(pool)
assert {:ok, 15, 16} = status(pool)
end

test "when there is no available concurrency in the pool, it returns error" do
test "when there is no available concurrency in the pool, it returns zero" do
pool = allocate_pool(0)
assert :error = acquire(pool)
assert {:ok, 0} = acquire(pool)
end

test "when there is 1 slot left, it can be acquired" do
pool = allocate_pool(1)
assert :ok = acquire(pool)
assert {:ok, 1} = acquire(pool)
assert {:ok, 0, 1} = status(pool)
end
end

describe "release/1" do
test "it increments the available concurrency in the pool when possible" do
pool = allocate_pool(16)
:ok = acquire(pool)
{:ok, 1} = acquire(pool)
assert {:ok, 15, 16} = status(pool)
assert :ok = release(pool)
assert {:ok, 16, 16} = status(pool)
Expand All @@ -57,7 +57,7 @@ defmodule Reactor.Executor.ConcurrencyTrackerTest do
test "it doesn't allow the pool to grow" do
pool = allocate_pool(16)
assert {:ok, 16, 16} = status(pool)
assert :ok = release(pool)
assert :error = release(pool)
assert {:ok, 16, 16} = status(pool)
end
end
Expand Down
Loading

0 comments on commit 4a57a92

Please sign in to comment.