wip: add health check telemetry.
This commit is contained in:
parent
176d95b0cc
commit
87172fce52
3 changed files with 452 additions and 90 deletions
|
@ -187,9 +187,6 @@ defmodule Wayfarer.Server.Proxy do
|
|||
|> Enum.reduce(conn, fn {header_name, header_value}, conn ->
|
||||
conn
|
||||
|> Conn.put_resp_header(header_name, header_value)
|
||||
|> Telemetry.increment_metrics(%{
|
||||
header_bytes: byte_size(header_name) + byte_size(header_value)
|
||||
})
|
||||
end)
|
||||
|> handle_responses(responses, mint, req)
|
||||
end
|
||||
|
@ -228,7 +225,6 @@ defmodule Wayfarer.Server.Proxy do
|
|||
if String.to_integer(length) == body_size do
|
||||
conn =
|
||||
conn
|
||||
# |> Conn.delete_resp_header("content-length")
|
||||
|> Conn.send_resp(conn.status, body)
|
||||
|> Telemetry.request_resp_started()
|
||||
|> Telemetry.increment_metrics(%{resp_body_bytes: body_size})
|
||||
|
@ -239,7 +235,6 @@ defmodule Wayfarer.Server.Proxy do
|
|||
else
|
||||
conn =
|
||||
conn
|
||||
# |> Conn.delete_resp_header("content-length")
|
||||
|> Telemetry.increment_metrics(%{resp_body_bytes: body_size})
|
||||
|> Telemetry.request_resp_body_chunk(body_size)
|
||||
|> Conn.send_chunked(conn.status)
|
||||
|
@ -278,8 +273,7 @@ defmodule Wayfarer.Server.Proxy do
|
|||
with {:ok, mint} <- HTTP.stream_request_body(mint, req, :eof) do
|
||||
conn =
|
||||
conn
|
||||
|> Telemetry.set_metrics(%{req_body_bytes: 0})
|
||||
|> Telemetry.request_req_body_sent()
|
||||
|> Telemetry.set_metrics(%{req_body_bytes: 0, req_body_chunks: 0})
|
||||
|
||||
{:ok, mint, conn}
|
||||
end
|
||||
|
@ -287,18 +281,23 @@ defmodule Wayfarer.Server.Proxy do
|
|||
{:ok, chunk, conn} ->
|
||||
with {:ok, mint} <- HTTP.stream_request_body(mint, req, chunk),
|
||||
{:ok, mint} <- HTTP.stream_request_body(mint, req, :eof) do
|
||||
chunk_size = byte_size(chunk)
|
||||
|
||||
conn =
|
||||
conn
|
||||
|> Telemetry.increment_metrics(%{req_body_bytes: byte_size(chunk)})
|
||||
|> Telemetry.request_req_body_sent()
|
||||
|> Telemetry.increment_metrics(%{req_body_bytes: chunk_size, req_body_chunks: 1})
|
||||
|> Telemetry.request_req_body_chunk(chunk_size)
|
||||
|
||||
{:ok, mint, conn}
|
||||
end
|
||||
|
||||
{:more, chunk, conn} ->
|
||||
with {:ok, mint} <- HTTP.stream_request_body(mint, req, chunk) do
|
||||
chunk_size = byte_size(chunk)
|
||||
|
||||
conn
|
||||
|> Telemetry.increment_metrics(%{req_body_bytes: byte_size(chunk)})
|
||||
|> Telemetry.increment_metrics(%{req_body_bytes: chunk_size, req_body_chunks: 1})
|
||||
|> Telemetry.request_req_body_chunk(chunk_size)
|
||||
|> stream_request_body(mint, req)
|
||||
end
|
||||
|
||||
|
|
|
@ -5,22 +5,47 @@ defmodule Wayfarer.Target.Check do
|
|||
|
||||
use GenServer, restart: :transient
|
||||
alias Mint.{HTTP, HTTP1, HTTP2, WebSocket}
|
||||
alias Wayfarer.{Target, Target.TotalConnections}
|
||||
alias Wayfarer.{Target, Target.TotalConnections, Telemetry}
|
||||
require Logger
|
||||
|
||||
@typedoc """
The state of a single health check process.

The `:conn`, `:req` and `:span` keys are not part of the initial state; they
are added as the check progresses (after connecting, after sending the
request and when the first telemetry event is emitted, respectively).
"""
@type state :: %{
        # Added once the connection to the target has been established.
        optional(:conn) => struct(),
        # Added once the HTTP request / WebSocket upgrade has been sent.
        optional(:req) => reference(),
        # Telemetry span bookkeeping maintained by `Wayfarer.Telemetry`.
        optional(:span) => map,
        required(:scheme) => :http | :https | :ws | :wss,
        required(:address) => :inet.ip_address(),
        required(:port) => :socket.port_number(),
        required(:uri) => URI.t(),
        required(:ref) => any,
        required(:method) => String.t(),
        required(:path) => String.t(),
        required(:headers) => [{String.t(), String.t()}],
        required(:hostname) => String.t(),
        required(:transport) => :http1 | :http2 | :auto,
        # Each entry is tested with `Enum.member?/2` against the received status.
        required(:success_codes) => [Enumerable.t()],
        required(:connect_timeout) => timeout(),
        required(:response_timeout) => timeout()
      }
|
||||
|
||||
@doc false
|
||||
@impl true
|
||||
def init(state), do: {:ok, state, {:continue, :start_check}}
|
||||
|
||||
@doc false
|
||||
@impl true
|
||||
@spec handle_continue(:start_check, state) :: {:noreply, state, timeout} | {:stop, :normal, nil}
|
||||
def handle_continue(:start_check, state) do
|
||||
with {:ok, conn} <- connect(state),
|
||||
{:ok, conn, req} <- request(Map.put(state, :conn, conn)) do
|
||||
state =
|
||||
state
|
||||
|> Map.merge(%{conn: conn, req: req})
|
||||
|> Map.put(:span, %{
|
||||
metadata: %{
|
||||
target: %{scheme: state.scheme, address: state.address, port: state.port},
|
||||
method: state.method,
|
||||
uri: state.uri,
|
||||
hostname: state.hostname,
|
||||
telemetry_span_context: make_ref()
|
||||
}
|
||||
})
|
||||
|> Telemetry.health_check_start()
|
||||
|
||||
with {:ok, state} <- connect(state),
|
||||
{:ok, state} <- request(state) do
|
||||
{:noreply, state, state.response_timeout}
|
||||
else
|
||||
{:error, reason} ->
|
||||
|
@ -33,7 +58,7 @@ defmodule Wayfarer.Target.Check do
|
|||
|
||||
@doc false
|
||||
@impl true
|
||||
def handle_info(:timeout, state), do: check_failed(state, "request timeout expired.")
|
||||
def handle_info(:timeout, state), do: check_failed(state, :timeout)
|
||||
|
||||
def handle_info(message, state) do
|
||||
with {:ok, conn, responses} <- WebSocket.stream(state.conn, message),
|
||||
|
@ -44,13 +69,14 @@ defmodule Wayfarer.Target.Check do
|
|||
{:ok, status} <- get_status_response(conn, responses) do
|
||||
if Enum.any?(state.success_codes, &Enum.member?(&1, status)) do
|
||||
Target.check_passed(state.ref)
|
||||
Telemetry.health_check_pass(state, status)
|
||||
{:stop, :normal, nil}
|
||||
else
|
||||
check_failed(state, "received #{status} status code")
|
||||
end
|
||||
else
|
||||
{:continue, conn} ->
|
||||
{:noreply, %{state | conn: conn}}
|
||||
{:noreply, Map.put(state, :conn, conn)}
|
||||
|
||||
:unknown ->
|
||||
check_failed(state, "Received unknown message: `#{inspect(message)}`")
|
||||
|
@ -67,53 +93,86 @@ defmodule Wayfarer.Target.Check do
|
|||
do: connect(%{state | scheme: :https})
|
||||
|
||||
defp connect(state) when state.transport == :http1 do
|
||||
with {:ok, conn} <-
|
||||
HTTP1.connect(state.scheme, state.address, state.port,
|
||||
timeout: state.connect_timeout,
|
||||
hostname: state.hostname
|
||||
)
|
||||
) do
|
||||
{:ok, Telemetry.health_check_connect(Map.put(state, :conn, conn), :http1)}
|
||||
end
|
||||
end
|
||||
|
||||
defp connect(state) when state.transport == :http2 do
|
||||
with {:ok, conn} <-
|
||||
HTTP2.connect(state.scheme, state.address, state.port,
|
||||
timeout: state.connect_timeout,
|
||||
hostname: state.hostname
|
||||
)
|
||||
) do
|
||||
{:ok, Telemetry.health_check_connect(Map.put(state, :conn, conn), :http2)}
|
||||
end
|
||||
end
|
||||
|
||||
defp connect(state) do
|
||||
with {:ok, conn} <-
|
||||
HTTP.connect(state.scheme, state.address, state.port,
|
||||
timeout: state.connect_timeout,
|
||||
hostname: state.hostname
|
||||
)
|
||||
) do
|
||||
transport =
|
||||
case conn do
|
||||
%Mint.HTTP1{} -> :http1
|
||||
%Mint.HTTP2{} -> :http2
|
||||
end
|
||||
|
||||
{:ok, Telemetry.health_check_connect(Map.put(state, :conn, conn), transport)}
|
||||
end
|
||||
end
|
||||
|
||||
defp request(state) when state.scheme in [:ws, :wss] do
|
||||
WebSocket.upgrade(state.scheme, state.conn, state.path, state.headers, [])
|
||||
with {:ok, conn, req} <-
|
||||
WebSocket.upgrade(state.scheme, state.conn, state.path, state.headers, []) do
|
||||
state = Map.merge(state, %{conn: conn, req: req})
|
||||
|
||||
{:ok, Telemetry.health_check_request(state)}
|
||||
end
|
||||
end
|
||||
|
||||
defp request(state) do
|
||||
HTTP.request(state.conn, state.method, state.path, state.headers, nil)
|
||||
with {:ok, conn, req} <-
|
||||
HTTP.request(state.conn, state.method, state.path, state.headers, nil) do
|
||||
state = Map.merge(state, %{conn: conn, req: req})
|
||||
|
||||
{:ok, Telemetry.health_check_request(state)}
|
||||
end
|
||||
end
|
||||
|
||||
defp check_failed(state, reason) when is_binary(reason) do
|
||||
Target.check_failed(state.ref)
|
||||
Logger.warning("Health check failed for #{state.method} #{state.uri}: #{reason}.")
|
||||
Telemetry.health_check_fail(state, reason)
|
||||
|
||||
Logger.warning(fn -> "Health check failed for #{state.method} #{state.uri}: #{reason}." end)
|
||||
{:stop, :normal, nil}
|
||||
end
|
||||
|
||||
defp check_failed(state, exception) when is_exception(exception) do
|
||||
Target.check_failed(state.ref)
|
||||
Telemetry.health_check_fail(state, exception)
|
||||
|
||||
Logger.warning(
|
||||
Logger.warning(fn ->
|
||||
"Health check failed for #{state.method} #{state.uri}: #{Exception.message(exception)}"
|
||||
)
|
||||
end)
|
||||
|
||||
{:stop, :normal, nil}
|
||||
end
|
||||
|
||||
defp check_failed(state, reason) do
|
||||
Target.check_failed(state.ref)
|
||||
Logger.warning("Health check failed for #{state.method} #{state.uri}: `#{inspect(reason)}`")
|
||||
Telemetry.health_check_fail(state, reason)
|
||||
|
||||
Logger.warning(fn ->
|
||||
"Health check failed for #{state.method} #{state.uri}: `#{inspect(reason)}`"
|
||||
end)
|
||||
|
||||
{:stop, :normal, nil}
|
||||
end
|
||||
|
||||
|
|
|
@ -5,7 +5,7 @@ defmodule Wayfarer.Telemetry do
|
|||
"""
|
||||
|
||||
alias Plug.Conn
|
||||
alias Wayfarer.{Router, Selector}
|
||||
alias Wayfarer.{Router, Target.Check, Target.Selector}
|
||||
|
||||
@typedoc """
|
||||
Information about the target a request has been routed to.
|
||||
|
@ -29,10 +29,20 @@ defmodule Wayfarer.Telemetry do
|
|||
address: :inet.ip_address()
|
||||
}
|
||||
|
||||
@typedoc "The time that the event was emitted, in `:native` time"
|
||||
@typedoc """
|
||||
The time that the event was emitted, in `:native` time units.
|
||||
|
||||
This is sourced from `System.monotonic_time/0` which has some caveats but in
|
||||
general is better for calculating durations as it should never go backwards.
|
||||
"""
|
||||
@type monotonic_time :: integer()
|
||||
|
||||
@typedoc "The difference between the current `monotonic_time` and the start of the span"
|
||||
@typedoc """
|
||||
The time passed since the beginning of the span, in `:native` time units.
|
||||
|
||||
The difference between the current `monotonic_time` and the first
|
||||
`monotonic_time` at the start of the span.
|
||||
"""
|
||||
@type duration :: integer()
|
||||
|
||||
@typedoc "The HTTP protocol version of the request"
|
||||
|
@ -41,13 +51,23 @@ defmodule Wayfarer.Telemetry do
|
|||
@typedoc "A unique identifier for the span"
|
||||
@type telemetry_span_context :: reference()
|
||||
|
||||
@type event([atom], measurements :: map, metadata :: map)
|
||||
@typedoc "A convenience type for describing telemetry events"
|
||||
@opaque event(measurements, metadata) :: {measurements, metadata}
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :request, :start]` event.
|
||||
|
||||
This event signals the start of a request span tracking a client request to
|
||||
completion.
|
||||
|
||||
You can use the `telemetry_span_context` metadata value to correlate
|
||||
subsequent events within the same span.
|
||||
"""
|
||||
@type request_start ::
|
||||
event(
|
||||
[:wayfarer, :request, :start],
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time()
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:wallclock_time) => DateTime.t()
|
||||
},
|
||||
%{
|
||||
required(:conn) => Conn.t(),
|
||||
|
@ -57,12 +77,18 @@ defmodule Wayfarer.Telemetry do
|
|||
}
|
||||
)
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :request, :routed]` event.
|
||||
|
||||
This event signals that the routing process has completed and a target has
|
||||
been chosen to serve the request.
|
||||
"""
|
||||
@type request_routed ::
|
||||
event(
|
||||
[:wayfarer, :request, :routed],
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:duration) => duration()
|
||||
required(:duration) => duration(),
|
||||
required(:wallclock_time) => DateTime.t()
|
||||
},
|
||||
%{
|
||||
required(:conn) => Conn.t(),
|
||||
|
@ -74,12 +100,18 @@ defmodule Wayfarer.Telemetry do
|
|||
}
|
||||
)
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :request, :exception]` event.
|
||||
|
||||
This event signals that something went wrong while processing the request. You
|
||||
will likely still receive other events (eg `:stop`) for this span however.
|
||||
"""
|
||||
@type request_exception ::
|
||||
event(
|
||||
[:wayfarer, :request, :exception],
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:duration) => duration()
|
||||
required(:duration) => duration(),
|
||||
required(:wallclock_time) => DateTime.t()
|
||||
},
|
||||
%{
|
||||
required(:conn) => Conn.t(),
|
||||
|
@ -94,13 +126,27 @@ defmodule Wayfarer.Telemetry do
|
|||
}
|
||||
)
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :request, :stop]` event.
|
||||
|
||||
This event signals that the request has completed.
|
||||
|
||||
The measurements will contain any incrementing counters accumulated during the
|
||||
course of the request.
|
||||
"""
|
||||
@type request_stop ::
|
||||
event(
|
||||
[:wayfarer, :request, :stop],
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:duration) => duration(),
|
||||
required(:status) => nil | 100..599
|
||||
required(:wallclock_time) => DateTime.t(),
|
||||
required(:status) => nil | 100..599,
|
||||
optional(:req_body_bytes) => non_neg_integer(),
|
||||
optional(:resp_body_bytes) => non_neg_integer(),
|
||||
optional(:client_frame_bytes) => non_neg_integer(),
|
||||
optional(:client_frame_count) => non_neg_integer(),
|
||||
optional(:server_frame_bytes) => non_neg_integer(),
|
||||
optional(:server_frame_count) => non_neg_integer()
|
||||
},
|
||||
%{
|
||||
required(:conn) => Conn.t(),
|
||||
|
@ -116,33 +162,20 @@ defmodule Wayfarer.Telemetry do
|
|||
}
|
||||
)
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :request, :received_status]` event.
|
||||
|
||||
This event signals that an HTTP status code has been received from the
|
||||
upstream target.
|
||||
"""
|
||||
@type request_received_status ::
|
||||
event(
|
||||
[:wayfarer, :request, :received_status],
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:duration) => duration(),
|
||||
required(:status) => nil | 100..599
|
||||
},
|
||||
%{
|
||||
required(:conn) => Conn.t(),
|
||||
required(:listener) => listener(),
|
||||
required(:transport) => transport(),
|
||||
required(:telemetry_span_context) => telemetry_span_context(),
|
||||
required(:target) => target(),
|
||||
required(:algorithm) => Selector.algorithm(),
|
||||
required(:status) => nil | 100..599
|
||||
}
|
||||
)
|
||||
|
||||
@type request_req_body_sent ::
|
||||
event(
|
||||
[:wayfarer, :request, :req_body_sent],
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:duration) => duration(),
|
||||
required(:wallclock_time) => DateTime.t(),
|
||||
required(:status) => nil | 100..599,
|
||||
required(:req_body_bytes) => non_neg_integer()
|
||||
optional(:req_body_bytes) => non_neg_integer()
|
||||
},
|
||||
%{
|
||||
required(:conn) => Conn.t(),
|
||||
|
@ -155,14 +188,54 @@ defmodule Wayfarer.Telemetry do
|
|||
}
|
||||
)
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :request, :req_body_chunk]` event.
|
||||
|
||||
This event is emitted while streaming the request body from the client to the
|
||||
target. Under the hood `Plug.Conn.read_body/2` is being called with the
|
||||
default options, meaning that each chunk is likely to be up to 8MB in size.
|
||||
|
||||
If there is no request body then this event will not be emitted and the
|
||||
`req_body_bytes` and `req_body_chunks` counters will both be set to zero for
|
||||
this request.
|
||||
"""
|
||||
@type request_req_body_chunk ::
|
||||
event(
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:duration) => duration(),
|
||||
required(:wallclock_time) => DateTime.t(),
|
||||
required(:status) => nil | 100..599,
|
||||
required(:req_body_bytes) => non_neg_integer(),
|
||||
required(:req_body_chunks) => non_neg_integer(),
|
||||
required(:chunk_bytes) => non_neg_integer()
|
||||
},
|
||||
%{
|
||||
required(:conn) => Conn.t(),
|
||||
required(:listener) => listener(),
|
||||
required(:transport) => transport(),
|
||||
required(:telemetry_span_context) => telemetry_span_context(),
|
||||
required(:target) => target(),
|
||||
required(:algorithm) => Selector.algorithm(),
|
||||
required(:status) => nil | 100..599
|
||||
}
|
||||
)
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :request, :resp_started]` event.
|
||||
|
||||
This event indicates that the HTTP status and headers have been received from
|
||||
the target and the response will now start being sent to the client.
|
||||
"""
|
||||
@type request_resp_started ::
|
||||
event(
|
||||
[:wayfarer, :request, :resp_started],
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:duration) => duration(),
|
||||
required(:wallclock_time) => DateTime.t(),
|
||||
required(:status) => nil | 100..599,
|
||||
required(:req_body_bytes) => non_neg_integer()
|
||||
optional(:req_body_bytes) => non_neg_integer(),
|
||||
optional(:req_body_chunks) => non_neg_integer()
|
||||
},
|
||||
%{
|
||||
required(:conn) => Conn.t(),
|
||||
|
@ -175,15 +248,24 @@ defmodule Wayfarer.Telemetry do
|
|||
}
|
||||
)
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :request, :resp_body_chunk]` event.
|
||||
|
||||
This event is emitted every time a chunk of response body is received from the
|
||||
target for streaming to the client. Under the hood, these are emitted every
|
||||
time `Mint.HTTP.stream/2` returns a data frame.
|
||||
"""
|
||||
@type request_resp_body_chunk ::
|
||||
event(
|
||||
[:wayfarer, :request, :resp_body_chunk],
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:duration) => duration(),
|
||||
required(:req_body_bytes) => non_neg_integer(),
|
||||
required(:wallclock_time) => DateTime.t(),
|
||||
optional(:req_body_bytes) => non_neg_integer(),
|
||||
optional(:req_body_chunks) => non_neg_integer(),
|
||||
required(:resp_body_bytes) => non_neg_integer(),
|
||||
required(:chunk_size) => non_neg_integer()
|
||||
required(:resp_body_chunks) => non_neg_integer(),
|
||||
required(:chunk_bytes) => non_neg_integer()
|
||||
},
|
||||
%{
|
||||
required(:conn) => Conn.t(),
|
||||
|
@ -196,12 +278,18 @@ defmodule Wayfarer.Telemetry do
|
|||
}
|
||||
)
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :request, :upgraded]` event.
|
||||
|
||||
This event is emitted when a client connection is upgraded to a WebSocket
|
||||
connection.
|
||||
"""
|
||||
@type request_upgraded ::
|
||||
event(
|
||||
[:wayfarer, :request, :upgraded],
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:duration) => duration()
|
||||
required(:duration) => duration(),
|
||||
required(:wallclock_time) => DateTime.t()
|
||||
},
|
||||
%{
|
||||
required(:conn) => Conn.t(),
|
||||
|
@ -210,17 +298,22 @@ defmodule Wayfarer.Telemetry do
|
|||
required(:telemetry_span_context) => telemetry_span_context(),
|
||||
required(:target) => target(),
|
||||
required(:algorithm) => Selector.algorithm(),
|
||||
required(:status) => nil | 100..599,
|
||||
required(:opcode) => :text | :binary | :ping | :pong | :close
|
||||
required(:status) => nil | 100..599
|
||||
}
|
||||
)
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :request, :client_frame]` event.
|
||||
|
||||
This event is emitted any time a WebSocket frame is received from the client
|
||||
for transmission to the target.
|
||||
"""
|
||||
@type request_client_frame ::
|
||||
event(
|
||||
[:wayfarer, :request, :client_frame],
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:duration) => duration(),
|
||||
required(:wallclock_time) => DateTime.t(),
|
||||
required(:frame_size) => non_neg_integer(),
|
||||
required(:client_frame_bytes) => non_neg_integer(),
|
||||
required(:client_frame_count) => non_neg_integer(),
|
||||
|
@ -234,16 +327,23 @@ defmodule Wayfarer.Telemetry do
|
|||
required(:telemetry_span_context) => telemetry_span_context(),
|
||||
required(:target) => target(),
|
||||
required(:algorithm) => Selector.algorithm(),
|
||||
required(:status) => nil | 100..599
|
||||
required(:status) => nil | 100..599,
|
||||
required(:opcode) => :text | :binary | :ping | :pong | :close
|
||||
}
|
||||
)
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :request, :server_frame]` event.
|
||||
|
||||
This event is emitted any time a WebSocket frame is received from the target
|
||||
for transmission to the client.
|
||||
"""
|
||||
@type request_server_frame ::
|
||||
event(
|
||||
[:wayfarer, :request, :server_frame],
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:duration) => duration(),
|
||||
required(:wallclock_time) => DateTime.t(),
|
||||
required(:frame_size) => non_neg_integer(),
|
||||
required(:server_frame_bytes) => non_neg_integer(),
|
||||
required(:server_frame_count) => non_neg_integer(),
|
||||
|
@ -261,19 +361,152 @@ defmodule Wayfarer.Telemetry do
|
|||
}
|
||||
)
|
||||
|
||||
@type request ::
|
||||
@typedoc """
|
||||
All the event types that make up the `[:wayfarer, :request, :*]` span.
|
||||
"""
|
||||
@type request_span ::
|
||||
request_start
|
||||
| request_routed
|
||||
| request_exception
|
||||
| request_stop
|
||||
| request_received_status
|
||||
| request_req_body_sent
|
||||
| request_req_body_chunk
|
||||
| request_resp_started
|
||||
| request_resp_body_chunk
|
||||
| request_upgraded
|
||||
| request_client_frame
|
||||
| request_server_frame
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :health_check, :start]` event.
|
||||
|
||||
This event signals the start of a health check span.
|
||||
|
||||
You can use the `telemetry_span_context` metadata value to correlate
|
||||
subsequent events within the same span.
|
||||
"""
|
||||
@type health_check_start ::
|
||||
event(
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:wallclock_time) => DateTime.t()
|
||||
},
|
||||
%{
|
||||
required(:telemetry_span_context) => telemetry_span_context(),
|
||||
required(:hostname) => String.t(),
|
||||
required(:uri) => URI.t(),
|
||||
required(:target) => target(),
|
||||
required(:method) => String.t()
|
||||
}
|
||||
)
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :health_check, :connect]` event.
|
||||
|
||||
This event signals that the outgoing TCP connection has been made to the
|
||||
target.
|
||||
"""
|
||||
@type health_check_connect ::
|
||||
event(
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:wallclock_time) => DateTime.t(),
|
||||
required(:duration) => duration()
|
||||
},
|
||||
%{
|
||||
required(:telemetry_span_context) => telemetry_span_context(),
|
||||
required(:hostname) => String.t(),
|
||||
required(:uri) => URI.t(),
|
||||
required(:target) => target(),
|
||||
required(:method) => String.t(),
|
||||
required(:transport) => transport()
|
||||
}
|
||||
)
|
||||
@typedoc """
|
||||
The `[:wayfarer, :health_check, :request]` event.
|
||||
|
||||
This event signals that the HTTP or WebSocket request has been sent.
|
||||
"""
|
||||
@type health_check_request ::
|
||||
event(
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:wallclock_time) => DateTime.t(),
|
||||
required(:duration) => duration()
|
||||
},
|
||||
%{
|
||||
required(:telemetry_span_context) => telemetry_span_context(),
|
||||
required(:hostname) => String.t(),
|
||||
required(:uri) => URI.t(),
|
||||
required(:target) => target(),
|
||||
required(:method) => String.t(),
|
||||
required(:transport) => transport()
|
||||
}
|
||||
)
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :health_check, :pass]` event.
|
||||
|
||||
This event signals that the HTTP status code returned by the target matches
|
||||
one of the configured success codes.
|
||||
|
||||
It also signals the end of the span.
|
||||
"""
|
||||
@type health_check_pass ::
|
||||
event(
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:wallclock_time) => DateTime.t(),
|
||||
required(:duration) => duration(),
|
||||
required(:status) => 100..599
|
||||
},
|
||||
%{
|
||||
required(:telemetry_span_context) => telemetry_span_context(),
|
||||
required(:hostname) => String.t(),
|
||||
required(:uri) => URI.t(),
|
||||
required(:target) => target(),
|
||||
required(:method) => String.t(),
|
||||
required(:transport) => transport()
|
||||
}
|
||||
)
|
||||
|
||||
@typedoc """
|
||||
The `[:wayfarer, :health_check, :fail]` event.
|
||||
|
||||
This event signals that the health check failed, eg because the request timed
|
||||
out or the target's status code matched none of the configured success codes.
|
||||
|
||||
It also signals the end of the span.
|
||||
"""
|
||||
@type health_check_fail ::
|
||||
event(
|
||||
%{
|
||||
required(:monotonic_time) => monotonic_time(),
|
||||
required(:wallclock_time) => DateTime.t(),
|
||||
required(:duration) => duration(),
|
||||
required(:reason) => any
|
||||
},
|
||||
%{
|
||||
required(:telemetry_span_context) => telemetry_span_context(),
|
||||
required(:hostname) => String.t(),
|
||||
required(:uri) => URI.t(),
|
||||
required(:target) => target(),
|
||||
required(:method) => String.t(),
|
||||
required(:transport) => transport()
|
||||
}
|
||||
)
|
||||
|
||||
@typedoc """
All the event types that make up the `[:wayfarer, :health_check, :*]` span.
"""
@type health_check_span ::
        health_check_start
        | health_check_connect
        | health_check_request
        | health_check_pass
        | health_check_fail
|
||||
|
||||
@typedoc """
|
||||
All the spans that can be emitted by Wayfarer.
|
||||
"""
|
||||
@type spans :: request_span | health_check_span
|
||||
|
||||
@doc false
|
||||
@spec request_start(Conn.t()) :: Conn.t()
|
||||
def request_start(conn) do
|
||||
|
@ -332,10 +565,10 @@ defmodule Wayfarer.Telemetry do
|
|||
end
|
||||
|
||||
@doc false
|
||||
@spec request_req_body_sent(Conn.t()) :: Conn.t()
|
||||
def request_req_body_sent(conn) do
|
||||
@spec request_req_body_chunk(Conn.t(), non_neg_integer()) :: Conn.t()
|
||||
def request_req_body_chunk(conn, chunk_bytes) do
|
||||
conn
|
||||
|> execute_request_span_event(:req_body_sent, %{}, %{})
|
||||
|> execute_request_span_event(:req_body_chunk, %{chunk_bytes: chunk_bytes}, %{})
|
||||
end
|
||||
|
||||
@doc false
|
||||
|
@ -349,9 +582,9 @@ defmodule Wayfarer.Telemetry do
|
|||
|
||||
@doc false
|
||||
@spec request_resp_body_chunk(Conn.t(), non_neg_integer()) :: Conn.t()
|
||||
def request_resp_body_chunk(conn, chunk_size) do
|
||||
def request_resp_body_chunk(conn, chunk_bytes) do
|
||||
conn
|
||||
|> execute_request_span_event(:resp_body_chunk, %{chunk_size: chunk_size}, %{})
|
||||
|> execute_request_span_event(:resp_body_chunk, %{chunk_bytes: chunk_bytes}, %{})
|
||||
end
|
||||
|
||||
@doc false
|
||||
|
@ -391,6 +624,36 @@ defmodule Wayfarer.Telemetry do
|
|||
end)
|
||||
end
|
||||
|
||||
# Emits the `[:wayfarer, :health_check, :start]` event for the given check
# state and returns the state with its span information updated.
@doc false
@spec health_check_start(Check.state()) :: Check.state()
def health_check_start(check), do: execute_check_span_event(check, :start, %{}, %{})
|
||||
|
||||
# Emits the `[:wayfarer, :health_check, :connect]` event, adding the transport
# in use to the span metadata.
@doc false
@spec health_check_connect(Check.state(), atom) :: Check.state()
def health_check_connect(check, transport) do
  connect_metadata = %{transport: transport}

  execute_check_span_event(check, :connect, %{}, connect_metadata)
end
|
||||
|
||||
# Emits the `[:wayfarer, :health_check, :request]` event for the given check
# state and returns the updated state.
@doc false
@spec health_check_request(Check.state()) :: Check.state()
def health_check_request(check), do: execute_check_span_event(check, :request, %{}, %{})
|
||||
|
||||
# Emits the `[:wayfarer, :health_check, :fail]` event, passing the failure
# reason as event metadata.
#
# NOTE(review): the `health_check_fail` typedoc lists `:reason` under the
# event's measurements, whereas it is sent as metadata here - confirm which
# one is intended.
@doc false
@spec health_check_fail(Check.state(), any) :: Check.state()
def health_check_fail(check, reason) do
  failure_metadata = %{reason: reason}

  execute_check_span_event(check, :fail, %{}, failure_metadata)
end
|
||||
|
||||
# Emits the `[:wayfarer, :health_check, :pass]` event, passing the received
# status code as event metadata.
#
# NOTE(review): the `health_check_pass` typedoc lists `:status` under the
# event's measurements, whereas it is sent as metadata here - confirm which
# one is intended.
@doc false
@spec health_check_pass(Check.state(), 100..599) :: Check.state()
def health_check_pass(check, status) do
  pass_metadata = %{status: status}

  execute_check_span_event(check, :pass, %{}, pass_metadata)
end
|
||||
|
||||
defp update_metrics(conn, callback) do
|
||||
private =
|
||||
conn
|
||||
|
@ -410,6 +673,7 @@ defmodule Wayfarer.Telemetry do
|
|||
|
||||
defp execute_request_span_event(conn, event, measurements, metadata) do
|
||||
monotonic_time = System.monotonic_time()
|
||||
now = DateTime.utc_now()
|
||||
|
||||
private =
|
||||
conn
|
||||
|
@ -428,11 +692,13 @@ defmodule Wayfarer.Telemetry do
|
|||
if Map.has_key?(span_info, :start_time) do
|
||||
%{
|
||||
monotonic_time: monotonic_time,
|
||||
duration: monotonic_time - span_info.start_time
|
||||
duration: monotonic_time - span_info.start_time,
|
||||
wallclock_time: now
|
||||
}
|
||||
else
|
||||
%{
|
||||
monotonic_time: monotonic_time
|
||||
monotonic_time: monotonic_time,
|
||||
wallclock_time: now
|
||||
}
|
||||
end
|
||||
|> Map.merge(metrics)
|
||||
|
@ -457,4 +723,42 @@ defmodule Wayfarer.Telemetry do
|
|||
conn
|
||||
|> Conn.put_private(:wayfarer, private)
|
||||
end
|
||||
|
||||
# Emits a `[:wayfarer, :health_check, event]` telemetry event for `check` and
# returns `check` with its `:span` entry updated.
#
# The emitted measurements are the timing values, overlaid with any `:metrics`
# stored in the span and then with the caller's `measurements`. The emitted
# metadata is the span's stored metadata overlaid with the caller's `metadata`;
# the merged metadata is stored back into the span, so it is carried along to
# later events.
defp execute_check_span_event(check, event, measurements, metadata) do
  monotonic_time = System.monotonic_time()
  wallclock_time = DateTime.utc_now()
  span = Map.get(check, :span, %{})

  # `:duration` can only be reported once the span has a recorded start time.
  timing =
    case span do
      %{start_time: start_time} ->
        %{
          monotonic_time: monotonic_time,
          duration: monotonic_time - start_time,
          wallclock_time: wallclock_time
        }

      _ ->
        %{monotonic_time: monotonic_time, wallclock_time: wallclock_time}
    end

  event_measurements =
    timing
    |> Map.merge(Map.get(span, :metrics, %{}))
    |> Map.merge(measurements)

  event_metadata = Map.merge(Map.get(span, :metadata, %{}), metadata)

  :telemetry.execute([:wayfarer, :health_check, event], event_measurements, event_metadata)

  # The first event emitted for a span fixes its start time.
  updated_span =
    span
    |> Map.put(:metadata, event_metadata)
    |> Map.put_new(:start_time, monotonic_time)

  Map.put(check, :span, updated_span)
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue