Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for marking nodes as unavailable. #14

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 58 additions & 6 deletions lib/herd/cluster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,20 @@ defmodule Herd.Cluster do
"""
require Logger

defmodule State do
defstruct table: nil, marked_nodes: MapSet.new(), restored_nodes: []
end

defmacro __using__(opts) do
app = Keyword.get(opts, :otp_app)
health_check = Keyword.get(opts, :health_check, 60_000)
router = Keyword.get(opts, :router, Herd.Router.HashRing)
discovery = Keyword.get(opts, :discovery)
pool = Keyword.get(opts, :pool)
table_name = :"#{app}_servers"
mark_duration = Keyword.get(opts, :mark_duration, 30_000)
mark_lookback = Keyword.get(opts, :mark_lookback, 300_000)
max_backoff = Keyword.get(opts, :max_backoff, 300_000)

quote do
use GenServer
Expand All @@ -32,6 +39,9 @@ defmodule Herd.Cluster do
@otp unquote(app)
@table_name unquote(table_name)
@health_check unquote(health_check)
@mark_duration unquote(mark_duration)
@mark_lookback unquote(mark_lookback)
@max_backoff unquote(max_backoff)
@router unquote(router)
@discovery unquote(discovery)
@pool unquote(pool)
Expand All @@ -53,7 +63,7 @@ defmodule Herd.Cluster do
@pool.initialize(servers)
schedule_healthcheck()

{:ok, table}
{:ok, %State{table: table}}
end

def servers(), do: servers(@table_name, @router)
Expand All @@ -62,16 +72,59 @@ defmodule Herd.Cluster do

def get_nodes(keys), do: get_nodes(@table_name, @router, keys)

def handle_info(:health_check, table) do
def mark_node(node), do: GenServer.call(__MODULE__, {:mark_node, node})

def handle_call({:mark_node, node}, _from, state) do
marked_nodes = MapSet.put(state.marked_nodes, node)
remove_node(state.table, node)

restored_nodes = discard_expired(state.restored_nodes)
count = restored_nodes |> Enum.filter(fn {n, _} -> n == node end) |> length()
delay = min(round(@mark_duration * :math.pow(2, count)), @max_backoff)
schedule_restore_node(node, delay)

Logger.info "Marked #{inspect(node)} as unavailable for #{delay}ms"
{:reply, :ok, %State{state | marked_nodes: marked_nodes, restored_nodes: restored_nodes}}
end

def handle_info(:health_check, state) do
schedule_healthcheck()
health_check(table, @router, @pool, @discovery)
health_check(state.table, @router, @pool, @discovery, state.marked_nodes)
{:noreply, state}
end

def handle_info({:restore_node, node}, state) do
marked_nodes = MapSet.delete(state.marked_nodes, node)
restored_nodes = [{node, DateTime.utc_now()} | discard_expired(state.restored_nodes)]
{:noreply, %State{state | marked_nodes: marked_nodes, restored_nodes: restored_nodes}}
end

defp get_router(), do: get_router(@table_name)

defp schedule_healthcheck() do
Process.send_after(self(), :health_check, @health_check)
end

defp schedule_restore_node(node, delay) do
Process.send_after(self(), {:restore_node, node}, delay)
end

defp discard_expired(restored_nodes) do
now = DateTime.utc_now()
Enum.filter(restored_nodes, fn {_, t} -> DateTime.diff(now, t, :millisecond) > @mark_lookback end)
end

defp remove_node(table, node) do
{:ok, lb} = get_router(table)
lb = @router.remove_nodes(lb, [node])
nodes = @router.nodes(lb) |> MapSet.new() |> MapSet.delete(node)

# don't completely drain the cluster
if MapSet.size(nodes) > 0 do
:ets.insert(table, {:lb, lb})
@pool.handle_diff([], [node])
end
end
end
end

Expand All @@ -83,8 +136,8 @@ defmodule Herd.Cluster do
with {:ok, lb} <- get_router(table), do: router.get_nodes(lb, keys)
end

def health_check(table, router, pool, discovery) do
servers = discovery.nodes()
def health_check(table, router, pool, discovery, marked_nodes) do
servers = discovery.nodes() |> MapSet.new() |> MapSet.difference(marked_nodes) |> MapSet.to_list()
do_health_check(table, router, pool, servers)
end

Expand Down Expand Up @@ -124,6 +177,5 @@ defmodule Herd.Cluster do

:ets.insert(table, {:lb, lb})
pool.handle_diff(add, remove)
{:noreply, table}
end
end
26 changes: 26 additions & 0 deletions test/herd_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ defmodule HerdTest do
setup do
node = {"localhost", 123}
:ok = MockDiscovery.update([node])
clear_marked()
send_health_check()
:ok
end
Expand Down Expand Up @@ -84,6 +85,27 @@ defmodule HerdTest do
verify_nodes_present(nodes)
end

test "It will exclude marked nodes" do
nodes = [{"localhost", 567}, {"localhost", 234}]
:ok = MockDiscovery.update(nodes)
send_health_check()

verify_nodes_equal(nodes)
verify_nodes_present(nodes)

MockCluster.mark_node({"localhost", 567})
verify_nodes_equal([{"localhost", 234}])
verify_nodes_present([{"localhost", 234}])

send_health_check()
verify_nodes_equal([{"localhost", 234}])
verify_nodes_present([{"localhost", 234}])

MockCluster.mark_node({"localhost", 234})
verify_nodes_equal([{"localhost", 234}])
verify_nodes_present([{"localhost", 234}])
end

defp verify_nodes_equal(nodes) do
servers = MockCluster.servers()
assert MapSet.equal?(MapSet.new(servers), MapSet.new(nodes))
Expand All @@ -98,6 +120,10 @@ defmodule HerdTest do
end
end

defp clear_marked() do
GenServer.call(MockCluster, :clear_marked)
end

defp send_health_check() do
send(MockCluster, :health_check)
:timer.sleep(100)
Expand Down
10 changes: 9 additions & 1 deletion test/support/mock_herd.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@ defmodule Herd.MockCluster do
pool: Herd.MockPool,
discovery: Herd.MockDiscovery,
router: Herd.Router.HashRing # defaults to Herd.Router.HashRing

def clear_marked do
GenServer.call(__MODULE__, :clear_marked)
end

def handle_call(:clear_marked, _from, state) do
{:reply, :ok, %Herd.Cluster.State{state | marked_nodes: MapSet.new(), restored_nodes: []}}
end
end

defmodule Herd.MockPool do
Expand All @@ -22,4 +30,4 @@ defmodule Herd.MockWorker do
def init(name), do: {:ok, name}

def handle_call(:name, _, name), do: {:reply, name, name}
end
end