diff --git a/lib/wayfarer/router.ex b/lib/wayfarer/router.ex index 67d6775..4dd89f0 100644 --- a/lib/wayfarer/router.ex +++ b/lib/wayfarer/router.ex @@ -6,8 +6,7 @@ defmodule Wayfarer.Router do This module provides a standardised interface to interact with a routing table. """ - import Wayfarer.Utils - alias Wayfarer.Target.Selector + alias Wayfarer.{Target.Selector, Utils} require Selector @table_options [ @@ -50,10 +49,9 @@ defmodule Wayfarer.Router do Uniquely identifies a request target, either a remote http(s) server or a local `Plug`. """ - @type target :: - {scheme, :inet.ip_address(), :socket.port_number()} - | {:plug, module} - | {:plug, {module, any}} + @type target :: http_target | plug_target + @type http_target :: {scheme, :inet.ip_address(), :socket.port_number()} + @type plug_target :: {:plug, module} | {:plug, {module, any}} @typedoc """ The algorithm used to select which target to forward requests to (when there @@ -142,12 +140,14 @@ defmodule Wayfarer.Router do Change a target's health state. """ @spec update_target_health_status(:ets.tid(), target, health) :: :ok - def update_target_health_status(table, {scheme, address, port}, status) do + def update_target_health_status(table, target, status) do # Match spec generated using: # :ets.fun2ms(fn {listener, host_pattern, {:http, {192, 168, 4, 26}, 80}, algorithm, _} -> # {listener, host_pattern, {:http, {192, 168, 4, 26}, 80}, algorithm, :healthy} # end) + {:ok, {scheme, address, port}} = Utils.sanitise_target(target) + match_spec = [ {{:"$1", :"$2", {scheme, address, port}, :"$3", :_}, [], [{{:"$1", :"$2", {{scheme, {address}, port}}, :"$3", status}}]} @@ -266,16 +266,7 @@ defmodule Wayfarer.Router do )} end - defp sanitise_listener({scheme, address, port}) do - with {:ok, scheme} <- sanitise_scheme(scheme), - {:ok, address} <- sanitise_ip_address(address), - {:ok, port} <- sanitise_port(port) do - {:ok, {scheme, address, port}} - end - end - - defp sanitise_listener(listener), - do: {:error, ArgumentError.exception(message: "Not a valid listener: `#{inspect(listener)}")} + defp sanitise_listener(listener), do: Utils.sanitise_target(listener) defp sanitise_target({:plug, module}), do: sanitise_target({:plug, module, []}) @@ -290,7 +281,8 @@ defmodule Wayfarer.Router do end end - defp sanitise_target({scheme, address, port}), do: sanitise_listener({scheme, address, port}) + defp sanitise_target({scheme, address, port}), + do: Utils.sanitise_target({scheme, address, port}) defp sanitise_target(target), do: {:error, ArgumentError.exception(message: "Not a valid target: `#{inspect(target)}")} diff --git a/lib/wayfarer/server.ex b/lib/wayfarer/server.ex index 2e8bbc3..826ae44 100644 --- a/lib/wayfarer/server.ex +++ b/lib/wayfarer/server.ex @@ -83,6 +83,34 @@ defmodule Wayfarer.Server do @type options :: keyword + @type target_options :: + [ + unquote( + Dsl.Target.schema() + |> OptionsHelpers.sanitize_schema() + |> NimbleOptions.option_typespec() + ) + ] + + @doc """ + Add a new target to the server. + """ + @spec add_target(GenServer.server(), target_options) :: :ok | {:error, any} + def add_target(server, target) do + case OptionsHelpers.validate(target, Dsl.Target.schema()) do + {:ok, options} -> GenServer.call(server, {:add_target, options}) + {:error, reason} -> {:error, reason} + end + end + + @doc """ + Stop new connections from being sent to the target. + """ + @spec drain_target(GenServer.server(), Router.http_target()) :: :ok | {:error, any} + def drain_target(server, {scheme, address, port}) do + GenServer.cast(server, {:target_status_change, scheme, address, port, :draining}) + end + @doc false @spec __using__(any) :: Macro.output() defmacro __using__(opts) do @@ -152,7 +180,8 @@ defmodule Wayfarer.Server do initial_routing_table <- Keyword.get(options, :routing_table, []), {:ok, routing_table} <- Router.init(module), :ok <- Router.import_routes(routing_table, initial_routing_table), - state <- %{module: module, routing_table: routing_table}, + {:ok, tref} <- :timer.send_interval(1_000, :tick), + state <- %{module: module, routing_table: routing_table, draining: %{}, timer: tref}, {:ok, state} <- start_listeners(listeners, state), {:ok, state} <- start_targets(targets, state) do {:ok, state} @@ -166,15 +195,34 @@ defmodule Wayfarer.Server do @impl true @spec handle_cast(any, map) :: {:noreply, map} def handle_cast({:target_status_change, scheme, address, port, status}, state) do - Router.update_target_health_status( - state.routing_table, - {scheme, IP.Address.to_tuple(address), port}, - status - ) + :ok = + Router.update_target_health_status( + state.routing_table, + {scheme, address, port}, + status + ) {:noreply, state} end + @doc false + @impl true + @spec handle_call(any, GenServer.from(), map) :: {:reply, :ok | {:error, any}, map} + def handle_call({:add_target, target}, _from, state) do + case start_targets([target], state) do + {:ok, state} -> {:reply, :ok, state} + {:error, reason} -> {:reply, {:error, reason}, state} + end + end + + @doc false + @impl true + def handle_info(:tick, state) do + # Iterate all targets for this server and terminate any draining ones with + # no active connections. + {:noreply, state} + end + defp start_listeners(listeners, state) do listeners |> Enum.reduce_while({:ok, state}, fn listener, success -> @@ -215,6 +263,9 @@ defmodule Wayfarer.Server do end) end + defp terminate_target(target), + do: DynamicSupervisor.terminate_child(Target.DynamicSupervisor, {Target, target}) + defp assert_is_server(module) do if Spark.implements_behaviour?(module, __MODULE__) do {:ok, module} diff --git a/lib/wayfarer/target/active_connections.ex b/lib/wayfarer/target/active_connections.ex index 9253fde..2be4956 100644 --- a/lib/wayfarer/target/active_connections.ex +++ b/lib/wayfarer/target/active_connections.ex @@ -39,6 +39,8 @@ defmodule Wayfarer.Target.ActiveConnections do size = :ets.info(state.table, :size) Logger.debug("Active connections: #{size}") + dbg(:ets.tab2list(state.table)) + {:noreply, state} end @@ -60,6 +62,7 @@ defmodule Wayfarer.Target.ActiveConnections do """ @spec connect(Router.target()) :: :ok def connect(target) do + {:ok, target} = Utils.sanitise_target(target) :ets.insert(__MODULE__, {target, self(), System.monotonic_time()}) GenServer.cast(__MODULE__, {:monitor, self()}) end @@ -69,6 +72,7 @@ defmodule Wayfarer.Target.ActiveConnections do """ @spec disconnect(Router.target()) :: :ok def disconnect(target) do + {:ok, target} = Utils.sanitise_target(target) :ets.match_delete(__MODULE__, {target, self(), :_}) :ok end @@ -82,7 +86,11 @@ defmodule Wayfarer.Target.ActiveConnections do def request_count(targets) do # :ets.fun2ms(fn {target, _, _} when target in [:targeta, :targetb] -> target end) - targets = List.wrap(targets) + {:ok, targets} = + targets + |> List.wrap() + |> Utils.sanitise_targets() + target_guard = Utils.targets_to_ms_guard(:"$1", targets) match_spec = [{{:"$1", :_, :_}, target_guard, [:"$1"]}] @@ -98,6 +106,11 @@ defmodule Wayfarer.Target.ActiveConnections do def last_request_time(targets) do # :ets.fun2ms(fn {target, _, t} when target in [:targeta, :targetb] -> {target, t} end) + {:ok, targets} = + targets + |> List.wrap() + |> Utils.sanitise_targets() + target_guard = Utils.targets_to_ms_guard(:"$1", targets) match_spec = [{{:"$1", :_, :"$2"}, target_guard, [{{:"$1", :"$2"}}]}] diff --git a/lib/wayfarer/utils.ex b/lib/wayfarer/utils.ex index 59d0b4d..e68c7cb 100644 --- a/lib/wayfarer/utils.ex +++ b/lib/wayfarer/utils.ex @@ -6,6 +6,36 @@ defmodule Wayfarer.Utils do @type address_input :: IP.Address.t() | String.t() | :inet.ip_address() @type port_number :: 1..0xFFFF @type scheme :: :http | :https + @type target :: {scheme, address_input, port_number} + + @doc """ + Sanitise a target tuple by applying `sanitise_scheme/1`, + `sanitise_ip_address/1` and `sanitise_port/1` in sequence. + """ + @spec sanitise_target(target) :: {:ok, target} | {:error, any} + def sanitise_target({scheme, address, port}) do + with {:ok, scheme} <- sanitise_scheme(scheme), + {:ok, address} <- sanitise_ip_address(address), + {:ok, port} <- sanitise_port(port) do + {:ok, {scheme, address, port}} + end + end + + def sanitise_target(invalid), do: {:error, "Invalid target #{inspect(invalid)}"} + + @doc """ + Sanitise an enumerable of targets by applying `sanitise_target/1` to them all. + """ + @spec sanitise_targets(Enumerable.t(target)) :: {:ok, [target]} | {:error, any} + def sanitise_targets(targets) do + targets + |> Enum.reduce_while({:ok, []}, fn target, {:ok, targets} -> + case sanitise_target(target) do + {:ok, target} -> {:cont, {:ok, [target | targets]}} + {:error, reason} -> {:halt, {:error, reason}} + end + end) + end @doc """ Verify an IP address and convert it into a tuple. diff --git a/test/support/example.ex b/test/support/example.ex index bec3d7e..5e9e345 100644 --- a/test/support/example.ex +++ b/test/support/example.ex @@ -9,7 +9,7 @@ defmodule Support.Example do end targets do - http "127.0.0.1", 8082 + # http "127.0.0.1", 8082 http "192.168.4.26", 80 end diff --git a/test/support/utils.ex b/test/support/utils.ex new file mode 100644 index 0000000..3f6cc34 --- /dev/null +++ b/test/support/utils.ex @@ -0,0 +1,50 @@ +defmodule Support.Utils do + @moduledoc false + + import Wayfarer.Utils + + defmacro __using__(_) do + quote do + import unquote(__MODULE__) + end + end + + def wait_for_target_state({module, scheme, address, port}, state) do + with {:ok, scheme} <- sanitise_scheme(scheme), + {:ok, address} <- sanitise_ip_address(address), + {:ok, port} <- sanitise_port(port) do + do_wait_for_target_state({module, scheme, IP.Address.from_tuple!(address), port}, state, 10) + end + end + + defp do_wait_for_target_state(key, state, 0) do + case Wayfarer.Target.current_status(key) do + {:ok, ^state} -> + :ok + + {:ok, _} -> + {:error, "Target never reached state #{inspect(state)}"} + + {:error, reason} -> + raise reason + end + end + + defp do_wait_for_target_state(key, state, iterations_left) do + case Wayfarer.Target.current_status(key) do + {:ok, ^state} -> + :ok + + {:ok, _} -> + Process.sleep(100) + do_wait_for_target_state(key, state, iterations_left - 1) + + {:error, reason} -> + raise reason + end + catch + :exit, {:noproc, _} -> + Process.sleep(100) + do_wait_for_target_state(key, state, iterations_left - 1) + end +end diff --git a/test/wayfarer/server_test.exs b/test/wayfarer/server_test.exs index 69691d4..890df1a 100644 --- a/test/wayfarer/server_test.exs +++ b/test/wayfarer/server_test.exs @@ -3,19 +3,23 @@ defmodule Wayfarer.ServerTest do use ExUnit.Case, async: false use Support.PortTracker use Support.HttpRequest + alias Support.HttpRequest + alias Support.HttpServer + alias Support.Utils + alias Wayfarer.{Listener, Server, Target} import IP.Sigil setup do - start_supervised!(Wayfarer.Listener.Supervisor) - start_supervised!(Wayfarer.Target.Supervisor) - start_supervised!(Wayfarer.Server.Supervisor) + start_supervised!(Listener.Supervisor) + start_supervised!(Target.Supervisor) + start_supervised!(Server.Supervisor) :ok end describe "init/1" do test "it requires a module option which implements the `Wayfarer.Server` behaviour" do - assert {:stop, reason} = Wayfarer.Server.init(module: URI) + assert {:stop, reason} = Server.init(module: URI) assert reason =~ ~r/does not implement/ end @@ -23,7 +27,7 @@ defmodule Wayfarer.ServerTest do port = random_port() assert {:ok, _state} = - Wayfarer.Server.init( + Server.init( module: Support.Example, listeners: [[address: ~i"127.0.0.1", port: port, scheme: :http]] ) @@ -35,13 +39,13 @@ defmodule Wayfarer.ServerTest do port = random_port() assert {:ok, _state} = - Wayfarer.Server.init( + Server.init( module: Support.Example, targets: [[address: ~i"127.0.0.1", port: port, scheme: :http]] ) assert {:ok, :initial} = - Wayfarer.Target.current_status({Support.Example, :http, ~i"127.0.0.1", port}) + Target.current_status({Support.Example, :http, ~i"127.0.0.1", port}) end test "an initial routing table can be passed as options" do @@ -49,7 +53,7 @@ defmodule Wayfarer.ServerTest do target_port = random_port() assert {:ok, _state} = - Wayfarer.Server.init( + Server.init( module: Support.Example, routing_table: [ { @@ -67,4 +71,61 @@ defmodule Wayfarer.ServerTest do ] end end + + describe "add_target/2" do + test "it can add a new target to the server" do + {:ok, pid} = Server.start_link(module: Support.Example) + + target_port = random_port() + + {:ok, _} = HttpServer.start_link(target_port, 200, "OK") + + assert :ok = + Server.add_target(pid, + address: ~i"127.0.0.1", + port: target_port, + scheme: :http, + health_checks: [[interval: 10, threshold: 3]] + ) + end + end + + describe "drain_target/2" do + test "it stops new requests being forwarded to the target" do + listener_port = random_port() + target_port = random_port() + + {:ok, _} = HttpServer.start_link(target_port, 200, "OK") + + {:ok, pid} = + Server.start_link( + module: Support.Example, + listeners: [ + [ + scheme: :http, + address: ~i"127.0.0.1", + port: listener_port + ] + ], + targets: [ + [ + scheme: :http, + address: ~i"127.0.0.1", + port: target_port, + health_checks: [[interval: 10, threshold: 3]] + ] + ], + routing_table: [ + {{:http, ~i"127.0.0.1", listener_port}, {:http, ~i"127.0.0.1", target_port}, + ["example.com"], :round_robin} + ] + ) + + Utils.wait_for_target_state({Support.Example, :http, ~i"127.0.0.1", target_port}, :healthy) + + assert {:ok, %{status: 200}} = HttpRequest.request(:http, ~i"127.0.0.1", listener_port) + + assert :ok = Server.drain_target(pid, {:http, ~i"127.0.0.1", target_port}) + end + end end diff --git a/test/wayfarer_test.exs b/test/wayfarer_test.exs index c13d435..56e128a 100644 --- a/test/wayfarer_test.exs +++ b/test/wayfarer_test.exs @@ -8,6 +8,7 @@ defmodule WayfarerTest do use Support.HttpRequest use Support.PortTracker + use Support.Utils import IP.Sigil @@ -92,18 +93,4 @@ defmodule WayfarerTest do |> Enum.map(&elem(&1, 1)) end end - - defp wait_for_target_state(key, state) do - case Wayfarer.Target.current_status(key) do - {:ok, ^state} -> - :ok - - {:ok, _} -> - Process.sleep(100) - wait_for_target_state(key, state) - - {:error, reason} -> - raise reason - end - end end