Compare commits
3 commits
main
...
connection
Author | SHA1 | Date | |
---|---|---|---|
01f446cc57 | |||
c1749dd0d1 | |||
15fa0d1fac |
7 changed files with 200 additions and 14 deletions
|
@ -6,7 +6,14 @@ defmodule Wayfarer.Server.Proxy do
|
||||||
|
|
||||||
alias Mint.HTTP
|
alias Mint.HTTP
|
||||||
alias Plug.Conn
|
alias Plug.Conn
|
||||||
alias Wayfarer.{Router, Target.ActiveConnections, Target.TotalConnections}
|
|
||||||
|
alias Wayfarer.{
|
||||||
|
Router,
|
||||||
|
Target.ActiveConnections,
|
||||||
|
Target.ConnectionRecycler,
|
||||||
|
Target.TotalConnections
|
||||||
|
}
|
||||||
|
|
||||||
require Logger
|
require Logger
|
||||||
|
|
||||||
@connect_timeout 5_000
|
@connect_timeout 5_000
|
||||||
|
@ -17,13 +24,13 @@ defmodule Wayfarer.Server.Proxy do
|
||||||
"""
|
"""
|
||||||
@spec request(Conn.t(), Router.target()) :: Conn.t()
|
@spec request(Conn.t(), Router.target()) :: Conn.t()
|
||||||
def request(conn, {scheme, address, port} = target) do
|
def request(conn, {scheme, address, port} = target) do
|
||||||
with {:ok, mint} <-
|
with {:ok, mint} <- ConnectionRecycler.checkout(scheme, address, port, conn.host),
|
||||||
HTTP.connect(scheme, address, port, hostname: conn.host, timeout: @connect_timeout),
|
|
||||||
:ok <- ActiveConnections.connect(target),
|
:ok <- ActiveConnections.connect(target),
|
||||||
:ok <- TotalConnections.proxy_connect(target),
|
:ok <- TotalConnections.proxy_connect(target),
|
||||||
{:ok, body, conn} <- read_request_body(conn),
|
{:ok, body, conn} <- read_request_body(conn),
|
||||||
{:ok, mint, req} <- send_request(conn, mint, body),
|
{:ok, mint, req} <- send_request(conn, mint, body),
|
||||||
{:ok, conn, _mint} <- proxy_responses(conn, mint, req) do
|
{:ok, conn, mint} <- proxy_responses(conn, mint, req),
|
||||||
|
:ok <- ConnectionRecycler.checkin(scheme, address, port, conn.host, mint) do
|
||||||
conn
|
conn
|
||||||
else
|
else
|
||||||
error -> handle_error(error, conn, target)
|
error -> handle_error(error, conn, target)
|
||||||
|
@ -180,9 +187,14 @@ defmodule Wayfarer.Server.Proxy do
|
||||||
"#{:inet.ntoa(address)}:#{port}"
|
"#{:inet.ntoa(address)}:#{port}"
|
||||||
end
|
end
|
||||||
|
|
||||||
|
headers = conn.req_headers |> Enum.reject(&(elem(&1, 0) == "connection"))
|
||||||
|
|
||||||
[
|
[
|
||||||
{"forwarded", "by=#{listener};for=#{client};host=#{conn.host};proto=#{conn.scheme}"}
|
{"forwarded", "by=#{listener};for=#{client};host=#{conn.host};proto=#{conn.scheme}"},
|
||||||
| conn.req_headers
|
{"connection", "keep-alive"},
|
||||||
|
{"keep-alive", "timeout=30"}
|
||||||
|
| headers
|
||||||
]
|
]
|
||||||
|
|> dbg()
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -36,8 +36,8 @@ defmodule Wayfarer.Target.ActiveConnections do
|
||||||
@impl true
|
@impl true
|
||||||
@spec handle_info(:tick | {:DOWN, any, :process, any, pid, any}, state) :: {:noreply, state}
|
@spec handle_info(:tick | {:DOWN, any, :process, any, pid, any}, state) :: {:noreply, state}
|
||||||
def handle_info(:tick, state) do
|
def handle_info(:tick, state) do
|
||||||
size = :ets.info(state.table, :size)
|
# size = :ets.info(state.table, :size)
|
||||||
Logger.debug("Active connections: #{size}")
|
# Logger.debug("Active connections: #{size}")
|
||||||
|
|
||||||
{:noreply, state}
|
{:noreply, state}
|
||||||
end
|
end
|
||||||
|
|
152
lib/wayfarer/target/connection_recycler.ex
Normal file
152
lib/wayfarer/target/connection_recycler.ex
Normal file
|
@ -0,0 +1,152 @@
|
||||||
|
defmodule Wayfarer.Target.ConnectionRecycler do
|
||||||
|
@default_ttl 30
|
||||||
|
@default_ttl_ms :timer.seconds(@default_ttl)
|
||||||
|
@default_sweep_interval 5
|
||||||
|
@default_sweep_interval_ms :timer.seconds(@default_sweep_interval)
|
||||||
|
|
||||||
|
@moduledoc """
|
||||||
|
A cache which recycles recently completed Mint connections rather than
|
||||||
|
throwing them away.
|
||||||
|
|
||||||
|
Unfortunately, we can't use a fixed-size pool for this job because we need the
|
||||||
|
number of connections to be able to grow when there is heavy demand.
|
||||||
|
|
||||||
|
When we need a connection we look in the cache, and if there is one available
|
||||||
|
then we re-use it, however if there is not we create a new one and check it in
|
||||||
|
to the cache when complete.
|
||||||
|
|
||||||
|
By default we keep connections around for #{@default_ttl}s before discarding
|
||||||
|
them.
|
||||||
|
"""
|
||||||
|
use GenServer
|
||||||
|
require Logger
|
||||||
|
|
||||||
|
alias Mint.HTTP
|
||||||
|
|
||||||
|
@type state :: %{table: :ets.tid(), timer: :timer.tref()}
|
||||||
|
|
||||||
|
def checkout(scheme, address, port, hostname) do
|
||||||
|
GenServer.call(
|
||||||
|
{:via, PartitionSupervisor, {__MODULE__, {scheme, address, port, hostname}}},
|
||||||
|
{:get_connection, scheme, address, port, hostname, self()}
|
||||||
|
)
|
||||||
|
end
|
||||||
|
|
||||||
|
def checkin(scheme, address, port, hostname, mint) do
|
||||||
|
if HTTP.open?(mint, :read_write) do
|
||||||
|
GenServer.cast(
|
||||||
|
{:via, PartitionSupervisor, {__MODULE__, {scheme, address, port, hostname}}},
|
||||||
|
{:checkin, scheme, address, port, hostname, mint}
|
||||||
|
)
|
||||||
|
else
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc false
|
||||||
|
@spec start_link(any) :: GenServer.on_start()
|
||||||
|
def start_link(arg), do: GenServer.start_link(__MODULE__, arg)
|
||||||
|
|
||||||
|
@doc false
|
||||||
|
@impl true
|
||||||
|
@spec init(any) :: {:ok, state} | {:stop, any}
|
||||||
|
def init(_) do
|
||||||
|
case :timer.send_interval(@default_sweep_interval_ms, :tick) do
|
||||||
|
{:ok, timer} ->
|
||||||
|
table = :ets.new(__MODULE__, [:bag])
|
||||||
|
|
||||||
|
{:ok, %{table: table, timer: timer}}
|
||||||
|
|
||||||
|
{:error, reason} ->
|
||||||
|
{:stop, reason}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc false
|
||||||
|
@impl true
|
||||||
|
@spec handle_info(:tick, state) :: {:noreply, state}
|
||||||
|
def handle_info(:tick, state) do
|
||||||
|
# :ets.fun2ms(fn {_, checked_in_time, _} when checked_in_time < 123 -> true end)
|
||||||
|
|
||||||
|
# IO.puts(:ets.info(__MODULE__, :size))
|
||||||
|
|
||||||
|
horizon = System.monotonic_time(:millisecond) - @default_ttl_ms
|
||||||
|
match_spec = [{{:_, :"$1", :_}, [{:<, :"$1", horizon}], [true]}]
|
||||||
|
|
||||||
|
:ets.select_delete(state.table, match_spec)
|
||||||
|
|
||||||
|
{:noreply, state}
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc false
|
||||||
|
@impl true
|
||||||
|
def handle_call({:get_connection, scheme, address, port, hostname, pid}, _from, state) do
|
||||||
|
reply =
|
||||||
|
with {:ok, mint} <- get_connection(state.table, scheme, address, port, hostname),
|
||||||
|
{:ok, mint} <- HTTP.controlling_process(mint, pid) do
|
||||||
|
HTTP.set_mode(mint, :active)
|
||||||
|
end
|
||||||
|
|
||||||
|
{:reply, reply, state}
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc false
|
||||||
|
@impl true
|
||||||
|
def handle_cast({:checkin, scheme, address, port, hostname, mint}, state) do
|
||||||
|
case HTTP.set_mode(mint, :passive) do
|
||||||
|
{:ok, mint} ->
|
||||||
|
:ets.insert_new(
|
||||||
|
state.table,
|
||||||
|
{{scheme, address, port, hostname}, System.monotonic_time(:millisecond), mint}
|
||||||
|
)
|
||||||
|
|
||||||
|
{:error, reason} ->
|
||||||
|
Logger.debug("Error checking in #{inspect(reason)}")
|
||||||
|
end
|
||||||
|
|
||||||
|
{:noreply, state}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp get_connection(table, scheme, address, port, hostname) do
|
||||||
|
case select_connection(table, scheme, address, port, hostname) do
|
||||||
|
{:ok, mint} ->
|
||||||
|
Logger.debug("Reusing connection #{inspect(mint)}")
|
||||||
|
{:ok, mint}
|
||||||
|
|
||||||
|
:error ->
|
||||||
|
Logger.debug("Creating new connection #{inspect({scheme, address, port, hostname})}")
|
||||||
|
HTTP.connect(scheme, address, port, hostname: hostname, timeout: 5000)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp select_connection(table, scheme, address, port, hostname) do
|
||||||
|
# :ets.fun2ms(fn {{:http, {127, 0, 0, 1}, 80, "example.com"}, check_in_time, mint}
|
||||||
|
# when check_in_time >= 123 ->
|
||||||
|
# {check_in_time, mint}
|
||||||
|
# end)
|
||||||
|
|
||||||
|
horizon = System.monotonic_time(:millisecond) - @default_ttl_ms
|
||||||
|
|
||||||
|
match_spec = [
|
||||||
|
{{{scheme, address, port, hostname}, :"$1", :"$2"}, [{:>=, :"$1", horizon}],
|
||||||
|
[{{:"$1", :"$2"}}]}
|
||||||
|
]
|
||||||
|
|
||||||
|
case :ets.select(table, match_spec, 1) do
|
||||||
|
{[{checked_in_at, mint} | _], _} ->
|
||||||
|
:ets.delete_object(
|
||||||
|
table,
|
||||||
|
{{scheme, address, port, hostname}, checked_in_at, mint}
|
||||||
|
)
|
||||||
|
|
||||||
|
if HTTP.open?(mint, :read_write) do
|
||||||
|
{:ok, mint}
|
||||||
|
else
|
||||||
|
select_connection(table, scheme, address, port, hostname)
|
||||||
|
end
|
||||||
|
|
||||||
|
_ ->
|
||||||
|
:error
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
|
@ -11,7 +11,9 @@ defmodule Wayfarer.Target.Supervisor do
|
||||||
{Registry, keys: :unique, name: Wayfarer.Target.Registry},
|
{Registry, keys: :unique, name: Wayfarer.Target.Registry},
|
||||||
{DynamicSupervisor, name: Wayfarer.Target.DynamicSupervisor, strategy: :one_for_one},
|
{DynamicSupervisor, name: Wayfarer.Target.DynamicSupervisor, strategy: :one_for_one},
|
||||||
Wayfarer.Target.ActiveConnections,
|
Wayfarer.Target.ActiveConnections,
|
||||||
Wayfarer.Target.TotalConnections
|
Wayfarer.Target.TotalConnections,
|
||||||
|
{PartitionSupervisor,
|
||||||
|
child_spec: Wayfarer.Target.ConnectionRecycler, name: Wayfarer.Target.ConnectionRecycler}
|
||||||
]
|
]
|
||||||
|> Supervisor.init(strategy: :one_for_one)
|
|> Supervisor.init(strategy: :one_for_one)
|
||||||
end
|
end
|
||||||
|
|
|
@ -96,9 +96,9 @@ defmodule Wayfarer.Target.TotalConnections do
|
||||||
@impl true
|
@impl true
|
||||||
@spec handle_info(:tick, state) :: {:noreply, state}
|
@spec handle_info(:tick, state) :: {:noreply, state}
|
||||||
def handle_info(:tick, state) do
|
def handle_info(:tick, state) do
|
||||||
for {target, count} <- :ets.tab2list(state.table) do
|
# for {target, count} <- :ets.tab2list(state.table) do
|
||||||
Logger.debug("Total connections for #{inspect(target)}: #{count}")
|
# Logger.debug("Total connections for #{inspect(target)}: #{count}")
|
||||||
end
|
# end
|
||||||
|
|
||||||
{:noreply, state}
|
{:noreply, state}
|
||||||
end
|
end
|
||||||
|
|
|
@ -9,7 +9,7 @@ defmodule Support.Example do
|
||||||
end
|
end
|
||||||
|
|
||||||
targets do
|
targets do
|
||||||
http "127.0.0.1", 8082
|
# http "127.0.0.1", 8082
|
||||||
|
|
||||||
http "192.168.4.26", 80
|
http "192.168.4.26", 80
|
||||||
end
|
end
|
||||||
|
@ -25,4 +25,24 @@ defmodule Support.Example do
|
||||||
pattern "example.com"
|
pattern "example.com"
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
config "Forgejo" do
|
||||||
|
listeners do
|
||||||
|
http "0.0.0.0", 8080
|
||||||
|
end
|
||||||
|
|
||||||
|
targets do
|
||||||
|
http "192.168.4.12", 80 do
|
||||||
|
health_checks do
|
||||||
|
check do
|
||||||
|
success_codes [301..301]
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
host_patterns do
|
||||||
|
pattern "code.harton.nz"
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -99,7 +99,7 @@ defmodule WayfarerTest do
|
||||||
:ok
|
:ok
|
||||||
|
|
||||||
{:ok, _} ->
|
{:ok, _} ->
|
||||||
Process.sleep(100)
|
Process.sleep(10)
|
||||||
wait_for_target_state(key, state)
|
wait_for_target_state(key, state)
|
||||||
|
|
||||||
{:error, reason} ->
|
{:error, reason} ->
|
||||||
|
|
Loading…
Reference in a new issue