feat: Add support for proxying websockets.
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
James Harton 2024-08-17 18:50:27 +12:00
parent de160bd70c
commit 4ad256acdd
Signed by: james
GPG key ID: 90E82DAA13F624F4
22 changed files with 867 additions and 141 deletions

View file

@ -1,6 +1,5 @@
[
{"lib/wayfarer/server/plug.ex", :call},
{"lib/wayfarer/server/proxy.ex", :invalid_contract},
{"test/support/http_request.ex", :call},
{"test/support/http_request.ex", :unused_fun}
{"lib/wayfarer/server/proxy.ex", :unknown_function},
{"lib/wayfarer/target/check.ex", :unknown_function},
{"test/support/http_request.ex", :unknown_function}
]

View file

@ -38,7 +38,12 @@ spark_locals_without_parens = [
targets: 1,
thousand_island_options: 1,
threshold: 1,
websocket_options: 1
transport: 1,
websocket_options: 1,
ws: 2,
ws: 3,
wss: 2,
wss: 3
]
[

File diff suppressed because one or more lines are too long

View file

@ -85,7 +85,7 @@ defmodule Wayfarer.Dsl.HealthCheck do
doc: "Path"
],
success_codes: [
type: {:wrap_list, {:struct, Range}},
type: {:wrap_list, {:or, [{:struct, Range}, {:in, 100..500}]}},
required: false,
default: @defaults[:success_codes],
doc: "HTTP status codes which are considered successful."
@ -125,22 +125,34 @@ defmodule Wayfarer.Dsl.HealthCheck do
@doc false
@spec transform(t) :: {:ok, t} | {:error, any}
def transform(check) do
with :ok <- verify_success_codes(check.success_codes) do
maybe_set_name(check)
with {:ok, success_codes} <- transform_success_codes(check.success_codes) do
maybe_set_name(%{check | success_codes: success_codes})
end
end
defguardp is_valid_status_code?(code) when is_integer(code) and code >= 100 and code <= 599
defguardp is_valid_range?(range)
when is_struct(range, Range) and is_integer(range.first) and range.first >= 100 and
is_integer(range.last) and range.last <= 500
when is_struct(range, Range) and is_valid_status_code?(range.first) and
is_valid_status_code?(range.last)
defp verify_success_codes(range) when is_valid_range?(range), do: :ok
defp verify_success_codes([]), do: :ok
defp transform_success_codes(range) when is_valid_range?(range), do: {:ok, [range]}
defp transform_success_codes([]), do: {:ok, []}
defp verify_success_codes([head | tail]) when is_valid_range?(head),
do: verify_success_codes(tail)
defp transform_success_codes([head | tail]) when is_valid_range?(head) do
with {:ok, tail} <- transform_success_codes(tail) do
{:ok, [head | tail]}
end
end
defp verify_success_codes([range | _]),
defp transform_success_codes([head | tail])
when is_integer(head) and is_valid_status_code?(head) do
with {:ok, tail} <- transform_success_codes(tail) do
{:ok, [head..head | tail]}
end
end
defp transform_success_codes([range | _]),
do: {:error, "Value `#{inspect(range)}` is not valid. Must be a range between 100..599"}
defp maybe_set_name(check) when is_binary(check.name), do: {:ok, check}

View file

@ -13,6 +13,7 @@ defmodule Wayfarer.Dsl.Target do
name: nil,
port: nil,
scheme: :http,
transport: :auto,
uri: nil
@type t :: %__MODULE__{
@ -21,7 +22,8 @@ defmodule Wayfarer.Dsl.Target do
module: nil | module,
name: nil | String.t(),
port: :inet.port_number(),
scheme: :http | :https | :plug,
scheme: :http | :https | :plug | :ws | :wss,
transport: :http1 | :http2 | :auto,
uri: URI.t()
}
@ -40,6 +42,12 @@ defmodule Wayfarer.Dsl.Target do
type: :pos_integer,
required: true,
doc: "The TCP port on which to listen for incoming connections."
],
transport: [
type: {:in, [:http1, :http2, :auto]},
required: false,
default: :auto,
doc: "Which HTTP protocol to use."
]
]
@ -81,6 +89,28 @@ defmodule Wayfarer.Dsl.Target do
],
auto_set_fields: [scheme: :plug],
args: [:module]
},
%Entity{
name: :ws,
target: __MODULE__,
schema: @shared_schema,
auto_set_fields: [scheme: :ws],
args: [:address, :port],
imports: [IP.Sigil],
transform: {__MODULE__, :transform, []},
entities: [health_checks: HealthChecks.entities()],
singleton_entity_keys: [:health_checks]
},
%Entity{
name: :wss,
target: __MODULE__,
schema: @shared_schema,
auto_set_fields: [scheme: :wss],
args: [:address, :port],
imports: [IP.Sigil],
transform: {__MODULE__, :transform, []},
entities: [health_checks: HealthChecks.entities()],
singleton_entity_keys: [:health_checks]
}
]
end
@ -102,9 +132,9 @@ defmodule Wayfarer.Dsl.Target do
|> Options.Helpers.make_optional!(:port)
|> Keyword.merge(
scheme: [
type: {:in, [:http, :https, :plug]},
type: {:in, [:http, :https, :plug, :ws, :wss]},
required: true,
doc: "The protocol used to talk to the target."
doc: "The connection type for the target."
],
plug: [
type: {:behaviour, Plug},

View file

@ -98,7 +98,7 @@ defmodule Wayfarer.Dsl.Transformer do
{target.scheme, target.module}
target ->
{target.scheme, IP.Address.to_tuple(target.address), target.port}
{target.scheme, IP.Address.to_tuple(target.address), target.port, target.transport}
end)
end
@ -113,7 +113,7 @@ defmodule Wayfarer.Dsl.Transformer do
|> Map.drop([:address, :health_checks, :port, :uri])
|> Enum.to_list()
target when target.scheme in [:http, :https] ->
target when target.scheme in [:http, :https, :ws, :wss] ->
health_checks =
target.health_checks.health_checks
|> Enum.map(&HealthCheck.to_options/1)

View file

@ -51,7 +51,17 @@ defmodule Wayfarer.Router do
local `Plug`.
"""
@type target ::
{scheme, :inet.ip_address(), :socket.port_number()}
{:http | :https | :ws | :wss, :inet.ip_address(), :socket.port_number(),
:http1 | :http2 | :auto}
| {:plug, module}
| {:plug, {module, any}}
@typedoc """
Like `t:target/0` except that it can contain user input for the address portion.
"""
@type target_input ::
{:http | :https | :ws | :wss, Wayfarer.Utils.address_input(), :socket.port_number(),
:http1 | :http2 | :auto}
| {:plug, module}
| {:plug, {module, any}}
@ -85,7 +95,8 @@ defmodule Wayfarer.Router do
This should only ever be called by `Wayfarer.Server` directly.
"""
@spec add_route(:ets.tid(), listener, target, [host_name], algorithm) :: :ok | {:error, any}
@spec add_route(:ets.tid(), listener, target_input, [host_name], algorithm) ::
:ok | {:error, any}
def add_route(table, listener, target, host_names, algorithm) do
with {:ok, entries} <- route_to_entries(table, listener, target, host_names, algorithm) do
:ets.insert(table, entries)
@ -96,9 +107,9 @@ defmodule Wayfarer.Router do
end
@doc """
Add a number of router into the routing table.
Add a number of routes into the routing table.
"""
@spec import_routes(:ets.tid(), [{listener, target, [host_name], algorithm}]) :: :ok
@spec import_routes(:ets.tid(), [{listener, target_input, [host_name], algorithm}]) :: :ok
def import_routes(table, routes) do
with {:ok, entries} <- routes_to_entries(table, routes) do
:ets.insert(table, entries)
@ -142,15 +153,15 @@ defmodule Wayfarer.Router do
Change a target's health state.
"""
@spec update_target_health_status(:ets.tid(), target, health) :: :ok
def update_target_health_status(table, {scheme, address, port}, status) do
def update_target_health_status(table, {scheme, address, port, transport}, status) do
# Match spec generated using:
# :ets.fun2ms(fn {listener, host_pattern, {:http, {192, 168, 4, 26}, 80}, algorithm, _} ->
# {listener, host_pattern, {:http, {192, 168, 4, 26}, 80}, algorithm, :healthy}
# :ets.fun2ms(fn {listener, host_pattern, {:http, {192, 168, 4, 26}, 80, transport}, algorithm, _} ->
# {listener, host_pattern, {:http, {192, 168, 4, 26}, 80, transport}, algorithm, :healthy}
# end)
match_spec = [
{{:"$1", :"$2", {scheme, address, port}, :"$3", :_}, [],
[{{:"$1", :"$2", {{scheme, {address}, port}}, :"$3", status}}]}
{{:"$1", :"$2", {scheme, address, port, transport}, :"$3", :_}, [],
[{{:"$1", :"$2", {{scheme, {address}, port, transport}}, :"$3", status}}]}
]
:ets.select_replace(table, match_spec)
@ -223,12 +234,12 @@ defmodule Wayfarer.Router do
message: "Value `#{inspect(algorithm)}` is not a valid load balancing algorithm."
)}
defp current_health_state(table, {scheme, address, port}) do
defp current_health_state(table, {scheme, address, port, transport}) do
# Generated using
# :ets.fun2ms(fn {_, _, :target, :_, health} -> health end)
match_spec = [
{{:_, :_, {scheme, address, port}, :_, :"$1"}, [], [:"$1"]}
{{:_, :_, {scheme, address, port, transport}, :_, :"$1"}, [], [:"$1"]}
]
case :ets.select(table, match_spec, 1) do
@ -277,6 +288,14 @@ defmodule Wayfarer.Router do
defp sanitise_listener(listener),
do: {:error, ArgumentError.exception(message: "Not a valid listener: `#{inspect(listener)}")}
defp sanitise_transport(transport) when transport in [:http1, :http2, :auto],
do: {:ok, transport}
defp sanitise_transport(transport),
do:
{:error,
ArgumentError.exception(message: "Not a valid target transport: `#{inspect(transport)}`")}
defp sanitise_target({:plug, module}), do: sanitise_target({:plug, module, []})
defp sanitise_target({:plug, module, _}) do
@ -290,7 +309,14 @@ defmodule Wayfarer.Router do
end
end
defp sanitise_target({scheme, address, port}), do: sanitise_listener({scheme, address, port})
defp sanitise_target({scheme, address, port, transport}) do
with {:ok, scheme} <- sanitise_scheme(scheme),
{:ok, address} <- sanitise_ip_address(address),
{:ok, port} <- sanitise_port(port),
{:ok, transport} <- sanitise_transport(transport) do
{:ok, {scheme, address, port, transport}}
end
end
defp sanitise_target(target),
do: {:error, ArgumentError.exception(message: "Not a valid target: `#{inspect(target)}")}

View file

@ -7,8 +7,9 @@ defmodule Wayfarer.Server do
@callback child_spec(keyword()) :: Supervisor.child_spec()
@callback start_link(keyword()) :: GenServer.on_start()
@scheme_type {:in, [:http, :https]}
@scheme_type {:in, [:http, :https, :ws, :wss]}
@port_type {:in, 0..0xFFFF}
@transport_type {:in, [:http1, :http2, :auto]}
@ip_type {:or,
[
{:tuple, [{:in, 0..0xFF}, {:in, 0..0xFF}, {:in, 0..0xFF}, {:in, 0..0xFF}]},
@ -50,7 +51,7 @@ defmodule Wayfarer.Server do
{:tuple,
[
{:tuple, [@scheme_type, @ip_type, @port_type]},
{:tuple, [@scheme_type, @ip_type, @port_type]},
{:tuple, [@scheme_type, @ip_type, @port_type, @transport_type]},
{:list, :string},
{:in, [:round_robin, :sticky, :random, :least_connections]}
]}},
@ -117,13 +118,14 @@ defmodule Wayfarer.Server do
@doc false
@spec target_status_change(
{module, :http | :https, IP.Address.t(), :socket.port_number()},
{module, :http | :https, IP.Address.t(), :socket.port_number(),
:http1 | :http2 | :auto},
Router.health()
) :: :ok
def target_status_change({module, scheme, address, port}, status) do
def target_status_change({module, scheme, address, port, transport}, status) do
GenServer.cast(
{:via, Registry, {Wayfarer.Server.Registry, module}},
{:target_status_change, scheme, address, port, status}
{:target_status_change, scheme, address, port, transport, status}
)
end
@ -165,10 +167,10 @@ defmodule Wayfarer.Server do
@doc false
@impl true
@spec handle_cast(any, map) :: {:noreply, map}
def handle_cast({:target_status_change, scheme, address, port, status}, state) do
def handle_cast({:target_status_change, scheme, address, port, transport, status}, state) do
Router.update_target_health_status(
state.routing_table,
{scheme, IP.Address.to_tuple(address), port},
{scheme, IP.Address.to_tuple(address), port, transport},
status
)

View file

@ -4,7 +4,7 @@ defmodule Wayfarer.Server.Proxy do
specific target.
"""
alias Mint.HTTP
alias Mint.{HTTP, HTTP1, HTTP2}
alias Plug.Conn
alias Wayfarer.{Router, Target.ActiveConnections, Target.TotalConnections}
require Logger
@ -16,27 +16,64 @@ defmodule Wayfarer.Server.Proxy do
Convert the request conn into an HTTP request to the specified target.
"""
@spec request(Conn.t(), Router.target()) :: Conn.t()
def request(conn, {scheme, address, port} = target) do
with {:ok, mint} <-
HTTP.connect(scheme, address, port, hostname: conn.host, timeout: @connect_timeout),
def request(conn, target) do
with {:ok, mint} <- connect(conn, target),
:ok <- ActiveConnections.connect(target),
:ok <- TotalConnections.proxy_connect(target),
{:ok, body, conn} <- read_request_body(conn),
{:ok, mint, req} <- send_request(conn, mint, body),
{:ok, conn, _mint} <- proxy_responses(conn, mint, req) do
conn
:ok <- TotalConnections.proxy_connect(target) do
handle_request(mint, conn, target)
else
error -> handle_error(error, conn, target)
end
end
defp handle_request(mint, conn, {proto, _, _, _}) do
with ["Upgrade"] <- Conn.get_req_header(conn, "connection"),
["websocket"] <- Conn.get_req_header(conn, "upgrade") do
handle_websocket_request(mint, conn, proto)
else
_ -> handle_http_request(mint, conn)
end
end
defp handle_http_request(mint, conn) do
with {:ok, mint, req} <- send_request(conn, mint),
{:ok, mint, conn} <- stream_request_body(conn, mint, req),
{:ok, conn, _mint} <- proxy_responses(conn, mint, req) do
conn
end
end
defp handle_websocket_request(mint, conn, proto) do
WebSockAdapter.upgrade(conn, Wayfarer.Server.WebSocketProxy, {mint, conn, proto},
compress: true
)
end
defp connect(conn, {:ws, address, port, transport}),
do: connect(conn, {:http, address, port, transport})
defp connect(conn, {:wss, address, port, transport}),
do: connect(conn, {:https, address, port, transport})
defp connect(conn, {scheme, address, port, :http1}) when is_tuple(address),
do: HTTP1.connect(scheme, address, port, hostname: conn.host, timeout: @connect_timeout)
defp connect(conn, {scheme, address, port, :http2}) when is_tuple(address),
do: HTTP2.connect(scheme, address, port, hostname: conn.host, timeout: @connect_timeout)
defp connect(conn, {scheme, address, port, :auto}) when is_tuple(address),
do: HTTP.connect(scheme, address, port, hostname: conn.host, timeout: @connect_timeout)
defp handle_error({:error, _, reason}, conn, target),
do: handle_error({:error, reason}, conn, target)
defp handle_error({:error, reason}, conn, {scheme, address, port}) do
Logger.error(
"Proxy error [phase=#{connection_phase(conn)},ip=#{:inet.ntoa(address)},port=#{port},proto=#{scheme}]: #{message(reason)}"
)
defp handle_error({:error, reason}, conn, {scheme, address, port, transport}) do
Logger.error(fn ->
phase = connection_phase(conn)
ip = :inet.ntoa(address)
"Proxy error [phase=#{phase},ip=#{ip},port=#{port},proto=#{scheme},trans=#{transport}]: #{message(reason)}"
end)
if conn.halted || conn.state in [:sent, :chunked, :upgraded] do
# Sadly there's not much more we can do here.
@ -72,12 +109,17 @@ defmodule Wayfarer.Server.Proxy do
receive do
message ->
case HTTP.stream(mint, message) do
:unknown -> proxy_responses(conn, mint, req)
{:ok, mint, responses} -> handle_responses(responses, conn, mint, req)
{:error, _, reason, _} -> {:error, reason}
:unknown ->
proxy_responses(conn, mint, req)
{:ok, mint, responses} ->
handle_responses(responses, conn, mint, req)
{:error, _, reason, _} ->
{:error, reason}
end
after
@idle_timeout -> {:error, conn, :idle_timeout}
@idle_timeout -> {:error, :idle_timeout}
end
end
@ -138,26 +180,41 @@ defmodule Wayfarer.Server.Proxy do
defp handle_responses([{:error, req, reason} | _], conn, _mint, req), do: {:error, conn, reason}
# This is bad - we need to figure out how to stream the body, but it's fine
# for now.
defp read_request_body(conn, body \\ <<>>) do
case Conn.read_body(conn) do
{:ok, chunk, conn} -> {:ok, body <> chunk, conn}
{:more, chunk, conn} -> read_request_body(conn, body <> chunk)
{:error, reason} -> {:error, conn, reason}
end
end
defp send_request(conn, mint) do
request_path =
case {conn.request_path, conn.query_string} do
{path, nil} -> path
{path, ""} -> path
{path, query} -> path <> "?" <> query
end
defp send_request(conn, mint, body) do
HTTP.request(
mint,
conn.method,
conn.request_path,
request_path,
proxy_headers(conn),
body
:stream
)
end
defp stream_request_body(conn, mint, req) do
case Conn.read_body(conn) do
{:ok, chunk, conn} ->
with {:ok, mint} <- HTTP.stream_request_body(mint, req, chunk),
{:ok, mint} <- HTTP.stream_request_body(mint, req, :eof) do
{:ok, mint, conn}
end
{:more, chunk, conn} ->
with {:ok, mint} <- HTTP.stream_request_body(mint, req, chunk) do
stream_request_body(conn, mint, req)
end
{:error, reason} ->
{:error, conn, reason}
end
end
defp proxy_headers(conn) do
listener =
conn.private.wayfarer.listener

View file

@ -0,0 +1,203 @@
defmodule Wayfarer.Server.WebSocketProxy do
@moduledoc """
When a connection is upgraded to a websocket, we switch from handing via
`Plug` to `WebSock` via `WebSockAdapter`.
The outgoing connection is made using `Mint.WebSocket`.
"""
@behaviour WebSock
alias Mint.WebSocket
alias Plug.Conn
require Logger
@default_opts [extensions: [WebSocket.PerMessageDeflate]]
@doc false
@impl true
def init({mint, conn, proto}) when proto in [:ws, :wss] do
request_path =
case {conn.request_path, conn.query_string} do
{path, nil} -> path
{path, ""} -> path
{path, query} -> path <> "?" <> query
end
case WebSocket.upgrade(proto, mint, request_path, proxy_headers(conn), @default_opts) do
{:ok, mint, ref} -> {:ok, %{mint: mint, ref: ref, status: :init, buffer: []}}
{:error, _mint, reason} -> {:error, reason}
end
end
def init({mint, conn, :https}), do: init({mint, conn, :wss})
def init({mint, conn, :http}), do: init({mint, conn, :ws})
@doc false
@impl true
def handle_control({payload, [{:opcode, :ping}]}, state) do
with {:ok, websocket, data} <- WebSocket.encode(state.websocket, {:ping, payload}),
{:ok, mint} <- WebSocket.stream_request_body(state.mint, state.ref, data) do
{:ok, %{state | websocket: websocket, mint: mint}}
else
error -> handle_error(error, state)
end
end
def handle_control(_, state), do: {:ok, state}
@doc false
@impl true
def handle_in({payload, [{:opcode, frame_type}]}, state) when state.status == :init do
{:ok, %{state | buffer: [{frame_type, payload} | state.buffer]}}
end
def handle_in({payload, [{:opcode, frame_type}]}, state) do
with {:ok, websocket, data} <- WebSocket.encode(state.websocket, {frame_type, payload}),
{:ok, mint} <- WebSocket.stream_request_body(state.mint, state.ref, data) do
{:ok, %{state | websocket: websocket, mint: mint}}
else
error -> handle_error(error, state)
end
end
@doc false
@impl true
def handle_info(msg, state) when state.status == :init do
with {:ok, mint, result} <- WebSocket.stream(state.mint, msg),
{:ok, result} <- handle_upgrade_response(result, state.ref),
{:ok, mint, websocket} <- WebSocket.new(mint, state.ref, result.status, result.headers),
state <- Map.merge(state, %{status: :connected, websocket: websocket, mint: mint}),
{:ok, state} <- empty_buffer(state),
{:ok, messages, state} <- decode_frames(result.data, state) do
response_for_messages(messages, state)
else
error -> handle_error(error, state)
end
end
def handle_info(msg, state) when state.status == :connected do
with {:ok, mint, result} <- WebSocket.stream(state.mint, msg),
{:ok, frames} <- handle_websocket_data(result, state.ref),
{:ok, messages, state} <- decode_frames(frames, %{state | mint: mint}) do
response_for_messages(messages, state)
else
error -> handle_error(error, state)
end
end
@doc false
@impl true
def terminate(_reason, _state), do: :ok
defp handle_error({:error, _, %{reason: reason}, _}, state),
do: handle_error({:error, reason}, state)
defp handle_error({:error, reason, state}, _state), do: handle_error({:error, reason}, state)
defp handle_error({:error, reason}, state) do
Logger.debug(fn ->
"Dropping WebSocket connection for reason: #{inspect(reason)}"
end)
{:stop, :normal, state}
end
defp proxy_headers(conn) do
listener =
conn.private.wayfarer.listener
|> case do
%{address: address, port: port} when tuple_size(address) == 8 ->
"[#{:inet.ntoa(address)}]:#{port}"
%{address: address, port: port} ->
"#{:inet.ntoa(address)}:#{port}"
end
client =
conn
|> Conn.get_peer_data()
|> case do
%{address: address, port: port} when tuple_size(address) == 8 ->
"[#{:inet.ntoa(address)}]:#{port}"
%{address: address, port: port} ->
"#{:inet.ntoa(address)}:#{port}"
end
req_headers =
conn.req_headers
|> Enum.reject(
&(elem(&1, 0) in [
"connection",
"upgrade",
"sec-websocket-extensions",
"sec-websocket-key",
"sec-websocket-version"
])
)
[
{"forwarded", "by=#{listener};for=#{client};host=#{conn.host};proto=#{conn.scheme}"}
| req_headers
]
end
defp empty_buffer(state) when state.buffer == [], do: {:ok, state}
defp empty_buffer(state), do: do_empty_buffer(Enum.reverse(state.buffer), %{state | buffer: []})
defp do_empty_buffer([], state), do: {:ok, state}
defp do_empty_buffer([head | tail], state) do
with {:ok, websocket, data} <- WebSocket.encode(state.websocket, head),
{:ok, mint} <- WebSocket.stream_request_body(state.mint, state.ref, data) do
do_empty_buffer(tail, %{state | websocket: websocket, mint: mint})
end
end
defp handle_upgrade_response(result, ref), do: handle_upgrade_response(result, ref, %{data: []})
defp handle_upgrade_response([{:done, ref}], ref, result), do: {:ok, result}
defp handle_upgrade_response([{:status, ref, status} | tail], ref, result) do
handle_upgrade_response(tail, ref, Map.put(result, :status, status))
end
defp handle_upgrade_response([{:headers, ref, headers} | tail], ref, result) do
handle_upgrade_response(tail, ref, Map.put(result, :headers, headers))
end
defp handle_upgrade_response([{:data, ref, data} | tail], ref, result) do
result = Map.update!(result, :data, &[data | &1])
handle_upgrade_response(tail, ref, result)
end
defp handle_websocket_data(data, ref),
do: handle_websocket_data(data, ref, [])
defp handle_websocket_data([], _ref, messages), do: {:ok, Enum.reverse(messages)}
defp handle_websocket_data([{:data, ref, data} | tail], ref, messages),
do: handle_websocket_data(tail, ref, [data | messages])
defp decode_frames(frames, state) do
frames
|> Enum.reduce_while({:ok, [], state}, fn frame, {:ok, messages, state} ->
case WebSocket.decode(state.websocket, frame) do
{:ok, websocket, frames} when is_list(frames) ->
messages = Enum.concat(messages, frames)
{:cont, {:ok, messages, %{state | websocket: websocket}}}
{:error, websocket, reason} ->
{:halt, {:error, reason, %{state | websocket: websocket}}}
end
end)
end
defp response_for_messages([], state), do: {:ok, state}
defp response_for_messages(messages, state) do
case Enum.split_with(messages, &(elem(&1, 0) == :close)) do
{[], messages} -> {:push, messages, state}
{[{:close, code, _} | _], messages} -> {:stop, :normal, code, messages, state}
end
end
end

View file

@ -9,8 +9,8 @@ defmodule Wayfarer.Target do
@options_schema [
scheme: [
type: {:in, [:http, :https]},
doc: "The connection protocol.",
type: {:in, [:http, :https, :ws, :wss]},
doc: "The connection scheme.",
required: true
],
port: [
@ -33,6 +33,12 @@ defmodule Wayfarer.Target do
doc: "An optional name for the target.",
required: false
],
transport: [
type: {:in, [:http1, :http2, :auto]},
required: false,
default: :auto,
doc: "The connection protocol."
],
health_checks: [
type: {:list, {:keyword_list, HealthCheck.schema()}},
required: false,
@ -92,7 +98,7 @@ defmodule Wayfarer.Target do
def init(options) do
with {:ok, options} <- Options.validate(options, @options_schema),
{:ok, uri} <- to_uri(options[:scheme], options[:address], options[:port]) do
target = options |> Keyword.take(~w[scheme address port]a) |> Map.new()
target = options |> Keyword.take(~w[scheme address port transport]a) |> Map.new()
module = options[:module]
key = {module, target.scheme, target.address, target.port}
@ -117,6 +123,7 @@ defmodule Wayfarer.Target do
method: check[:method] |> to_string() |> String.upcase(),
headers: @default_headers,
hostname: check[:hostname] || uri.host,
transport: target.transport,
passes: 0
})
@ -170,7 +177,8 @@ defmodule Wayfarer.Target do
end)
Server.target_status_change(
{state.module, state.target.scheme, state.target.address, state.target.port},
{state.module, state.target.scheme, state.target.address, state.target.port,
state.target.transport},
:unhealthy
)
@ -205,7 +213,8 @@ defmodule Wayfarer.Target do
if target_became_healthy? do
Server.target_status_change(
{state.module, state.target.scheme, state.target.address, state.target.port},
{state.module, state.target.scheme, state.target.address, state.target.port,
state.target.transport},
:healthy
)

View file

@ -36,8 +36,8 @@ defmodule Wayfarer.Target.ActiveConnections do
@impl true
@spec handle_info(:tick | {:DOWN, any, :process, any, pid, any}, state) :: {:noreply, state}
def handle_info(:tick, state) do
size = :ets.info(state.table, :size)
Logger.debug("Active connections: #{size}")
# size = :ets.info(state.table, :size)
# Logger.debug("Active connections: #{size}")
{:noreply, state}
end

View file

@ -4,7 +4,7 @@ defmodule Wayfarer.Target.Check do
"""
use GenServer, restart: :transient
alias Mint.HTTP
alias Mint.{HTTP, HTTP1, HTTP2, WebSocket}
alias Wayfarer.{Target, Target.TotalConnections}
require Logger
@ -36,8 +36,11 @@ defmodule Wayfarer.Target.Check do
def handle_info(:timeout, state), do: check_failed(state, "request timeout expired.")
def handle_info(message, state) do
with {:ok, conn, responses} <- Mint.HTTP.stream(state.conn, message),
:ok <- TotalConnections.health_check_connect({state.scheme, state.address, state.port}),
with {:ok, conn, responses} <- WebSocket.stream(state.conn, message),
:ok <-
TotalConnections.health_check_connect(
{state.scheme, state.address, state.port, state.transport}
),
{:ok, status} <- get_status_response(conn, responses) do
if Enum.any?(state.success_codes, &Enum.member?(&1, status)) do
Target.check_passed(state.ref)
@ -57,15 +60,40 @@ defmodule Wayfarer.Target.Check do
end
end
defp connect(state),
do:
HTTP.connect(state.scheme, state.address, state.port,
timeout: state.connect_timeout,
hostname: state.hostname
)
defp connect(state) when state.scheme == :ws,
do: connect(%{state | scheme: :http})
defp request(state),
do: HTTP.request(state.conn, state.method, state.path, state.headers, nil)
defp connect(state) when state.scheme == :wss,
do: connect(%{state | scheme: :https})
defp connect(state) when state.transport == :http1 do
HTTP1.connect(state.scheme, state.address, state.port,
timeout: state.connect_timeout,
hostname: state.hostname
)
end
defp connect(state) when state.transport == :http2 do
HTTP2.connect(state.scheme, state.address, state.port,
timeout: state.connect_timeout,
hostname: state.hostname
)
end
defp connect(state) do
HTTP.connect(state.scheme, state.address, state.port,
timeout: state.connect_timeout,
hostname: state.hostname
)
end
defp request(state) when state.scheme in [:ws, :wss] do
WebSocket.upgrade(state.scheme, state.conn, state.path, state.headers, [])
end
defp request(state) do
HTTP.request(state.conn, state.method, state.path, state.headers, nil)
end
defp check_failed(state, reason) when is_binary(reason) do
Target.check_failed(state.ref)

View file

@ -5,7 +5,8 @@ defmodule Wayfarer.Utils do
@type address_input :: IP.Address.t() | String.t() | :inet.ip_address()
@type port_number :: 1..0xFFFF
@type scheme :: :http | :https
@type scheme :: :http | :https | :ws | :wss
@type transport :: :http1 | :http2 | :auto
@doc """
Verify an IP address and convert it into a tuple.
@ -46,7 +47,7 @@ defmodule Wayfarer.Utils do
Verify a scheme.
"""
@spec sanitise_scheme(scheme) :: {:ok, scheme} | {:error, any}
def sanitise_scheme(scheme) when scheme in [:http, :https], do: {:ok, scheme}
def sanitise_scheme(scheme) when scheme in [:http, :https, :ws, :wss], do: {:ok, scheme}
def sanitise_scheme(scheme),
do:
@ -58,7 +59,7 @@ defmodule Wayfarer.Utils do
@doc """
Convert a scheme, address, port tuple into a `URI`.
"""
@spec to_uri(:http | :https, address_input, port_number) :: {:ok, URI.t()} | {:error, any}
@spec to_uri(scheme, address_input, port_number) :: {:ok, URI.t()} | {:error, any}
def to_uri(scheme, address, port) do
with {:ok, scheme} <- sanitise_scheme(scheme),
{:ok, address} <- sanitise_ip_address(address),
@ -76,7 +77,7 @@ defmodule Wayfarer.Utils do
@doc """
Convert a list of targets into a match spec guard.
"""
@spec targets_to_ms_guard(atom, [{:http | :https, address_input, port_number}]) :: [
@spec targets_to_ms_guard(atom, [{scheme, :inet.ip_address(), port_number, transport}]) :: [
{atom, any, any}
]
def targets_to_ms_guard(_var, []), do: []
@ -92,11 +93,8 @@ defmodule Wayfarer.Utils do
@doc """
Convert a target tuple into a tuple safe for injection into a match spec.
"""
@spec target_to_ms({:http | :https, address_input, port_number}) ::
{{:http | :https, address_input, port_number}}
| {{:http | :https, {:inet.ip_address()}, port_number}}
def target_to_ms({scheme, address, port}) when is_tuple(address),
do: {{scheme, {address}, port}}
def target_to_ms({scheme, address, port}), do: {{scheme, address, port}}
@spec target_to_ms({scheme, :inet.ip_address(), port_number, transport}) ::
{{scheme, {:inet.ip_address()}, port_number, transport}}
def target_to_ms({scheme, address, port, transport}) when is_tuple(address),
do: {{scheme, {address}, port, transport}}
end

View file

@ -21,7 +21,7 @@ defmodule Wayfarer.MixProject do
source_url: "https://harton.dev/james/wayfarer",
homepage_url: "https://harton.dev/james/wayfarer",
aliases: aliases(),
dialyzer: [plt_add_apps: []],
dialyzer: [plt_ignore_apps: [:mint]],
docs: [
main: "Wayfarer",
formatters: ["html"],
@ -96,11 +96,13 @@ defmodule Wayfarer.MixProject do
{:castore, "~> 1.0"},
{:ip, "~> 2.0"},
{:mint, "~> 1.5"},
{:mint_web_socket, "~> 1.0"},
{:nimble_options, "~> 1.0"},
{:plug, "~> 1.15"},
{:spark, "~> 2.0"},
{:telemetry, "~> 1.2"},
{:websock, "~> 0.5"},
{:websock_adapter, "~> 0.5"},
# Dev/test
{:credo, "~> 1.7", opts},

View file

@ -27,6 +27,7 @@
"mime": {:hex, :mime, "2.0.6", "8f18486773d9b15f95f4f4f1e39b710045fa1de891fada4516559967276e4dc2", [:mix], [], "hexpm", "c9945363a6b26d747389aac3643f8e0e09d30499a138ad64fe8fd1d13d9b153e"},
"mimic": {:hex, :mimic, "1.9.0", "c96367749a884556718f64657a4bdc99ce0cb5d19333aa04308fbd061c31b8b7", [:mix], [], "hexpm", "92107697938490b300566317c2a1490ef52e23aeac16632c0e56740721189116"},
"mint": {:hex, :mint, "1.6.2", "af6d97a4051eee4f05b5500671d47c3a67dac7386045d87a904126fd4bbcea2e", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "5ee441dffc1892f1ae59127f74afe8fd82fda6587794278d924e4d90ea3d63f9"},
"mint_web_socket": {:hex, :mint_web_socket, "1.0.4", "0b539116dbb3d3f861cdf5e15e269a933cb501c113a14db7001a3157d96ffafd", [:mix], [{:mint, ">= 1.4.1 and < 2.0.0-0", [hex: :mint, repo: "hexpm", optional: false]}], "hexpm", "027d4c5529c45a4ba0ce27a01c0f35f284a5468519c045ca15f43decb360a991"},
"mix_audit": {:hex, :mix_audit, "2.1.4", "0a23d5b07350cdd69001c13882a4f5fb9f90fbd4cbf2ebc190a2ee0d187ea3e9", [:make, :mix], [{:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:yaml_elixir, "~> 2.11", [hex: :yaml_elixir, repo: "hexpm", optional: false]}], "hexpm", "fd807653cc8c1cada2911129c7eb9e985e3cc76ebf26f4dd628bb25bbcaa7099"},
"nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"},
"nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"},
@ -41,6 +42,7 @@
"thousand_island": {:hex, :thousand_island, "1.3.5", "6022b6338f1635b3d32406ff98d68b843ba73b3aa95cfc27154223244f3a6ca5", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2be6954916fdfe4756af3239fb6b6d75d0b8063b5df03ba76fd8a4c87849e180"},
"ucwidth": {:hex, :ucwidth, "0.2.0", "1f0a440f541d895dff142275b96355f7e91e15bca525d4a0cc788ea51f0e3441", [:mix], [], "hexpm", "c1efd1798b8eeb11fb2bec3cafa3dd9c0c3647bee020543f0340b996177355bf"},
"websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"},
"websock_adapter": {:hex, :websock_adapter, "0.5.7", "65fa74042530064ef0570b75b43f5c49bb8b235d6515671b3d250022cb8a1f9e", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "d0f478ee64deddfec64b800673fd6e0c8888b079d9f3444dd96d2a98383bdbd1"},
"yamerl": {:hex, :yamerl, "0.10.0", "4ff81fee2f1f6a46f1700c0d880b24d193ddb74bd14ef42cb0bcf46e81ef2f8e", [:rebar3], [], "hexpm", "346adb2963f1051dc837a2364e4acf6eb7d80097c0f53cbdc3046ec8ec4b4e6e"},
"yaml_elixir": {:hex, :yaml_elixir, "2.11.0", "9e9ccd134e861c66b84825a3542a1c22ba33f338d82c07282f4f1f52d847bd50", [:mix], [{:yamerl, "~> 0.10", [hex: :yamerl, repo: "hexpm", optional: false]}], "hexpm", "53cc28357ee7eb952344995787f4bb8cc3cecbf189652236e9b163e8ce1bc242"},
}

View file

@ -4,25 +4,22 @@ defmodule Support.Example do
config "Example" do
listeners do
# http "127.0.0.1", 8080
http "0.0.0.0", 8080
http "0.0.0.0", 8000
end
targets do
http "127.0.0.1", 8082
http "192.168.4.26", 80
http "127.0.0.1", 4000
end
health_checks do
check do
interval :timer.seconds(5)
success_codes 200..399
end
end
host_patterns do
pattern "*.example.com"
pattern "example.com"
pattern "localhost"
end
end
end

View file

@ -48,7 +48,7 @@ defmodule Wayfarer.RouterTest do
{:ok, table} = Router.init(Support.Example)
listener = {:http, {127, 0, 0, 1}, random_port()}
target = {:http, {127, 0, 0, 1}, random_port()}
target = {:http, {127, 0, 0, 1}, random_port(), :auto}
assert :ok = Router.add_route(table, listener, target, ["*.example.com"], :round_robin)
@ -61,7 +61,7 @@ defmodule Wayfarer.RouterTest do
{:ok, table} = Router.init(Support.Example)
listener = {:http, {127, 0, 0, 1}, random_port()}
target = {:http, {127, 0, 0, 1}, random_port()}
target = {:http, {127, 0, 0, 1}, random_port(), :auto}
assert :ok = Router.add_route(table, listener, target, ["www.example.com"], :round_robin)
@ -74,7 +74,7 @@ defmodule Wayfarer.RouterTest do
{:ok, table} = Router.init(Support.Example)
listener = {:http, {127, 0, 0, 1}, random_port()}
target = {:http, {127, 0, 0, 1}, random_port()}
target = {:http, {127, 0, 0, 1}, random_port(), :auto}
assert :ok =
Router.add_route(
@ -114,9 +114,9 @@ defmodule Wayfarer.RouterTest do
{:ok, table} = Router.init(Support.Example)
listener0 = {:http, {127, 0, 0, 1}, random_port()}
target0 = {:http, {127, 0, 0, 1}, random_port()}
target0 = {:http, {127, 0, 0, 1}, random_port(), :auto}
listener1 = {:http, {127, 0, 0, 1}, random_port()}
target1 = {:http, {127, 0, 0, 1}, random_port()}
target1 = {:http, {127, 0, 0, 1}, random_port(), :auto}
assert :ok =
Router.import_routes(table, [
@ -142,9 +142,9 @@ defmodule Wayfarer.RouterTest do
{:ok, table} = Router.init(Support.Example)
listener0 = {:http, {127, 0, 0, 1}, random_port()}
target0 = {:http, {127, 0, 0, 1}, random_port()}
target0 = {:http, {127, 0, 0, 1}, random_port(), :auto}
listener1 = {:http, {127, 0, 0, 1}, random_port()}
target1 = {:http, {127, 0, 0, 1}, random_port()}
target1 = {:http, {127, 0, 0, 1}, random_port(), :auto}
Router.import_routes(table, [
{listener0, target0, ["0.example.com"], :round_robin},
@ -164,9 +164,9 @@ defmodule Wayfarer.RouterTest do
{:ok, table} = Router.init(Support.Example)
listener0 = {:http, {127, 0, 0, 1}, random_port()}
target0 = {:http, {127, 0, 0, 1}, random_port()}
target0 = {:http, {127, 0, 0, 1}, random_port(), :auto}
listener1 = {:http, {127, 0, 0, 1}, random_port()}
target1 = {:http, {127, 0, 0, 1}, random_port()}
target1 = {:http, {127, 0, 0, 1}, random_port(), :auto}
Router.import_routes(table, [
{listener0, target0, ["0.example.com"], :round_robin},
@ -186,9 +186,9 @@ defmodule Wayfarer.RouterTest do
{:ok, table} = Router.init(Support.Example)
listener0 = {:http, {127, 0, 0, 1}, random_port()}
target0 = {:http, {127, 0, 0, 1}, random_port()}
target0 = {:http, {127, 0, 0, 1}, random_port(), :auto}
listener1 = {:http, {127, 0, 0, 1}, random_port()}
target1 = {:http, {127, 0, 0, 1}, random_port()}
target1 = {:http, {127, 0, 0, 1}, random_port(), :auto}
Router.import_routes(table, [
{listener0, target0, ["0.example.com"], :round_robin},
@ -215,8 +215,8 @@ defmodule Wayfarer.RouterTest do
{:ok, table} = Router.init(Support.Example)
listener = {:http, {127, 0, 0, 1}, random_port()}
target0 = {:http, {127, 0, 0, 1}, random_port()}
target1 = {:http, {127, 0, 0, 1}, random_port()}
target0 = {:http, {127, 0, 0, 1}, random_port(), :auto}
target1 = {:http, {127, 0, 0, 1}, random_port(), :auto}
:ok =
Router.import_routes(table, [

View file

@ -52,7 +52,7 @@ defmodule Wayfarer.Server.PlugTest do
test "it looks for healthy targets in the router" do
listener = {:http, {127, 0, 0, 1}, random_port()}
target = {:http, {127, 0, 0, 1}, random_port()}
target = {:http, {127, 0, 0, 1}, random_port(), :auto}
{:ok, _} = HttpServer.start_link(elem(target, 2), 200, "OK")
@ -76,7 +76,7 @@ defmodule Wayfarer.Server.PlugTest do
test "it selects a target to proxy to" do
listener = {:http, {127, 0, 0, 1}, random_port()}
target = {:http, {127, 0, 0, 1}, random_port()}
target = {:http, {127, 0, 0, 1}, random_port(), :auto}
{:ok, _} = HttpServer.start_link(elem(target, 2), 200, "OK")

View file

@ -49,6 +49,7 @@ defmodule Wayfarer.Server.ProxyTest do
HTTP
|> stub(:connect, fn _, _, _, _ -> {:ok, :fake_conn} end)
|> stub(:stream_request_body, fn mint, _, _ -> {:ok, mint} end)
|> stub(:request, fn mint, _, _, _, _ ->
send(self(), :ignore)
{:ok, mint, req}
@ -59,7 +60,7 @@ defmodule Wayfarer.Server.ProxyTest do
describe "request/2" do
test "it opens an HTTP connection to the target and sends the request", %{conn: conn} do
target = {:http, {127, 0, 0, 1}, random_port()}
target = {:http, {127, 0, 0, 1}, random_port(), :auto}
req = make_ref()
HTTP
@ -72,6 +73,7 @@ defmodule Wayfarer.Server.ProxyTest do
{:ok, :fake_conn}
end)
|> expect(:stream_request_body, 2, fn mint, _, _ -> {:ok, mint} end)
|> expect(:request, fn mint, _, _, _, _ ->
send(self(), :ignore)
{:ok, mint, req}
@ -85,7 +87,7 @@ defmodule Wayfarer.Server.ProxyTest do
end
test "it records the outgoing connection", %{conn: conn} do
target = {:http, {127, 0, 0, 1}, random_port()}
target = {:http, {127, 0, 0, 1}, random_port(), :auto}
ActiveConnections
|> expect(:connect, fn ^target -> :ok end)
@ -103,7 +105,7 @@ defmodule Wayfarer.Server.ProxyTest do
{:error, %Mint.TransportError{reason: :protocol_not_negotiated}}
end)
target = {:http, {127, 0, 0, 1}, random_port()}
target = {:http, {127, 0, 0, 1}, random_port(), :auto}
assert conn = Proxy.request(conn, target)
assert conn.status == 502
end
@ -116,7 +118,7 @@ defmodule Wayfarer.Server.ProxyTest do
{:error, %Mint.TransportError{reason: :timeout}}
end)
target = {:http, {127, 0, 0, 1}, random_port()}
target = {:http, {127, 0, 0, 1}, random_port(), :auto}
assert conn = Proxy.request(conn, target)
assert conn.status == 504
end
@ -141,7 +143,7 @@ defmodule Wayfarer.Server.ProxyTest do
{:error, :ignore}
end)
target = {:http, {127, 0, 0, 1}, random_port()}
target = {:http, {127, 0, 0, 1}, random_port(), :auto}
Proxy.request(conn, target)
end
@ -176,7 +178,7 @@ defmodule Wayfarer.Server.ProxyTest do
{:error, :ignore}
end)
target = {:http, {127, 0, 0, 1}, random_port()}
target = {:http, {127, 0, 0, 1}, random_port(), :auto}
Proxy.request(conn, target)
end
@ -205,7 +207,7 @@ defmodule Wayfarer.Server.ProxyTest do
{:error, :ignore}
end)
target = {:http, {127, 0, 0, 1}, random_port()}
target = {:http, {127, 0, 0, 1}, random_port(), :auto}
Proxy.request(conn, target)
end
end

View file

@ -54,7 +54,7 @@ defmodule Wayfarer.ServerTest do
routing_table: [
{
{:http, ~i"127.0.0.1", listen_port},
{:http, ~i"127.0.0.1", target_port},
{:http, ~i"127.0.0.1", target_port, :auto},
["example.com"],
:round_robin
}
@ -63,7 +63,7 @@ defmodule Wayfarer.ServerTest do
assert :ets.tab2list(Support.Example) == [
{{:http, {127, 0, 0, 1}, listen_port}, {"example", "com"},
{:http, {127, 0, 0, 1}, target_port}, :round_robin, :initial}
{:http, {127, 0, 0, 1}, target_port, :auto}, :round_robin, :initial}
]
end
end

View file

@ -51,9 +51,9 @@ defmodule WayfarerTest do
]
],
routing_table: [
{{:http, ~i"127.0.0.1", listener_port}, {:http, ~i"127.0.0.1", target0_port},
{{:http, ~i"127.0.0.1", listener_port}, {:http, ~i"127.0.0.1", target0_port, :auto},
["www.example.com"], :round_robin},
{{:http, ~i"127.0.0.1", listener_port}, {:http, ~i"127.0.0.1", target1_port},
{{:http, ~i"127.0.0.1", listener_port}, {:http, ~i"127.0.0.1", target1_port, :auto},
["www.example.com"], :round_robin}
]
)
@ -69,8 +69,8 @@ defmodule WayfarerTest do
assert [1, 1] =
[
{:http, {127, 0, 0, 1}, target0_port},
{:http, {127, 0, 0, 1}, target1_port}
{:http, {127, 0, 0, 1}, target0_port, :auto},
{:http, {127, 0, 0, 1}, target1_port, :auto}
]
|> TotalConnections.proxy_count()
|> Enum.map(&elem(&1, 1))
@ -85,8 +85,8 @@ defmodule WayfarerTest do
assert [11, 11] =
[
{:http, {127, 0, 0, 1}, target0_port},
{:http, {127, 0, 0, 1}, target1_port}
{:http, {127, 0, 0, 1}, target0_port, :auto},
{:http, {127, 0, 0, 1}, target1_port, :auto}
]
|> TotalConnections.proxy_count()
|> Enum.map(&elem(&1, 1))