feat: Add request telemetry (#114)
All checks were successful
continuous-integration/drone/push Build is passing

Reviewed-on: #114
Co-authored-by: James Harton <james@harton.nz>
Co-committed-by: James Harton <james@harton.nz>
This commit is contained in:
James Harton 2024-08-20 16:44:08 +12:00 committed by James Harton
parent dfe52b6e74
commit 07c41153f5
7 changed files with 1093 additions and 85 deletions

3
.vscode/settings.json vendored Normal file
View file

@ -0,0 +1,3 @@
{
"cSpell.words": ["ntoa"]
}

View file

@ -3,7 +3,7 @@ defmodule Wayfarer.Server.Plug do
Plug pipeline to handle inbound HTTP connections.
"""
alias Wayfarer.{Router, Server.Proxy, Target.Selector}
alias Wayfarer.{Router, Server.Proxy, Target.Selector, Telemetry}
require Logger
import Plug.Conn
@ -17,22 +17,57 @@ defmodule Wayfarer.Server.Plug do
@doc false
@impl true
@spec call(Plug.Conn.t(), map) :: Plug.Conn.t()
def call(conn, config) when is_atom(config.module) do
conn = put_private(conn, :wayfarer, %{listener: config})
def call(conn, config) do
transport = get_transport(conn)
conn
|> put_private(:wayfarer, %{listener: config, transport: transport})
|> Telemetry.request_start()
|> do_call(config)
end
defp do_call(conn, config) when is_atom(config.module) do
listener = {config.scheme, config.address, config.port}
with {:ok, targets} <- Router.find_healthy_targets(config.module, listener, conn.host),
{:ok, targets, algorithm} <- split_targets_and_algorithms(targets),
{:ok, target} <- Selector.choose(conn, targets, algorithm) do
do_proxy(conn, target)
conn
|> Telemetry.request_routed(target, algorithm)
|> do_proxy(target)
else
:error -> bad_gateway(conn)
{:error, reason} -> internal_error(conn, reason)
:error ->
conn
|> Telemetry.request_exception(:error, :target_not_found)
|> bad_gateway()
{:error, reason} ->
conn
|> Telemetry.request_exception(:error, reason)
|> internal_error(reason)
end
rescue
exception ->
conn
|> Telemetry.request_exception(:exception, exception, __STACKTRACE__)
|> internal_error(exception)
catch
reason ->
conn
|> Telemetry.request_exception(:throw, reason)
|> internal_error(reason)
kind, reason ->
conn
|> Telemetry.request_exception(kind, reason)
|> internal_error(reason)
end
def call(conn, _config), do: bad_gateway(conn)
defp do_call(conn, _config) do
conn
|> Telemetry.request_exception(:error, :unrecognised_request)
|> bad_gateway()
end
defp internal_error(conn, reason) do
Logger.error("Internal error when routing proxy request: #{inspect(reason)}")
@ -64,4 +99,12 @@ defmodule Wayfarer.Server.Plug do
do: split_targets_and_algorithms(tail, [target | targets], algorithm)
defp split_targets_and_algorithms([], targets, algorithm), do: {:ok, targets, algorithm}
defp get_transport(%{adapter: {Bandit.Adapter, %{transport: %Bandit.HTTP1.Socket{}}}}),
do: :http1
defp get_transport(%{adapter: {Bandit.Adapter, %{transport: %Bandit.HTTP2.Stream{}}}}),
do: :http2
defp get_transport(_), do: :unknown
end

View file

@ -6,7 +6,7 @@ defmodule Wayfarer.Server.Proxy do
alias Mint.{HTTP, HTTP1, HTTP2}
alias Plug.Conn
alias Wayfarer.{Router, Target.ActiveConnections, Target.TotalConnections}
alias Wayfarer.{Router, Target.ActiveConnections, Target.TotalConnections, Telemetry}
require Logger
@connect_timeout 5_000
@ -22,16 +22,18 @@ defmodule Wayfarer.Server.Proxy do
:ok <- TotalConnections.proxy_connect(target) do
handle_request(mint, conn, target)
else
error -> handle_error(error, conn, target)
error ->
conn
|> Telemetry.request_exception(:error, error)
|> handle_error(error, target)
end
end
defp handle_request(mint, conn, {proto, _, _, _}) do
with ["Upgrade"] <- Conn.get_req_header(conn, "connection"),
["websocket"] <- Conn.get_req_header(conn, "upgrade") do
if http1?(conn) && connection_wants_upgrade?(conn) && upgrade_is_websocket?(conn) do
handle_websocket_request(mint, conn, proto)
else
_ -> handle_http_request(mint, conn)
handle_http_request(mint, conn)
end
end
@ -40,13 +42,14 @@ defmodule Wayfarer.Server.Proxy do
{:ok, mint, conn} <- stream_request_body(conn, mint, req),
{:ok, conn, _mint} <- proxy_responses(conn, mint, req) do
conn
|> Telemetry.request_stop()
end
end
defp handle_websocket_request(mint, conn, proto) do
WebSockAdapter.upgrade(conn, Wayfarer.Server.WebSocketProxy, {mint, conn, proto},
compress: true
)
conn
|> WebSockAdapter.upgrade(Wayfarer.Server.WebSocketProxy, {mint, conn, proto}, compress: true)
|> Telemetry.request_upgraded()
end
defp connect(conn, {:ws, address, port, transport}),
@ -64,10 +67,10 @@ defmodule Wayfarer.Server.Proxy do
defp connect(conn, {scheme, address, port, :auto}) when is_tuple(address),
do: HTTP.connect(scheme, address, port, hostname: conn.host, timeout: @connect_timeout)
defp handle_error({:error, _, reason}, conn, target),
do: handle_error({:error, reason}, conn, target)
defp handle_error(conn, {:error, _, reason}, target),
do: handle_error(conn, {:error, reason}, target)
defp handle_error({:error, reason}, conn, {scheme, address, port, transport}) do
defp handle_error(conn, {:error, reason}, {scheme, address, port, transport}) do
Logger.error(fn ->
phase = connection_phase(conn)
ip = :inet.ntoa(address)
@ -87,7 +90,54 @@ defmodule Wayfarer.Server.Proxy do
end
end
defp connection_phase(nil), do: "connect"
defp http1?(%{private: %{wayfarer: %{transport: :http1}}}), do: true
defp http1?(_), do: false
defp connection_wants_upgrade?(conn) do
case Conn.get_req_header(conn, "connection") do
["Upgrade"] ->
true
["upgrade"] ->
true
[] ->
false
maybe ->
maybe
|> Enum.flat_map(&String.split(&1, ~r/,\s*/))
|> Enum.map(fn chunk ->
chunk
|> String.trim()
|> String.downcase()
end)
|> Enum.member?("upgrade")
end
end
defp upgrade_is_websocket?(conn) do
case Conn.get_req_header(conn, "upgrade") do
["websocket"] ->
true
[] ->
false
maybe ->
maybe
|> Enum.flat_map(&String.split(&1, ~r/,\s*/))
|> Enum.map(fn chunk ->
chunk
|> String.trim()
|> String.split("/")
|> hd()
|> String.downcase()
end)
|> Enum.member?("websocket")
end
end
defp connection_phase(conn) when conn.state in [:chunked, :upgraded], do: "stream"
defp connection_phase(conn) when conn.halted, do: "done"
defp connection_phase(conn) when conn.state == :sent, do: "done"
@ -113,7 +163,7 @@ defmodule Wayfarer.Server.Proxy do
proxy_responses(conn, mint, req)
{:ok, mint, responses} ->
handle_responses(responses, conn, mint, req)
handle_responses(conn, responses, mint, req)
{:error, _, reason, _} ->
{:error, reason}
@ -123,62 +173,82 @@ defmodule Wayfarer.Server.Proxy do
end
end
defp handle_responses([], conn, mint, req), do: proxy_responses(conn, mint, req)
defp handle_responses(conn, [], mint, req), do: proxy_responses(conn, mint, req)
defp handle_responses([{:status, req, status} | responses], conn, mint, req),
do: handle_responses(responses, Conn.put_status(conn, status), mint, req)
defp handle_responses([{:headers, req, headers} | responses], conn, mint, req) do
conn =
headers
|> Enum.reduce(conn, &Conn.put_resp_header(&2, elem(&1, 0), elem(&1, 1)))
handle_responses(responses, conn, mint, req)
defp handle_responses(conn, [{:status, req, status} | responses], mint, req) do
conn
|> Conn.put_status(status)
|> Telemetry.request_received_status(status)
|> handle_responses(responses, mint, req)
end
defp handle_responses([{:data, req, body} | responses], conn, mint, req)
defp handle_responses(conn, [{:headers, req, headers} | responses], mint, req) do
headers
|> Enum.reduce(conn, fn {header_name, header_value}, conn ->
conn
|> Conn.put_resp_header(header_name, header_value)
end)
|> handle_responses(responses, mint, req)
end
defp handle_responses(conn, [{:data, req, body} | responses], mint, req)
when conn.state == :chunked do
case Conn.chunk(conn, body) do
{:ok, conn} -> handle_responses(responses, conn, mint, req)
{:error, reason} -> {:error, conn, reason}
{:ok, conn} ->
body_size = byte_size(body)
conn
|> Telemetry.increment_metrics(%{resp_body_bytes: body_size})
|> Telemetry.request_resp_body_chunk(body_size)
|> handle_responses(responses, mint, req)
{:error, reason} ->
{:error, conn, reason}
end
end
defp handle_responses([{:data, req, body} | responses], conn, mint, req) do
defp handle_responses(conn, [{:data, req, body} | responses], mint, req) do
# We need to check here for a content-length or transfer encoding header and
# deal with it. This should be refactored out into a proxy state rather
# than using the conn as our state.
body_size = byte_size(body)
case Conn.get_resp_header(conn, "content-length") do
[] ->
conn = Conn.send_chunked(conn, conn.status)
handle_responses([{:data, req, body} | responses], conn, mint, req)
conn
|> Conn.send_chunked(conn.status)
|> Telemetry.request_resp_started()
|> handle_responses([{:data, req, body} | responses], mint, req)
[length] ->
if String.to_integer(length) == byte_size(body) do
if String.to_integer(length) == body_size do
conn =
conn
|> Conn.delete_resp_header("content-length")
|> Conn.send_resp(conn.status, body)
|> Telemetry.request_resp_started()
|> Telemetry.increment_metrics(%{resp_body_bytes: body_size})
|> Telemetry.request_resp_body_chunk(body_size)
|> Conn.halt()
{:ok, conn, mint}
else
conn =
conn
|> Conn.delete_resp_header("content-length")
|> Telemetry.increment_metrics(%{resp_body_bytes: body_size})
|> Telemetry.request_resp_body_chunk(body_size)
|> Conn.send_chunked(conn.status)
handle_responses([{:data, req, body} | responses], conn, mint, req)
handle_responses(conn, [{:data, req, body} | responses], mint, req)
end
end
end
defp handle_responses([{:done, req} | _], conn, mint, req) do
defp handle_responses(conn, [{:done, req} | _], mint, req) do
{:ok, Conn.halt(conn), mint}
end
defp handle_responses([{:error, req, reason} | _], conn, _mint, req), do: {:error, conn, reason}
defp handle_responses(conn, [{:error, req, reason} | _], _mint, req), do: {:error, conn, reason}
defp send_request(conn, mint) do
request_path =
@ -199,15 +269,36 @@ defmodule Wayfarer.Server.Proxy do
defp stream_request_body(conn, mint, req) do
case Conn.read_body(conn) do
{:ok, <<>>, conn} ->
with {:ok, mint} <- HTTP.stream_request_body(mint, req, :eof) do
conn =
conn
|> Telemetry.set_metrics(%{req_body_bytes: 0, req_body_chunks: 0})
{:ok, mint, conn}
end
{:ok, chunk, conn} ->
with {:ok, mint} <- HTTP.stream_request_body(mint, req, chunk),
{:ok, mint} <- HTTP.stream_request_body(mint, req, :eof) do
chunk_size = byte_size(chunk)
conn =
conn
|> Telemetry.increment_metrics(%{req_body_bytes: chunk_size, req_body_chunks: 1})
|> Telemetry.request_req_body_chunk(chunk_size)
{:ok, mint, conn}
end
{:more, chunk, conn} ->
with {:ok, mint} <- HTTP.stream_request_body(mint, req, chunk) do
stream_request_body(conn, mint, req)
chunk_size = byte_size(chunk)
conn
|> Telemetry.increment_metrics(%{req_body_bytes: chunk_size, req_body_chunks: 1})
|> Telemetry.request_req_body_chunk(chunk_size)
|> stream_request_body(mint, req)
end
{:error, reason} ->

View file

@ -10,6 +10,7 @@ defmodule Wayfarer.Server.WebSocketProxy do
alias Mint.WebSocket
alias Plug.Conn
alias Wayfarer.Telemetry
require Logger
@default_opts [extensions: [WebSocket.PerMessageDeflate]]
@ -25,7 +26,7 @@ defmodule Wayfarer.Server.WebSocketProxy do
end
case WebSocket.upgrade(proto, mint, request_path, proxy_headers(conn), @default_opts) do
{:ok, mint, ref} -> {:ok, %{mint: mint, ref: ref, status: :init, buffer: []}}
{:ok, mint, ref} -> {:ok, %{mint: mint, ref: ref, status: :init, buffer: [], conn: conn}}
{:error, _mint, reason} -> {:error, reason}
end
end
@ -35,27 +36,39 @@ defmodule Wayfarer.Server.WebSocketProxy do
@doc false
@impl true
def handle_control({payload, [{:opcode, :ping}]}, state) do
with {:ok, websocket, data} <- WebSocket.encode(state.websocket, {:ping, payload}),
def handle_control({frame, [{:opcode, :ping}]}, state) do
with {:ok, websocket, data} <- WebSocket.encode(state.websocket, {:ping, frame}),
{:ok, mint} <- WebSocket.stream_request_body(state.mint, state.ref, data) do
{:ok, %{state | websocket: websocket, mint: mint}}
conn = request_client_frame(state.conn, {:ping, frame})
{:ok, %{state | websocket: websocket, mint: mint, conn: conn}}
else
error -> handle_error(error, state)
end
end
def handle_control(_, state), do: {:ok, state}
def handle_control({frame, [{:opcode, frame_type}]}, state) do
conn = request_client_frame(state.conn, {frame_type, frame})
{:ok, %{state | conn: conn}}
end
@doc false
@impl true
def handle_in({payload, [{:opcode, frame_type}]}, state) when state.status == :init do
{:ok, %{state | buffer: [{frame_type, payload} | state.buffer]}}
frame = {frame_type, payload}
buffer = [frame | state.buffer]
conn = request_client_frame(state.conn, frame)
{:ok, %{state | buffer: buffer, conn: conn}}
end
def handle_in({payload, [{:opcode, frame_type}]}, state) do
with {:ok, websocket, data} <- WebSocket.encode(state.websocket, {frame_type, payload}),
{:ok, mint} <- WebSocket.stream_request_body(state.mint, state.ref, data) do
{:ok, %{state | websocket: websocket, mint: mint}}
conn = request_client_frame(state.conn, {frame_type, payload})
{:ok, %{state | websocket: websocket, mint: mint, conn: conn}}
else
error -> handle_error(error, state)
end
@ -150,7 +163,9 @@ defmodule Wayfarer.Server.WebSocketProxy do
defp do_empty_buffer([head | tail], state) do
with {:ok, websocket, data} <- WebSocket.encode(state.websocket, head),
{:ok, mint} <- WebSocket.stream_request_body(state.mint, state.ref, data) do
do_empty_buffer(tail, %{state | websocket: websocket, mint: mint})
conn = request_client_frame(state.conn, head)
do_empty_buffer(tail, %{state | websocket: websocket, mint: mint, conn: conn})
end
end
@ -181,15 +196,26 @@ defmodule Wayfarer.Server.WebSocketProxy do
defp decode_frames(frames, state) do
frames
|> Enum.reduce_while({:ok, [], state}, fn frame, {:ok, messages, state} ->
case WebSocket.decode(state.websocket, frame) do
{:ok, websocket, frames} when is_list(frames) ->
messages = Enum.concat(messages, frames)
{:cont, {:ok, messages, %{state | websocket: websocket}}}
{:error, websocket, reason} ->
{:halt, {:error, reason, %{state | websocket: websocket}}}
case decode_frame(state, frame) do
{:ok, new_messages, state} -> {:cont, {:ok, [new_messages, messages], state}}
{:error, reason, state} -> {:halt, {:error, reason, state}}
end
end)
|> case do
{:ok, messages, state} -> {:ok, List.flatten(messages), state}
{:error, reason, state} -> {:error, reason, state}
end
end
defp decode_frame(state, frame) do
case WebSocket.decode(state.websocket, frame) do
{:ok, websocket, frames} when is_list(frames) ->
conn = Enum.reduce(frames, state.conn, &request_server_frame(&2, &1))
{:ok, frames, %{state | websocket: websocket, conn: conn}}
{:error, websocket, reason} ->
{:error, reason, %{state | websocket: websocket}}
end
end
defp response_for_messages([], state), do: {:ok, state}
@ -200,4 +226,26 @@ defmodule Wayfarer.Server.WebSocketProxy do
{[{:close, code, _} | _], messages} -> {:stop, :normal, code, messages, state}
end
end
defp request_client_frame(conn, {frame_type, frame}) do
frame_size = byte_size(frame)
conn
|> Telemetry.increment_metrics(%{
client_frame_bytes: frame_size,
client_frame_count: 1
})
|> Telemetry.request_client_frame(frame_size, frame_type)
end
defp request_server_frame(conn, {frame_type, frame}) do
frame_size = byte_size(frame)
conn
|> Telemetry.increment_metrics(%{
server_frame_bytes: frame_size,
server_frame_count: 1
})
|> Telemetry.request_server_frame(frame_size, frame_type)
end
end

View file

@ -5,22 +5,47 @@ defmodule Wayfarer.Target.Check do
use GenServer, restart: :transient
alias Mint.{HTTP, HTTP1, HTTP2, WebSocket}
alias Wayfarer.{Target, Target.TotalConnections}
alias Wayfarer.{Target, Target.TotalConnections, Telemetry}
require Logger
@type state :: %{
conn: struct(),
req: reference(),
scheme: :http | :https | :ws | :wss,
address: :inet.ip_address(),
port: :socket.port_number(),
uri: URI.t(),
ref: any,
method: String.t(),
headers: [{String.t(), String.t()}],
hostname: String.t(),
transport: :http1 | :http2 | :auto,
span: map
}
@doc false
@impl true
def init(state), do: {:ok, state, {:continue, :start_check}}
@doc false
@impl true
@spec handle_continue(:start_check, state) :: {:noreply, state, timeout} | {:stop, :normal, nil}
def handle_continue(:start_check, state) do
with {:ok, conn} <- connect(state),
{:ok, conn, req} <- request(Map.put(state, :conn, conn)) do
state =
state
|> Map.merge(%{conn: conn, req: req})
state =
state
|> Map.put(:span, %{
metadata: %{
target: %{scheme: state.scheme, address: state.address, port: state.port},
method: state.method,
uri: state.uri,
hostname: state.hostname,
telemetry_span_context: make_ref()
}
})
|> Telemetry.health_check_start()
with {:ok, state} <- connect(state),
{:ok, state} <- request(state) do
{:noreply, state, state.response_timeout}
else
{:error, reason} ->
@ -33,7 +58,7 @@ defmodule Wayfarer.Target.Check do
@doc false
@impl true
def handle_info(:timeout, state), do: check_failed(state, "request timeout expired.")
def handle_info(:timeout, state), do: check_failed(state, :timeout)
def handle_info(message, state) do
with {:ok, conn, responses} <- WebSocket.stream(state.conn, message),
@ -44,13 +69,14 @@ defmodule Wayfarer.Target.Check do
{:ok, status} <- get_status_response(conn, responses) do
if Enum.any?(state.success_codes, &Enum.member?(&1, status)) do
Target.check_passed(state.ref)
Telemetry.health_check_pass(state, status)
{:stop, :normal, nil}
else
check_failed(state, "received #{status} status code")
end
else
{:continue, conn} ->
{:noreply, %{state | conn: conn}}
{:noreply, Map.put(state, :conn, conn)}
:unknown ->
check_failed(state, "Received unknown message: `#{inspect(message)}`")
@ -67,53 +93,86 @@ defmodule Wayfarer.Target.Check do
do: connect(%{state | scheme: :https})
defp connect(state) when state.transport == :http1 do
HTTP1.connect(state.scheme, state.address, state.port,
timeout: state.connect_timeout,
hostname: state.hostname
)
with {:ok, conn} <-
HTTP1.connect(state.scheme, state.address, state.port,
timeout: state.connect_timeout,
hostname: state.hostname
) do
{:ok, Telemetry.health_check_connect(Map.put(state, :conn, conn), :http1)}
end
end
defp connect(state) when state.transport == :http2 do
HTTP2.connect(state.scheme, state.address, state.port,
timeout: state.connect_timeout,
hostname: state.hostname
)
with {:ok, conn} <-
HTTP2.connect(state.scheme, state.address, state.port,
timeout: state.connect_timeout,
hostname: state.hostname
) do
{:ok, Telemetry.health_check_connect(Map.put(state, :conn, conn), :http2)}
end
end
defp connect(state) do
HTTP.connect(state.scheme, state.address, state.port,
timeout: state.connect_timeout,
hostname: state.hostname
)
with {:ok, conn} <-
HTTP.connect(state.scheme, state.address, state.port,
timeout: state.connect_timeout,
hostname: state.hostname
) do
transport =
case conn do
%Mint.HTTP1{} -> :http1
%Mint.HTTP2{} -> :http2
end
{:ok, Telemetry.health_check_connect(Map.put(state, :conn, conn), transport)}
end
end
defp request(state) when state.scheme in [:ws, :wss] do
WebSocket.upgrade(state.scheme, state.conn, state.path, state.headers, [])
with {:ok, conn, req} <-
WebSocket.upgrade(state.scheme, state.conn, state.path, state.headers, []) do
state = Map.merge(state, %{conn: conn, req: req})
{:ok, Telemetry.health_check_request(state)}
end
end
defp request(state) do
HTTP.request(state.conn, state.method, state.path, state.headers, nil)
with {:ok, conn, req} <-
HTTP.request(state.conn, state.method, state.path, state.headers, nil) do
state = Map.merge(state, %{conn: conn, req: req})
{:ok, Telemetry.health_check_request(state)}
end
end
defp check_failed(state, reason) when is_binary(reason) do
Target.check_failed(state.ref)
Logger.warning("Health check failed for #{state.method} #{state.uri}: #{reason}.")
Telemetry.health_check_fail(state, reason)
Logger.warning(fn -> "Health check failed for #{state.method} #{state.uri}: #{reason}." end)
{:stop, :normal, nil}
end
defp check_failed(state, exception) when is_exception(exception) do
Target.check_failed(state.ref)
Telemetry.health_check_fail(state, exception)
Logger.warning(
Logger.warning(fn ->
"Health check failed for #{state.method} #{state.uri}: #{Exception.message(exception)}"
)
end)
{:stop, :normal, nil}
end
defp check_failed(state, reason) do
Target.check_failed(state.ref)
Logger.warning("Health check failed for #{state.method} #{state.uri}: `#{inspect(reason)}`")
Telemetry.health_check_fail(state, reason)
Logger.warning(fn ->
"Health check failed for #{state.method} #{state.uri}: `#{inspect(reason)}`"
end)
{:stop, :normal, nil}
end

764
lib/wayfarer/telemetry.ex Normal file
View file

@ -0,0 +1,764 @@
defmodule Wayfarer.Telemetry do
@moduledoc """
Wayfarer emits a number of telemetry events and spans on top of the excellent
telemetry events emitted by `Bandit.Telemetry`.
"""
alias Plug.Conn
alias Wayfarer.{Router, Target.Check, Target.Selector}
@typedoc """
Information about the target a request has been routed to.
"""
@type target ::
%{
scheme: :http | :https | :ws | :wss,
address: :inet.ip_address(),
port: :socket.port_number()
}
| %{scheme: :plug, module: module, options: any}
@typedoc """
Information about the listener that received the request.
"""
@type listener ::
%{
scheme: :http | :https,
module: module(),
port: :socket.port_number(),
address: :inet.ip_address()
}
@typedoc """
The time that the event was emitted, in `:native` time units.
This is sources from `System.monotonic_time/0` which has some caveats but in
general is better for calculating durations as it should never go backwards.
"""
@type monotonic_time :: integer()
@typedoc """
The time passed since the beginning of the span, in `:native` time units.
The difference between the current `monotonic_time` and the first
`monotonic_time` at the start of the span.
"""
@type duration :: integer()
@typedoc "The HTTP protocol version of the request"
@type transport :: :http1 | :http2
@typedoc "A unique identifier for the span"
@type telemetry_span_context :: reference()
@typedoc "A convenience type for describing telemetry events"
@opaque event(measurements, metadata) :: {measurements, metadata}
@typedoc """
The `[:wayfarer, :request, :start]` event.
This event signals the start of a request span tracking a client request to
completion.
You can use the `telemetry_span_context` metadata value to correlate
subsequent events within the same span.
"""
@type request_start ::
event(
%{
required(:monotonic_time) => monotonic_time(),
required(:wallclock_time) => DateTime.t()
},
%{
required(:conn) => Conn.t(),
required(:listener) => listener(),
required(:transport) => transport(),
required(:telemetry_span_context) => telemetry_span_context()
}
)
@typedoc """
The `[:wayfarer, :request, :routed]` event.
This event signals that the routing process has completed and a target has
been chosen to serve the request.
"""
@type request_routed ::
event(
%{
required(:monotonic_time) => monotonic_time(),
required(:duration) => duration(),
required(:wallclock_time) => DateTime.t()
},
%{
required(:conn) => Conn.t(),
required(:listener) => listener(),
required(:transport) => transport(),
required(:telemetry_span_context) => telemetry_span_context(),
required(:target) => target(),
required(:algorithm) => Selector.algorithm()
}
)
@typedoc """
The `[:wayfarer, :request, :exception]` event.
This event signals that something went wrong while processing the event. You
will likely still receive other events (eg `:stop`) for this span however.
"""
@type request_exception ::
event(
%{
required(:monotonic_time) => monotonic_time(),
required(:duration) => duration(),
required(:wallclock_time) => DateTime.t()
},
%{
required(:conn) => Conn.t(),
required(:listener) => listener(),
required(:transport) => transport(),
required(:telemetry_span_context) => telemetry_span_context(),
optional(:target) => target(),
optional(:algorithm) => Selector.algorithm(),
required(:kind) => :throw | :exit | :error | :exception,
required(:reason) => any,
required(:stacktrace) => Exception.stacktrace()
}
)
@typedoc """
The `[:wayfarer, :request, :stop]` event.
This event signals that the request has completed.
The measurements will contain any incrementing counters accumulated during the
course of the request.
"""
@type request_stop ::
event(
%{
required(:monotonic_time) => monotonic_time(),
required(:duration) => duration(),
required(:wallclock_time) => DateTime.t(),
required(:status) => nil | 100..599,
optional(:req_body_bytes) => non_neg_integer(),
optional(:resp_body_bytes) => non_neg_integer(),
optional(:client_frame_bytes) => non_neg_integer(),
optional(:client_frame_count) => non_neg_integer(),
optional(:server_frame_bytes) => non_neg_integer(),
optional(:server_frame_count) => non_neg_integer()
},
%{
required(:conn) => Conn.t(),
required(:listener) => listener(),
required(:transport) => transport(),
required(:telemetry_span_context) => telemetry_span_context(),
optional(:target) => target(),
optional(:algorithm) => Selector.algorithm(),
optional(:status) => nil | 100..599,
optional(:kind) => :throw | :exit | :error | :exception,
optional(:reason) => any,
optional(:stacktrace) => Exception.stacktrace()
}
)
@typedoc """
The `[:wayfarer, :request, :received_status]` event.
This event signals that an HTTP status code has been received from the
upstream target.
"""
@type request_received_status ::
event(
%{
required(:monotonic_time) => monotonic_time(),
required(:duration) => duration(),
required(:wallclock_time) => DateTime.t(),
required(:status) => nil | 100..599,
optional(:req_body_bytes) => non_neg_integer()
},
%{
required(:conn) => Conn.t(),
required(:listener) => listener(),
required(:transport) => transport(),
required(:telemetry_span_context) => telemetry_span_context(),
required(:target) => target(),
required(:algorithm) => Selector.algorithm(),
required(:status) => nil | 100..599
}
)
@typedoc """
The `[:wayfarer, :request, :req_body_chunk]` event.
This event is emitted while streaming the request body from the client to the
target. Under the hood `Plug.Conn.read_body/2` is being called with the
default options, meaning that each chunk is likely to be up to 8MB in size.
If there is no request body then this event will not be emitted and the
`req_body_bytes` and `req_body_chunks` counters will both be set to zero for
this request.
"""
@type request_req_body_chunk ::
event(
%{
required(:monotonic_time) => monotonic_time(),
required(:duration) => duration(),
required(:wallclock_time) => DateTime.t(),
required(:status) => nil | 100..599,
required(:req_body_bytes) => non_neg_integer(),
required(:req_body_chunks) => non_neg_integer(),
required(:chunk_bytes) => non_neg_integer()
},
%{
required(:conn) => Conn.t(),
required(:listener) => listener(),
required(:transport) => transport(),
required(:telemetry_span_context) => telemetry_span_context(),
required(:target) => target(),
required(:algorithm) => Selector.algorithm(),
required(:status) => nil | 100..599
}
)
@typedoc """
The `[:wayfarer, :request, :resp_started]` event.
This event indicates that the HTTP status and headers have been received from
the target and the response will now start being sent to the target.
"""
@type request_resp_started ::
event(
%{
required(:monotonic_time) => monotonic_time(),
required(:duration) => duration(),
required(:wallclock_time) => DateTime.t(),
required(:status) => nil | 100..599,
optional(:req_body_bytes) => non_neg_integer(),
optional(:req_body_chunks) => non_neg_integer()
},
%{
required(:conn) => Conn.t(),
required(:listener) => listener(),
required(:transport) => transport(),
required(:telemetry_span_context) => telemetry_span_context(),
required(:target) => target(),
required(:algorithm) => Selector.algorithm(),
required(:status) => nil | 100..599
}
)
@typedoc """
The `[:wayfarer, :request, :resp_body_chunk]` event.
This event is emitted every time a chunk of response body is received from the
target for streaming to the client. Under the hood, these are emitted every
time `Mint.HTTP.stream/2` returns a data frame.
"""
@type request_resp_body_chunk ::
event(
%{
required(:monotonic_time) => monotonic_time(),
required(:duration) => duration(),
required(:wallclock_time) => DateTime.t(),
optional(:req_body_bytes) => non_neg_integer(),
optional(:req_body_chunks) => non_neg_integer(),
required(:resp_body_bytes) => non_neg_integer(),
required(:resp_body_chunks) => non_neg_integer(),
required(:chunk_bytes) => non_neg_integer()
},
%{
required(:conn) => Conn.t(),
required(:listener) => listener(),
required(:transport) => transport(),
required(:telemetry_span_context) => telemetry_span_context(),
required(:target) => target(),
required(:algorithm) => Selector.algorithm(),
required(:status) => nil | 100..599
}
)
@typedoc """
The `[:wayfarer, :request, :upgraded]` event.
This event is emitted when a client connection is upgraded to a WebSocket
connection.
"""
@type request_upgraded ::
event(
%{
required(:monotonic_time) => monotonic_time(),
required(:duration) => duration(),
required(:wallclock_time) => DateTime.t()
},
%{
required(:conn) => Conn.t(),
required(:listener) => listener(),
required(:transport) => transport(),
required(:telemetry_span_context) => telemetry_span_context(),
required(:target) => target(),
required(:algorithm) => Selector.algorithm(),
required(:status) => nil | 100..599
}
)
@typedoc """
The `[:wayfarer, :request, :client_frame]` event.
This event is emitted any time a WebSocket frame is received from the client
for transmission to the target.
"""
@type request_client_frame ::
event(
%{
required(:monotonic_time) => monotonic_time(),
required(:duration) => duration(),
required(:wallclock_time) => DateTime.t(),
required(:frame_size) => non_neg_integer(),
required(:client_frame_bytes) => non_neg_integer(),
required(:client_frame_count) => non_neg_integer(),
optional(:server_frame_bytes) => non_neg_integer(),
optional(:server_frame_count) => non_neg_integer()
},
%{
required(:conn) => Conn.t(),
required(:listener) => listener(),
required(:transport) => transport(),
required(:telemetry_span_context) => telemetry_span_context(),
required(:target) => target(),
required(:algorithm) => Selector.algorithm(),
required(:status) => nil | 100..599,
required(:opcode) => :text | :binary | :ping | :pong | :close
}
)
@typedoc """
The `[:wayfarer, :request, :server_frame]` event.
This event is emitted any time a WebSocket frame is received from the target
for transmission to the client.
"""
@type request_server_frame ::
event(
%{
required(:monotonic_time) => monotonic_time(),
required(:duration) => duration(),
required(:wallclock_time) => DateTime.t(),
required(:frame_size) => non_neg_integer(),
required(:server_frame_bytes) => non_neg_integer(),
required(:server_frame_count) => non_neg_integer(),
optional(:client_frame_bytes) => non_neg_integer(),
optional(:client_frame_count) => non_neg_integer()
},
%{
required(:conn) => Conn.t(),
required(:listener) => listener(),
required(:transport) => transport(),
required(:telemetry_span_context) => telemetry_span_context(),
required(:target) => target(),
required(:algorithm) => Selector.algorithm(),
required(:status) => nil | 100..599
}
)
@typedoc """
All the event types that make up the `[:wayfarer, :request, :*]` span.
"""
@type request_span ::
request_start
| request_routed
| request_exception
| request_stop
| request_received_status
| request_req_body_chunk
| request_resp_started
| request_resp_body_chunk
| request_upgraded
| request_client_frame
| request_server_frame
@typedoc """
The `[:wayfarer, :health_check, :start]` event.
This event signals the start of a health check span.
You can use the `telemetry_span_context` metadata value to correlate
subsequent events within the same span.
"""
@type health_check_start ::
event(
%{
required(:monotonic_time) => monotonic_time(),
required(:wallclock_time) => DateTime.t()
},
%{
required(:telemetry_span_context) => telemetry_span_context(),
required(:hostname) => String.t(),
required(:uri) => URI.t(),
required(:target) => target(),
required(:method) => String.t()
}
)
@typedoc """
The `[:wayfarer, :health_check, :connect]` event.
This event signals that the outgoing TCP connection has been made to the
target.
"""
@type health_check_connect ::
event(
%{
required(:monotonic_time) => monotonic_time(),
required(:wallclock_time) => DateTime.t(),
required(:duration) => duration()
},
%{
required(:telemetry_span_context) => telemetry_span_context(),
required(:hostname) => String.t(),
required(:uri) => URI.t(),
required(:target) => target(),
required(:method) => String.t(),
required(:transport) => transport()
}
)
@typedoc """
The `[:wayfarer, :health_check, :request]` event.
This event signals that the HTTP or WebSocket request has been sent.
"""
@type health_check_request ::
event(
%{
required(:monotonic_time) => monotonic_time(),
required(:wallclock_time) => DateTime.t(),
required(:duration) => duration()
},
%{
required(:telemetry_span_context) => telemetry_span_context(),
required(:hostname) => String.t(),
required(:uri) => URI.t(),
required(:target) => target(),
required(:method) => String.t(),
required(:transport) => transport()
}
)
@typedoc """
The `[:wayfarer, :health_check, :pass]` event.
This event signals that the HTTP status code returned by the target matches
one of the configured success codes.
It also signals the end of the span.
"""
@type health_check_pass ::
event(
%{
required(:monotonic_time) => monotonic_time(),
required(:wallclock_time) => DateTime.t(),
required(:duration) => duration(),
required(:status) => 100..599
},
%{
required(:telemetry_span_context) => telemetry_span_context(),
required(:hostname) => String.t(),
required(:uri) => URI.t(),
required(:target) => target(),
required(:method) => String.t(),
required(:transport) => transport()
}
)
@typedoc """
The `[:wayfarer, :health_check, :fail]` event.
This event signals that the HTTP status code returned by the target did not
match any of the configured success codes.
It also signals the end of the span.
"""
@type health_check_fail ::
event(
%{
required(:monotonic_time) => monotonic_time(),
required(:wallclock_time) => DateTime.t(),
required(:duration) => duration(),
required(:reason) => any
},
%{
required(:telemetry_span_context) => telemetry_span_context(),
required(:hostname) => String.t(),
required(:uri) => URI.t(),
required(:target) => target(),
required(:method) => String.t(),
required(:transport) => transport()
}
)
@typedoc """
All the event types that make up the `[:wayfarer, :health_check, :*]` span.
"""
@type health_check_span ::
health_check_start | health_check_request | health_check_pass | health_check_fail
@typedoc """
All the spans that can be emitted by Wayfarer.
"""
@type spans :: request_span | health_check_span
@doc false
@spec request_start(Conn.t()) :: Conn.t()
def request_start(conn) do
telemetry_span_context = make_ref()
metadata =
conn
|> Map.get(:private, %{})
|> Map.get(:wayfarer, %{})
|> Map.merge(%{telemetry_span_context: telemetry_span_context, conn: conn})
conn
|> execute_request_span_event(:start, %{}, metadata)
end
@doc false
@spec request_routed(Conn.t(), Router.target(), Router.algorithm()) :: Conn.t()
def request_routed(conn, target, algorithm) do
target =
case target do
{scheme, address, port, _transport} -> %{scheme: scheme, address: address, port: port}
{:plug, {module, opts}} -> %{scheme: :plug, module: module, options: opts}
{:plug, module} -> %{scheme: :plug, module: module, options: []}
end
conn
|> execute_request_span_event(:routed, %{}, %{
target: target,
algorithm: algorithm
})
end
@doc false
@spec request_exception(Conn.t(), kind :: any, reason :: any, stacktrace :: list | nil) ::
Conn.t()
def request_exception(conn, kind, reason, stacktrace \\ nil) do
conn
|> execute_request_span_event(:exception, %{}, %{
kind: kind,
reason: reason,
stacktrace: stacktrace || Process.info(self(), :current_stacktrace)
})
end
@doc false
def request_stop(conn) do
conn
|> execute_request_span_event(:stop, %{status: conn.status}, %{})
end
@doc false
@spec request_received_status(Conn.t(), non_neg_integer()) :: Conn.t()
def request_received_status(conn, status) do
conn
|> execute_request_span_event(:received_status, %{status: status}, %{status: status})
end
@doc false
@spec request_req_body_chunk(Conn.t(), non_neg_integer()) :: Conn.t()
def request_req_body_chunk(conn, chunk_bytes) do
conn
|> execute_request_span_event(:req_body_chunk, %{chunk_bytes: chunk_bytes}, %{})
end
@doc false
@spec request_resp_started(Conn.t()) :: Conn.t()
def request_resp_started(conn) do
metric = %{status: conn.status}
conn
|> execute_request_span_event(:resp_started, metric, metric)
end
@doc false
@spec request_resp_body_chunk(Conn.t(), non_neg_integer()) :: Conn.t()
def request_resp_body_chunk(conn, chunk_bytes) do
conn
|> execute_request_span_event(:resp_body_chunk, %{chunk_bytes: chunk_bytes}, %{})
end
@doc false
@spec request_upgraded(Conn.t()) :: Conn.t()
def request_upgraded(conn) do
conn
|> execute_request_span_event(:upgraded, %{}, %{})
end
@doc false
@spec request_client_frame(Conn.t(), non_neg_integer(), atom) :: Conn.t()
def request_client_frame(conn, bytes, opcode) do
conn
|> execute_request_span_event(:client_frame, %{frame_size: bytes}, %{opcode: opcode})
end
@doc false
@spec request_server_frame(Conn.t(), non_neg_integer(), atom) :: Conn.t()
def request_server_frame(conn, bytes, opcode) do
conn
|> execute_request_span_event(:server_frame, %{frame_size: bytes}, %{opcode: opcode})
end
@doc false
@spec set_metrics(Conn.t(), %{atom => number}) :: Conn.t()
def set_metrics(conn, metrics) do
update_metrics(conn, &Map.merge(&1, metrics))
end
@doc false
@spec increment_metrics(Conn.t(), %{atom => number}) :: Conn.t()
def increment_metrics(conn, to_increment) do
update_metrics(conn, fn metrics ->
Enum.reduce(to_increment, metrics, fn {metric_name, increment_by}, metrics ->
Map.update(metrics, metric_name, increment_by, &(&1 + increment_by))
end)
end)
end
@doc false
@spec health_check_start(Check.state()) :: Check.state()
def health_check_start(check) do
execute_check_span_event(check, :start, %{}, %{})
end
@doc false
@spec health_check_connect(Check.state(), atom) :: Check.state()
def health_check_connect(check, transport) do
execute_check_span_event(check, :connect, %{}, %{transport: transport})
end
@doc false
@spec health_check_request(Check.state()) :: Check.state()
def health_check_request(check) do
execute_check_span_event(check, :request, %{}, %{})
end
@doc false
@spec health_check_fail(Check.state(), any) :: Check.state()
def health_check_fail(check, reason) do
execute_check_span_event(check, :fail, %{}, %{reason: reason})
end
@doc false
@spec health_check_pass(Check.state(), 100..599) :: Check.state()
def health_check_pass(check, status) do
execute_check_span_event(check, :pass, %{}, %{status: status})
end
defp update_metrics(conn, callback) do
private =
conn
|> Map.get(:private, %{})
|> Map.get(:wayfarer, %{})
metrics =
private
|> Map.get(:metrics, %{})
|> callback.()
private = Map.put(private, :metrics, metrics)
conn
|> Conn.put_private(:wayfarer, private)
end
defp execute_request_span_event(conn, event, measurements, metadata) do
monotonic_time = System.monotonic_time()
now = DateTime.utc_now()
private =
conn
|> Map.get(:private, %{})
|> Map.get(:wayfarer, %{})
span_info =
private
|> Map.get(:request_span, %{})
metrics =
private
|> Map.get(:metrics, %{})
measurements =
if Map.has_key?(span_info, :start_time) do
%{
monotonic_time: monotonic_time,
duration: monotonic_time - span_info.start_time,
wallclock_time: now
}
else
%{
monotonic_time: monotonic_time,
wallclock_time: now
}
end
|> Map.merge(metrics)
|> Map.merge(measurements)
metadata =
span_info
|> Map.get(:metadata, %{})
|> Map.merge(metadata)
:telemetry.execute([:wayfarer, :request, event], measurements, Map.put(metadata, :conn, conn))
span_info =
span_info
|> Map.put(:metadata, metadata)
|> Map.put_new(:start_time, monotonic_time)
private =
private
|> Map.put(:request_span, span_info)
conn
|> Conn.put_private(:wayfarer, private)
end
defp execute_check_span_event(check, event, measurements, metadata) do
monotonic_time = System.monotonic_time()
now = DateTime.utc_now()
span_info = Map.get(check, :span, %{})
metrics = Map.get(span_info, :metrics, %{})
measurements =
if Map.has_key?(span_info, :start_time) do
%{
monotonic_time: monotonic_time,
duration: monotonic_time - span_info.start_time,
wallclock_time: now
}
else
%{
monotonic_time: monotonic_time,
wallclock_time: now
}
end
|> Map.merge(metrics)
|> Map.merge(measurements)
metadata =
span_info
|> Map.get(:metadata, %{})
|> Map.merge(metadata)
:telemetry.execute([:wayfarer, :health_check, event], measurements, metadata)
span_info =
span_info
|> Map.put(:metadata, metadata)
|> Map.put_new(:start_time, monotonic_time)
Map.put(check, :span, span_info)
end
end

View file

@ -73,7 +73,7 @@ defmodule Wayfarer.Server.ProxyTest do
{:ok, :fake_conn}
end)
|> expect(:stream_request_body, 2, fn mint, _, _ -> {:ok, mint} end)
|> expect(:stream_request_body, fn mint, _, _ -> {:ok, mint} end)
|> expect(:request, fn mint, _, _, _, _ ->
send(self(), :ignore)
{:ok, mint, req}