WIP: feat(Server): Allow dynamic addition and removal or listeners, targets and routes. #117

Draft
james wants to merge 5 commits from feat/dynamic-configuration into main
22 changed files with 792 additions and 49 deletions

14
lib/wayfarer/error.ex Normal file
View file

@ -0,0 +1,14 @@
defmodule Wayfarer.Error do
@moduledoc """
The main error aggregator for Wayfarer errors.
"""
use Splode,
error_classes: [
listener: __MODULE__.Listener,
target: __MODULE__.Target,
server: __MODULE__.Server,
unknown: __MODULE__.Unknown
],
unknown_error: __MODULE__.Unknown
end

View file

@ -0,0 +1,7 @@
defmodule Wayfarer.Error.Listener do
@moduledoc """
The error class for Listener errors.
"""
use Splode.ErrorClass, class: :listener
end

View file

@ -0,0 +1,24 @@
defmodule Wayfarer.Error.Listener.NoSuchListener do
@moduledoc """
An exception which is returned when requesting information about a listener
which no longer exists.
"""
use Splode.Error, fields: [:listener], class: :listener
@doc false
@impl true
def message(error) do
"""
# No Such Listener
#{@moduledoc}
## Listener requested:
```elixir
#{inspect(error.listener)}
```
"""
end
end

View file

@ -0,0 +1,7 @@
defmodule Wayfarer.Error.Server do
@moduledoc """
The error class for Server errors.
"""
use Splode.ErrorClass, class: :server
end

View file

@ -0,0 +1,7 @@
defmodule Wayfarer.Error.Target do
@moduledoc """
The error class for Target errors.
"""
use Splode.ErrorClass, class: :target
end

View file

@ -0,0 +1,24 @@
defmodule Wayfarer.Error.Target.NoSuchTarget do
@moduledoc """
An exception which is returned when requesting information about a target
which no longer exists.
"""
use Splode.Error, fields: [:target], class: :target
@doc false
@impl true
def message(error) do
"""
# No Such Listener
#{@moduledoc}
## Target requested:
```elixir
#{inspect(error.target)}
```
"""
end
end

View file

@ -0,0 +1,11 @@
defmodule Wayfarer.Error.Unknown do
@moduledoc """
An unknown error.
"""
use Splode.Error, fields: [:error, :message], class: :unknown
@doc false
@impl true
def message(error), do: error.message
end

View file

@ -1,9 +1,26 @@
defmodule Wayfarer.Listener do defmodule Wayfarer.Listener do
# @moduledoc ⬇️⬇️ # @moduledoc ⬇️⬇️
defstruct [
:address,
:certfile,
:cipher_suite,
:drain_timeout,
:http_1_options,
:http_2_options,
:keyfile,
:module,
:name,
:port,
:scheme,
:thousand_island_options,
:websocket_options
]
use GenServer, restart: :transient use GenServer, restart: :transient
require Logger require Logger
alias Spark.Options alias Spark.Options
alias Wayfarer.Listener.Registry
import Wayfarer.Utils import Wayfarer.Utils
@options_schema [ @options_schema [
@ -82,6 +99,24 @@ defmodule Wayfarer.Listener do
] ]
] ]
@type t :: %__MODULE__{
scheme: :http | :https | :ws | :wss,
port: :socket.port_number(),
address: IP.Address.t(),
drain_timeout: timeout(),
module: module,
name: nil | String.t(),
keyfile: String.t(),
certfile: String.t(),
cipher_suite: nil | :strong | :compatible,
http_1_options: Bandit.http_1_options(),
http_2_options: Bandit.http_2_options(),
websocket_options: Bandit.websocket_options(),
thousand_island_options: ThousandIsland.options()
}
@type status :: :starting | :accepting_connections | :draining
@moduledoc """ @moduledoc """
A GenServer which manages the state of each Bandit listener. A GenServer which manages the state of each Bandit listener.
@ -101,14 +136,17 @@ defmodule Wayfarer.Listener do
@impl true @impl true
def init(options) do def init(options) do
with {:ok, options} <- validate_options(options), with {:ok, options} <- validate_options(options),
listener <- struct(__MODULE__, options),
:ok <- Registry.register(listener),
bandit_options <- build_bandit_options(options), bandit_options <- build_bandit_options(options),
{:ok, pid} <- Bandit.start_link(bandit_options), {:ok, pid} <- Bandit.start_link(bandit_options),
{:ok, {listen_address, listen_port}} <- ThousandIsland.listener_info(pid), {:ok, {listen_address, listen_port}} <- ThousandIsland.listener_info(pid),
:ok <- Registry.update_status(listener, :accepting_connections),
{:ok, listen_address} <- IP.Address.from_tuple(listen_address), {:ok, listen_address} <- IP.Address.from_tuple(listen_address),
{:ok, uri} <- to_uri(options[:scheme], listen_address, listen_port) do {:ok, uri} <- to_uri(options[:scheme], listen_address, listen_port) do
Logger.info("Started Wayfarer listener on #{uri}") Logger.info("Started Wayfarer listener on #{uri}")
{:ok, %{server: pid, name: options[:name], uri: uri}} {:ok, %{server: pid, name: options[:name], uri: uri, listener: listener}}
else else
:error -> {:stop, "Unable to retrieve listener information."} :error -> {:stop, "Unable to retrieve listener information."}
{:error, reason} -> {:stop, reason} {:error, reason} -> {:stop, reason}
@ -117,8 +155,28 @@ defmodule Wayfarer.Listener do
@doc false @doc false
@impl true @impl true
def terminate(:normal, %{server: server}) do def handle_call(:terminate, _from, state),
GenServer.stop(server, :normal) do: {:stop, :normal, {:ok, :draining}, state}
@doc false
@impl true
def terminate(_reason, state) do
with :ok <- Registry.update_status(state.listener, :draining) do
ThousandIsland.stop(state.server, state.listener.drain_timeout)
end
:ok
end
@doc false
@impl true
def handle_info({:DOWN, _ref, :process, pid, reason}, state)
when state.server == pid do
Logger.error(fn ->
"Listener #{state.uri} terminating for reason: #{inspect(reason)}"
end)
{:stop, reason, state}
end end
defp validate_options(options) do defp validate_options(options) do

View file

@ -0,0 +1,84 @@
defmodule Wayfarer.Listener.Registry do
@moduledoc """
Functions for interacting with the Listener registry.
"""
alias Wayfarer.{
Error.Listener.NoSuchListener,
Listener
}
@doc """
Register the calling process as a listener.
"""
@spec register(Listener.t(), Listener.status()) :: :ok | {:error, any}
def register(listener, status \\ :starting) do
key = registry_key(listener)
with {:ok, _pid} <- Registry.register(__MODULE__, key, {listener, status}) do
:ok
end
end
@doc """
List active listeners for a server module.
"""
@spec list_listeners_for_module(module) :: [Listener.t()]
def list_listeners_for_module(module) do
# :ets.fun2ms(fn {{module, _scheme, _address, _port}, _pid, {listener, _status}} when module == :module ->
# listener
# end)
Registry.select(
__MODULE__,
[
{{{:"$1", :"$2", :"$3", :"$4"}, :"$5", {:"$6", :"$7"}}, [{:==, :"$1", module}], [:"$6"]}
]
)
end
@doc """
Return the status of a listener.
"""
@spec get_status(Listener.t()) :: {:ok, Listener.status()} | {:error, any}
def get_status(listener) do
key = registry_key(listener)
case Registry.lookup(__MODULE__, key) do
[{_pid, {_listener, status}}] -> {:ok, status}
[] -> {:error, NoSuchListener.exception(listener: listener)}
end
end
@doc """
Return the process ID of a registered listener.
"""
@spec get_pid(Listener.t()) :: {:ok, pid} | {:error, any}
def get_pid(listener) do
key = registry_key(listener)
case Registry.lookup(__MODULE__, key) do
[{pid, _value}] -> {:ok, pid}
[] -> {:error, NoSuchListener.exception(listener: listener)}
end
end
@doc """
Update the status of a listener.
"""
@spec update_status(Listener.t(), Listener.status()) :: :ok | {:error, any}
def update_status(listener, status) do
key = registry_key(listener)
case Registry.update_value(__MODULE__, key, fn {_listener, _status} -> {listener, status} end) do
{_, _} -> :ok
:error -> {:error, NoSuchListener.exception(listener: listener)}
end
end
@doc """
Returns the registry key for a listener.
"""
@spec registry_key(Listener.t()) :: any
def registry_key(listener),
do: {listener.module, listener.scheme, listener.address, listener.port}
end

View file

@ -8,7 +8,8 @@ defmodule Wayfarer.Listener.Supervisor do
@impl true @impl true
def init(_) do def init(_) do
[ [
{DynamicSupervisor, name: Wayfarer.Listener.DynamicSupervisor, strategy: :one_for_one} {DynamicSupervisor, name: Wayfarer.Listener.DynamicSupervisor, strategy: :one_for_one},
{Registry, name: Wayfarer.Listener.Registry, keys: :unique}
] ]
|> Supervisor.init(strategy: :one_for_one) |> Supervisor.init(strategy: :one_for_one)
end end

View file

@ -1,6 +1,14 @@
defmodule Wayfarer.Server do defmodule Wayfarer.Server do
alias Spark.Options alias Spark.Options
alias Wayfarer.{Dsl, Listener, Router, Server, Target}
alias Wayfarer.{
Dsl,
Listener,
Router,
Server,
Target
}
use GenServer use GenServer
require Logger require Logger
@ -98,7 +106,12 @@ defmodule Wayfarer.Server do
|> Keyword.merge(opts) |> Keyword.merge(opts)
|> Keyword.put(:module, __MODULE__) |> Keyword.put(:module, __MODULE__)
Server.child_spec(opts) default = %{
id: __MODULE__,
start: {Wayfarer.Server, :start_link, [opts]}
}
Supervisor.child_spec(default, [])
end end
@doc false @doc false
@ -116,6 +129,72 @@ defmodule Wayfarer.Server do
end end
end end
@doc """
Add a listener to an already running server.
If the listener fails to start for any reason, then this function will return
an error, otherwise it will block until the listener is ready to accept
requests.
## Options
The following options are supported by listener configuration:
#{Options.docs(Dsl.Listener.schema())}
"""
@spec add_listener(module, options) :: :ok | {:error, any}
def add_listener(module, options) do
with {:ok, options} <- Options.validate(options, Dsl.Listener.schema()) do
{:via, Registry, {Wayfarer.Server.Registry, module}}
|> GenServer.call({:add_listener, options})
end
end
@doc """
List the active listeners for a server.
"""
@spec list_listeners(module) :: [Listener.t()]
def list_listeners(module), do: Listener.Registry.list_listeners_for_module(module)
@doc """
Remove a listener from a running server.
"""
@spec remove_listener(module, Listener.t()) :: {:ok, :stopped | :draining} | {:error, any}
def remove_listener(module, listener) do
{:via, Registry, {Wayfarer.Server.Registry, module}}
|> GenServer.call({:remove_listener, listener})
end
@doc """
List the active targets for a server.
"""
@spec list_targets(module) :: [Target.t()]
def list_targets(module), do: Target.Registry.list_targets_for_module(module)
@doc """
Add a target to an already running server.
If the target fails to start for any reason, then this function will return an
error, otherwise it will block until the target is started.
## Options
The following options are supported by target configuration:
#{Options.docs(Target.schema())}
The following options are supported by health-check configuration:
#{Options.docs(Dsl.HealthCheck.schema())}
"""
@spec add_target(module, options) :: :ok | {:error, any}
def add_target(module, options) do
with {:ok, options} <- Options.validate(options, Dsl.Target.schema()) do
{:via, Registry, {Wayfarer.Server.Registry, module}}
|> GenServer.call({:add_target, options})
end
end
@doc false @doc false
@spec target_status_change( @spec target_status_change(
{module, :http | :https, IP.Address.t(), :socket.port_number(), {module, :http | :https, IP.Address.t(), :socket.port_number(),
@ -177,44 +256,80 @@ defmodule Wayfarer.Server do
{:noreply, state} {:noreply, state}
end end
@doc false
@impl true
@spec handle_call(any, GenServer.from(), map) :: {:reply, any, map}
def handle_call({:add_listener, listener}, _from, state) do
{:reply, start_listener(listener, state), state}
end
def handle_call({:add_target, target}, _from, state) do
{:reply, start_target(target, state), state}
end
def handle_call({:remove_listener, listener}, _from, state) do
case Listener.Registry.get_pid(listener) do
{:ok, pid} ->
Process.unlink(pid)
{:reply, GenServer.call(pid, :terminate), state}
{:error, reason} ->
{:reply, {:error, reason}, state}
end
end
defp start_listeners(listeners, state) do defp start_listeners(listeners, state) do
listeners listeners
|> Enum.reduce_while({:ok, state}, fn listener, success -> |> Enum.reduce_while({:ok, state}, fn listener, success ->
case start_listener(listener, state) do
{:ok, _} -> {:cont, success}
{:error, reason} -> {:halt, {:error, reason}}
end
end)
end
defp start_listener(listener, state) do
listener = Keyword.put(listener, :module, state.module) listener = Keyword.put(listener, :module, state.module)
case DynamicSupervisor.start_child(Listener.DynamicSupervisor, {Listener, listener}) do case DynamicSupervisor.start_child(Listener.DynamicSupervisor, {Listener, listener}) do
{:ok, pid} -> {:ok, pid} ->
Process.link(pid) Process.link(pid)
{:cont, success} {:ok, pid}
{:error, {:already_started, pid}} -> {:error, {:already_started, pid}} ->
Process.link(pid) Process.link(pid)
{:cont, success} {:ok, pid}
{:error, reason} -> {:error, reason} ->
{:halt, {:error, reason}} {:error, reason}
end end
end)
end end
defp start_targets(targets, state) do defp start_targets(targets, state) do
targets targets
|> Enum.reduce_while({:ok, state}, fn target, success -> |> Enum.reduce_while({:ok, state}, fn target, success ->
case start_target(target, state) do
{:ok, _} -> {:cont, success}
{:error, reason} -> {:halt, {:error, reason}}
end
end)
end
defp start_target(target, state) do
target = Keyword.put(target, :module, state.module) target = Keyword.put(target, :module, state.module)
case DynamicSupervisor.start_child(Target.DynamicSupervisor, {Target, target}) do case DynamicSupervisor.start_child(Target.DynamicSupervisor, {Target, target}) do
{:ok, pid} -> {:ok, pid} ->
Process.link(pid) Process.link(pid)
{:cont, success} {:ok, pid}
{:error, {:already_started, pid}} -> {:error, {:already_started, pid}} ->
Process.link(pid) Process.link(pid)
{:cont, success} {:ok, pid}
{:error, reason} -> {:error, reason} ->
{:halt, {:error, reason}} {:error, reason}
end end
end)
end end
defp assert_is_server(module) do defp assert_is_server(module) do

View file

@ -1,6 +1,8 @@
defmodule Wayfarer.Target do defmodule Wayfarer.Target do
# @moduledoc ⬇️⬇️ # @moduledoc ⬇️⬇️
defstruct [:scheme, :port, :address, :module, :name, :transport]
use GenServer, restart: :transient use GenServer, restart: :transient
require Logger require Logger
alias Spark.Options alias Spark.Options
@ -58,6 +60,15 @@ defmodule Wayfarer.Target do
{"Accept", "*/*"} {"Accept", "*/*"}
] ]
@type t :: %__MODULE__{
scheme: :http | :https | :ws | :wss,
port: :socket.port_number(),
address: IP.Address.t(),
module: module,
name: nil | String.t(),
transport: :http1 | :http2 | :auto
}
@moduledoc """ @moduledoc """
A GenServer responsible for performing health-checks against HTTP and HTTPS A GenServer responsible for performing health-checks against HTTP and HTTPS
targets. targets.
@ -72,6 +83,9 @@ defmodule Wayfarer.Target do
@type key :: {module, :http | :https, IP.Address.t(), :socket.port_number()} @type key :: {module, :http | :https, IP.Address.t(), :socket.port_number()}
@doc false
def schema, do: @options_schema
@doc false @doc false
@spec check_failed({key, reference}) :: :ok @spec check_failed({key, reference}) :: :ok
def check_failed({key, id}), def check_failed({key, id}),
@ -98,10 +112,10 @@ defmodule Wayfarer.Target do
def init(options) do def init(options) do
with {:ok, options} <- Options.validate(options, @options_schema), with {:ok, options} <- Options.validate(options, @options_schema),
{:ok, uri} <- to_uri(options[:scheme], options[:address], options[:port]) do {:ok, uri} <- to_uri(options[:scheme], options[:address], options[:port]) do
target = options |> Keyword.take(~w[scheme address port transport]a) |> Map.new() target = struct(__MODULE__, options)
module = options[:module] module = options[:module]
key = {module, target.scheme, target.address, target.port} key = Target.Registry.registry_key(target)
checks = checks =
options options
@ -138,10 +152,11 @@ defmodule Wayfarer.Target do
uri: uri, uri: uri,
module: module, module: module,
name: options[:name], name: options[:name],
status: :initial status: :initial,
key: key
} }
Registry.register(Wayfarer.Target.Registry, key, uri) Target.Registry.register(target)
{:ok, state, {:continue, :perform_health_checks}} {:ok, state, {:continue, :perform_health_checks}}
end end
@ -176,11 +191,7 @@ defmodule Wayfarer.Target do
|> then(&%{&1 | status: :unhealthy, passes: 0}) |> then(&%{&1 | status: :unhealthy, passes: 0})
end) end)
Server.target_status_change( Server.target_status_change(state.key, :unhealthy)
{state.module, state.target.scheme, state.target.address, state.target.port,
state.target.transport},
:unhealthy
)
{:noreply, %{state | checks: checks, status: :unhealthy}} {:noreply, %{state | checks: checks, status: :unhealthy}}
end end
@ -212,11 +223,7 @@ defmodule Wayfarer.Target do
|> Enum.all?(&(&1.status == :healthy)) |> Enum.all?(&(&1.status == :healthy))
if target_became_healthy? do if target_became_healthy? do
Server.target_status_change( Server.target_status_change(state.key, :healthy)
{state.module, state.target.scheme, state.target.address, state.target.port,
state.target.transport},
:healthy
)
Logger.info("Target #{state.uri} became healthy") Logger.info("Target #{state.uri} became healthy")

View file

@ -0,0 +1,50 @@
defmodule Wayfarer.Target.Registry do
@moduledoc """
Functions for interacting with the Target registry.
"""
alias Wayfarer.{Error.Target.NoSuchTarget, Target}
@doc """
Register the calling process as a target.
"""
@spec register(Target.t()) :: :ok | {:error, any}
def register(target) do
key = registry_key(target)
with {:ok, _pid} <- Registry.register(__MODULE__, key, target) do
:ok
end
end
@doc """
List active targets for a server module.
"""
@spec list_targets_for_module(module) :: [Target.t()]
def list_targets_for_module(module) do
# :ets.fun2ms(fn {{module, _scheme, _address, _port, _transport}, _pid, target} when module == :module -> target end)
Registry.select(
__MODULE__,
[
{{{:"$1", :"$2", :"$3", :"$4", :"$5"}, :"$6", :"$7"}, [{:==, :"$1", module}], [:"$7"]}
]
)
end
@doc """
Return the process ID of a registered target.
"""
@spec get_pid(Target.t()) :: {:ok, pid} | {:error, any}
def get_pid(target) do
key = registry_key(target)
case Registry.lookup(__MODULE__, key) do
[{pid, _value}] -> {:ok, pid}
[] -> {:error, NoSuchTarget.exception(target: target)}
end
end
@doc false
def registry_key(target),
do: {target.module, target.scheme, target.address, target.port, target.transport}
end

View file

@ -100,6 +100,7 @@ defmodule Wayfarer.MixProject do
{:nimble_options, "~> 1.0"}, {:nimble_options, "~> 1.0"},
{:plug, "~> 1.15"}, {:plug, "~> 1.15"},
{:spark, "~> 2.0"}, {:spark, "~> 2.0"},
{:splode, "~> 0.2"},
{:telemetry, "~> 1.2"}, {:telemetry, "~> 1.2"},
{:websock, "~> 0.5"}, {:websock, "~> 0.5"},
{:websock_adapter, "~> 0.5"}, {:websock_adapter, "~> 0.5"},

View file

@ -37,6 +37,7 @@
"sourceror": {:hex, :sourceror, "1.6.0", "9907884e1449a4bd7dbaabe95088ed4d9a09c3c791fb0103964e6316bc9448a7", [:mix], [], "hexpm", "e90aef8c82dacf32c89c8ef83d1416fc343cd3e5556773eeffd2c1e3f991f699"}, "sourceror": {:hex, :sourceror, "1.6.0", "9907884e1449a4bd7dbaabe95088ed4d9a09c3c791fb0103964e6316bc9448a7", [:mix], [], "hexpm", "e90aef8c82dacf32c89c8ef83d1416fc343cd3e5556773eeffd2c1e3f991f699"},
"spark": {:hex, :spark, "2.2.24", "0cbd0e224af530f8f12f0e83ac5743b21802fb821d85b58d32a4da7e2268522b", [:mix], [{:igniter, ">= 0.2.6 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.2", [hex: :sourceror, repo: "hexpm", optional: false]}], "hexpm", "f05fd64ef74b3f3fe7817743962956dcc8a8e84bb9dc796ac7bf7fdcf4db5b6d"}, "spark": {:hex, :spark, "2.2.24", "0cbd0e224af530f8f12f0e83ac5743b21802fb821d85b58d32a4da7e2268522b", [:mix], [{:igniter, ">= 0.2.6 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.2", [hex: :sourceror, repo: "hexpm", optional: false]}], "hexpm", "f05fd64ef74b3f3fe7817743962956dcc8a8e84bb9dc796ac7bf7fdcf4db5b6d"},
"spitfire": {:hex, :spitfire, "0.1.3", "7ea0f544005dfbe48e615ed90250c9a271bfe126914012023fd5e4b6b82b7ec7", [:mix], [], "hexpm", "d53b5107bcff526a05c5bb54c95e77b36834550affd5830c9f58760e8c543657"}, "spitfire": {:hex, :spitfire, "0.1.3", "7ea0f544005dfbe48e615ed90250c9a271bfe126914012023fd5e4b6b82b7ec7", [:mix], [], "hexpm", "d53b5107bcff526a05c5bb54c95e77b36834550affd5830c9f58760e8c543657"},
"splode": {:hex, :splode, "0.2.4", "71046334c39605095ca4bed5d008372e56454060997da14f9868534c17b84b53", [:mix], [], "hexpm", "ca3b95f0d8d4b482b5357954fec857abd0fa3ea509d623334c1328e7382044c2"},
"telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"},
"thousand_island": {:hex, :thousand_island, "1.3.5", "6022b6338f1635b3d32406ff98d68b843ba73b3aa95cfc27154223244f3a6ca5", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2be6954916fdfe4756af3239fb6b6d75d0b8063b5df03ba76fd8a4c87849e180"}, "thousand_island": {:hex, :thousand_island, "1.3.5", "6022b6338f1635b3d32406ff98d68b843ba73b3aa95cfc27154223244f3a6ca5", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2be6954916fdfe4756af3239fb6b6d75d0b8063b5df03ba76fd8a4c87849e180"},
"websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"},

6
test/support/dynamic.ex Normal file
View file

@ -0,0 +1,6 @@
defmodule Support.Dynamic do
@moduledoc """
An empty server for testing dynamic proxy configuration.
"""
use Wayfarer.Server, targets: [], listeners: [], routing_table: []
end

View file

@ -1,5 +1,6 @@
{:ok, _} = Support.PortTracker.start_link([]) {:ok, _} = Support.PortTracker.start_link([])
Mimic.copy(Mint.HTTP) Mimic.copy(Mint.HTTP)
Mimic.copy(ThousandIsland)
Mimic.copy(Wayfarer.Router) Mimic.copy(Wayfarer.Router)
Mimic.copy(Wayfarer.Target.ActiveConnections) Mimic.copy(Wayfarer.Target.ActiveConnections)
Mimic.copy(Wayfarer.Target.TotalConnections) Mimic.copy(Wayfarer.Target.TotalConnections)

View file

@ -0,0 +1,134 @@
defmodule Wayfarer.Listener.RegistryTest do
@moduledoc false
use ExUnit.Case, async: false
alias Wayfarer.{Error.Listener.NoSuchListener, Listener, Server, Target}
use Support.PortTracker
use Support.HttpRequest
import IP.Sigil
defmodule DynamicServer do
@moduledoc false
use Wayfarer.Server
end
setup do
start_supervised!(Server.Supervisor)
start_supervised!(Listener.Supervisor)
start_supervised!(Target.Supervisor)
start_supervised!(DynamicServer)
:ok
end
describe "listener/1" do
test "it registers the calling process with the default status" do
listener = make_listener()
assert :ok = Listener.Registry.register(listener)
assert {:ok, :starting} = Listener.Registry.get_status(listener)
end
end
describe "listener/2" do
test "it registers the calling process with the provided status" do
listener = make_listener()
assert :ok = Listener.Registry.register(listener, :accepting_connections)
assert {:ok, :accepting_connections} = Listener.Registry.get_status(listener)
end
end
describe "list_listeners_for_module/1" do
test "when there are listeners running, it returns them" do
Server.add_listener(DynamicServer,
scheme: :http,
address: ~i"127.0.0.1",
port: random_port()
)
Server.add_listener(DynamicServer,
scheme: :http,
address: ~i"127.0.0.1",
port: random_port()
)
assert [%Listener{}, %Listener{}] =
Listener.Registry.list_listeners_for_module(DynamicServer)
end
test "when there are no listeners running, it returns an empty list" do
assert [] = Listener.Registry.list_listeners_for_module(DynamicServer)
end
end
describe "get_status/1" do
test "when the listener exists it returns the status of the listener" do
port = random_port()
Server.add_listener(DynamicServer,
scheme: :http,
address: ~i"127.0.0.1",
port: port
)
listener = make_listener(port)
assert {:ok, :accepting_connections} = Listener.Registry.get_status(listener)
end
test "when the listener doesn't exist it returns an error" do
listener = make_listener()
assert {:error, %NoSuchListener{}} = Listener.Registry.get_status(listener)
end
end
describe "get_pid/1" do
test "when the listener exists it returns the pid of the listener" do
port = random_port()
Server.add_listener(DynamicServer,
scheme: :http,
address: ~i"127.0.0.1",
port: port
)
listener = make_listener(port)
assert {:ok, pid} = Listener.Registry.get_pid(listener)
assert is_pid(pid)
end
test "when the listener doesn't exist it returns an error" do
listener = make_listener()
assert {:error, %NoSuchListener{}} = Listener.Registry.get_status(listener)
end
end
describe "update_status/2" do
test "when the listener exists it updates the status of the listener" do
listener = make_listener()
assert :ok = Listener.Registry.register(listener, :accepting_connections)
assert {:ok, :accepting_connections} = Listener.Registry.get_status(listener)
assert :ok = Listener.Registry.update_status(listener, :draining)
assert {:ok, :draining} = Listener.Registry.get_status(listener)
end
test "when the listener doesn't exist it returns an error" do
listener = make_listener()
assert {:error, %NoSuchListener{}} = Listener.Registry.update_status(listener, :draining)
end
end
defp make_listener(port \\ random_port()) do
%Listener{
scheme: :http,
address: ~i"127.0.0.1",
port: port,
module: DynamicServer
}
end
end

View file

@ -3,6 +3,7 @@ defmodule Wayfarer.ListenerTest do
use ExUnit.Case, async: false use ExUnit.Case, async: false
use Support.PortTracker use Support.PortTracker
use Support.HttpRequest use Support.HttpRequest
use Mimic
alias Wayfarer.{Listener, Router} alias Wayfarer.{Listener, Router}
import IP.Sigil import IP.Sigil
@ -51,4 +52,43 @@ defmodule Wayfarer.ListenerTest do
options: [transport_opts: [verify: :verify_none]] options: [transport_opts: [verify: :verify_none]]
) )
end end
describe "handle_call(:terminate, _, _)" do
setup :set_mimic_global
test "it cleanly shuts down" do
port = random_port()
assert {:ok, pid} =
Listener.start_link(
scheme: :http,
address: ~i"127.0.0.1",
port: port,
module: Support.Example
)
assert {:ok, :draining} = GenServer.call(pid, :terminate)
refute Process.alive?(pid)
end
test "it correctly drains connections" do
port = random_port()
assert {:ok, pid} =
Listener.start_link(
scheme: :http,
address: ~i"127.0.0.1",
port: port,
module: Support.Example,
drain_timeout: 123_456
)
ThousandIsland
|> expect(:stop, fn _pid, timeout ->
assert timeout == 123_456
end)
assert {:ok, :draining} = GenServer.call(pid, :terminate)
end
end
end end

View file

@ -0,0 +1,142 @@
defmodule Wayfarer.Server.DynamicTest do
@moduledoc false
use ExUnit.Case, async: false
alias Wayfarer.{Error.Listener.NoSuchListener, Listener, Server, Target}
use Support.PortTracker
use Support.HttpRequest
import IP.Sigil
defmodule DynamicServer1 do
@moduledoc false
use Wayfarer.Server
end
defmodule DynamicServer2 do
@moduledoc false
use Wayfarer.Server
end
setup do
start_supervised!(Server.Supervisor)
start_supervised!(Listener.Supervisor)
start_supervised!(Target.Supervisor)
start_supervised!(DynamicServer1)
start_supervised!(DynamicServer2)
:ok
end
describe "Server.add_listener/2" do
test "a listener can be dynamically added to a server" do
port = random_port()
assert {:ok, _pid} =
Server.add_listener(DynamicServer1,
scheme: :http,
address: ~i"127.0.0.1",
port: port
)
assert {:ok, %{status: 502}} = request(:http, ~i"127.0.0.1", port, host: "www.example.com")
end
test "the same listener cannot be added to two servers" do
port = random_port()
assert {:ok, _pid} =
Server.add_listener(DynamicServer1,
scheme: :http,
address: ~i"127.0.0.1",
port: port
)
assert {:error, _} =
Server.add_listener(DynamicServer2,
scheme: :http,
address: ~i"127.0.0.1",
port: port
)
end
end
describe "Server.list_listeners/1" do
test "when there are no listeners it returns an empty list" do
assert [] = Server.list_listeners(DynamicServer1)
end
test "when there are listeners running it returns a list" do
port = random_port()
assert {:ok, _pid} =
Server.add_listener(DynamicServer1,
scheme: :http,
address: ~i"127.0.0.1",
port: port
)
assert [%Listener{} = listener] =
Server.list_listeners(DynamicServer1)
assert listener.scheme == :http
assert listener.address == ~i"127.0.0.1"
assert listener.port == port
end
end
describe "Server.remove_listener/2" do
test "when there are no listeners running it returns an error" do
listener = %Listener{scheme: :http, address: ~i"127.0.0.1", port: random_port()}
assert {:error, %NoSuchListener{}} = Server.remove_listener(DynamicServer1, listener)
end
test "when there listeners running they are asked to stop" do
port = random_port()
assert {:ok, pid} =
Server.add_listener(DynamicServer1,
scheme: :http,
address: ~i"127.0.0.1",
port: port
)
[listener] = Server.list_listeners(DynamicServer1)
assert {:ok, :draining} = Server.remove_listener(DynamicServer1, listener)
refute Process.alive?(pid)
end
end
describe "Server.add_target/2" do
test "a target can be dynamically added to a server" do
port = random_port()
assert {:ok, _pid} =
Server.add_target(DynamicServer1,
scheme: :http,
port: port,
address: ~i"127.0.0.1"
)
assert [target] = Target.Registry.list_targets_for_module(DynamicServer1)
assert target.port == port
end
end
describe "Server.list_targets/1" do
test "running targets are returned" do
port = random_port()
assert {:ok, _pid} =
Server.add_target(DynamicServer1,
scheme: :http,
port: port,
address: ~i"127.0.0.1"
)
assert [target] = Server.list_targets(DynamicServer1)
assert target.port == port
end
end
end

View file

@ -41,7 +41,9 @@ defmodule Wayfarer.ServerTest do
) )
assert {:ok, :initial} = assert {:ok, :initial} =
Wayfarer.Target.current_status({Support.Example, :http, ~i"127.0.0.1", port}) Wayfarer.Target.current_status(
{Support.Example, :http, ~i"127.0.0.1", port, :auto}
)
end end
test "an initial routing table can be passed as options" do test "an initial routing table can be passed as options" do

View file

@ -58,8 +58,15 @@ defmodule WayfarerTest do
] ]
) )
wait_for_target_state({IntegrationProxy, :http, ~i"127.0.0.1", target0_port}, :healthy) wait_for_target_state(
wait_for_target_state({IntegrationProxy, :http, ~i"127.0.0.1", target1_port}, :healthy) {IntegrationProxy, :http, ~i"127.0.0.1", target0_port, :auto},
:healthy
)
wait_for_target_state(
{IntegrationProxy, :http, ~i"127.0.0.1", target1_port, :auto},
:healthy
)
assert {:ok, %{status: 200}} = assert {:ok, %{status: 200}} =
request(:http, ~i"127.0.0.1", listener_port, host: "www.example.com") request(:http, ~i"127.0.0.1", listener_port, host: "www.example.com")