wip: add listing and shutting down of listeners.
Some checks failed
continuous-integration/drone/push Build is failing
continuous-integration/drone/pr Build is failing

This commit is contained in:
James Harton 2024-09-07 21:27:33 +12:00
parent 6e9ac76dd5
commit ce53a145f6
Signed by: james
GPG key ID: 90E82DAA13F624F4
14 changed files with 333 additions and 6 deletions

14
lib/wayfarer/error.ex Normal file
View file

@ -0,0 +1,14 @@
defmodule Wayfarer.Error do
@moduledoc """
The main error aggregator for Wayfarer errors.
"""
use Splode,
error_classes: [
listener: __MODULE__.Listener,
target: __MODULE__.Target,
server: __MODULE__.Server,
unknown: __MODULE__.Unknown
],
unknown_error: __MODULE__.Unknown
end

View file

@ -0,0 +1,7 @@
defmodule Wayfarer.Error.Listener do
@moduledoc """
The error class for Listener errors.
"""
use Splode.ErrorClass, class: :listener
end

View file

@ -0,0 +1,24 @@
defmodule Wayfarer.Error.Listener.NoSuchListener do
@moduledoc """
An exception which is returned when requesting information about a listener
which no longer exists.
"""
use Splode.Error, fields: [:listener], class: :listener
@doc false
@impl true
def message(error) do
"""
# No Such Listener
#{@moduledoc}
## Listener requested:
```elixir
#{inspect(error.listener)}
```
"""
end
end

View file

@ -0,0 +1,7 @@
defmodule Wayfarer.Error.Server do
@moduledoc """
The error class for Server errors.
"""
use Splode.ErrorClass, class: :server
end

View file

@ -0,0 +1,7 @@
defmodule Wayfarer.Error.Target do
@moduledoc """
The error class for Target errors.
"""
use Splode.ErrorClass, class: :target
end

View file

@ -0,0 +1,11 @@
defmodule Wayfarer.Error.Unknown do
@moduledoc """
An unknown error.
"""
use Splode.Error, fields: [:error, :message], class: :unknown
@doc false
@impl true
def message(error), do: error.message
end

View file

@ -1,9 +1,26 @@
defmodule Wayfarer.Listener do
# @moduledoc ⬇️⬇️
defstruct [
:address,
:certfile,
:cipher_suite,
:drain_timeout,
:http_1_options,
:http_2_options,
:keyfile,
:module,
:name,
:port,
:scheme,
:thousand_island_options,
:websocket_options
]
use GenServer, restart: :transient
require Logger
alias Spark.Options
alias Wayfarer.Listener.Registry
import Wayfarer.Utils
@options_schema [
@ -82,6 +99,24 @@ defmodule Wayfarer.Listener do
]
]
@type t :: %__MODULE__{
scheme: :http | :https | :ws | :wss,
port: :socket.port_number(),
address: IP.Address.t(),
drain_timeout: timeout(),
module: module,
name: nil | String.t(),
keyfile: String.t(),
certfile: String.t(),
cipher_suite: nil | :strong | :compatible,
http_1_options: Bandit.http_1_options(),
http_2_options: Bandit.http_2_options(),
websocket_options: Bandit.websocket_options(),
thousand_island_options: ThousandIsland.options()
}
@type status :: :starting | :accepting_connections | :draining
@moduledoc """
A GenServer which manages the state of each Bandit listener.
@ -101,14 +136,17 @@ defmodule Wayfarer.Listener do
@impl true
def init(options) do
with {:ok, options} <- validate_options(options),
listener <- struct(__MODULE__, options),
:ok <- Registry.register(listener),
bandit_options <- build_bandit_options(options),
{:ok, pid} <- Bandit.start_link(bandit_options),
{:ok, {listen_address, listen_port}} <- ThousandIsland.listener_info(pid),
:ok <- Registry.update_status(listener, :accepting_connections),
{:ok, listen_address} <- IP.Address.from_tuple(listen_address),
{:ok, uri} <- to_uri(options[:scheme], listen_address, listen_port) do
Logger.info("Started Wayfarer listener on #{uri}")
{:ok, %{server: pid, name: options[:name], uri: uri}}
{:ok, %{server: pid, name: options[:name], uri: uri, listener: listener}}
else
:error -> {:stop, "Unable to retrieve listener information."}
{:error, reason} -> {:stop, reason}
@ -117,10 +155,41 @@ defmodule Wayfarer.Listener do
@doc false
@impl true
def terminate(:normal, %{server: server}) do
GenServer.stop(server, :normal)
def handle_call(:terminate, _from, state) do
case Registry.update_status(state.listener, :draining) do
:ok ->
task =
Task.async(fn ->
ThousandIsland.stop(state.server, state.listener.drain_timeout)
end)
state = Map.put(state, :shutdown_task, task)
case Registry.get_status(state.listener) do
{:ok, status} -> {:reply, status, state}
{:error, _} -> {:stop, :normal, :stopped, state}
end
{:error, reason} ->
{:reply, {:error, reason}, state}
end
end
@doc false
@impl true
def handle_info({ref, _}, state) when ref == state.shutdown_task.ref,
# We don't actually care about the result of the shutdown task, just ignore
# it.
do: {:noreply, state}
def handle_info({:DOWN, _ref, :process, pid, reason}, state)
when state.shutdown_task.pid == pid,
do: {:stop, reason, state}
def handle_info({:DOWN, _ref, :process, pid, reason}, state)
when state.server == pid,
do: {:stop, reason, state}
defp validate_options(options) do
case Keyword.fetch(options, :scheme) do
{:ok, :https} ->

View file

@ -0,0 +1,87 @@
defmodule Wayfarer.Listener.Registry do
@moduledoc """
Functions for interacting with the Listener registry.
"""
alias Wayfarer.{
Error.Listener.NoSuchListener,
Listener
}
@doc """
Register the calling process as a listener.
"""
@spec register(Listener.t(), Listener.status()) :: :ok | {:error, any}
def register(listener, status \\ :starting) do
key = registry_key(listener)
with {:ok, _pid} <- Registry.register(__MODULE__, key, {listener, status}) do
:ok
end
end
@doc """
List active listeners for a server module.
"""
@spec list_listeners_for_module(module) :: {:ok, [Listener.t()]} | {:error, any}
def list_listeners_for_module(module) do
# :ets.fun2ms(fn {{module, _scheme, _address, _port}, _pid, {listener, _status}} when module == :module ->
# listener
# end)
Registry.select(
__MODULE__,
[
{{{:"$1", :"$2", :"$3", :"$4"}, :"$5", {:"$6", :"$7"}}, [{:==, :"$1", module}], [:"$6"]}
]
)
end
@doc """
Return the status of a listener.
"""
@spec get_status(Listener.t()) :: {:ok, Listener.status()} | {:error, any}
def get_status(listener) do
key = registry_key(listener)
case Registry.lookup(__MODULE__, key) do
[{_pid, value}] -> {:ok, value}
[] -> {:error, NoSuchListener.exception(listener: listener)}
end
end
@doc """
Return the process ID of a registered listener.
"""
@spec get_pid(Listener.t()) :: {:ok, pid} | {:error, any}
def get_pid(listener) do
key = registry_key(listener)
case Registry.lookup(__MODULE__, key) do
[{pid, _value}] -> {:ok, pid}
[] -> {:error, NoSuchListener.exception(listener: listener)}
end
end
@doc """
Update the status of a listener.
"""
@spec update_status(Listener.t(), Listener.status()) :: :ok | {:error, any}
def update_status(listener, status) do
key = registry_key(listener)
case Registry.update_value(__MODULE__, key, &update_status_callback(&1, status)) do
{_, _} -> :ok
:error -> {:error, NoSuchListener.exception(listener: listener)}
end
end
defp update_status_callback({listener, _old_status}, new_status),
do: {listener, new_status}
@doc """
Returns the registry key for a listener.
"""
@spec registry_key(Listener.t()) :: any
def registry_key(listener),
do: {listener.module, listener.scheme, listener.address, listener.port}
end

View file

@ -8,7 +8,8 @@ defmodule Wayfarer.Listener.Supervisor do
@impl true
def init(_) do
[
{DynamicSupervisor, name: Wayfarer.Listener.DynamicSupervisor, strategy: :one_for_one}
{DynamicSupervisor, name: Wayfarer.Listener.DynamicSupervisor, strategy: :one_for_one},
{Registry, name: Wayfarer.Listener.Registry, keys: :unique}
]
|> Supervisor.init(strategy: :one_for_one)
end

View file

@ -1,6 +1,14 @@
defmodule Wayfarer.Server do
alias Spark.Options
alias Wayfarer.{Dsl, Listener, Router, Server, Target}
alias Wayfarer.{
Dsl,
Listener,
Router,
Server,
Target
}
use GenServer
require Logger
@ -142,6 +150,21 @@ defmodule Wayfarer.Server do
end
end
@doc """
List the active listeners for a server.
"""
@spec list_listeners(module) :: {:ok, [Listener.t()]} | {:error, any}
def list_listeners(module), do: Listener.Registry.list_listeners_for_module(module)
@doc """
Remove a listener from a running server.
"""
@spec remove_listener(module, Listener.t()) :: {:ok, :stopped | :draining} | {:error, any}
def remove_listener(module, listener) do
{:via, Registry, {Wayfarer.Server.Registry, module}}
|> GenServer.call({:remove_listener, listener})
end
@doc """
Add a target to an already running server.
@ -153,6 +176,10 @@ defmodule Wayfarer.Server do
The following options are supported by target configuration:
#{Options.docs(Target.schema())}
The following options are supported by health-check configuration:
#{Options.docs(Dsl.HealthCheck.schema())}
"""
@spec add_target(module, options) :: :ok | {:error, any}
def add_target(module, options) do
@ -234,6 +261,17 @@ defmodule Wayfarer.Server do
{:reply, start_target(target, state), state}
end
def handle_call({:remove_listener, listener}, _from, state) do
case Listener.Registry.get_pid(listener) do
{:ok, pid} ->
Process.unlink(pid)
{:reply, GenServer.call(pid, :terminate), state}
{:error, reason} ->
{:reply, {:error, reason}, state}
end
end
defp start_listeners(listeners, state) do
listeners
|> Enum.reduce_while({:ok, state}, fn listener, success ->

View file

@ -1,6 +1,8 @@
defmodule Wayfarer.Target do
# @moduledoc ⬇️⬇️
defstruct [:scheme, :port, :address, :module, :name, :transport]
use GenServer, restart: :transient
require Logger
alias Spark.Options
@ -58,6 +60,15 @@ defmodule Wayfarer.Target do
{"Accept", "*/*"}
]
@type t :: %__MODULE__{
scheme: :http | :https | :ws | :wss,
port: :socket.port_number(),
address: IP.Address.t(),
module: module,
name: nil | String.t(),
transport: :http1 | :http2 | :auto
}
@moduledoc """
A GenServer responsible for performing health-checks against HTTP and HTTPS
targets.
@ -144,7 +155,7 @@ defmodule Wayfarer.Target do
status: :initial
}
Registry.register(Wayfarer.Target.Registry, key, uri)
Registry.register(Wayfarer.Target.Registry, key, struct(__MODULE__, options))
{:ok, state, {:continue, :perform_health_checks}}
end

View file

@ -100,6 +100,7 @@ defmodule Wayfarer.MixProject do
{:nimble_options, "~> 1.0"},
{:plug, "~> 1.15"},
{:spark, "~> 2.0"},
{:splode, "~> 0.2"},
{:telemetry, "~> 1.2"},
{:websock, "~> 0.5"},
{:websock_adapter, "~> 0.5"},

View file

@ -38,6 +38,7 @@
"sourceror": {:hex, :sourceror, "1.6.0", "9907884e1449a4bd7dbaabe95088ed4d9a09c3c791fb0103964e6316bc9448a7", [:mix], [], "hexpm", "e90aef8c82dacf32c89c8ef83d1416fc343cd3e5556773eeffd2c1e3f991f699"},
"spark": {:hex, :spark, "2.2.23", "78f0a1b0b713a91ad556fe9dc19ec92d977aaa0803cce2e255d90e58b9045c2a", [:mix], [{:igniter, ">= 0.2.6 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.2", [hex: :sourceror, repo: "hexpm", optional: false]}], "hexpm", "a354b5cd7c3f021e3cd1da5a033b7643fe7b3c71c96b96d9f500a742f40c94db"},
"spitfire": {:hex, :spitfire, "0.1.3", "7ea0f544005dfbe48e615ed90250c9a271bfe126914012023fd5e4b6b82b7ec7", [:mix], [], "hexpm", "d53b5107bcff526a05c5bb54c95e77b36834550affd5830c9f58760e8c543657"},
"splode": {:hex, :splode, "0.2.4", "71046334c39605095ca4bed5d008372e56454060997da14f9868534c17b84b53", [:mix], [], "hexpm", "ca3b95f0d8d4b482b5357954fec857abd0fa3ea509d623334c1328e7382044c2"},
"telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"},
"thousand_island": {:hex, :thousand_island, "1.3.5", "6022b6338f1635b3d32406ff98d68b843ba73b3aa95cfc27154223244f3a6ca5", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2be6954916fdfe4756af3239fb6b6d75d0b8063b5df03ba76fd8a4c87849e180"},
"ucwidth": {:hex, :ucwidth, "0.2.0", "1f0a440f541d895dff142275b96355f7e91e15bca525d4a0cc788ea51f0e3441", [:mix], [], "hexpm", "c1efd1798b8eeb11fb2bec3cafa3dd9c0c3647bee020543f0340b996177355bf"},

View file

@ -1,4 +1,5 @@
defmodule Wayfarer.Server.DynamicTest do
@moduledoc false
use ExUnit.Case, async: true
alias Wayfarer.{Listener, Server, Target}
@ -7,10 +8,12 @@ defmodule Wayfarer.Server.DynamicTest do
import IP.Sigil
defmodule DynamicServer1 do
@moduledoc false
use Wayfarer.Server
end
defmodule DynamicServer2 do
@moduledoc false
use Wayfarer.Server
end
@ -56,4 +59,50 @@ defmodule Wayfarer.Server.DynamicTest do
)
end
end
describe "Server.list_listeners/1" do
test "when there are no listeners it returns an empty list" do
assert [] = Server.list_listeners(DynamicServer1)
end
test "when there are listeners running it returns a list" do
port = random_port()
assert {:ok, _pid} =
Server.add_listener(DynamicServer1,
scheme: :http,
address: ~i"127.0.0.1",
port: port
)
assert [{%Listener{} = listener, :accepting_connections}] =
Server.list_listeners(DynamicServer1)
assert listener.scheme == :http
assert listener.address == ~i"127.0.0.1"
assert listener.port == port
end
end
describe "Server.remove_listener/2" do
test "when there are no listeners running it returns an error" do
listener = %Listener{scheme: :http, address: ~i"127.0.0.1", port: random_port()}
assert {:error, :wat} = Server.remove_listener(DynamicServer1, listener)
end
test "when there listeners running with no active connections, it stops immediately" do
port = random_port()
assert {:ok, _pid} =
Server.add_listener(DynamicServer1,
scheme: :http,
address: ~i"127.0.0.1",
port: port
)
[{listener, _}] = Server.list_listeners(DynamicServer1)
assert {:error, :wat} = Server.remove_listener(DynamicServer1, listener)
end
end
end