From ce53a145f6a80bb50e9608b31064112548515a40 Mon Sep 17 00:00:00 2001 From: James Harton Date: Sat, 7 Sep 2024 21:27:33 +1200 Subject: [PATCH] wip: add listing and shutting down of listeners. --- lib/wayfarer/error.ex | 14 +++ lib/wayfarer/error/listener.ex | 7 ++ .../error/listener/no_such_listener.ex | 24 +++++ lib/wayfarer/error/server.ex | 7 ++ lib/wayfarer/error/target.ex | 7 ++ lib/wayfarer/error/unknown.ex | 11 +++ lib/wayfarer/listener.ex | 75 +++++++++++++++- lib/wayfarer/listener/registry.ex | 87 +++++++++++++++++++ lib/wayfarer/listener/supervisor.ex | 3 +- lib/wayfarer/server.ex | 40 ++++++++- lib/wayfarer/target.ex | 13 ++- mix.exs | 1 + mix.lock | 1 + test/wayfarer/server/dynamic_test.exs | 49 +++++++++++ 14 files changed, 333 insertions(+), 6 deletions(-) create mode 100644 lib/wayfarer/error.ex create mode 100644 lib/wayfarer/error/listener.ex create mode 100644 lib/wayfarer/error/listener/no_such_listener.ex create mode 100644 lib/wayfarer/error/server.ex create mode 100644 lib/wayfarer/error/target.ex create mode 100644 lib/wayfarer/error/unknown.ex create mode 100644 lib/wayfarer/listener/registry.ex diff --git a/lib/wayfarer/error.ex b/lib/wayfarer/error.ex new file mode 100644 index 0000000..46e8fcc --- /dev/null +++ b/lib/wayfarer/error.ex @@ -0,0 +1,14 @@ +defmodule Wayfarer.Error do + @moduledoc """ + The main error aggregator for Wayfarer errors. + """ + + use Splode, + error_classes: [ + listener: __MODULE__.Listener, + target: __MODULE__.Target, + server: __MODULE__.Server, + unknown: __MODULE__.Unknown + ], + unknown_error: __MODULE__.Unknown +end diff --git a/lib/wayfarer/error/listener.ex b/lib/wayfarer/error/listener.ex new file mode 100644 index 0000000..165e4f5 --- /dev/null +++ b/lib/wayfarer/error/listener.ex @@ -0,0 +1,7 @@ +defmodule Wayfarer.Error.Listener do + @moduledoc """ + The error class for Listener errors. + """ + + use Splode.ErrorClass, class: :listener +end diff --git a/lib/wayfarer/error/listener/no_such_listener.ex b/lib/wayfarer/error/listener/no_such_listener.ex new file mode 100644 index 0000000..5e5c9d6 --- /dev/null +++ b/lib/wayfarer/error/listener/no_such_listener.ex @@ -0,0 +1,24 @@ +defmodule Wayfarer.Error.Listener.NoSuchListener do + @moduledoc """ + An exception which is returned when requesting information about a listener + which no longer exists. + """ + + use Splode.Error, fields: [:listener], class: :listener + + @doc false + @impl true + def message(error) do + """ + # No Such Listener + + #{@moduledoc} + + ## Listener requested: + + ```elixir + #{inspect(error.listener)} + ``` + """ + end +end diff --git a/lib/wayfarer/error/server.ex b/lib/wayfarer/error/server.ex new file mode 100644 index 0000000..c15ed01 --- /dev/null +++ b/lib/wayfarer/error/server.ex @@ -0,0 +1,7 @@ +defmodule Wayfarer.Error.Server do + @moduledoc """ + The error class for Server errors. + """ + + use Splode.ErrorClass, class: :server +end diff --git a/lib/wayfarer/error/target.ex b/lib/wayfarer/error/target.ex new file mode 100644 index 0000000..becfed2 --- /dev/null +++ b/lib/wayfarer/error/target.ex @@ -0,0 +1,7 @@ +defmodule Wayfarer.Error.Target do + @moduledoc """ + The error class for Target errors. + """ + + use Splode.ErrorClass, class: :target +end diff --git a/lib/wayfarer/error/unknown.ex b/lib/wayfarer/error/unknown.ex new file mode 100644 index 0000000..65bd69e --- /dev/null +++ b/lib/wayfarer/error/unknown.ex @@ -0,0 +1,11 @@ +defmodule Wayfarer.Error.Unknown do + @moduledoc """ + An unknown error. + """ + + use Splode.Error, fields: [:error, :message], class: :unknown + + @doc false + @impl true + def message(error), do: error.message +end diff --git a/lib/wayfarer/listener.ex b/lib/wayfarer/listener.ex index e894962..829cada 100644 --- a/lib/wayfarer/listener.ex +++ b/lib/wayfarer/listener.ex @@ -1,9 +1,26 @@ defmodule Wayfarer.Listener do # @moduledoc ⬇️⬇️ + defstruct [ + :address, + :certfile, + :cipher_suite, + :drain_timeout, + :http_1_options, + :http_2_options, + :keyfile, + :module, + :name, + :port, + :scheme, + :thousand_island_options, + :websocket_options + ] + use GenServer, restart: :transient require Logger alias Spark.Options + alias Wayfarer.Listener.Registry import Wayfarer.Utils @options_schema [ @@ -82,6 +99,24 @@ defmodule Wayfarer.Listener do ] ] + @type t :: %__MODULE__{ + scheme: :http | :https | :ws | :wss, + port: :socket.port_number(), + address: IP.Address.t(), + drain_timeout: timeout(), + module: module, + name: nil | String.t(), + keyfile: String.t(), + certfile: String.t(), + cipher_suite: nil | :strong | :compatible, + http_1_options: Bandit.http_1_options(), + http_2_options: Bandit.http_2_options(), + websocket_options: Bandit.websocket_options(), + thousand_island_options: ThousandIsland.options() + } + + @type status :: :starting | :accepting_connections | :draining + @moduledoc """ A GenServer which manages the state of each Bandit listener. @@ -101,14 +136,17 @@ defmodule Wayfarer.Listener do @impl true def init(options) do with {:ok, options} <- validate_options(options), + listener <- struct(__MODULE__, options), + :ok <- Registry.register(listener), bandit_options <- build_bandit_options(options), {:ok, pid} <- Bandit.start_link(bandit_options), {:ok, {listen_address, listen_port}} <- ThousandIsland.listener_info(pid), + :ok <- Registry.update_status(listener, :accepting_connections), {:ok, listen_address} <- IP.Address.from_tuple(listen_address), {:ok, uri} <- to_uri(options[:scheme], listen_address, listen_port) do Logger.info("Started Wayfarer listener on #{uri}") - {:ok, %{server: pid, name: options[:name], uri: uri}} + {:ok, %{server: pid, name: options[:name], uri: uri, listener: listener}} else :error -> {:stop, "Unable to retrieve listener information."} {:error, reason} -> {:stop, reason} @@ -117,10 +155,41 @@ defmodule Wayfarer.Listener do @doc false @impl true - def terminate(:normal, %{server: server}) do - GenServer.stop(server, :normal) + def handle_call(:terminate, _from, state) do + case Registry.update_status(state.listener, :draining) do + :ok -> + task = + Task.async(fn -> + ThousandIsland.stop(state.server, state.listener.drain_timeout) + end) + + state = Map.put(state, :shutdown_task, task) + + case Registry.get_status(state.listener) do + {:ok, status} -> {:reply, status, state} + {:error, _} -> {:stop, :normal, :stopped, state} + end + + {:error, reason} -> + {:reply, {:error, reason}, state} + end end + @doc false + @impl true + def handle_info({ref, _}, state) when ref == state.shutdown_task.ref, + # We don't actually care about the result of the shutdown task, just ignore + # it. + do: {:noreply, state} + + def handle_info({:DOWN, _ref, :process, pid, reason}, state) + when state.shutdown_task.pid == pid, + do: {:stop, reason, state} + + def handle_info({:DOWN, _ref, :process, pid, reason}, state) + when state.server == pid, + do: {:stop, reason, state} + defp validate_options(options) do case Keyword.fetch(options, :scheme) do {:ok, :https} -> diff --git a/lib/wayfarer/listener/registry.ex b/lib/wayfarer/listener/registry.ex new file mode 100644 index 0000000..6972690 --- /dev/null +++ b/lib/wayfarer/listener/registry.ex @@ -0,0 +1,87 @@ +defmodule Wayfarer.Listener.Registry do + @moduledoc """ + Functions for interacting with the Listener registry. + """ + alias Wayfarer.{ + Error.Listener.NoSuchListener, + Listener + } + + @doc """ + Register the calling process as a listener. + """ + @spec register(Listener.t(), Listener.status()) :: :ok | {:error, any} + def register(listener, status \\ :starting) do + key = registry_key(listener) + + with {:ok, _pid} <- Registry.register(__MODULE__, key, {listener, status}) do + :ok + end + end + + @doc """ + List active listeners for a server module. + """ + @spec list_listeners_for_module(module) :: {:ok, [Listener.t()]} | {:error, any} + def list_listeners_for_module(module) do + # :ets.fun2ms(fn {{module, _scheme, _address, _port}, _pid, {listener, _status}} when module == :module -> + # listener + # end) + + Registry.select( + __MODULE__, + [ + {{{:"$1", :"$2", :"$3", :"$4"}, :"$5", {:"$6", :"$7"}}, [{:==, :"$1", module}], [:"$6"]} + ] + ) + end + + @doc """ + Return the status of a listener. + """ + @spec get_status(Listener.t()) :: {:ok, Listener.status()} | {:error, any} + def get_status(listener) do + key = registry_key(listener) + + case Registry.lookup(__MODULE__, key) do + [{_pid, value}] -> {:ok, value} + [] -> {:error, NoSuchListener.exception(listener: listener)} + end + end + + @doc """ + Return the process ID of a registered listener. + """ + @spec get_pid(Listener.t()) :: {:ok, pid} | {:error, any} + def get_pid(listener) do + key = registry_key(listener) + + case Registry.lookup(__MODULE__, key) do + [{pid, _value}] -> {:ok, pid} + [] -> {:error, NoSuchListener.exception(listener: listener)} + end + end + + @doc """ + Update the status of a listener. + """ + @spec update_status(Listener.t(), Listener.status()) :: :ok | {:error, any} + def update_status(listener, status) do + key = registry_key(listener) + + case Registry.update_value(__MODULE__, key, &update_status_callback(&1, status)) do + {_, _} -> :ok + :error -> {:error, NoSuchListener.exception(listener: listener)} + end + end + + defp update_status_callback({listener, _old_status}, new_status), + do: {listener, new_status} + + @doc """ + Returns the registry key for a listener. + """ + @spec registry_key(Listener.t()) :: any + def registry_key(listener), + do: {listener.module, listener.scheme, listener.address, listener.port} +end diff --git a/lib/wayfarer/listener/supervisor.ex b/lib/wayfarer/listener/supervisor.ex index 7e4a37d..d2d2e88 100644 --- a/lib/wayfarer/listener/supervisor.ex +++ b/lib/wayfarer/listener/supervisor.ex @@ -8,7 +8,8 @@ defmodule Wayfarer.Listener.Supervisor do @impl true def init(_) do [ - {DynamicSupervisor, name: Wayfarer.Listener.DynamicSupervisor, strategy: :one_for_one} + {DynamicSupervisor, name: Wayfarer.Listener.DynamicSupervisor, strategy: :one_for_one}, + {Registry, name: Wayfarer.Listener.Registry, keys: :unique} ] |> Supervisor.init(strategy: :one_for_one) end diff --git a/lib/wayfarer/server.ex b/lib/wayfarer/server.ex index 6c33015..92dfdbf 100644 --- a/lib/wayfarer/server.ex +++ b/lib/wayfarer/server.ex @@ -1,6 +1,14 @@ defmodule Wayfarer.Server do alias Spark.Options - alias Wayfarer.{Dsl, Listener, Router, Server, Target} + + alias Wayfarer.{ + Dsl, + Listener, + Router, + Server, + Target + } + use GenServer require Logger @@ -142,6 +150,21 @@ defmodule Wayfarer.Server do end end + @doc """ + List the active listeners for a server. + """ + @spec list_listeners(module) :: {:ok, [Listener.t()]} | {:error, any} + def list_listeners(module), do: Listener.Registry.list_listeners_for_module(module) + + @doc """ + Remove a listener from a running server. + """ + @spec remove_listener(module, Listener.t()) :: {:ok, :stopped | :draining} | {:error, any} + def remove_listener(module, listener) do + {:via, Registry, {Wayfarer.Server.Registry, module}} + |> GenServer.call({:remove_listener, listener}) + end + @doc """ Add a target to an already running server. @@ -153,6 +176,10 @@ defmodule Wayfarer.Server do The following options are supported by target configuration: #{Options.docs(Target.schema())} + + The following options are supported by health-check configuration: + + #{Options.docs(Dsl.HealthCheck.schema())} """ @spec add_target(module, options) :: :ok | {:error, any} def add_target(module, options) do @@ -234,6 +261,17 @@ defmodule Wayfarer.Server do {:reply, start_target(target, state), state} end + def handle_call({:remove_listener, listener}, _from, state) do + case Listener.Registry.get_pid(listener) do + {:ok, pid} -> + Process.unlink(pid) + {:reply, GenServer.call(pid, :terminate), state} + + {:error, reason} -> + {:reply, {:error, reason}, state} + end + end + defp start_listeners(listeners, state) do listeners |> Enum.reduce_while({:ok, state}, fn listener, success -> diff --git a/lib/wayfarer/target.ex b/lib/wayfarer/target.ex index d1b7e93..3f2e3bb 100644 --- a/lib/wayfarer/target.ex +++ b/lib/wayfarer/target.ex @@ -1,6 +1,8 @@ defmodule Wayfarer.Target do # @moduledoc ⬇️⬇️ + defstruct [:scheme, :port, :address, :module, :name, :transport] + use GenServer, restart: :transient require Logger alias Spark.Options @@ -58,6 +60,15 @@ defmodule Wayfarer.Target do {"Accept", "*/*"} ] + @type t :: %__MODULE__{ + scheme: :http | :https | :ws | :wss, + port: :socket.port_number(), + address: IP.Address.t(), + module: module, + name: nil | String.t(), + transport: :http1 | :http2 | :auto + } + @moduledoc """ A GenServer responsible for performing health-checks against HTTP and HTTPS targets. @@ -144,7 +155,7 @@ defmodule Wayfarer.Target do status: :initial } - Registry.register(Wayfarer.Target.Registry, key, uri) + Registry.register(Wayfarer.Target.Registry, key, struct(__MODULE__, options)) {:ok, state, {:continue, :perform_health_checks}} end diff --git a/mix.exs b/mix.exs index 30b08a0..3a0a502 100644 --- a/mix.exs +++ b/mix.exs @@ -100,6 +100,7 @@ defmodule Wayfarer.MixProject do {:nimble_options, "~> 1.0"}, {:plug, "~> 1.15"}, {:spark, "~> 2.0"}, + {:splode, "~> 0.2"}, {:telemetry, "~> 1.2"}, {:websock, "~> 0.5"}, {:websock_adapter, "~> 0.5"}, diff --git a/mix.lock b/mix.lock index 079ae8e..022ca4a 100644 --- a/mix.lock +++ b/mix.lock @@ -38,6 +38,7 @@ "sourceror": {:hex, :sourceror, "1.6.0", "9907884e1449a4bd7dbaabe95088ed4d9a09c3c791fb0103964e6316bc9448a7", [:mix], [], "hexpm", "e90aef8c82dacf32c89c8ef83d1416fc343cd3e5556773eeffd2c1e3f991f699"}, "spark": {:hex, :spark, "2.2.23", "78f0a1b0b713a91ad556fe9dc19ec92d977aaa0803cce2e255d90e58b9045c2a", [:mix], [{:igniter, ">= 0.2.6 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.2", [hex: :sourceror, repo: "hexpm", optional: false]}], "hexpm", "a354b5cd7c3f021e3cd1da5a033b7643fe7b3c71c96b96d9f500a742f40c94db"}, "spitfire": {:hex, :spitfire, "0.1.3", "7ea0f544005dfbe48e615ed90250c9a271bfe126914012023fd5e4b6b82b7ec7", [:mix], [], "hexpm", "d53b5107bcff526a05c5bb54c95e77b36834550affd5830c9f58760e8c543657"}, + "splode": {:hex, :splode, "0.2.4", "71046334c39605095ca4bed5d008372e56454060997da14f9868534c17b84b53", [:mix], [], "hexpm", "ca3b95f0d8d4b482b5357954fec857abd0fa3ea509d623334c1328e7382044c2"}, "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, "thousand_island": {:hex, :thousand_island, "1.3.5", "6022b6338f1635b3d32406ff98d68b843ba73b3aa95cfc27154223244f3a6ca5", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2be6954916fdfe4756af3239fb6b6d75d0b8063b5df03ba76fd8a4c87849e180"}, "ucwidth": {:hex, :ucwidth, "0.2.0", "1f0a440f541d895dff142275b96355f7e91e15bca525d4a0cc788ea51f0e3441", [:mix], [], "hexpm", "c1efd1798b8eeb11fb2bec3cafa3dd9c0c3647bee020543f0340b996177355bf"}, diff --git a/test/wayfarer/server/dynamic_test.exs b/test/wayfarer/server/dynamic_test.exs index f73fe15..11039e5 100644 --- a/test/wayfarer/server/dynamic_test.exs +++ b/test/wayfarer/server/dynamic_test.exs @@ -1,4 +1,5 @@ defmodule Wayfarer.Server.DynamicTest do + @moduledoc false use ExUnit.Case, async: true alias Wayfarer.{Listener, Server, Target} @@ -7,10 +8,12 @@ defmodule Wayfarer.Server.DynamicTest do import IP.Sigil defmodule DynamicServer1 do + @moduledoc false use Wayfarer.Server end defmodule DynamicServer2 do + @moduledoc false use Wayfarer.Server end @@ -56,4 +59,50 @@ defmodule Wayfarer.Server.DynamicTest do ) end end + + describe "Server.list_listeners/1" do + test "when there are no listeners it returns an empty list" do + assert [] = Server.list_listeners(DynamicServer1) + end + + test "when there are listeners running it returns a list" do + port = random_port() + + assert {:ok, _pid} = + Server.add_listener(DynamicServer1, + scheme: :http, + address: ~i"127.0.0.1", + port: port + ) + + assert [{%Listener{} = listener, :accepting_connections}] = + Server.list_listeners(DynamicServer1) + + assert listener.scheme == :http + assert listener.address == ~i"127.0.0.1" + assert listener.port == port + end + end + + describe "Server.remove_listener/2" do + test "when there are no listeners running it returns an error" do + listener = %Listener{scheme: :http, address: ~i"127.0.0.1", port: random_port()} + assert {:error, :wat} = Server.remove_listener(DynamicServer1, listener) + end + + test "when there listeners running with no active connections, it stops immediately" do + port = random_port() + + assert {:ok, _pid} = + Server.add_listener(DynamicServer1, + scheme: :http, + address: ~i"127.0.0.1", + port: port + ) + + [{listener, _}] = Server.list_listeners(DynamicServer1) + + assert {:error, :wat} = Server.remove_listener(DynamicServer1, listener) + end + end end