Compare commits
3 commits
main
...
connection
Author | SHA1 | Date | |
---|---|---|---|
01f446cc57 | |||
c1749dd0d1 | |||
15fa0d1fac |
7 changed files with 200 additions and 14 deletions
|
@ -6,7 +6,14 @@ defmodule Wayfarer.Server.Proxy do
|
|||
|
||||
alias Mint.HTTP
|
||||
alias Plug.Conn
|
||||
alias Wayfarer.{Router, Target.ActiveConnections, Target.TotalConnections}
|
||||
|
||||
alias Wayfarer.{
|
||||
Router,
|
||||
Target.ActiveConnections,
|
||||
Target.ConnectionRecycler,
|
||||
Target.TotalConnections
|
||||
}
|
||||
|
||||
require Logger
|
||||
|
||||
@connect_timeout 5_000
|
||||
|
@ -17,13 +24,13 @@ defmodule Wayfarer.Server.Proxy do
|
|||
"""
|
||||
@spec request(Conn.t(), Router.target()) :: Conn.t()
|
||||
def request(conn, {scheme, address, port} = target) do
|
||||
with {:ok, mint} <-
|
||||
HTTP.connect(scheme, address, port, hostname: conn.host, timeout: @connect_timeout),
|
||||
with {:ok, mint} <- ConnectionRecycler.checkout(scheme, address, port, conn.host),
|
||||
:ok <- ActiveConnections.connect(target),
|
||||
:ok <- TotalConnections.proxy_connect(target),
|
||||
{:ok, body, conn} <- read_request_body(conn),
|
||||
{:ok, mint, req} <- send_request(conn, mint, body),
|
||||
{:ok, conn, _mint} <- proxy_responses(conn, mint, req) do
|
||||
{:ok, conn, mint} <- proxy_responses(conn, mint, req),
|
||||
:ok <- ConnectionRecycler.checkin(scheme, address, port, conn.host, mint) do
|
||||
conn
|
||||
else
|
||||
error -> handle_error(error, conn, target)
|
||||
|
@ -180,9 +187,14 @@ defmodule Wayfarer.Server.Proxy do
|
|||
"#{:inet.ntoa(address)}:#{port}"
|
||||
end
|
||||
|
||||
headers = conn.req_headers |> Enum.reject(&(elem(&1, 0) == "connection"))
|
||||
|
||||
[
|
||||
{"forwarded", "by=#{listener};for=#{client};host=#{conn.host};proto=#{conn.scheme}"}
|
||||
| conn.req_headers
|
||||
{"forwarded", "by=#{listener};for=#{client};host=#{conn.host};proto=#{conn.scheme}"},
|
||||
{"connection", "keep-alive"},
|
||||
{"keep-alive", "timeout=30"}
|
||||
| headers
|
||||
]
|
||||
|> dbg()
|
||||
end
|
||||
end
|
||||
|
|
|
@ -36,8 +36,8 @@ defmodule Wayfarer.Target.ActiveConnections do
|
|||
@impl true
|
||||
@spec handle_info(:tick | {:DOWN, any, :process, any, pid, any}, state) :: {:noreply, state}
|
||||
def handle_info(:tick, state) do
|
||||
size = :ets.info(state.table, :size)
|
||||
Logger.debug("Active connections: #{size}")
|
||||
# size = :ets.info(state.table, :size)
|
||||
# Logger.debug("Active connections: #{size}")
|
||||
|
||||
{:noreply, state}
|
||||
end
|
||||
|
|
152
lib/wayfarer/target/connection_recycler.ex
Normal file
152
lib/wayfarer/target/connection_recycler.ex
Normal file
|
@ -0,0 +1,152 @@
|
|||
defmodule Wayfarer.Target.ConnectionRecycler do
|
||||
@default_ttl 30
|
||||
@default_ttl_ms :timer.seconds(@default_ttl)
|
||||
@default_sweep_interval 5
|
||||
@default_sweep_interval_ms :timer.seconds(@default_sweep_interval)
|
||||
|
||||
@moduledoc """
|
||||
A cache which recycles recently completed Mint connections rather than
|
||||
throwing them away.
|
||||
|
||||
Unfortunately, we can't use a fixed-size pool for this job because we need the
|
||||
number of connections to be able to grow when there is heavy demand.
|
||||
|
||||
When we need a connection we look in the cache, and if there is one available
|
||||
then we re-use it, however if there is not we create a new one and check it in
|
||||
to the cache when complete.
|
||||
|
||||
By default we keep connections around for #{@default_ttl}s before discarding
|
||||
them.
|
||||
"""
|
||||
use GenServer
|
||||
require Logger
|
||||
|
||||
alias Mint.HTTP
|
||||
|
||||
@type state :: %{table: :ets.tid(), timer: :timer.tref()}
|
||||
|
||||
def checkout(scheme, address, port, hostname) do
|
||||
GenServer.call(
|
||||
{:via, PartitionSupervisor, {__MODULE__, {scheme, address, port, hostname}}},
|
||||
{:get_connection, scheme, address, port, hostname, self()}
|
||||
)
|
||||
end
|
||||
|
||||
def checkin(scheme, address, port, hostname, mint) do
|
||||
if HTTP.open?(mint, :read_write) do
|
||||
GenServer.cast(
|
||||
{:via, PartitionSupervisor, {__MODULE__, {scheme, address, port, hostname}}},
|
||||
{:checkin, scheme, address, port, hostname, mint}
|
||||
)
|
||||
else
|
||||
:ok
|
||||
end
|
||||
end
|
||||
|
||||
@doc false
|
||||
@spec start_link(any) :: GenServer.on_start()
|
||||
def start_link(arg), do: GenServer.start_link(__MODULE__, arg)
|
||||
|
||||
@doc false
|
||||
@impl true
|
||||
@spec init(any) :: {:ok, state} | {:stop, any}
|
||||
def init(_) do
|
||||
case :timer.send_interval(@default_sweep_interval_ms, :tick) do
|
||||
{:ok, timer} ->
|
||||
table = :ets.new(__MODULE__, [:bag])
|
||||
|
||||
{:ok, %{table: table, timer: timer}}
|
||||
|
||||
{:error, reason} ->
|
||||
{:stop, reason}
|
||||
end
|
||||
end
|
||||
|
||||
@doc false
|
||||
@impl true
|
||||
@spec handle_info(:tick, state) :: {:noreply, state}
|
||||
def handle_info(:tick, state) do
|
||||
# :ets.fun2ms(fn {_, checked_in_time, _} when checked_in_time < 123 -> true end)
|
||||
|
||||
# IO.puts(:ets.info(__MODULE__, :size))
|
||||
|
||||
horizon = System.monotonic_time(:millisecond) - @default_ttl_ms
|
||||
match_spec = [{{:_, :"$1", :_}, [{:<, :"$1", horizon}], [true]}]
|
||||
|
||||
:ets.select_delete(state.table, match_spec)
|
||||
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
@doc false
|
||||
@impl true
|
||||
def handle_call({:get_connection, scheme, address, port, hostname, pid}, _from, state) do
|
||||
reply =
|
||||
with {:ok, mint} <- get_connection(state.table, scheme, address, port, hostname),
|
||||
{:ok, mint} <- HTTP.controlling_process(mint, pid) do
|
||||
HTTP.set_mode(mint, :active)
|
||||
end
|
||||
|
||||
{:reply, reply, state}
|
||||
end
|
||||
|
||||
@doc false
|
||||
@impl true
|
||||
def handle_cast({:checkin, scheme, address, port, hostname, mint}, state) do
|
||||
case HTTP.set_mode(mint, :passive) do
|
||||
{:ok, mint} ->
|
||||
:ets.insert_new(
|
||||
state.table,
|
||||
{{scheme, address, port, hostname}, System.monotonic_time(:millisecond), mint}
|
||||
)
|
||||
|
||||
{:error, reason} ->
|
||||
Logger.debug("Error checking in #{inspect(reason)}")
|
||||
end
|
||||
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
defp get_connection(table, scheme, address, port, hostname) do
|
||||
case select_connection(table, scheme, address, port, hostname) do
|
||||
{:ok, mint} ->
|
||||
Logger.debug("Reusing connection #{inspect(mint)}")
|
||||
{:ok, mint}
|
||||
|
||||
:error ->
|
||||
Logger.debug("Creating new connection #{inspect({scheme, address, port, hostname})}")
|
||||
HTTP.connect(scheme, address, port, hostname: hostname, timeout: 5000)
|
||||
end
|
||||
end
|
||||
|
||||
defp select_connection(table, scheme, address, port, hostname) do
|
||||
# :ets.fun2ms(fn {{:http, {127, 0, 0, 1}, 80, "example.com"}, check_in_time, mint}
|
||||
# when check_in_time >= 123 ->
|
||||
# {check_in_time, mint}
|
||||
# end)
|
||||
|
||||
horizon = System.monotonic_time(:millisecond) - @default_ttl_ms
|
||||
|
||||
match_spec = [
|
||||
{{{scheme, address, port, hostname}, :"$1", :"$2"}, [{:>=, :"$1", horizon}],
|
||||
[{{:"$1", :"$2"}}]}
|
||||
]
|
||||
|
||||
case :ets.select(table, match_spec, 1) do
|
||||
{[{checked_in_at, mint} | _], _} ->
|
||||
:ets.delete_object(
|
||||
table,
|
||||
{{scheme, address, port, hostname}, checked_in_at, mint}
|
||||
)
|
||||
|
||||
if HTTP.open?(mint, :read_write) do
|
||||
{:ok, mint}
|
||||
else
|
||||
select_connection(table, scheme, address, port, hostname)
|
||||
end
|
||||
|
||||
_ ->
|
||||
:error
|
||||
end
|
||||
end
|
||||
end
|
|
@ -11,7 +11,9 @@ defmodule Wayfarer.Target.Supervisor do
|
|||
{Registry, keys: :unique, name: Wayfarer.Target.Registry},
|
||||
{DynamicSupervisor, name: Wayfarer.Target.DynamicSupervisor, strategy: :one_for_one},
|
||||
Wayfarer.Target.ActiveConnections,
|
||||
Wayfarer.Target.TotalConnections
|
||||
Wayfarer.Target.TotalConnections,
|
||||
{PartitionSupervisor,
|
||||
child_spec: Wayfarer.Target.ConnectionRecycler, name: Wayfarer.Target.ConnectionRecycler}
|
||||
]
|
||||
|> Supervisor.init(strategy: :one_for_one)
|
||||
end
|
||||
|
|
|
@ -96,9 +96,9 @@ defmodule Wayfarer.Target.TotalConnections do
|
|||
@impl true
|
||||
@spec handle_info(:tick, state) :: {:noreply, state}
|
||||
def handle_info(:tick, state) do
|
||||
for {target, count} <- :ets.tab2list(state.table) do
|
||||
Logger.debug("Total connections for #{inspect(target)}: #{count}")
|
||||
end
|
||||
# for {target, count} <- :ets.tab2list(state.table) do
|
||||
# Logger.debug("Total connections for #{inspect(target)}: #{count}")
|
||||
# end
|
||||
|
||||
{:noreply, state}
|
||||
end
|
||||
|
|
|
@ -9,7 +9,7 @@ defmodule Support.Example do
|
|||
end
|
||||
|
||||
targets do
|
||||
http "127.0.0.1", 8082
|
||||
# http "127.0.0.1", 8082
|
||||
|
||||
http "192.168.4.26", 80
|
||||
end
|
||||
|
@ -25,4 +25,24 @@ defmodule Support.Example do
|
|||
pattern "example.com"
|
||||
end
|
||||
end
|
||||
|
||||
config "Forgejo" do
|
||||
listeners do
|
||||
http "0.0.0.0", 8080
|
||||
end
|
||||
|
||||
targets do
|
||||
http "192.168.4.12", 80 do
|
||||
health_checks do
|
||||
check do
|
||||
success_codes [301..301]
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
host_patterns do
|
||||
pattern "code.harton.nz"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -99,7 +99,7 @@ defmodule WayfarerTest do
|
|||
:ok
|
||||
|
||||
{:ok, _} ->
|
||||
Process.sleep(100)
|
||||
Process.sleep(10)
|
||||
wait_for_target_state(key, state)
|
||||
|
||||
{:error, reason} ->
|
||||
|
|
Loading…
Reference in a new issue