Add request telemetry #114
7 changed files with 1093 additions and 85 deletions
3
.vscode/settings.json
vendored
Normal file
3
.vscode/settings.json
vendored
Normal file
|
@ -0,0 +1,3 @@
|
|||
{
|
||||
"cSpell.words": ["ntoa"]
|
||||
}
|
|
@ -3,7 +3,7 @@ defmodule Wayfarer.Server.Plug do
|
|||
Plug pipeline to handle inbound HTTP connections.
|
||||
"""
|
||||
|
||||
alias Wayfarer.{Router, Server.Proxy, Target.Selector}
|
||||
alias Wayfarer.{Router, Server.Proxy, Target.Selector, Telemetry}
|
||||
require Logger
|
||||
|
||||
import Plug.Conn
|
||||
|
@ -17,22 +17,57 @@ defmodule Wayfarer.Server.Plug do
|
|||
@doc false
|
||||
@impl true
|
||||
@spec call(Plug.Conn.t(), map) :: Plug.Conn.t()
|
||||
def call(conn, config) when is_atom(config.module) do
|
||||
conn = put_private(conn, :wayfarer, %{listener: config})
|
||||
def call(conn, config) do
|
||||
transport = get_transport(conn)
|
||||
|
||||
conn
|
||||
|> put_private(:wayfarer, %{listener: config, transport: transport})
|
||||
|> Telemetry.request_start()
|
||||
|> do_call(config)
|
||||
end
|
||||
|
||||
defp do_call(conn, config) when is_atom(config.module) do
|
||||
listener = {config.scheme, config.address, config.port}
|
||||
|
||||
with {:ok, targets} <- Router.find_healthy_targets(config.module, listener, conn.host),
|
||||
{:ok, targets, algorithm} <- split_targets_and_algorithms(targets),
|
||||
{:ok, target} <- Selector.choose(conn, targets, algorithm) do
|
||||
do_proxy(conn, target)
|
||||
conn
|
||||
|> Telemetry.request_routed(target, algorithm)
|
||||
|> do_proxy(target)
|
||||
else
|
||||
:error -> bad_gateway(conn)
|
||||
{:error, reason} -> internal_error(conn, reason)
|
||||
:error ->
|
||||
conn
|
||||
|> Telemetry.request_exception(:error, :target_not_found)
|
||||
|> bad_gateway()
|
||||
|
||||
{:error, reason} ->
|
||||
conn
|
||||
|> Telemetry.request_exception(:error, reason)
|
||||
|> internal_error(reason)
|
||||
end
|
||||
rescue
|
||||
exception ->
|
||||
conn
|
||||
|> Telemetry.request_exception(:exception, exception, __STACKTRACE__)
|
||||
|> internal_error(exception)
|
||||
catch
|
||||
reason ->
|
||||
conn
|
||||
|> Telemetry.request_exception(:throw, reason)
|
||||
|> internal_error(reason)
|
||||
|
||||
kind, reason ->
|
||||
conn
|
||||
|> Telemetry.request_exception(kind, reason)
|
||||
|> internal_error(reason)
|
||||
end
|
||||
|
||||
def call(conn, _config), do: bad_gateway(conn)
|
||||
defp do_call(conn, _config) do
|
||||
conn
|
||||
|> Telemetry.request_exception(:error, :unrecognised_request)
|
||||
|> bad_gateway()
|
||||
end
|
||||
|
||||
defp internal_error(conn, reason) do
|
||||
Logger.error("Internal error when routing proxy request: #{inspect(reason)}")
|
||||
|
@ -64,4 +99,12 @@ defmodule Wayfarer.Server.Plug do
|
|||
do: split_targets_and_algorithms(tail, [target | targets], algorithm)
|
||||
|
||||
defp split_targets_and_algorithms([], targets, algorithm), do: {:ok, targets, algorithm}
|
||||
|
||||
defp get_transport(%{adapter: {Bandit.Adapter, %{transport: %Bandit.HTTP1.Socket{}}}}),
|
||||
do: :http1
|
||||
|
||||
defp get_transport(%{adapter: {Bandit.Adapter, %{transport: %Bandit.HTTP2.Stream{}}}}),
|
||||
do: :http2
|
||||
|
||||
defp get_transport(_), do: :unknown
|
||||
end
|
||||
|
|
|
@ -6,7 +6,7 @@ defmodule Wayfarer.Server.Proxy do
|
|||
|
||||
alias Mint.{HTTP, HTTP1, HTTP2}
|
||||
alias Plug.Conn
|
||||
alias Wayfarer.{Router, Target.ActiveConnections, Target.TotalConnections}
|
||||
alias Wayfarer.{Router, Target.ActiveConnections, Target.TotalConnections, Telemetry}
|
||||
require Logger
|
||||
|
||||
@connect_timeout 5_000
|
||||
|
@ -22,16 +22,18 @@ defmodule Wayfarer.Server.Proxy do
|
|||
:ok <- TotalConnections.proxy_connect(target) do
|
||||
handle_request(mint, conn, target)
|
||||
else
|
||||
error -> handle_error(error, conn, target)
|
||||
error ->
|
||||
conn
|
||||
|> Telemetry.request_exception(:error, error)
|
||||
|> handle_error(error, target)
|
||||
end
|
||||
end
|
||||
|
||||
defp handle_request(mint, conn, {proto, _, _, _}) do
|
||||
with ["Upgrade"] <- Conn.get_req_header(conn, "connection"),
|
||||
["websocket"] <- Conn.get_req_header(conn, "upgrade") do
|
||||
if http1?(conn) && connection_wants_upgrade?(conn) && upgrade_is_websocket?(conn) do
|
||||
handle_websocket_request(mint, conn, proto)
|
||||
else
|
||||
_ -> handle_http_request(mint, conn)
|
||||
handle_http_request(mint, conn)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -40,13 +42,14 @@ defmodule Wayfarer.Server.Proxy do
|
|||
{:ok, mint, conn} <- stream_request_body(conn, mint, req),
|
||||
{:ok, conn, _mint} <- proxy_responses(conn, mint, req) do
|
||||
conn
|
||||
|> Telemetry.request_stop()
|
||||
end
|
||||
end
|
||||
|
||||
defp handle_websocket_request(mint, conn, proto) do
|
||||
WebSockAdapter.upgrade(conn, Wayfarer.Server.WebSocketProxy, {mint, conn, proto},
|
||||
compress: true
|
||||
)
|
||||
conn
|
||||
|> WebSockAdapter.upgrade(Wayfarer.Server.WebSocketProxy, {mint, conn, proto}, compress: true)
|
||||
|> Telemetry.request_upgraded()
|
||||
end
|
||||
|
||||
defp connect(conn, {:ws, address, port, transport}),
|
||||
|
@ -64,10 +67,10 @@ defmodule Wayfarer.Server.Proxy do
|
|||
defp connect(conn, {scheme, address, port, :auto}) when is_tuple(address),
|
||||
do: HTTP.connect(scheme, address, port, hostname: conn.host, timeout: @connect_timeout)
|
||||
|
||||
defp handle_error({:error, _, reason}, conn, target),
|
||||
do: handle_error({:error, reason}, conn, target)
|
||||
defp handle_error(conn, {:error, _, reason}, target),
|
||||
do: handle_error(conn, {:error, reason}, target)
|
||||
|
||||
defp handle_error({:error, reason}, conn, {scheme, address, port, transport}) do
|
||||
defp handle_error(conn, {:error, reason}, {scheme, address, port, transport}) do
|
||||
Logger.error(fn ->
|
||||
phase = connection_phase(conn)
|
||||
ip = :inet.ntoa(address)
|
||||
|
@ -87,7 +90,54 @@ defmodule Wayfarer.Server.Proxy do
|
|||
end
|
||||
end
|
||||
|
||||
defp connection_phase(nil), do: "connect"
|
||||
defp http1?(%{private: %{wayfarer: %{transport: :http1}}}), do: true
|
||||
defp http1?(_), do: false
|
||||
|
||||
defp connection_wants_upgrade?(conn) do
|
||||
case Conn.get_req_header(conn, "connection") do
|
||||
["Upgrade"] ->
|
||||
true
|
||||
|
||||
["upgrade"] ->
|
||||
true
|
||||
|
||||
[] ->
|
||||
false
|
||||
|
||||
maybe ->
|
||||
maybe
|
||||
|> Enum.flat_map(&String.split(&1, ~r/,\s*/))
|
||||
|> Enum.map(fn chunk ->
|
||||
chunk
|
||||
|> String.trim()
|
||||
|> String.downcase()
|
||||
end)
|
||||
|> Enum.member?("upgrade")
|
||||
end
|
||||
end
|
||||
|
||||
defp upgrade_is_websocket?(conn) do
|
||||
case Conn.get_req_header(conn, "upgrade") do
|
||||
["websocket"] ->
|
||||
true
|
||||
|
||||
[] ->
|
||||
false
|
||||
|
||||
maybe ->
|
||||
maybe
|
||||
|> Enum.flat_map(&String.split(&1, ~r/,\s*/))
|
||||
|> Enum.map(fn chunk ->
|
||||
chunk
|
||||
|> String.trim()
|
||||
|> String.split("/")
|
||||
|> hd()
|
||||
|> String.downcase()
|
||||
end)
|
||||
|> Enum.member?("websocket")
|
||||
end
|
||||
end
|
||||
|
||||
defp connection_phase(conn) when conn.state in [:chunked, :upgraded], do: "stream"
|
||||
defp connection_phase(conn) when conn.halted, do: "done"
|
||||
defp connection_phase(conn) when conn.state == :sent, do: "done"
|
||||
|
@ -113,7 +163,7 @@ defmodule Wayfarer.Server.Proxy do
|
|||
proxy_responses(conn, mint, req)
|
||||
|
||||
{:ok, mint, responses} ->
|
||||
handle_responses(responses, conn, mint, req)
|
||||
handle_responses(conn, responses, mint, req)
|
||||
|
||||
{:error, _, reason, _} ->
|
||||
{:error, reason}
|
||||
|
@ -123,62 +173,82 @@ defmodule Wayfarer.Server.Proxy do
|
|||
end
|
||||
end
|
||||
|
||||
defp handle_responses([], conn, mint, req), do: proxy_responses(conn, mint, req)
|
||||
defp handle_responses(conn, [], mint, req), do: proxy_responses(conn, mint, req)
|
||||
|
||||
defp handle_responses([{:status, req, status} | responses], conn, mint, req),
|
||||
do: handle_responses(responses, Conn.put_status(conn, status), mint, req)
|
||||
|
||||
defp handle_responses([{:headers, req, headers} | responses], conn, mint, req) do
|
||||
conn =
|
||||
headers
|
||||
|> Enum.reduce(conn, &Conn.put_resp_header(&2, elem(&1, 0), elem(&1, 1)))
|
||||
|
||||
handle_responses(responses, conn, mint, req)
|
||||
defp handle_responses(conn, [{:status, req, status} | responses], mint, req) do
|
||||
conn
|
||||
|> Conn.put_status(status)
|
||||
|> Telemetry.request_received_status(status)
|
||||
|> handle_responses(responses, mint, req)
|
||||
end
|
||||
|
||||
defp handle_responses([{:data, req, body} | responses], conn, mint, req)
|
||||
defp handle_responses(conn, [{:headers, req, headers} | responses], mint, req) do
|
||||
headers
|
||||
|> Enum.reduce(conn, fn {header_name, header_value}, conn ->
|
||||
conn
|
||||
|> Conn.put_resp_header(header_name, header_value)
|
||||
end)
|
||||
|> handle_responses(responses, mint, req)
|
||||
end
|
||||
|
||||
defp handle_responses(conn, [{:data, req, body} | responses], mint, req)
|
||||
when conn.state == :chunked do
|
||||
case Conn.chunk(conn, body) do
|
||||
{:ok, conn} -> handle_responses(responses, conn, mint, req)
|
||||
{:error, reason} -> {:error, conn, reason}
|
||||
{:ok, conn} ->
|
||||
body_size = byte_size(body)
|
||||
|
||||
conn
|
||||
|> Telemetry.increment_metrics(%{resp_body_bytes: body_size})
|
||||
|> Telemetry.request_resp_body_chunk(body_size)
|
||||
|> handle_responses(responses, mint, req)
|
||||
|
||||
{:error, reason} ->
|
||||
{:error, conn, reason}
|
||||
end
|
||||
end
|
||||
|
||||
defp handle_responses([{:data, req, body} | responses], conn, mint, req) do
|
||||
defp handle_responses(conn, [{:data, req, body} | responses], mint, req) do
|
||||
# We need to check here for a content-length or transfer encoding header and
|
||||
# deal with it. This should be refactored out into a proxy state rather
|
||||
# than using the conn as our state.
|
||||
|
||||
body_size = byte_size(body)
|
||||
|
||||
case Conn.get_resp_header(conn, "content-length") do
|
||||
[] ->
|
||||
conn = Conn.send_chunked(conn, conn.status)
|
||||
handle_responses([{:data, req, body} | responses], conn, mint, req)
|
||||
conn
|
||||
|> Conn.send_chunked(conn.status)
|
||||
|> Telemetry.request_resp_started()
|
||||
|> handle_responses([{:data, req, body} | responses], mint, req)
|
||||
|
||||
[length] ->
|
||||
if String.to_integer(length) == byte_size(body) do
|
||||
if String.to_integer(length) == body_size do
|
||||
conn =
|
||||
conn
|
||||
|> Conn.delete_resp_header("content-length")
|
||||
|> Conn.send_resp(conn.status, body)
|
||||
|> Telemetry.request_resp_started()
|
||||
|> Telemetry.increment_metrics(%{resp_body_bytes: body_size})
|
||||
|> Telemetry.request_resp_body_chunk(body_size)
|
||||
|> Conn.halt()
|
||||
|
||||
{:ok, conn, mint}
|
||||
else
|
||||
conn =
|
||||
conn
|
||||
|> Conn.delete_resp_header("content-length")
|
||||
|> Telemetry.increment_metrics(%{resp_body_bytes: body_size})
|
||||
|> Telemetry.request_resp_body_chunk(body_size)
|
||||
|> Conn.send_chunked(conn.status)
|
||||
|
||||
handle_responses([{:data, req, body} | responses], conn, mint, req)
|
||||
handle_responses(conn, [{:data, req, body} | responses], mint, req)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
defp handle_responses([{:done, req} | _], conn, mint, req) do
|
||||
defp handle_responses(conn, [{:done, req} | _], mint, req) do
|
||||
{:ok, Conn.halt(conn), mint}
|
||||
end
|
||||
|
||||
defp handle_responses([{:error, req, reason} | _], conn, _mint, req), do: {:error, conn, reason}
|
||||
defp handle_responses(conn, [{:error, req, reason} | _], _mint, req), do: {:error, conn, reason}
|
||||
|
||||
defp send_request(conn, mint) do
|
||||
request_path =
|
||||
|
@ -199,15 +269,36 @@ defmodule Wayfarer.Server.Proxy do
|
|||
|
||||
defp stream_request_body(conn, mint, req) do
|
||||
case Conn.read_body(conn) do
|
||||
{:ok, <<>>, conn} ->
|
||||
with {:ok, mint} <- HTTP.stream_request_body(mint, req, :eof) do
|
||||
conn =
|
||||
conn
|
||||
|> Telemetry.set_metrics(%{req_body_bytes: 0, req_body_chunks: 0})
|
||||
|
||||
{:ok, mint, conn}
|
||||
end
|
||||
|
||||
{:ok, chunk, conn} ->
|
||||
with {:ok, mint} <- HTTP.stream_request_body(mint, req, chunk),
|
||||
{:ok, mint} <- HTTP.stream_request_body(mint, req, :eof) do
|
||||
chunk_size = byte_size(chunk)
|
||||
|
||||
conn =
|
||||
conn
|
||||
|> Telemetry.increment_metrics(%{req_body_bytes: chunk_size, req_body_chunks: 1})
|
||||
|> Telemetry.request_req_body_chunk(chunk_size)
|
||||
|
||||
{:ok, mint, conn}
|
||||
end
|
||||
|
||||
{:more, chunk, conn} ->
|
||||
with {:ok, mint} <- HTTP.stream_request_body(mint, req, chunk) do
|
||||
stream_request_body(conn, mint, req)
|
||||
chunk_size = byte_size(chunk)
|
||||
|
||||
conn
|
||||
|> Telemetry.increment_metrics(%{req_body_bytes: chunk_size, req_body_chunks: 1})
|
||||
|> Telemetry.request_req_body_chunk(chunk_size)
|
||||
|> stream_request_body(mint, req)
|
||||
end
|
||||
|
||||
{:error, reason} ->
|
||||
|
|
|
@ -10,6 +10,7 @@ defmodule Wayfarer.Server.WebSocketProxy do
|
|||
|
||||
alias Mint.WebSocket
|
||||
alias Plug.Conn
|
||||
alias Wayfarer.Telemetry
|
||||
require Logger
|
||||
|
||||
@default_opts [extensions: [WebSocket.PerMessageDeflate]]
|
||||
|
@ -25,7 +26,7 @@ defmodule Wayfarer.Server.WebSocketProxy do
|
|||
end
|
||||
|
||||
case WebSocket.upgrade(proto, mint, request_path, proxy_headers(conn), @default_opts) do
|
||||
{:ok, mint, ref} -> {:ok, %{mint: mint, ref: ref, status: :init, buffer: []}}
|
||||
{:ok, mint, ref} -> {:ok, %{mint: mint, ref: ref, status: :init, buffer: [], conn: conn}}
|
||||
{:error, _mint, reason} -> {:error, reason}
|
||||
end
|
||||
end
|
||||
|
@ -35,27 +36,39 @@ defmodule Wayfarer.Server.WebSocketProxy do
|
|||
|
||||
@doc false
|
||||
@impl true
|
||||
def handle_control({payload, [{:opcode, :ping}]}, state) do
|
||||
with {:ok, websocket, data} <- WebSocket.encode(state.websocket, {:ping, payload}),
|
||||
def handle_control({frame, [{:opcode, :ping}]}, state) do
|
||||
with {:ok, websocket, data} <- WebSocket.encode(state.websocket, {:ping, frame}),
|
||||
{:ok, mint} <- WebSocket.stream_request_body(state.mint, state.ref, data) do
|
||||
{:ok, %{state | websocket: websocket, mint: mint}}
|
||||
conn = request_client_frame(state.conn, {:ping, frame})
|
||||
|
||||
{:ok, %{state | websocket: websocket, mint: mint, conn: conn}}
|
||||
else
|
||||
error -> handle_error(error, state)
|
||||
end
|
||||
end
|
||||
|
||||
def handle_control(_, state), do: {:ok, state}
|
||||
def handle_control({frame, [{:opcode, frame_type}]}, state) do
|
||||
conn = request_client_frame(state.conn, {frame_type, frame})
|
||||
|
||||
{:ok, %{state | conn: conn}}
|
||||
end
|
||||
|
||||
@doc false
|
||||
@impl true
|
||||
def handle_in({payload, [{:opcode, frame_type}]}, state) when state.status == :init do
|
||||
{:ok, %{state | buffer: [{frame_type, payload} | state.buffer]}}
|
||||
frame = {frame_type, payload}
|
||||
buffer = [frame | state.buffer]
|
||||
conn = request_client_frame(state.conn, frame)
|
||||
|
||||
{:ok, %{state | buffer: buffer, conn: conn}}
|
||||
end
|
||||
|
||||
def handle_in({payload, [{:opcode, frame_type}]}, state) do
|
||||
with {:ok, websocket, data} <- WebSocket.encode(state.websocket, {frame_type, payload}),
|
||||
{:ok, mint} <- WebSocket.stream_request_body(state.mint, state.ref, data) do
|
||||
{:ok, %{state | websocket: websocket, mint: mint}}
|
||||
conn = request_client_frame(state.conn, {frame_type, payload})
|
||||
|
||||
{:ok, %{state | websocket: websocket, mint: mint, conn: conn}}
|
||||
else
|
||||
error -> handle_error(error, state)
|
||||
end
|
||||
|
@ -150,7 +163,9 @@ defmodule Wayfarer.Server.WebSocketProxy do
|
|||
defp do_empty_buffer([head | tail], state) do
|
||||
with {:ok, websocket, data} <- WebSocket.encode(state.websocket, head),
|
||||
{:ok, mint} <- WebSocket.stream_request_body(state.mint, state.ref, data) do
|
||||
do_empty_buffer(tail, %{state | websocket: websocket, mint: mint})
|
||||
conn = request_client_frame(state.conn, head)
|
||||
|
||||
do_empty_buffer(tail, %{state | websocket: websocket, mint: mint, conn: conn})
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -181,15 +196,26 @@ defmodule Wayfarer.Server.WebSocketProxy do
|
|||
defp decode_frames(frames, state) do
|
||||
frames
|
||||
|> Enum.reduce_while({:ok, [], state}, fn frame, {:ok, messages, state} ->
|
||||
case WebSocket.decode(state.websocket, frame) do
|
||||
{:ok, websocket, frames} when is_list(frames) ->
|
||||
messages = Enum.concat(messages, frames)
|
||||
{:cont, {:ok, messages, %{state | websocket: websocket}}}
|
||||
|
||||
{:error, websocket, reason} ->
|
||||
{:halt, {:error, reason, %{state | websocket: websocket}}}
|
||||
case decode_frame(state, frame) do
|
||||
{:ok, new_messages, state} -> {:cont, {:ok, [new_messages, messages], state}}
|
||||
{:error, reason, state} -> {:halt, {:error, reason, state}}
|
||||
end
|
||||
end)
|
||||
|> case do
|
||||
{:ok, messages, state} -> {:ok, List.flatten(messages), state}
|
||||
{:error, reason, state} -> {:error, reason, state}
|
||||
end
|
||||
end
|
||||
|
||||
defp decode_frame(state, frame) do
|
||||
case WebSocket.decode(state.websocket, frame) do
|
||||
{:ok, websocket, frames} when is_list(frames) ->
|
||||
conn = Enum.reduce(frames, state.conn, &request_server_frame(&2, &1))
|
||||
{:ok, frames, %{state | websocket: websocket, conn: conn}}
|
||||
|
||||
{:error, websocket, reason} ->
|
||||
{:error, reason, %{state | websocket: websocket}}
|
||||
end
|
||||
end
|
||||
|
||||
defp response_for_messages([], state), do: {:ok, state}
|
||||
|
@ -200,4 +226,26 @@ defmodule Wayfarer.Server.WebSocketProxy do
|
|||
{[{:close, code, _} | _], messages} -> {:stop, :normal, code, messages, state}
|
||||
end
|
||||
end
|
||||
|
||||
defp request_client_frame(conn, {frame_type, frame}) do
|
||||
frame_size = byte_size(frame)
|
||||
|
||||
conn
|
||||
|> Telemetry.increment_metrics(%{
|
||||
client_frame_bytes: frame_size,
|
||||
client_frame_count: 1
|
||||
})
|
||||
|> Telemetry.request_client_frame(frame_size, frame_type)
|
||||
end
|
||||
|
||||
defp request_server_frame(conn, {frame_type, frame}) do
|
||||
frame_size = byte_size(frame)
|
||||
|
||||
conn
|
||||
|> Telemetry.increment_metrics(%{
|
||||
server_frame_bytes: frame_size,
|
||||
server_frame_count: 1
|
||||
})
|
||||
|> Telemetry.request_server_frame(frame_size, frame_type)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -5,22 +5,47 @@ defmodule Wayfarer.Target.Check do
|
|||
|
||||
use GenServer, restart: :transient
|
||||
alias Mint.{HTTP, HTTP1, HTTP2, WebSocket}
|
||||
alias Wayfarer.{Target, Target.TotalConnections}
|
||||
alias Wayfarer.{Target, Target.TotalConnections, Telemetry}
|
||||
require Logger
|
||||
|
||||
@type state :: %{
|
||||
conn: struct(),
|
||||
req: reference(),
|
||||
scheme: :http | :https | :ws | :wss,
|
||||
address: :inet.ip_address(),
|
||||
port: :socket.port_number(),
|
||||
uri: URI.t(),
|
||||
ref: any,
|
||||
method: String.t(),
|
||||
headers: [{String.t(), String.t()}],
|
||||
hostname: String.t(),
|
||||
transport: :http1 | :http2 | :auto,
|
||||
span: map
|
||||
}
|
||||
|
||||
@doc false
|
||||
@impl true
|
||||
def init(state), do: {:ok, state, {:continue, :start_check}}
|
||||
|
||||
@doc false
|
||||
@impl true
|
||||
@spec handle_continue(:start_check, state) :: {:noreply, state, timeout} | {:stop, :normal, nil}
|
||||
def handle_continue(:start_check, state) do
|
||||
with {:ok, conn} <- connect(state),
|
||||
{:ok, conn, req} <- request(Map.put(state, :conn, conn)) do
|
||||
state =
|
||||
state
|
||||
|> Map.merge(%{conn: conn, req: req})
|
||||
|> Map.put(:span, %{
|
||||
metadata: %{
|
||||
target: %{scheme: state.scheme, address: state.address, port: state.port},
|
||||
method: state.method,
|
||||
uri: state.uri,
|
||||
hostname: state.hostname,
|
||||
telemetry_span_context: make_ref()
|
||||
}
|
||||
})
|
||||
|> Telemetry.health_check_start()
|
||||
|
||||
with {:ok, state} <- connect(state),
|
||||
{:ok, state} <- request(state) do
|
||||
{:noreply, state, state.response_timeout}
|
||||
else
|
||||
{:error, reason} ->
|
||||
|
@ -33,7 +58,7 @@ defmodule Wayfarer.Target.Check do
|
|||
|
||||
@doc false
|
||||
@impl true
|
||||
def handle_info(:timeout, state), do: check_failed(state, "request timeout expired.")
|
||||
def handle_info(:timeout, state), do: check_failed(state, :timeout)
|
||||
|
||||
def handle_info(message, state) do
|
||||
with {:ok, conn, responses} <- WebSocket.stream(state.conn, message),
|
||||
|
@ -44,13 +69,14 @@ defmodule Wayfarer.Target.Check do
|
|||
{:ok, status} <- get_status_response(conn, responses) do
|
||||
if Enum.any?(state.success_codes, &Enum.member?(&1, status)) do
|
||||
Target.check_passed(state.ref)
|
||||
Telemetry.health_check_pass(state, status)
|
||||
{:stop, :normal, nil}
|
||||
else
|
||||
check_failed(state, "received #{status} status code")
|
||||
end
|
||||
else
|
||||
{:continue, conn} ->
|
||||
{:noreply, %{state | conn: conn}}
|
||||
{:noreply, Map.put(state, :conn, conn)}
|
||||
|
||||
:unknown ->
|
||||
check_failed(state, "Received unknown message: `#{inspect(message)}`")
|
||||
|
@ -67,53 +93,86 @@ defmodule Wayfarer.Target.Check do
|
|||
do: connect(%{state | scheme: :https})
|
||||
|
||||
defp connect(state) when state.transport == :http1 do
|
||||
with {:ok, conn} <-
|
||||
HTTP1.connect(state.scheme, state.address, state.port,
|
||||
timeout: state.connect_timeout,
|
||||
hostname: state.hostname
|
||||
)
|
||||
) do
|
||||
{:ok, Telemetry.health_check_connect(Map.put(state, :conn, conn), :http1)}
|
||||
end
|
||||
end
|
||||
|
||||
defp connect(state) when state.transport == :http2 do
|
||||
with {:ok, conn} <-
|
||||
HTTP2.connect(state.scheme, state.address, state.port,
|
||||
timeout: state.connect_timeout,
|
||||
hostname: state.hostname
|
||||
)
|
||||
) do
|
||||
{:ok, Telemetry.health_check_connect(Map.put(state, :conn, conn), :http2)}
|
||||
end
|
||||
end
|
||||
|
||||
defp connect(state) do
|
||||
with {:ok, conn} <-
|
||||
HTTP.connect(state.scheme, state.address, state.port,
|
||||
timeout: state.connect_timeout,
|
||||
hostname: state.hostname
|
||||
)
|
||||
) do
|
||||
transport =
|
||||
case conn do
|
||||
%Mint.HTTP1{} -> :http1
|
||||
%Mint.HTTP2{} -> :http2
|
||||
end
|
||||
|
||||
{:ok, Telemetry.health_check_connect(Map.put(state, :conn, conn), transport)}
|
||||
end
|
||||
end
|
||||
|
||||
defp request(state) when state.scheme in [:ws, :wss] do
|
||||
WebSocket.upgrade(state.scheme, state.conn, state.path, state.headers, [])
|
||||
with {:ok, conn, req} <-
|
||||
WebSocket.upgrade(state.scheme, state.conn, state.path, state.headers, []) do
|
||||
state = Map.merge(state, %{conn: conn, req: req})
|
||||
|
||||
{:ok, Telemetry.health_check_request(state)}
|
||||
end
|
||||
end
|
||||
|
||||
defp request(state) do
|
||||
HTTP.request(state.conn, state.method, state.path, state.headers, nil)
|
||||
with {:ok, conn, req} <-
|
||||
HTTP.request(state.conn, state.method, state.path, state.headers, nil) do
|
||||
state = Map.merge(state, %{conn: conn, req: req})
|
||||
|
||||
{:ok, Telemetry.health_check_request(state)}
|
||||
end
|
||||
end
|
||||
|
||||
defp check_failed(state, reason) when is_binary(reason) do
|
||||
Target.check_failed(state.ref)
|
||||
Logger.warning("Health check failed for #{state.method} #{state.uri}: #{reason}.")
|
||||
Telemetry.health_check_fail(state, reason)
|
||||
|
||||
Logger.warning(fn -> "Health check failed for #{state.method} #{state.uri}: #{reason}." end)
|
||||
{:stop, :normal, nil}
|
||||
end
|
||||
|
||||
defp check_failed(state, exception) when is_exception(exception) do
|
||||
Target.check_failed(state.ref)
|
||||
Telemetry.health_check_fail(state, exception)
|
||||
|
||||
Logger.warning(
|
||||
Logger.warning(fn ->
|
||||
"Health check failed for #{state.method} #{state.uri}: #{Exception.message(exception)}"
|
||||
)
|
||||
end)
|
||||
|
||||
{:stop, :normal, nil}
|
||||
end
|
||||
|
||||
defp check_failed(state, reason) do
|
||||
Target.check_failed(state.ref)
|
||||
Logger.warning("Health check failed for #{state.method} #{state.uri}: `#{inspect(reason)}`")
|
||||
Telemetry.health_check_fail(state, reason)
|
||||
|
||||
Logger.warning(fn ->
|
||||
"Health check failed for #{state.method} #{state.uri}: `#{inspect(reason)}`"
|
||||
end)
|
||||
|
||||
{:stop, :normal, nil}
|
||||
end
|
||||
|
||||
|
|
764
lib/wayfarer/telemetry.ex
Normal file
764
lib/wayfarer/telemetry.ex
Normal file
|
@ -0,0 +1,764 @@
|
|||
defmodule Wayfarer.Telemetry do
|
||||
@moduledoc """
|
||||
Wayfarer emits a number of telemetry events and spans on top of the excellent
|
||||
telemetry events emitted by `Bandit.Telemetry`.
|
||||
"""
|
||||
|
||||
alias Plug.Conn
|
||||
alias Wayfarer.{Router, Target.Check, Target.Selector}
|
||||
|
||||
@typedoc """
|
||||
Information about the target a request has been routed to.
|
||||
"""
|
||||
@type target ::
|
||||
%{
|
||||
scheme: :http | :https | :ws | :wss,
|
||||
address: :inet.ip_address(),
|
||||
port: :socket.port_number()
|
||||
}
|
||||
| %{scheme: :plug, module: module, options: any}
|
||||
|
||||
@typedoc """
|
||||
Information about the listener that received the request.
|
||||
"""
|
||||
@type listener ::
|
||||
%{
|
||||
scheme: :http | :https,
|
||||
module: module(),
|
||||
port: :socket.port_number(),
|
||||
address: :inet.ip_address()
|
||||
}
|
||||
|
||||
@typedoc """
|
||||
The time that the event was emitted, in `:native` time units.
|
||||
|
||||
This is sourced from `System.monotonic_time/0`, which has some caveats but in
|
||||
general is better for calculating durations as it should never go backwards.
|
||||
"""
|
||||
@type monotonic_time :: integer()
|
||||
|
||||
@typedoc """
|
||||
The time passed since the beginning of the span, in `:native` time units.
|
||||
|
||||
The difference between the current `monotonic_time` and the first
|
||||
`monotonic_time` at the start of the span.
|
||||
"""
|
||||
@type duration :: integer()
|
||||
|
||||
@typedoc "The HTTP protocol version of the request"
|
||||
@type transport :: :http1 | :http2
|
||||
|
||||
@typedoc "A unique identifier for the span"
|
||||
@type telemetry_span_context :: reference()
|
||||
|
||||
@typedoc "A convenience type for describing telemetry events"
|
||||
@opaque event(measurements, metadata) :: {measurements, metadata}
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :request, :start]` event.
|
||||
|
||||
This event signals the start of a request span tracking a client request to
|
||||
completion.
|
||||
|
||||
You can use the `telemetry_span_context` metadata value to correlate
|
||||
subsequent events within the same span.
|
||||
"""
|
||||
@type request_start ::
|
||||
event(
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:wallclock_time) => DateTime.t()
|
||||
},
|
||||
%{
|
||||
required(:conn) => Conn.t(),
|
||||
required(:listener) => listener(),
|
||||
required(:transport) => transport(),
|
||||
required(:telemetry_span_context) => telemetry_span_context()
|
||||
}
|
||||
)
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :request, :routed]` event.
|
||||
|
||||
This event signals that the routing process has completed and a target has
|
||||
been chosen to serve the request.
|
||||
"""
|
||||
@type request_routed ::
|
||||
event(
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:duration) => duration(),
|
||||
required(:wallclock_time) => DateTime.t()
|
||||
},
|
||||
%{
|
||||
required(:conn) => Conn.t(),
|
||||
required(:listener) => listener(),
|
||||
required(:transport) => transport(),
|
||||
required(:telemetry_span_context) => telemetry_span_context(),
|
||||
required(:target) => target(),
|
||||
required(:algorithm) => Selector.algorithm()
|
||||
}
|
||||
)
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :request, :exception]` event.
|
||||
|
||||
This event signals that something went wrong while processing the request. You
|
||||
will likely still receive other events (e.g. `:stop`) for this span, however.
|
||||
"""
|
||||
@type request_exception ::
|
||||
event(
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:duration) => duration(),
|
||||
required(:wallclock_time) => DateTime.t()
|
||||
},
|
||||
%{
|
||||
required(:conn) => Conn.t(),
|
||||
required(:listener) => listener(),
|
||||
required(:transport) => transport(),
|
||||
required(:telemetry_span_context) => telemetry_span_context(),
|
||||
optional(:target) => target(),
|
||||
optional(:algorithm) => Selector.algorithm(),
|
||||
required(:kind) => :throw | :exit | :error | :exception,
|
||||
required(:reason) => any,
|
||||
required(:stacktrace) => Exception.stacktrace()
|
||||
}
|
||||
)
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :request, :stop]` event.
|
||||
|
||||
This event signals that the request has completed.
|
||||
|
||||
The measurements will contain any incrementing counters accumulated during the
|
||||
course of the request.
|
||||
"""
|
||||
@type request_stop ::
|
||||
event(
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:duration) => duration(),
|
||||
required(:wallclock_time) => DateTime.t(),
|
||||
required(:status) => nil | 100..599,
|
||||
optional(:req_body_bytes) => non_neg_integer(),
|
||||
optional(:resp_body_bytes) => non_neg_integer(),
|
||||
optional(:client_frame_bytes) => non_neg_integer(),
|
||||
optional(:client_frame_count) => non_neg_integer(),
|
||||
optional(:server_frame_bytes) => non_neg_integer(),
|
||||
optional(:server_frame_count) => non_neg_integer()
|
||||
},
|
||||
%{
|
||||
required(:conn) => Conn.t(),
|
||||
required(:listener) => listener(),
|
||||
required(:transport) => transport(),
|
||||
required(:telemetry_span_context) => telemetry_span_context(),
|
||||
optional(:target) => target(),
|
||||
optional(:algorithm) => Selector.algorithm(),
|
||||
optional(:status) => nil | 100..599,
|
||||
optional(:kind) => :throw | :exit | :error | :exception,
|
||||
optional(:reason) => any,
|
||||
optional(:stacktrace) => Exception.stacktrace()
|
||||
}
|
||||
)
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :request, :received_status]` event.
|
||||
|
||||
This event signals that an HTTP status code has been received from the
|
||||
upstream target.
|
||||
"""
|
||||
@type request_received_status ::
|
||||
event(
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:duration) => duration(),
|
||||
required(:wallclock_time) => DateTime.t(),
|
||||
required(:status) => nil | 100..599,
|
||||
optional(:req_body_bytes) => non_neg_integer()
|
||||
},
|
||||
%{
|
||||
required(:conn) => Conn.t(),
|
||||
required(:listener) => listener(),
|
||||
required(:transport) => transport(),
|
||||
required(:telemetry_span_context) => telemetry_span_context(),
|
||||
required(:target) => target(),
|
||||
required(:algorithm) => Selector.algorithm(),
|
||||
required(:status) => nil | 100..599
|
||||
}
|
||||
)
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :request, :req_body_chunk]` event.
|
||||
|
||||
This event is emitted while streaming the request body from the client to the
|
||||
target. Under the hood `Plug.Conn.read_body/2` is being called with the
|
||||
default options, meaning that each chunk is likely to be up to 8MB in size.
|
||||
|
||||
If there is no request body then this event will not be emitted and the
|
||||
`req_body_bytes` and `req_body_chunks` counters will both be set to zero for
|
||||
this request.
|
||||
"""
|
||||
@type request_req_body_chunk ::
|
||||
event(
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:duration) => duration(),
|
||||
required(:wallclock_time) => DateTime.t(),
|
||||
required(:status) => nil | 100..599,
|
||||
required(:req_body_bytes) => non_neg_integer(),
|
||||
required(:req_body_chunks) => non_neg_integer(),
|
||||
required(:chunk_bytes) => non_neg_integer()
|
||||
},
|
||||
%{
|
||||
required(:conn) => Conn.t(),
|
||||
required(:listener) => listener(),
|
||||
required(:transport) => transport(),
|
||||
required(:telemetry_span_context) => telemetry_span_context(),
|
||||
required(:target) => target(),
|
||||
required(:algorithm) => Selector.algorithm(),
|
||||
required(:status) => nil | 100..599
|
||||
}
|
||||
)
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :request, :resp_started]` event.
|
||||
|
||||
This event indicates that the HTTP status and headers have been received from
|
||||
the target and the response will now start being sent to the client.
|
||||
"""
|
||||
@type request_resp_started ::
|
||||
event(
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:duration) => duration(),
|
||||
required(:wallclock_time) => DateTime.t(),
|
||||
required(:status) => nil | 100..599,
|
||||
optional(:req_body_bytes) => non_neg_integer(),
|
||||
optional(:req_body_chunks) => non_neg_integer()
|
||||
},
|
||||
%{
|
||||
required(:conn) => Conn.t(),
|
||||
required(:listener) => listener(),
|
||||
required(:transport) => transport(),
|
||||
required(:telemetry_span_context) => telemetry_span_context(),
|
||||
required(:target) => target(),
|
||||
required(:algorithm) => Selector.algorithm(),
|
||||
required(:status) => nil | 100..599
|
||||
}
|
||||
)
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :request, :resp_body_chunk]` event.
|
||||
|
||||
This event is emitted every time a chunk of response body is received from the
|
||||
target for streaming to the client. Under the hood, these are emitted every
|
||||
time `Mint.HTTP.stream/2` returns a data frame.
|
||||
"""
|
||||
@type request_resp_body_chunk ::
|
||||
event(
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:duration) => duration(),
|
||||
required(:wallclock_time) => DateTime.t(),
|
||||
optional(:req_body_bytes) => non_neg_integer(),
|
||||
optional(:req_body_chunks) => non_neg_integer(),
|
||||
required(:resp_body_bytes) => non_neg_integer(),
|
||||
required(:resp_body_chunks) => non_neg_integer(),
|
||||
required(:chunk_bytes) => non_neg_integer()
|
||||
},
|
||||
%{
|
||||
required(:conn) => Conn.t(),
|
||||
required(:listener) => listener(),
|
||||
required(:transport) => transport(),
|
||||
required(:telemetry_span_context) => telemetry_span_context(),
|
||||
required(:target) => target(),
|
||||
required(:algorithm) => Selector.algorithm(),
|
||||
required(:status) => nil | 100..599
|
||||
}
|
||||
)
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :request, :upgraded]` event.
|
||||
|
||||
This event is emitted when a client connection is upgraded to a WebSocket
|
||||
connection.
|
||||
"""
|
||||
@type request_upgraded ::
|
||||
event(
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:duration) => duration(),
|
||||
required(:wallclock_time) => DateTime.t()
|
||||
},
|
||||
%{
|
||||
required(:conn) => Conn.t(),
|
||||
required(:listener) => listener(),
|
||||
required(:transport) => transport(),
|
||||
required(:telemetry_span_context) => telemetry_span_context(),
|
||||
required(:target) => target(),
|
||||
required(:algorithm) => Selector.algorithm(),
|
||||
required(:status) => nil | 100..599
|
||||
}
|
||||
)
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :request, :client_frame]` event.
|
||||
|
||||
This event is emitted any time a WebSocket frame is received from the client
|
||||
for transmission to the target.
|
||||
"""
|
||||
@type request_client_frame ::
|
||||
event(
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:duration) => duration(),
|
||||
required(:wallclock_time) => DateTime.t(),
|
||||
required(:frame_size) => non_neg_integer(),
|
||||
required(:client_frame_bytes) => non_neg_integer(),
|
||||
required(:client_frame_count) => non_neg_integer(),
|
||||
optional(:server_frame_bytes) => non_neg_integer(),
|
||||
optional(:server_frame_count) => non_neg_integer()
|
||||
},
|
||||
%{
|
||||
required(:conn) => Conn.t(),
|
||||
required(:listener) => listener(),
|
||||
required(:transport) => transport(),
|
||||
required(:telemetry_span_context) => telemetry_span_context(),
|
||||
required(:target) => target(),
|
||||
required(:algorithm) => Selector.algorithm(),
|
||||
required(:status) => nil | 100..599,
|
||||
required(:opcode) => :text | :binary | :ping | :pong | :close
|
||||
}
|
||||
)
|
||||
|
||||
@typedoc """
The `[:wayfarer, :request, :server_frame]` event.

This event is emitted any time a WebSocket frame is received from the target
for transmission to the client.

The `opcode` metadata value identifies the kind of frame, mirroring the
`[:wayfarer, :request, :client_frame]` event.
"""
@type request_server_frame ::
        event(
          %{
            required(:monotonic_time) => monotonic_time(),
            required(:duration) => duration(),
            required(:wallclock_time) => DateTime.t(),
            required(:frame_size) => non_neg_integer(),
            required(:server_frame_bytes) => non_neg_integer(),
            required(:server_frame_count) => non_neg_integer(),
            optional(:client_frame_bytes) => non_neg_integer(),
            optional(:client_frame_count) => non_neg_integer()
          },
          %{
            required(:conn) => Conn.t(),
            required(:listener) => listener(),
            required(:transport) => transport(),
            required(:telemetry_span_context) => telemetry_span_context(),
            required(:target) => target(),
            required(:algorithm) => Selector.algorithm(),
            required(:status) => nil | 100..599,
            required(:opcode) => :text | :binary | :ping | :pong | :close
          }
        )
|
||||
|
||||
@typedoc """
|
||||
All the event types that make up the `[:wayfarer, :request, :*]` span.
|
||||
"""
|
||||
@type request_span ::
|
||||
request_start
|
||||
| request_routed
|
||||
| request_exception
|
||||
| request_stop
|
||||
| request_received_status
|
||||
| request_req_body_chunk
|
||||
| request_resp_started
|
||||
| request_resp_body_chunk
|
||||
| request_upgraded
|
||||
| request_client_frame
|
||||
| request_server_frame
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :health_check, :start]` event.
|
||||
|
||||
This event signals the start of a health check span.
|
||||
|
||||
You can use the `telemetry_span_context` metadata value to correlate
|
||||
subsequent events within the same span.
|
||||
"""
|
||||
@type health_check_start ::
|
||||
event(
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:wallclock_time) => DateTime.t()
|
||||
},
|
||||
%{
|
||||
required(:telemetry_span_context) => telemetry_span_context(),
|
||||
required(:hostname) => String.t(),
|
||||
required(:uri) => URI.t(),
|
||||
required(:target) => target(),
|
||||
required(:method) => String.t()
|
||||
}
|
||||
)
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :health_check, :connect]` event.
|
||||
|
||||
This event signals that the outgoing TCP connection has been made to the
|
||||
target.
|
||||
"""
|
||||
@type health_check_connect ::
|
||||
event(
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:wallclock_time) => DateTime.t(),
|
||||
required(:duration) => duration()
|
||||
},
|
||||
%{
|
||||
required(:telemetry_span_context) => telemetry_span_context(),
|
||||
required(:hostname) => String.t(),
|
||||
required(:uri) => URI.t(),
|
||||
required(:target) => target(),
|
||||
required(:method) => String.t(),
|
||||
required(:transport) => transport()
|
||||
}
|
||||
)
|
||||
@typedoc """
|
||||
The `[:wayfarer, :health_check, :request]` event.
|
||||
|
||||
This event signals that the HTTP or WebSocket request has been sent.
|
||||
"""
|
||||
@type health_check_request ::
|
||||
event(
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:wallclock_time) => DateTime.t(),
|
||||
required(:duration) => duration()
|
||||
},
|
||||
%{
|
||||
required(:telemetry_span_context) => telemetry_span_context(),
|
||||
required(:hostname) => String.t(),
|
||||
required(:uri) => URI.t(),
|
||||
required(:target) => target(),
|
||||
required(:method) => String.t(),
|
||||
required(:transport) => transport()
|
||||
}
|
||||
)
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :health_check, :pass]` event.
|
||||
|
||||
This event signals that the HTTP status code returned by the target matches
|
||||
one of the configured success codes.
|
||||
|
||||
It also signals the end of the span.
|
||||
"""
|
||||
@type health_check_pass ::
|
||||
event(
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:wallclock_time) => DateTime.t(),
|
||||
required(:duration) => duration(),
|
||||
required(:status) => 100..599
|
||||
},
|
||||
%{
|
||||
required(:telemetry_span_context) => telemetry_span_context(),
|
||||
required(:hostname) => String.t(),
|
||||
required(:uri) => URI.t(),
|
||||
required(:target) => target(),
|
||||
required(:method) => String.t(),
|
||||
required(:transport) => transport()
|
||||
}
|
||||
)
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :health_check, :fail]` event.
|
||||
|
||||
This event signals that the HTTP status code returned by the target did not
|
||||
match any of the configured success codes.
|
||||
|
||||
It also signals the end of the span.
|
||||
"""
|
||||
@type health_check_fail ::
|
||||
event(
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:wallclock_time) => DateTime.t(),
|
||||
required(:duration) => duration(),
|
||||
required(:reason) => any
|
||||
},
|
||||
%{
|
||||
required(:telemetry_span_context) => telemetry_span_context(),
|
||||
required(:hostname) => String.t(),
|
||||
required(:uri) => URI.t(),
|
||||
required(:target) => target(),
|
||||
required(:method) => String.t(),
|
||||
required(:transport) => transport()
|
||||
}
|
||||
)
|
||||
|
||||
@typedoc """
All the event types that make up the `[:wayfarer, :health_check, :*]` span.
"""
@type health_check_span ::
        health_check_start
        | health_check_connect
        | health_check_request
        | health_check_pass
        | health_check_fail
|
||||
|
||||
@typedoc """
|
||||
All the spans that can be emitted by Wayfarer.
|
||||
"""
|
||||
@type spans :: request_span | health_check_span
|
||||
|
||||
# Opens the request span: emits `[:wayfarer, :request, :start]` and returns
# the conn carrying the span state written by `execute_request_span_event/4`.
@doc false
@spec request_start(Conn.t()) :: Conn.t()
def request_start(conn) do
  wayfarer_private =
    conn
    |> Map.get(:private, %{})
    |> Map.get(:wayfarer, %{})

  # Everything already stored under `conn.private.wayfarer` seeds the span
  # metadata, together with a fresh reference identifying this span.
  span_metadata =
    Map.merge(wayfarer_private, %{telemetry_span_context: make_ref(), conn: conn})

  execute_request_span_event(conn, :start, %{}, span_metadata)
end
|
||||
|
||||
# Emits `[:wayfarer, :request, :routed]` with the chosen target (normalised
# into a map) and the selection algorithm as metadata.
@doc false
@spec request_routed(Conn.t(), Router.target(), Router.algorithm()) :: Conn.t()
def request_routed(conn, target, algorithm) do
  # Any other target shape raises `CaseClauseError`.
  target_info =
    case target do
      {scheme, address, port, _transport} ->
        %{scheme: scheme, address: address, port: port}

      {:plug, {module, opts}} ->
        %{scheme: :plug, module: module, options: opts}

      {:plug, module} ->
        %{scheme: :plug, module: module, options: []}
    end

  routed_metadata = %{target: target_info, algorithm: algorithm}

  execute_request_span_event(conn, :routed, %{}, routed_metadata)
end
|
||||
|
||||
# Emits `[:wayfarer, :request, :exception]` with `kind`, `reason` and
# `stacktrace` as metadata and returns the updated conn.
#
# When no stacktrace is supplied, the calling process's current stacktrace is
# used instead.
@doc false
@spec request_exception(Conn.t(), kind :: any, reason :: any, stacktrace :: list | nil) ::
        Conn.t()
def request_exception(conn, kind, reason, stacktrace \\ nil) do
  execute_request_span_event(conn, :exception, %{}, %{
    kind: kind,
    reason: reason,
    stacktrace: stacktrace || current_stacktrace()
  })
end

# `Process.info/2` returns a `{:current_stacktrace, stacktrace}` tuple; unwrap
# it so that the metadata always carries a plain stacktrace list.
defp current_stacktrace do
  {:current_stacktrace, stacktrace} = Process.info(self(), :current_stacktrace)
  stacktrace
end
|
||||
|
||||
# Emits `[:wayfarer, :request, :stop]`, reporting the conn's current status as
# a measurement, and returns the updated conn.
@doc false
@spec request_stop(Conn.t()) :: Conn.t()
def request_stop(conn) do
  execute_request_span_event(conn, :stop, %{status: conn.status}, %{})
end
|
||||
|
||||
# Emits `[:wayfarer, :request, :received_status]`; the status is reported both
# as a measurement and as span metadata.
@doc false
@spec request_received_status(Conn.t(), non_neg_integer()) :: Conn.t()
def request_received_status(conn, status) do
  status_info = %{status: status}

  execute_request_span_event(conn, :received_status, status_info, status_info)
end
|
||||
|
||||
# Emits `[:wayfarer, :request, :req_body_chunk]` with the size of the chunk as
# the `chunk_bytes` measurement.
@doc false
@spec request_req_body_chunk(Conn.t(), non_neg_integer()) :: Conn.t()
def request_req_body_chunk(conn, chunk_bytes) do
  chunk_measurements = %{chunk_bytes: chunk_bytes}

  execute_request_span_event(conn, :req_body_chunk, chunk_measurements, %{})
end
|
||||
|
||||
# Emits `[:wayfarer, :request, :resp_started]`; the conn's current status is
# reported both as a measurement and as span metadata.
@doc false
@spec request_resp_started(Conn.t()) :: Conn.t()
def request_resp_started(conn) do
  status = conn.status

  execute_request_span_event(conn, :resp_started, %{status: status}, %{status: status})
end
|
||||
|
||||
# Emits `[:wayfarer, :request, :resp_body_chunk]` with the size of the chunk
# as the `chunk_bytes` measurement.
@doc false
@spec request_resp_body_chunk(Conn.t(), non_neg_integer()) :: Conn.t()
def request_resp_body_chunk(conn, chunk_bytes) do
  chunk_measurements = %{chunk_bytes: chunk_bytes}

  execute_request_span_event(conn, :resp_body_chunk, chunk_measurements, %{})
end
|
||||
|
||||
# Emits `[:wayfarer, :request, :upgraded]` with no extra measurements or
# metadata beyond what the span already carries.
@doc false
@spec request_upgraded(Conn.t()) :: Conn.t()
def request_upgraded(conn), do: execute_request_span_event(conn, :upgraded, %{}, %{})
|
||||
|
||||
# Emits `[:wayfarer, :request, :client_frame]`: the frame size is a
# measurement and the opcode is metadata.
@doc false
@spec request_client_frame(Conn.t(), non_neg_integer(), atom) :: Conn.t()
def request_client_frame(conn, bytes, opcode) do
  frame_measurements = %{frame_size: bytes}
  frame_metadata = %{opcode: opcode}

  execute_request_span_event(conn, :client_frame, frame_measurements, frame_metadata)
end
|
||||
|
||||
# Emits `[:wayfarer, :request, :server_frame]`: the frame size is a
# measurement and the opcode is metadata.
@doc false
@spec request_server_frame(Conn.t(), non_neg_integer(), atom) :: Conn.t()
def request_server_frame(conn, bytes, opcode) do
  frame_measurements = %{frame_size: bytes}
  frame_metadata = %{opcode: opcode}

  execute_request_span_event(conn, :server_frame, frame_measurements, frame_metadata)
end
|
||||
|
||||
# Overwrites the given metrics on the conn; keys in `metrics` replace any
# value already stored under the same name.
@doc false
@spec set_metrics(Conn.t(), %{atom => number}) :: Conn.t()
def set_metrics(conn, metrics) do
  update_metrics(conn, fn current -> Map.merge(current, metrics) end)
end
|
||||
|
||||
# Adds each amount in `to_increment` to the metric of the same name stored on
# the conn. A metric that does not exist yet starts at the given amount.
@doc false
@spec increment_metrics(Conn.t(), %{atom => number}) :: Conn.t()
def increment_metrics(conn, to_increment) do
  apply_increments = fn current ->
    Enum.reduce(to_increment, current, fn {name, amount}, acc ->
      Map.update(acc, name, amount, fn existing -> existing + amount end)
    end)
  end

  update_metrics(conn, apply_increments)
end
|
||||
|
||||
# Emits `[:wayfarer, :health_check, :start]` and returns the check carrying
# the span state written by `execute_check_span_event/4`.
@doc false
@spec health_check_start(Check.state()) :: Check.state()
def health_check_start(check), do: execute_check_span_event(check, :start, %{}, %{})
|
||||
|
||||
# Emits `[:wayfarer, :health_check, :connect]`, adding the transport to the
# span metadata.
@doc false
@spec health_check_connect(Check.state(), atom) :: Check.state()
def health_check_connect(check, transport) do
  connect_metadata = %{transport: transport}

  execute_check_span_event(check, :connect, %{}, connect_metadata)
end
|
||||
|
||||
# Emits `[:wayfarer, :health_check, :request]` with no extra measurements or
# metadata beyond what the span already carries.
@doc false
@spec health_check_request(Check.state()) :: Check.state()
def health_check_request(check), do: execute_check_span_event(check, :request, %{}, %{})
|
||||
|
||||
# Emits `[:wayfarer, :health_check, :fail]` for a failed health check.
#
# The `health_check_fail` event type declares `reason` as a required
# measurement, so it is emitted there. It is also kept in the metadata so that
# handlers reading it from there keep working.
@doc false
@spec health_check_fail(Check.state(), any) :: Check.state()
def health_check_fail(check, reason) do
  execute_check_span_event(check, :fail, %{reason: reason}, %{reason: reason})
end
|
||||
|
||||
# Emits `[:wayfarer, :health_check, :pass]` for a successful health check.
#
# The `health_check_pass` event type declares `status` as a required
# measurement, so it is emitted there. It is also kept in the metadata so that
# handlers reading it from there keep working.
@doc false
@spec health_check_pass(Check.state(), 100..599) :: Check.state()
def health_check_pass(check, status) do
  execute_check_span_event(check, :pass, %{status: status}, %{status: status})
end
|
||||
|
||||
# Applies `callback` to the metrics map stored at `conn.private.wayfarer.metrics`
# (an empty map when absent) and writes the result back onto the conn.
defp update_metrics(conn, callback) do
  wayfarer =
    conn
    |> Map.get(:private, %{})
    |> Map.get(:wayfarer, %{})

  updated_metrics = callback.(Map.get(wayfarer, :metrics, %{}))

  Conn.put_private(conn, :wayfarer, Map.put(wayfarer, :metrics, updated_metrics))
end
|
||||
|
||||
# Emits a `[:wayfarer, :request, event]` telemetry event and returns the conn
# with the span state under `conn.private.wayfarer.request_span` updated.
#
# Measurements are built from the timing values, then the metrics stored on the
# conn, then the per-event `measurements`; later sources win on key clashes.
# Metadata accumulates across events: the metadata stored by earlier events in
# the span is merged with `metadata` and stored again.
defp execute_request_span_event(conn, event, measurements, metadata) do
  # Sample both clocks before doing any other work.
  mono = System.monotonic_time()
  wallclock = DateTime.utc_now()

  wayfarer =
    conn
    |> Map.get(:private, %{})
    |> Map.get(:wayfarer, %{})

  span = Map.get(wayfarer, :request_span, %{})
  counters = Map.get(wayfarer, :metrics, %{})

  timing = %{monotonic_time: mono, wallclock_time: wallclock}

  # `duration` is only reported once a start time has been recorded, i.e. for
  # every event after the first one in the span.
  timing =
    case Map.fetch(span, :start_time) do
      {:ok, started_at} -> Map.put(timing, :duration, mono - started_at)
      :error -> timing
    end

  event_measurements =
    timing
    |> Map.merge(counters)
    |> Map.merge(measurements)

  span_metadata =
    span
    |> Map.get(:metadata, %{})
    |> Map.merge(metadata)

  # Handlers always receive the conn as it was passed to this call.
  :telemetry.execute(
    [:wayfarer, :request, event],
    event_measurements,
    Map.put(span_metadata, :conn, conn)
  )

  # The first event of the span records the start time; later events keep it.
  updated_span =
    span
    |> Map.put(:metadata, span_metadata)
    |> Map.put_new(:start_time, mono)

  Conn.put_private(conn, :wayfarer, Map.put(wayfarer, :request_span, updated_span))
end
|
||||
|
||||
# Emits a `[:wayfarer, :health_check, event]` telemetry event and returns the
# check with its `:span` entry updated.
#
# Measurements are built from the timing values, then any metrics stored in the
# span, then the per-event `measurements`; later sources win on key clashes.
# Metadata accumulates across events within the span.
defp execute_check_span_event(check, event, measurements, metadata) do
  # Sample both clocks before doing any other work.
  mono = System.monotonic_time()
  wallclock = DateTime.utc_now()

  span = Map.get(check, :span, %{})
  counters = Map.get(span, :metrics, %{})

  timing = %{monotonic_time: mono, wallclock_time: wallclock}

  # `duration` is only reported once a start time has been recorded, i.e. for
  # every event after the first one in the span.
  timing =
    case Map.fetch(span, :start_time) do
      {:ok, started_at} -> Map.put(timing, :duration, mono - started_at)
      :error -> timing
    end

  event_measurements =
    timing
    |> Map.merge(counters)
    |> Map.merge(measurements)

  span_metadata =
    span
    |> Map.get(:metadata, %{})
    |> Map.merge(metadata)

  :telemetry.execute([:wayfarer, :health_check, event], event_measurements, span_metadata)

  # The first event of the span records the start time; later events keep it.
  updated_span =
    span
    |> Map.put(:metadata, span_metadata)
    |> Map.put_new(:start_time, mono)

  Map.put(check, :span, updated_span)
end
|
||||
end
|
|
@ -73,7 +73,7 @@ defmodule Wayfarer.Server.ProxyTest do
|
|||
|
||||
{:ok, :fake_conn}
|
||||
end)
|
||||
|> expect(:stream_request_body, 2, fn mint, _, _ -> {:ok, mint} end)
|
||||
|> expect(:stream_request_body, fn mint, _, _ -> {:ok, mint} end)
|
||||
|> expect(:request, fn mint, _, _, _, _ ->
|
||||
send(self(), :ignore)
|
||||
{:ok, mint, req}
|
||||
|
|
Loading…
Reference in a new issue