Compare commits
2 commits
main
...
runtime-ta
Author | SHA1 | Date | |
---|---|---|---|
b2ab14809c | |||
cc82bfb9d6 |
10 changed files with 234 additions and 50 deletions
|
@ -6,8 +6,7 @@ defmodule Wayfarer.Router do
|
|||
This module provides a standardised interface to interact with a routing
|
||||
table.
|
||||
"""
|
||||
import Wayfarer.Utils
|
||||
alias Wayfarer.Target.Selector
|
||||
alias Wayfarer.{Target.Selector, Utils}
|
||||
require Selector
|
||||
|
||||
@table_options [
|
||||
|
@ -50,10 +49,9 @@ defmodule Wayfarer.Router do
|
|||
Uniquely identifies a request target, either a remote http(s) server or a
|
||||
local `Plug`.
|
||||
"""
|
||||
@type target ::
|
||||
{scheme, :inet.ip_address(), :socket.port_number()}
|
||||
| {:plug, module}
|
||||
| {:plug, {module, any}}
|
||||
@type target :: http_target | plug_target
|
||||
@type http_target :: {scheme, :inet.ip_address(), :socket.port_number()}
|
||||
@type plug_target :: {:plug, module} | {:plug, {module, any}}
|
||||
|
||||
@typedoc """
|
||||
The algorithm used to select which target to forward requests to (when there
|
||||
|
@ -142,12 +140,14 @@ defmodule Wayfarer.Router do
|
|||
Change a target's health state.
|
||||
"""
|
||||
@spec update_target_health_status(:ets.tid(), target, health) :: :ok
|
||||
def update_target_health_status(table, {scheme, address, port}, status) do
|
||||
def update_target_health_status(table, target, status) do
|
||||
# Match spec generated using:
|
||||
# :ets.fun2ms(fn {listener, host_pattern, {:http, {192, 168, 4, 26}, 80}, algorithm, _} ->
|
||||
# {listener, host_pattern, {:http, {192, 168, 4, 26}, 80}, algorithm, :healthy}
|
||||
# end)
|
||||
|
||||
{:ok, {scheme, address, port}} = Utils.sanitise_target(target)
|
||||
|
||||
match_spec = [
|
||||
{{:"$1", :"$2", {scheme, address, port}, :"$3", :_}, [],
|
||||
[{{:"$1", :"$2", {{scheme, {address}, port}}, :"$3", status}}]}
|
||||
|
@ -266,16 +266,7 @@ defmodule Wayfarer.Router do
|
|||
)}
|
||||
end
|
||||
|
||||
defp sanitise_listener({scheme, address, port}) do
|
||||
with {:ok, scheme} <- sanitise_scheme(scheme),
|
||||
{:ok, address} <- sanitise_ip_address(address),
|
||||
{:ok, port} <- sanitise_port(port) do
|
||||
{:ok, {scheme, address, port}}
|
||||
end
|
||||
end
|
||||
|
||||
defp sanitise_listener(listener),
|
||||
do: {:error, ArgumentError.exception(message: "Not a valid listener: `#{inspect(listener)}")}
|
||||
defp sanitise_listener(listener), do: Utils.sanitise_target(listener)
|
||||
|
||||
defp sanitise_target({:plug, module}), do: sanitise_target({:plug, module, []})
|
||||
|
||||
|
@ -290,7 +281,8 @@ defmodule Wayfarer.Router do
|
|||
end
|
||||
end
|
||||
|
||||
defp sanitise_target({scheme, address, port}), do: sanitise_listener({scheme, address, port})
|
||||
defp sanitise_target({scheme, address, port}),
|
||||
do: Utils.sanitise_target({scheme, address, port})
|
||||
|
||||
defp sanitise_target(target),
|
||||
do: {:error, ArgumentError.exception(message: "Not a valid target: `#{inspect(target)}")}
|
||||
|
|
|
@ -83,6 +83,34 @@ defmodule Wayfarer.Server do
|
|||
|
||||
@type options :: keyword
|
||||
|
||||
@type target_options ::
|
||||
[
|
||||
unquote(
|
||||
Dsl.Target.schema()
|
||||
|> OptionsHelpers.sanitize_schema()
|
||||
|> NimbleOptions.option_typespec()
|
||||
)
|
||||
]
|
||||
|
||||
@doc """
|
||||
Add a new target to the server.
|
||||
"""
|
||||
@spec add_target(GenServer.server(), target_options) :: :ok | {:error, any}
|
||||
def add_target(server, target) do
|
||||
case OptionsHelpers.validate(target, Dsl.Target.schema()) do
|
||||
{:ok, options} -> GenServer.call(server, {:add_target, options})
|
||||
{:error, reason} -> {:error, reason}
|
||||
end
|
||||
end
|
||||
|
||||
@doc """
|
||||
Stop new connections from being sent to the target.
|
||||
"""
|
||||
@spec drain_target(GenServer.server(), Router.http_target()) :: :ok | {:error, any}
|
||||
def drain_target(server, {scheme, address, port}) do
|
||||
GenServer.cast(server, {:target_status_change, scheme, address, port, :draining})
|
||||
end
|
||||
|
||||
@doc false
|
||||
@spec __using__(any) :: Macro.output()
|
||||
defmacro __using__(opts) do
|
||||
|
@ -152,7 +180,8 @@ defmodule Wayfarer.Server do
|
|||
initial_routing_table <- Keyword.get(options, :routing_table, []),
|
||||
{:ok, routing_table} <- Router.init(module),
|
||||
:ok <- Router.import_routes(routing_table, initial_routing_table),
|
||||
state <- %{module: module, routing_table: routing_table},
|
||||
{:ok, tref} <- :timer.send_interval(1_000, :tick),
|
||||
state <- %{module: module, routing_table: routing_table, draining: %{}, timer: tref},
|
||||
{:ok, state} <- start_listeners(listeners, state),
|
||||
{:ok, state} <- start_targets(targets, state) do
|
||||
{:ok, state}
|
||||
|
@ -166,15 +195,34 @@ defmodule Wayfarer.Server do
|
|||
@impl true
|
||||
@spec handle_cast(any, map) :: {:noreply, map}
|
||||
def handle_cast({:target_status_change, scheme, address, port, status}, state) do
|
||||
:ok =
|
||||
Router.update_target_health_status(
|
||||
state.routing_table,
|
||||
{scheme, IP.Address.to_tuple(address), port},
|
||||
{scheme, address, port},
|
||||
status
|
||||
)
|
||||
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
@doc false
|
||||
@impl true
|
||||
@spec handle_call(any, GenServer.from(), map) :: {:reply, :ok | {:error, any}, map}
|
||||
def handle_call({:add_target, target}, _from, state) do
|
||||
case start_targets([target], state) do
|
||||
{:ok, state} -> {:reply, :ok, state}
|
||||
{:error, reason} -> {:reply, {:error, reason}, state}
|
||||
end
|
||||
end
|
||||
|
||||
@doc false
|
||||
@impl true
|
||||
def handle_info(:tick, state) do
|
||||
# Iterate all targets for this server and terminate any draining ones with
|
||||
# no active connections.
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
defp start_listeners(listeners, state) do
|
||||
listeners
|
||||
|> Enum.reduce_while({:ok, state}, fn listener, success ->
|
||||
|
@ -215,6 +263,9 @@ defmodule Wayfarer.Server do
|
|||
end)
|
||||
end
|
||||
|
||||
defp terminate_target(target),
|
||||
do: DynamicSupervisor.terminate_child(Target.DynamicSupervisor, {Target, target})
|
||||
|
||||
defp assert_is_server(module) do
|
||||
if Spark.implements_behaviour?(module, __MODULE__) do
|
||||
{:ok, module}
|
||||
|
|
|
@ -39,6 +39,8 @@ defmodule Wayfarer.Target.ActiveConnections do
|
|||
size = :ets.info(state.table, :size)
|
||||
Logger.debug("Active connections: #{size}")
|
||||
|
||||
dbg(:ets.tab2list(state.table))
|
||||
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
|
@ -60,6 +62,7 @@ defmodule Wayfarer.Target.ActiveConnections do
|
|||
"""
|
||||
@spec connect(Router.target()) :: :ok
|
||||
def connect(target) do
|
||||
{:ok, target} = Utils.sanitise_target(target)
|
||||
:ets.insert(__MODULE__, {target, self(), System.monotonic_time()})
|
||||
GenServer.cast(__MODULE__, {:monitor, self()})
|
||||
end
|
||||
|
@ -69,6 +72,7 @@ defmodule Wayfarer.Target.ActiveConnections do
|
|||
"""
|
||||
@spec disconnect(Router.target()) :: :ok
|
||||
def disconnect(target) do
|
||||
{:ok, target} = Utils.sanitise_target(target)
|
||||
:ets.match_delete(__MODULE__, {target, self(), :_})
|
||||
:ok
|
||||
end
|
||||
|
@ -82,7 +86,11 @@ defmodule Wayfarer.Target.ActiveConnections do
|
|||
def request_count(targets) do
|
||||
# :ets.fun2ms(fn {target, _, _} when target in [:targeta, :targetb] -> target end)
|
||||
|
||||
targets = List.wrap(targets)
|
||||
{:ok, targets} =
|
||||
targets
|
||||
|> List.wrap()
|
||||
|> Utils.sanitise_targets()
|
||||
|
||||
target_guard = Utils.targets_to_ms_guard(:"$1", targets)
|
||||
match_spec = [{{:"$1", :_, :_}, target_guard, [:"$1"]}]
|
||||
|
||||
|
@ -98,6 +106,11 @@ defmodule Wayfarer.Target.ActiveConnections do
|
|||
def last_request_time(targets) do
|
||||
# :ets.fun2ms(fn {target, _, t} when target in [:targeta, :targetb] -> {target, t} end)
|
||||
|
||||
{:ok, targets} =
|
||||
targets
|
||||
|> List.wrap()
|
||||
|> Utils.sanitise_targets()
|
||||
|
||||
target_guard = Utils.targets_to_ms_guard(:"$1", targets)
|
||||
match_spec = [{{:"$1", :_, :"$2"}, target_guard, [{{:"$1", :"$2"}}]}]
|
||||
|
||||
|
|
|
@ -6,6 +6,36 @@ defmodule Wayfarer.Utils do
|
|||
@type address_input :: IP.Address.t() | String.t() | :inet.ip_address()
|
||||
@type port_number :: 1..0xFFFF
|
||||
@type scheme :: :http | :https
|
||||
@type target :: {scheme, address_input, port_number}
|
||||
|
||||
@doc """
|
||||
Sanitise a target tuple by applying `sanitise_scheme/1`,
|
||||
`sanitise_ip_address/1` and `sanitise_port/1` in sequence.
|
||||
"""
|
||||
@spec sanitise_target(target) :: {:ok, target} | {:error, any}
|
||||
def sanitise_target({scheme, address, port}) do
|
||||
with {:ok, scheme} <- sanitise_scheme(scheme),
|
||||
{:ok, address} <- sanitise_ip_address(address),
|
||||
{:ok, port} <- sanitise_port(port) do
|
||||
{:ok, {scheme, address, port}}
|
||||
end
|
||||
end
|
||||
|
||||
def sanitise_target(invalid), do: {:error, "Invalid target #{inspect(invalid)}"}
|
||||
|
||||
@doc """
|
||||
Sanitise an enumerable of targets by applying `sanitise_target/1` to them all.
|
||||
"""
|
||||
@spec sanitise_targets(Enumerable.t(target)) :: {:ok, [target]} | {:error, any}
|
||||
def sanitise_targets(targets) do
|
||||
targets
|
||||
|> Enum.reduce_while({:ok, []}, fn target, {:ok, targets} ->
|
||||
case sanitise_target(target) do
|
||||
{:ok, target} -> {:cont, {:ok, [target | targets]}}
|
||||
{:error, reason} -> {:halt, {:error, reason}}
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Verify an IP address and convert it into a tuple.
|
||||
|
|
|
@ -9,7 +9,7 @@ defmodule Support.Example do
|
|||
end
|
||||
|
||||
targets do
|
||||
http "127.0.0.1", 8082
|
||||
# http "127.0.0.1", 8082
|
||||
|
||||
http "192.168.4.26", 80
|
||||
end
|
||||
|
|
50
test/support/utils.ex
Normal file
50
test/support/utils.ex
Normal file
|
@ -0,0 +1,50 @@
|
|||
defmodule Support.Utils do
|
||||
@moduledoc false
|
||||
|
||||
import Wayfarer.Utils
|
||||
|
||||
defmacro __using__(_) do
|
||||
quote do
|
||||
import unquote(__MODULE__)
|
||||
end
|
||||
end
|
||||
|
||||
def wait_for_target_state({module, scheme, address, port}, state) do
|
||||
with {:ok, scheme} <- sanitise_scheme(scheme),
|
||||
{:ok, address} <- sanitise_ip_address(address),
|
||||
{:ok, port} <- sanitise_port(port) do
|
||||
do_wait_for_target_state({module, scheme, IP.Address.from_tuple!(address), port}, state, 10)
|
||||
end
|
||||
end
|
||||
|
||||
defp do_wait_for_target_state(key, state, 0) do
|
||||
case Wayfarer.Target.current_status(key) do
|
||||
{:ok, ^state} ->
|
||||
:ok
|
||||
|
||||
{:ok, _} ->
|
||||
{:error, "Target never reached state #{inspect(state)}"}
|
||||
|
||||
{:error, reason} ->
|
||||
raise reason
|
||||
end
|
||||
end
|
||||
|
||||
defp do_wait_for_target_state(key, state, iterations_left) do
|
||||
case Wayfarer.Target.current_status(key) do
|
||||
{:ok, ^state} ->
|
||||
:ok
|
||||
|
||||
{:ok, _} ->
|
||||
Process.sleep(100)
|
||||
do_wait_for_target_state(key, state, iterations_left - 1)
|
||||
|
||||
{:error, reason} ->
|
||||
raise reason
|
||||
end
|
||||
catch
|
||||
:exit, {:noproc, _} ->
|
||||
Process.sleep(100)
|
||||
do_wait_for_target_state(key, state, iterations_left - 1)
|
||||
end
|
||||
end
|
|
@ -1,6 +1,6 @@
|
|||
defmodule Wayfarer.Server.PlugTest do
|
||||
@moduledoc false
|
||||
use ExUnit.Case, async: true
|
||||
use ExUnit.Case, async: false
|
||||
use Mimic
|
||||
use Plug.Test
|
||||
use Support.PortTracker
|
||||
|
|
|
@ -3,19 +3,23 @@ defmodule Wayfarer.ServerTest do
|
|||
use ExUnit.Case, async: false
|
||||
use Support.PortTracker
|
||||
use Support.HttpRequest
|
||||
alias Support.HttpRequest
|
||||
alias Support.HttpServer
|
||||
alias Support.Utils
|
||||
alias Wayfarer.{Listener, Server, Target}
|
||||
import IP.Sigil
|
||||
|
||||
setup do
|
||||
start_supervised!(Wayfarer.Listener.Supervisor)
|
||||
start_supervised!(Wayfarer.Target.Supervisor)
|
||||
start_supervised!(Wayfarer.Server.Supervisor)
|
||||
start_supervised!(Listener.Supervisor)
|
||||
start_supervised!(Target.Supervisor)
|
||||
start_supervised!(Server.Supervisor)
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
describe "init/1" do
|
||||
test "it requires a module option which implements the `Wayfarer.Server` behaviour" do
|
||||
assert {:stop, reason} = Wayfarer.Server.init(module: URI)
|
||||
assert {:stop, reason} = Server.init(module: URI)
|
||||
assert reason =~ ~r/does not implement/
|
||||
end
|
||||
|
||||
|
@ -23,7 +27,7 @@ defmodule Wayfarer.ServerTest do
|
|||
port = random_port()
|
||||
|
||||
assert {:ok, _state} =
|
||||
Wayfarer.Server.init(
|
||||
Server.init(
|
||||
module: Support.Example,
|
||||
listeners: [[address: ~i"127.0.0.1", port: port, scheme: :http]]
|
||||
)
|
||||
|
@ -35,13 +39,13 @@ defmodule Wayfarer.ServerTest do
|
|||
port = random_port()
|
||||
|
||||
assert {:ok, _state} =
|
||||
Wayfarer.Server.init(
|
||||
Server.init(
|
||||
module: Support.Example,
|
||||
targets: [[address: ~i"127.0.0.1", port: port, scheme: :http]]
|
||||
)
|
||||
|
||||
assert {:ok, :initial} =
|
||||
Wayfarer.Target.current_status({Support.Example, :http, ~i"127.0.0.1", port})
|
||||
Target.current_status({Support.Example, :http, ~i"127.0.0.1", port})
|
||||
end
|
||||
|
||||
test "an initial routing table can be passed as options" do
|
||||
|
@ -49,7 +53,7 @@ defmodule Wayfarer.ServerTest do
|
|||
target_port = random_port()
|
||||
|
||||
assert {:ok, _state} =
|
||||
Wayfarer.Server.init(
|
||||
Server.init(
|
||||
module: Support.Example,
|
||||
routing_table: [
|
||||
{
|
||||
|
@ -67,4 +71,61 @@ defmodule Wayfarer.ServerTest do
|
|||
]
|
||||
end
|
||||
end
|
||||
|
||||
describe "add_target/2" do
|
||||
test "it can add a new target to the server" do
|
||||
{:ok, pid} = Server.start_link(module: Support.Example)
|
||||
|
||||
target_port = random_port()
|
||||
|
||||
{:ok, _} = HttpServer.start_link(target_port, 200, "OK")
|
||||
|
||||
assert :ok =
|
||||
Server.add_target(pid,
|
||||
address: ~i"127.0.0.1",
|
||||
port: target_port,
|
||||
scheme: :http,
|
||||
health_checks: [[interval: 10, threshold: 3]]
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
describe "drain_target/2" do
|
||||
test "it stops new requests being forwarded to the target" do
|
||||
listener_port = random_port()
|
||||
target_port = random_port()
|
||||
|
||||
{:ok, _} = HttpServer.start_link(target_port, 200, "OK")
|
||||
|
||||
{:ok, pid} =
|
||||
Server.start_link(
|
||||
module: Support.Example,
|
||||
listeners: [
|
||||
[
|
||||
scheme: :http,
|
||||
address: ~i"127.0.0.1",
|
||||
port: listener_port
|
||||
]
|
||||
],
|
||||
targets: [
|
||||
[
|
||||
scheme: :http,
|
||||
address: ~i"127.0.0.1",
|
||||
port: target_port,
|
||||
health_checks: [[interval: 10, threshold: 3]]
|
||||
]
|
||||
],
|
||||
routing_table: [
|
||||
{{:http, ~i"127.0.0.1", listener_port}, {:http, ~i"127.0.0.1", target_port},
|
||||
["example.com"], :round_robin}
|
||||
]
|
||||
)
|
||||
|
||||
Utils.wait_for_target_state({Support.Example, :http, ~i"127.0.0.1", target_port}, :healthy)
|
||||
|
||||
assert {:ok, %{status: 200}} = HttpRequest.request(:http, ~i"127.0.0.1", listener_port)
|
||||
|
||||
assert :ok = Server.drain_target(pid, {:http, ~i"127.0.0.1", target_port})
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
defmodule Wayfarer.Target.SelectorTest do
|
||||
@moduledoc false
|
||||
use ExUnit.Case, async: true
|
||||
use ExUnit.Case, async: false
|
||||
use Plug.Test
|
||||
use Mimic
|
||||
use Support.PortTracker
|
||||
|
|
|
@ -8,6 +8,7 @@ defmodule WayfarerTest do
|
|||
|
||||
use Support.HttpRequest
|
||||
use Support.PortTracker
|
||||
use Support.Utils
|
||||
|
||||
import IP.Sigil
|
||||
|
||||
|
@ -92,18 +93,4 @@ defmodule WayfarerTest do
|
|||
|> Enum.map(&elem(&1, 1))
|
||||
end
|
||||
end
|
||||
|
||||
defp wait_for_target_state(key, state) do
|
||||
case Wayfarer.Target.current_status(key) do
|
||||
{:ok, ^state} ->
|
||||
:ok
|
||||
|
||||
{:ok, _} ->
|
||||
Process.sleep(100)
|
||||
wait_for_target_state(key, state)
|
||||
|
||||
{:error, reason} ->
|
||||
raise reason
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue