wip:
Some checks failed
continuous-integration/drone/push Build is failing
continuous-integration/drone/pr Build is failing

This commit is contained in:
James Harton 2024-02-05 14:53:39 +13:00
parent cc82bfb9d6
commit b2ab14809c
8 changed files with 232 additions and 48 deletions

View file

@ -6,8 +6,7 @@ defmodule Wayfarer.Router do
This module provides a standardised interface to interact with a routing
table.
"""
import Wayfarer.Utils
alias Wayfarer.Target.Selector
alias Wayfarer.{Target.Selector, Utils}
require Selector
@table_options [
@ -50,10 +49,9 @@ defmodule Wayfarer.Router do
Uniquely identifies a request target, either a remote http(s) server or a
local `Plug`.
"""
@type target ::
{scheme, :inet.ip_address(), :socket.port_number()}
| {:plug, module}
| {:plug, {module, any}}
@type target :: http_target | plug_target
@type http_target :: {scheme, :inet.ip_address(), :socket.port_number()}
@type plug_target :: {:plug, module} | {:plug, {module, any}}
@typedoc """
The algorithm used to select which target to forward requests to (when there
@ -142,12 +140,14 @@ defmodule Wayfarer.Router do
Change a target's health state.
"""
@spec update_target_health_status(:ets.tid(), target, health) :: :ok
def update_target_health_status(table, {scheme, address, port}, status) do
def update_target_health_status(table, target, status) do
# Match spec generated using:
# :ets.fun2ms(fn {listener, host_pattern, {:http, {192, 168, 4, 26}, 80}, algorithm, _} ->
# {listener, host_pattern, {:http, {192, 168, 4, 26}, 80}, algorithm, :healthy}
# end)
{:ok, {scheme, address, port}} = Utils.sanitise_target(target)
match_spec = [
{{:"$1", :"$2", {scheme, address, port}, :"$3", :_}, [],
[{{:"$1", :"$2", {{scheme, {address}, port}}, :"$3", status}}]}
@ -266,16 +266,7 @@ defmodule Wayfarer.Router do
)}
end
defp sanitise_listener({scheme, address, port}) do
with {:ok, scheme} <- sanitise_scheme(scheme),
{:ok, address} <- sanitise_ip_address(address),
{:ok, port} <- sanitise_port(port) do
{:ok, {scheme, address, port}}
end
end
defp sanitise_listener(listener),
do: {:error, ArgumentError.exception(message: "Not a valid listener: `#{inspect(listener)}")}
defp sanitise_listener(listener), do: Utils.sanitise_target(listener)
defp sanitise_target({:plug, module}), do: sanitise_target({:plug, module, []})
@ -290,7 +281,8 @@ defmodule Wayfarer.Router do
end
end
defp sanitise_target({scheme, address, port}), do: sanitise_listener({scheme, address, port})
defp sanitise_target({scheme, address, port}),
do: Utils.sanitise_target({scheme, address, port})
defp sanitise_target(target),
do: {:error, ArgumentError.exception(message: "Not a valid target: `#{inspect(target)}")}

View file

@ -83,6 +83,34 @@ defmodule Wayfarer.Server do
@type options :: keyword
@type target_options ::
[
unquote(
Dsl.Target.schema()
|> OptionsHelpers.sanitize_schema()
|> NimbleOptions.option_typespec()
)
]
@doc """
Add a new target to the server.
"""
@spec add_target(GenServer.server(), target_options) :: :ok | {:error, any}
def add_target(server, target) do
case OptionsHelpers.validate(target, Dsl.Target.schema()) do
{:ok, options} -> GenServer.call(server, {:add_target, options})
{:error, reason} -> {:error, reason}
end
end
@doc """
Stop new connections from being sent to the target.
"""
@spec drain_target(GenServer.server(), Router.http_target()) :: :ok | {:error, any}
def drain_target(server, {scheme, address, port}) do
GenServer.cast(server, {:target_status_change, scheme, address, port, :draining})
end
@doc false
@spec __using__(any) :: Macro.output()
defmacro __using__(opts) do
@ -152,7 +180,8 @@ defmodule Wayfarer.Server do
initial_routing_table <- Keyword.get(options, :routing_table, []),
{:ok, routing_table} <- Router.init(module),
:ok <- Router.import_routes(routing_table, initial_routing_table),
state <- %{module: module, routing_table: routing_table},
{:ok, tref} <- :timer.send_interval(1_000, :tick),
state <- %{module: module, routing_table: routing_table, draining: %{}, timer: tref},
{:ok, state} <- start_listeners(listeners, state),
{:ok, state} <- start_targets(targets, state) do
{:ok, state}
@ -166,15 +195,34 @@ defmodule Wayfarer.Server do
@impl true
@spec handle_cast(any, map) :: {:noreply, map}
def handle_cast({:target_status_change, scheme, address, port, status}, state) do
Router.update_target_health_status(
state.routing_table,
{scheme, IP.Address.to_tuple(address), port},
status
)
:ok =
Router.update_target_health_status(
state.routing_table,
{scheme, address, port},
status
)
{:noreply, state}
end
@doc false
@impl true
@spec handle_call(any, GenServer.from(), map) :: {:reply, :ok | {:error, any}, map}
def handle_call({:add_target, target}, _from, state) do
case start_targets([target], state) do
{:ok, state} -> {:reply, :ok, state}
{:error, reason} -> {:reply, {:error, reason}, state}
end
end
@doc false
@impl true
def handle_info(:tick, state) do
# Iterate all targets for this server and terminate any draining ones with
# no active connections.
{:noreply, state}
end
defp start_listeners(listeners, state) do
listeners
|> Enum.reduce_while({:ok, state}, fn listener, success ->
@ -215,6 +263,9 @@ defmodule Wayfarer.Server do
end)
end
defp terminate_target(target),
do: DynamicSupervisor.terminate_child(Target.DynamicSupervisor, {Target, target})
defp assert_is_server(module) do
if Spark.implements_behaviour?(module, __MODULE__) do
{:ok, module}

View file

@ -39,6 +39,8 @@ defmodule Wayfarer.Target.ActiveConnections do
size = :ets.info(state.table, :size)
Logger.debug("Active connections: #{size}")
dbg(:ets.tab2list(state.table))
{:noreply, state}
end
@ -60,6 +62,7 @@ defmodule Wayfarer.Target.ActiveConnections do
"""
@spec connect(Router.target()) :: :ok
def connect(target) do
{:ok, target} = Utils.sanitise_target(target)
:ets.insert(__MODULE__, {target, self(), System.monotonic_time()})
GenServer.cast(__MODULE__, {:monitor, self()})
end
@ -69,6 +72,7 @@ defmodule Wayfarer.Target.ActiveConnections do
"""
@spec disconnect(Router.target()) :: :ok
def disconnect(target) do
{:ok, target} = Utils.sanitise_target(target)
:ets.match_delete(__MODULE__, {target, self(), :_})
:ok
end
@ -82,7 +86,11 @@ defmodule Wayfarer.Target.ActiveConnections do
def request_count(targets) do
# :ets.fun2ms(fn {target, _, _} when target in [:targeta, :targetb] -> target end)
targets = List.wrap(targets)
{:ok, targets} =
targets
|> List.wrap()
|> Utils.sanitise_targets()
target_guard = Utils.targets_to_ms_guard(:"$1", targets)
match_spec = [{{:"$1", :_, :_}, target_guard, [:"$1"]}]
@ -98,6 +106,11 @@ defmodule Wayfarer.Target.ActiveConnections do
def last_request_time(targets) do
# :ets.fun2ms(fn {target, _, t} when target in [:targeta, :targetb] -> {target, t} end)
{:ok, targets} =
targets
|> List.wrap()
|> Utils.sanitise_targets()
target_guard = Utils.targets_to_ms_guard(:"$1", targets)
match_spec = [{{:"$1", :_, :"$2"}, target_guard, [{{:"$1", :"$2"}}]}]

View file

@ -6,6 +6,36 @@ defmodule Wayfarer.Utils do
@type address_input :: IP.Address.t() | String.t() | :inet.ip_address()
@type port_number :: 1..0xFFFF
@type scheme :: :http | :https
@type target :: {scheme, address_input, port_number}
@doc """
Sanitise a target tuple by applying `sanitise_scheme/1`,
`sanitise_ip_address/1` and `sanitise_port/1` in sequence.
"""
@spec sanitise_target(target) :: {:ok, target} | {:error, any}
def sanitise_target({scheme, address, port}) do
with {:ok, scheme} <- sanitise_scheme(scheme),
{:ok, address} <- sanitise_ip_address(address),
{:ok, port} <- sanitise_port(port) do
{:ok, {scheme, address, port}}
end
end
def sanitise_target(invalid), do: {:error, "Invalid target #{inspect(invalid)}"}
@doc """
Sanitise an enumerable of targets by applying `sanitise_target/1` to them all.
"""
@spec sanitise_targets(Enumerable.t(target)) :: {:ok, [target]} | {:error, any}
def sanitise_targets(targets) do
targets
|> Enum.reduce_while({:ok, []}, fn target, {:ok, targets} ->
case sanitise_target(target) do
{:ok, target} -> {:cont, {:ok, [target | targets]}}
{:error, reason} -> {:halt, {:error, reason}}
end
end)
end
@doc """
Verify an IP address and convert it into a tuple.

View file

@ -9,7 +9,7 @@ defmodule Support.Example do
end
targets do
http "127.0.0.1", 8082
# http "127.0.0.1", 8082
http "192.168.4.26", 80
end

50
test/support/utils.ex Normal file
View file

@ -0,0 +1,50 @@
defmodule Support.Utils do
@moduledoc false
import Wayfarer.Utils
defmacro __using__(_) do
quote do
import unquote(__MODULE__)
end
end
def wait_for_target_state({module, scheme, address, port}, state) do
with {:ok, scheme} <- sanitise_scheme(scheme),
{:ok, address} <- sanitise_ip_address(address),
{:ok, port} <- sanitise_port(port) do
do_wait_for_target_state({module, scheme, IP.Address.from_tuple!(address), port}, state, 10)
end
end
defp do_wait_for_target_state(key, state, 0) do
case Wayfarer.Target.current_status(key) do
{:ok, ^state} ->
:ok
{:ok, _} ->
{:error, "Target never reached state #{inspect(state)}"}
{:error, reason} ->
raise reason
end
end
defp do_wait_for_target_state(key, state, iterations_left) do
case Wayfarer.Target.current_status(key) do
{:ok, ^state} ->
:ok
{:ok, _} ->
Process.sleep(100)
do_wait_for_target_state(key, state, iterations_left - 1)
{:error, reason} ->
raise reason
end
catch
:exit, {:noproc, _} ->
Process.sleep(100)
do_wait_for_target_state(key, state, iterations_left - 1)
end
end

View file

@ -3,19 +3,23 @@ defmodule Wayfarer.ServerTest do
use ExUnit.Case, async: false
use Support.PortTracker
use Support.HttpRequest
alias Support.HttpRequest
alias Support.HttpServer
alias Support.Utils
alias Wayfarer.{Listener, Server, Target}
import IP.Sigil
setup do
start_supervised!(Wayfarer.Listener.Supervisor)
start_supervised!(Wayfarer.Target.Supervisor)
start_supervised!(Wayfarer.Server.Supervisor)
start_supervised!(Listener.Supervisor)
start_supervised!(Target.Supervisor)
start_supervised!(Server.Supervisor)
:ok
end
describe "init/1" do
test "it requires a module option which implements the `Wayfarer.Server` behaviour" do
assert {:stop, reason} = Wayfarer.Server.init(module: URI)
assert {:stop, reason} = Server.init(module: URI)
assert reason =~ ~r/does not implement/
end
@ -23,7 +27,7 @@ defmodule Wayfarer.ServerTest do
port = random_port()
assert {:ok, _state} =
Wayfarer.Server.init(
Server.init(
module: Support.Example,
listeners: [[address: ~i"127.0.0.1", port: port, scheme: :http]]
)
@ -35,13 +39,13 @@ defmodule Wayfarer.ServerTest do
port = random_port()
assert {:ok, _state} =
Wayfarer.Server.init(
Server.init(
module: Support.Example,
targets: [[address: ~i"127.0.0.1", port: port, scheme: :http]]
)
assert {:ok, :initial} =
Wayfarer.Target.current_status({Support.Example, :http, ~i"127.0.0.1", port})
Target.current_status({Support.Example, :http, ~i"127.0.0.1", port})
end
test "an initial routing table can be passed as options" do
@ -49,7 +53,7 @@ defmodule Wayfarer.ServerTest do
target_port = random_port()
assert {:ok, _state} =
Wayfarer.Server.init(
Server.init(
module: Support.Example,
routing_table: [
{
@ -67,4 +71,61 @@ defmodule Wayfarer.ServerTest do
]
end
end
describe "add_target/2" do
test "it can add a new target to the server" do
{:ok, pid} = Server.start_link(module: Support.Example)
target_port = random_port()
{:ok, _} = HttpServer.start_link(target_port, 200, "OK")
assert :ok =
Server.add_target(pid,
address: ~i"127.0.0.1",
port: target_port,
scheme: :http,
health_checks: [[interval: 10, threshold: 3]]
)
end
end
describe "drain_target/2" do
test "it stops new requests being forwarded to the target" do
listener_port = random_port()
target_port = random_port()
{:ok, _} = HttpServer.start_link(target_port, 200, "OK")
{:ok, pid} =
Server.start_link(
module: Support.Example,
listeners: [
[
scheme: :http,
address: ~i"127.0.0.1",
port: listener_port
]
],
targets: [
[
scheme: :http,
address: ~i"127.0.0.1",
port: target_port,
health_checks: [[interval: 10, threshold: 3]]
]
],
routing_table: [
{{:http, ~i"127.0.0.1", listener_port}, {:http, ~i"127.0.0.1", target_port},
["example.com"], :round_robin}
]
)
Utils.wait_for_target_state({Support.Example, :http, ~i"127.0.0.1", target_port}, :healthy)
assert {:ok, %{status: 200}} = HttpRequest.request(:http, ~i"127.0.0.1", listener_port)
assert :ok = Server.drain_target(pid, {:http, ~i"127.0.0.1", target_port})
end
end
end

View file

@ -8,6 +8,7 @@ defmodule WayfarerTest do
use Support.HttpRequest
use Support.PortTracker
use Support.Utils
import IP.Sigil
@ -92,18 +93,4 @@ defmodule WayfarerTest do
|> Enum.map(&elem(&1, 1))
end
end
defp wait_for_target_state(key, state) do
case Wayfarer.Target.current_status(key) do
{:ok, ^state} ->
:ok
{:ok, _} ->
Process.sleep(100)
wait_for_target_state(key, state)
{:error, reason} ->
raise reason
end
end
end