mirror of
https://github.com/ash-project/reactor.git
synced 2024-09-19 21:03:28 +12:00
improvement: Move can?/2
and async?/1
into Reactor.Step
behaviour. (#87)
Makes it a little bit easier for steps to define their own capabilities at runtime.
This commit is contained in:
parent
2de8f356d2
commit
769825874e
3 changed files with 111 additions and 68 deletions
|
@ -16,7 +16,6 @@ defmodule Reactor.Executor.StepRunner do
|
||||||
{:ok, any, [Step.t()]} | :retry | {:retry, any} | {:error | :halt, any}
|
{:ok, any, [Step.t()]} | :retry | {:retry, any} | {:error | :halt, any}
|
||||||
def run(reactor, state, step, concurrency_key) do
|
def run(reactor, state, step, concurrency_key) do
|
||||||
with {:ok, arguments} <- get_step_arguments(reactor, step),
|
with {:ok, arguments} <- get_step_arguments(reactor, step),
|
||||||
{module, options} <- module_and_opts(step),
|
|
||||||
{:ok, context} <- build_context(reactor, state, step, concurrency_key),
|
{:ok, context} <- build_context(reactor, state, step, concurrency_key),
|
||||||
{:ok, arguments} <- maybe_replace_arguments(arguments, context) do
|
{:ok, arguments} <- maybe_replace_arguments(arguments, context) do
|
||||||
metadata = %{
|
metadata = %{
|
||||||
|
@ -28,11 +27,10 @@ defmodule Reactor.Executor.StepRunner do
|
||||||
|
|
||||||
metadata_stack = Process.get(:__reactor__, [])
|
metadata_stack = Process.get(:__reactor__, [])
|
||||||
Process.put(:__reactor__, [metadata | metadata_stack])
|
Process.put(:__reactor__, [metadata | metadata_stack])
|
||||||
result = do_run(module, options, arguments, context)
|
result = do_run(step, arguments, context)
|
||||||
Process.put(:__reactor__, metadata_stack)
|
Process.put(:__reactor__, metadata_stack)
|
||||||
result
|
result
|
||||||
end
|
end
|
||||||
after
|
|
||||||
end
|
end
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
|
@ -42,53 +40,47 @@ defmodule Reactor.Executor.StepRunner do
|
||||||
:ok | {:error, any}
|
:ok | {:error, any}
|
||||||
def undo(reactor, state, step, value, concurrency_key) do
|
def undo(reactor, state, step, value, concurrency_key) do
|
||||||
with {:ok, arguments} <- get_step_arguments(reactor, step),
|
with {:ok, arguments} <- get_step_arguments(reactor, step),
|
||||||
{module, options} <- module_and_opts(step),
|
|
||||||
{:ok, context} <- build_context(reactor, state, step, concurrency_key),
|
{:ok, context} <- build_context(reactor, state, step, concurrency_key),
|
||||||
{:ok, arguments} <- maybe_replace_arguments(arguments, context) do
|
{:ok, arguments} <- maybe_replace_arguments(arguments, context) do
|
||||||
do_undo(value, module, options, arguments, context)
|
do_undo(value, step, arguments, context)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
defp module_and_opts(%{impl: {module, options}}) when is_atom(module) and is_list(options),
|
defp do_undo(value, step, arguments, context, undo_count \\ 0)
|
||||||
do: {module, options}
|
|
||||||
|
|
||||||
defp module_and_opts(%{impl: module}) when is_atom(module), do: {module, []}
|
defp do_undo(_value, step, _arguments, _context, @max_undo_count),
|
||||||
|
do: {:error, "`undo/4` retried #{@max_undo_count} times on step `#{inspect(step.name)}`."}
|
||||||
|
|
||||||
defp do_undo(value, module, options, arguments, context, undo_count \\ 0)
|
defp do_undo(value, step, arguments, context, undo_count) do
|
||||||
|
case Step.undo(step, value, arguments, context) do
|
||||||
defp do_undo(_value, module, _options, _arguments, _context, @max_undo_count),
|
|
||||||
do: {:error, "`#{inspect(module)}.undo/4` retried #{@max_undo_count} times."}
|
|
||||||
|
|
||||||
defp do_undo(value, module, options, arguments, context, undo_count) do
|
|
||||||
case module.undo(value, arguments, context, options) do
|
|
||||||
:ok -> :ok
|
:ok -> :ok
|
||||||
:retry -> do_undo(value, module, options, arguments, context, undo_count + 1)
|
:retry -> do_undo(value, step, arguments, context, undo_count + 1)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
defp do_run(module, options, arguments, context) do
|
defp do_run(step, arguments, context) do
|
||||||
case module.run(arguments, context, options) do
|
case Step.run(step, arguments, context) do
|
||||||
{:ok, value} -> {:ok, value, []}
|
{:ok, value} -> {:ok, value, []}
|
||||||
{:ok, value, steps} when is_list(steps) -> {:ok, value, steps}
|
{:ok, value, steps} when is_list(steps) -> {:ok, value, steps}
|
||||||
{:retry, reason} -> {:retry, reason}
|
{:retry, reason} -> {:retry, reason}
|
||||||
:retry -> :retry
|
:retry -> :retry
|
||||||
{:error, reason} -> maybe_compensate(module, reason, arguments, context, options)
|
{:error, reason} -> maybe_compensate(step, reason, arguments, context)
|
||||||
{:halt, value} -> {:halt, value}
|
{:halt, value} -> {:halt, value}
|
||||||
end
|
end
|
||||||
rescue
|
rescue
|
||||||
reason -> maybe_compensate(module, reason, arguments, context, options)
|
reason -> maybe_compensate(step, reason, arguments, context)
|
||||||
end
|
end
|
||||||
|
|
||||||
defp maybe_compensate(module, reason, arguments, context, options) do
|
defp maybe_compensate(step, reason, arguments, context) do
|
||||||
if Step.can?(module, :compensate) do
|
if Step.can?(step, :compensate) do
|
||||||
compensate(module, reason, arguments, context, options)
|
compensate(step, reason, arguments, context)
|
||||||
else
|
else
|
||||||
{:error, reason}
|
{:error, reason}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
defp compensate(module, reason, arguments, context, options) do
|
defp compensate(step, reason, arguments, context) do
|
||||||
case module.compensate(reason, arguments, context, options) do
|
case Step.compensate(step, reason, arguments, context) do
|
||||||
{:continue, value} -> {:ok, value}
|
{:continue, value} -> {:ok, value}
|
||||||
{:retry, reason} -> {:retry, reason}
|
{:retry, reason} -> {:retry, reason}
|
||||||
:retry -> {:retry, reason}
|
:retry -> {:retry, reason}
|
||||||
|
@ -98,7 +90,7 @@ defmodule Reactor.Executor.StepRunner do
|
||||||
rescue
|
rescue
|
||||||
error ->
|
error ->
|
||||||
Logger.error(fn ->
|
Logger.error(fn ->
|
||||||
"Warning: `#{inspect(module)}.compensate/4` raised an error:\n" <>
|
"Warning: step `#{inspect(step.name)}` `compensate/4` raised an error:\n" <>
|
||||||
Exception.format(:error, error, __STACKTRACE__)
|
Exception.format(:error, error, __STACKTRACE__)
|
||||||
end)
|
end)
|
||||||
|
|
||||||
|
|
|
@ -159,32 +159,55 @@ defmodule Reactor.Step do
|
||||||
options :: keyword
|
options :: keyword
|
||||||
) :: undo_result
|
) :: undo_result
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
Detect the capabilities of the step at runtime.
|
||||||
|
|
||||||
|
> This callback is automatically defined by `use Reactor.Step` however you're
|
||||||
|
> free to override it if you need specific behaviour.
|
||||||
|
|
||||||
|
Whenever Reactor would like to either undo a change made by the step, or
|
||||||
|
compensate a step failure this callback is called to detect whether the step
|
||||||
|
module is capable of the desired action.
|
||||||
|
|
||||||
|
The default implementation of this callback checks to see if the optional
|
||||||
|
callback is defined on the current module.
|
||||||
|
"""
|
||||||
|
@callback can?(step :: Step.t(), capability()) :: boolean
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
Detect if the step can be run asynchronously at runtime.
|
||||||
|
|
||||||
|
> This callback is automatically defined by `use Reactor.Step` however you're
|
||||||
|
> free to override it if you need a specific behaviour.
|
||||||
|
|
||||||
|
This callback is called when Reactor is deciding whether to run a step
|
||||||
|
asynchronously.
|
||||||
|
|
||||||
|
The default implementation of this callback checks returns the the value of
|
||||||
|
the steps's `async?` key if it is boolean, or calls it with the steps's
|
||||||
|
options if it is a function.
|
||||||
|
"""
|
||||||
|
@callback async?(step) :: boolean
|
||||||
|
|
||||||
@optional_callbacks compensate: 4, undo: 4
|
@optional_callbacks compensate: 4, undo: 4
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
Find out of a step has a capability.
|
Find out of a step has a capability.
|
||||||
"""
|
"""
|
||||||
@spec can?(module | Step.t(), capability()) :: boolean
|
@spec can?(Step.t(), capability()) :: boolean
|
||||||
def can?(%Step{impl: {module, _opts}}, capability) when is_atom(module),
|
def can?(step, capability) when is_struct(step, Step) and capability in ~w[undo compensate]a,
|
||||||
do: can?(module, capability)
|
do:
|
||||||
|
module_and_options_from_step(step, fn module, _options -> module.can?(step, capability) end)
|
||||||
def can?(%Step{impl: module}, capability) when is_atom(module),
|
|
||||||
do: can?(module, capability)
|
|
||||||
|
|
||||||
def can?(module, capability) when is_atom(module) and capability in ~w[undo compensate]a,
|
|
||||||
do: function_exported?(module, capability, 4)
|
|
||||||
|
|
||||||
def can?(_step, _capability), do: false
|
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
Execute a step.
|
Execute a step.
|
||||||
"""
|
"""
|
||||||
@spec run(Step.t(), arguments :: Reactor.inputs(), context :: Reactor.context()) :: run_result()
|
@spec run(Step.t(), arguments :: Reactor.inputs(), context :: Reactor.context()) :: run_result()
|
||||||
def run(%{impl: {module, options}}, arguments, context) when is_atom(module),
|
def run(step, arguments, context),
|
||||||
do: module.run(arguments, context, options)
|
do:
|
||||||
|
module_and_options_from_step(step, fn module, options ->
|
||||||
def run(%{impl: module}, arguments, context) when is_atom(module),
|
module.run(arguments, context, options)
|
||||||
do: module.run(arguments, context, [])
|
end)
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
Compensate a step
|
Compensate a step
|
||||||
|
@ -195,41 +218,59 @@ defmodule Reactor.Step do
|
||||||
arguments :: Reactor.inputs(),
|
arguments :: Reactor.inputs(),
|
||||||
context :: Reactor.context()
|
context :: Reactor.context()
|
||||||
) :: compensate_result()
|
) :: compensate_result()
|
||||||
def compensate(%{impl: {module, options}}, reason, arguments, context) when is_atom(module),
|
def compensate(step, reason, arguments, context),
|
||||||
do: module.compensate(reason, arguments, context, options)
|
do:
|
||||||
|
module_and_options_from_step(step, fn module, options ->
|
||||||
def compensate(%{impl: module}, reason, arguments, context) when is_atom(module),
|
module.compensate(reason, arguments, context, options)
|
||||||
do: module.compensate(reason, arguments, context, [])
|
end)
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
Undo a step
|
Undo a step
|
||||||
"""
|
"""
|
||||||
@spec undo(Step.t(), value :: any, arguments :: Reactor.inputs(), context :: Reactor.context()) ::
|
@spec undo(Step.t(), value :: any, arguments :: Reactor.inputs(), context :: Reactor.context()) ::
|
||||||
undo_result()
|
undo_result()
|
||||||
def undo(%{impl: {module, options}}, value, arguments, context) when is_atom(module),
|
def undo(step, value, arguments, context),
|
||||||
do: module.undo(value, arguments, context, options)
|
do:
|
||||||
|
module_and_options_from_step(step, fn module, options ->
|
||||||
|
module.undo(value, arguments, context, options)
|
||||||
|
end)
|
||||||
|
|
||||||
def undo(%{impl: module}, value, arguments, context) when is_atom(module),
|
|
||||||
do: module.undo(value, arguments, context, [])
|
|
||||||
|
|
||||||
@spec __using__(any()) ::
|
|
||||||
{:@, [{:column, 7} | {:context, Reactor.Step} | {:imports, [...]}, ...],
|
|
||||||
[{:behaviour, [...], [...]}, ...]}
|
|
||||||
@doc """
|
@doc """
|
||||||
Is the step able to be run asynchronously?
|
Is the step able to be run asynchronously?
|
||||||
"""
|
"""
|
||||||
@spec async?(Step.t()) :: boolean
|
@spec async?(Step.t()) :: boolean
|
||||||
def async?(%{async?: async}) when is_boolean(async), do: async
|
def async?(step),
|
||||||
|
do: module_and_options_from_step(step, fn module, _opts -> module.async?(step) end)
|
||||||
|
|
||||||
def async?(%{async?: fun, impl: {_, opts}}) when is_function(fun, 1),
|
defp module_and_options_from_step(%{impl: {module, options}} = step, fun)
|
||||||
do: fun.(opts)
|
when is_struct(step, Step) and is_atom(module) and is_list(options) and is_function(fun, 2),
|
||||||
|
do: fun.(module, options)
|
||||||
|
|
||||||
def async?(%{async?: fun}) when is_function(fun, 1), do: fun.([])
|
defp module_and_options_from_step(%{impl: module} = step, fun)
|
||||||
def async?(_), do: false
|
when is_struct(step, Step) and is_atom(module) and is_function(fun, 2),
|
||||||
|
do: fun.(module, [])
|
||||||
|
|
||||||
|
@doc false
|
||||||
|
@spec __using__(keyword) :: Macro.output()
|
||||||
defmacro __using__(_opts) do
|
defmacro __using__(_opts) do
|
||||||
quote do
|
quote do
|
||||||
@behaviour unquote(__MODULE__)
|
@behaviour unquote(__MODULE__)
|
||||||
|
|
||||||
|
@doc false
|
||||||
|
@impl unquote(__MODULE__)
|
||||||
|
def can?(_step, capability), do: function_exported?(__MODULE__, capability, 4)
|
||||||
|
|
||||||
|
@doc false
|
||||||
|
@impl unquote(__MODULE__)
|
||||||
|
def async?(step) when is_boolean(step.async?), do: step.async?
|
||||||
|
|
||||||
|
def async?(%{async?: fun, impl: {_, opts}}) when is_function(fun, 1),
|
||||||
|
do: fun.(opts)
|
||||||
|
|
||||||
|
def async?(%{async?: fun}) when is_function(fun, 1), do: fun.([])
|
||||||
|
def async?(_), do: false
|
||||||
|
|
||||||
|
defoverridable can?: 2, async?: 1
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -4,20 +4,30 @@ defmodule Reactor.StepTest do
|
||||||
alias Reactor.{Builder, Step}
|
alias Reactor.{Builder, Step}
|
||||||
|
|
||||||
describe "can/2" do
|
describe "can/2" do
|
||||||
test "when the module defines `undo/4`, it can undo" do
|
defp step_for_module(module), do: %Step{impl: module}
|
||||||
assert Step.can?(Example.Step.Undoable, :undo)
|
|
||||||
|
test "when the step defines `undo/4`, it can undo" do
|
||||||
|
assert Example.Step.Undoable
|
||||||
|
|> step_for_module()
|
||||||
|
|> Step.can?(:undo)
|
||||||
end
|
end
|
||||||
|
|
||||||
test "when the module does not define `undo/4`, it cannot undo" do
|
test "when the step does not define `undo/4`, it cannot undo" do
|
||||||
refute Step.can?(Example.Step.Greeter, :undo)
|
refute Example.Step.Greeter
|
||||||
|
|> step_for_module()
|
||||||
|
|> Step.can?(:undo)
|
||||||
end
|
end
|
||||||
|
|
||||||
test "when the module defines `compensate/4`, it can compensate" do
|
test "when the step defines `compensate/4`, it can compensate" do
|
||||||
assert Step.can?(Example.Step.Compensable, :compensate)
|
assert Example.Step.Compensable
|
||||||
|
|> step_for_module()
|
||||||
|
|> Step.can?(:compensate)
|
||||||
end
|
end
|
||||||
|
|
||||||
test "when the module does not defined `compensate/4`, it cannot compensate" do
|
test "when the step does not defined `compensate/4`, it cannot compensate" do
|
||||||
refute Step.can?(Example.Step.Greeter, :compensate)
|
refute Example.Step.Greeter
|
||||||
|
|> step_for_module()
|
||||||
|
|> Step.can?(:compensate)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue