Compare commits

...

3 commits

Author SHA1 Message Date
01f446cc57
wip: try just using a simple genserver based pool. bad bad bad.
Some checks failed
continuous-integration/drone/push Build is failing
2023-11-22 17:12:30 +13:00
c1749dd0d1
wip: try various methods of doing this. 2023-11-22 15:17:51 +13:00
15fa0d1fac
wip: not sure why, but all the connections are closed when coming out of the recycler
Some checks failed
continuous-integration/drone/push Build is failing
2023-11-21 19:53:10 +13:00
7 changed files with 200 additions and 14 deletions

View file

@ -6,7 +6,14 @@ defmodule Wayfarer.Server.Proxy do
alias Mint.HTTP
alias Plug.Conn
alias Wayfarer.{Router, Target.ActiveConnections, Target.TotalConnections}
alias Wayfarer.{
Router,
Target.ActiveConnections,
Target.ConnectionRecycler,
Target.TotalConnections
}
require Logger
@connect_timeout 5_000
@ -17,13 +24,13 @@ defmodule Wayfarer.Server.Proxy do
"""
@spec request(Conn.t(), Router.target()) :: Conn.t()
def request(conn, {scheme, address, port} = target) do
with {:ok, mint} <-
HTTP.connect(scheme, address, port, hostname: conn.host, timeout: @connect_timeout),
with {:ok, mint} <- ConnectionRecycler.checkout(scheme, address, port, conn.host),
:ok <- ActiveConnections.connect(target),
:ok <- TotalConnections.proxy_connect(target),
{:ok, body, conn} <- read_request_body(conn),
{:ok, mint, req} <- send_request(conn, mint, body),
{:ok, conn, _mint} <- proxy_responses(conn, mint, req) do
{:ok, conn, mint} <- proxy_responses(conn, mint, req),
:ok <- ConnectionRecycler.checkin(scheme, address, port, conn.host, mint) do
conn
else
error -> handle_error(error, conn, target)
@ -180,9 +187,14 @@ defmodule Wayfarer.Server.Proxy do
"#{:inet.ntoa(address)}:#{port}"
end
headers = conn.req_headers |> Enum.reject(&(elem(&1, 0) == "connection"))
[
{"forwarded", "by=#{listener};for=#{client};host=#{conn.host};proto=#{conn.scheme}"}
| conn.req_headers
{"forwarded", "by=#{listener};for=#{client};host=#{conn.host};proto=#{conn.scheme}"},
{"connection", "keep-alive"},
{"keep-alive", "timeout=30"}
| headers
]
|> dbg()
end
end

View file

@ -36,8 +36,8 @@ defmodule Wayfarer.Target.ActiveConnections do
@impl true
@spec handle_info(:tick | {:DOWN, any, :process, any, pid, any}, state) :: {:noreply, state}
def handle_info(:tick, state) do
size = :ets.info(state.table, :size)
Logger.debug("Active connections: #{size}")
# size = :ets.info(state.table, :size)
# Logger.debug("Active connections: #{size}")
{:noreply, state}
end

View file

@ -0,0 +1,152 @@
defmodule Wayfarer.Target.ConnectionRecycler do
@default_ttl 30
@default_ttl_ms :timer.seconds(@default_ttl)
@default_sweep_interval 5
@default_sweep_interval_ms :timer.seconds(@default_sweep_interval)
@moduledoc """
A cache which recycles recently completed Mint connections rather than
throwing them away.
Unfortunately, we can't use a fixed-size pool for this job because we need the
number of connections to be able to grow when there is heavy demand.
When we need a connection we look in the cache, and if there is one available
then we re-use it, however if there is not we create a new one and check it in
to the cache when complete.
By default we keep connections around for #{@default_ttl}s before discarding
them.
"""
use GenServer
require Logger
alias Mint.HTTP
@type state :: %{table: :ets.tid(), timer: :timer.tref()}
def checkout(scheme, address, port, hostname) do
GenServer.call(
{:via, PartitionSupervisor, {__MODULE__, {scheme, address, port, hostname}}},
{:get_connection, scheme, address, port, hostname, self()}
)
end
def checkin(scheme, address, port, hostname, mint) do
if HTTP.open?(mint, :read_write) do
GenServer.cast(
{:via, PartitionSupervisor, {__MODULE__, {scheme, address, port, hostname}}},
{:checkin, scheme, address, port, hostname, mint}
)
else
:ok
end
end
@doc false
@spec start_link(any) :: GenServer.on_start()
def start_link(arg), do: GenServer.start_link(__MODULE__, arg)
@doc false
@impl true
@spec init(any) :: {:ok, state} | {:stop, any}
def init(_) do
case :timer.send_interval(@default_sweep_interval_ms, :tick) do
{:ok, timer} ->
table = :ets.new(__MODULE__, [:bag])
{:ok, %{table: table, timer: timer}}
{:error, reason} ->
{:stop, reason}
end
end
@doc false
@impl true
@spec handle_info(:tick, state) :: {:noreply, state}
def handle_info(:tick, state) do
# :ets.fun2ms(fn {_, checked_in_time, _} when checked_in_time < 123 -> true end)
# IO.puts(:ets.info(__MODULE__, :size))
horizon = System.monotonic_time(:millisecond) - @default_ttl_ms
match_spec = [{{:_, :"$1", :_}, [{:<, :"$1", horizon}], [true]}]
:ets.select_delete(state.table, match_spec)
{:noreply, state}
end
@doc false
@impl true
def handle_call({:get_connection, scheme, address, port, hostname, pid}, _from, state) do
reply =
with {:ok, mint} <- get_connection(state.table, scheme, address, port, hostname),
{:ok, mint} <- HTTP.controlling_process(mint, pid) do
HTTP.set_mode(mint, :active)
end
{:reply, reply, state}
end
@doc false
@impl true
def handle_cast({:checkin, scheme, address, port, hostname, mint}, state) do
case HTTP.set_mode(mint, :passive) do
{:ok, mint} ->
:ets.insert_new(
state.table,
{{scheme, address, port, hostname}, System.monotonic_time(:millisecond), mint}
)
{:error, reason} ->
Logger.debug("Error checking in #{inspect(reason)}")
end
{:noreply, state}
end
defp get_connection(table, scheme, address, port, hostname) do
case select_connection(table, scheme, address, port, hostname) do
{:ok, mint} ->
Logger.debug("Reusing connection #{inspect(mint)}")
{:ok, mint}
:error ->
Logger.debug("Creating new connection #{inspect({scheme, address, port, hostname})}")
HTTP.connect(scheme, address, port, hostname: hostname, timeout: 5000)
end
end
defp select_connection(table, scheme, address, port, hostname) do
# :ets.fun2ms(fn {{:http, {127, 0, 0, 1}, 80, "example.com"}, check_in_time, mint}
# when check_in_time >= 123 ->
# {check_in_time, mint}
# end)
horizon = System.monotonic_time(:millisecond) - @default_ttl_ms
match_spec = [
{{{scheme, address, port, hostname}, :"$1", :"$2"}, [{:>=, :"$1", horizon}],
[{{:"$1", :"$2"}}]}
]
case :ets.select(table, match_spec, 1) do
{[{checked_in_at, mint} | _], _} ->
:ets.delete_object(
table,
{{scheme, address, port, hostname}, checked_in_at, mint}
)
if HTTP.open?(mint, :read_write) do
{:ok, mint}
else
select_connection(table, scheme, address, port, hostname)
end
_ ->
:error
end
end
end

View file

@ -11,7 +11,9 @@ defmodule Wayfarer.Target.Supervisor do
{Registry, keys: :unique, name: Wayfarer.Target.Registry},
{DynamicSupervisor, name: Wayfarer.Target.DynamicSupervisor, strategy: :one_for_one},
Wayfarer.Target.ActiveConnections,
Wayfarer.Target.TotalConnections
Wayfarer.Target.TotalConnections,
{PartitionSupervisor,
child_spec: Wayfarer.Target.ConnectionRecycler, name: Wayfarer.Target.ConnectionRecycler}
]
|> Supervisor.init(strategy: :one_for_one)
end

View file

@ -96,9 +96,9 @@ defmodule Wayfarer.Target.TotalConnections do
@impl true
@spec handle_info(:tick, state) :: {:noreply, state}
def handle_info(:tick, state) do
for {target, count} <- :ets.tab2list(state.table) do
Logger.debug("Total connections for #{inspect(target)}: #{count}")
end
# for {target, count} <- :ets.tab2list(state.table) do
# Logger.debug("Total connections for #{inspect(target)}: #{count}")
# end
{:noreply, state}
end

View file

@ -9,7 +9,7 @@ defmodule Support.Example do
end
targets do
http "127.0.0.1", 8082
# http "127.0.0.1", 8082
http "192.168.4.26", 80
end
@ -25,4 +25,24 @@ defmodule Support.Example do
pattern "example.com"
end
end
config "Forgejo" do
listeners do
http "0.0.0.0", 8080
end
targets do
http "192.168.4.12", 80 do
health_checks do
check do
success_codes [301..301]
end
end
end
end
host_patterns do
pattern "code.harton.nz"
end
end
end

View file

@ -99,7 +99,7 @@ defmodule WayfarerTest do
:ok
{:ok, _} ->
Process.sleep(100)
Process.sleep(10)
wait_for_target_state(key, state)
{:error, reason} ->