253 lines
6.9 KiB
Elixir
253 lines
6.9 KiB
Elixir
defmodule Wayfarer.Target do
|
|
# @moduledoc ⬇️⬇️
|
|
|
|
use GenServer, restart: :transient
|
|
require Logger
|
|
alias Spark.Options
|
|
alias Wayfarer.{Dsl.HealthCheck, Router, Server, Target}
|
|
import Wayfarer.Utils
|
|
|
|
@options_schema [
|
|
scheme: [
|
|
type: {:in, [:http, :https]},
|
|
doc: "The connection protocol.",
|
|
required: true
|
|
],
|
|
port: [
|
|
type: {:in, 1..0xFFFF},
|
|
doc: "The TCP port to connect to.",
|
|
required: true
|
|
],
|
|
address: [
|
|
type: {:struct, IP.Address},
|
|
doc: "The IP address to connect to.",
|
|
required: true
|
|
],
|
|
module: [
|
|
type: {:behaviour, Wayfarer.Server},
|
|
doc: "The proxy module this target is linked to.",
|
|
required: true
|
|
],
|
|
name: [
|
|
type: {:or, [nil, :string]},
|
|
doc: "An optional name for the target.",
|
|
required: false
|
|
],
|
|
health_checks: [
|
|
type: {:list, {:keyword_list, HealthCheck.schema()}},
|
|
required: false,
|
|
default: HealthCheck.default() |> HealthCheck.to_options() |> then(&[&1])
|
|
]
|
|
]
|
|
|
|
@wayfarer_vsn Application.spec(:wayfarer, :vsn) || Mix.Project.config()[:version]
|
|
@elixir_vsn Application.spec(:elixir, :vsn)
|
|
@erlang_vsn :erlang.system_info(:otp_release)
|
|
@mint_vsn Application.spec(:mint, :vsn)
|
|
|
|
@default_headers [
|
|
{"User-Agent",
|
|
"Wayfarer/#{@wayfarer_vsn} (Elixir #{@elixir_vsn}; Erlang #{@erlang_vsn}) Mint/#{@mint_vsn}"},
|
|
{"Connection", "close"},
|
|
{"Accept", "*/*"}
|
|
]
|
|
|
|
@moduledoc """
|
|
A GenServer responsible for performing health-checks against HTTP and HTTPS
|
|
targets.
|
|
|
|
You should not need to create one of these yourself, instead use it via
|
|
`Wayfarer.Server`.
|
|
|
|
## Options
|
|
|
|
#{Options.docs(@options_schema)}
|
|
"""
|
|
|
|
@type key :: {module, :http | :https, IP.Address.t(), :socket.port_number()}
|
|
|
|
# Reports a failed health check: casts `{:check_failed, id}` to the target
# process registered under `key` in `Wayfarer.Target.Registry`.
#
# The argument is the `{key, id}` pair that `init/1` stores in each check
# under `:ref`. Being a cast, this never waits for the target to respond.
@doc false
@spec check_failed({key, reference}) :: :ok
def check_failed({key, id}) do
  target = {:via, Registry, {Wayfarer.Target.Registry, key}}
  GenServer.cast(target, {:check_failed, id})
end
|
|
|
|
# Reports a successful health check: casts `{:check_passed, id}` to the target
# process registered under `key` in `Wayfarer.Target.Registry`.
#
# The argument is the `{key, id}` pair that `init/1` stores in each check
# under `:ref`. Being a cast, this never waits for the target to respond.
@doc false
@spec check_passed({key, reference}) :: :ok
def check_passed({key, id}) do
  target = {:via, Registry, {Wayfarer.Target.Registry, key}}
  GenServer.cast(target, {:check_passed, id})
end
|
|
|
|
@doc """
Return the current health status of the target.

Accepts either the pid of a target process or the `t:key/0` it is registered
under in `Wayfarer.Target.Registry`. The status is fetched with a synchronous
`GenServer.call/2`.
"""
@spec current_status(pid | key) :: {:ok, Router.health()} | {:error, any}
def current_status(pid_or_key) do
  # A pid is called directly; anything else is treated as a registry key and
  # resolved through a `:via` tuple.
  server =
    if is_pid(pid_or_key) do
      pid_or_key
    else
      {:via, Registry, {Wayfarer.Target.Registry, pid_or_key}}
    end

  GenServer.call(server, :current_status)
end
|
|
|
|
# Starts a target process linked to the caller. `options` is handed to
# `init/1` unchanged, where it is validated against the options schema.
@doc false
@spec start_link(keyword) :: GenServer.on_start()
def start_link(options) do
  GenServer.start_link(__MODULE__, options)
end
|
|
|
|
# Validates `options`, builds the per-check state, registers the process under
# its `{module, scheme, address, port}` key and schedules the first round of
# health checks via `handle_continue/2`.
#
# Returns `{:ok, state, {:continue, :perform_health_checks}}` on success, or
# `{:stop, reason}` when the options are invalid or the target URI cannot be
# built.
@doc false
@impl true
def init(options) do
  with {:ok, options} <- Options.validate(options, @options_schema),
       {:ok, uri} <- to_uri(options[:scheme], options[:address], options[:port]) do
    target = options |> Keyword.take(~w[scheme address port]a) |> Map.new()
    module = options[:module]

    key = {module, target.scheme, target.address, target.port}

    checks =
      options
      |> Keyword.get(:health_checks, [])
      |> Map.new(&build_check(&1, target, uri, key))

    Logger.info("Starting Wayfarer target #{uri}.")

    state = %{
      target: target,
      checks: checks,
      uri: uri,
      module: module,
      name: options[:name],
      status: :initial
    }

    Registry.register(Wayfarer.Target.Registry, key, uri)

    {:ok, state, {:continue, :perform_health_checks}}
  else
    # `{:error, reason}` is not one of the documented `init/1` return values
    # for an Elixir GenServer (older OTP releases treat it as a bad return
    # value). `{:stop, reason}` is the portable way to refuse to start and
    # still makes `start_link/1` return `{:error, reason}`.
    {:error, reason} -> {:stop, reason}
    other -> {:stop, other}
  end
end

# Builds the `{id, check}` map entry for one health-check keyword list, merging
# the target's connection details and the initial bookkeeping fields into the
# user-supplied check options.
defp build_check(check_options, target, uri, key) do
  id = make_ref()

  check =
    check_options
    |> Map.new()
    |> Map.merge(%{
      status: :initial,
      scheme: target.scheme,
      address: IP.Address.to_tuple(target.address),
      port: target.port,
      uri: %{uri | path: check_options[:path]},
      id: id,
      # `ref` is the `{key, id}` pair that `check_failed/1` and
      # `check_passed/1` accept.
      ref: {key, id},
      method: check_options[:method] |> to_string() |> String.upcase(),
      headers: @default_headers,
      hostname: check_options[:hostname] || uri.host,
      passes: 0
    })

  {id, check}
end
|
|
|
|
# Runs right after `init/1`: starts the first round of health checks so the
# work happens outside of `init/1` itself. The state is returned unchanged.
@doc false
@impl true
def handle_continue(:perform_health_checks, state) do
  _started = perform_health_checks(state)

  {:noreply, state}
end
|
|
|
|
@doc false
@impl true
# A known check failed while the target is already unhealthy: schedule the
# next run and reset the check's pass counter. There is no status change to
# announce.
def handle_cast({:check_failed, id}, %{checks: checks, status: :unhealthy} = state)
    when is_map_key(checks, id) do
  failed_check = queue_check(Map.fetch!(checks, id))
  checks = %{checks | id => %{failed_check | status: :unhealthy, passes: 0}}

  {:noreply, %{state | checks: checks}}
end

# A known check failed while the target was not yet marked unhealthy: do the
# same bookkeeping as above, then notify the server and flip the target's
# status to `:unhealthy`.
def handle_cast({:check_failed, id}, %{checks: checks} = state) when is_map_key(checks, id) do
  failed_check = queue_check(Map.fetch!(checks, id))
  checks = %{checks | id => %{failed_check | status: :unhealthy, passes: 0}}

  %{module: module, target: %{scheme: scheme, address: address, port: port}} = state
  Server.target_status_change({module, scheme, address, port}, :unhealthy)

  {:noreply, %{state | checks: checks, status: :unhealthy}}
end
|
|
|
|
# A known check passed while the target is already healthy: schedule the next
# run and bump the pass counter. There is no status change to announce.
#
# The `is_map_key/2` guard matters: without it a result carrying an unknown
# id would raise in `Map.fetch!/2` and crash the target instead of being
# ignored by the catch-all clause, as every other clause does.
def handle_cast({:check_passed, id}, state)
    when is_map_key(state.checks, id) and state.status == :healthy do
  check =
    state.checks
    |> Map.fetch!(id)
    |> queue_check()
    |> increment_success()

  checks = Map.put(state.checks, id, check)

  {:noreply, %{state | checks: checks}}
end

# A known check passed while the target is not healthy: update the check and,
# once every check is `:healthy`, notify the server and mark the target
# healthy.
def handle_cast({:check_passed, id}, state) when is_map_key(state.checks, id) do
  check =
    state.checks
    |> Map.fetch!(id)
    |> queue_check()
    |> increment_success()

  checks = Map.put(state.checks, id, check)

  target_became_healthy? =
    checks
    |> Map.values()
    |> Enum.all?(&(&1.status == :healthy))

  if target_became_healthy? do
    Server.target_status_change(
      {state.module, state.target.scheme, state.target.address, state.target.port},
      :healthy
    )

    Logger.info("Target #{state.uri} became healthy")

    {:noreply, %{state | checks: checks, status: :healthy}}
  else
    {:noreply, %{state | checks: checks}}
  end
end

# Anything else, including results for ids this process does not know, is
# ignored.
def handle_cast(_message, state), do: {:noreply, state}
|
|
|
|
# Handles the `{:perform_check, id}` message that `queue_check/1` schedules:
# looks up the check by id and starts a `Target.Check` process for it. The
# state is left unchanged.
@doc false
@impl true
def handle_info({:perform_check, id}, %{checks: checks} = state) do
  due_check = Map.fetch!(checks, id)

  # Started with `GenServer.start/2`, so the check process is not linked to
  # this target.
  {:ok, _pid} = GenServer.start(Target.Check, due_check)

  {:noreply, state}
end
|
|
|
|
# Serves `current_status/1`: replies with `{:ok, status}` taken from the
# state, which is left unchanged.
@doc false
@impl true
def handle_call(:current_status, _from, %{status: status} = state) do
  reply = {:ok, status}

  {:reply, reply, state}
end
|
|
|
|
# Starts one `Target.Check` process per configured check and returns the list
# of `{:ok, pid}` results. A check that fails to start raises a `MatchError`.
defp perform_health_checks(%{checks: checks}) do
  Enum.map(checks, fn {_id, check} ->
    {:ok, _pid} = GenServer.start(Target.Check, check)
  end)
end
|
|
|
|
# Schedules the next run of `check` by sending `{:perform_check, id}` to the
# current process after `check.interval`. Returns `check` unchanged so the
# call can sit in a pipeline.
defp queue_check(check) do
  message = {:perform_check, check.id}
  _timer = Process.send_after(self(), message, check.interval)

  check
end
|
|
|
|
# Records one more consecutive pass for `check`. Once the pass count reaches
# the check's `:threshold`, the check is also marked `:healthy`; below the
# threshold only the counter changes.
defp increment_success(check) do
  passes = check.passes + 1

  case check do
    %{threshold: threshold} when passes >= threshold ->
      %{check | passes: passes, status: :healthy}

    _below_threshold ->
      %{check | passes: passes}
  end
end
|
|
end
|