From fb110326955fe9b2cedc62ce927dc0ac66b28ede Mon Sep 17 00:00:00 2001 From: James Harton Date: Sat, 14 Oct 2023 20:53:39 +1300 Subject: [PATCH] feat(Target): Add healthy-checking HTTP targets. --- config/config.exs | 3 +- lib/wayfarer/application.ex | 9 +- lib/wayfarer/target.ex | 23 +++ lib/wayfarer/target/server.ex | 321 ++++++++++++++++++++++++++++++ lib/wayfarer/target/supervisor.ex | 22 ++ lib/wayfarer/target/table.ex | 53 +++++ test/support/http_server.ex | 33 +++ test/wayfarer/target_test.exs | 92 +++++++++ 8 files changed, 551 insertions(+), 5 deletions(-) create mode 100644 lib/wayfarer/target.ex create mode 100644 lib/wayfarer/target/server.ex create mode 100644 lib/wayfarer/target/supervisor.ex create mode 100644 lib/wayfarer/target/table.ex create mode 100644 test/support/http_server.ex create mode 100644 test/wayfarer/target_test.exs diff --git a/config/config.exs b/config/config.exs index a0641b5..2229caf 100644 --- a/config/config.exs +++ b/config/config.exs @@ -9,4 +9,5 @@ config :git_ops, manage_readme_version: "README.md" config :wayfarer, - start_listeners?: config_env() != :test + start_listener_supervisor?: config_env() != :test, + start_target_supervisor?: config_env() != :test diff --git a/lib/wayfarer/application.ex b/lib/wayfarer/application.ex index 626ba8b..8f1321b 100644 --- a/lib/wayfarer/application.ex +++ b/lib/wayfarer/application.ex @@ -7,13 +7,14 @@ defmodule Wayfarer.Application do @spec start(any, any) :: {:error, any} | {:ok, pid} def start(_type, _args) do [] - |> start_listeners?() + |> maybe_append_child(:start_target_supervisor?, Wayfarer.Target.Supervisor) + |> maybe_append_child(:start_listener_supervisor?, Wayfarer.Listener.Supervisor) |> Supervisor.start_link(strategy: :one_for_one, name: Wayfarer.Supervisor) end - defp start_listeners?(children) do - if Application.get_env(:wayfarer, :start_listeners?, true) do - Enum.concat(children, [Wayfarer.Listener.Supervisor]) + defp maybe_append_child(children, config_key, child, default \\ true) do + if Application.get_env(:wayfarer, config_key, default) do + Enum.concat(children, [child]) else children end diff --git a/lib/wayfarer/target.ex b/lib/wayfarer/target.ex new file mode 100644 index 0000000..05adfad --- /dev/null +++ b/lib/wayfarer/target.ex @@ -0,0 +1,23 @@ +defmodule Wayfarer.Target do + @moduledoc """ + Manage HTTP targets. + """ + + alias Wayfarer.Target.DynamicSupervisor, as: TargetSupervisor + alias Wayfarer.Target.Registry, as: TargetRegistry + alias Wayfarer.Target.Server + + @doc """ + Start target. + """ + @spec start_target(Server.options()) :: Supervisor.on_start_child() + def start_target(options), + do: DynamicSupervisor.start_child(TargetSupervisor, {Server, options}) + + @doc """ + Stop target + """ + @spec stop_target(:inet.socket_address(), :inet.port_number()) :: :ok + def stop_target(ip, port), + do: GenServer.stop({:via, Registry, {TargetRegistry, {ip, port}}}, :normal) +end diff --git a/lib/wayfarer/target/server.ex b/lib/wayfarer/target/server.ex new file mode 100644 index 0000000..bf03827 --- /dev/null +++ b/lib/wayfarer/target/server.ex @@ -0,0 +1,321 @@ +defmodule Wayfarer.Target.Server do + alias Mint.HTTP + alias Wayfarer.Target.Registry, as: TargetRegistry + alias Wayfarer.Target.Table + + use GenServer, restart: :transient + require Logger + + @options_schema NimbleOptions.new!( + scheme: [ + type: {:in, [:http, :https]}, + required: true, + doc: "The target's protocol" + ], + port: [ + type: {:or, [nil, :pos_integer]}, + required: true, + doc: "The target's listening port" + ], + ip: [ + type: + {:or, + [ + {:tuple, [:integer, :integer, :integer, :integer]}, + {:tuple, + [ + :integer, + :integer, + :integer, + :integer, + :integer, + :integer, + :integer, + :integer + ]}, + {:tuple, [{:in, [:local]}, :string]} + ]}, + required: true, + doc: "The target's IP address or domain socket" + ], + health_check: [ + type: :keyword_list, + required: false, + keys: [ + timeout: [ + type: :pos_integer, + required: false, + default: :timer.seconds(5), + doc: "Health check connection timeout in milliseconds" + ], + interval: [ + type: :pos_integer, + required: false, + default: :timer.seconds(5), + doc: "Health check interval in milliseconds" + ], + threshold: [ + type: :pos_integer, + required: false, + default: 3, + doc: "Success threshold" + ], + path: [ + type: :string, + required: false, + default: "/", + doc: "Health check endpoint" + ], + success_codes: [ + type: {:or, [{:struct, Range}, {:list, {:struct, Range}}]}, + required: false, + default: [200..299] + ] + ] + ] + ) + + @default_health_check @options_schema.schema + |> get_in(~w[health_check keys]a) + |> Enum.map(fn {key, opts} -> {key, opts[:default]} end) + + @wayfarer_vsn Application.spec(:wayfarer)[:vsn] + @elixir_vsn Application.spec(:elixir)[:vsn] + @erlang_vsn :erlang.system_info(:otp_release) + @mint_vsn Application.spec(:mint)[:vsn] + + @default_headers [ + {"User-Agent", + "Wayfarer/#{@wayfarer_vsn} (Elixir #{@elixir_vsn}; Erlang #{@erlang_vsn}) Mint/#{@mint_vsn}"}, + {"Connection", "close"}, + {"Accept", "*/*"} + ] + + @moduledoc """ + A GenServer which monitors the health of each target. + + ## Options + + #{NimbleOptions.docs(@options_schema)} + """ + + @type status :: :initial | :healthy | :unhealthy | :draining + @type options :: [unquote(NimbleOptions.option_typespec(@options_schema))] + @type state :: %{ + check: %{ + timeout: pos_integer(), + interval: pos_integer(), + threshold: pos_integer(), + path: Path.t(), + success_codes: [100..599] + }, + conn: HTTP.t(), + status: status, + success_count: non_neg_integer(), + target: %{ + ip: :inet.ip_address(), + port: :inet.port_number(), + scheme: :http | :https + }, + url: String.t() + } + + @doc false + @spec start_link(options) :: GenServer.on_start() + def start_link(options), do: GenServer.start_link(__MODULE__, options) + + @doc false + @impl true + @spec init(options) :: {:ok, state, {:continue, :health_check}} | {:stop, any} + def init(options) do + with {:ok, options} <- NimbleOptions.validate(options, @options_schema), + {:ok, _} <- + Registry.register( + TargetRegistry, + {options[:scheme], options[:ip], options[:port]}, + nil + ) do + target = options |> Keyword.take(~w[scheme ip port]a) |> Map.new() + + check = + options + |> Keyword.get(:health_check, @default_health_check) + |> Map.new() + |> Map.update!(:success_codes, &List.wrap/1) + + state = + %{ + target: target, + check: check, + status: :initial, + success_count: 0, + conn: nil, + url: generate_url(target.scheme, target.ip, target.port, check.path) + } + + Table.set_status(target.scheme, target.ip, target.port, :initial) + + {:ok, state, {:continue, :health_check}} + else + {:error, reason} -> + {:stop, reason} + end + end + + @doc false + @impl true + @spec handle_continue(:health_check, state) :: {:noreply, state} + def handle_continue(:health_check, state) do + state = + state + |> perform_health_check() + + {:noreply, state} + end + + @doc false + @impl true + @spec handle_info(any, state) :: {:noreply, state} + def handle_info(:health_check, state) when is_nil(state.conn) do + state = + state + |> perform_health_check() + + {:noreply, state} + end + + def handle_info(message, state) when not is_nil(state.conn) do + state = + case Mint.HTTP.stream(state.conn, message) do + {:ok, conn, responses} -> + Enum.reduce_while(responses, state, fn + {:status, _, status}, state -> + state = + state + |> drop_conn() + |> handle_status(status) + |> queue_next_check() + + {:halt, state} + + _, state -> + {:cont, %{state | conn: conn}} + end) + + {:error, _conn, error, _} -> + state + |> fail_state(error) + |> queue_next_check() + + :unknown -> + state + end + + {:noreply, state} + end + + def handle_info(_, state), do: {:noreply, state} + + defp perform_health_check(state) when is_nil(state.conn) do + address = "#{:inet.ntoa(state.target.ip)}" + + with {:ok, conn} <- HTTP.connect(state.target.scheme, address, state.target.port), + {:ok, headers} <- generate_request_headers(address, state.target.port), + {:ok, conn, _} <- HTTP.request(conn, "GET", state.check.path, headers, nil) do + %{state | conn: conn} + else + {:error, reason} -> + state + |> fail_state(reason) + |> queue_next_check() + + {:error, _conn, reason} -> + state + |> fail_state(reason) + |> queue_next_check() + end + end + + defp generate_request_headers(address, port) do + {:ok, [{"Host", "#{address}:#{port}"} | @default_headers]} + end + + defp handle_status(state, status) do + success? = Enum.any?(state.check.success_codes, &Enum.member?(&1, status)) + + if success? do + state + |> increment_counter() + |> next_success_state() + else + state + |> fail_state("HTTP Status #{status}") + end + end + + defp fail_state(state, reason) do + fail_warning(state, reason) + + state + |> drop_conn() + |> zero_counter() + |> next_fail_state() + end + + defp fail_warning(state, %{reason: reason}), do: fail_warning(state, reason) + + defp fail_warning(state, reason) when state.status not in [:unhealthy, :initial], + do: Logger.warning("Target #{state.url} is now unhealthy: #{inspect(reason)}") + + defp fail_warning(_state, _reason), do: :ok + + defp drop_conn(state) when is_nil(state.conn), do: state + + defp drop_conn(state) do + HTTP.close(state.conn) + %{state | conn: nil} + end + + defp next_fail_state(state) when state.status == :initial, do: state + + defp next_fail_state(state) when state.status != :unhealthy do + Table.set_status(state.target.scheme, state.target.ip, state.target.port, :unhealthy) + + %{state | status: :unhealthy} + end + + defp next_fail_state(state), do: state + + defp generate_url(scheme, ip, port, path) when tuple_size(ip) == 8 do + "#{scheme}://[#{:inet.ntoa(ip)}]:#{port}#{path}" + |> URI.new!() + |> to_string() + end + + defp generate_url(scheme, ip, port, path) do + "#{scheme}://#{:inet.ntoa(ip)}:#{port}#{path}" + |> URI.new!() + |> to_string() + end + + defp queue_next_check(state) do + Process.send_after(self(), :health_check, state.check.interval) + state + end + + defp zero_counter(state) when state.success_count == 0, do: state + defp zero_counter(state), do: %{state | success_count: 0} + + defp increment_counter(state) when state.success_count == state.check.threshold, do: state + defp increment_counter(state), do: %{state | success_count: state.success_count + 1} + + defp next_success_state(state) + when state.success_count == state.check.threshold and state.status != :healthy do + Logger.info("Target #{state.url} is now healthy") + Table.set_status(state.target.scheme, state.target.ip, state.target.port, :healthy) + + %{state | status: :healthy} + end + + defp next_success_state(state), do: state +end diff --git a/lib/wayfarer/target/supervisor.ex b/lib/wayfarer/target/supervisor.ex new file mode 100644 index 0000000..765fc71 --- /dev/null +++ b/lib/wayfarer/target/supervisor.ex @@ -0,0 +1,22 @@ +defmodule Wayfarer.Target.Supervisor do + @moduledoc """ + Supervisor for HTTP Targets. + """ + + use Supervisor + + @doc false + @spec start_link(any) :: Supervisor.on_start() + def start_link(arg), do: Supervisor.start_link(__MODULE__, arg) + + @doc false + @impl true + def init(_arg) do + [ + {Registry, keys: :unique, name: Wayfarer.Target.Registry}, + {DynamicSupervisor, name: Wayfarer.Target.DynamicSupervisor}, + Wayfarer.Target.Table + ] + |> Supervisor.init(strategy: :one_for_one) + end +end diff --git a/lib/wayfarer/target/table.ex b/lib/wayfarer/target/table.ex new file mode 100644 index 0000000..e67432e --- /dev/null +++ b/lib/wayfarer/target/table.ex @@ -0,0 +1,53 @@ +defmodule Wayfarer.Target.Table do + @moduledoc """ + Manages an ETS table containing target statuses. + """ + use GenServer + alias Wayfarer.Target.Server + + @doc false + @spec start_link(any) :: GenServer.on_start() + def start_link(args), do: GenServer.start_link(__MODULE__, args) + + @doc false + @spec init(any) :: {:ok, :ets.tid(), :hibernate} + def init(_) do + table = + __MODULE__ + |> :ets.new([:set, :public, :named_table]) + + {:ok, table, :hibernate} + end + + @doc """ + Store the target status in the table. + """ + @spec set_status(:http | :https, :inet.ip_address(), :inet.port_number(), Server.status()) :: + :ok + def set_status(scheme, ip, port, status) do + :ets.insert(__MODULE__, {{scheme, ip, port}, status}) + :ok + end + + @doc """ + Return the status of a target + """ + @spec get_status(:http | :https, :inet.ip_address(), :inet.port_number()) :: + {:ok, Server.status()} | :error + def get_status(scheme, ip, port) do + case :ets.lookup_element(__MODULE__, {scheme, ip, port}, 2, nil) do + nil -> :error + status -> {:ok, status} + end + end + + @doc """ + Return all the target statuses + """ + @spec status :: %{{:http | :https, :inet.ip_address(), :inet.port_number()} => Server.status()} + def status do + __MODULE__ + |> :ets.tab2list() + |> Map.new() + end +end diff --git a/test/support/http_server.ex b/test/support/http_server.ex new file mode 100644 index 0000000..32e59db --- /dev/null +++ b/test/support/http_server.ex @@ -0,0 +1,33 @@ +defmodule Support.HttpServer do + @moduledoc """ + A basic HTTP server which returns a canned response. + """ + + @behaviour Plug + import Plug.Conn + + @doc false + @spec start_link(:inet.port_number(), 100..599, String.t()) :: Supervisor.on_start() + def start_link(port, status, body), + do: + Bandit.start_link( + scheme: :http, + port: port, + ip: {127, 0, 0, 1}, + plug: {__MODULE__, {status, body}} + ) + + @doc false + @impl true + @spec init(any) :: any + def init({status, body}), do: {status, body} + + @doc false + @impl true + @spec call(Plug.Conn.t(), any) :: Plug.Conn.t() + def call(conn, {status, body}) do + conn + |> put_resp_content_type("text/plain") + |> send_resp(status, body) + end +end diff --git a/test/wayfarer/target_test.exs b/test/wayfarer/target_test.exs new file mode 100644 index 0000000..acdf538 --- /dev/null +++ b/test/wayfarer/target_test.exs @@ -0,0 +1,92 @@ +defmodule Wayfarer.TargetTest do + @moduledoc false + use ExUnit.Case, async: false + alias Support.HttpServer + alias Wayfarer.Target + + setup do + start_supervised!(Target.Supervisor) + + :ok + end + + describe "start_target/1" do + test "it returns an error when options are invalid" do + assert {:error, _} = + Target.start_target(scheme: :spdy, ip: {127, 0, 0, 1}, port: random_port()) + end + + test "it can start a target checker" do + port = random_port() + assert {:ok, _pid} = Target.start_target(scheme: :http, ip: {127, 0, 0, 1}, port: port) + end + + test "it becomes healthy when the target is ok" do + port = random_port() + + HttpServer.start_link(port, 200, "OK") + + assert {:ok, _pid} = + Target.start_target( + scheme: :http, + ip: {127, 0, 0, 1}, + port: port, + health_check: [interval: 10] + ) + + Process.sleep(200) + + assert {:ok, :healthy} = Target.Table.get_status(:http, {127, 0, 0, 1}, port) + end + + test "it stays in initial state when the target is not ok" do + port = random_port() + + HttpServer.start_link(port, 500, "BANG!") + + assert {:ok, _pid} = + Target.start_target( + scheme: :http, + ip: {127, 0, 0, 1}, + port: port, + health_check: [interval: 10] + ) + + Process.sleep(200) + + assert {:ok, :initial} = Target.Table.get_status(:http, {127, 0, 0, 1}, port) + end + + test "it can transitions from healthy -> unhealthy -> healthy when the server changes state" do + port = random_port() + + {:ok, http} = HttpServer.start_link(port, 200, "OK") + + assert {:ok, _pid} = + Target.start_target( + scheme: :http, + ip: {127, 0, 0, 1}, + port: port, + health_check: [interval: 10] + ) + + Process.sleep(500) + + assert {:ok, :healthy} = Target.Table.get_status(:http, {127, 0, 0, 1}, port) + + GenServer.stop(http, :normal) + + Process.sleep(250) + + assert {:ok, :unhealthy} = Target.Table.get_status(:http, {127, 0, 0, 1}, port) + + {:ok, http} = HttpServer.start_link(port, 200, "OK") + + Process.sleep(500) + + assert {:ok, :healthy} = Target.Table.get_status(:http, {127, 0, 0, 1}, port) + end + end + + defp random_port, do: :rand.uniform(0xFFFF - 1000) + 1000 +end