feat: Add telemetry middleware. (#93)

* improvement: don't incur compile-time dependencies on middleware.

* feat: Add a middleware which emits telemetry events about Reactor.
This commit is contained in:
James Harton 2024-02-29 09:23:51 +13:00 committed by GitHub
parent ee72c64af9
commit 37b9eda48e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 744 additions and 1 deletions

View file

@ -699,7 +699,7 @@ Call functions before and after a group of steps.
| Name | Type | Default | Docs |
|------|------|---------|------|
| [`before_all`](#reactor-group-before_all){: #reactor-group-before_all .spark-required} | `(any, any, any -> any) \| mfa` | | The before function. See `Reactor.Step.Group` for more information. |
| [`after_all`](#reactor-group-after_all){: #reactor-group-after_all .spark-required} | `(any, any, any -> any) \| mfa` | | The after function. See `Reactor.Step.Group` for more information. |
| [`after_all`](#reactor-group-after_all){: #reactor-group-after_all .spark-required} | `(any -> any) \| mfa` | | The after function. See `Reactor.Step.Group` for more information. |
| [`allow_async?`](#reactor-group-allow_async?){: #reactor-group-allow_async? } | `boolean` | `true` | Whether the emitted steps should be allowed to run asynchronously. |

View file

@ -22,6 +22,7 @@ defmodule Reactor.Dsl.Middleware do
target: __MODULE__,
args: [:module],
identifier: :module,
modules: [:module],
schema: [
module: [
type: {:behaviour, Middleware},

View file

@ -8,6 +8,15 @@ defmodule Reactor.Executor.Hooks do
@doc "Run the init hooks collecting the new context as it goes"
@spec init(Reactor.t(), Reactor.context()) :: {:ok, Reactor.context()} | {:error, any}
def init(reactor, context) do
context =
Map.put(context, :__reactor__, %{
id: reactor.id,
inputs: reactor.inputs,
middleware: reactor.middleware,
step_count: step_count(reactor),
initial_state: reactor.state
})
Utils.reduce_while_ok(reactor.middleware, context, fn middleware, context ->
if function_exported?(middleware, :init, 1) do
middleware.init(context)
@ -94,4 +103,11 @@ defmodule Reactor.Executor.Hooks do
:ok
end
defp step_count(reactor) when is_nil(reactor.plan), do: length(reactor.steps)
defp step_count(reactor) do
vertices = Graph.num_vertices(reactor.plan)
length(reactor.steps) + vertices
end
end

View file

@ -0,0 +1,537 @@
defmodule Reactor.Middleware.Telemetry do
@moduledoc """
A Reactor middleware that emits telemetry events.
The following events are emitted:
* `[:reactor, :run, :start]`
* `[:reactor, :run, :stop]`
* `[:reactor, :step, :run, :start]`
* `[:reactor, :step, :run, :stop]`
* `[:reactor, :step, :process, :start]`
* `[:reactor, :step, :process, :stop]`
* `[:reactor, :step, :compensate, :start]`
* `[:reactor, :step, :compensate, :stop]`
* `[:reactor, :step, :undo, :start]`
* `[:reactor, :step, :undo, :stop]`
"""
use Reactor.Middleware
@doc false
@impl true
def init(context) do
start = System.monotonic_time()
metadata = context.__reactor__
:telemetry.execute(
[:reactor, :run, :start],
%{system_time: System.system_time()},
metadata
)
context =
context
|> Map.put(__MODULE__, %{
start_time: start,
metadata: metadata
})
{:ok, context}
end
@doc false
@impl true
def complete(result, %{__MODULE__ => %{start_time: start_time, metadata: metadata}}) do
end_time = System.monotonic_time()
duration = end_time - start_time
metadata =
metadata
|> Map.merge(%{
status: :ok,
result: result
})
:telemetry.execute(
[:reactor, :run, :stop],
%{
system_time: System.system_time(),
duration: duration
},
metadata
)
{:ok, result}
end
@doc false
@impl true
def error(error_or_errors, %{__MODULE__ => %{start_time: start_time, metadata: metadata}}) do
end_time = System.monotonic_time()
duration = end_time - start_time
metadata =
metadata
|> Map.merge(%{
status: :error,
errors: error_or_errors
})
:telemetry.execute(
[:reactor, :run, :stop],
%{
system_time: System.system_time(),
duration: duration
},
metadata
)
:ok
end
@doc false
@impl true
def halt(%{__MODULE__ => %{start_time: start_time, metadata: metadata}} = context) do
end_time = System.monotonic_time()
duration = end_time - start_time
metadata =
metadata
|> Map.merge(%{
status: :halt
})
:telemetry.execute(
[:reactor, :run, :stop],
%{
system_time: System.system_time(),
duration: duration
},
metadata
)
{:ok, context}
end
@doc false
@impl true
def event({:process_start, pid}, step, %{__MODULE__ => %{metadata: metadata}}) do
metadata =
metadata
|> Map.merge(%{
step: step,
pid: pid
})
start_time = System.monotonic_time()
Process.put({__MODULE__, :process_start_time}, start_time)
:telemetry.execute(
[:reactor, :step, :process, :start],
%{system_time: System.system_time()},
metadata
)
end
def event({:process_terminate, pid}, step, %{__MODULE__ => %{metadata: metadata}}) do
metadata =
metadata
|> Map.merge(%{
step: step,
pid: pid
})
start_time = Process.delete({__MODULE__, :process_start_time})
end_time = System.monotonic_time()
duration = end_time - start_time
:telemetry.execute(
[:reactor, :step, :process, :stop],
%{
system_time: System.system_time(),
duration: duration
},
metadata
)
end
def event({:run_start, arguments}, step, %{__MODULE__ => %{metadata: metadata}}) do
metadata =
metadata
|> Map.merge(%{
step: step,
arguments: arguments
})
start_time = System.monotonic_time()
Process.put({__MODULE__, :step_start_time, step.name}, start_time)
:telemetry.execute(
[:reactor, :step, :run, :start],
%{system_time: System.system_time()},
metadata
)
end
def event({:run_complete, result}, step, %{__MODULE__ => %{metadata: metadata}}) do
metadata =
metadata
|> Map.merge(%{
step: step,
result: result,
status: :ok
})
start_time = Process.delete({__MODULE__, :step_start_time, step.name})
end_time = System.monotonic_time()
duration = end_time - start_time
:telemetry.execute(
[:reactor, :step, :run, :stop],
%{
system_time: System.system_time(),
duration: duration
},
metadata
)
end
def event({:run_error, errors}, step, %{__MODULE__ => %{metadata: metadata}}) do
metadata =
metadata
|> Map.merge(%{
step: step,
result: errors,
status: :error
})
start_time = Process.delete({__MODULE__, :step_start_time, step.name})
end_time = System.monotonic_time()
duration = end_time - start_time
:telemetry.execute(
[:reactor, :step, :run, :stop],
%{
system_time: System.system_time(),
duration: duration
},
metadata
)
end
def event({:run_halt, value}, step, %{__MODULE__ => %{metadata: metadata}}) do
metadata =
metadata
|> Map.merge(%{
step: step,
result: value,
status: :halt
})
start_time = Process.delete({__MODULE__, :step_start_time, step.name})
end_time = System.monotonic_time()
duration = end_time - start_time
:telemetry.execute(
[:reactor, :step, :run, :stop],
%{
system_time: System.system_time(),
duration: duration
},
metadata
)
end
def event({:run_retry, value}, step, %{__MODULE__ => %{metadata: metadata}}) do
metadata =
metadata
|> Map.merge(%{
step: step,
result: value,
status: :retry
})
start_time = Process.delete({__MODULE__, :step_start_time, step.name})
end_time = System.monotonic_time()
duration = end_time - start_time
:telemetry.execute(
[:reactor, :step, :run, :stop],
%{
system_time: System.system_time(),
duration: duration
},
metadata
)
end
def event(:run_retry, step, %{__MODULE__ => %{metadata: metadata}}) do
metadata =
metadata
|> Map.merge(%{
step: step,
result: nil,
status: :retry
})
start_time = Process.delete({__MODULE__, :step_start_time, step.name})
end_time = System.monotonic_time()
duration = end_time - start_time
:telemetry.execute(
[:reactor, :step, :run, :stop],
%{
system_time: System.system_time(),
duration: duration
},
metadata
)
end
def event({:compensate_start, reason}, step, %{__MODULE__ => %{metadata: metadata}}) do
metadata =
metadata
|> Map.merge(%{
step: step,
result: reason,
status: :compensate
})
start_time = System.monotonic_time()
Process.put({__MODULE__, :compensate_start_time, step.name}, start_time)
:telemetry.execute(
[:reactor, :step, :compensate, :start],
%{system_time: System.system_time()},
metadata
)
end
def event(:compensate_retry, step, %{__MODULE__ => %{metadata: metadata}}) do
metadata =
metadata
|> Map.merge(%{
step: step,
result: nil,
status: :retry
})
start_time = Process.delete({__MODULE__, :compensate_start_time, step.name})
end_time = System.monotonic_time()
duration = end_time - start_time
:telemetry.execute(
[:reactor, :step, :compensate, :stop],
%{
system_time: System.system_time(),
duration: duration
},
metadata
)
end
def event(:compensate_complete, step, %{__MODULE__ => %{metadata: metadata}}) do
metadata =
metadata
|> Map.merge(%{
step: step,
result: nil,
status: :ok
})
start_time = Process.delete({__MODULE__, :compensate_start_time, step.name})
end_time = System.monotonic_time()
duration = end_time - start_time
:telemetry.execute(
[:reactor, :step, :compensate, :stop],
%{
system_time: System.system_time(),
duration: duration
},
metadata
)
end
def event({:compensate_retry, value}, step, %{__MODULE__ => %{metadata: metadata}}) do
metadata =
metadata
|> Map.merge(%{
step: step,
result: value,
status: :retry
})
start_time = Process.delete({__MODULE__, :compensate_start_time, step.name})
end_time = System.monotonic_time()
duration = end_time - start_time
:telemetry.execute(
[:reactor, :step, :compensate, :stop],
%{
system_time: System.system_time(),
duration: duration
},
metadata
)
end
def event({:compensate_error, errors}, step, %{__MODULE__ => %{metadata: metadata}}) do
metadata =
metadata
|> Map.merge(%{
step: step,
result: errors,
status: :error
})
start_time = Process.delete({__MODULE__, :compensate_start_time, step.name})
end_time = System.monotonic_time()
duration = end_time - start_time
:telemetry.execute(
[:reactor, :step, :compensate, :stop],
%{
system_time: System.system_time(),
duration: duration
},
metadata
)
end
def event({:compensate_continue, result}, step, %{__MODULE__ => %{metadata: metadata}}) do
metadata =
metadata
|> Map.merge(%{
step: step,
result: result,
status: :ok
})
start_time = Process.delete({__MODULE__, :compensate_start_time, step.name})
end_time = System.monotonic_time()
duration = end_time - start_time
:telemetry.execute(
[:reactor, :step, :compensate, :stop],
%{
system_time: System.system_time(),
duration: duration
},
metadata
)
end
def event(:undo_start, step, %{__MODULE__ => %{metadata: metadata}}) do
metadata =
metadata
|> Map.merge(%{
step: step,
result: nil,
status: :undo
})
start_time = System.monotonic_time()
Process.put({__MODULE__, :undo_start_time, step.name}, start_time)
:telemetry.execute(
[:reactor, :step, :undo, :start],
%{system_time: System.system_time()},
metadata
)
end
def event({:undo_error, errors}, step, %{__MODULE__ => %{metadata: metadata}}) do
metadata =
metadata
|> Map.merge(%{
step: step,
result: errors,
status: :error
})
start_time = Process.delete({__MODULE__, :undo_start_time, step.name})
end_time = System.monotonic_time()
duration = end_time - start_time
:telemetry.execute(
[:reactor, :step, :undo, :stop],
%{
system_time: System.system_time(),
duration: duration
},
metadata
)
end
def event({:undo_retry, errors}, step, %{__MODULE__ => %{metadata: metadata}}) do
metadata =
metadata
|> Map.merge(%{
step: step,
result: errors,
status: :retry
})
start_time = Process.delete({__MODULE__, :undo_start_time, step.name})
end_time = System.monotonic_time()
duration = end_time - start_time
:telemetry.execute(
[:reactor, :step, :undo, :stop],
%{
system_time: System.system_time(),
duration: duration
},
metadata
)
end
def event(:undo_retry, step, %{__MODULE__ => %{metadata: metadata}}) do
metadata =
metadata
|> Map.merge(%{
step: step,
result: nil,
status: :retry
})
start_time = Process.delete({__MODULE__, :undo_start_time, step.name})
end_time = System.monotonic_time()
duration = end_time - start_time
:telemetry.execute(
[:reactor, :step, :undo, :stop],
%{
system_time: System.system_time(),
duration: duration
},
metadata
)
end
def event(:undo_complete, step, %{__MODULE__ => %{metadata: metadata}}) do
metadata =
metadata
|> Map.merge(%{
step: step,
result: nil,
status: :ok
})
start_time = Process.delete({__MODULE__, :undo_start_time, step.name})
end_time = System.monotonic_time()
duration = end_time - start_time
:telemetry.execute(
[:reactor, :step, :undo, :stop],
%{
system_time: System.system_time(),
duration: duration
},
metadata
)
end
end

View file

@ -40,6 +40,8 @@ defmodule Reactor.MixProject do
groups_for_modules: [
DSL: ~r/^Reactor\.Dsl$/,
Steps: ~r/^Reactor\.Step.*/,
Middleware: ~r/^Reactor\.Middleware.*/,
Builder: ~r/^Reactor\.Builder.*/,
Internals: ~r/^Reactor\..*/
],
extra_section: "GUIDES",
@ -89,6 +91,7 @@ defmodule Reactor.MixProject do
[
{:spark, "~> 1.0"},
{:libgraph, "~> 0.16"},
{:telemetry, "~> 1.2"},
# Dev/Test dependencies
{:credo, ">= 0.0.0", only: [:dev, :test], runtime: false},

View file

@ -24,6 +24,7 @@
"sobelow": {:hex, :sobelow, "0.13.0", "218afe9075904793f5c64b8837cc356e493d88fddde126a463839351870b8d1e", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "cd6e9026b85fc35d7529da14f95e85a078d9dd1907a9097b3ba6ac7ebbe34a0d"},
"sourceror": {:hex, :sourceror, "0.14.1", "c6fb848d55bd34362880da671debc56e77fd722fa13b4dcbeac89a8998fc8b09", [:mix], [], "hexpm", "8b488a219e4c4d7d9ff29d16346fd4a5858085ccdd010e509101e226bbfd8efc"},
"spark": {:hex, :spark, "1.1.54", "54dac39403a2960f738ba5d60678d20b30de7381fb51b787b6bcb6aeabb73d9d", [:mix], [{:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.5 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:sourceror, "~> 0.1", [hex: :sourceror, repo: "hexpm", optional: false]}], "hexpm", "abc9a67cfb60a97d2f3c7e270fa968a2ace94f389e2741d406239d237ec6dbb1"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
"yamerl": {:hex, :yamerl, "0.10.0", "4ff81fee2f1f6a46f1700c0d880b24d193ddb74bd14ef42cb0bcf46e81ef2f8e", [:rebar3], [], "hexpm", "346adb2963f1051dc837a2364e4acf6eb7d80097c0f53cbdc3046ec8ec4b4e6e"},
"yaml_elixir": {:hex, :yaml_elixir, "2.9.0", "9a256da867b37b8d2c1ffd5d9de373a4fda77a32a45b452f1708508ba7bbcb53", [:mix], [{:yamerl, "~> 0.10", [hex: :yamerl, repo: "hexpm", optional: false]}], "hexpm", "0cb0e7d4c56f5e99a6253ed1a670ed0e39c13fc45a6da054033928607ac08dfc"},
}

View file

@ -0,0 +1,185 @@
defmodule Reactor.Middleware.TelemetryTest do
@moduledoc false
use ExUnit.Case, async: true
setup(context) do
table =
:ets.new(context.test, [:public, :ordered_set])
:telemetry.attach_many(
to_string(context.test),
[
[:reactor, :run, :start],
[:reactor, :run, :stop],
[:reactor, :step, :process, :start],
[:reactor, :step, :process, :stop],
[:reactor, :step, :run, :start],
[:reactor, :step, :run, :stop],
[:reactor, :step, :compensate, :start],
[:reactor, :step, :compensate, :stop],
[:reactor, :step, :undo, :start],
[:reactor, :step, :undo, :stop]
],
&__MODULE__.handler/4,
table
)
on_exit(fn ->
:telemetry.detach(to_string(context.test))
end)
{:ok, table: table}
end
def handler(event, measurements, metadata, table) do
:ets.insert(
table,
{measurements.system_time, %{event: event, measurements: measurements, metadata: metadata}}
)
end
def get_events(table) do
Process.sleep(200)
table
|> :ets.tab2list()
|> Enum.map(&elem(&1, 1))
end
test "step run events", %{table: table} do
defmodule SuccessfulStepReactor do
@moduledoc false
use Reactor
middlewares do
middleware Reactor.Middleware.Telemetry
end
step :noop do
argument :marty, value(:mcfly)
run fn _, _ -> {:ok, :noop} end
end
return :noop
end
start_time = System.monotonic_time()
{:ok, :noop} = Reactor.run(SuccessfulStepReactor)
expected_duration =
System.convert_time_unit(System.monotonic_time() - start_time, :native, :millisecond)
events = get_events(table)
assert [
[:reactor, :run, :start],
[:reactor, :step, :process, :start],
[:reactor, :step, :run, :start],
[:reactor, :step, :run, :stop],
[:reactor, :step, :process, :stop],
[:reactor, :run, :stop]
] = Enum.map(events, & &1.event)
[run_start, _, _, step_stop, _, run_stop] = events
assert run_start.metadata.id == SuccessfulStepReactor
assert run_start.metadata.inputs == []
assert run_start.metadata.middleware == [Reactor.Middleware.Telemetry]
assert run_start.metadata.step_count == 1
assert run_stop.metadata.status == :ok
assert run_stop.metadata.result == :noop
run_duration_in_millis =
System.convert_time_unit(run_stop.measurements.duration, :native, :millisecond)
assert_in_delta run_duration_in_millis, expected_duration, 100
assert run_duration_in_millis <= expected_duration
step_duration_in_millis =
System.convert_time_unit(step_stop.measurements.duration, :native, :millisecond)
assert step_duration_in_millis <= run_duration_in_millis
assert_in_delta step_duration_in_millis, expected_duration, 100
end
test "step compensation events", %{table: table} do
defmodule CompensationReactor do
@moduledoc false
use Reactor
middlewares do
middleware Reactor.Middleware.Telemetry
end
step :fail do
run fn _, _ -> raise "hell" end
compensate fn _, _ -> :ok end
end
end
{:error, _} = Reactor.run(CompensationReactor)
events = get_events(table)
assert [
[:reactor, :run, :start],
[:reactor, :step, :process, :start],
[:reactor, :step, :run, :start],
[:reactor, :step, :run, :stop],
[:reactor, :step, :compensate, :start],
[:reactor, :step, :compensate, :stop],
[:reactor, :step, :process, :stop],
[:reactor, :run, :stop]
] = Enum.map(events, & &1.event)
end
test "step undo events", %{table: table} do
defmodule UndoReactor do
@moduledoc false
use Reactor
middlewares do
middleware Reactor.Middleware.Telemetry
end
step :noop do
run fn _, _ ->
{:ok, :noop}
end
undo fn _ ->
:ok
end
end
step :fail do
wait_for :noop
run fn _, _ ->
raise "hell"
end
end
end
{:error, _} = Reactor.run(UndoReactor)
events = get_events(table)
assert [
[:reactor, :run, :start],
[:reactor, :step, :process, :start],
[:reactor, :step, :run, :start],
[:reactor, :step, :run, :stop],
[:reactor, :step, :process, :stop],
[:reactor, :step, :process, :start],
[:reactor, :step, :run, :start],
[:reactor, :step, :run, :stop],
[:reactor, :step, :process, :stop],
[:reactor, :step, :undo, :start],
[:reactor, :step, :undo, :stop],
[:reactor, :run, :stop]
] = Enum.map(events, & &1.event)
end
end