improvement: Move can?/2 and async?/1 into Reactor.Step behaviour.

Makes it a little bit easier for steps to define their own capabilities at runtime.
This commit is contained in:
James Harton 2024-02-15 10:33:08 +13:00
parent 2de8f356d2
commit 8cc6882883
Signed by: james
GPG key ID: 90E82DAA13F624F4
3 changed files with 111 additions and 68 deletions

View file

@ -16,7 +16,6 @@ defmodule Reactor.Executor.StepRunner do
{:ok, any, [Step.t()]} | :retry | {:retry, any} | {:error | :halt, any}
def run(reactor, state, step, concurrency_key) do
with {:ok, arguments} <- get_step_arguments(reactor, step),
{module, options} <- module_and_opts(step),
{:ok, context} <- build_context(reactor, state, step, concurrency_key),
{:ok, arguments} <- maybe_replace_arguments(arguments, context) do
metadata = %{
@ -28,11 +27,10 @@ defmodule Reactor.Executor.StepRunner do
metadata_stack = Process.get(:__reactor__, [])
Process.put(:__reactor__, [metadata | metadata_stack])
result = do_run(module, options, arguments, context)
result = do_run(step, arguments, context)
Process.put(:__reactor__, metadata_stack)
result
end
after
end
@doc """
@ -42,53 +40,47 @@ defmodule Reactor.Executor.StepRunner do
:ok | {:error, any}
def undo(reactor, state, step, value, concurrency_key) do
with {:ok, arguments} <- get_step_arguments(reactor, step),
{module, options} <- module_and_opts(step),
{:ok, context} <- build_context(reactor, state, step, concurrency_key),
{:ok, arguments} <- maybe_replace_arguments(arguments, context) do
do_undo(value, module, options, arguments, context)
do_undo(value, step, arguments, context)
end
end
defp module_and_opts(%{impl: {module, options}}) when is_atom(module) and is_list(options),
do: {module, options}
defp do_undo(value, step, arguments, context, undo_count \\ 0)
defp module_and_opts(%{impl: module}) when is_atom(module), do: {module, []}
defp do_undo(_value, step, _arguments, _context, @max_undo_count),
do: {:error, "`undo/4` retried #{@max_undo_count} times on step `#{inspect(step.name)}`."}
defp do_undo(value, module, options, arguments, context, undo_count \\ 0)
defp do_undo(_value, module, _options, _arguments, _context, @max_undo_count),
do: {:error, "`#{inspect(module)}.undo/4` retried #{@max_undo_count} times."}
defp do_undo(value, module, options, arguments, context, undo_count) do
case module.undo(value, arguments, context, options) do
defp do_undo(value, step, arguments, context, undo_count) do
case Step.undo(step, value, arguments, context) do
:ok -> :ok
:retry -> do_undo(value, module, options, arguments, context, undo_count + 1)
:retry -> do_undo(value, step, arguments, context, undo_count + 1)
end
end
defp do_run(module, options, arguments, context) do
case module.run(arguments, context, options) do
defp do_run(step, arguments, context) do
case Step.run(step, arguments, context) do
{:ok, value} -> {:ok, value, []}
{:ok, value, steps} when is_list(steps) -> {:ok, value, steps}
{:retry, reason} -> {:retry, reason}
:retry -> :retry
{:error, reason} -> maybe_compensate(module, reason, arguments, context, options)
{:error, reason} -> maybe_compensate(step, reason, arguments, context)
{:halt, value} -> {:halt, value}
end
rescue
reason -> maybe_compensate(module, reason, arguments, context, options)
reason -> maybe_compensate(step, reason, arguments, context)
end
defp maybe_compensate(module, reason, arguments, context, options) do
if Step.can?(module, :compensate) do
compensate(module, reason, arguments, context, options)
defp maybe_compensate(step, reason, arguments, context) do
if Step.can?(step, :compensate) do
compensate(step, reason, arguments, context)
else
{:error, reason}
end
end
defp compensate(module, reason, arguments, context, options) do
case module.compensate(reason, arguments, context, options) do
defp compensate(step, reason, arguments, context) do
case Step.compensate(step, reason, arguments, context) do
{:continue, value} -> {:ok, value}
{:retry, reason} -> {:retry, reason}
:retry -> {:retry, reason}
@ -98,7 +90,7 @@ defmodule Reactor.Executor.StepRunner do
rescue
error ->
Logger.error(fn ->
"Warning: `#{inspect(module)}.compensate/4` raised an error:\n" <>
"Warning: step `#{inspect(step.name)}` `compensate/4` raised an error:\n" <>
Exception.format(:error, error, __STACKTRACE__)
end)

View file

@ -159,32 +159,55 @@ defmodule Reactor.Step do
options :: keyword
) :: undo_result
@doc """
Detect the capabilities of the step at runtime.
> This callback is automatically defined by `use Reactor.Step` however you're
> free to override it if you need specific behaviour.
Whenever Reactor would like to either undo a change made by the step, or
compensate a step failure this callback is called to detect whether the step
module is capable of the desired action.
The default implementation of this callback checks to see if the optional
callback is defined on the current module.
"""
@callback can?(step :: Step.t(), capability()) :: boolean
@doc """
Detect if the step can be run asynchronously at runtime.
> This callback is automatically defined by `use Reactor.Step` however you're
> free to override it if you need a specific behaviour.
This callback is called when Reactor is deciding whether to run a step
asynchronously.
The default implementation of this callback checks returns the the value of
the steps's `async?` key if it is boolean, or calls it with the steps's
options if it is a function.
"""
@callback async?(step) :: boolean
@optional_callbacks compensate: 4, undo: 4
@doc """
Find out of a step has a capability.
"""
@spec can?(module | Step.t(), capability()) :: boolean
def can?(%Step{impl: {module, _opts}}, capability) when is_atom(module),
do: can?(module, capability)
def can?(%Step{impl: module}, capability) when is_atom(module),
do: can?(module, capability)
def can?(module, capability) when is_atom(module) and capability in ~w[undo compensate]a,
do: function_exported?(module, capability, 4)
def can?(_step, _capability), do: false
@spec can?(Step.t(), capability()) :: boolean
def can?(step, capability) when is_struct(step, Step) and capability in ~w[undo compensate]a,
do:
module_and_options_from_step(step, fn module, _options -> module.can?(step, capability) end)
@doc """
Execute a step.
"""
@spec run(Step.t(), arguments :: Reactor.inputs(), context :: Reactor.context()) :: run_result()
def run(%{impl: {module, options}}, arguments, context) when is_atom(module),
do: module.run(arguments, context, options)
def run(%{impl: module}, arguments, context) when is_atom(module),
do: module.run(arguments, context, [])
def run(step, arguments, context),
do:
module_and_options_from_step(step, fn module, options ->
module.run(arguments, context, options)
end)
@doc """
Compensate a step
@ -195,41 +218,59 @@ defmodule Reactor.Step do
arguments :: Reactor.inputs(),
context :: Reactor.context()
) :: compensate_result()
def compensate(%{impl: {module, options}}, reason, arguments, context) when is_atom(module),
do: module.compensate(reason, arguments, context, options)
def compensate(%{impl: module}, reason, arguments, context) when is_atom(module),
do: module.compensate(reason, arguments, context, [])
def compensate(step, reason, arguments, context),
do:
module_and_options_from_step(step, fn module, options ->
module.compensate(reason, arguments, context, options)
end)
@doc """
Undo a step
"""
@spec undo(Step.t(), value :: any, arguments :: Reactor.inputs(), context :: Reactor.context()) ::
undo_result()
def undo(%{impl: {module, options}}, value, arguments, context) when is_atom(module),
do: module.undo(value, arguments, context, options)
def undo(step, value, arguments, context),
do:
module_and_options_from_step(step, fn module, options ->
module.undo(value, arguments, context, options)
end)
def undo(%{impl: module}, value, arguments, context) when is_atom(module),
do: module.undo(value, arguments, context, [])
@spec __using__(any()) ::
{:@, [{:column, 7} | {:context, Reactor.Step} | {:imports, [...]}, ...],
[{:behaviour, [...], [...]}, ...]}
@doc """
Is the step able to be run asynchronously?
"""
@spec async?(Step.t()) :: boolean
def async?(%{async?: async}) when is_boolean(async), do: async
def async?(step),
do: module_and_options_from_step(step, fn module, _opts -> module.async?(step) end)
def async?(%{async?: fun, impl: {_, opts}}) when is_function(fun, 1),
do: fun.(opts)
defp module_and_options_from_step(%{impl: {module, options}} = step, fun)
when is_struct(step, Step) and is_atom(module) and is_list(options) and is_function(fun, 2),
do: fun.(module, options)
def async?(%{async?: fun}) when is_function(fun, 1), do: fun.([])
def async?(_), do: false
defp module_and_options_from_step(%{impl: module} = step, fun)
when is_struct(step, Step) and is_atom(module) and is_function(fun, 2),
do: fun.(module, [])
@doc false
@spec __using__(keyword) :: Macro.output()
defmacro __using__(_opts) do
quote do
@behaviour unquote(__MODULE__)
@doc false
@impl unquote(__MODULE__)
def can?(_step, capability), do: function_exported?(__MODULE__, capability, 4)
@doc false
@impl unquote(__MODULE__)
def async?(step) when is_boolean(step.async?), do: step.async?
def async?(%{async?: fun, impl: {_, opts}}) when is_function(fun, 1),
do: fun.(opts)
def async?(%{async?: fun}) when is_function(fun, 1), do: fun.([])
def async?(_), do: false
defoverridable can?: 2, async?: 1
end
end
end

View file

@ -4,20 +4,30 @@ defmodule Reactor.StepTest do
alias Reactor.{Builder, Step}
describe "can/2" do
test "when the module defines `undo/4`, it can undo" do
assert Step.can?(Example.Step.Undoable, :undo)
defp step_for_module(module), do: %Step{impl: module}
test "when the step defines `undo/4`, it can undo" do
assert Example.Step.Undoable
|> step_for_module()
|> Step.can?(:undo)
end
test "when the module does not define `undo/4`, it cannot undo" do
refute Step.can?(Example.Step.Greeter, :undo)
test "when the step does not define `undo/4`, it cannot undo" do
refute Example.Step.Greeter
|> step_for_module()
|> Step.can?(:undo)
end
test "when the module defines `compensate/4`, it can compensate" do
assert Step.can?(Example.Step.Compensable, :compensate)
test "when the step defines `compensate/4`, it can compensate" do
assert Example.Step.Compensable
|> step_for_module()
|> Step.can?(:compensate)
end
test "when the module does not defined `compensate/4`, it cannot compensate" do
refute Step.can?(Example.Step.Greeter, :compensate)
test "when the step does not defined `compensate/4`, it cannot compensate" do
refute Example.Step.Greeter
|> step_for_module()
|> Step.can?(:compensate)
end
end