From 87172fce52cce1490396bc104718b18eef47d86d Mon Sep 17 00:00:00 2001 From: James Harton Date: Tue, 20 Aug 2024 16:41:27 +1200 Subject: [PATCH] wip: add health check telemetry. --- lib/wayfarer/server/proxy.ex | 19 +- lib/wayfarer/target/check.ex | 111 +++++++--- lib/wayfarer/telemetry.ex | 412 ++++++++++++++++++++++++++++++----- 3 files changed, 452 insertions(+), 90 deletions(-) diff --git a/lib/wayfarer/server/proxy.ex b/lib/wayfarer/server/proxy.ex index e83b641..788d7b8 100644 --- a/lib/wayfarer/server/proxy.ex +++ b/lib/wayfarer/server/proxy.ex @@ -187,9 +187,6 @@ defmodule Wayfarer.Server.Proxy do |> Enum.reduce(conn, fn {header_name, header_value}, conn -> conn |> Conn.put_resp_header(header_name, header_value) - |> Telemetry.increment_metrics(%{ - header_bytes: byte_size(header_name) + byte_size(header_value) - }) end) |> handle_responses(responses, mint, req) end @@ -228,7 +225,6 @@ defmodule Wayfarer.Server.Proxy do if String.to_integer(length) == body_size do conn = conn - # |> Conn.delete_resp_header("content-length") |> Conn.send_resp(conn.status, body) |> Telemetry.request_resp_started() |> Telemetry.increment_metrics(%{resp_body_bytes: body_size}) @@ -239,7 +235,6 @@ defmodule Wayfarer.Server.Proxy do else conn = conn - # |> Conn.delete_resp_header("content-length") |> Telemetry.increment_metrics(%{resp_body_bytes: body_size}) |> Telemetry.request_resp_body_chunk(body_size) |> Conn.send_chunked(conn.status) @@ -278,8 +273,7 @@ defmodule Wayfarer.Server.Proxy do with {:ok, mint} <- HTTP.stream_request_body(mint, req, :eof) do conn = conn - |> Telemetry.set_metrics(%{req_body_bytes: 0}) - |> Telemetry.request_req_body_sent() + |> Telemetry.set_metrics(%{req_body_bytes: 0, req_body_chunks: 0}) {:ok, mint, conn} end @@ -287,18 +281,23 @@ defmodule Wayfarer.Server.Proxy do {:ok, chunk, conn} -> with {:ok, mint} <- HTTP.stream_request_body(mint, req, chunk), {:ok, mint} <- HTTP.stream_request_body(mint, req, :eof) do + chunk_size = byte_size(chunk) + conn = conn - |> Telemetry.increment_metrics(%{req_body_bytes: byte_size(chunk)}) - |> Telemetry.request_req_body_sent() + |> Telemetry.increment_metrics(%{req_body_bytes: chunk_size, req_body_chunks: 1}) + |> Telemetry.request_req_body_chunk(chunk_size) {:ok, mint, conn} end {:more, chunk, conn} -> with {:ok, mint} <- HTTP.stream_request_body(mint, req, chunk) do + chunk_size = byte_size(chunk) + conn - |> Telemetry.increment_metrics(%{req_body_bytes: byte_size(chunk)}) + |> Telemetry.increment_metrics(%{req_body_bytes: chunk_size, req_body_chunks: 1}) + |> Telemetry.request_req_body_chunk(chunk_size) |> stream_request_body(mint, req) end diff --git a/lib/wayfarer/target/check.ex b/lib/wayfarer/target/check.ex index a60040b..81bf591 100644 --- a/lib/wayfarer/target/check.ex +++ b/lib/wayfarer/target/check.ex @@ -5,22 +5,47 @@ defmodule Wayfarer.Target.Check do use GenServer, restart: :transient alias Mint.{HTTP, HTTP1, HTTP2, WebSocket} - alias Wayfarer.{Target, Target.TotalConnections} + alias Wayfarer.{Target, Target.TotalConnections, Telemetry} require Logger + @type state :: %{ + conn: struct(), + req: reference(), + scheme: :http | :https | :ws | :wss, + address: :inet.ip_address(), + port: :socket.port_number(), + uri: URI.t(), + ref: any, + method: String.t(), + headers: [{String.t(), String.t()}], + hostname: String.t(), + transport: :http1 | :http2 | :auto, + span: map + } + @doc false @impl true def init(state), do: {:ok, state, {:continue, :start_check}} @doc false @impl true + @spec handle_continue(:start_check, state) :: {:noreply, state, timeout} | {:stop, :normal, nil} def handle_continue(:start_check, state) do - with {:ok, conn} <- connect(state), - {:ok, conn, req} <- request(Map.put(state, :conn, conn)) do - state = - state - |> Map.merge(%{conn: conn, req: req}) + state = + state + |> Map.put(:span, %{ + metadata: %{ + target: %{scheme: state.scheme, address: state.address, port: state.port}, + method: state.method, + uri: state.uri, + hostname: state.hostname, + telemetry_span_context: make_ref() + } + }) + |> Telemetry.health_check_start() + with {:ok, state} <- connect(state), + {:ok, state} <- request(state) do {:noreply, state, state.response_timeout} else {:error, reason} -> @@ -33,7 +58,7 @@ defmodule Wayfarer.Target.Check do @doc false @impl true - def handle_info(:timeout, state), do: check_failed(state, "request timeout expired.") + def handle_info(:timeout, state), do: check_failed(state, :timeout) def handle_info(message, state) do with {:ok, conn, responses} <- WebSocket.stream(state.conn, message), @@ -44,13 +69,14 @@ defmodule Wayfarer.Target.Check do {:ok, status} <- get_status_response(conn, responses) do if Enum.any?(state.success_codes, &Enum.member?(&1, status)) do Target.check_passed(state.ref) + Telemetry.health_check_pass(state, status) {:stop, :normal, nil} else check_failed(state, "received #{status} status code") end else {:continue, conn} -> - {:noreply, %{state | conn: conn}} + {:noreply, Map.put(state, :conn, conn)} :unknown -> check_failed(state, "Received unknown message: `#{inspect(message)}`") @@ -67,53 +93,86 @@ defmodule Wayfarer.Target.Check do do: connect(%{state | scheme: :https}) defp connect(state) when state.transport == :http1 do - HTTP1.connect(state.scheme, state.address, state.port, - timeout: state.connect_timeout, - hostname: state.hostname - ) + with {:ok, conn} <- + HTTP1.connect(state.scheme, state.address, state.port, + timeout: state.connect_timeout, + hostname: state.hostname + ) do + {:ok, Telemetry.health_check_connect(Map.put(state, :conn, conn), :http1)} + end end defp connect(state) when state.transport == :http2 do - HTTP2.connect(state.scheme, state.address, state.port, - timeout: state.connect_timeout, - hostname: state.hostname - ) + with {:ok, conn} <- + HTTP2.connect(state.scheme, state.address, state.port, + timeout: state.connect_timeout, + hostname: state.hostname + ) do + {:ok, Telemetry.health_check_connect(Map.put(state, :conn, conn), :http2)} + end end defp connect(state) do - HTTP.connect(state.scheme, state.address, state.port, - timeout: state.connect_timeout, - hostname: state.hostname - ) + with {:ok, conn} <- + HTTP.connect(state.scheme, state.address, state.port, + timeout: state.connect_timeout, + hostname: state.hostname + ) do + transport = + case conn do + %Mint.HTTP1{} -> :http1 + %Mint.HTTP2{} -> :http2 + end + + {:ok, Telemetry.health_check_connect(Map.put(state, :conn, conn), transport)} + end end defp request(state) when state.scheme in [:ws, :wss] do - WebSocket.upgrade(state.scheme, state.conn, state.path, state.headers, []) + with {:ok, conn, req} <- + WebSocket.upgrade(state.scheme, state.conn, state.path, state.headers, []) do + state = Map.merge(state, %{conn: conn, req: req}) + + {:ok, Telemetry.health_check_request(state)} + end end defp request(state) do - HTTP.request(state.conn, state.method, state.path, state.headers, nil) + with {:ok, conn, req} <- + HTTP.request(state.conn, state.method, state.path, state.headers, nil) do + state = Map.merge(state, %{conn: conn, req: req}) + + {:ok, Telemetry.health_check_request(state)} + end end defp check_failed(state, reason) when is_binary(reason) do Target.check_failed(state.ref) - Logger.warning("Health check failed for #{state.method} #{state.uri}: #{reason}.") + Telemetry.health_check_fail(state, reason) + + Logger.warning(fn -> "Health check failed for #{state.method} #{state.uri}: #{reason}." end) {:stop, :normal, nil} end defp check_failed(state, exception) when is_exception(exception) do Target.check_failed(state.ref) + Telemetry.health_check_fail(state, exception) - Logger.warning( + Logger.warning(fn -> "Health check failed for #{state.method} #{state.uri}: #{Exception.message(exception)}" - ) + end) {:stop, :normal, nil} end defp check_failed(state, reason) do Target.check_failed(state.ref) - Logger.warning("Health check failed for #{state.method} #{state.uri}: `#{inspect(reason)}`") + Telemetry.health_check_fail(state, reason) + + Logger.warning(fn -> + "Health check failed for #{state.method} #{state.uri}: `#{inspect(reason)}`" + end) + {:stop, :normal, nil} end diff --git a/lib/wayfarer/telemetry.ex b/lib/wayfarer/telemetry.ex index 9ae4d6e..f91940c 100644 --- a/lib/wayfarer/telemetry.ex +++ b/lib/wayfarer/telemetry.ex @@ -5,7 +5,7 @@ defmodule Wayfarer.Telemetry do """ alias Plug.Conn - alias Wayfarer.{Router, Selector} + alias Wayfarer.{Router, Target.Check, Target.Selector} @typedoc """ Information about the target a request has been routed to. @@ -29,10 +29,20 @@ defmodule Wayfarer.Telemetry do address: :inet.ip_address() } - @typedoc "The time that the event was emitted, in `:native` time" + @typedoc """ + The time that the event was emitted, in `:native` time units. + + This is sources from `System.monotonic_time/0` which has some caveats but in + general is better for calculating durations as it should never go backwards. + """ @type monotonic_time :: integer() - @typedoc "The difference between the current `monotonic_time` and the start of the span" + @typedoc """ + The time passed since the beginning of the span, in `:native` time units. + + The difference between the current `monotonic_time` and the first + `monotonic_time` at the start of the span. + """ @type duration :: integer() @typedoc "The HTTP protocol version of the request" @@ -41,13 +51,23 @@ defmodule Wayfarer.Telemetry do @typedoc "A unique identifier for the span" @type telemetry_span_context :: reference() - @type event([atom], measurements :: map, metadata :: map) + @typedoc "A convenience type for describing telemetry events" + @opaque event(measurements, metadata) :: {measurements, metadata} + @typedoc """ + The `[:wayfarer, :request, :start]` event. + + This event signals the start of a request span tracking a client request to + completion. + + You can use the `telemetry_span_context` metadata value to correlate + subsequent events within the same span. + """ @type request_start :: event( - [:wayfarer, :request, :start], %{ - required(:monotonic_time) => monotonic_time() + required(:monotonic_time) => monotonic_time(), + required(:wallclock_time) => DateTime.t() }, %{ required(:conn) => Conn.t(), @@ -57,12 +77,18 @@ defmodule Wayfarer.Telemetry do } ) + @typedoc """ + The `[:wayfarer, :request, :routed]` event. + + This event signals that the routing process has completed and a target has + been chosen to serve the request. + """ @type request_routed :: event( - [:wayfarer, :request, :routed], %{ required(:monotonic_time) => monotonic_time(), - required(:duration) => duration() + required(:duration) => duration(), + required(:wallclock_time) => DateTime.t() }, %{ required(:conn) => Conn.t(), @@ -74,12 +100,18 @@ defmodule Wayfarer.Telemetry do } ) + @typedoc """ + The `[:wayfarer, :request, :exception]` event. + + This event signals that something went wrong while processing the event. You + will likely still receive other events (eg `:stop`) for this span however. + """ @type request_exception :: event( - [:wayfarer, :request, :exception], %{ required(:monotonic_time) => monotonic_time(), - required(:duration) => duration() + required(:duration) => duration(), + required(:wallclock_time) => DateTime.t() }, %{ required(:conn) => Conn.t(), @@ -94,13 +126,27 @@ defmodule Wayfarer.Telemetry do } ) + @typedoc """ + The `[:wayfarer, :request, :stop]` event. + + This event signals that the request has completed. + + The measurements will contain any incrementing counters accumulated during the + course of the request. + """ @type request_stop :: event( - [:wayfarer, :request, :stop], %{ required(:monotonic_time) => monotonic_time(), required(:duration) => duration(), - required(:status) => nil | 100..599 + required(:wallclock_time) => DateTime.t(), + required(:status) => nil | 100..599, + optional(:req_body_bytes) => non_neg_integer(), + optional(:resp_body_bytes) => non_neg_integer(), + optional(:client_frame_bytes) => non_neg_integer(), + optional(:client_frame_count) => non_neg_integer(), + optional(:server_frame_bytes) => non_neg_integer(), + optional(:server_frame_count) => non_neg_integer() }, %{ required(:conn) => Conn.t(), @@ -116,33 +162,20 @@ defmodule Wayfarer.Telemetry do } ) + @typedoc """ + The `[:wayfarer, :request, :received_status]` event. + + This event signals that an HTTP status code has been received from the + upstream target. + """ @type request_received_status :: event( - [:wayfarer, :request, :received_status], - %{ - required(:monotonic_time) => monotonic_time(), - required(:duration) => duration(), - required(:status) => nil | 100..599 - }, - %{ - required(:conn) => Conn.t(), - required(:listener) => listener(), - required(:transport) => transport(), - required(:telemetry_span_context) => telemetry_span_context(), - required(:target) => target(), - required(:algorithm) => Selector.algorithm(), - required(:status) => nil | 100..599 - } - ) - - @type request_req_body_sent :: - event( - [:wayfarer, :request, :req_body_sent], %{ required(:monotonic_time) => monotonic_time(), required(:duration) => duration(), + required(:wallclock_time) => DateTime.t(), required(:status) => nil | 100..599, - required(:req_body_bytes) => non_neg_integer() + optional(:req_body_bytes) => non_neg_integer() }, %{ required(:conn) => Conn.t(), @@ -155,14 +188,54 @@ defmodule Wayfarer.Telemetry do } ) + @typedoc """ + The `[:wayfarer, :request, :req_body_chunk]` event. + + This event is emitted while streaming the request body from the client to the + target. Under the hood `Plug.Conn.read_body/2` is being called with the + default options, meaning that each chunk is likely to be up to 8MB in size. + + If there is no request body then this event will not be emitted and the + `req_body_bytes` and `req_body_chunks` counters will both be set to zero for + this request. + """ + @type request_req_body_chunk :: + event( + %{ + required(:monotonic_time) => monotonic_time(), + required(:duration) => duration(), + required(:wallclock_time) => DateTime.t(), + required(:status) => nil | 100..599, + required(:req_body_bytes) => non_neg_integer(), + required(:req_body_chunks) => non_neg_integer(), + required(:chunk_bytes) => non_neg_integer() + }, + %{ + required(:conn) => Conn.t(), + required(:listener) => listener(), + required(:transport) => transport(), + required(:telemetry_span_context) => telemetry_span_context(), + required(:target) => target(), + required(:algorithm) => Selector.algorithm(), + required(:status) => nil | 100..599 + } + ) + + @typedoc """ + The `[:wayfarer, :request, :resp_started]` event. + + This event indicates that the HTTP status and headers have been received from + the target and the response will now start being sent to the target. + """ @type request_resp_started :: event( - [:wayfarer, :request, :resp_started], %{ required(:monotonic_time) => monotonic_time(), required(:duration) => duration(), + required(:wallclock_time) => DateTime.t(), required(:status) => nil | 100..599, - required(:req_body_bytes) => non_neg_integer() + optional(:req_body_bytes) => non_neg_integer(), + optional(:req_body_chunks) => non_neg_integer() }, %{ required(:conn) => Conn.t(), @@ -175,15 +248,24 @@ defmodule Wayfarer.Telemetry do } ) + @typedoc """ + The `[:wayfarer, :request, :resp_body_chunk]` event. + + This event is emitted every time a chunk of response body is received from the + target for streaming to the client. Under the hood, these are emitted every + time `Mint.HTTP.stream/2` returns a data frame. + """ @type request_resp_body_chunk :: event( - [:wayfarer, :request, :resp_body_chunk], %{ required(:monotonic_time) => monotonic_time(), required(:duration) => duration(), - required(:req_body_bytes) => non_neg_integer(), + required(:wallclock_time) => DateTime.t(), + optional(:req_body_bytes) => non_neg_integer(), + optional(:req_body_chunks) => non_neg_integer(), required(:resp_body_bytes) => non_neg_integer(), - required(:chunk_size) => non_neg_integer() + required(:resp_body_chunks) => non_neg_integer(), + required(:chunk_bytes) => non_neg_integer() }, %{ required(:conn) => Conn.t(), @@ -196,12 +278,18 @@ defmodule Wayfarer.Telemetry do } ) + @typedoc """ + The `[:wayfarer, :request, :upgraded]` event. + + This event is emitted when a client connection is upgraded to a WebSocket + connection. + """ @type request_upgraded :: event( - [:wayfarer, :request, :upgraded], %{ required(:monotonic_time) => monotonic_time(), - required(:duration) => duration() + required(:duration) => duration(), + required(:wallclock_time) => DateTime.t() }, %{ required(:conn) => Conn.t(), @@ -210,17 +298,22 @@ defmodule Wayfarer.Telemetry do required(:telemetry_span_context) => telemetry_span_context(), required(:target) => target(), required(:algorithm) => Selector.algorithm(), - required(:status) => nil | 100..599, - required(:opcode) => :text | :binary | :ping | :pong | :close + required(:status) => nil | 100..599 } ) + @typedoc """ + The `[:wayfarer, :request, :client_frame]` event. + + This event is emitted any time a WebSocket frame is received from the client + for transmission to the target. + """ @type request_client_frame :: event( - [:wayfarer, :request, :client_frame], %{ required(:monotonic_time) => monotonic_time(), required(:duration) => duration(), + required(:wallclock_time) => DateTime.t(), required(:frame_size) => non_neg_integer(), required(:client_frame_bytes) => non_neg_integer(), required(:client_frame_count) => non_neg_integer(), @@ -234,16 +327,23 @@ defmodule Wayfarer.Telemetry do required(:telemetry_span_context) => telemetry_span_context(), required(:target) => target(), required(:algorithm) => Selector.algorithm(), - required(:status) => nil | 100..599 + required(:status) => nil | 100..599, + required(:opcode) => :text | :binary | :ping | :pong | :close } ) + @typedoc """ + The `[:wayfarer, :request, :server_frame]` event. + + This event is emitted any time a WebSocket frame is received from the target + for transmission to the client. + """ @type request_server_frame :: event( - [:wayfarer, :request, :server_frame], %{ required(:monotonic_time) => monotonic_time(), required(:duration) => duration(), + required(:wallclock_time) => DateTime.t(), required(:frame_size) => non_neg_integer(), required(:server_frame_bytes) => non_neg_integer(), required(:server_frame_count) => non_neg_integer(), @@ -261,19 +361,152 @@ defmodule Wayfarer.Telemetry do } ) - @type request :: + @typedoc """ + All the event types that make up the `[:wayfarer, :request, :*]` span. + """ + @type request_span :: request_start | request_routed | request_exception | request_stop | request_received_status - | request_req_body_sent + | request_req_body_chunk | request_resp_started | request_resp_body_chunk | request_upgraded | request_client_frame | request_server_frame + @typedoc """ + The `[:wayfarer, :health_check, :start]` event. + + This event signals the start of a health check span. + + You can use the `telemetry_span_context` metadata value to correlate + subsequent events within the same span. + """ + @type health_check_start :: + event( + %{ + required(:monotonic_time) => monotonic_time(), + required(:wallclock_time) => DateTime.t() + }, + %{ + required(:telemetry_span_context) => telemetry_span_context(), + required(:hostname) => String.t(), + required(:uri) => URI.t(), + required(:target) => target(), + required(:method) => String.t() + } + ) + + @typedoc """ + The `[:wayfarer, :health_check, :connect]` event. + + This event signals that the outgoing TCP connection has been made to the + target. + """ + @type health_check_connect :: + event( + %{ + required(:monotonic_time) => monotonic_time(), + required(:wallclock_time) => DateTime.t(), + required(:duration) => duration() + }, + %{ + required(:telemetry_span_context) => telemetry_span_context(), + required(:hostname) => String.t(), + required(:uri) => URI.t(), + required(:target) => target(), + required(:method) => String.t(), + required(:transport) => transport() + } + ) + @typedoc """ + The `[:wayfarer, :health_check, :request]` event. + + This event signals that the HTTP or WebSocket request has been sent. + """ + @type health_check_request :: + event( + %{ + required(:monotonic_time) => monotonic_time(), + required(:wallclock_time) => DateTime.t(), + required(:duration) => duration() + }, + %{ + required(:telemetry_span_context) => telemetry_span_context(), + required(:hostname) => String.t(), + required(:uri) => URI.t(), + required(:target) => target(), + required(:method) => String.t(), + required(:transport) => transport() + } + ) + + @typedoc """ + The `[:wayfarer, :health_check, :pass]` event. + + This event signals that the HTTP status code returned by the target matches + one of the configured success codes. + + It also signals the end of the span. + """ + @type health_check_pass :: + event( + %{ + required(:monotonic_time) => monotonic_time(), + required(:wallclock_time) => DateTime.t(), + required(:duration) => duration(), + required(:status) => 100..599 + }, + %{ + required(:telemetry_span_context) => telemetry_span_context(), + required(:hostname) => String.t(), + required(:uri) => URI.t(), + required(:target) => target(), + required(:method) => String.t(), + required(:transport) => transport() + } + ) + + @typedoc """ + The `[:wayfarer, :health_check, :fail]` event. + + This event signals that the HTTP status code returned by the target did not + match any of the configured success codes. + + It also signals the end of the span. + """ + @type health_check_fail :: + event( + %{ + required(:monotonic_time) => monotonic_time(), + required(:wallclock_time) => DateTime.t(), + required(:duration) => duration(), + required(:reason) => any + }, + %{ + required(:telemetry_span_context) => telemetry_span_context(), + required(:hostname) => String.t(), + required(:uri) => URI.t(), + required(:target) => target(), + required(:method) => String.t(), + required(:transport) => transport() + } + ) + + @typedoc """ + All the event types that make up the `[:wayfarer, :health_check, :*]` span. + """ + @type health_check_span :: + health_check_start | health_check_request | health_check_pass | health_check_fail + + @typedoc """ + All the spans that can be emitted by Wayfarer. + """ + @type spans :: request_span | health_check_span + @doc false @spec request_start(Conn.t()) :: Conn.t() def request_start(conn) do @@ -332,10 +565,10 @@ defmodule Wayfarer.Telemetry do end @doc false - @spec request_req_body_sent(Conn.t()) :: Conn.t() - def request_req_body_sent(conn) do + @spec request_req_body_chunk(Conn.t(), non_neg_integer()) :: Conn.t() + def request_req_body_chunk(conn, chunk_bytes) do conn - |> execute_request_span_event(:req_body_sent, %{}, %{}) + |> execute_request_span_event(:req_body_chunk, %{chunk_bytes: chunk_bytes}, %{}) end @doc false @@ -349,9 +582,9 @@ defmodule Wayfarer.Telemetry do @doc false @spec request_resp_body_chunk(Conn.t(), non_neg_integer()) :: Conn.t() - def request_resp_body_chunk(conn, chunk_size) do + def request_resp_body_chunk(conn, chunk_bytes) do conn - |> execute_request_span_event(:resp_body_chunk, %{chunk_size: chunk_size}, %{}) + |> execute_request_span_event(:resp_body_chunk, %{chunk_bytes: chunk_bytes}, %{}) end @doc false @@ -391,6 +624,36 @@ defmodule Wayfarer.Telemetry do end) end + @doc false + @spec health_check_start(Check.state()) :: Check.state() + def health_check_start(check) do + execute_check_span_event(check, :start, %{}, %{}) + end + + @doc false + @spec health_check_connect(Check.state(), atom) :: Check.state() + def health_check_connect(check, transport) do + execute_check_span_event(check, :connect, %{}, %{transport: transport}) + end + + @doc false + @spec health_check_request(Check.state()) :: Check.state() + def health_check_request(check) do + execute_check_span_event(check, :request, %{}, %{}) + end + + @doc false + @spec health_check_fail(Check.state(), any) :: Check.state() + def health_check_fail(check, reason) do + execute_check_span_event(check, :fail, %{}, %{reason: reason}) + end + + @doc false + @spec health_check_pass(Check.state(), 100..599) :: Check.state() + def health_check_pass(check, status) do + execute_check_span_event(check, :pass, %{}, %{status: status}) + end + defp update_metrics(conn, callback) do private = conn @@ -410,6 +673,7 @@ defmodule Wayfarer.Telemetry do defp execute_request_span_event(conn, event, measurements, metadata) do monotonic_time = System.monotonic_time() + now = DateTime.utc_now() private = conn @@ -428,11 +692,13 @@ defmodule Wayfarer.Telemetry do if Map.has_key?(span_info, :start_time) do %{ monotonic_time: monotonic_time, - duration: monotonic_time - span_info.start_time + duration: monotonic_time - span_info.start_time, + wallclock_time: now } else %{ - monotonic_time: monotonic_time + monotonic_time: monotonic_time, + wallclock_time: now } end |> Map.merge(metrics) @@ -457,4 +723,42 @@ defmodule Wayfarer.Telemetry do conn |> Conn.put_private(:wayfarer, private) end + + defp execute_check_span_event(check, event, measurements, metadata) do + monotonic_time = System.monotonic_time() + now = DateTime.utc_now() + span_info = Map.get(check, :span, %{}) + + metrics = Map.get(span_info, :metrics, %{}) + + measurements = + if Map.has_key?(span_info, :start_time) do + %{ + monotonic_time: monotonic_time, + duration: monotonic_time - span_info.start_time, + wallclock_time: now + } + else + %{ + monotonic_time: monotonic_time, + wallclock_time: now + } + end + |> Map.merge(metrics) + |> Map.merge(measurements) + + metadata = + span_info + |> Map.get(:metadata, %{}) + |> Map.merge(metadata) + + :telemetry.execute([:wayfarer, :health_check, event], measurements, metadata) + + span_info = + span_info + |> Map.put(:metadata, metadata) + |> Map.put_new(:start_time, monotonic_time) + + Map.put(check, :span, span_info) + end end