From 07c41153f58707d91c8b0859353db818fe2afbb6 Mon Sep 17 00:00:00 2001 From: James Harton Date: Tue, 20 Aug 2024 16:44:08 +1200 Subject: [PATCH] feat: Add request telemetry (#114) Reviewed-on: https://harton.dev/james/wayfarer/pulls/114 Co-authored-by: James Harton Co-committed-by: James Harton --- .vscode/settings.json | 3 + lib/wayfarer/server/plug.ex | 57 +- lib/wayfarer/server/proxy.ex | 163 ++++-- lib/wayfarer/server/websocket_proxy.ex | 78 ++- lib/wayfarer/target/check.ex | 111 +++- lib/wayfarer/telemetry.ex | 764 +++++++++++++++++++++++++ test/wayfarer/server/proxy_test.exs | 2 +- 7 files changed, 1093 insertions(+), 85 deletions(-) create mode 100644 .vscode/settings.json create mode 100644 lib/wayfarer/telemetry.ex diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..9b82d73 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "cSpell.words": ["ntoa"] +} diff --git a/lib/wayfarer/server/plug.ex b/lib/wayfarer/server/plug.ex index c683fcf..ebec090 100644 --- a/lib/wayfarer/server/plug.ex +++ b/lib/wayfarer/server/plug.ex @@ -3,7 +3,7 @@ defmodule Wayfarer.Server.Plug do Plug pipeline to handle inbound HTTP connections. """ - alias Wayfarer.{Router, Server.Proxy, Target.Selector} + alias Wayfarer.{Router, Server.Proxy, Target.Selector, Telemetry} require Logger import Plug.Conn @@ -17,22 +17,57 @@ defmodule Wayfarer.Server.Plug do @doc false @impl true @spec call(Plug.Conn.t(), map) :: Plug.Conn.t() - def call(conn, config) when is_atom(config.module) do - conn = put_private(conn, :wayfarer, %{listener: config}) + def call(conn, config) do + transport = get_transport(conn) + conn + |> put_private(:wayfarer, %{listener: config, transport: transport}) + |> Telemetry.request_start() + |> do_call(config) + end + + defp do_call(conn, config) when is_atom(config.module) do listener = {config.scheme, config.address, config.port} with {:ok, targets} <- Router.find_healthy_targets(config.module, listener, conn.host), {:ok, targets, algorithm} <- split_targets_and_algorithms(targets), {:ok, target} <- Selector.choose(conn, targets, algorithm) do - do_proxy(conn, target) + conn + |> Telemetry.request_routed(target, algorithm) + |> do_proxy(target) else - :error -> bad_gateway(conn) - {:error, reason} -> internal_error(conn, reason) + :error -> + conn + |> Telemetry.request_exception(:error, :target_not_found) + |> bad_gateway() + + {:error, reason} -> + conn + |> Telemetry.request_exception(:error, reason) + |> internal_error(reason) end + rescue + exception -> + conn + |> Telemetry.request_exception(:exception, exception, __STACKTRACE__) + |> internal_error(exception) + catch + reason -> + conn + |> Telemetry.request_exception(:throw, reason) + |> internal_error(reason) + + kind, reason -> + conn + |> Telemetry.request_exception(kind, reason) + |> internal_error(reason) end - def call(conn, _config), do: bad_gateway(conn) + defp do_call(conn, _config) do + conn + |> Telemetry.request_exception(:error, :unrecognised_request) + |> bad_gateway() + end defp internal_error(conn, reason) do Logger.error("Internal error when routing proxy request: #{inspect(reason)}") @@ -64,4 +99,12 @@ defmodule Wayfarer.Server.Plug do do: split_targets_and_algorithms(tail, [target | targets], algorithm) defp split_targets_and_algorithms([], targets, algorithm), do: {:ok, targets, algorithm} + + defp get_transport(%{adapter: {Bandit.Adapter, %{transport: %Bandit.HTTP1.Socket{}}}}), + do: :http1 + + defp get_transport(%{adapter: {Bandit.Adapter, %{transport: %Bandit.HTTP2.Stream{}}}}), + do: :http2 + + defp get_transport(_), do: :unknown end diff --git a/lib/wayfarer/server/proxy.ex b/lib/wayfarer/server/proxy.ex index c34c331..788d7b8 100644 --- a/lib/wayfarer/server/proxy.ex +++ b/lib/wayfarer/server/proxy.ex @@ -6,7 +6,7 @@ defmodule Wayfarer.Server.Proxy do alias Mint.{HTTP, HTTP1, HTTP2} alias Plug.Conn - alias Wayfarer.{Router, Target.ActiveConnections, Target.TotalConnections} + alias Wayfarer.{Router, Target.ActiveConnections, Target.TotalConnections, Telemetry} require Logger @connect_timeout 5_000 @@ -22,16 +22,18 @@ defmodule Wayfarer.Server.Proxy do :ok <- TotalConnections.proxy_connect(target) do handle_request(mint, conn, target) else - error -> handle_error(error, conn, target) + error -> + conn + |> Telemetry.request_exception(:error, error) + |> handle_error(error, target) end end defp handle_request(mint, conn, {proto, _, _, _}) do - with ["Upgrade"] <- Conn.get_req_header(conn, "connection"), - ["websocket"] <- Conn.get_req_header(conn, "upgrade") do + if http1?(conn) && connection_wants_upgrade?(conn) && upgrade_is_websocket?(conn) do handle_websocket_request(mint, conn, proto) else - _ -> handle_http_request(mint, conn) + handle_http_request(mint, conn) end end @@ -40,13 +42,14 @@ defmodule Wayfarer.Server.Proxy do {:ok, mint, conn} <- stream_request_body(conn, mint, req), {:ok, conn, _mint} <- proxy_responses(conn, mint, req) do conn + |> Telemetry.request_stop() end end defp handle_websocket_request(mint, conn, proto) do - WebSockAdapter.upgrade(conn, Wayfarer.Server.WebSocketProxy, {mint, conn, proto}, - compress: true - ) + conn + |> WebSockAdapter.upgrade(Wayfarer.Server.WebSocketProxy, {mint, conn, proto}, compress: true) + |> Telemetry.request_upgraded() end defp connect(conn, {:ws, address, port, transport}), @@ -64,10 +67,10 @@ defmodule Wayfarer.Server.Proxy do defp connect(conn, {scheme, address, port, :auto}) when is_tuple(address), do: HTTP.connect(scheme, address, port, hostname: conn.host, timeout: @connect_timeout) - defp handle_error({:error, _, reason}, conn, target), - do: handle_error({:error, reason}, conn, target) + defp handle_error(conn, {:error, _, reason}, target), + do: handle_error(conn, {:error, reason}, target) - defp handle_error({:error, reason}, conn, {scheme, address, port, transport}) do + defp handle_error(conn, {:error, reason}, {scheme, address, port, transport}) do Logger.error(fn -> phase = connection_phase(conn) ip = :inet.ntoa(address) @@ -87,7 +90,54 @@ defmodule Wayfarer.Server.Proxy do end end - defp connection_phase(nil), do: "connect" + defp http1?(%{private: %{wayfarer: %{transport: :http1}}}), do: true + defp http1?(_), do: false + + defp connection_wants_upgrade?(conn) do + case Conn.get_req_header(conn, "connection") do + ["Upgrade"] -> + true + + ["upgrade"] -> + true + + [] -> + false + + maybe -> + maybe + |> Enum.flat_map(&String.split(&1, ~r/,\s*/)) + |> Enum.map(fn chunk -> + chunk + |> String.trim() + |> String.downcase() + end) + |> Enum.member?("upgrade") + end + end + + defp upgrade_is_websocket?(conn) do + case Conn.get_req_header(conn, "upgrade") do + ["websocket"] -> + true + + [] -> + false + + maybe -> + maybe + |> Enum.flat_map(&String.split(&1, ~r/,\s*/)) + |> Enum.map(fn chunk -> + chunk + |> String.trim() + |> String.split("/") + |> hd() + |> String.downcase() + end) + |> Enum.member?("websocket") + end + end + defp connection_phase(conn) when conn.state in [:chunked, :upgraded], do: "stream" defp connection_phase(conn) when conn.halted, do: "done" defp connection_phase(conn) when conn.state == :sent, do: "done" @@ -113,7 +163,7 @@ defmodule Wayfarer.Server.Proxy do proxy_responses(conn, mint, req) {:ok, mint, responses} -> - handle_responses(responses, conn, mint, req) + handle_responses(conn, responses, mint, req) {:error, _, reason, _} -> {:error, reason} @@ -123,62 +173,82 @@ defmodule Wayfarer.Server.Proxy do end end - defp handle_responses([], conn, mint, req), do: proxy_responses(conn, mint, req) + defp handle_responses(conn, [], mint, req), do: proxy_responses(conn, mint, req) - defp handle_responses([{:status, req, status} | responses], conn, mint, req), - do: handle_responses(responses, Conn.put_status(conn, status), mint, req) - - defp handle_responses([{:headers, req, headers} | responses], conn, mint, req) do - conn = - headers - |> Enum.reduce(conn, &Conn.put_resp_header(&2, elem(&1, 0), elem(&1, 1))) - - handle_responses(responses, conn, mint, req) + defp handle_responses(conn, [{:status, req, status} | responses], mint, req) do + conn + |> Conn.put_status(status) + |> Telemetry.request_received_status(status) + |> handle_responses(responses, mint, req) end - defp handle_responses([{:data, req, body} | responses], conn, mint, req) + defp handle_responses(conn, [{:headers, req, headers} | responses], mint, req) do + headers + |> Enum.reduce(conn, fn {header_name, header_value}, conn -> + conn + |> Conn.put_resp_header(header_name, header_value) + end) + |> handle_responses(responses, mint, req) + end + + defp handle_responses(conn, [{:data, req, body} | responses], mint, req) when conn.state == :chunked do case Conn.chunk(conn, body) do - {:ok, conn} -> handle_responses(responses, conn, mint, req) - {:error, reason} -> {:error, conn, reason} + {:ok, conn} -> + body_size = byte_size(body) + + conn + |> Telemetry.increment_metrics(%{resp_body_bytes: body_size}) + |> Telemetry.request_resp_body_chunk(body_size) + |> handle_responses(responses, mint, req) + + {:error, reason} -> + {:error, conn, reason} end end - defp handle_responses([{:data, req, body} | responses], conn, mint, req) do + defp handle_responses(conn, [{:data, req, body} | responses], mint, req) do # We need to check here for a content-length or transfer encoding header and # deal with it. This should be refactored out into a proxy state rather # than using the conn as our state. + body_size = byte_size(body) + case Conn.get_resp_header(conn, "content-length") do [] -> - conn = Conn.send_chunked(conn, conn.status) - handle_responses([{:data, req, body} | responses], conn, mint, req) + conn + |> Conn.send_chunked(conn.status) + |> Telemetry.request_resp_started() + |> handle_responses([{:data, req, body} | responses], mint, req) [length] -> - if String.to_integer(length) == byte_size(body) do + if String.to_integer(length) == body_size do conn = conn - |> Conn.delete_resp_header("content-length") |> Conn.send_resp(conn.status, body) + |> Telemetry.request_resp_started() + |> Telemetry.increment_metrics(%{resp_body_bytes: body_size}) + |> Telemetry.request_resp_body_chunk(body_size) |> Conn.halt() {:ok, conn, mint} else conn = conn - |> Conn.delete_resp_header("content-length") + |> Telemetry.increment_metrics(%{resp_body_bytes: body_size}) + |> Telemetry.request_resp_body_chunk(body_size) |> Conn.send_chunked(conn.status) - handle_responses([{:data, req, body} | responses], conn, mint, req) + handle_responses(conn, [{:data, req, body} | responses], mint, req) end end end - defp handle_responses([{:done, req} | _], conn, mint, req) do + defp handle_responses(conn, [{:done, req} | _], mint, req) do {:ok, Conn.halt(conn), mint} end - defp handle_responses([{:error, req, reason} | _], conn, _mint, req), do: {:error, conn, reason} + defp handle_responses(conn, [{:error, req, reason} | _], _mint, req), do: {:error, conn, reason} defp send_request(conn, mint) do request_path = @@ -199,15 +269,36 @@ defmodule Wayfarer.Server.Proxy do defp stream_request_body(conn, mint, req) do case Conn.read_body(conn) do + {:ok, <<>>, conn} -> + with {:ok, mint} <- HTTP.stream_request_body(mint, req, :eof) do + conn = + conn + |> Telemetry.set_metrics(%{req_body_bytes: 0, req_body_chunks: 0}) + + {:ok, mint, conn} + end + {:ok, chunk, conn} -> with {:ok, mint} <- HTTP.stream_request_body(mint, req, chunk), {:ok, mint} <- HTTP.stream_request_body(mint, req, :eof) do + chunk_size = byte_size(chunk) + + conn = + conn + |> Telemetry.increment_metrics(%{req_body_bytes: chunk_size, req_body_chunks: 1}) + |> Telemetry.request_req_body_chunk(chunk_size) + {:ok, mint, conn} end {:more, chunk, conn} -> with {:ok, mint} <- HTTP.stream_request_body(mint, req, chunk) do - stream_request_body(conn, mint, req) + chunk_size = byte_size(chunk) + + conn + |> Telemetry.increment_metrics(%{req_body_bytes: chunk_size, req_body_chunks: 1}) + |> Telemetry.request_req_body_chunk(chunk_size) + |> stream_request_body(mint, req) end {:error, reason} -> diff --git a/lib/wayfarer/server/websocket_proxy.ex b/lib/wayfarer/server/websocket_proxy.ex index 0f6840d..278bda9 100644 --- a/lib/wayfarer/server/websocket_proxy.ex +++ b/lib/wayfarer/server/websocket_proxy.ex @@ -10,6 +10,7 @@ defmodule Wayfarer.Server.WebSocketProxy do alias Mint.WebSocket alias Plug.Conn + alias Wayfarer.Telemetry require Logger @default_opts [extensions: [WebSocket.PerMessageDeflate]] @@ -25,7 +26,7 @@ defmodule Wayfarer.Server.WebSocketProxy do end case WebSocket.upgrade(proto, mint, request_path, proxy_headers(conn), @default_opts) do - {:ok, mint, ref} -> {:ok, %{mint: mint, ref: ref, status: :init, buffer: []}} + {:ok, mint, ref} -> {:ok, %{mint: mint, ref: ref, status: :init, buffer: [], conn: conn}} {:error, _mint, reason} -> {:error, reason} end end @@ -35,27 +36,39 @@ defmodule Wayfarer.Server.WebSocketProxy do @doc false @impl true - def handle_control({payload, [{:opcode, :ping}]}, state) do - with {:ok, websocket, data} <- WebSocket.encode(state.websocket, {:ping, payload}), + def handle_control({frame, [{:opcode, :ping}]}, state) do + with {:ok, websocket, data} <- WebSocket.encode(state.websocket, {:ping, frame}), {:ok, mint} <- WebSocket.stream_request_body(state.mint, state.ref, data) do - {:ok, %{state | websocket: websocket, mint: mint}} + conn = request_client_frame(state.conn, {:ping, frame}) + + {:ok, %{state | websocket: websocket, mint: mint, conn: conn}} else error -> handle_error(error, state) end end - def handle_control(_, state), do: {:ok, state} + def handle_control({frame, [{:opcode, frame_type}]}, state) do + conn = request_client_frame(state.conn, {frame_type, frame}) + + {:ok, %{state | conn: conn}} + end @doc false @impl true def handle_in({payload, [{:opcode, frame_type}]}, state) when state.status == :init do - {:ok, %{state | buffer: [{frame_type, payload} | state.buffer]}} + frame = {frame_type, payload} + buffer = [frame | state.buffer] + conn = request_client_frame(state.conn, frame) + + {:ok, %{state | buffer: buffer, conn: conn}} end def handle_in({payload, [{:opcode, frame_type}]}, state) do with {:ok, websocket, data} <- WebSocket.encode(state.websocket, {frame_type, payload}), {:ok, mint} <- WebSocket.stream_request_body(state.mint, state.ref, data) do - {:ok, %{state | websocket: websocket, mint: mint}} + conn = request_client_frame(state.conn, {frame_type, payload}) + + {:ok, %{state | websocket: websocket, mint: mint, conn: conn}} else error -> handle_error(error, state) end @@ -150,7 +163,9 @@ defmodule Wayfarer.Server.WebSocketProxy do defp do_empty_buffer([head | tail], state) do with {:ok, websocket, data} <- WebSocket.encode(state.websocket, head), {:ok, mint} <- WebSocket.stream_request_body(state.mint, state.ref, data) do - do_empty_buffer(tail, %{state | websocket: websocket, mint: mint}) + conn = request_client_frame(state.conn, head) + + do_empty_buffer(tail, %{state | websocket: websocket, mint: mint, conn: conn}) end end @@ -181,15 +196,26 @@ defmodule Wayfarer.Server.WebSocketProxy do defp decode_frames(frames, state) do frames |> Enum.reduce_while({:ok, [], state}, fn frame, {:ok, messages, state} -> - case WebSocket.decode(state.websocket, frame) do - {:ok, websocket, frames} when is_list(frames) -> - messages = Enum.concat(messages, frames) - {:cont, {:ok, messages, %{state | websocket: websocket}}} - - {:error, websocket, reason} -> - {:halt, {:error, reason, %{state | websocket: websocket}}} + case decode_frame(state, frame) do + {:ok, new_messages, state} -> {:cont, {:ok, [new_messages, messages], state}} + {:error, reason, state} -> {:halt, {:error, reason, state}} end end) + |> case do + {:ok, messages, state} -> {:ok, List.flatten(messages), state} + {:error, reason, state} -> {:error, reason, state} + end + end + + defp decode_frame(state, frame) do + case WebSocket.decode(state.websocket, frame) do + {:ok, websocket, frames} when is_list(frames) -> + conn = Enum.reduce(frames, state.conn, &request_server_frame(&2, &1)) + {:ok, frames, %{state | websocket: websocket, conn: conn}} + + {:error, websocket, reason} -> + {:error, reason, %{state | websocket: websocket}} + end end defp response_for_messages([], state), do: {:ok, state} @@ -200,4 +226,26 @@ defmodule Wayfarer.Server.WebSocketProxy do {[{:close, code, _} | _], messages} -> {:stop, :normal, code, messages, state} end end + + defp request_client_frame(conn, {frame_type, frame}) do + frame_size = byte_size(frame) + + conn + |> Telemetry.increment_metrics(%{ + client_frame_bytes: frame_size, + client_frame_count: 1 + }) + |> Telemetry.request_client_frame(frame_size, frame_type) + end + + defp request_server_frame(conn, {frame_type, frame}) do + frame_size = byte_size(frame) + + conn + |> Telemetry.increment_metrics(%{ + server_frame_bytes: frame_size, + server_frame_count: 1 + }) + |> Telemetry.request_server_frame(frame_size, frame_type) + end end diff --git a/lib/wayfarer/target/check.ex b/lib/wayfarer/target/check.ex index a60040b..81bf591 100644 --- a/lib/wayfarer/target/check.ex +++ b/lib/wayfarer/target/check.ex @@ -5,22 +5,47 @@ defmodule Wayfarer.Target.Check do use GenServer, restart: :transient alias Mint.{HTTP, HTTP1, HTTP2, WebSocket} - alias Wayfarer.{Target, Target.TotalConnections} + alias Wayfarer.{Target, Target.TotalConnections, Telemetry} require Logger + @type state :: %{ + conn: struct(), + req: reference(), + scheme: :http | :https | :ws | :wss, + address: :inet.ip_address(), + port: :socket.port_number(), + uri: URI.t(), + ref: any, + method: String.t(), + headers: [{String.t(), String.t()}], + hostname: String.t(), + transport: :http1 | :http2 | :auto, + span: map + } + @doc false @impl true def init(state), do: {:ok, state, {:continue, :start_check}} @doc false @impl true + @spec handle_continue(:start_check, state) :: {:noreply, state, timeout} | {:stop, :normal, nil} def handle_continue(:start_check, state) do - with {:ok, conn} <- connect(state), - {:ok, conn, req} <- request(Map.put(state, :conn, conn)) do - state = - state - |> Map.merge(%{conn: conn, req: req}) + state = + state + |> Map.put(:span, %{ + metadata: %{ + target: %{scheme: state.scheme, address: state.address, port: state.port}, + method: state.method, + uri: state.uri, + hostname: state.hostname, + telemetry_span_context: make_ref() + } + }) + |> Telemetry.health_check_start() + with {:ok, state} <- connect(state), + {:ok, state} <- request(state) do {:noreply, state, state.response_timeout} else {:error, reason} -> @@ -33,7 +58,7 @@ defmodule Wayfarer.Target.Check do @doc false @impl true - def handle_info(:timeout, state), do: check_failed(state, "request timeout expired.") + def handle_info(:timeout, state), do: check_failed(state, :timeout) def handle_info(message, state) do with {:ok, conn, responses} <- WebSocket.stream(state.conn, message), @@ -44,13 +69,14 @@ defmodule Wayfarer.Target.Check do {:ok, status} <- get_status_response(conn, responses) do if Enum.any?(state.success_codes, &Enum.member?(&1, status)) do Target.check_passed(state.ref) + Telemetry.health_check_pass(state, status) {:stop, :normal, nil} else check_failed(state, "received #{status} status code") end else {:continue, conn} -> - {:noreply, %{state | conn: conn}} + {:noreply, Map.put(state, :conn, conn)} :unknown -> check_failed(state, "Received unknown message: `#{inspect(message)}`") @@ -67,53 +93,86 @@ defmodule Wayfarer.Target.Check do do: connect(%{state | scheme: :https}) defp connect(state) when state.transport == :http1 do - HTTP1.connect(state.scheme, state.address, state.port, - timeout: state.connect_timeout, - hostname: state.hostname - ) + with {:ok, conn} <- + HTTP1.connect(state.scheme, state.address, state.port, + timeout: state.connect_timeout, + hostname: state.hostname + ) do + {:ok, Telemetry.health_check_connect(Map.put(state, :conn, conn), :http1)} + end end defp connect(state) when state.transport == :http2 do - HTTP2.connect(state.scheme, state.address, state.port, - timeout: state.connect_timeout, - hostname: state.hostname - ) + with {:ok, conn} <- + HTTP2.connect(state.scheme, state.address, state.port, + timeout: state.connect_timeout, + hostname: state.hostname + ) do + {:ok, Telemetry.health_check_connect(Map.put(state, :conn, conn), :http2)} + end end defp connect(state) do - HTTP.connect(state.scheme, state.address, state.port, - timeout: state.connect_timeout, - hostname: state.hostname - ) + with {:ok, conn} <- + HTTP.connect(state.scheme, state.address, state.port, + timeout: state.connect_timeout, + hostname: state.hostname + ) do + transport = + case conn do + %Mint.HTTP1{} -> :http1 + %Mint.HTTP2{} -> :http2 + end + + {:ok, Telemetry.health_check_connect(Map.put(state, :conn, conn), transport)} + end end defp request(state) when state.scheme in [:ws, :wss] do - WebSocket.upgrade(state.scheme, state.conn, state.path, state.headers, []) + with {:ok, conn, req} <- + WebSocket.upgrade(state.scheme, state.conn, state.path, state.headers, []) do + state = Map.merge(state, %{conn: conn, req: req}) + + {:ok, Telemetry.health_check_request(state)} + end end defp request(state) do - HTTP.request(state.conn, state.method, state.path, state.headers, nil) + with {:ok, conn, req} <- + HTTP.request(state.conn, state.method, state.path, state.headers, nil) do + state = Map.merge(state, %{conn: conn, req: req}) + + {:ok, Telemetry.health_check_request(state)} + end end defp check_failed(state, reason) when is_binary(reason) do Target.check_failed(state.ref) - Logger.warning("Health check failed for #{state.method} #{state.uri}: #{reason}.") + Telemetry.health_check_fail(state, reason) + + Logger.warning(fn -> "Health check failed for #{state.method} #{state.uri}: #{reason}." end) {:stop, :normal, nil} end defp check_failed(state, exception) when is_exception(exception) do Target.check_failed(state.ref) + Telemetry.health_check_fail(state, exception) - Logger.warning( + Logger.warning(fn -> "Health check failed for #{state.method} #{state.uri}: #{Exception.message(exception)}" - ) + end) {:stop, :normal, nil} end defp check_failed(state, reason) do Target.check_failed(state.ref) - Logger.warning("Health check failed for #{state.method} #{state.uri}: `#{inspect(reason)}`") + Telemetry.health_check_fail(state, reason) + + Logger.warning(fn -> + "Health check failed for #{state.method} #{state.uri}: `#{inspect(reason)}`" + end) + {:stop, :normal, nil} end diff --git a/lib/wayfarer/telemetry.ex b/lib/wayfarer/telemetry.ex new file mode 100644 index 0000000..f91940c --- /dev/null +++ b/lib/wayfarer/telemetry.ex @@ -0,0 +1,764 @@ +defmodule Wayfarer.Telemetry do + @moduledoc """ + Wayfarer emits a number of telemetry events and spans on top of the excellent + telemetry events emitted by `Bandit.Telemetry`. + """ + + alias Plug.Conn + alias Wayfarer.{Router, Target.Check, Target.Selector} + + @typedoc """ + Information about the target a request has been routed to. + """ + @type target :: + %{ + scheme: :http | :https | :ws | :wss, + address: :inet.ip_address(), + port: :socket.port_number() + } + | %{scheme: :plug, module: module, options: any} + + @typedoc """ + Information about the listener that received the request. + """ + @type listener :: + %{ + scheme: :http | :https, + module: module(), + port: :socket.port_number(), + address: :inet.ip_address() + } + + @typedoc """ + The time that the event was emitted, in `:native` time units. + + This is sources from `System.monotonic_time/0` which has some caveats but in + general is better for calculating durations as it should never go backwards. + """ + @type monotonic_time :: integer() + + @typedoc """ + The time passed since the beginning of the span, in `:native` time units. + + The difference between the current `monotonic_time` and the first + `monotonic_time` at the start of the span. + """ + @type duration :: integer() + + @typedoc "The HTTP protocol version of the request" + @type transport :: :http1 | :http2 + + @typedoc "A unique identifier for the span" + @type telemetry_span_context :: reference() + + @typedoc "A convenience type for describing telemetry events" + @opaque event(measurements, metadata) :: {measurements, metadata} + + @typedoc """ + The `[:wayfarer, :request, :start]` event. + + This event signals the start of a request span tracking a client request to + completion. + + You can use the `telemetry_span_context` metadata value to correlate + subsequent events within the same span. + """ + @type request_start :: + event( + %{ + required(:monotonic_time) => monotonic_time(), + required(:wallclock_time) => DateTime.t() + }, + %{ + required(:conn) => Conn.t(), + required(:listener) => listener(), + required(:transport) => transport(), + required(:telemetry_span_context) => telemetry_span_context() + } + ) + + @typedoc """ + The `[:wayfarer, :request, :routed]` event. + + This event signals that the routing process has completed and a target has + been chosen to serve the request. + """ + @type request_routed :: + event( + %{ + required(:monotonic_time) => monotonic_time(), + required(:duration) => duration(), + required(:wallclock_time) => DateTime.t() + }, + %{ + required(:conn) => Conn.t(), + required(:listener) => listener(), + required(:transport) => transport(), + required(:telemetry_span_context) => telemetry_span_context(), + required(:target) => target(), + required(:algorithm) => Selector.algorithm() + } + ) + + @typedoc """ + The `[:wayfarer, :request, :exception]` event. + + This event signals that something went wrong while processing the event. You + will likely still receive other events (eg `:stop`) for this span however. + """ + @type request_exception :: + event( + %{ + required(:monotonic_time) => monotonic_time(), + required(:duration) => duration(), + required(:wallclock_time) => DateTime.t() + }, + %{ + required(:conn) => Conn.t(), + required(:listener) => listener(), + required(:transport) => transport(), + required(:telemetry_span_context) => telemetry_span_context(), + optional(:target) => target(), + optional(:algorithm) => Selector.algorithm(), + required(:kind) => :throw | :exit | :error | :exception, + required(:reason) => any, + required(:stacktrace) => Exception.stacktrace() + } + ) + + @typedoc """ + The `[:wayfarer, :request, :stop]` event. + + This event signals that the request has completed. + + The measurements will contain any incrementing counters accumulated during the + course of the request. + """ + @type request_stop :: + event( + %{ + required(:monotonic_time) => monotonic_time(), + required(:duration) => duration(), + required(:wallclock_time) => DateTime.t(), + required(:status) => nil | 100..599, + optional(:req_body_bytes) => non_neg_integer(), + optional(:resp_body_bytes) => non_neg_integer(), + optional(:client_frame_bytes) => non_neg_integer(), + optional(:client_frame_count) => non_neg_integer(), + optional(:server_frame_bytes) => non_neg_integer(), + optional(:server_frame_count) => non_neg_integer() + }, + %{ + required(:conn) => Conn.t(), + required(:listener) => listener(), + required(:transport) => transport(), + required(:telemetry_span_context) => telemetry_span_context(), + optional(:target) => target(), + optional(:algorithm) => Selector.algorithm(), + optional(:status) => nil | 100..599, + optional(:kind) => :throw | :exit | :error | :exception, + optional(:reason) => any, + optional(:stacktrace) => Exception.stacktrace() + } + ) + + @typedoc """ + The `[:wayfarer, :request, :received_status]` event. + + This event signals that an HTTP status code has been received from the + upstream target. + """ + @type request_received_status :: + event( + %{ + required(:monotonic_time) => monotonic_time(), + required(:duration) => duration(), + required(:wallclock_time) => DateTime.t(), + required(:status) => nil | 100..599, + optional(:req_body_bytes) => non_neg_integer() + }, + %{ + required(:conn) => Conn.t(), + required(:listener) => listener(), + required(:transport) => transport(), + required(:telemetry_span_context) => telemetry_span_context(), + required(:target) => target(), + required(:algorithm) => Selector.algorithm(), + required(:status) => nil | 100..599 + } + ) + + @typedoc """ + The `[:wayfarer, :request, :req_body_chunk]` event. + + This event is emitted while streaming the request body from the client to the + target. Under the hood `Plug.Conn.read_body/2` is being called with the + default options, meaning that each chunk is likely to be up to 8MB in size. + + If there is no request body then this event will not be emitted and the + `req_body_bytes` and `req_body_chunks` counters will both be set to zero for + this request. + """ + @type request_req_body_chunk :: + event( + %{ + required(:monotonic_time) => monotonic_time(), + required(:duration) => duration(), + required(:wallclock_time) => DateTime.t(), + required(:status) => nil | 100..599, + required(:req_body_bytes) => non_neg_integer(), + required(:req_body_chunks) => non_neg_integer(), + required(:chunk_bytes) => non_neg_integer() + }, + %{ + required(:conn) => Conn.t(), + required(:listener) => listener(), + required(:transport) => transport(), + required(:telemetry_span_context) => telemetry_span_context(), + required(:target) => target(), + required(:algorithm) => Selector.algorithm(), + required(:status) => nil | 100..599 + } + ) + + @typedoc """ + The `[:wayfarer, :request, :resp_started]` event. + + This event indicates that the HTTP status and headers have been received from + the target and the response will now start being sent to the target. + """ + @type request_resp_started :: + event( + %{ + required(:monotonic_time) => monotonic_time(), + required(:duration) => duration(), + required(:wallclock_time) => DateTime.t(), + required(:status) => nil | 100..599, + optional(:req_body_bytes) => non_neg_integer(), + optional(:req_body_chunks) => non_neg_integer() + }, + %{ + required(:conn) => Conn.t(), + required(:listener) => listener(), + required(:transport) => transport(), + required(:telemetry_span_context) => telemetry_span_context(), + required(:target) => target(), + required(:algorithm) => Selector.algorithm(), + required(:status) => nil | 100..599 + } + ) + + @typedoc """ + The `[:wayfarer, :request, :resp_body_chunk]` event. + + This event is emitted every time a chunk of response body is received from the + target for streaming to the client. Under the hood, these are emitted every + time `Mint.HTTP.stream/2` returns a data frame. + """ + @type request_resp_body_chunk :: + event( + %{ + required(:monotonic_time) => monotonic_time(), + required(:duration) => duration(), + required(:wallclock_time) => DateTime.t(), + optional(:req_body_bytes) => non_neg_integer(), + optional(:req_body_chunks) => non_neg_integer(), + required(:resp_body_bytes) => non_neg_integer(), + required(:resp_body_chunks) => non_neg_integer(), + required(:chunk_bytes) => non_neg_integer() + }, + %{ + required(:conn) => Conn.t(), + required(:listener) => listener(), + required(:transport) => transport(), + required(:telemetry_span_context) => telemetry_span_context(), + required(:target) => target(), + required(:algorithm) => Selector.algorithm(), + required(:status) => nil | 100..599 + } + ) + + @typedoc """ + The `[:wayfarer, :request, :upgraded]` event. + + This event is emitted when a client connection is upgraded to a WebSocket + connection. + """ + @type request_upgraded :: + event( + %{ + required(:monotonic_time) => monotonic_time(), + required(:duration) => duration(), + required(:wallclock_time) => DateTime.t() + }, + %{ + required(:conn) => Conn.t(), + required(:listener) => listener(), + required(:transport) => transport(), + required(:telemetry_span_context) => telemetry_span_context(), + required(:target) => target(), + required(:algorithm) => Selector.algorithm(), + required(:status) => nil | 100..599 + } + ) + + @typedoc """ + The `[:wayfarer, :request, :client_frame]` event. + + This event is emitted any time a WebSocket frame is received from the client + for transmission to the target. + """ + @type request_client_frame :: + event( + %{ + required(:monotonic_time) => monotonic_time(), + required(:duration) => duration(), + required(:wallclock_time) => DateTime.t(), + required(:frame_size) => non_neg_integer(), + required(:client_frame_bytes) => non_neg_integer(), + required(:client_frame_count) => non_neg_integer(), + optional(:server_frame_bytes) => non_neg_integer(), + optional(:server_frame_count) => non_neg_integer() + }, + %{ + required(:conn) => Conn.t(), + required(:listener) => listener(), + required(:transport) => transport(), + required(:telemetry_span_context) => telemetry_span_context(), + required(:target) => target(), + required(:algorithm) => Selector.algorithm(), + required(:status) => nil | 100..599, + required(:opcode) => :text | :binary | :ping | :pong | :close + } + ) + + @typedoc """ + The `[:wayfarer, :request, :server_frame]` event. + + This event is emitted any time a WebSocket frame is received from the target + for transmission to the client. + """ + @type request_server_frame :: + event( + %{ + required(:monotonic_time) => monotonic_time(), + required(:duration) => duration(), + required(:wallclock_time) => DateTime.t(), + required(:frame_size) => non_neg_integer(), + required(:server_frame_bytes) => non_neg_integer(), + required(:server_frame_count) => non_neg_integer(), + optional(:client_frame_bytes) => non_neg_integer(), + optional(:client_frame_count) => non_neg_integer() + }, + %{ + required(:conn) => Conn.t(), + required(:listener) => listener(), + required(:transport) => transport(), + required(:telemetry_span_context) => telemetry_span_context(), + required(:target) => target(), + required(:algorithm) => Selector.algorithm(), + required(:status) => nil | 100..599 + } + ) + + @typedoc """ + All the event types that make up the `[:wayfarer, :request, :*]` span. + """ + @type request_span :: + request_start + | request_routed + | request_exception + | request_stop + | request_received_status + | request_req_body_chunk + | request_resp_started + | request_resp_body_chunk + | request_upgraded + | request_client_frame + | request_server_frame + + @typedoc """ + The `[:wayfarer, :health_check, :start]` event. + + This event signals the start of a health check span. + + You can use the `telemetry_span_context` metadata value to correlate + subsequent events within the same span. + """ + @type health_check_start :: + event( + %{ + required(:monotonic_time) => monotonic_time(), + required(:wallclock_time) => DateTime.t() + }, + %{ + required(:telemetry_span_context) => telemetry_span_context(), + required(:hostname) => String.t(), + required(:uri) => URI.t(), + required(:target) => target(), + required(:method) => String.t() + } + ) + + @typedoc """ + The `[:wayfarer, :health_check, :connect]` event. + + This event signals that the outgoing TCP connection has been made to the + target. + """ + @type health_check_connect :: + event( + %{ + required(:monotonic_time) => monotonic_time(), + required(:wallclock_time) => DateTime.t(), + required(:duration) => duration() + }, + %{ + required(:telemetry_span_context) => telemetry_span_context(), + required(:hostname) => String.t(), + required(:uri) => URI.t(), + required(:target) => target(), + required(:method) => String.t(), + required(:transport) => transport() + } + ) + @typedoc """ + The `[:wayfarer, :health_check, :request]` event. + + This event signals that the HTTP or WebSocket request has been sent. + """ + @type health_check_request :: + event( + %{ + required(:monotonic_time) => monotonic_time(), + required(:wallclock_time) => DateTime.t(), + required(:duration) => duration() + }, + %{ + required(:telemetry_span_context) => telemetry_span_context(), + required(:hostname) => String.t(), + required(:uri) => URI.t(), + required(:target) => target(), + required(:method) => String.t(), + required(:transport) => transport() + } + ) + + @typedoc """ + The `[:wayfarer, :health_check, :pass]` event. + + This event signals that the HTTP status code returned by the target matches + one of the configured success codes. + + It also signals the end of the span. + """ + @type health_check_pass :: + event( + %{ + required(:monotonic_time) => monotonic_time(), + required(:wallclock_time) => DateTime.t(), + required(:duration) => duration(), + required(:status) => 100..599 + }, + %{ + required(:telemetry_span_context) => telemetry_span_context(), + required(:hostname) => String.t(), + required(:uri) => URI.t(), + required(:target) => target(), + required(:method) => String.t(), + required(:transport) => transport() + } + ) + + @typedoc """ + The `[:wayfarer, :health_check, :fail]` event. + + This event signals that the HTTP status code returned by the target did not + match any of the configured success codes. + + It also signals the end of the span. + """ + @type health_check_fail :: + event( + %{ + required(:monotonic_time) => monotonic_time(), + required(:wallclock_time) => DateTime.t(), + required(:duration) => duration(), + required(:reason) => any + }, + %{ + required(:telemetry_span_context) => telemetry_span_context(), + required(:hostname) => String.t(), + required(:uri) => URI.t(), + required(:target) => target(), + required(:method) => String.t(), + required(:transport) => transport() + } + ) + + @typedoc """ + All the event types that make up the `[:wayfarer, :health_check, :*]` span. + """ + @type health_check_span :: + health_check_start | health_check_request | health_check_pass | health_check_fail + + @typedoc """ + All the spans that can be emitted by Wayfarer. + """ + @type spans :: request_span | health_check_span + + @doc false + @spec request_start(Conn.t()) :: Conn.t() + def request_start(conn) do + telemetry_span_context = make_ref() + + metadata = + conn + |> Map.get(:private, %{}) + |> Map.get(:wayfarer, %{}) + |> Map.merge(%{telemetry_span_context: telemetry_span_context, conn: conn}) + + conn + |> execute_request_span_event(:start, %{}, metadata) + end + + @doc false + @spec request_routed(Conn.t(), Router.target(), Router.algorithm()) :: Conn.t() + def request_routed(conn, target, algorithm) do + target = + case target do + {scheme, address, port, _transport} -> %{scheme: scheme, address: address, port: port} + {:plug, {module, opts}} -> %{scheme: :plug, module: module, options: opts} + {:plug, module} -> %{scheme: :plug, module: module, options: []} + end + + conn + |> execute_request_span_event(:routed, %{}, %{ + target: target, + algorithm: algorithm + }) + end + + @doc false + @spec request_exception(Conn.t(), kind :: any, reason :: any, stacktrace :: list | nil) :: + Conn.t() + def request_exception(conn, kind, reason, stacktrace \\ nil) do + conn + |> execute_request_span_event(:exception, %{}, %{ + kind: kind, + reason: reason, + stacktrace: stacktrace || Process.info(self(), :current_stacktrace) + }) + end + + @doc false + def request_stop(conn) do + conn + |> execute_request_span_event(:stop, %{status: conn.status}, %{}) + end + + @doc false + @spec request_received_status(Conn.t(), non_neg_integer()) :: Conn.t() + def request_received_status(conn, status) do + conn + |> execute_request_span_event(:received_status, %{status: status}, %{status: status}) + end + + @doc false + @spec request_req_body_chunk(Conn.t(), non_neg_integer()) :: Conn.t() + def request_req_body_chunk(conn, chunk_bytes) do + conn + |> execute_request_span_event(:req_body_chunk, %{chunk_bytes: chunk_bytes}, %{}) + end + + @doc false + @spec request_resp_started(Conn.t()) :: Conn.t() + def request_resp_started(conn) do + metric = %{status: conn.status} + + conn + |> execute_request_span_event(:resp_started, metric, metric) + end + + @doc false + @spec request_resp_body_chunk(Conn.t(), non_neg_integer()) :: Conn.t() + def request_resp_body_chunk(conn, chunk_bytes) do + conn + |> execute_request_span_event(:resp_body_chunk, %{chunk_bytes: chunk_bytes}, %{}) + end + + @doc false + @spec request_upgraded(Conn.t()) :: Conn.t() + def request_upgraded(conn) do + conn + |> execute_request_span_event(:upgraded, %{}, %{}) + end + + @doc false + @spec request_client_frame(Conn.t(), non_neg_integer(), atom) :: Conn.t() + def request_client_frame(conn, bytes, opcode) do + conn + |> execute_request_span_event(:client_frame, %{frame_size: bytes}, %{opcode: opcode}) + end + + @doc false + @spec request_server_frame(Conn.t(), non_neg_integer(), atom) :: Conn.t() + def request_server_frame(conn, bytes, opcode) do + conn + |> execute_request_span_event(:server_frame, %{frame_size: bytes}, %{opcode: opcode}) + end + + @doc false + @spec set_metrics(Conn.t(), %{atom => number}) :: Conn.t() + def set_metrics(conn, metrics) do + update_metrics(conn, &Map.merge(&1, metrics)) + end + + @doc false + @spec increment_metrics(Conn.t(), %{atom => number}) :: Conn.t() + def increment_metrics(conn, to_increment) do + update_metrics(conn, fn metrics -> + Enum.reduce(to_increment, metrics, fn {metric_name, increment_by}, metrics -> + Map.update(metrics, metric_name, increment_by, &(&1 + increment_by)) + end) + end) + end + + @doc false + @spec health_check_start(Check.state()) :: Check.state() + def health_check_start(check) do + execute_check_span_event(check, :start, %{}, %{}) + end + + @doc false + @spec health_check_connect(Check.state(), atom) :: Check.state() + def health_check_connect(check, transport) do + execute_check_span_event(check, :connect, %{}, %{transport: transport}) + end + + @doc false + @spec health_check_request(Check.state()) :: Check.state() + def health_check_request(check) do + execute_check_span_event(check, :request, %{}, %{}) + end + + @doc false + @spec health_check_fail(Check.state(), any) :: Check.state() + def health_check_fail(check, reason) do + execute_check_span_event(check, :fail, %{}, %{reason: reason}) + end + + @doc false + @spec health_check_pass(Check.state(), 100..599) :: Check.state() + def health_check_pass(check, status) do + execute_check_span_event(check, :pass, %{}, %{status: status}) + end + + defp update_metrics(conn, callback) do + private = + conn + |> Map.get(:private, %{}) + |> Map.get(:wayfarer, %{}) + + metrics = + private + |> Map.get(:metrics, %{}) + |> callback.() + + private = Map.put(private, :metrics, metrics) + + conn + |> Conn.put_private(:wayfarer, private) + end + + defp execute_request_span_event(conn, event, measurements, metadata) do + monotonic_time = System.monotonic_time() + now = DateTime.utc_now() + + private = + conn + |> Map.get(:private, %{}) + |> Map.get(:wayfarer, %{}) + + span_info = + private + |> Map.get(:request_span, %{}) + + metrics = + private + |> Map.get(:metrics, %{}) + + measurements = + if Map.has_key?(span_info, :start_time) do + %{ + monotonic_time: monotonic_time, + duration: monotonic_time - span_info.start_time, + wallclock_time: now + } + else + %{ + monotonic_time: monotonic_time, + wallclock_time: now + } + end + |> Map.merge(metrics) + |> Map.merge(measurements) + + metadata = + span_info + |> Map.get(:metadata, %{}) + |> Map.merge(metadata) + + :telemetry.execute([:wayfarer, :request, event], measurements, Map.put(metadata, :conn, conn)) + + span_info = + span_info + |> Map.put(:metadata, metadata) + |> Map.put_new(:start_time, monotonic_time) + + private = + private + |> Map.put(:request_span, span_info) + + conn + |> Conn.put_private(:wayfarer, private) + end + + defp execute_check_span_event(check, event, measurements, metadata) do + monotonic_time = System.monotonic_time() + now = DateTime.utc_now() + span_info = Map.get(check, :span, %{}) + + metrics = Map.get(span_info, :metrics, %{}) + + measurements = + if Map.has_key?(span_info, :start_time) do + %{ + monotonic_time: monotonic_time, + duration: monotonic_time - span_info.start_time, + wallclock_time: now + } + else + %{ + monotonic_time: monotonic_time, + wallclock_time: now + } + end + |> Map.merge(metrics) + |> Map.merge(measurements) + + metadata = + span_info + |> Map.get(:metadata, %{}) + |> Map.merge(metadata) + + :telemetry.execute([:wayfarer, :health_check, event], measurements, metadata) + + span_info = + span_info + |> Map.put(:metadata, metadata) + |> Map.put_new(:start_time, monotonic_time) + + Map.put(check, :span, span_info) + end +end diff --git a/test/wayfarer/server/proxy_test.exs b/test/wayfarer/server/proxy_test.exs index a80f023..853b5a0 100644 --- a/test/wayfarer/server/proxy_test.exs +++ b/test/wayfarer/server/proxy_test.exs @@ -73,7 +73,7 @@ defmodule Wayfarer.Server.ProxyTest do {:ok, :fake_conn} end) - |> expect(:stream_request_body, 2, fn mint, _, _ -> {:ok, mint} end) + |> expect(:stream_request_body, fn mint, _, _ -> {:ok, mint} end) |> expect(:request, fn mint, _, _, _, _ -> send(self(), :ignore) {:ok, mint, req}