improvement!: Use Splode for managing errors. (#97)

This commit is contained in:
James Harton 2024-03-18 13:05:09 +13:00 committed by GitHub
parent 92c7f21eac
commit 27a01e7862
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
51 changed files with 891 additions and 195 deletions

View file

@ -1,8 +1,13 @@
{ {
"cSpell.words": [ "cSpell.words": [
"andalso",
"backoff", "backoff",
"casted", "casted",
"Desugars",
"mappish", "mappish",
"Planable" "noreply",
"Planable",
"splode",
"Splode"
] ]
} }

View file

@ -143,7 +143,8 @@ defmodule Reactor do
if Spark.Dsl.is?(reactor, Reactor) do if Spark.Dsl.is?(reactor, Reactor) do
run(reactor.reactor(), inputs, context, options) run(reactor.reactor(), inputs, context, options)
else else
{:error, "Module `#{inspect(reactor)}` is not a Reactor module"} {:error,
ArgumentError.exception(message: "Module `#{inspect(reactor)}` is not a Reactor module")}
end end
end end

View file

@ -11,7 +11,7 @@ defmodule Reactor.Builder.Compose do
import Reactor, only: :macros import Reactor, only: :macros
import Reactor.Argument, only: :macros import Reactor.Argument, only: :macros
import Reactor.Utils import Reactor.Utils
alias Reactor.{Argument, Builder, Error.ComposeError, Step} alias Reactor.{Argument, Builder, Error.Internal.ComposeError, Step}
@doc """ @doc """
Compose another Reactor inside this one. Compose another Reactor inside this one.

24
lib/reactor/error.ex Normal file
View file

@ -0,0 +1,24 @@
defmodule Reactor.Error do
@moduledoc """
Uses `splode` to manage various classes of error.
"""
use Splode,
error_classes: [
reactor: Reactor.Error.Internal,
invalid: Reactor.Error.Invalid,
unknown: Reactor.Error.Unknown,
validation: Reactor.Error.Validation
],
unknown_error: Reactor.Error.Unknown.UnknownError
@doc "Convenience wrapper around `use Splode.Error`"
@spec __using__(keyword) :: Macro.output()
defmacro __using__(opts) do
quote do
use Splode.Error, unquote(opts)
import Reactor.Error.Utils
import Reactor.Utils
end
end
end

View file

@ -0,0 +1,13 @@
defmodule Reactor.Error.Internal do
@moduledoc """
The [Splode error class](e:splode:get-started-with-splode.html#error-classes)
for Reactor-caused errors.
"""
use Reactor.Error, fields: [:errors], class: :reactor
@doc false
@impl true
def splode_message(%{errors: errors}) do
Splode.ErrorClass.error_messages(errors)
end
end

View file

@ -1,20 +1,18 @@
defmodule Reactor.Error.ComposeError do defmodule Reactor.Error.Internal.ComposeError do
@moduledoc """ @moduledoc """
An error used when attempting to compose to Reactors together. This error is returned when two Reactors cannot be composed together.
""" """
defexception [:outer_reactor, :inner_reactor, :message, :arguments]
import Reactor.Utils use Reactor.Error,
fields: [:arguments, :inner_reactor, :message, :outer_reactor],
class: :reactor
@doc false @doc false
@impl true @impl true
def exception(attrs), do: struct(__MODULE__, attrs) def splode_message(error) do
@doc false
@impl true
def message(error) do
[ [
""" """
# Unable to compose Reactors # Reactor Compose Error
#{error.message} #{error.message}
""" """

View file

@ -0,0 +1,33 @@
defmodule Reactor.Error.Internal.MissingReturnResultError do
@moduledoc """
This error is returned when the Reactor's return name doesn't match any of the
known step results.
"""
use Reactor.Error,
fields: [:reactor],
class: :reactor
@doc false
@impl true
def splode_message(error) do
intermediate_keys =
error.reactor.intermediate_values
|> Map.keys()
known_results =
intermediate_keys
|> Enum.map_join("\n", &" * `#{inspect(&1)}`")
"""
# Missing Return Result Error
The Reactor was asked to return a result named `#{inspect(error.reactor.return)}`, however an intermediate result with that name is missing.
#{did_you_mean?(error.reactor.return, intermediate_keys)}
## Intermediate results:
#{known_results}
"""
end
end

View file

@ -1,20 +1,20 @@
defmodule Reactor.Error.PlanError do defmodule Reactor.Error.Internal.PlanError do
@moduledoc """ @moduledoc """
An error thrown during the planning of a Reactor. This error is returned when the step graph cannot be built.
""" """
defexception [:reactor, :graph, :step, :message]
import Reactor.Utils use Reactor.Error,
fields: [:graph, :message, :reactor, :step],
class: :reactor
@doc false @doc false
@impl true @impl true
def exception(attrs), do: struct(__MODULE__, attrs) def splode_message(error) do
@doc false
@impl true
def message(error) do
[ [
""" """
# Unable to plan Reactor # Reactor Plan Error
An error occurred while building or updating the Reactor execution graph.
#{error.message} #{error.message}
""" """

View file

@ -0,0 +1,14 @@
defmodule Reactor.Error.Invalid do
@moduledoc """
The [Splode error class](e:splode:get-started-with-splode.html#error-classes)
for user-caused errors.
"""
use Reactor.Error, fields: [:errors], class: :unknown
@doc false
@impl true
def splode_message(%{errors: errors}) do
Splode.ErrorClass.error_messages(errors)
end
end

View file

@ -0,0 +1,38 @@
defmodule Reactor.Error.Invalid.ArgumentSubpathError do
@moduledoc """
This error is returned when an argument cannot have a subpath applied to it.
"""
use Reactor.Error,
fields: [:argument, :culprit, :culprit_key, :culprit_path, :message, :step, :value],
class: :invalid
@doc false
@impl true
def splode_message(error) do
"""
# Argument Subpath Error
The step `#{inspect(error.step.name)}` is expecting the value for argument `#{inspect(error.argument.name)}` to be able to be subpathed via `#{inspect(error.argument.subpath)}` however #{error.reason}.
## `step`:
#{inspect(error.step)}
## `argument`:
#{inspect(error.argument)}
## `value`:
#{inspect(error.value)}
## `culprit`:
#{inspect(error.culprit)}
## `culprit_path`:
#{inspect(error.culprit_path)}
"""
end
end

View file

@ -0,0 +1,28 @@
defmodule Reactor.Error.Invalid.CompensateStepError do
@moduledoc """
This error is returned when an error occurs during step compensation.
Its `error` key will contain the error that was raised or returned by the
`c:Step.compensate/4` callback.
"""
use Reactor.Error, fields: [:error, :reactor, :step], class: :invalid
@doc false
@impl true
def splode_message(error) do
"""
# Compensate Step Error
An error occurred while attempting to compensate the `#{inspect(error.step.name)}` step.
## `step`:
#{inspect(error.step)}
## `error`:
#{describe_error(error.error)}
"""
end
end

View file

@ -0,0 +1,32 @@
defmodule Reactor.Error.Invalid.MissingArgumentError do
@moduledoc """
This error is returned when an expected argument is not passed to a step.
"""
use Reactor.Error, fields: [:argument, :arguments, :step], class: :invalid
@doc false
@impl true
def splode_message(error) do
"""
# Missing Argument Error
The step `#{inspect(error.step.name)}` run function is expecting to be passed the `#{inspect(error.argument)}` argument, but it is not present.
#{did_you_mean?(error.argument, Map.keys(error.arguments))}
## `step`:
#{inspect(error.step)}
## `argument`:
#{inspect(error.argument)}
## Arguments passed:
```
#{inspect(error.arguments)}
```
"""
end
end

View file

@ -0,0 +1,34 @@
defmodule Reactor.Error.Invalid.MissingInputError do
@moduledoc """
Error raised when a required Reactor input is missing.
"""
use Reactor.Error, fields: [:argument, :reactor, :step], class: :invalid
@doc false
@impl true
def splode_message(error) do
inputs =
error.reactor.inputs
|> Enum.map_join("\n", &" * `#{inspect(&1)}`")
"""
# Missing Input Error
The step `#{inspect(error.step.name)}` is expecting the Reactor to have an input named `#{inspect(error.argument.source.name)}` however it is not present.
#{did_you_mean?(error.argument.source.name, error.reactor.inputs)}
## `step`:
#{inspect(error.step)}
## `argument`:
#{inspect(error.argument)}
## Available inputs:
#{inputs}
"""
end
end

View file

@ -0,0 +1,34 @@
defmodule Reactor.Error.Invalid.MissingResultError do
@moduledoc """
This error is returned when a step attempts to consume an intermediate result
which is not present in the Reactor state.
"""
use Reactor.Error, fields: [:argument, :reactor, :step], class: :invalid
@doc false
@impl true
def splode_message(error) do
inputs =
error.reactor.inputs
|> Enum.map_join("\n", &" * `#{inspect(&1)}`")
"""
# Missing Result Error
The step `#{inspect(error.step.name)}` is expecting the Reactor to have an existing result named `#{inspect(error.argument.source.name)}` however it is not present.
#{did_you_mean?(error.argument.source.name, Map.keys(error.reactor.intermediate_results))}
## `step`:
#{inspect(error.step)}
## `argument`:
#{inspect(error.argument)}
## Available inputs:
#{inputs}
"""
end
end

View file

@ -0,0 +1,25 @@
defmodule Reactor.Error.Invalid.RetriesExceededError do
@moduledoc """
This error is returned when a step attempts to retry more times that is
allowed.
"""
use Reactor.Error, fields: [:retry_count, :step], class: :invalid
@doc false
@impl true
def splode_message(error) do
"""
# Retries Exceeded Error
Maximum number of retries exceeded executing step `#{inspect(error.step.name)}`.
## `retry_count`:
#{inspect(error.retry_count)}
## `step`:
#{inspect(error.step)}
"""
end
end

View file

@ -0,0 +1,27 @@
defmodule Reactor.Error.Invalid.RunStepError do
@moduledoc """
This error is returned when an error occurs during step execution.
Its `error` key will contain the error that was raised or returned by the
`c:Step.run/3` callback.
"""
use Reactor.Error, fields: [:error, :step], class: :invalid
@doc false
@impl true
def splode_message(error) do
"""
# Run Step Error
An error occurred while attempting to run the `#{inspect(error.step.name)}` step.
## `step`:
#{inspect(error.step)}
## `error`:
#{describe_error(error.error)}
"""
end
end

View file

@ -0,0 +1,41 @@
defmodule Reactor.Error.Invalid.TransformError do
@moduledoc """
An error which occurs when building and running transforms.
"""
use Reactor.Error, fields: [:input, :output, :error], class: :invalid
@doc false
@impl true
def splode_message(error) do
message = """
# Transform Error
An error occurred while trying to transform a value.
## `input`:
`#{inspect(error.input)}`
"""
message =
if error.output do
"""
#{message}
## `output`:
`#{inspect(error.output)}`
"""
else
message
end
"""
#{message}
## `error`:
#{describe_error(error.error)}
"""
end
end

View file

@ -0,0 +1,25 @@
defmodule Reactor.Error.Invalid.UndoRetriesExceededError do
@moduledoc """
An error used when a step runs out of retry events and no other error is
thrown.
"""
use Reactor.Error, fields: [:step, :retry_count], class: :invalid
@doc false
@impl true
def splode_message(error) do
"""
# Undo Retries Exceeded Error
Maximum number of retries exceeded while attempting to undo step `#{inspect(error.step.name)}`.
## `retry_count`:
#{inspect(error.retry_count)}
## `step`:
#{inspect(error.step)}
"""
end
end

View file

@ -0,0 +1,27 @@
defmodule Reactor.Error.Invalid.UndoStepError do
@moduledoc """
This error is returned when an error occurs when attempting to undo step execution.
Its `error` key will contain the error that was raised or returned by the
`c:Step.undo/4` callback.
"""
use Reactor.Error, fields: [:step, :error], class: :invalid
@doc false
@impl true
def splode_message(error) do
"""
# Undo Step Error
An error occurred while attempting to undo the step `#{inspect(error.step.name)}`.
## `step`:
#{inspect(error.step)}
## `error`:
#{describe_error(error.error)}
"""
end
end

View file

@ -0,0 +1,14 @@
defmodule Reactor.Error.Unknown do
@moduledoc """
The [Splode error class](e:splode:get-started-with-splode.html#error-classes)
for unknown errors.
"""
use Reactor.Error, fields: [:errors], class: :unknown
@doc false
@impl true
def splode_message(%{errors: errors}) do
Splode.ErrorClass.error_messages(errors)
end
end

View file

@ -0,0 +1,21 @@
defmodule Reactor.Error.Unknown.UnknownError do
@moduledoc """
An error used to wrap unknown errors.
"""
use Reactor.Error, fields: [:error], class: :unknown
@doc false
@impl true
def splode_message(error) do
"""
# Unknown Error
An unknown error occurred.
## `error`:
#{describe_error(error.error)}
"""
end
end

View file

@ -0,0 +1,53 @@
defmodule Reactor.Error.Utils do
@moduledoc false
@doc "Attempt to describe an error"
@spec describe_error(any) :: String.t()
def describe_error(error) when is_exception(error), do: Exception.message(error)
def describe_error(error) when is_binary(error) do
if String.printable?(error) do
error
else
inspect_error(error)
end
end
def describe_error(error), do: inspect_error(error)
@doc "Helper function to provide suggestions in error messages"
@spec did_you_mean?(any, Enumerable.t(any)) :: nil | String.t()
def did_you_mean?(requested, possible) do
best_match =
possible
|> Enum.map(&inspect/1)
|> Enum.max_by(
&String.jaro_distance(&1, inspect(requested)),
&>=/2,
fn ->
nil
end
)
if best_match do
"Did you mean `#{inspect(best_match)}`?"
end
end
defp inspect_error(error) do
inspected =
error
|> inspect()
|> String.trim()
if inspected =~ ~r/[\r\n\v]/ do
"""
```
#{inspected}
```
"""
else
"`#{inspected}`"
end
end
end

View file

@ -0,0 +1,14 @@
defmodule Reactor.Error.Validation do
@moduledoc """
The [Splode error class](e:splode:get-started-with-splode.html#error-classes)
for validation errors.
"""
use Reactor.Error, fields: [:errors], class: :validation
@doc false
@impl true
def splode_message(%{errors: errors}) do
Splode.ErrorClass.error_messages(errors)
end
end

View file

@ -0,0 +1,22 @@
defmodule Reactor.Error.Validation.MissingReturnError do
@moduledoc """
An error returned when a Reactor cannot be validated because of a missing
return value.
"""
use Reactor.Error,
fields: [:reactor],
class: :validation
@doc false
@impl true
def splode_message(_error) do
"""
# Missing Return Error
The Reactor does not have a named return value.
You can set one using `Reactor.Builder.return/2` or by setting the `d:Reactor.Dsl.reactor.return` DSL option.
"""
end
end

View file

@ -0,0 +1,39 @@
defmodule Reactor.Error.Validation.StateError do
@moduledoc """
An error returned when a Reactor is in an unexpected state.
"""
use Reactor.Error,
fields: [:reactor, :state, :expected],
class: :validation
@doc false
@impl true
def splode_message(error) do
"""
# Reactor State Error
#{state_message(error)}
"""
end
defp state_message(%{expected: [], state: state}),
do: "Reactor is in an invalid state: `#{inspect(state)}`"
defp state_message(%{expected: [expected], state: state}),
do: "Reactor is in an invalid state: `#{inspect(state)}`, expected: `#{inspect(expected)}`"
defp state_message(error) do
valid_states =
error.expected
|> Enum.map_join("\n", &" * `#{inspect(&1)}`")
"""
Reactor is in an invalid state: `#{inspect(error.state)}`
Expected states:
#{valid_states}
"""
end
end

View file

@ -1,27 +0,0 @@
defmodule Reactor.Error.RetriesExceededError do
@moduledoc """
An error used when a step runs out of retry events and no other error is
thrown.
"""
defexception [:step, :retry_count]
@doc false
@impl true
def exception(attrs), do: struct(__MODULE__, attrs)
@doc false
@impl true
def message(error) do
"""
# Maximum number of retries exceeded executing step.
## `retry_count`:
#{inspect(error.retry_count)}
## `step`:
#{inspect(error.step)}
"""
end
end

View file

@ -1,14 +0,0 @@
defmodule Reactor.Error.TransformError do
@moduledoc """
An error which occurs when building and running transforms.
"""
defexception input: nil, output: nil, message: nil
@doc false
@impl true
def exception(attrs), do: struct(__MODULE__, attrs)
@doc false
@impl true
def message(error), do: error.message
end

View file

@ -35,8 +35,15 @@ defmodule Reactor.Executor do
3. When a step or compensation asks for a retry then the step is placed back 3. When a step or compensation asks for a retry then the step is placed back
in the graph to be run again next iteration. in the graph to be run again next iteration.
""" """
alias Reactor.Executor.ConcurrencyTracker alias Reactor.{
alias Reactor.{Executor, Planner, Step} Error.Internal.MissingReturnResultError,
Error.Validation.MissingReturnError,
Error.Validation.StateError,
Executor,
Executor.ConcurrencyTracker,
Planner,
Step
}
@doc """ @doc """
Run a reactor. Run a reactor.
@ -51,7 +58,7 @@ defmodule Reactor.Executor do
def run(reactor, inputs \\ %{}, context \\ %{}, options \\ []) def run(reactor, inputs \\ %{}, context \\ %{}, options \\ [])
def run(reactor, _inputs, _context, _options) when is_nil(reactor.return), def run(reactor, _inputs, _context, _options) when is_nil(reactor.return),
do: {:error, ArgumentError.exception("`reactor` has no return value")} do: {:error, MissingReturnError.exception(reactor: reactor)}
def run(reactor, inputs, context, options) when reactor.state in ~w[pending halted]a do def run(reactor, inputs, context, options) when reactor.state in ~w[pending halted]a do
with {:ok, context} <- Executor.Hooks.init(reactor, context), with {:ok, context} <- Executor.Hooks.init(reactor, context),
@ -60,8 +67,14 @@ defmodule Reactor.Executor do
end end
end end
def run(_reactor, _inputs, _context, _options), def run(reactor, _inputs, _context, _options),
do: {:error, ArgumentError.exception("`reactor` is not in `pending` or `halted` state")} do:
{:error,
StateError.exception(
reactor: reactor,
state: reactor.state,
expected: ~w[pending halted]a
)}
defp execute(reactor, state) when state.max_iterations == 0 do defp execute(reactor, state) when state.max_iterations == 0 do
{reactor, _status} = Executor.Async.collect_remaining_tasks_for_shutdown(reactor, state) {reactor, _status} = Executor.Async.collect_remaining_tasks_for_shutdown(reactor, state)
@ -219,7 +232,8 @@ defmodule Reactor.Executor do
end end
defp handle_undo(reactor, state, []) do defp handle_undo(reactor, state, []) do
Executor.Hooks.error(reactor, state.errors, reactor.context) error = Reactor.Error.to_class(state.errors)
Executor.Hooks.error(reactor, error, reactor.context)
end end
defp handle_undo(reactor, state, [{step, value} | tail]) do defp handle_undo(reactor, state, [{step, value} | tail]) do
@ -234,8 +248,11 @@ defmodule Reactor.Executor do
{:ok, value} <- Map.fetch(reactor.intermediate_results, reactor.return) do {:ok, value} <- Map.fetch(reactor.intermediate_results, reactor.return) do
{:ok, value} {:ok, value}
else else
:error -> {:error, "Unable to find result for `#{inspect(reactor.return)}` step"} :error ->
n when is_integer(n) -> {:continue, reactor} {:error, MissingReturnResultError.exception(reactor: reactor)}
n when is_integer(n) ->
{:continue, reactor}
end end
end end

View file

@ -3,8 +3,9 @@ defmodule Reactor.Executor.Async do
Handle the asynchronous execution of a batch of steps, along with any Handle the asynchronous execution of a batch of steps, along with any
mutations to the reactor or execution state. mutations to the reactor or execution state.
""" """
alias Reactor.Error.Invalid.RetriesExceededError, as: RetriesExceededError
alias Reactor.Executor.ConcurrencyTracker alias Reactor.Executor.ConcurrencyTracker
alias Reactor.{Error, Executor, Step} alias Reactor.{Executor, Step}
require Logger require Logger
@doc """ @doc """
@ -276,7 +277,7 @@ defmodule Reactor.Executor.Async do
reason reason
{step, :retry} -> {step, :retry} ->
Error.RetriesExceededError.exception( RetriesExceededError.exception(
step: step, step: step,
retry_count: Map.get(state.retries, step.ref) retry_count: Map.get(state.retries, step.ref)
) )

View file

@ -2,7 +2,20 @@ defmodule Reactor.Executor.StepRunner do
@moduledoc """ @moduledoc """
Run an individual step, including compensation if possible. Run an individual step, including compensation if possible.
""" """
alias Reactor.{Executor.ConcurrencyTracker, Executor.Hooks, Executor.State, Step} alias Reactor.{
Error.Invalid.ArgumentSubpathError,
Error.Invalid.CompensateStepError,
Error.Invalid.MissingInputError,
Error.Invalid.MissingResultError,
Error.Invalid.RunStepError,
Error.Invalid.UndoRetriesExceededError,
Error.Invalid.UndoStepError,
Executor.ConcurrencyTracker,
Executor.Hooks,
Executor.State,
Step
}
import Reactor.Utils import Reactor.Utils
import Reactor.Argument, only: :macros import Reactor.Argument, only: :macros
require Logger require Logger
@ -67,11 +80,11 @@ defmodule Reactor.Executor.StepRunner do
defp do_undo(reactor, _value, step, _arguments, context, undo_count) defp do_undo(reactor, _value, step, _arguments, context, undo_count)
when undo_count == @max_undo_count do when undo_count == @max_undo_count do
reason = "`undo/4` retried #{@max_undo_count} times on step `#{inspect(step.name)}`." error = UndoRetriesExceededError.exception(step: step.name, retry_count: undo_count)
Hooks.event(reactor, {:undo_error, reason}, step, context) Hooks.event(reactor, {:undo_error, error}, step, context)
{:error, reason} {:error, error}
end end
defp do_undo(reactor, value, step, arguments, context, undo_count) do defp do_undo(reactor, value, step, arguments, context, undo_count) do
@ -90,8 +103,9 @@ defmodule Reactor.Executor.StepRunner do
do_undo(reactor, value, step, arguments, context, undo_count + 1) do_undo(reactor, value, step, arguments, context, undo_count + 1)
{:error, reason} -> {:error, reason} ->
Hooks.event(reactor, {:undo_error, reason}, step, context) error = UndoStepError.exception(step: step.name, error: reason)
{:error, reason} Hooks.event(reactor, {:undo_error, error}, step, context)
{:error, error}
end end
end end
@ -103,9 +117,10 @@ defmodule Reactor.Executor.StepRunner do
|> handle_run_result(reactor, step, arguments, context) |> handle_run_result(reactor, step, arguments, context)
rescue rescue
reason -> reason ->
Hooks.event(reactor, {:run_error, reason}, step, context) error = RunStepError.exception(step: step.name, error: reason)
Hooks.event(reactor, {:run_error, error}, step, context)
maybe_compensate(reactor, step, reason, arguments, context) maybe_compensate(reactor, step, error, arguments, context)
end end
defp handle_run_result({:ok, value}, reactor, step, _arguments, context) do defp handle_run_result({:ok, value}, reactor, step, _arguments, context) do
@ -134,9 +149,10 @@ defmodule Reactor.Executor.StepRunner do
end end
defp handle_run_result({:error, reason}, reactor, step, arguments, context) do defp handle_run_result({:error, reason}, reactor, step, arguments, context) do
Hooks.event(reactor, {:run_error, reason}, step, context) error = RunStepError.exception(step: step.name, error: reason)
Hooks.event(reactor, {:run_error, error}, step, context)
maybe_compensate(reactor, step, reason, arguments, context) maybe_compensate(reactor, step, error, arguments, context)
end end
defp handle_run_result({:halt, value}, reactor, step, _arguments, context) do defp handle_run_result({:halt, value}, reactor, step, _arguments, context) do
@ -145,30 +161,38 @@ defmodule Reactor.Executor.StepRunner do
{:halt, value} {:halt, value}
end end
defp maybe_compensate(reactor, step, reason, arguments, context) do defp maybe_compensate(reactor, step, error, arguments, context) do
if Step.can?(step, :compensate) do if Step.can?(step, :compensate) do
compensate(reactor, step, reason, arguments, context) compensate(reactor, step, error, arguments, context)
else else
{:error, reason} {:error, error}
end end
end end
defp compensate(reactor, step, reason, arguments, context) do defp compensate(reactor, step, error, arguments, context) do
Hooks.event(reactor, {:compensate_start, reason}, step, context) Hooks.event(reactor, {:compensate_start, error}, step, context)
step step
|> Step.compensate(reason, arguments, context) |> Step.compensate(error.error, arguments, context)
|> handle_compensate_result(reactor, step, context, reason) |> handle_compensate_result(reactor, step, context, error)
rescue rescue
error -> error ->
Hooks.event(reactor, {:compensate_error, reason}, step, context) error =
CompensateStepError.exception(
reactor: reactor,
step: step,
error: error,
stacktrace: __STACKTRACE__
)
Hooks.event(reactor, {:compensate_error, error}, step, context)
Logger.error(fn -> Logger.error(fn ->
"Warning: step `#{inspect(step.name)}` `compensate/4` raised an error:\n" <> "Warning: step `#{inspect(step.name)}` `compensate/4` raised an error:\n" <>
Exception.format(:error, error, __STACKTRACE__) Exception.format(:error, error, __STACKTRACE__)
end) end)
{:error, reason} {:error, error}
end end
defp handle_compensate_result({:continue, value}, reactor, step, context, _) do defp handle_compensate_result({:continue, value}, reactor, step, context, _) do
@ -190,15 +214,17 @@ defmodule Reactor.Executor.StepRunner do
end end
defp handle_compensate_result({:error, reason}, reactor, step, context, _) do defp handle_compensate_result({:error, reason}, reactor, step, context, _) do
Hooks.event(reactor, {:compensate_error, reason}, step, context) error = CompensateStepError.exception(reactor: reactor, step: step, error: reason)
{:error, reason} Hooks.event(reactor, {:compensate_error, error}, step, context)
{:error, error}
end end
defp handle_compensate_result(:ok, reactor, step, context, reason) do defp handle_compensate_result(:ok, reactor, step, context, error) do
Hooks.event(reactor, :compensate_complete, step, context) Hooks.event(reactor, :compensate_complete, step, context)
{:error, reason} {:error, error}
end end
defp get_step_arguments(reactor, step) do defp get_step_arguments(reactor, step) do
@ -208,7 +234,7 @@ defmodule Reactor.Executor.StepRunner do
argument, arguments -> argument, arguments ->
with {:ok, value} <- fetch_argument(reactor, step, argument), with {:ok, value} <- fetch_argument(reactor, step, argument),
{:ok, value} <- subpath_argument(value, argument) do {:ok, value} <- subpath_argument(value, step, argument) do
{:ok, Map.put(arguments, argument.name, value)} {:ok, Map.put(arguments, argument.name, value)}
end end
end) end)
@ -216,14 +242,13 @@ defmodule Reactor.Executor.StepRunner do
defp fetch_argument(reactor, step, argument) when is_from_input(argument) do defp fetch_argument(reactor, step, argument) when is_from_input(argument) do
with :error <- Map.fetch(reactor.context.private.inputs, argument.source.name) do with :error <- Map.fetch(reactor.context.private.inputs, argument.source.name) do
{:error, {:error, MissingInputError.exception(reactor: reactor, step: step, argument: argument)}
"Step `#{inspect(step.name)}` argument `#{inspect(argument.name)}` relies on missing input `#{argument.source.name}`"}
end end
end end
defp fetch_argument(reactor, step, argument) when is_from_result(argument) do defp fetch_argument(reactor, step, argument) when is_from_result(argument) do
with :error <- Map.fetch(reactor.intermediate_results, argument.source.name) do with :error <- Map.fetch(reactor.intermediate_results, argument.source.name) do
{:error, "Step `#{inspect(step.name)}` argument `#{inspect(argument.name)}` is missing"} {:error, MissingResultError.exception(reactor: reactor, step: step, argument: argument)}
end end
end end
@ -231,44 +256,125 @@ defmodule Reactor.Executor.StepRunner do
{:ok, argument.source.value} {:ok, argument.source.value}
end end
defp subpath_argument(value, argument) when has_sub_path(argument), defp subpath_argument(value, step, argument) when has_sub_path(argument),
do: perform_argument_subpath(value, argument.name, argument.source.sub_path, []) do: perform_argument_subpath(value, step, argument, argument.source.sub_path, [], value)
defp subpath_argument(value, _argument), do: {:ok, value} defp subpath_argument(value, _step, _argument), do: {:ok, value}
defp perform_argument_subpath(value, _, [], _), do: {:ok, value} defp perform_argument_subpath(
value,
step,
argument,
remaining_path,
done_path,
intermediate_value
)
defp perform_argument_subpath(value, name, remaining, done) when is_struct(value), defp perform_argument_subpath(_value, _step, _argument, [], _, result), do: {:ok, result}
do: value |> Map.from_struct() |> perform_argument_subpath(name, remaining, done)
defp perform_argument_subpath(value, name, [head | tail], []) do defp perform_argument_subpath(
case access_fetch_with_rescue(value, head) do value,
{:ok, value} -> step,
perform_argument_subpath(value, name, tail, [head]) argument,
[key | remaining_path],
done_path,
intermediate_value
)
when is_map(intermediate_value) do
case Map.fetch(intermediate_value, key) do
{:ok, intermediate_value} ->
perform_argument_subpath(
value,
step,
argument,
remaining_path,
[key | done_path],
intermediate_value
)
:error -> :error ->
type = if is_struct(intermediate_value), do: "struct", else: "map"
{:error, {:error,
"Unable to resolve subpath for argument `#{inspect(name)}` at key `[#{inspect(head)}]`"} ArgumentSubpathError.exception(
step: step,
argument: argument,
culprit: intermediate_value,
culprit_path: done_path,
culprit_key: key,
value: value,
message:
"key `#{inspect(key)}` not present in #{type} at path `#{inspect(done_path)}`."
)}
end end
end end
defp perform_argument_subpath(value, name, [head | tail], done) do defp perform_argument_subpath(
case access_fetch_with_rescue(value, head) do value,
{:ok, value} -> step,
perform_argument_subpath(value, name, tail, [head]) argument,
[key | remaining_path],
done_path,
intermediate_value
)
when is_list(intermediate_value) do
if Keyword.keyword?(intermediate_value) do
case Keyword.fetch(intermediate_value, key) do
{:ok, intermediate_value} ->
perform_argument_subpath(
value,
step,
argument,
remaining_path,
[key | done_path],
intermediate_value
)
:error -> :error ->
path = Enum.reverse([head | done]) {:error,
ArgumentSubpathError.exception(
{:error, step: step,
"Unable to resolve subpath for argument `#{inspect(name)}` at key `#{inspect(path)}`"} argument: argument,
culprit: intermediate_value,
culprit_path: done_path,
culprit_key: key,
value: value,
message:
"key `#{inspect(key)}` not present in keyword list at path `#{inspect(done_path)}`."
)}
end
else
{:error,
ArgumentSubpathError.exception(
step: step,
argument: argument,
value: value,
culprit: intermediate_value,
culprit_key: List.first(done_path),
culprit_path: done_path,
message: "list at path `#{inspect(done_path)}` is not a keyword list."
)}
end end
end end
defp access_fetch_with_rescue(container, key) do defp perform_argument_subpath(
Access.fetch(container, key) value,
rescue step,
FunctionClauseError -> :error argument,
_remaining_path,
done_path,
intermediate_value
) do
{:error,
ArgumentSubpathError.exception(
step: step,
argument: argument,
value: value,
culprit: intermediate_value,
culprit_path: done_path,
culprit_key: List.first(done_path),
message: "value is neither a map or keyword list."
)}
end end
defp build_context(reactor, state, step, concurrency_key) do defp build_context(reactor, state, step, concurrency_key) do

View file

@ -4,7 +4,8 @@ defmodule Reactor.Executor.Sync do
the reactor or execution state. the reactor or execution state.
""" """
alias Reactor.{Error, Executor, Step} alias Reactor.Error.Invalid.RetriesExceededError, as: RetriesExceededError
alias Reactor.{Executor, Step}
@doc """ @doc """
Try and run a step synchronously. Try and run a step synchronously.
@ -22,7 +23,7 @@ defmodule Reactor.Executor.Sync do
reactor = drop_from_plan(reactor, step) reactor = drop_from_plan(reactor, step)
error = error =
Error.RetriesExceededError.exception( RetriesExceededError.exception(
step: step, step: step,
retry_count: Map.get(state.retries, step.ref) retry_count: Map.get(state.retries, step.ref)
) )

View file

@ -6,7 +6,7 @@ defmodule Reactor.Planner do
between them representing their dependencies (arguments). between them representing their dependencies (arguments).
""" """
alias Reactor.{Error.PlanError, Step} alias Reactor.{Error.Internal.PlanError, Step}
import Reactor, only: :macros import Reactor, only: :macros
import Reactor.Argument, only: :macros import Reactor.Argument, only: :macros
import Reactor.Utils import Reactor.Utils

View file

@ -118,7 +118,7 @@ defmodule Reactor.Step do
- `:retry` or `{:retry, reason}` if you would like the reactor to attempt to - `:retry` or `{:retry, reason}` if you would like the reactor to attempt to
re-run the step. You can optionally supply an error reason which will be re-run the step. You can optionally supply an error reason which will be
used in the event that the step runs out of retries, otherwise a used in the event that the step runs out of retries, otherwise a
`Reactor.Error.RetriesExceededError` will be used. `Reactor.Error.Invalid.RetriesExceededError` will be used.
- `{:error, reason}` if compensation was unsuccessful. - `{:error, reason}` if compensation was unsuccessful.
""" """
@callback compensate( @callback compensate(

View file

@ -107,7 +107,6 @@ defmodule Reactor.Step.Around do
{:ok, result} <- fun.(arguments, context, steps, &__MODULE__.around(&1, &2, &3, options)) do {:ok, result} <- fun.(arguments, context, steps, &__MODULE__.around(&1, &2, &3, options)) do
{:ok, result} {:ok, result}
else else
:error -> {:error, "Missing `fun` option."}
{:error, reason} -> {:error, reason} {:error, reason} -> {:error, reason}
end end
end end
@ -154,9 +153,12 @@ defmodule Reactor.Step.Around do
end} end}
end) end)
_ -> {:ok, _} ->
{:error, {:error,
argument_error(:options, "Expected `fun` option to be a 4 arity function", options)} argument_error(:options, "Expected `fun` option to be a 4 arity function", options)}
:error ->
{:error, argument_error(:options, "The required option `fun` is not present", options)}
end end
end end
@ -164,7 +166,12 @@ defmodule Reactor.Step.Around do
if Code.ensure_loaded?(m) && function_exported?(m, f, arity) do if Code.ensure_loaded?(m) && function_exported?(m, f, arity) do
callback.() callback.()
else else
{:error, "Expected `#{inspect(m)}.#{f}/#{arity}` to be exported."} {:error,
argument_error(
:mfa,
"Expected `#{inspect(m)}.#{f}/#{arity}` to be exported.",
{m, f, arity}
)}
end end
end end

View file

@ -12,7 +12,7 @@ defmodule Reactor.Step.Compose do
""" """
use Reactor.Step use Reactor.Step
alias Reactor.{Argument, Builder, Error.ComposeError, Info, Step} alias Reactor.{Argument, Builder, Error.Internal.ComposeError, Info, Step}
import Reactor, only: :macros import Reactor, only: :macros
import Reactor.Argument, only: :macros import Reactor.Argument, only: :macros
import Reactor.Utils import Reactor.Utils

View file

@ -6,16 +6,24 @@ defmodule Reactor.Step.Transform do
semantics. semantics.
""" """
alias Reactor.Step alias Reactor.{Error.Invalid.MissingArgumentError, Error.Invalid.TransformError, Step}
use Step use Step
@doc false @doc false
@impl true @impl true
@spec run(Reactor.inputs(), Reactor.context(), keyword) :: {:ok | :error, any} @spec run(Reactor.inputs(), Reactor.context(), keyword) :: {:ok | :error, any}
def run(arguments, _context, options) do def run(arguments, context, options) do
case Map.fetch(arguments, :value) do case Map.fetch(arguments, :value) do
{:ok, value} -> do_transform(value, options) {:ok, value} ->
:error -> {:error, ArgumentError.exception("The `value` argument is missing")} do_transform(value, options)
:error ->
{:error,
MissingArgumentError.exception(
step: context.current_step,
argument: :value,
arguments: arguments
)}
end end
end end
@ -31,6 +39,6 @@ defmodule Reactor.Step.Transform do
{:ok, apply(m, f, [value | a])} {:ok, apply(m, f, [value | a])}
end end
rescue rescue
error -> {:error, error} error -> {:error, TransformError.exception(input: value, error: error)}
end end
end end

View file

@ -7,7 +7,7 @@ defmodule Reactor.Step.TransformAll do
""" """
use Reactor.Step use Reactor.Step
alias Reactor.{Error.TransformError, Step.Transform} alias Reactor.{Error.Invalid.TransformError, Step.Transform}
@doc false @doc false
@impl true @impl true
@ -22,11 +22,11 @@ defmodule Reactor.Step.TransformAll do
TransformError.exception( TransformError.exception(
input: arguments, input: arguments,
output: result, output: result,
message: "Step transformers must return a map to use as replacement arguments." error: "Step transformers must return a map to use as replacement arguments."
)} )}
{:error, reason} -> {:error, reason} ->
{:error, reason} {:error, TransformError.exception(input: arguments, error: reason)}
end end
end end
end end

View file

@ -90,6 +90,7 @@ defmodule Reactor.MixProject do
defp deps do defp deps do
[ [
{:spark, "~> 2.0"}, {:spark, "~> 2.0"},
{:splode, "~> 0.1.0"},
{:libgraph, "~> 0.16"}, {:libgraph, "~> 0.16"},
{:telemetry, "~> 1.2"}, {:telemetry, "~> 1.2"},

View file

@ -23,6 +23,7 @@
"sobelow": {:hex, :sobelow, "0.13.0", "218afe9075904793f5c64b8837cc356e493d88fddde126a463839351870b8d1e", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "cd6e9026b85fc35d7529da14f95e85a078d9dd1907a9097b3ba6ac7ebbe34a0d"}, "sobelow": {:hex, :sobelow, "0.13.0", "218afe9075904793f5c64b8837cc356e493d88fddde126a463839351870b8d1e", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "cd6e9026b85fc35d7529da14f95e85a078d9dd1907a9097b3ba6ac7ebbe34a0d"},
"sourceror": {:hex, :sourceror, "1.0.2", "c5e86fdc14881f797749d1fe5df017ca66727a8146e7ee3e736605a3df78f3e6", [:mix], [], "hexpm", "832335e87d0913658f129d58b2a7dc0490ddd4487b02de6d85bca0169ec2bd79"}, "sourceror": {:hex, :sourceror, "1.0.2", "c5e86fdc14881f797749d1fe5df017ca66727a8146e7ee3e736605a3df78f3e6", [:mix], [], "hexpm", "832335e87d0913658f129d58b2a7dc0490ddd4487b02de6d85bca0169ec2bd79"},
"spark": {:hex, :spark, "2.1.0", "71b27a34c4eb6e9df958237ecb5df1f738465c05a190f3024fed6c305adff903", [:mix], [{:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.0", [hex: :sourceror, repo: "hexpm", optional: false]}], "hexpm", "6719170085ec23c9ecd48c1a92cef0c596acec7f77b2265a4b32e92a4c6a7daa"}, "spark": {:hex, :spark, "2.1.0", "71b27a34c4eb6e9df958237ecb5df1f738465c05a190f3024fed6c305adff903", [:mix], [{:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.0", [hex: :sourceror, repo: "hexpm", optional: false]}], "hexpm", "6719170085ec23c9ecd48c1a92cef0c596acec7f77b2265a4b32e92a4c6a7daa"},
"splode": {:hex, :splode, "0.1.1", "1e3290c2d11f95bd3c3e6cf44cd33261ce76e2429a6d367f7896fd84698ca29f", [:mix], [], "hexpm", "7bf9dd0ade39c5074434776d4e10e4d71e88707a39bee9e809438a73bc0e92a9"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
"yamerl": {:hex, :yamerl, "0.10.0", "4ff81fee2f1f6a46f1700c0d880b24d193ddb74bd14ef42cb0bcf46e81ef2f8e", [:rebar3], [], "hexpm", "346adb2963f1051dc837a2364e4acf6eb7d80097c0f53cbdc3046ec8ec4b4e6e"}, "yamerl": {:hex, :yamerl, "0.10.0", "4ff81fee2f1f6a46f1700c0d880b24d193ddb74bd14ef42cb0bcf46e81ef2f8e", [:rebar3], [], "hexpm", "346adb2963f1051dc837a2364e4acf6eb7d80097c0f53cbdc3046ec8ec4b4e6e"},
"yaml_elixir": {:hex, :yaml_elixir, "2.9.0", "9a256da867b37b8d2c1ffd5d9de373a4fda77a32a45b452f1708508ba7bbcb53", [:mix], [{:yamerl, "~> 0.10", [hex: :yamerl, repo: "hexpm", optional: false]}], "hexpm", "0cb0e7d4c56f5e99a6253ed1a670ed0e39c13fc45a6da054033928607ac08dfc"}, "yaml_elixir": {:hex, :yaml_elixir, "2.9.0", "9a256da867b37b8d2c1ffd5d9de373a4fda77a32a45b452f1708508ba7bbcb53", [:mix], [{:yamerl, "~> 0.10", [hex: :yamerl, repo: "hexpm", optional: false]}], "hexpm", "0cb0e7d4c56f5e99a6253ed1a670ed0e39c13fc45a6da054033928607ac08dfc"},

View file

@ -1,6 +1,16 @@
defmodule Reactor.Builder.ComposeTest do defmodule Reactor.Builder.ComposeTest do
use ExUnit.Case, async: true use ExUnit.Case, async: true
alias Reactor.{Argument, Builder, Builder.Compose, Error.ComposeError, Planner, Step, Template}
alias Reactor.{
Argument,
Builder,
Builder.Compose,
Error.Internal.ComposeError,
Planner,
Step,
Template
}
require Reactor.Argument require Reactor.Argument
describe "compose/4" do describe "compose/4" do

View file

@ -1,5 +1,6 @@
defmodule Reactor.Executor.AsyncTest do defmodule Reactor.Executor.AsyncTest do
alias Reactor.{Error, Executor} alias Reactor.Error.Invalid.RetriesExceededError, as: RetriesExceededError
alias Reactor.Executor
import Reactor.Executor.Async import Reactor.Executor.Async
use ExUnit.Case, async: true use ExUnit.Case, async: true
@ -224,7 +225,7 @@ defmodule Reactor.Executor.AsyncTest do
task = Task.Supervisor.async_nolink(supervisor, fn -> :retry end) task = Task.Supervisor.async_nolink(supervisor, fn -> :retry end)
state = %{state | current_tasks: %{task => undoable}, retries: %{undoable.ref => 100}} state = %{state | current_tasks: %{task => undoable}, retries: %{undoable.ref => 100}}
assert {:undo, _reactor, %{errors: [%Error.RetriesExceededError{}]}} = assert {:undo, _reactor, %{errors: [%RetriesExceededError{}]}} =
handle_completed_steps(reactor, state) handle_completed_steps(reactor, state)
end end
end end

View file

@ -1,6 +1,11 @@
defmodule Reactor.Executor.EventTest do defmodule Reactor.Executor.EventTest do
use ExUnit.Case, async: true use ExUnit.Case, async: true
alias Reactor.Error.Invalid.CompensateStepError
alias Reactor.Error.Invalid.RunStepError
alias Reactor.Error.Invalid.UndoRetriesExceededError
alias Reactor.Error.Invalid.UndoStepError
defmodule EventMiddleware do defmodule EventMiddleware do
use Reactor.Middleware use Reactor.Middleware
@ -58,7 +63,7 @@ defmodule Reactor.Executor.EventTest do
test "fail step" do test "fail step" do
assert [ assert [
{:run_start, _}, {:run_start, _},
{:run_error, :marty} {:run_error, %RunStepError{error: :marty}}
] = ] =
run(StepReactor, %{step_result: {:error, :marty}}, async?: false) run(StepReactor, %{step_result: {:error, :marty}}, async?: false)
end end
@ -96,8 +101,8 @@ defmodule Reactor.Executor.EventTest do
test "successful compensation events" do test "successful compensation events" do
assert [ assert [
{:run_start, _}, {:run_start, _},
{:run_error, :fail}, {:run_error, %RunStepError{error: :fail}},
{:compensate_start, :fail}, {:compensate_start, %RunStepError{error: :fail}},
:compensate_complete :compensate_complete
] = ] =
run(CompensateReactor, %{compensation_result: :ok}, async?: false) run(CompensateReactor, %{compensation_result: :ok}, async?: false)
@ -106,12 +111,12 @@ defmodule Reactor.Executor.EventTest do
test "compensation retries" do test "compensation retries" do
assert [ assert [
{:run_start, _}, {:run_start, _},
{:run_error, :fail}, {:run_error, %RunStepError{error: :fail}},
{:compensate_start, :fail}, {:compensate_start, %RunStepError{error: :fail}},
:compensate_retry, :compensate_retry,
{:run_start, _}, {:run_start, _},
{:run_error, :fail}, {:run_error, %RunStepError{error: :fail}},
{:compensate_start, :fail}, {:compensate_start, %RunStepError{error: :fail}},
:compensate_retry :compensate_retry
] = run(CompensateReactor, %{compensation_result: :retry}, async?: false) ] = run(CompensateReactor, %{compensation_result: :retry}, async?: false)
end end
@ -119,9 +124,9 @@ defmodule Reactor.Executor.EventTest do
test "compensation failure" do test "compensation failure" do
assert [ assert [
{:run_start, _}, {:run_start, _},
{:run_error, :fail}, {:run_error, %RunStepError{error: :fail}},
{:compensate_start, :fail}, {:compensate_start, %RunStepError{error: :fail}},
{:compensate_error, :cant_compensate} {:compensate_error, %CompensateStepError{error: :cant_compensate}}
] = ] =
run(CompensateReactor, %{compensation_result: {:error, :cant_compensate}}, run(CompensateReactor, %{compensation_result: {:error, :cant_compensate}},
async?: false async?: false
@ -131,8 +136,8 @@ defmodule Reactor.Executor.EventTest do
test "compensation complete" do test "compensation complete" do
assert [ assert [
{:run_start, _}, {:run_start, _},
{:run_error, :fail}, {:run_error, %RunStepError{error: :fail}},
{:compensate_start, :fail}, {:compensate_start, %RunStepError{error: :fail}},
:compensate_complete :compensate_complete
] = ] =
run(CompensateReactor, %{compensation_result: :ok}, async?: false) run(CompensateReactor, %{compensation_result: :ok}, async?: false)
@ -141,8 +146,8 @@ defmodule Reactor.Executor.EventTest do
test "compensation continue" do test "compensation continue" do
assert [ assert [
{:run_start, _}, {:run_start, _},
{:run_error, :fail}, {:run_error, %RunStepError{error: :fail}},
{:compensate_start, :fail}, {:compensate_start, %RunStepError{error: :fail}},
{:compensate_continue, :all_is_well} {:compensate_continue, :all_is_well}
] = ] =
run(CompensateReactor, %{compensation_result: {:continue, :all_is_well}}, run(CompensateReactor, %{compensation_result: {:continue, :all_is_well}},
@ -181,7 +186,7 @@ defmodule Reactor.Executor.EventTest do
{:run_start, _}, {:run_start, _},
{:run_complete, :marty}, {:run_complete, :marty},
{:run_start, _}, {:run_start, _},
{:run_error, :doc_brown}, {:run_error, %RunStepError{error: :doc_brown}},
:undo_start, :undo_start,
:undo_complete :undo_complete
] = ] =
@ -193,14 +198,14 @@ defmodule Reactor.Executor.EventTest do
{:run_start, _}, {:run_start, _},
{:run_complete, :marty}, {:run_complete, :marty},
{:run_start, _}, {:run_start, _},
{:run_error, :doc_brown}, {:run_error, %RunStepError{error: :doc_brown}},
:undo_start, :undo_start,
:undo_retry, :undo_retry,
:undo_retry, :undo_retry,
:undo_retry, :undo_retry,
:undo_retry, :undo_retry,
:undo_retry, :undo_retry,
{:undo_error, "`undo/4` retried 5 times on step `:undo_step`."} {:undo_error, %UndoRetriesExceededError{}}
] = ] =
run(UndoReactor, %{undo_result: :retry}, async?: false) run(UndoReactor, %{undo_result: :retry}, async?: false)
end end
@ -210,14 +215,14 @@ defmodule Reactor.Executor.EventTest do
{:run_start, %{undo_result: {:retry, :einstein}}}, {:run_start, %{undo_result: {:retry, :einstein}}},
{:run_complete, :marty}, {:run_complete, :marty},
{:run_start, %{}}, {:run_start, %{}},
{:run_error, :doc_brown}, {:run_error, %RunStepError{error: :doc_brown}},
:undo_start, :undo_start,
{:undo_retry, :einstein}, {:undo_retry, :einstein},
{:undo_retry, :einstein}, {:undo_retry, :einstein},
{:undo_retry, :einstein}, {:undo_retry, :einstein},
{:undo_retry, :einstein}, {:undo_retry, :einstein},
{:undo_retry, :einstein}, {:undo_retry, :einstein},
{:undo_error, "`undo/4` retried 5 times on step `:undo_step`."} {:undo_error, %UndoRetriesExceededError{}}
] = ] =
run(UndoReactor, %{undo_result: {:retry, :einstein}}, async?: false) run(UndoReactor, %{undo_result: {:retry, :einstein}}, async?: false)
end end
@ -227,9 +232,9 @@ defmodule Reactor.Executor.EventTest do
{:run_start, _}, {:run_start, _},
{:run_complete, :marty}, {:run_complete, :marty},
{:run_start, _}, {:run_start, _},
{:run_error, :doc_brown}, {:run_error, %RunStepError{error: :doc_brown}},
:undo_start, :undo_start,
{:undo_error, :einstein} {:undo_error, %UndoStepError{error: :einstein}}
] = run(UndoReactor, %{undo_result: {:error, :einstein}}, async?: false) ] = run(UndoReactor, %{undo_result: {:error, :einstein}}, async?: false)
end end
end end

View file

@ -1,7 +1,7 @@
defmodule Reactor.Executor.HooksTest do defmodule Reactor.Executor.HooksTest do
@moduledoc false @moduledoc false
use ExUnit.Case, async: true use ExUnit.Case, async: true
alias Reactor.Builder alias Reactor.{Builder, Error.Invalid.RunStepError}
describe "init" do describe "init" do
defmodule ReturnContextReactor do defmodule ReturnContextReactor do
@ -84,10 +84,8 @@ defmodule Reactor.Executor.HooksTest do
@moduledoc false @moduledoc false
@behaviour Reactor.Middleware @behaviour Reactor.Middleware
def error(errors, _context) do def error(error, _context) do
[error] = List.wrap(errors) assert Exception.message(error) =~ "hell"
assert is_exception(error, RuntimeError)
assert Exception.message(error) == "hell"
{:error, :wat} {:error, :wat}
end end
@ -116,7 +114,7 @@ defmodule Reactor.Executor.HooksTest do
ErrorReactor.reactor() ErrorReactor.reactor()
|> Builder.add_middleware!(ErrorContextMiddleware) |> Builder.add_middleware!(ErrorContextMiddleware)
assert {:error, [%RuntimeError{message: "hell"}]} = assert {:error, %{errors: [%RunStepError{error: %RuntimeError{message: "hell"}}]}} =
Reactor.run(reactor, %{}, %{is_context?: true}) Reactor.run(reactor, %{}, %{is_context?: true})
end end
end end

View file

@ -1,7 +1,18 @@
defmodule Reactor.Executor.StepRunnerTest do defmodule Reactor.Executor.StepRunnerTest do
@moduledoc false @moduledoc false
use ExUnit.Case, async: true use ExUnit.Case, async: true
alias Reactor.{Argument, Builder, Executor.State, Template}
alias Reactor.{
Argument,
Builder,
Error.Invalid.ArgumentSubpathError,
Error.Invalid.MissingResultError,
Error.Invalid.RunStepError,
Error.Invalid.UndoRetriesExceededError,
Executor.State,
Template
}
import Reactor.Executor.StepRunner import Reactor.Executor.StepRunner
use Mimic use Mimic
@ -21,8 +32,8 @@ defmodule Reactor.Executor.StepRunnerTest do
{:ok, reactor} = Builder.add_step(reactor, :marty, Example.Step.Doable, [argument]) {:ok, reactor} = Builder.add_step(reactor, :marty, Example.Step.Doable, [argument])
step = reactor.steps |> hd() step = reactor.steps |> hd()
assert {:error, reason} = run(reactor, state, step, nil) assert {:error, %MissingResultError{argument: %{source: %{name: :time_circuits}}}} =
assert reason =~ "argument `:current_year` is missing" run(reactor, state, step, nil)
end end
test "when the required argument cannot be subpathed, it returns an error", %{ test "when the required argument cannot be subpathed, it returns an error", %{
@ -40,8 +51,8 @@ defmodule Reactor.Executor.StepRunnerTest do
step = reactor.steps |> hd() step = reactor.steps |> hd()
reactor = %{reactor | intermediate_results: %{time_circuits: 1985}} reactor = %{reactor | intermediate_results: %{time_circuits: 1985}}
assert {:error, reason} = run(reactor, state, step, nil) assert {:error, %ArgumentSubpathError{argument: %{name: :current_year}}} =
assert reason == "Unable to resolve subpath for argument `:current_year` at key `[:year]`" run(reactor, state, step, nil)
end end
test "when the required argument can be subpathed, it calls the step with the correct arguments", test "when the required argument can be subpathed, it calls the step with the correct arguments",
@ -145,7 +156,7 @@ defmodule Reactor.Executor.StepRunnerTest do
{:error, :doc} {:error, :doc}
end) end)
assert {:error, :doc} = run(reactor, state, step, nil) assert {:error, %RunStepError{error: :doc}} = run(reactor, state, step, nil)
end end
test "when a step raises an error it returns an error tuple", %{ test "when a step raises an error it returns an error tuple", %{
@ -161,8 +172,7 @@ defmodule Reactor.Executor.StepRunnerTest do
end) end)
assert {:error, error} = run(reactor, state, step, nil) assert {:error, error} = run(reactor, state, step, nil)
assert is_struct(error, RuntimeError) assert Exception.message(error) =~ "Not enough plutonium!"
assert Exception.message(error) == "Not enough plutonium!"
end end
test "when a step returns an error and can be compensated and the compensation says it can continue it returns an ok tuple", test "when a step returns an error and can be compensated and the compensation says it can continue it returns an ok tuple",
@ -186,7 +196,7 @@ defmodule Reactor.Executor.StepRunnerTest do
|> stub(:run, fn _, _, _ -> {:error, :doc} end) |> stub(:run, fn _, _, _ -> {:error, :doc} end)
|> stub(:compensate, fn :doc, _, _, _ -> :ok end) |> stub(:compensate, fn :doc, _, _, _ -> :ok end)
assert {:error, :doc} = run(reactor, state, step, nil) assert {:error, %RunStepError{error: :doc}} = run(reactor, state, step, nil)
end end
end end
@ -241,8 +251,8 @@ defmodule Reactor.Executor.StepRunnerTest do
Example.Step.Undoable Example.Step.Undoable
|> stub(:undo, fn _, _, _, _ -> :retry end) |> stub(:undo, fn _, _, _, _ -> :retry end)
assert {:error, message} = undo(reactor, state, step, :marty, nil) assert {:error, %UndoRetriesExceededError{step: :marty}} =
assert message =~ "retried 5 times" undo(reactor, state, step, :marty, nil)
end end
end end
end end

View file

@ -1,5 +1,6 @@
defmodule Reactor.Executor.SyncTest do defmodule Reactor.Executor.SyncTest do
alias Reactor.{Error, Executor} alias Reactor.Error.Invalid.RetriesExceededError, as: RetriesExceededError
alias Reactor.Executor
import Reactor.Executor.Sync import Reactor.Executor.Sync
use ExUnit.Case, async: true use ExUnit.Case, async: true
use Mimic use Mimic
@ -77,7 +78,9 @@ defmodule Reactor.Executor.SyncTest do
|> stub(:run, fn _, _, _ -> :retry end) |> stub(:run, fn _, _, _ -> :retry end)
state = %{state | retries: Map.put(state.retries, step.ref, 100)} state = %{state | retries: Map.put(state.retries, step.ref, 100)}
assert {:undo, _, %{errors: [%Error.RetriesExceededError{}]}} = run(reactor, state, step)
assert {:undo, _, %{errors: [%RetriesExceededError{}]}} =
run(reactor, state, step)
end end
test "when the step is successful it tells the reactor to recurse", %{ test "when the step is successful it tells the reactor to recurse", %{

View file

@ -183,9 +183,11 @@ defmodule Reactor.ExecutorTest do
{:ok, agent} = Agent.start_link(fn -> MapSet.new() end) {:ok, agent} = Agent.start_link(fn -> MapSet.new() end)
assert {:error, ["I fail"]} = assert {:error, error} =
Reactor.Executor.run(reactor, %{agent: agent}, %{}, max_iterations: 100) Reactor.Executor.run(reactor, %{agent: agent}, %{}, max_iterations: 100)
assert Exception.message(error) =~ "I fail"
effects = Agent.get(agent, & &1) effects = Agent.get(agent, & &1)
assert MapSet.size(effects) == 0 assert MapSet.size(effects) == 0

View file

@ -1,7 +1,7 @@
defmodule Reactor.PlannerTest do defmodule Reactor.PlannerTest do
@moduledoc false @moduledoc false
use ExUnit.Case, async: true use ExUnit.Case, async: true
alias Reactor.{Builder, Error.PlanError, Info, Planner} alias Reactor.{Builder, Error.Internal.PlanError, Info, Planner}
describe "plan/1" do describe "plan/1" do
test "when the argument is not a reactor, it returns an error" do test "when the argument is not a reactor, it returns an error" do

View file

@ -1,7 +1,7 @@
defmodule Reactor.Step.AroundTest do defmodule Reactor.Step.AroundTest do
@moduledoc false @moduledoc false
use ExUnit.Case, async: true use ExUnit.Case, async: true
alias Reactor.{Builder, Step.Around, Step.ReturnAllArguments} alias Reactor.{Builder, Error.Invalid.MissingInputError, Step.Around, Step.ReturnAllArguments}
setup do setup do
context = %{current_step: %{name: :marty}} context = %{current_step: %{name: :marty}}
@ -27,7 +27,7 @@ defmodule Reactor.Step.AroundTest do
assert {:error, error} = assert {:error, error} =
Around.run(%{}, context, Keyword.put(options, :fun, {Marty, :marty, []})) Around.run(%{}, context, Keyword.put(options, :fun, {Marty, :marty, []}))
assert error =~ ~r/`Marty.marty\/4` to be exported/i assert Exception.message(error) =~ ~r/`Marty.marty\/4` to be exported/i
end end
test "when passed steps which are not steps, it returns an error", %{ test "when passed steps which are not steps, it returns an error", %{
@ -42,8 +42,12 @@ defmodule Reactor.Step.AroundTest do
context: context, context: context,
options: options options: options
} do } do
assert {:error, [error]} = Around.run(%{}, context, options) assert {:error,
assert error =~ ~r/missing input `arg`/i %{
errors: [
%MissingInputError{step: %{name: :example}, argument: %{source: %{name: :arg}}}
]
}} = Around.run(%{}, context, options)
end end
test "when the around function fails before calling the callback, it returns an error", %{ test "when the around function fails before calling the callback, it returns an error", %{

View file

@ -1,7 +1,7 @@
defmodule Reactor.Step.GroupTest do defmodule Reactor.Step.GroupTest do
@moduledoc false @moduledoc false
use ExUnit.Case, async: true use ExUnit.Case, async: true
alias Reactor.{Builder, Step.Group, Step.ReturnAllArguments} alias Reactor.{Builder, Error.Invalid.MissingInputError, Step.Group, Step.ReturnAllArguments}
setup do setup do
context = %{current_step: %{name: :marty}} context = %{current_step: %{name: :marty}}
@ -63,8 +63,8 @@ defmodule Reactor.Step.GroupTest do
context: context, context: context,
options: options options: options
} do } do
assert {:error, [error]} = Group.run(%{}, context, options) assert {:error, %{errors: [%MissingInputError{argument: %{name: :arg}}]}} =
assert error =~ ~r/missing input `arg`/i Group.run(%{}, context, options)
end end
test "when the before function fails, it returns an error", %{ test "when the before function fails, it returns an error", %{

View file

@ -20,7 +20,7 @@ defmodule Reactor.Step.TransformAllTest do
test "when the function raises, it returns an error" do test "when the function raises, it returns an error" do
assert {:error, error} = Step.TransformAll.run(%{a: 1}, %{}, fun: fn _ -> raise "hell" end) assert {:error, error} = Step.TransformAll.run(%{a: 1}, %{}, fun: fn _ -> raise "hell" end)
assert Exception.message(error) == "hell" assert Exception.message(error) =~ "hell"
end end
end end
end end

View file

@ -9,7 +9,7 @@ defmodule Reactor.Step.TransformTest do
describe "run/3" do describe "run/3" do
test "when the value argument is missing" do test "when the value argument is missing" do
assert {:error, error} = run(%{}, %{}, []) assert {:error, error} = run(%{}, %{current_step: :current_step}, [])
assert Exception.message(error) =~ "argument is missing" assert Exception.message(error) =~ "argument is missing"
end end