diff --git a/lib/reactor/executor.ex b/lib/reactor/executor.ex
index 9b2ba1f..362db68 100644
--- a/lib/reactor/executor.ex
+++ b/lib/reactor/executor.ex
@@ -76,6 +76,7 @@ defmodule Reactor.Executor do
          {:continue, ready_steps} <- find_ready_steps(reactor),
          {:continue, reactor, state} <- start_ready_async_steps(reactor, state, ready_steps),
          {:continue, reactor, state} <- run_ready_sync_step(reactor, state, ready_steps),
+         {:continue, reactor, state} <- maybe_run_any_step_sync(reactor, state, ready_steps),
          {:continue, reactor} <- all_done(reactor) do
       execute(reactor, subtract_iteration(state))
     else
@@ -132,10 +133,6 @@ defmodule Reactor.Executor do
 
   defp start_ready_async_steps(reactor, state, []), do: {:continue, reactor, state}
 
-  defp start_ready_async_steps(reactor, state, _steps)
-       when map_size(state.current_tasks) == state.max_concurrency,
-       do: {:continue, reactor, state}
-
   defp start_ready_async_steps(reactor, state, steps) do
     steps = Enum.filter(steps, &(&1.async? == true))
 
@@ -154,6 +151,53 @@ defmodule Reactor.Executor do
     Executor.Sync.run(reactor, state, step)
   end
 
+  # This seems a little unintuitive, but this is what allows reactors that are
+  # sharing a concurrency pool to move forward even when there's no concurrency
+  # left, without deadlocking.
+  #
+  # It's a complicated scenario, so let's lay out the pieces:
+  #
+  # 1. When a new reactor is started it allocates a concurrency pool using
+  #    `Reactor.Executor.ConcurrencyTracker` **unless** it is explicitly passed
+  #    a `concurrency_key` option.
+  # 2. Every time a reactor runs an async step it starts a `Task` and consumes a
+  #    space in the concurrency pool (if possible).
+  # 3. Every task that is started has its concurrency key stored in its
+  #    process dictionary (actually a stack of them, because we may be multiple
+  #    nested reactors deep).
+  # 4. If that async step then turns around and runs a new reactor with shared
+  #    concurrency, then that reactor is already consuming a concurrency slot and
+  #    may not be able to allocate any more slots for its tasks.
+  #
+  # This situation can lead to a deadlock where we have multiple reactors all in
+  # a tight loop trying to start tasks but none of them able to proceed.
+  #
+  # We detect this situation when:
+  #
+  # 1. We are unable to start any async steps (`start_ready_async_steps/3`
+  #    returns `:continue`).
+  # 2. We are unable to start any sync steps (`run_ready_sync_step/3` returns
+  #    `:continue`).
+  # 3. There are steps which could be run (i.e. async ones which we couldn't
+  #    start).
+  # 4. Our concurrency key is in the process dictionary.
+  #
+  # If all four of these conditions are met we pick the first step and run it
+  # synchronously. This is fine because the reactor process itself is a task in
+  # another reactor, so in effect it is still running asynchronously.
+  defp maybe_run_any_step_sync(reactor, state, []), do: {:continue, reactor, state}
+
+  defp maybe_run_any_step_sync(reactor, state, [step | _]) do
+    :__reactor__
+    |> Process.get([])
+    |> Enum.any?(&(&1.concurrency_key == state.concurrency_key))
+    |> if do
+      Executor.Sync.run(reactor, state, step)
+    else
+      {:continue, reactor, state}
+    end
+  end
+
   defp subtract_iteration(state) when state.max_iterations == :infinity, do: state
 
   defp subtract_iteration(state) when state.max_iterations > 0,
diff --git a/lib/reactor/executor/async.ex b/lib/reactor/executor/async.ex
index e082cbc..97f21f2 100644
--- a/lib/reactor/executor/async.ex
+++ b/lib/reactor/executor/async.ex
@@ -24,18 +24,14 @@ defmodule Reactor.Executor.Async do
   def start_steps(reactor, state, [], _supervisor), do: {:continue, reactor, state}
 
   def start_steps(reactor, state, steps, supervisor) do
-    available_concurrency = state.max_concurrency - map_size(state.current_tasks)
+    available_steps = length(steps)
 
-    start_steps(reactor, state, steps, supervisor, available_concurrency)
-  end
-
-  defp start_steps(reactor, state, _steps, _supervisor, 0), do: {:continue, reactor, state}
+    locked_concurrency =
+      acquire_concurrency_resource_from_pool(state.concurrency_key, available_steps)
 
-  defp start_steps(reactor, state, steps, supervisor, available_concurrency) do
     started =
       steps
-      |> Enum.take(available_concurrency)
-      |> Enum.take_while(&acquire_concurrency_resource_from_pool(state.concurrency_key, &1))
+      |> Enum.take(locked_concurrency)
       |> Enum.reduce_while(%{}, fn step, started ->
         case start_task_for_step(reactor, state, step, supervisor, state.concurrency_key) do
           {:ok, task} -> {:cont, Map.put(started, task, step)}
@@ -362,17 +358,12 @@ defmodule Reactor.Executor.Async do
     %{reactor | steps: Enum.concat(steps, reactor.steps)}
   end
 
-  defp release_concurrency_resources_to_pool(_pool_key, 0), do: :ok
-
-  defp release_concurrency_resources_to_pool(pool_key, n) when n > 0 do
-    ConcurrencyTracker.release(pool_key)
-    release_concurrency_resources_to_pool(pool_key, n - 1)
+  defp release_concurrency_resources_to_pool(pool_key, how_many) do
+    ConcurrencyTracker.release(pool_key, how_many)
   end
 
-  defp acquire_concurrency_resource_from_pool(pool_key, _) do
-    case ConcurrencyTracker.acquire(pool_key) do
-      :ok -> true
-      :error -> false
-    end
+  defp acquire_concurrency_resource_from_pool(pool_key, requested) do
+    {:ok, actual} = ConcurrencyTracker.acquire(pool_key, requested)
+    actual
   end
 end
diff --git a/lib/reactor/executor/concurrency_tracker.ex b/lib/reactor/executor/concurrency_tracker.ex
index bfaa4e1..f3d0ca8 100644
--- a/lib/reactor/executor/concurrency_tracker.ex
+++ b/lib/reactor/executor/concurrency_tracker.ex
@@ -8,12 +8,20 @@ defmodule Reactor.Executor.ConcurrencyTracker do
 
   This avoids nested Reactors spawning too many workers and thrashing the
   system.
+
+  The process calling `allocate_pool/1` is monitored, and when it terminates
+  its allocation is removed. Any processes which are using that pool will
+  not be able to acquire any new resources from it.
   """
 
   use GenServer
 
   @type pool_key :: reference()
 
+  @type record ::
+          {pool_key, concurrency_limit :: pos_integer, available_slots :: non_neg_integer(),
+           allocator :: pid}
+
   @doc false
   @spec start_link(any) :: GenServer.on_start()
   def start_link(_), do: GenServer.start_link(__MODULE__, [], name: __MODULE__)
@@ -65,35 +73,63 @@ defmodule Reactor.Executor.ConcurrencyTracker do
   end
 
   @doc """
-  Release a concurrency allocation back to the pool.
+  Release concurrency allocations back to the pool.
""" - @spec release(pool_key) :: :ok - def release(key) do - :ets.select_replace(__MODULE__, [ - {{:"$1", :"$2", :"$3", :"$4"}, - [{:andalso, {:"=<", {:+, :"$2", 1}, :"$3"}, {:==, :"$1", key}}], - [{{:"$1", {:+, :"$2", 1}, :"$3", :"$4"}}]} - ]) - - :ok + @spec release(pool_key, how_many :: pos_integer) :: :ok | :error + def release(key, how_many \\ 1) do + # generated using: + # + # :ets.fun2ms(fn {key, concurrency_limit, concurrency_available, owner} + # when key == :key and concurrency_available + 1 <= concurrency_limit -> + # {key, concurrency_limit, concurrency_available + 1, owner} + # end) + # + # and replacing `:key` with the provided key. + + Enum.reduce_while(1..how_many, :ok, fn _, :ok -> + case :ets.select_replace(__MODULE__, [ + {{:"$1", :"$2", :"$3", :"$4"}, + [{:andalso, {:==, :"$1", key}, {:"=<", {:+, :"$3", 1}, :"$2"}}], + [{{:"$1", :"$2", {:+, :"$3", 1}, :"$4"}}]} + ]) do + 0 -> {:halt, :error} + 1 -> {:cont, :ok} + end + end) end @doc """ - Attempt to acquire a concurrency allocation from the pool. + Attempt to acquire a number of concurrency allocations from the pool. - Returns `:ok` if the allocation was successful, otherwise `:error`. + Returns `{:ok, n}` where `n` was the number of slots that were actually + allocated. It's important to note that whilst you may request `16` slots, if + there is only `3` available, then this function will return `{:ok, 3}` and you + must abide by it. + + It is possible for this function to return `{:ok, 0}` if there is no slots + available. """ - @spec acquire(pool_key) :: :ok | :error - def acquire(key) do - __MODULE__ - |> :ets.select_replace([ - {{:"$1", :"$2", :"$3", :"$4"}, [{:andalso, {:>=, {:-, :"$2", 1}, 0}, {:==, :"$1", key}}], - [{{:"$1", {:-, :"$2", 1}, :"$3", :"$4"}}]} - ]) - |> case do - 0 -> :error - 1 -> :ok - end + @spec acquire(pool_key, how_many :: pos_integer()) :: {:ok, non_neg_integer()} + def acquire(key, how_many \\ 1) do + # generated using: + # + # :ets.fun2ms(fn {key, concurrency_limit, concurrency_available, owner} + # when key == :key and concurrency_available - 1 >= 0 -> + # {key, concurrency_limit, concurrency_available - 1, owner} + # end) + # + # and replacing `:key` with the provided key. 
+
+    Enum.reduce_while(1..how_many, {:ok, 0}, fn _, {:ok, n} ->
+      case :ets.select_replace(__MODULE__, [
+             {{:"$1", :"$2", :"$3", :"$4"},
+              [{:andalso, {:==, :"$1", key}, {:>=, {:-, :"$3", 1}, 0}}],
+              [{{:"$1", :"$2", {:-, :"$3", 1}, :"$4"}}]}
+           ]) do
+        0 -> {:halt, {:ok, n}}
+        1 -> {:cont, {:ok, n + 1}}
+      end
+    end)
   end
 
   @doc """
@@ -105,7 +141,7 @@ defmodule Reactor.Executor.ConcurrencyTracker do
     __MODULE__
     |> :ets.lookup(key)
     |> case do
-      [{_, available, limit, _}] -> {:ok, available, limit}
+      [{_, limit, available, _}] -> {:ok, available, limit}
       [] -> {:error, "Unknown concurrency pool"}
     end
   end
diff --git a/lib/reactor/executor/step_runner.ex b/lib/reactor/executor/step_runner.ex
index 3c819ec..541d31f 100644
--- a/lib/reactor/executor/step_runner.ex
+++ b/lib/reactor/executor/step_runner.ex
@@ -19,8 +19,19 @@ defmodule Reactor.Executor.StepRunner do
          {module, options} <- module_and_opts(step),
          {:ok, context} <- build_context(reactor, state, step, concurrency_key),
          {:ok, arguments} <- maybe_replace_arguments(arguments, context) do
-      do_run(module, options, arguments, context)
+      metadata = %{
+        current_step: step,
+        pid: self(),
+        reactor: reactor,
+        concurrency_key: concurrency_key
+      }
+
+      metadata_stack = Process.get(:__reactor__, [])
+      Process.put(:__reactor__, [metadata | metadata_stack])
+      result = do_run(module, options, arguments, context)
+      Process.put(:__reactor__, metadata_stack)
+      result
     end
   end
 
   @doc """
diff --git a/test/reactor/executor/async_test.exs b/test/reactor/executor/async_test.exs
index 6aadf4c..0b2c802 100644
--- a/test/reactor/executor/async_test.exs
+++ b/test/reactor/executor/async_test.exs
@@ -33,13 +33,6 @@ defmodule Reactor.Executor.AsyncTest do
       assert {:continue, ^reactor, ^state} = start_steps(reactor, state, [])
     end
 
-    test "when there is no available concurrency slots, it tells the reactor to continue",
-         %{reactor: reactor, state: state, doable: doable} do
-      state = %{state | max_concurrency: 1, current_tasks: %{nil => nil}}
-
-      assert {:continue, ^reactor, ^state} = start_steps(reactor, state, [doable])
-    end
-
     test "when steps are started, it stores them in the state",
          %{reactor: reactor, state: state, doable: doable, supervisor: supervisor} do
       assert {_, _reactor, state} = start_steps(reactor, state, [doable], supervisor)
diff --git a/test/reactor/executor/concurrency_tracker_test.exs b/test/reactor/executor/concurrency_tracker_test.exs
index c4835d8..026ce8f 100644
--- a/test/reactor/executor/concurrency_tracker_test.exs
+++ b/test/reactor/executor/concurrency_tracker_test.exs
@@ -29,18 +29,18 @@ defmodule Reactor.Executor.ConcurrencyTrackerTest do
   describe "acquire/1" do
     test "when there is available concurrency in the pool, it returns ok" do
       pool = allocate_pool(16)
-      assert :ok = acquire(pool)
+      assert {:ok, 1} = acquire(pool)
       assert {:ok, 15, 16} = status(pool)
     end
 
-    test "when there is no available concurrency in the pool, it returns error" do
+    test "when there is no available concurrency in the pool, it returns zero" do
       pool = allocate_pool(0)
-      assert :error = acquire(pool)
+      assert {:ok, 0} = acquire(pool)
     end
 
     test "when there is 1 slot left, it can be acquired" do
       pool = allocate_pool(1)
-      assert :ok = acquire(pool)
+      assert {:ok, 1} = acquire(pool)
       assert {:ok, 0, 1} = status(pool)
     end
   end
@@ -48,7 +48,7 @@ defmodule Reactor.Executor.ConcurrencyTrackerTest do
   describe "release/1" do
     test "it increments the available concurrency in the pool when possible" do
       pool = allocate_pool(16)
-      :ok = acquire(pool)
+      {:ok, 1} = acquire(pool)
       assert {:ok, 15, 16} = status(pool)
       assert :ok = release(pool)
       assert {:ok, 16, 16} = status(pool)
@@ -57,7 +57,7 @@ defmodule Reactor.Executor.ConcurrencyTrackerTest do
     test "it doesn't allow the pool to grow" do
       pool = allocate_pool(16)
       assert {:ok, 16, 16} = status(pool)
-      assert :ok = release(pool)
+      assert :error = release(pool)
       assert {:ok, 16, 16} = status(pool)
     end
   end
diff --git a/test/reactor/executor_test.exs b/test/reactor/executor_test.exs
index 804d689..c0f559e 100644
--- a/test/reactor/executor_test.exs
+++ b/test/reactor/executor_test.exs
@@ -424,5 +424,117 @@ defmodule Reactor.ExecutorTest do
       assert elapsed >= 200
       assert elapsed < 300
     end
+
+    test "reactors running inside async steps with shared concurrency don't cause a deadlock" do
+      defmodule MaybeDeadlockedReactor.Inner do
+        @moduledoc false
+        use Reactor
+
+        step :sleep do
+          run(fn _ ->
+            Process.sleep(100)
+            {:ok, nil}
+          end)
+        end
+      end
+
+      defmodule MaybeDeadlockedReactor do
+        @moduledoc false
+        use Reactor
+
+        for i <- 0..16 do
+          step :"sleep_#{i}" do
+            run(fn _, context ->
+              Reactor.run(
+                MaybeDeadlockedReactor.Inner,
+                %{},
+                %{},
+                concurrency_key: context.concurrency_key
+              )
+            end)
+          end
+        end
+      end
+
+      Reactor.run(MaybeDeadlockedReactor, %{}, %{}, max_concurrency: 8)
+    end
+
+    test "lots of reactors sharing a concurrency key do not deadlock" do
+      defmodule OversharingReactor do
+        @moduledoc false
+        use Reactor
+
+        step :sleep do
+          run fn _ ->
+            Process.sleep(100)
+            {:ok, nil}
+          end
+        end
+      end
+
+      pool = ConcurrencyTracker.allocate_pool(8)
+
+      0..16
+      |> Enum.map(fn _ ->
+        Task.async(fn ->
+          Reactor.run(OversharingReactor, %{}, %{}, concurrency_key: pool)
+        end)
+      end)
+      |> Enum.map(&Task.await/1)
+    end
+
+    test "Zach's hunch" do
+      defmodule GrandchildReactor do
+        @moduledoc false
+        use Reactor
+
+        step :sleep do
+          run fn _ ->
+            Process.sleep(1000)
+            {:ok, nil}
+          end
+        end
+      end
+
+      defmodule ChildReactor do
+        @moduledoc false
+        use Reactor
+
+        step :splode do
+          run fn _, context ->
+            0..16
+            |> Enum.map(fn _ ->
+              Task.async(fn ->
+                Reactor.run(GrandchildReactor, %{}, %{}, concurrency_key: context.concurrency_key)
+              end)
+            end)
+            |> Enum.map(&Task.await/1)
+
+            {:ok, nil}
+          end
+        end
+      end
+
+      defmodule ParentReactor do
+        @moduledoc false
+        use Reactor
+
+        step :splode do
+          run fn _ ->
+            pool = ConcurrencyTracker.allocate_pool(16)
+
+            0..16
+            |> Enum.map(fn _ ->
+              Task.async(fn -> Reactor.run(ChildReactor, %{}, %{}, concurrency_key: pool) end)
+            end)
+            |> Enum.map(&Task.await/1)
+
+            {:ok, nil}
+          end
+        end
+      end
+
+      Reactor.run(ParentReactor)
+    end
   end
 end
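
A minimal caller-side sketch (illustrative only, not taken from the patch above) of the `acquire/2`/`release/2` contract introduced in `concurrency_tracker.ex`: the pool size, the `1..16` workload, and the doubling task are invented for the example, and it assumes the `:reactor` application (which starts the `ConcurrencyTracker` server) is running.

    alias Reactor.Executor.ConcurrencyTracker

    # Allocate a fresh pool of eight slots (the sizes here are purely illustrative).
    pool = ConcurrencyTracker.allocate_pool(8)

    # Ask for sixteen slots. Only eight exist, so `granted` will be 8 and we must
    # not start more tasks than that.
    {:ok, granted} = ConcurrencyTracker.acquire(pool, 16)

    results =
      1..16
      |> Enum.take(granted)
      |> Enum.map(fn i -> Task.async(fn -> i * 2 end) end)
      |> Enum.map(&Task.await/1)

    # Hand every granted slot back; releasing into an already-full pool returns :error.
    :ok = ConcurrencyTracker.release(pool, granted)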