wip: try just using a simple genserver based pool. bad bad bad.
Some checks failed
continuous-integration/drone/push Build is failing

This commit is contained in:
James Harton 2023-11-22 17:12:30 +13:00
parent c1749dd0d1
commit 01f446cc57
Signed by: james
GPG key ID: 90E82DAA13F624F4
6 changed files with 103 additions and 103 deletions

View file

@ -24,7 +24,7 @@ defmodule Wayfarer.Server.Proxy do
"""
@spec request(Conn.t(), Router.target()) :: Conn.t()
def request(conn, {scheme, address, port} = target) do
with {:ok, mint} <- connect(scheme, address, port, conn.host),
with {:ok, mint} <- ConnectionRecycler.checkout(scheme, address, port, conn.host),
:ok <- ActiveConnections.connect(target),
:ok <- TotalConnections.proxy_connect(target),
{:ok, body, conn} <- read_request_body(conn),
@ -37,15 +37,6 @@ defmodule Wayfarer.Server.Proxy do
end
end
defp connect(scheme, address, port, hostname) do
with {:ok, mint} <- ConnectionRecycler.try_checkout(scheme, address, port, hostname) do
{:ok, mint}
else
:error -> HTTP.connect(scheme, address, port, hostname: hostname, timeout: @connect_timeout)
{:error, reason} -> {:error, reason}
end
end
defp handle_error({:error, _, reason}, conn, target),
do: handle_error({:error, reason}, conn, target)
@ -200,8 +191,10 @@ defmodule Wayfarer.Server.Proxy do
[
{"forwarded", "by=#{listener};for=#{client};host=#{conn.host};proto=#{conn.scheme}"},
{"connection", "keep-alive"}
{"connection", "keep-alive"},
{"keep-alive", "timeout=30"}
| headers
]
|> dbg()
end
end

View file

@ -19,93 +19,33 @@ defmodule Wayfarer.Target.ConnectionRecycler do
them.
"""
use GenServer
require Logger
alias Mint.HTTP
@type state :: %{table: :ets.tid(), timer: :timer.tref()}
def try_checkout(scheme, address, port, hostname) do
if acquire_lock(scheme, address, port, hostname) do
result =
with {:ok, mint} <- get_next_connection(scheme, address, port, hostname),
{:ok, mint} <- HTTP.controlling_process(mint, self()) |> dbg(),
{:ok, mint} <- HTTP.set_mode(mint, :active) do
dbg(mint: mint, open?: HTTP.open?(mint))
{:ok, mint}
end
release_lock(scheme, address, port, hostname)
result
else
:error
end
end
defp get_next_connection(scheme, address, port, hostname) do
# :ets.fun2ms(fn {{:http, {127, 0, 0, 1}, 80, "example.com"}, check_in_time, mint}
# when check_in_time >= 123 ->
# {check_in_time, mint}
# end)
horizon = System.monotonic_time(:millisecond) - @default_ttl_ms
match_spec = [
{{{scheme, address, port, hostname}, :"$1", :"$2"}, [{:>=, :"$1", horizon}],
[{{:"$1", :"$2"}}]}
]
case :ets.select(__MODULE__, match_spec, 1) do
{[{checked_in_at, mint} | _], _} ->
:ets.delete_object(
__MODULE__,
{{scheme, address, port, hostname}, checked_in_at, mint}
)
# mint |> dbg()
# :erlang.port_info(mint.socket) |> dbg()
{:ok, mint}
_ ->
:error
end
def checkout(scheme, address, port, hostname) do
GenServer.call(
{:via, PartitionSupervisor, {__MODULE__, {scheme, address, port, hostname}}},
{:get_connection, scheme, address, port, hostname, self()}
)
end
def checkin(scheme, address, port, hostname, mint) do
if HTTP.open?(mint, :read_write) do
do_checkin(scheme, address, port, hostname, mint)
else
:ok
end
end
defp do_checkin(scheme, address, port, hostname, mint) do
pid = Process.whereis(__MODULE__)
unless pid, do: raise("This should never happen!")
self() |> dbg()
mint |> dbg()
:erlang.port_info(mint.socket) |> dbg()
Process.unlink(mint.socket)
with {:ok, mint} <- HTTP.set_mode(mint, :passive) do
# {:ok, mint} <- HTTP.controlling_process(mint, pid) do
:ets.insert(
__MODULE__,
{{scheme, address, port, hostname}, System.monotonic_time(:millisecond), mint}
GenServer.cast(
{:via, PartitionSupervisor, {__MODULE__, {scheme, address, port, hostname}}},
{:checkin, scheme, address, port, hostname, mint}
)
else
:ok
end
end
@doc false
@spec start_link(any) :: GenServer.on_start()
def start_link(arg), do: GenServer.start_link(__MODULE__, arg, name: __MODULE__)
def start_link(arg), do: GenServer.start_link(__MODULE__, arg)
@doc false
@impl true
@ -113,15 +53,7 @@ defmodule Wayfarer.Target.ConnectionRecycler do
def init(_) do
case :timer.send_interval(@default_sweep_interval_ms, :tick) do
{:ok, timer} ->
table =
__MODULE__
|> :ets.new([
:public,
:named_table,
:bag,
read_concurrency: true,
write_concurrency: true
])
table = :ets.new(__MODULE__, [:bag])
{:ok, %{table: table, timer: timer}}
@ -146,19 +78,75 @@ defmodule Wayfarer.Target.ConnectionRecycler do
{:noreply, state}
end
defp acquire_lock(scheme, address, port, hostname, remaining \\ 15)
@doc false
@impl true
def handle_call({:get_connection, scheme, address, port, hostname, pid}, _from, state) do
reply =
with {:ok, mint} <- get_connection(state.table, scheme, address, port, hostname),
{:ok, mint} <- HTTP.controlling_process(mint, pid) do
HTTP.set_mode(mint, :active)
end
defp acquire_lock(_scheme, _address, _port, _hostname, 0), do: false
{:reply, reply, state}
end
defp acquire_lock(scheme, address, port, hostname, remaining) do
if Semaphore.acquire({__MODULE__, scheme, address, port, hostname}, 1) do
true
else
acquire_lock(scheme, address, port, hostname, remaining - 1)
@doc false
@impl true
def handle_cast({:checkin, scheme, address, port, hostname, mint}, state) do
case HTTP.set_mode(mint, :passive) do
{:ok, mint} ->
:ets.insert_new(
state.table,
{{scheme, address, port, hostname}, System.monotonic_time(:millisecond), mint}
)
{:error, reason} ->
Logger.debug("Error checking in #{inspect(reason)}")
end
{:noreply, state}
end
defp get_connection(table, scheme, address, port, hostname) do
case select_connection(table, scheme, address, port, hostname) do
{:ok, mint} ->
Logger.debug("Reusing connection #{inspect(mint)}")
{:ok, mint}
:error ->
Logger.debug("Creating new connection #{inspect({scheme, address, port, hostname})}")
HTTP.connect(scheme, address, port, hostname: hostname, timeout: 5000)
end
end
defp release_lock(scheme, address, port, hostname) do
Semaphore.release({__MODULE__, scheme, address, port, hostname})
defp select_connection(table, scheme, address, port, hostname) do
# :ets.fun2ms(fn {{:http, {127, 0, 0, 1}, 80, "example.com"}, check_in_time, mint}
# when check_in_time >= 123 ->
# {check_in_time, mint}
# end)
horizon = System.monotonic_time(:millisecond) - @default_ttl_ms
match_spec = [
{{{scheme, address, port, hostname}, :"$1", :"$2"}, [{:>=, :"$1", horizon}],
[{{:"$1", :"$2"}}]}
]
case :ets.select(table, match_spec, 1) do
{[{checked_in_at, mint} | _], _} ->
:ets.delete_object(
table,
{{scheme, address, port, hostname}, checked_in_at, mint}
)
if HTTP.open?(mint, :read_write) do
{:ok, mint}
else
select_connection(table, scheme, address, port, hostname)
end
_ ->
:error
end
end
end

View file

@ -12,7 +12,8 @@ defmodule Wayfarer.Target.Supervisor do
{DynamicSupervisor, name: Wayfarer.Target.DynamicSupervisor, strategy: :one_for_one},
Wayfarer.Target.ActiveConnections,
Wayfarer.Target.TotalConnections,
Wayfarer.Target.ConnectionRecycler
{PartitionSupervisor,
child_spec: Wayfarer.Target.ConnectionRecycler, name: Wayfarer.Target.ConnectionRecycler}
]
|> Supervisor.init(strategy: :one_for_one)
end

View file

@ -96,7 +96,6 @@ defmodule Wayfarer.MixProject do
{:mint, "~> 1.5"},
{:nimble_options, "~> 1.0"},
{:plug, "~> 1.15"},
{:semaphore, "~> 1.3"},
{:spark, "~> 1.1"},
{:telemetry, "~> 1.2"},
{:websock, "~> 0.5"},

View file

@ -29,7 +29,6 @@
"nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"},
"plug": {:hex, :plug, "1.15.1", "b7efd81c1a1286f13efb3f769de343236bd8b7d23b4a9f40d3002fc39ad8f74c", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "459497bd94d041d98d948054ec6c0b76feacd28eec38b219ca04c0de13c79d30"},
"plug_crypto": {:hex, :plug_crypto, "2.0.0", "77515cc10af06645abbfb5e6ad7a3e9714f805ae118fa1a70205f80d2d70fe73", [:mix], [], "hexpm", "53695bae57cc4e54566d993eb01074e4d894b65a3766f1c43e2c61a1b0f45ea9"},
"semaphore": {:hex, :semaphore, "1.3.0", "f99a6e76a5d40420453387de85f3d26890dee935bb5875b5a9be52404334c2c6", [:mix], [], "hexpm", "09df576bc4928200defd2d82990e415d9d3701e92136742c4a8f9bf39eb61555"},
"sourceror": {:hex, :sourceror, "0.14.0", "b6b8552d0240400d66b6f107c1bab7ac1726e998efc797f178b7b517e928e314", [:mix], [], "hexpm", "809c71270ad48092d40bbe251a133e49ae229433ce103f762a2373b7a10a8d8b"},
"spark": {:hex, :spark, "1.1.48", "64b804711818526e371d12ea3acc886365b14239565e361001aad801a38bad85", [:mix], [{:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.5 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:sourceror, "~> 0.1", [hex: :sourceror, repo: "hexpm", optional: false]}], "hexpm", "3215a8b1bb1dc93945ce9a0f68430d7265ea596c6b911f7bd6dba77b65cee370"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},

View file

@ -25,4 +25,24 @@ defmodule Support.Example do
pattern "example.com"
end
end
config "Forgejo" do
listeners do
http "0.0.0.0", 8080
end
targets do
http "192.168.4.12", 80 do
health_checks do
check do
success_codes [301..301]
end
end
end
end
host_patterns do
pattern "code.harton.nz"
end
end
end