wip: add health check telemetry.
Some checks failed
continuous-integration/drone/push Build is failing
continuous-integration/drone/pr Build is failing

This commit is contained in:
James Harton 2024-08-20 16:41:27 +12:00
parent 176d95b0cc
commit eb4c209a6c
Signed by: james
GPG key ID: 90E82DAA13F624F4
3 changed files with 452 additions and 90 deletions

View file

@ -187,9 +187,6 @@ defmodule Wayfarer.Server.Proxy do
|> Enum.reduce(conn, fn {header_name, header_value}, conn -> |> Enum.reduce(conn, fn {header_name, header_value}, conn ->
conn conn
|> Conn.put_resp_header(header_name, header_value) |> Conn.put_resp_header(header_name, header_value)
|> Telemetry.increment_metrics(%{
header_bytes: byte_size(header_name) + byte_size(header_value)
})
end) end)
|> handle_responses(responses, mint, req) |> handle_responses(responses, mint, req)
end end
@ -228,7 +225,6 @@ defmodule Wayfarer.Server.Proxy do
if String.to_integer(length) == body_size do if String.to_integer(length) == body_size do
conn = conn =
conn conn
# |> Conn.delete_resp_header("content-length")
|> Conn.send_resp(conn.status, body) |> Conn.send_resp(conn.status, body)
|> Telemetry.request_resp_started() |> Telemetry.request_resp_started()
|> Telemetry.increment_metrics(%{resp_body_bytes: body_size}) |> Telemetry.increment_metrics(%{resp_body_bytes: body_size})
@ -239,7 +235,6 @@ defmodule Wayfarer.Server.Proxy do
else else
conn = conn =
conn conn
# |> Conn.delete_resp_header("content-length")
|> Telemetry.increment_metrics(%{resp_body_bytes: body_size}) |> Telemetry.increment_metrics(%{resp_body_bytes: body_size})
|> Telemetry.request_resp_body_chunk(body_size) |> Telemetry.request_resp_body_chunk(body_size)
|> Conn.send_chunked(conn.status) |> Conn.send_chunked(conn.status)
@ -278,8 +273,7 @@ defmodule Wayfarer.Server.Proxy do
with {:ok, mint} <- HTTP.stream_request_body(mint, req, :eof) do with {:ok, mint} <- HTTP.stream_request_body(mint, req, :eof) do
conn = conn =
conn conn
|> Telemetry.set_metrics(%{req_body_bytes: 0}) |> Telemetry.set_metrics(%{req_body_bytes: 0, req_body_chunks: 0})
|> Telemetry.request_req_body_sent()
{:ok, mint, conn} {:ok, mint, conn}
end end
@ -287,18 +281,23 @@ defmodule Wayfarer.Server.Proxy do
{:ok, chunk, conn} -> {:ok, chunk, conn} ->
with {:ok, mint} <- HTTP.stream_request_body(mint, req, chunk), with {:ok, mint} <- HTTP.stream_request_body(mint, req, chunk),
{:ok, mint} <- HTTP.stream_request_body(mint, req, :eof) do {:ok, mint} <- HTTP.stream_request_body(mint, req, :eof) do
chunk_size = byte_size(chunk)
conn = conn =
conn conn
|> Telemetry.increment_metrics(%{req_body_bytes: byte_size(chunk)}) |> Telemetry.increment_metrics(%{req_body_bytes: chunk_size, req_body_chunks: 1})
|> Telemetry.request_req_body_sent() |> Telemetry.request_req_body_chunk(chunk_size)
{:ok, mint, conn} {:ok, mint, conn}
end end
{:more, chunk, conn} -> {:more, chunk, conn} ->
with {:ok, mint} <- HTTP.stream_request_body(mint, req, chunk) do with {:ok, mint} <- HTTP.stream_request_body(mint, req, chunk) do
chunk_size = byte_size(chunk)
conn conn
|> Telemetry.increment_metrics(%{req_body_bytes: byte_size(chunk)}) |> Telemetry.increment_metrics(%{req_body_bytes: chunk_size, req_body_chunks: 1})
|> Telemetry.request_req_body_chunk(chunk_size)
|> stream_request_body(mint, req) |> stream_request_body(mint, req)
end end

View file

@ -5,22 +5,47 @@ defmodule Wayfarer.Target.Check do
use GenServer, restart: :transient use GenServer, restart: :transient
alias Mint.{HTTP, HTTP1, HTTP2, WebSocket} alias Mint.{HTTP, HTTP1, HTTP2, WebSocket}
alias Wayfarer.{Target, Target.TotalConnections} alias Wayfarer.{Target, Target.TotalConnections, Telemetry}
require Logger require Logger
@typedoc """
The state of a health check process.

`conn`, `req` and `span` are added to the state while the check runs (after
connecting, after sending the request and when the telemetry span is started
respectively), so they may be absent early in the process' life.

The state also carries additional configuration which this module reads
(`path`, `connect_timeout`, `response_timeout` and `success_codes`), which is
covered by the open `optional(atom)` entry.
"""
@type state :: %{
        optional(:conn) => struct(),
        optional(:req) => reference(),
        optional(:span) => map,
        required(:scheme) => :http | :https | :ws | :wss,
        required(:address) => :inet.ip_address(),
        required(:port) => :socket.port_number(),
        required(:uri) => URI.t(),
        required(:ref) => any,
        required(:method) => String.t(),
        required(:headers) => [{String.t(), String.t()}],
        required(:hostname) => String.t(),
        required(:transport) => :http1 | :http2 | :auto,
        optional(atom) => any
      }
@doc false @doc false
@impl true @impl true
def init(state), do: {:ok, state, {:continue, :start_check}} def init(state), do: {:ok, state, {:continue, :start_check}}
@doc false @doc false
@impl true @impl true
@spec handle_continue(:start_check, state) :: {:noreply, state, timeout} | {:stop, :normal, nil}
def handle_continue(:start_check, state) do def handle_continue(:start_check, state) do
with {:ok, conn} <- connect(state), state =
{:ok, conn, req} <- request(Map.put(state, :conn, conn)) do state
state = |> Map.put(:span, %{
state metadata: %{
|> Map.merge(%{conn: conn, req: req}) target: %{scheme: state.scheme, address: state.address, port: state.port},
method: state.method,
uri: state.uri,
hostname: state.hostname,
telemetry_span_context: make_ref()
}
})
|> Telemetry.health_check_start()
with {:ok, state} <- connect(state),
{:ok, state} <- request(state) do
{:noreply, state, state.response_timeout} {:noreply, state, state.response_timeout}
else else
{:error, reason} -> {:error, reason} ->
@ -33,7 +58,7 @@ defmodule Wayfarer.Target.Check do
@doc false @doc false
@impl true @impl true
def handle_info(:timeout, state), do: check_failed(state, "request timeout expired.") def handle_info(:timeout, state), do: check_failed(state, :timeout)
def handle_info(message, state) do def handle_info(message, state) do
with {:ok, conn, responses} <- WebSocket.stream(state.conn, message), with {:ok, conn, responses} <- WebSocket.stream(state.conn, message),
@ -44,13 +69,14 @@ defmodule Wayfarer.Target.Check do
{:ok, status} <- get_status_response(conn, responses) do {:ok, status} <- get_status_response(conn, responses) do
if Enum.any?(state.success_codes, &Enum.member?(&1, status)) do if Enum.any?(state.success_codes, &Enum.member?(&1, status)) do
Target.check_passed(state.ref) Target.check_passed(state.ref)
Telemetry.health_check_pass(state, status)
{:stop, :normal, nil} {:stop, :normal, nil}
else else
check_failed(state, "received #{status} status code") check_failed(state, "received #{status} status code")
end end
else else
{:continue, conn} -> {:continue, conn} ->
{:noreply, %{state | conn: conn}} {:noreply, Map.put(state, :conn, conn)}
:unknown -> :unknown ->
check_failed(state, "Received unknown message: `#{inspect(message)}`") check_failed(state, "Received unknown message: `#{inspect(message)}`")
@ -67,53 +93,86 @@ defmodule Wayfarer.Target.Check do
do: connect(%{state | scheme: :https}) do: connect(%{state | scheme: :https})
defp connect(state) when state.transport == :http1 do defp connect(state) when state.transport == :http1 do
HTTP1.connect(state.scheme, state.address, state.port, with {:ok, conn} <-
timeout: state.connect_timeout, HTTP1.connect(state.scheme, state.address, state.port,
hostname: state.hostname timeout: state.connect_timeout,
) hostname: state.hostname
) do
{:ok, Telemetry.health_check_connect(Map.put(state, :conn, conn), :http1)}
end
end end
defp connect(state) when state.transport == :http2 do defp connect(state) when state.transport == :http2 do
HTTP2.connect(state.scheme, state.address, state.port, with {:ok, conn} <-
timeout: state.connect_timeout, HTTP2.connect(state.scheme, state.address, state.port,
hostname: state.hostname timeout: state.connect_timeout,
) hostname: state.hostname
) do
{:ok, Telemetry.health_check_connect(Map.put(state, :conn, conn), :http2)}
end
end end
defp connect(state) do defp connect(state) do
HTTP.connect(state.scheme, state.address, state.port, with {:ok, conn} <-
timeout: state.connect_timeout, HTTP.connect(state.scheme, state.address, state.port,
hostname: state.hostname timeout: state.connect_timeout,
) hostname: state.hostname
) do
transport =
case conn do
%Mint.HTTP1{} -> :http1
%Mint.HTTP2{} -> :http2
end
{:ok, Telemetry.health_check_connect(Map.put(state, :conn, conn), transport)}
end
end end
defp request(state) when state.scheme in [:ws, :wss] do defp request(state) when state.scheme in [:ws, :wss] do
WebSocket.upgrade(state.scheme, state.conn, state.path, state.headers, []) with {:ok, conn, req} <-
WebSocket.upgrade(state.scheme, state.conn, state.path, state.headers, []) do
state = Map.merge(state, %{conn: conn, req: req})
{:ok, Telemetry.health_check_request(state)}
end
end end
defp request(state) do defp request(state) do
HTTP.request(state.conn, state.method, state.path, state.headers, nil) with {:ok, conn, req} <-
HTTP.request(state.conn, state.method, state.path, state.headers, nil) do
state = Map.merge(state, %{conn: conn, req: req})
{:ok, Telemetry.health_check_request(state)}
end
end end
defp check_failed(state, reason) when is_binary(reason) do defp check_failed(state, reason) when is_binary(reason) do
Target.check_failed(state.ref) Target.check_failed(state.ref)
Logger.warning("Health check failed for #{state.method} #{state.uri}: #{reason}.") Telemetry.health_check_fail(state, reason)
Logger.warning(fn -> "Health check failed for #{state.method} #{state.uri}: #{reason}." end)
{:stop, :normal, nil} {:stop, :normal, nil}
end end
defp check_failed(state, exception) when is_exception(exception) do defp check_failed(state, exception) when is_exception(exception) do
Target.check_failed(state.ref) Target.check_failed(state.ref)
Telemetry.health_check_fail(state, exception)
Logger.warning( Logger.warning(fn ->
"Health check failed for #{state.method} #{state.uri}: #{Exception.message(exception)}" "Health check failed for #{state.method} #{state.uri}: #{Exception.message(exception)}"
) end)
{:stop, :normal, nil} {:stop, :normal, nil}
end end
defp check_failed(state, reason) do defp check_failed(state, reason) do
Target.check_failed(state.ref) Target.check_failed(state.ref)
Logger.warning("Health check failed for #{state.method} #{state.uri}: `#{inspect(reason)}`") Telemetry.health_check_fail(state, reason)
Logger.warning(fn ->
"Health check failed for #{state.method} #{state.uri}: `#{inspect(reason)}`"
end)
{:stop, :normal, nil} {:stop, :normal, nil}
end end

View file

@ -5,7 +5,7 @@ defmodule Wayfarer.Telemetry do
""" """
alias Plug.Conn alias Plug.Conn
alias Wayfarer.{Router, Selector} alias Wayfarer.{Router, Target.Selector, Target.Check}
@typedoc """ @typedoc """
Information about the target a request has been routed to. Information about the target a request has been routed to.
@ -29,10 +29,20 @@ defmodule Wayfarer.Telemetry do
address: :inet.ip_address() address: :inet.ip_address()
} }
@typedoc "The time that the event was emitted, in `:native` time" @typedoc """
The time that the event was emitted, in `:native` time units.
This is sourced from `System.monotonic_time/0` which has some caveats but in
general is better for calculating durations as it should never go backwards.
"""
@type monotonic_time :: integer() @type monotonic_time :: integer()
@typedoc "The difference between the current `monotonic_time` and the start of the span" @typedoc """
The time passed since the beginning of the span, in `:native` time units.
The difference between the current `monotonic_time` and the first
`monotonic_time` at the start of the span.
"""
@type duration :: integer() @type duration :: integer()
@typedoc "The HTTP protocol version of the request" @typedoc "The HTTP protocol version of the request"
@ -41,13 +51,23 @@ defmodule Wayfarer.Telemetry do
@typedoc "A unique identifier for the span" @typedoc "A unique identifier for the span"
@type telemetry_span_context :: reference() @type telemetry_span_context :: reference()
@type event([atom], measurements :: map, metadata :: map) @typedoc "A convenience type for describing telemetry events"
@opaque event(measurements, metadata) :: {measurements, metadata}
@typedoc """
The `[:wayfarer, :request, :start]` event.
This event signals the start of a request span tracking a client request to
completion.
You can use the `telemetry_span_context` metadata value to correlate
subsequent events within the same span.
"""
@type request_start :: @type request_start ::
event( event(
[:wayfarer, :request, :start],
%{ %{
required(:monotonic_time) => monotonic_time() required(:monotonic_time) => monotonic_time(),
required(:wallclock_time) => DateTime.t()
}, },
%{ %{
required(:conn) => Conn.t(), required(:conn) => Conn.t(),
@ -57,12 +77,18 @@ defmodule Wayfarer.Telemetry do
} }
) )
@typedoc """
The `[:wayfarer, :request, :routed]` event.
This event signals that the routing process has completed and a target has
been chosen to serve the request.
"""
@type request_routed :: @type request_routed ::
event( event(
[:wayfarer, :request, :routed],
%{ %{
required(:monotonic_time) => monotonic_time(), required(:monotonic_time) => monotonic_time(),
required(:duration) => duration() required(:duration) => duration(),
required(:wallclock_time) => DateTime.t()
}, },
%{ %{
required(:conn) => Conn.t(), required(:conn) => Conn.t(),
@ -74,12 +100,18 @@ defmodule Wayfarer.Telemetry do
} }
) )
@typedoc """
The `[:wayfarer, :request, :exception]` event.
This event signals that something went wrong while processing the request. You
will likely still receive other events (eg `:stop`) for this span however.
"""
@type request_exception :: @type request_exception ::
event( event(
[:wayfarer, :request, :exception],
%{ %{
required(:monotonic_time) => monotonic_time(), required(:monotonic_time) => monotonic_time(),
required(:duration) => duration() required(:duration) => duration(),
required(:wallclock_time) => DateTime.t()
}, },
%{ %{
required(:conn) => Conn.t(), required(:conn) => Conn.t(),
@ -94,13 +126,27 @@ defmodule Wayfarer.Telemetry do
} }
) )
@typedoc """
The `[:wayfarer, :request, :stop]` event.
This event signals that the request has completed.
The measurements will contain any incrementing counters accumulated during the
course of the request.
"""
@type request_stop :: @type request_stop ::
event( event(
[:wayfarer, :request, :stop],
%{ %{
required(:monotonic_time) => monotonic_time(), required(:monotonic_time) => monotonic_time(),
required(:duration) => duration(), required(:duration) => duration(),
required(:status) => nil | 100..599 required(:wallclock_time) => DateTime.t(),
required(:status) => nil | 100..599,
optional(:req_body_bytes) => non_neg_integer(),
optional(:resp_body_bytes) => non_neg_integer(),
optional(:client_frame_bytes) => non_neg_integer(),
optional(:client_frame_count) => non_neg_integer(),
optional(:server_frame_bytes) => non_neg_integer(),
optional(:server_frame_count) => non_neg_integer()
}, },
%{ %{
required(:conn) => Conn.t(), required(:conn) => Conn.t(),
@ -116,33 +162,20 @@ defmodule Wayfarer.Telemetry do
} }
) )
@typedoc """
The `[:wayfarer, :request, :received_status]` event.
This event signals that an HTTP status code has been received from the
upstream target.
"""
@type request_received_status :: @type request_received_status ::
event( event(
[:wayfarer, :request, :received_status],
%{
required(:monotonic_time) => monotonic_time(),
required(:duration) => duration(),
required(:status) => nil | 100..599
},
%{
required(:conn) => Conn.t(),
required(:listener) => listener(),
required(:transport) => transport(),
required(:telemetry_span_context) => telemetry_span_context(),
required(:target) => target(),
required(:algorithm) => Selector.algorithm(),
required(:status) => nil | 100..599
}
)
@type request_req_body_sent ::
event(
[:wayfarer, :request, :req_body_sent],
%{ %{
required(:monotonic_time) => monotonic_time(), required(:monotonic_time) => monotonic_time(),
required(:duration) => duration(), required(:duration) => duration(),
required(:wallclock_time) => DateTime.t(),
required(:status) => nil | 100..599, required(:status) => nil | 100..599,
required(:req_body_bytes) => non_neg_integer() optional(:req_body_bytes) => non_neg_integer()
}, },
%{ %{
required(:conn) => Conn.t(), required(:conn) => Conn.t(),
@ -155,14 +188,54 @@ defmodule Wayfarer.Telemetry do
} }
) )
@typedoc """
The `[:wayfarer, :request, :req_body_chunk]` event.

This event is emitted while streaming the request body from the client to the
target. Under the hood `Plug.Conn.read_body/2` is being called with the
default options, meaning that each chunk is likely to be up to 8MB in size.

The `chunk_bytes` measurement is the size of the chunk which triggered this
event, whereas `req_body_bytes` and `req_body_chunks` are the running totals
accumulated for the request so far.

If there is no request body then this event will not be emitted and the
`req_body_bytes` and `req_body_chunks` counters will both be set to zero for
this request.
"""
@type request_req_body_chunk ::
event(
%{
required(:monotonic_time) => monotonic_time(),
required(:duration) => duration(),
required(:wallclock_time) => DateTime.t(),
required(:status) => nil | 100..599,
required(:req_body_bytes) => non_neg_integer(),
required(:req_body_chunks) => non_neg_integer(),
required(:chunk_bytes) => non_neg_integer()
},
%{
required(:conn) => Conn.t(),
required(:listener) => listener(),
required(:transport) => transport(),
required(:telemetry_span_context) => telemetry_span_context(),
required(:target) => target(),
required(:algorithm) => Selector.algorithm(),
required(:status) => nil | 100..599
}
)
@typedoc """
The `[:wayfarer, :request, :resp_started]` event.
This event indicates that the HTTP status and headers have been received from
the target and the response will now start being sent to the client.
"""
@type request_resp_started :: @type request_resp_started ::
event( event(
[:wayfarer, :request, :resp_started],
%{ %{
required(:monotonic_time) => monotonic_time(), required(:monotonic_time) => monotonic_time(),
required(:duration) => duration(), required(:duration) => duration(),
required(:wallclock_time) => DateTime.t(),
required(:status) => nil | 100..599, required(:status) => nil | 100..599,
required(:req_body_bytes) => non_neg_integer() optional(:req_body_bytes) => non_neg_integer(),
optional(:req_body_chunks) => non_neg_integer()
}, },
%{ %{
required(:conn) => Conn.t(), required(:conn) => Conn.t(),
@ -175,15 +248,24 @@ defmodule Wayfarer.Telemetry do
} }
) )
@typedoc """
The `[:wayfarer, :request, :resp_body_chunk]` event.
This event is emitted every time a chunk of response body is received from the
target for streaming to the client. Under the hood, these are emitted every
time `Mint.HTTP.stream/2` returns a data frame.
"""
@type request_resp_body_chunk :: @type request_resp_body_chunk ::
event( event(
[:wayfarer, :request, :resp_body_chunk],
%{ %{
required(:monotonic_time) => monotonic_time(), required(:monotonic_time) => monotonic_time(),
required(:duration) => duration(), required(:duration) => duration(),
required(:req_body_bytes) => non_neg_integer(), required(:wallclock_time) => DateTime.t(),
optional(:req_body_bytes) => non_neg_integer(),
optional(:req_body_chunks) => non_neg_integer(),
required(:resp_body_bytes) => non_neg_integer(), required(:resp_body_bytes) => non_neg_integer(),
required(:chunk_size) => non_neg_integer() required(:resp_body_chunks) => non_neg_integer(),
required(:chunk_bytes) => non_neg_integer()
}, },
%{ %{
required(:conn) => Conn.t(), required(:conn) => Conn.t(),
@ -196,12 +278,18 @@ defmodule Wayfarer.Telemetry do
} }
) )
@typedoc """
The `[:wayfarer, :request, :upgraded]` event.
This event is emitted when a client connection is upgraded to a WebSocket
connection.
"""
@type request_upgraded :: @type request_upgraded ::
event( event(
[:wayfarer, :request, :upgraded],
%{ %{
required(:monotonic_time) => monotonic_time(), required(:monotonic_time) => monotonic_time(),
required(:duration) => duration() required(:duration) => duration(),
required(:wallclock_time) => DateTime.t()
}, },
%{ %{
required(:conn) => Conn.t(), required(:conn) => Conn.t(),
@ -210,17 +298,22 @@ defmodule Wayfarer.Telemetry do
required(:telemetry_span_context) => telemetry_span_context(), required(:telemetry_span_context) => telemetry_span_context(),
required(:target) => target(), required(:target) => target(),
required(:algorithm) => Selector.algorithm(), required(:algorithm) => Selector.algorithm(),
required(:status) => nil | 100..599, required(:status) => nil | 100..599
required(:opcode) => :text | :binary | :ping | :pong | :close
} }
) )
@typedoc """
The `[:wayfarer, :request, :client_frame]` event.
This event is emitted any time a WebSocket frame is received from the client
for transmission to the target.
"""
@type request_client_frame :: @type request_client_frame ::
event( event(
[:wayfarer, :request, :client_frame],
%{ %{
required(:monotonic_time) => monotonic_time(), required(:monotonic_time) => monotonic_time(),
required(:duration) => duration(), required(:duration) => duration(),
required(:wallclock_time) => DateTime.t(),
required(:frame_size) => non_neg_integer(), required(:frame_size) => non_neg_integer(),
required(:client_frame_bytes) => non_neg_integer(), required(:client_frame_bytes) => non_neg_integer(),
required(:client_frame_count) => non_neg_integer(), required(:client_frame_count) => non_neg_integer(),
@ -234,16 +327,23 @@ defmodule Wayfarer.Telemetry do
required(:telemetry_span_context) => telemetry_span_context(), required(:telemetry_span_context) => telemetry_span_context(),
required(:target) => target(), required(:target) => target(),
required(:algorithm) => Selector.algorithm(), required(:algorithm) => Selector.algorithm(),
required(:status) => nil | 100..599 required(:status) => nil | 100..599,
required(:opcode) => :text | :binary | :ping | :pong | :close
} }
) )
@typedoc """
The `[:wayfarer, :request, :server_frame]` event.
This event is emitted any time a WebSocket frame is received from the target
for transmission to the client.
"""
@type request_server_frame :: @type request_server_frame ::
event( event(
[:wayfarer, :request, :server_frame],
%{ %{
required(:monotonic_time) => monotonic_time(), required(:monotonic_time) => monotonic_time(),
required(:duration) => duration(), required(:duration) => duration(),
required(:wallclock_time) => DateTime.t(),
required(:frame_size) => non_neg_integer(), required(:frame_size) => non_neg_integer(),
required(:server_frame_bytes) => non_neg_integer(), required(:server_frame_bytes) => non_neg_integer(),
required(:server_frame_count) => non_neg_integer(), required(:server_frame_count) => non_neg_integer(),
@ -261,19 +361,152 @@ defmodule Wayfarer.Telemetry do
} }
) )
@type request :: @typedoc """
All the event types that make up the `[:wayfarer, :request, :*]` span.
"""
@type request_span ::
request_start request_start
| request_routed | request_routed
| request_exception | request_exception
| request_stop | request_stop
| request_received_status | request_received_status
| request_req_body_sent | request_req_body_chunk
| request_resp_started | request_resp_started
| request_resp_body_chunk | request_resp_body_chunk
| request_upgraded | request_upgraded
| request_client_frame | request_client_frame
| request_server_frame | request_server_frame
@typedoc """
The `[:wayfarer, :health_check, :start]` event.

This event signals the start of a health check span.

As this is the first event of the span there is no `duration` measurement;
the span's start time is recorded when this event is emitted.

You can use the `telemetry_span_context` metadata value to correlate
subsequent events within the same span.
"""
@type health_check_start ::
event(
%{
required(:monotonic_time) => monotonic_time(),
required(:wallclock_time) => DateTime.t()
},
%{
required(:telemetry_span_context) => telemetry_span_context(),
required(:hostname) => String.t(),
required(:uri) => URI.t(),
required(:target) => target(),
required(:method) => String.t()
}
)
@typedoc """
The `[:wayfarer, :health_check, :connect]` event.

This event signals that the outgoing TCP connection has been made to the
target.

The `transport` metadata value records the protocol used by the connection.
When the check does not force a specific transport it is derived from the
kind of connection that Mint returned. Once set it is carried in the metadata
of the subsequent events in the span.
"""
@type health_check_connect ::
event(
%{
required(:monotonic_time) => monotonic_time(),
required(:wallclock_time) => DateTime.t(),
required(:duration) => duration()
},
%{
required(:telemetry_span_context) => telemetry_span_context(),
required(:hostname) => String.t(),
required(:uri) => URI.t(),
required(:target) => target(),
required(:method) => String.t(),
required(:transport) => transport()
}
)
@typedoc """
The `[:wayfarer, :health_check, :request]` event.

This event signals that the HTTP or WebSocket request has been sent.

It is emitted once the HTTP request (or the WebSocket upgrade request, for
`ws` and `wss` targets) has been successfully handed to Mint.
"""
@type health_check_request ::
event(
%{
required(:monotonic_time) => monotonic_time(),
required(:wallclock_time) => DateTime.t(),
required(:duration) => duration()
},
%{
required(:telemetry_span_context) => telemetry_span_context(),
required(:hostname) => String.t(),
required(:uri) => URI.t(),
required(:target) => target(),
required(:method) => String.t(),
required(:transport) => transport()
}
)
@typedoc """
The `[:wayfarer, :health_check, :pass]` event.

This event signals that the HTTP status code returned by the target matches
one of the configured success codes. The status code which was received is
available as the `status` metadata value.

It also signals the end of the span.
"""
@type health_check_pass ::
        event(
          %{
            required(:monotonic_time) => monotonic_time(),
            required(:wallclock_time) => DateTime.t(),
            required(:duration) => duration()
          },
          %{
            required(:telemetry_span_context) => telemetry_span_context(),
            required(:hostname) => String.t(),
            required(:uri) => URI.t(),
            required(:target) => target(),
            required(:method) => String.t(),
            required(:transport) => transport(),
            required(:status) => 100..599
          }
        )
@typedoc """
The `[:wayfarer, :health_check, :fail]` event.

This event signals that the health check failed. This happens when the HTTP
status code returned by the target does not match any of the configured
success codes, but also when the check cannot be completed at all (for
example when the response timeout expires or an unexpected message is
received).

The cause of the failure is available as the `reason` metadata value. The
`transport` metadata value is only present when the failure happened after
the `:connect` event was emitted.

It also signals the end of the span.
"""
@type health_check_fail ::
        event(
          %{
            required(:monotonic_time) => monotonic_time(),
            required(:wallclock_time) => DateTime.t(),
            required(:duration) => duration()
          },
          %{
            required(:telemetry_span_context) => telemetry_span_context(),
            required(:hostname) => String.t(),
            required(:uri) => URI.t(),
            required(:target) => target(),
            required(:method) => String.t(),
            optional(:transport) => transport(),
            required(:reason) => any
          }
        )
@typedoc """
All the event types that make up the `[:wayfarer, :health_check, :*]` span.
"""
@type health_check_span ::
        health_check_start
        | health_check_connect
        | health_check_request
        | health_check_pass
        | health_check_fail
@typedoc """
All the spans that can be emitted by Wayfarer.

This is the union of every event type belonging to the request span and the
health check span.
"""
@type spans :: request_span | health_check_span
@doc false @doc false
@spec request_start(Conn.t()) :: Conn.t() @spec request_start(Conn.t()) :: Conn.t()
def request_start(conn) do def request_start(conn) do
@ -332,10 +565,10 @@ defmodule Wayfarer.Telemetry do
end end
@doc false @doc false
@spec request_req_body_sent(Conn.t()) :: Conn.t() @spec request_req_body_chunk(Conn.t(), non_neg_integer()) :: Conn.t()
def request_req_body_sent(conn) do def request_req_body_chunk(conn, chunk_bytes) do
conn conn
|> execute_request_span_event(:req_body_sent, %{}, %{}) |> execute_request_span_event(:req_body_chunk, %{chunk_bytes: chunk_bytes}, %{})
end end
@doc false @doc false
@ -349,9 +582,9 @@ defmodule Wayfarer.Telemetry do
@doc false @doc false
@spec request_resp_body_chunk(Conn.t(), non_neg_integer()) :: Conn.t() @spec request_resp_body_chunk(Conn.t(), non_neg_integer()) :: Conn.t()
def request_resp_body_chunk(conn, chunk_size) do def request_resp_body_chunk(conn, chunk_bytes) do
conn conn
|> execute_request_span_event(:resp_body_chunk, %{chunk_size: chunk_size}, %{}) |> execute_request_span_event(:resp_body_chunk, %{chunk_bytes: chunk_bytes}, %{})
end end
@doc false @doc false
@ -391,6 +624,36 @@ defmodule Wayfarer.Telemetry do
end) end)
end end
# Emits the `[:wayfarer, :health_check, :start]` event for the given check
# state and returns the state with its span information updated.
@doc false
@spec health_check_start(Check.state()) :: Check.state()
def health_check_start(check), do: execute_check_span_event(check, :start, %{}, %{})
# Emits the `[:wayfarer, :health_check, :connect]` event, adding the transport
# in use to the event metadata, and returns the updated check state.
@doc false
@spec health_check_connect(Check.state(), atom) :: Check.state()
def health_check_connect(check, transport),
  do: execute_check_span_event(check, :connect, %{}, %{transport: transport})
# Emits the `[:wayfarer, :health_check, :request]` event for the given check
# state and returns the updated state.
@doc false
@spec health_check_request(Check.state()) :: Check.state()
def health_check_request(check), do: execute_check_span_event(check, :request, %{}, %{})
# Emits the `[:wayfarer, :health_check, :fail]` event, adding the failure
# reason to the event metadata, and returns the updated check state.
@doc false
@spec health_check_fail(Check.state(), any) :: Check.state()
def health_check_fail(check, reason),
  do: execute_check_span_event(check, :fail, %{}, %{reason: reason})
# Emits the `[:wayfarer, :health_check, :pass]` event, adding the received
# status code to the event metadata, and returns the updated check state.
@doc false
@spec health_check_pass(Check.state(), 100..599) :: Check.state()
def health_check_pass(check, status),
  do: execute_check_span_event(check, :pass, %{}, %{status: status})
defp update_metrics(conn, callback) do defp update_metrics(conn, callback) do
private = private =
conn conn
@ -410,6 +673,7 @@ defmodule Wayfarer.Telemetry do
defp execute_request_span_event(conn, event, measurements, metadata) do defp execute_request_span_event(conn, event, measurements, metadata) do
monotonic_time = System.monotonic_time() monotonic_time = System.monotonic_time()
now = DateTime.utc_now()
private = private =
conn conn
@ -428,11 +692,13 @@ defmodule Wayfarer.Telemetry do
if Map.has_key?(span_info, :start_time) do if Map.has_key?(span_info, :start_time) do
%{ %{
monotonic_time: monotonic_time, monotonic_time: monotonic_time,
duration: monotonic_time - span_info.start_time duration: monotonic_time - span_info.start_time,
wallclock_time: now
} }
else else
%{ %{
monotonic_time: monotonic_time monotonic_time: monotonic_time,
wallclock_time: now
} }
end end
|> Map.merge(metrics) |> Map.merge(metrics)
@ -457,4 +723,42 @@ defmodule Wayfarer.Telemetry do
conn conn
|> Conn.put_private(:wayfarer, private) |> Conn.put_private(:wayfarer, private)
end end
# Emits a `[:wayfarer, :health_check, event]` telemetry event for a health
# check and returns the check state with its `:span` entry updated.
#
# Every event carries `monotonic_time` and `wallclock_time` measurements, and
# `duration` is added once the span has a recorded start time. Any metrics
# stored in the span and the caller's measurements are merged over the top, in
# that order. The caller's metadata is merged into the span's metadata and the
# merged result is stored back in the span.
defp execute_check_span_event(check, event, measurements, metadata) do
  monotonic_time = System.monotonic_time()
  wallclock_time = DateTime.utc_now()

  span = Map.get(check, :span, %{})
  span_metrics = Map.get(span, :metrics, %{})

  timing =
    case span do
      %{start_time: started_at} ->
        %{
          monotonic_time: monotonic_time,
          duration: monotonic_time - started_at,
          wallclock_time: wallclock_time
        }

      _ ->
        %{monotonic_time: monotonic_time, wallclock_time: wallclock_time}
    end

  event_measurements =
    timing
    |> Map.merge(span_metrics)
    |> Map.merge(measurements)

  event_metadata = Map.merge(Map.get(span, :metadata, %{}), metadata)

  :telemetry.execute([:wayfarer, :health_check, event], event_measurements, event_metadata)

  # The first event emitted for a span establishes its start time.
  updated_span =
    span
    |> Map.put(:metadata, event_metadata)
    |> Map.put_new(:start_time, monotonic_time)

  Map.put(check, :span, updated_span)
end
end end