diff --git a/lib/wayfarer/server/proxy.ex b/lib/wayfarer/server/proxy.ex index 1ba547c..26aa6ba 100644 --- a/lib/wayfarer/server/proxy.ex +++ b/lib/wayfarer/server/proxy.ex @@ -24,7 +24,7 @@ defmodule Wayfarer.Server.Proxy do """ @spec request(Conn.t(), Router.target()) :: Conn.t() def request(conn, {scheme, address, port} = target) do - with {:ok, mint} <- connect(scheme, address, port, conn.host), + with {:ok, mint} <- ConnectionRecycler.checkout(scheme, address, port, conn.host), :ok <- ActiveConnections.connect(target), :ok <- TotalConnections.proxy_connect(target), {:ok, body, conn} <- read_request_body(conn), @@ -37,15 +37,6 @@ defmodule Wayfarer.Server.Proxy do end end - defp connect(scheme, address, port, hostname) do - with {:ok, mint} <- ConnectionRecycler.try_checkout(scheme, address, port, hostname) do - {:ok, mint} - else - :error -> HTTP.connect(scheme, address, port, hostname: hostname, timeout: @connect_timeout) - {:error, reason} -> {:error, reason} - end - end - defp handle_error({:error, _, reason}, conn, target), do: handle_error({:error, reason}, conn, target) @@ -200,8 +191,10 @@ defmodule Wayfarer.Server.Proxy do [ {"forwarded", "by=#{listener};for=#{client};host=#{conn.host};proto=#{conn.scheme}"}, - {"connection", "keep-alive"} + {"connection", "keep-alive"}, + {"keep-alive", "timeout=30"} | headers ] + |> dbg() end end diff --git a/lib/wayfarer/target/connection_recycler.ex b/lib/wayfarer/target/connection_recycler.ex index baedb36..432ed06 100644 --- a/lib/wayfarer/target/connection_recycler.ex +++ b/lib/wayfarer/target/connection_recycler.ex @@ -19,93 +19,33 @@ defmodule Wayfarer.Target.ConnectionRecycler do them. """ use GenServer + require Logger alias Mint.HTTP @type state :: %{table: :ets.tid(), timer: :timer.tref()} - def try_checkout(scheme, address, port, hostname) do - if acquire_lock(scheme, address, port, hostname) do - result = - with {:ok, mint} <- get_next_connection(scheme, address, port, hostname), - {:ok, mint} <- HTTP.controlling_process(mint, self()) |> dbg(), - {:ok, mint} <- HTTP.set_mode(mint, :active) do - dbg(mint: mint, open?: HTTP.open?(mint)) - - {:ok, mint} - end - - release_lock(scheme, address, port, hostname) - - result - else - :error - end - end - - defp get_next_connection(scheme, address, port, hostname) do - # :ets.fun2ms(fn {{:http, {127, 0, 0, 1}, 80, "example.com"}, check_in_time, mint} - # when check_in_time >= 123 -> - # {check_in_time, mint} - # end) - - horizon = System.monotonic_time(:millisecond) - @default_ttl_ms - - match_spec = [ - {{{scheme, address, port, hostname}, :"$1", :"$2"}, [{:>=, :"$1", horizon}], - [{{:"$1", :"$2"}}]} - ] - - case :ets.select(__MODULE__, match_spec, 1) do - {[{checked_in_at, mint} | _], _} -> - :ets.delete_object( - __MODULE__, - {{scheme, address, port, hostname}, checked_in_at, mint} - ) - - # mint |> dbg() - # :erlang.port_info(mint.socket) |> dbg() - - {:ok, mint} - - _ -> - :error - end + def checkout(scheme, address, port, hostname) do + GenServer.call( + {:via, PartitionSupervisor, {__MODULE__, {scheme, address, port, hostname}}}, + {:get_connection, scheme, address, port, hostname, self()} + ) end def checkin(scheme, address, port, hostname, mint) do if HTTP.open?(mint, :read_write) do - do_checkin(scheme, address, port, hostname, mint) - else - :ok - end - end - - defp do_checkin(scheme, address, port, hostname, mint) do - pid = Process.whereis(__MODULE__) - - unless pid, do: raise("This should never happen!") - - self() |> dbg() - mint |> dbg() - :erlang.port_info(mint.socket) |> dbg() - - Process.unlink(mint.socket) - - with {:ok, mint} <- HTTP.set_mode(mint, :passive) do - # {:ok, mint} <- HTTP.controlling_process(mint, pid) do - :ets.insert( - __MODULE__, - {{scheme, address, port, hostname}, System.monotonic_time(:millisecond), mint} + GenServer.cast( + {:via, PartitionSupervisor, {__MODULE__, {scheme, address, port, hostname}}}, + {:checkin, scheme, address, port, hostname, mint} ) - + else :ok end end @doc false @spec start_link(any) :: GenServer.on_start() - def start_link(arg), do: GenServer.start_link(__MODULE__, arg, name: __MODULE__) + def start_link(arg), do: GenServer.start_link(__MODULE__, arg) @doc false @impl true @@ -113,15 +53,7 @@ defmodule Wayfarer.Target.ConnectionRecycler do def init(_) do case :timer.send_interval(@default_sweep_interval_ms, :tick) do {:ok, timer} -> - table = - __MODULE__ - |> :ets.new([ - :public, - :named_table, - :bag, - read_concurrency: true, - write_concurrency: true - ]) + table = :ets.new(__MODULE__, [:bag]) {:ok, %{table: table, timer: timer}} @@ -146,19 +78,75 @@ defmodule Wayfarer.Target.ConnectionRecycler do {:noreply, state} end - defp acquire_lock(scheme, address, port, hostname, remaining \\ 15) + @doc false + @impl true + def handle_call({:get_connection, scheme, address, port, hostname, pid}, _from, state) do + reply = + with {:ok, mint} <- get_connection(state.table, scheme, address, port, hostname), + {:ok, mint} <- HTTP.controlling_process(mint, pid) do + HTTP.set_mode(mint, :active) + end - defp acquire_lock(_scheme, _address, _port, _hostname, 0), do: false + {:reply, reply, state} + end - defp acquire_lock(scheme, address, port, hostname, remaining) do - if Semaphore.acquire({__MODULE__, scheme, address, port, hostname}, 1) do - true - else - acquire_lock(scheme, address, port, hostname, remaining - 1) + @doc false + @impl true + def handle_cast({:checkin, scheme, address, port, hostname, mint}, state) do + case HTTP.set_mode(mint, :passive) do + {:ok, mint} -> + :ets.insert_new( + state.table, + {{scheme, address, port, hostname}, System.monotonic_time(:millisecond), mint} + ) + + {:error, reason} -> + Logger.debug("Error checking in #{inspect(reason)}") + end + + {:noreply, state} + end + + defp get_connection(table, scheme, address, port, hostname) do + case select_connection(table, scheme, address, port, hostname) do + {:ok, mint} -> + Logger.debug("Reusing connection #{inspect(mint)}") + {:ok, mint} + + :error -> + Logger.debug("Creating new connection #{inspect({scheme, address, port, hostname})}") + HTTP.connect(scheme, address, port, hostname: hostname, timeout: 5000) end end - defp release_lock(scheme, address, port, hostname) do - Semaphore.release({__MODULE__, scheme, address, port, hostname}) + defp select_connection(table, scheme, address, port, hostname) do + # :ets.fun2ms(fn {{:http, {127, 0, 0, 1}, 80, "example.com"}, check_in_time, mint} + # when check_in_time >= 123 -> + # {check_in_time, mint} + # end) + + horizon = System.monotonic_time(:millisecond) - @default_ttl_ms + + match_spec = [ + {{{scheme, address, port, hostname}, :"$1", :"$2"}, [{:>=, :"$1", horizon}], + [{{:"$1", :"$2"}}]} + ] + + case :ets.select(table, match_spec, 1) do + {[{checked_in_at, mint} | _], _} -> + :ets.delete_object( + table, + {{scheme, address, port, hostname}, checked_in_at, mint} + ) + + if HTTP.open?(mint, :read_write) do + {:ok, mint} + else + select_connection(table, scheme, address, port, hostname) + end + + _ -> + :error + end end end diff --git a/lib/wayfarer/target/supervisor.ex b/lib/wayfarer/target/supervisor.ex index c45a1d2..e768be9 100644 --- a/lib/wayfarer/target/supervisor.ex +++ b/lib/wayfarer/target/supervisor.ex @@ -12,7 +12,8 @@ defmodule Wayfarer.Target.Supervisor do {DynamicSupervisor, name: Wayfarer.Target.DynamicSupervisor, strategy: :one_for_one}, Wayfarer.Target.ActiveConnections, Wayfarer.Target.TotalConnections, - Wayfarer.Target.ConnectionRecycler + {PartitionSupervisor, + child_spec: Wayfarer.Target.ConnectionRecycler, name: Wayfarer.Target.ConnectionRecycler} ] |> Supervisor.init(strategy: :one_for_one) end diff --git a/mix.exs b/mix.exs index 4c838de..62ba0e8 100644 --- a/mix.exs +++ b/mix.exs @@ -96,7 +96,6 @@ defmodule Wayfarer.MixProject do {:mint, "~> 1.5"}, {:nimble_options, "~> 1.0"}, {:plug, "~> 1.15"}, - {:semaphore, "~> 1.3"}, {:spark, "~> 1.1"}, {:telemetry, "~> 1.2"}, {:websock, "~> 0.5"}, diff --git a/mix.lock b/mix.lock index 130af4b..b98f0fc 100644 --- a/mix.lock +++ b/mix.lock @@ -29,7 +29,6 @@ "nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"}, "plug": {:hex, :plug, "1.15.1", "b7efd81c1a1286f13efb3f769de343236bd8b7d23b4a9f40d3002fc39ad8f74c", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "459497bd94d041d98d948054ec6c0b76feacd28eec38b219ca04c0de13c79d30"}, "plug_crypto": {:hex, :plug_crypto, "2.0.0", "77515cc10af06645abbfb5e6ad7a3e9714f805ae118fa1a70205f80d2d70fe73", [:mix], [], "hexpm", "53695bae57cc4e54566d993eb01074e4d894b65a3766f1c43e2c61a1b0f45ea9"}, - "semaphore": {:hex, :semaphore, "1.3.0", "f99a6e76a5d40420453387de85f3d26890dee935bb5875b5a9be52404334c2c6", [:mix], [], "hexpm", "09df576bc4928200defd2d82990e415d9d3701e92136742c4a8f9bf39eb61555"}, "sourceror": {:hex, :sourceror, "0.14.0", "b6b8552d0240400d66b6f107c1bab7ac1726e998efc797f178b7b517e928e314", [:mix], [], "hexpm", "809c71270ad48092d40bbe251a133e49ae229433ce103f762a2373b7a10a8d8b"}, "spark": {:hex, :spark, "1.1.48", "64b804711818526e371d12ea3acc886365b14239565e361001aad801a38bad85", [:mix], [{:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.5 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:sourceror, "~> 0.1", [hex: :sourceror, repo: "hexpm", optional: false]}], "hexpm", "3215a8b1bb1dc93945ce9a0f68430d7265ea596c6b911f7bd6dba77b65cee370"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, diff --git a/test/support/example.ex b/test/support/example.ex index 5e9e345..49a6a7a 100644 --- a/test/support/example.ex +++ b/test/support/example.ex @@ -25,4 +25,24 @@ defmodule Support.Example do pattern "example.com" end end + + config "Forgejo" do + listeners do + http "0.0.0.0", 8080 + end + + targets do + http "192.168.4.12", 80 do + health_checks do + check do + success_codes [301..301] + end + end + end + end + + host_patterns do + pattern "code.harton.nz" + end + end end