diff --git a/documentation/dsls/DSL:-Reactor.md b/documentation/dsls/DSL:-Reactor.md index f90708a..1cad18a 100644 --- a/documentation/dsls/DSL:-Reactor.md +++ b/documentation/dsls/DSL:-Reactor.md @@ -699,7 +699,7 @@ Call functions before and after a group of steps. | Name | Type | Default | Docs | |------|------|---------|------| | [`before_all`](#reactor-group-before_all){: #reactor-group-before_all .spark-required} | `(any, any, any -> any) \| mfa` | | The before function. See `Reactor.Step.Group` for more information. | -| [`after_all`](#reactor-group-after_all){: #reactor-group-after_all .spark-required} | `(any, any, any -> any) \| mfa` | | The after function. See `Reactor.Step.Group` for more information. | +| [`after_all`](#reactor-group-after_all){: #reactor-group-after_all .spark-required} | `(any -> any) \| mfa` | | The after function. See `Reactor.Step.Group` for more information. | | [`allow_async?`](#reactor-group-allow_async?){: #reactor-group-allow_async? } | `boolean` | `true` | Whether the emitted steps should be allowed to run asynchronously. | diff --git a/lib/reactor/dsl/middleware.ex b/lib/reactor/dsl/middleware.ex index 70e0145..8594402 100644 --- a/lib/reactor/dsl/middleware.ex +++ b/lib/reactor/dsl/middleware.ex @@ -22,6 +22,7 @@ defmodule Reactor.Dsl.Middleware do target: __MODULE__, args: [:module], identifier: :module, + modules: [:module], schema: [ module: [ type: {:behaviour, Middleware}, diff --git a/lib/reactor/executor/hooks.ex b/lib/reactor/executor/hooks.ex index 7981f65..b808948 100644 --- a/lib/reactor/executor/hooks.ex +++ b/lib/reactor/executor/hooks.ex @@ -8,6 +8,15 @@ defmodule Reactor.Executor.Hooks do @doc "Run the init hooks collecting the new context as it goes" @spec init(Reactor.t(), Reactor.context()) :: {:ok, Reactor.context()} | {:error, any} def init(reactor, context) do + context = + Map.put(context, :__reactor__, %{ + id: reactor.id, + inputs: reactor.inputs, + middleware: reactor.middleware, + step_count: step_count(reactor), + initial_state: reactor.state + }) + Utils.reduce_while_ok(reactor.middleware, context, fn middleware, context -> if function_exported?(middleware, :init, 1) do middleware.init(context) @@ -94,4 +103,11 @@ defmodule Reactor.Executor.Hooks do :ok end + + defp step_count(reactor) when is_nil(reactor.plan), do: length(reactor.steps) + + defp step_count(reactor) do + vertices = Graph.num_vertices(reactor.plan) + length(reactor.steps) + vertices + end end diff --git a/lib/reactor/middleware/telemetry.ex b/lib/reactor/middleware/telemetry.ex new file mode 100644 index 0000000..c64c9ec --- /dev/null +++ b/lib/reactor/middleware/telemetry.ex @@ -0,0 +1,537 @@ +defmodule Reactor.Middleware.Telemetry do + @moduledoc """ + A Reactor middleware that emits telemetry events. + + The following events are emitted: + + * `[:reactor, :run, :start]` + * `[:reactor, :run, :stop]` + * `[:reactor, :step, :run, :start]` + * `[:reactor, :step, :run, :stop]` + * `[:reactor, :step, :process, :start]` + * `[:reactor, :step, :process, :stop]` + * `[:reactor, :step, :compensate, :start]` + * `[:reactor, :step, :compensate, :stop]` + * `[:reactor, :step, :undo, :start]` + * `[:reactor, :step, :undo, :stop]` + """ + + use Reactor.Middleware + + @doc false + @impl true + def init(context) do + start = System.monotonic_time() + + metadata = context.__reactor__ + + :telemetry.execute( + [:reactor, :run, :start], + %{system_time: System.system_time()}, + metadata + ) + + context = + context + |> Map.put(__MODULE__, %{ + start_time: start, + metadata: metadata + }) + + {:ok, context} + end + + @doc false + @impl true + def complete(result, %{__MODULE__ => %{start_time: start_time, metadata: metadata}}) do + end_time = System.monotonic_time() + duration = end_time - start_time + + metadata = + metadata + |> Map.merge(%{ + status: :ok, + result: result + }) + + :telemetry.execute( + [:reactor, :run, :stop], + %{ + system_time: System.system_time(), + duration: duration + }, + metadata + ) + + {:ok, result} + end + + @doc false + @impl true + def error(error_or_errors, %{__MODULE__ => %{start_time: start_time, metadata: metadata}}) do + end_time = System.monotonic_time() + duration = end_time - start_time + + metadata = + metadata + |> Map.merge(%{ + status: :error, + errors: error_or_errors + }) + + :telemetry.execute( + [:reactor, :run, :stop], + %{ + system_time: System.system_time(), + duration: duration + }, + metadata + ) + + :ok + end + + @doc false + @impl true + def halt(%{__MODULE__ => %{start_time: start_time, metadata: metadata}} = context) do + end_time = System.monotonic_time() + duration = end_time - start_time + + metadata = + metadata + |> Map.merge(%{ + status: :halt + }) + + :telemetry.execute( + [:reactor, :run, :stop], + %{ + system_time: System.system_time(), + duration: duration + }, + metadata + ) + + {:ok, context} + end + + @doc false + @impl true + def event({:process_start, pid}, step, %{__MODULE__ => %{metadata: metadata}}) do + metadata = + metadata + |> Map.merge(%{ + step: step, + pid: pid + }) + + start_time = System.monotonic_time() + Process.put({__MODULE__, :process_start_time}, start_time) + + :telemetry.execute( + [:reactor, :step, :process, :start], + %{system_time: System.system_time()}, + metadata + ) + end + + def event({:process_terminate, pid}, step, %{__MODULE__ => %{metadata: metadata}}) do + metadata = + metadata + |> Map.merge(%{ + step: step, + pid: pid + }) + + start_time = Process.delete({__MODULE__, :process_start_time}) + end_time = System.monotonic_time() + duration = end_time - start_time + + :telemetry.execute( + [:reactor, :step, :process, :stop], + %{ + system_time: System.system_time(), + duration: duration + }, + metadata + ) + end + + def event({:run_start, arguments}, step, %{__MODULE__ => %{metadata: metadata}}) do + metadata = + metadata + |> Map.merge(%{ + step: step, + arguments: arguments + }) + + start_time = System.monotonic_time() + Process.put({__MODULE__, :step_start_time, step.name}, start_time) + + :telemetry.execute( + [:reactor, :step, :run, :start], + %{system_time: System.system_time()}, + metadata + ) + end + + def event({:run_complete, result}, step, %{__MODULE__ => %{metadata: metadata}}) do + metadata = + metadata + |> Map.merge(%{ + step: step, + result: result, + status: :ok + }) + + start_time = Process.delete({__MODULE__, :step_start_time, step.name}) + end_time = System.monotonic_time() + duration = end_time - start_time + + :telemetry.execute( + [:reactor, :step, :run, :stop], + %{ + system_time: System.system_time(), + duration: duration + }, + metadata + ) + end + + def event({:run_error, errors}, step, %{__MODULE__ => %{metadata: metadata}}) do + metadata = + metadata + |> Map.merge(%{ + step: step, + result: errors, + status: :error + }) + + start_time = Process.delete({__MODULE__, :step_start_time, step.name}) + end_time = System.monotonic_time() + duration = end_time - start_time + + :telemetry.execute( + [:reactor, :step, :run, :stop], + %{ + system_time: System.system_time(), + duration: duration + }, + metadata + ) + end + + def event({:run_halt, value}, step, %{__MODULE__ => %{metadata: metadata}}) do + metadata = + metadata + |> Map.merge(%{ + step: step, + result: value, + status: :halt + }) + + start_time = Process.delete({__MODULE__, :step_start_time, step.name}) + end_time = System.monotonic_time() + duration = end_time - start_time + + :telemetry.execute( + [:reactor, :step, :run, :stop], + %{ + system_time: System.system_time(), + duration: duration + }, + metadata + ) + end + + def event({:run_retry, value}, step, %{__MODULE__ => %{metadata: metadata}}) do + metadata = + metadata + |> Map.merge(%{ + step: step, + result: value, + status: :retry + }) + + start_time = Process.delete({__MODULE__, :step_start_time, step.name}) + end_time = System.monotonic_time() + duration = end_time - start_time + + :telemetry.execute( + [:reactor, :step, :run, :stop], + %{ + system_time: System.system_time(), + duration: duration + }, + metadata + ) + end + + def event(:run_retry, step, %{__MODULE__ => %{metadata: metadata}}) do + metadata = + metadata + |> Map.merge(%{ + step: step, + result: nil, + status: :retry + }) + + start_time = Process.delete({__MODULE__, :step_start_time, step.name}) + end_time = System.monotonic_time() + duration = end_time - start_time + + :telemetry.execute( + [:reactor, :step, :run, :stop], + %{ + system_time: System.system_time(), + duration: duration + }, + metadata + ) + end + + def event({:compensate_start, reason}, step, %{__MODULE__ => %{metadata: metadata}}) do + metadata = + metadata + |> Map.merge(%{ + step: step, + result: reason, + status: :compensate + }) + + start_time = System.monotonic_time() + Process.put({__MODULE__, :compensate_start_time, step.name}, start_time) + + :telemetry.execute( + [:reactor, :step, :compensate, :start], + %{system_time: System.system_time()}, + metadata + ) + end + + def event(:compensate_retry, step, %{__MODULE__ => %{metadata: metadata}}) do + metadata = + metadata + |> Map.merge(%{ + step: step, + result: nil, + status: :retry + }) + + start_time = Process.delete({__MODULE__, :compensate_start_time, step.name}) + end_time = System.monotonic_time() + duration = end_time - start_time + + :telemetry.execute( + [:reactor, :step, :compensate, :stop], + %{ + system_time: System.system_time(), + duration: duration + }, + metadata + ) + end + + def event(:compensate_complete, step, %{__MODULE__ => %{metadata: metadata}}) do + metadata = + metadata + |> Map.merge(%{ + step: step, + result: nil, + status: :ok + }) + + start_time = Process.delete({__MODULE__, :compensate_start_time, step.name}) + end_time = System.monotonic_time() + duration = end_time - start_time + + :telemetry.execute( + [:reactor, :step, :compensate, :stop], + %{ + system_time: System.system_time(), + duration: duration + }, + metadata + ) + end + + def event({:compensate_retry, value}, step, %{__MODULE__ => %{metadata: metadata}}) do + metadata = + metadata + |> Map.merge(%{ + step: step, + result: value, + status: :retry + }) + + start_time = Process.delete({__MODULE__, :compensate_start_time, step.name}) + end_time = System.monotonic_time() + duration = end_time - start_time + + :telemetry.execute( + [:reactor, :step, :compensate, :stop], + %{ + system_time: System.system_time(), + duration: duration + }, + metadata + ) + end + + def event({:compensate_error, errors}, step, %{__MODULE__ => %{metadata: metadata}}) do + metadata = + metadata + |> Map.merge(%{ + step: step, + result: errors, + status: :error + }) + + start_time = Process.delete({__MODULE__, :compensate_start_time, step.name}) + end_time = System.monotonic_time() + duration = end_time - start_time + + :telemetry.execute( + [:reactor, :step, :compensate, :stop], + %{ + system_time: System.system_time(), + duration: duration + }, + metadata + ) + end + + def event({:compensate_continue, result}, step, %{__MODULE__ => %{metadata: metadata}}) do + metadata = + metadata + |> Map.merge(%{ + step: step, + result: result, + status: :ok + }) + + start_time = Process.delete({__MODULE__, :compensate_start_time, step.name}) + end_time = System.monotonic_time() + duration = end_time - start_time + + :telemetry.execute( + [:reactor, :step, :compensate, :stop], + %{ + system_time: System.system_time(), + duration: duration + }, + metadata + ) + end + + def event(:undo_start, step, %{__MODULE__ => %{metadata: metadata}}) do + metadata = + metadata + |> Map.merge(%{ + step: step, + result: nil, + status: :undo + }) + + start_time = System.monotonic_time() + Process.put({__MODULE__, :undo_start_time, step.name}, start_time) + + :telemetry.execute( + [:reactor, :step, :undo, :start], + %{system_time: System.system_time()}, + metadata + ) + end + + def event({:undo_error, errors}, step, %{__MODULE__ => %{metadata: metadata}}) do + metadata = + metadata + |> Map.merge(%{ + step: step, + result: errors, + status: :error + }) + + start_time = Process.delete({__MODULE__, :undo_start_time, step.name}) + end_time = System.monotonic_time() + duration = end_time - start_time + + :telemetry.execute( + [:reactor, :step, :undo, :stop], + %{ + system_time: System.system_time(), + duration: duration + }, + metadata + ) + end + + def event({:undo_retry, errors}, step, %{__MODULE__ => %{metadata: metadata}}) do + metadata = + metadata + |> Map.merge(%{ + step: step, + result: errors, + status: :retry + }) + + start_time = Process.delete({__MODULE__, :undo_start_time, step.name}) + end_time = System.monotonic_time() + duration = end_time - start_time + + :telemetry.execute( + [:reactor, :step, :undo, :stop], + %{ + system_time: System.system_time(), + duration: duration + }, + metadata + ) + end + + def event(:undo_retry, step, %{__MODULE__ => %{metadata: metadata}}) do + metadata = + metadata + |> Map.merge(%{ + step: step, + result: nil, + status: :retry + }) + + start_time = Process.delete({__MODULE__, :undo_start_time, step.name}) + end_time = System.monotonic_time() + duration = end_time - start_time + + :telemetry.execute( + [:reactor, :step, :undo, :stop], + %{ + system_time: System.system_time(), + duration: duration + }, + metadata + ) + end + + def event(:undo_complete, step, %{__MODULE__ => %{metadata: metadata}}) do + metadata = + metadata + |> Map.merge(%{ + step: step, + result: nil, + status: :ok + }) + + start_time = Process.delete({__MODULE__, :undo_start_time, step.name}) + end_time = System.monotonic_time() + duration = end_time - start_time + + :telemetry.execute( + [:reactor, :step, :undo, :stop], + %{ + system_time: System.system_time(), + duration: duration + }, + metadata + ) + end +end diff --git a/mix.exs b/mix.exs index deb39af..1282250 100644 --- a/mix.exs +++ b/mix.exs @@ -40,6 +40,8 @@ defmodule Reactor.MixProject do groups_for_modules: [ DSL: ~r/^Reactor\.Dsl$/, Steps: ~r/^Reactor\.Step.*/, + Middleware: ~r/^Reactor\.Middleware.*/, + Builder: ~r/^Reactor\.Builder.*/, Internals: ~r/^Reactor\..*/ ], extra_section: "GUIDES", @@ -89,6 +91,7 @@ defmodule Reactor.MixProject do [ {:spark, "~> 1.0"}, {:libgraph, "~> 0.16"}, + {:telemetry, "~> 1.2"}, # Dev/Test dependencies {:credo, ">= 0.0.0", only: [:dev, :test], runtime: false}, diff --git a/mix.lock b/mix.lock index eaafd3e..0629632 100644 --- a/mix.lock +++ b/mix.lock @@ -24,6 +24,7 @@ "sobelow": {:hex, :sobelow, "0.13.0", "218afe9075904793f5c64b8837cc356e493d88fddde126a463839351870b8d1e", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "cd6e9026b85fc35d7529da14f95e85a078d9dd1907a9097b3ba6ac7ebbe34a0d"}, "sourceror": {:hex, :sourceror, "0.14.1", "c6fb848d55bd34362880da671debc56e77fd722fa13b4dcbeac89a8998fc8b09", [:mix], [], "hexpm", "8b488a219e4c4d7d9ff29d16346fd4a5858085ccdd010e509101e226bbfd8efc"}, "spark": {:hex, :spark, "1.1.54", "54dac39403a2960f738ba5d60678d20b30de7381fb51b787b6bcb6aeabb73d9d", [:mix], [{:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.5 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:sourceror, "~> 0.1", [hex: :sourceror, repo: "hexpm", optional: false]}], "hexpm", "abc9a67cfb60a97d2f3c7e270fa968a2ace94f389e2741d406239d237ec6dbb1"}, + "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, "yamerl": {:hex, :yamerl, "0.10.0", "4ff81fee2f1f6a46f1700c0d880b24d193ddb74bd14ef42cb0bcf46e81ef2f8e", [:rebar3], [], "hexpm", "346adb2963f1051dc837a2364e4acf6eb7d80097c0f53cbdc3046ec8ec4b4e6e"}, "yaml_elixir": {:hex, :yaml_elixir, "2.9.0", "9a256da867b37b8d2c1ffd5d9de373a4fda77a32a45b452f1708508ba7bbcb53", [:mix], [{:yamerl, "~> 0.10", [hex: :yamerl, repo: "hexpm", optional: false]}], "hexpm", "0cb0e7d4c56f5e99a6253ed1a670ed0e39c13fc45a6da054033928607ac08dfc"}, } diff --git a/test/reactor/middleware/telemetry_test.exs b/test/reactor/middleware/telemetry_test.exs new file mode 100644 index 0000000..7bf3d75 --- /dev/null +++ b/test/reactor/middleware/telemetry_test.exs @@ -0,0 +1,185 @@ +defmodule Reactor.Middleware.TelemetryTest do + @moduledoc false + use ExUnit.Case, async: true + + setup(context) do + table = + :ets.new(context.test, [:public, :ordered_set]) + + :telemetry.attach_many( + to_string(context.test), + [ + [:reactor, :run, :start], + [:reactor, :run, :stop], + [:reactor, :step, :process, :start], + [:reactor, :step, :process, :stop], + [:reactor, :step, :run, :start], + [:reactor, :step, :run, :stop], + [:reactor, :step, :compensate, :start], + [:reactor, :step, :compensate, :stop], + [:reactor, :step, :undo, :start], + [:reactor, :step, :undo, :stop] + ], + &__MODULE__.handler/4, + table + ) + + on_exit(fn -> + :telemetry.detach(to_string(context.test)) + end) + + {:ok, table: table} + end + + def handler(event, measurements, metadata, table) do + :ets.insert( + table, + {measurements.system_time, %{event: event, measurements: measurements, metadata: metadata}} + ) + end + + def get_events(table) do + Process.sleep(200) + + table + |> :ets.tab2list() + |> Enum.map(&elem(&1, 1)) + end + + test "step run events", %{table: table} do + defmodule SuccessfulStepReactor do + @moduledoc false + use Reactor + + middlewares do + middleware Reactor.Middleware.Telemetry + end + + step :noop do + argument :marty, value(:mcfly) + run fn _, _ -> {:ok, :noop} end + end + + return :noop + end + + start_time = System.monotonic_time() + + {:ok, :noop} = Reactor.run(SuccessfulStepReactor) + + expected_duration = + System.convert_time_unit(System.monotonic_time() - start_time, :native, :millisecond) + + events = get_events(table) + + assert [ + [:reactor, :run, :start], + [:reactor, :step, :process, :start], + [:reactor, :step, :run, :start], + [:reactor, :step, :run, :stop], + [:reactor, :step, :process, :stop], + [:reactor, :run, :stop] + ] = Enum.map(events, & &1.event) + + [run_start, _, _, step_stop, _, run_stop] = events + + assert run_start.metadata.id == SuccessfulStepReactor + assert run_start.metadata.inputs == [] + assert run_start.metadata.middleware == [Reactor.Middleware.Telemetry] + assert run_start.metadata.step_count == 1 + + assert run_stop.metadata.status == :ok + assert run_stop.metadata.result == :noop + + run_duration_in_millis = + System.convert_time_unit(run_stop.measurements.duration, :native, :millisecond) + + assert_in_delta run_duration_in_millis, expected_duration, 100 + assert run_duration_in_millis <= expected_duration + + step_duration_in_millis = + System.convert_time_unit(step_stop.measurements.duration, :native, :millisecond) + + assert step_duration_in_millis <= run_duration_in_millis + assert_in_delta step_duration_in_millis, expected_duration, 100 + end + + test "step compensation events", %{table: table} do + defmodule CompensationReactor do + @moduledoc false + use Reactor + + middlewares do + middleware Reactor.Middleware.Telemetry + end + + step :fail do + run fn _, _ -> raise "hell" end + compensate fn _, _ -> :ok end + end + end + + {:error, _} = Reactor.run(CompensationReactor) + + events = get_events(table) + + assert [ + [:reactor, :run, :start], + [:reactor, :step, :process, :start], + [:reactor, :step, :run, :start], + [:reactor, :step, :run, :stop], + [:reactor, :step, :compensate, :start], + [:reactor, :step, :compensate, :stop], + [:reactor, :step, :process, :stop], + [:reactor, :run, :stop] + ] = Enum.map(events, & &1.event) + end + + test "step undo events", %{table: table} do + defmodule UndoReactor do + @moduledoc false + use Reactor + + middlewares do + middleware Reactor.Middleware.Telemetry + end + + step :noop do + run fn _, _ -> + {:ok, :noop} + end + + undo fn _ -> + :ok + end + end + + step :fail do + wait_for :noop + + run fn _, _ -> + raise "hell" + end + end + end + + {:error, _} = Reactor.run(UndoReactor) + + events = get_events(table) + + assert [ + [:reactor, :run, :start], + [:reactor, :step, :process, :start], + [:reactor, :step, :run, :start], + [:reactor, :step, :run, :stop], + [:reactor, :step, :process, :stop], + [:reactor, :step, :process, :start], + [:reactor, :step, :run, :start], + [:reactor, :step, :run, :stop], + [:reactor, :step, :process, :stop], + [:reactor, :step, :undo, :start], + [:reactor, :step, :undo, :stop], + [:reactor, :run, :stop] + ] = Enum.map(events, & &1.event) + end +end