From 6471fccd87966ef0e1121578c7a7dd7930041fb3 Mon Sep 17 00:00:00 2001 From: Adam Vaughan Date: Thu, 30 Sep 2021 09:45:09 -0600 Subject: [PATCH] Add support for marking nodes as unavailable. Marking a node as unavailable will temporarily remove it from the cluster and add it back after a delay. If the node continues to fail the delay before adding it back will increase. --- lib/herd/cluster.ex | 64 +++++++++++++++++++++++++++++++++++---- test/herd_test.exs | 26 ++++++++++++++++ test/support/mock_herd.ex | 10 +++++- 3 files changed, 93 insertions(+), 7 deletions(-) diff --git a/lib/herd/cluster.ex b/lib/herd/cluster.ex index 112e107..87bf52b 100644 --- a/lib/herd/cluster.ex +++ b/lib/herd/cluster.ex @@ -16,6 +16,10 @@ defmodule Herd.Cluster do """ require Logger + defmodule State do + defstruct table: nil, marked_nodes: MapSet.new(), restored_nodes: [] + end + defmacro __using__(opts) do app = Keyword.get(opts, :otp_app) health_check = Keyword.get(opts, :health_check, 60_000) @@ -23,6 +27,9 @@ defmodule Herd.Cluster do discovery = Keyword.get(opts, :discovery) pool = Keyword.get(opts, :pool) table_name = :"#{app}_servers" + mark_duration = Keyword.get(opts, :mark_duration, 30_000) + mark_lookback = Keyword.get(opts, :mark_lookback, 300_000) + max_backoff = Keyword.get(opts, :max_backoff, 300_000) quote do use GenServer @@ -32,6 +39,9 @@ defmodule Herd.Cluster do @otp unquote(app) @table_name unquote(table_name) @health_check unquote(health_check) + @mark_duration unquote(mark_duration) + @mark_lookback unquote(mark_lookback) + @max_backoff unquote(max_backoff) @router unquote(router) @discovery unquote(discovery) @pool unquote(pool) @@ -53,7 +63,7 @@ defmodule Herd.Cluster do @pool.initialize(servers) schedule_healthcheck() - {:ok, table} + {:ok, %State{table: table}} end def servers(), do: servers(@table_name, @router) @@ -62,9 +72,31 @@ defmodule Herd.Cluster do def get_nodes(keys), do: get_nodes(@table_name, @router, keys) - def handle_info(:health_check, table) do + def mark_node(node), do: GenServer.call(__MODULE__, {:mark_node, node}) + + def handle_call({:mark_node, node}, _from, state) do + marked_nodes = MapSet.put(state.marked_nodes, node) + remove_node(state.table, node) + + restored_nodes = discard_expired(state.restored_nodes) + count = restored_nodes |> Enum.filter(fn {n, _} -> n == node end) |> length() + delay = min(round(@mark_duration * :math.pow(2, count)), @max_backoff) + schedule_restore_node(node, delay) + + Logger.info "Marked #{inspect(node)} as unavailable for #{delay}ms" + {:reply, :ok, %State{state | marked_nodes: marked_nodes, restored_nodes: restored_nodes}} + end + + def handle_info(:health_check, state) do schedule_healthcheck() - health_check(table, @router, @pool, @discovery) + health_check(state.table, @router, @pool, @discovery, state.marked_nodes) + {:noreply, state} + end + + def handle_info({:restore_node, node}, state) do + marked_nodes = MapSet.delete(state.marked_nodes, node) + restored_nodes = [{node, DateTime.utc_now()} | discard_expired(state.restored_nodes)] + {:noreply, %State{state | marked_nodes: marked_nodes, restored_nodes: restored_nodes}} end defp get_router(), do: get_router(@table_name) @@ -72,6 +104,27 @@ defmodule Herd.Cluster do defp schedule_healthcheck() do Process.send_after(self(), :health_check, @health_check) end + + defp schedule_restore_node(node, delay) do + Process.send_after(self(), {:restore_node, node}, delay) + end + + defp discard_expired(restored_nodes) do + now = DateTime.utc_now() + Enum.filter(restored_nodes, fn {_, t} -> DateTime.diff(now, t, :millisecond) > @mark_lookback end) + end + + defp remove_node(table, node) do + {:ok, lb} = get_router(table) + lb = @router.remove_nodes(lb, [node]) + nodes = @router.nodes(lb) |> MapSet.new() |> MapSet.delete(node) + + # don't completely drain the cluster + if MapSet.size(nodes) > 0 do + :ets.insert(table, {:lb, lb}) + @pool.handle_diff([], [node]) + end + end end end @@ -83,8 +136,8 @@ defmodule Herd.Cluster do with {:ok, lb} <- get_router(table), do: router.get_nodes(lb, keys) end - def health_check(table, router, pool, discovery) do - servers = discovery.nodes() + def health_check(table, router, pool, discovery, marked_nodes) do + servers = discovery.nodes() |> MapSet.new() |> MapSet.difference(marked_nodes) |> MapSet.to_list() do_health_check(table, router, pool, servers) end @@ -124,6 +177,5 @@ defmodule Herd.Cluster do :ets.insert(table, {:lb, lb}) pool.handle_diff(add, remove) - {:noreply, table} end end diff --git a/test/herd_test.exs b/test/herd_test.exs index e51b2e0..4899c2f 100644 --- a/test/herd_test.exs +++ b/test/herd_test.exs @@ -10,6 +10,7 @@ defmodule HerdTest do setup do node = {"localhost", 123} :ok = MockDiscovery.update([node]) + clear_marked() send_health_check() :ok end @@ -84,6 +85,27 @@ defmodule HerdTest do verify_nodes_present(nodes) end + test "It will exclude marked nodes" do + nodes = [{"localhost", 567}, {"localhost", 234}] + :ok = MockDiscovery.update(nodes) + send_health_check() + + verify_nodes_equal(nodes) + verify_nodes_present(nodes) + + MockCluster.mark_node({"localhost", 567}) + verify_nodes_equal([{"localhost", 234}]) + verify_nodes_present([{"localhost", 234}]) + + send_health_check() + verify_nodes_equal([{"localhost", 234}]) + verify_nodes_present([{"localhost", 234}]) + + MockCluster.mark_node({"localhost", 234}) + verify_nodes_equal([{"localhost", 234}]) + verify_nodes_present([{"localhost", 234}]) + end + defp verify_nodes_equal(nodes) do servers = MockCluster.servers() assert MapSet.equal?(MapSet.new(servers), MapSet.new(nodes)) @@ -98,6 +120,10 @@ defmodule HerdTest do end end + defp clear_marked() do + GenServer.call(MockCluster, :clear_marked) + end + defp send_health_check() do send(MockCluster, :health_check) :timer.sleep(100) diff --git a/test/support/mock_herd.ex b/test/support/mock_herd.ex index 5fc8b2a..641498b 100644 --- a/test/support/mock_herd.ex +++ b/test/support/mock_herd.ex @@ -3,6 +3,14 @@ defmodule Herd.MockCluster do pool: Herd.MockPool, discovery: Herd.MockDiscovery, router: Herd.Router.HashRing # defaults to Herd.Router.HashRing + + def clear_marked do + GenServer.call(__MODULE__, :clear_marked) + end + + def handle_call(:clear_marked, _from, state) do + {:reply, :ok, %Herd.Cluster.State{state | marked_nodes: MapSet.new(), restored_nodes: []}} + end end defmodule Herd.MockPool do @@ -22,4 +30,4 @@ defmodule Herd.MockWorker do def init(name), do: {:ok, name} def handle_call(:name, _, name), do: {:reply, name, name} -end \ No newline at end of file +end