improvement: Add ability to compose reactors.

This commit is contained in:
James Harton 2023-06-12 16:24:09 +12:00 committed by James Harton
parent 4d133fd50a
commit 1af160c6b3
52 changed files with 2718 additions and 454 deletions

View file

@ -4,6 +4,8 @@ spark_locals_without_parens = [
argument: 2,
argument: 3,
async?: 1,
compose: 2,
compose: 3,
input: 1,
input: 2,
max_retries: 1,

View file

@ -2,6 +2,7 @@
"cSpell.words": [
"backoff",
"casted",
"mappish"
"mappish",
"Planable"
]
}

View file

@ -51,13 +51,14 @@ defmodule Reactor do
"""
defstruct context: %{},
id: nil,
inputs: [],
intermediate_results: %{},
plan: nil,
undo: [],
return: nil,
state: :pending,
steps: []
steps: [],
undo: []
use Spark.Dsl, default_extensions: [extensions: Dsl]
@ -75,6 +76,7 @@ defmodule Reactor do
@type t :: %Reactor{
context: context,
id: any,
inputs: [atom],
intermediate_results: %{any => any},
plan: nil | Graph.t(),

View file

@ -9,7 +9,7 @@ defmodule Reactor.Argument do
@type t :: %Argument{
name: atom,
source: Template.t(),
source: Template.Input.t() | Template.Result.t() | Template.Value.t(),
transform: nil | (any -> any) | {module, keyword} | mfa
}
@ -45,7 +45,7 @@ defmodule Reactor.Argument do
## Example
iex> Argument.from_input(:argument_name, :step_name, &Atom.to_string/1)
iex> Argument.from_result(:argument_name, :step_name, &Atom.to_string/1)
"""
@spec from_result(atom, any, nil | (any -> any)) :: Argument.t()
@ -53,6 +53,17 @@ defmodule Reactor.Argument do
when is_atom(name) and maybe_transform(transform),
do: %Argument{name: name, source: %Template.Result{name: result_name}, transform: transform}
@doc """
Build an argument which refers to a statically defined value.
## Example
iex> Argument.from_value(:argument_name, 10)
"""
@spec from_value(atom, any, nil | (any -> any)) :: Argument.t()
def from_value(name, value, transform \\ nil) when is_atom(name) and maybe_transform(transform),
do: %Argument{name: name, source: %Template.Value{value: value}, transform: transform}
@doc """
Validate that the argument is an Argument struct.
"""
@ -68,6 +79,11 @@ defmodule Reactor.Argument do
"""
defguard is_from_result(argument) when is_struct(argument.source, Template.Result)
@doc """
Validate that the argument contains a static value.
"""
defguard is_from_value(argument) when is_struct(argument.source, Template.Value)
@doc """
Validate that the argument has a transform.
"""

View file

@ -62,4 +62,32 @@ defmodule Reactor.Argument.Templates do
def result(link_name) do
%Template.Result{name: link_name}
end
@doc ~S"""
The `value` template helper for the Reactor DSL.
## Example
```elixir
defmodule ExampleReactor do
use Reactor
input :number
step :times_three do
argument :lhs, input(:number)
# here: -------↓↓↓↓↓
argument :rhs, value(3)
impl fn args, _, _ ->
{:ok, args.lhs * args.rhs}
end
end
end
```
"""
@spec value(any) :: Template.Value.t()
def value(value) do
%Template.Value{value: value}
end
end

View file

@ -18,15 +18,10 @@ defmodule Reactor.Builder do
```
"""
alias Reactor.{Argument, Step, Template}
import Argument, only: :macros
alias Reactor.{Argument, Builder, Step}
import Reactor, only: :macros
import Reactor.Utils
defguardp is_mfa(mfa)
when tuple_size(mfa) == 2 and is_atom(elem(mfa, 0)) and
is_list(elem(mfa, 1))
@type step_options :: [async? | max_retries() | arguments_transform | context]
@typedoc "Should the step be run asynchronously?"
@ -48,9 +43,13 @@ defmodule Reactor.Builder do
@doc """
Build a new, empty Reactor.
Optionally an identifier for the Reactor. This is primarily used for recursive
composition tracking.
"""
@spec new :: Reactor.t()
def new, do: %Reactor{}
@spec new(any) :: Reactor.t()
def new(id \\ make_ref()),
do: %Reactor{id: id, context: %{private: %{composed_reactors: MapSet.new([id])}}}
@doc """
Add a named input to the Reactor.
@ -62,42 +61,22 @@ defmodule Reactor.Builder do
def add_input(reactor, name, transform \\ nil)
def add_input(reactor, _name, _transform) when not is_reactor(reactor),
do: {:error, ArgumentError.exception("`reactor`: not a Reactor")}
do: {:error, argument_error(:reactor, "not a Reactor", reactor)}
def add_input(reactor, name, nil) do
step = %Step{
arguments: [],
async?: true,
impl: {Step.Input, name: name},
name: {:input, name},
max_retries: 0,
ref: make_ref()
}
def add_input(reactor, name, transform),
do: Builder.Input.add_input(reactor, name, transform)
{:ok, %{reactor | inputs: [name | reactor.inputs], steps: [step | reactor.steps]}}
end
@doc """
Raising version of `add_input/2..3`.
"""
@spec add_input!(Reactor.t(), any, nil | (any -> any)) :: Reactor.t() | no_return
def add_input!(reactor, name, transform \\ nil)
def add_input(reactor, name, transform)
when is_function(transform, 1) or
(tuple_size(transform) == 2 and is_atom(elem(transform, 0)) and
is_list(elem(transform, 1))) do
input_step = %Step{
arguments: [],
async?: true,
impl: {Step.Input, name: name},
name: {:raw_input, name},
max_retries: 0,
ref: make_ref()
}
transform_step = build_transform_step({:raw_input, name}, {:input, name}, transform)
{:ok,
%{
reactor
| inputs: [name | reactor.inputs],
steps: [input_step, transform_step | reactor.steps]
}}
def add_input!(reactor, name, transform) do
case add_input(reactor, name, transform) do
{:ok, reactor} -> reactor
{:error, reason} -> raise reason
end
end
@doc """
@ -117,88 +96,67 @@ defmodule Reactor.Builder do
def add_step(reactor, name, impl, arguments \\ [], options \\ [])
def add_step(reactor, _name, _impl, _arguments, _options) when not is_reactor(reactor),
do: {:error, ArgumentError.exception("`reactor`: not a Reactor")}
do: {:error, argument_error(:reactor, "not a Reactor", reactor)}
def add_step(_reactor, _name, _impl, arguments, _options) when not is_list(arguments),
do: {:error, ArgumentError.exception("`arguments` is not a list")}
do: {:error, argument_error(:arguments, "not a list", arguments)}
def add_step(_reactor, _name, _impl, _arguments, options) when not is_list(options),
do: {:error, ArgumentError.exception("`options` is not a list")}
do: {:error, argument_error(:options, "not a list", options)}
def add_step(reactor, name, impl, arguments, options) do
with {:ok, arguments} <- assert_all_are_arguments(arguments),
:ok <- assert_is_step_impl(impl),
{:ok, arguments, argument_transform_steps} <-
build_argument_transform_steps(arguments, name),
{:ok, arguments, step_transform_step} <-
maybe_build_step_transform_step(arguments, name, options[:transform]) do
context =
if step_transform_step do
options
|> Keyword.get(:context, %{})
|> deep_merge(%{private: %{replace_arguments: :value}})
else
Keyword.get(options, :context, %{})
end
def add_step(reactor, name, impl, arguments, options),
do: Builder.Step.add_step(reactor, name, impl, arguments, options)
steps =
[
%Step{
arguments: arguments,
async?: Keyword.get(options, :async?, true),
context: context,
impl: impl,
name: name,
max_retries: Keyword.get(options, :max_retries, 100),
ref: make_ref()
}
]
|> Enum.concat(argument_transform_steps)
|> maybe_append(step_transform_step)
|> Enum.concat(reactor.steps)
@doc """
Raising version of `add_step/3..5`.
"""
@spec add_step!(Reactor.t(), name :: any, impl, [step_argument], step_options) ::
Reactor.t() | no_return
def add_step!(reactor, name, impl, arguments \\ [], options \\ [])
{:ok, %{reactor | steps: steps}}
def add_step!(reactor, name, impl, arguments, options) do
case add_step(reactor, name, impl, arguments, options) do
{:ok, reactor} -> reactor
{:error, reason} -> raise reason
end
end
@doc """
Build a step which can be added to a reactor at runtime.
Note that the built step doesn't support argument transformations - you should
add an additional step to do the transformation needed (this is what
`add_step/5` does anyway).
Note that the built step doesn't support transformations - you should add an
additional step to do the transformation needed (this is what `add_step/5`
does anyway).
"""
@spec new_step(name :: any, impl, [step_argument], step_options) ::
{:ok, Step.t()} | {:error, any}
@spec new_step(any, impl, [step_argument], step_options) :: {:ok, Step.t()} | {:error, any}
def new_step(name, impl, arguments \\ [], options \\ [])
def new_step(_name, _impl, arguments, _options) when not is_list(arguments),
do: {:error, ArgumentError.exception("`arguments` is not a list")}
do: {:error, argument_error(:arguments, "not a list", arguments)}
def new_step(_name, _impl, _arguments, options) when not is_list(options),
do: {:error, ArgumentError.exception("`options` is not a list")}
do: {:error, argument_error(:options, "not a list", options)}
def new_step(name, impl, arguments, options) do
with {:ok, arguments} <- assert_all_are_arguments(arguments),
:ok <- assert_is_step_impl(impl) do
step = %Step{
arguments: arguments,
async?: Keyword.get(options, :async?, true),
context: Keyword.get(options, :context, %{}),
impl: impl,
name: name,
max_retries: Keyword.get(options, :max_retries, 100),
ref: make_ref()
}
def new_step(name, impl, arguments, options),
do: Builder.Step.new_step(name, impl, arguments, options)
{:ok, step}
@doc """
Raising version of `new_step/2..4`.
"""
@spec new_step!(any, impl, [step_argument], step_options) :: Step.t() | no_return
def new_step!(name, impl, arguments \\ [], options \\ [])
def new_step!(name, impl, arguments, options) do
case new_step(name, impl, arguments, options) do
{:ok, step} -> step
{:error, reason} -> raise reason
end
end
@doc """
Specify the return value of the Reactor.
The return value must be the result of a completed step.
The return value must be the name of a step.
"""
@spec return(Reactor.t(), any) :: {:ok, Reactor.t()} | {:error, any}
def return(reactor, name) do
@ -209,127 +167,59 @@ defmodule Reactor.Builder do
if name in step_names do
{:ok, %{reactor | return: name}}
else
{:error, ArgumentError.exception("`#{inspect(name)}` is not an existing step name")}
{:error, argument_error(:name, "not an existing step name.", name)}
end
end
defp assert_all_are_arguments(arguments) do
Enum.reduce_while(arguments, {:ok, []}, fn
argument, {:ok, arguments} when is_argument(argument) ->
{:cont, {:ok, [argument | arguments]}}
{name, {:input, source}}, {:ok, arguments} ->
{:cont, {:ok, [Argument.from_input(name, source) | arguments]}}
{name, {:result, source}}, {:ok, arguments} ->
{:cont, {:ok, [Argument.from_result(name, source) | arguments]}}
not_argument, _ ->
{:halt,
{:error,
ArgumentError.exception(
"Value `#{inspect(not_argument)}` is not a `Reactor.Argument` struct."
)}}
end)
end
defp assert_is_step_impl({impl, opts}) when is_list(opts), do: assert_is_step_impl(impl)
defp assert_is_step_impl(impl) when is_atom(impl) do
if Spark.implements_behaviour?(impl, Step) do
:ok
else
{:error,
ArgumentError.exception(
"Module `#{inspect(impl)}` does not implement the `Reactor.Step` behaviour."
)}
@doc """
Raising version of `return/2`.
"""
@spec return!(Reactor.t(), any) :: Reactor.t() | no_return
def return!(reactor, name) do
case return(reactor, name) do
{:ok, reactor} -> reactor
{:error, reason} -> raise reason
end
end
defp build_argument_transform_steps(arguments, step_name) do
arguments
|> Enum.reduce_while({:ok, [], []}, fn
argument, {:ok, arguments, steps}
when is_from_input(argument) and has_transform(argument) ->
transform_step_name = {:__reactor__, :transform, argument.name, step_name}
@doc """
Compose another Reactor inside this one.
step =
build_transform_step(
{:input, argument.source.name},
transform_step_name,
argument.transform
)
Whenever possible this function will extract the steps from inner Reactor and
place them inside the parent Reactor. In order to achieve this the composer
will rename the steps to ensure that there are no conflicts.
argument = %Argument{
name: argument.name,
source: %Template.Result{name: transform_step_name}
}
If you're attempting to create a recursive Reactor (ie compose a Reactor
within itself) then this will be detected and runtime composition will be used
instead. See `Reactor.Step.Compose` for more details.
"""
@spec compose(Reactor.t(), atom, Reactor.t() | module, [step_argument]) ::
{:ok, Reactor.t()} | {:error, any}
def compose(reactor, _name, _inner_reactor, _arguments) when not is_reactor(reactor),
do: {:error, argument_error(:reactor, "not a Reactor", reactor)}
{:cont, {:ok, [argument | arguments], [%{step | transform: nil} | steps]}}
def compose(_reactor, name, _inner_reactor, _arguments) when not is_atom(name),
do: {:error, argument_error(:name, "not an atom", name)}
argument, {:ok, arguments, steps}
when is_from_result(argument) and has_transform(argument) ->
transform_step_name = {:__reactor__, :transform, argument.name, step_name}
def compose(_reactor, _name, inner_reactor, _arguments)
when not is_reactor(inner_reactor) and not is_atom(inner_reactor),
do: {:error, argument_error(:inner_reactor, "not a Reactor", inner_reactor)}
step =
build_transform_step(
argument.source.name,
transform_step_name,
argument.transform
)
def compose(_reactor, _name, _inner_reactor, arguments) when not is_list(arguments),
do: {:error, argument_error(:arguments, "not a list", arguments)}
argument = %Argument{
name: argument.name,
source: %Template.Result{name: transform_step_name}
}
def compose(reactor, name, inner_reactor, arguments),
do: Builder.Compose.compose(reactor, name, inner_reactor, arguments)
{:cont, {:ok, [argument | arguments], [%{step | transform: nil} | steps]}}
argument, {:ok, arguments, steps} when is_from_input(argument) ->
argument = %{argument | source: %Template.Result{name: {:input, argument.source.name}}}
{:cont, {:ok, [argument | arguments], steps}}
argument, {:ok, arguments, steps} when is_from_result(argument) ->
{:cont, {:ok, [argument | arguments], steps}}
end)
end
defp maybe_build_step_transform_step(arguments, _name, nil), do: {:ok, arguments, nil}
defp maybe_build_step_transform_step(arguments, name, transform)
when is_function(transform, 1),
do: maybe_build_step_transform_step(arguments, name, {Step.TransformAll, fun: transform})
defp maybe_build_step_transform_step(arguments, name, transform) do
step = %Step{
arguments: arguments,
async?: true,
impl: transform,
name: {:__reactor__, :transform, name},
max_retries: 0,
ref: make_ref()
}
{:ok, [Argument.from_result(:value, step.name)], step}
end
defp build_transform_step(input_name, step_name, transform) when is_function(transform, 1),
do: build_transform_step(input_name, step_name, {Step.Transform, fun: transform})
defp build_transform_step(input_name, step_name, transform) when is_mfa(transform) do
%Step{
arguments: [
%Argument{
name: :value,
source: %Template.Result{name: input_name}
}
],
async?: true,
impl: transform,
name: step_name,
max_retries: 0,
ref: make_ref()
}
@doc """
Raising version of `compose/4`.
"""
@spec compose!(Reactor.t(), atom, Reactor.t() | module, [step_argument]) ::
Reactor.t() | no_return
def compose!(reactor, name, inner_reactor, arguments) do
case compose(reactor, name, inner_reactor, arguments) do
{:ok, reactor} -> reactor
{:error, reason} -> raise reason
end
end
end

View file

@ -0,0 +1,23 @@
defmodule Reactor.Builder.Argument do
@moduledoc false
import Reactor.Argument, only: :macros
import Reactor.Utils
alias Reactor.Argument
@doc """
Given a list of argument structs or keywords convert them all into Argument
structs if possible, otherwise error.
"""
@spec assert_all_are_arguments([Argument.t() | {atom, {:input | :result, any} | any}]) ::
{:ok, [Argument.t()]} | {:error, Exception.t()}
def assert_all_are_arguments(arguments) do
map_while_ok(arguments, fn
argument when is_argument(argument) -> {:ok, argument}
{name, {:input, source}} -> {:ok, Argument.from_input(name, source)}
{name, {:result, source}} -> {:ok, Argument.from_result(name, source)}
{name, value} -> {:ok, Argument.from_value(name, value)}
_ -> {:error, argument_error(:arguments, "contains a non-argument value.", arguments)}
end)
end
end

View file

@ -0,0 +1,209 @@
defmodule Reactor.Builder.Compose do
@moduledoc """
Handle composition of Reactors for the builder.
The composition logic was getting complicated enough that it seemed sensible
to extract it from the builder - if only to aid readability.
You should not use this module directly, but instead use
`Reactor.Builder.compose/4`.
"""
import Reactor, only: :macros
import Reactor.Argument, only: :macros
import Reactor.Builder.Argument
import Reactor.Utils
alias Reactor.{Argument, Builder, Error.ComposeError, Step}
@doc """
Compose another Reactor inside this one.
"""
@spec compose(Reactor.t(), atom, Reactor.t() | module, [Builder.step_argument()]) ::
{:ok, Reactor.t()} | {:error, any}
def compose(reactor, name, inner_reactor, arguments) when is_atom(inner_reactor) do
if compose_would_be_recursive?(reactor, inner_reactor) do
do_runtime_compose(reactor, name, inner_reactor, arguments)
else
case Reactor.Info.to_struct(inner_reactor) do
{:ok, inner_reactor} -> compose(reactor, name, inner_reactor, arguments)
{:error, reason} -> {:error, reason}
end
end
end
def compose(reactor, name, inner_reactor, arguments) when not is_nil(inner_reactor.plan) do
steps =
inner_reactor.plan
|> Graph.vertices()
|> Enum.concat(inner_reactor.steps)
compose(
reactor,
name,
%{inner_reactor | steps: steps, plan: nil},
arguments
)
end
def compose(reactor, name, inner_reactor, arguments)
when is_reactor(reactor) and is_atom(name) and is_reactor(inner_reactor) and
is_list(arguments) do
if compose_would_be_recursive?(reactor, inner_reactor.id) do
do_runtime_compose(reactor, name, inner_reactor, arguments)
else
do_static_compose(reactor, name, inner_reactor, arguments)
end
end
defp do_runtime_compose(reactor, name, inner_reactor, arguments) do
Builder.add_step(reactor, name, {Step.Compose, reactor: inner_reactor}, arguments,
max_retries: 0
)
end
def do_static_compose(reactor, name, inner_reactor, arguments) do
with {:ok, arguments} <- assert_all_are_arguments(arguments),
:ok <- assert_arguments_match_inner_reactor_inputs(arguments, inner_reactor),
{:ok, steps} <- rewrite_steps(inner_reactor, name, arguments),
{:ok, return} <- build_return_step(reactor, inner_reactor, name) do
steps =
steps
|> Enum.concat(reactor.steps)
|> Enum.concat([return])
reactor =
reactor
|> Map.put(:steps, steps)
|> add_composed_reactor(inner_reactor)
{:ok, reactor}
end
end
defp get_composed_reactors(reactor) do
reactor
|> Map.get(:context, %{})
|> Map.get(:private, %{})
|> Map.get(:composed_reactors, MapSet.new())
end
defp add_composed_reactor(reactor, inner_reactor) do
composed_reactors =
reactor
|> get_composed_reactors()
|> MapSet.put(inner_reactor.id)
%{
reactor
| context: deep_merge(reactor.context, %{private: %{composed_reactors: composed_reactors}})
}
end
defp compose_would_be_recursive?(reactor, id) when reactor.id == id, do: true
defp compose_would_be_recursive?(reactor, id) do
reactor
|> get_composed_reactors()
|> MapSet.member?(id)
end
defp build_return_step(reactor, inner_reactor, _name) when is_nil(inner_reactor.return),
do:
{:error,
ComposeError.exception(
outer_reactor: reactor,
inner_reactor: inner_reactor,
message: "The inner Reactor must have an explicit return value."
)}
defp build_return_step(_reactor, inner_reactor, name) do
{:ok,
%Step{
arguments: [
Argument.from_result(:value, {:__reactor__, :compose, name, inner_reactor.return})
],
name: name,
async?: true,
impl: {Step.AnonFn, fun: &{:ok, Map.fetch!(&1, :value)}},
max_retries: 0,
ref: make_ref()
}}
end
defp assert_arguments_match_inner_reactor_inputs(arguments, inner_reactor) do
required_arguments = MapSet.new(inner_reactor.inputs)
provided_arguments = MapSet.new(arguments, & &1.name)
required_arguments
|> MapSet.difference(provided_arguments)
|> Enum.to_list()
|> case do
[] ->
:ok
[missing] ->
{:error,
ComposeError.exception(
inner_reactor: inner_reactor,
arguments: arguments,
message: "Missing argument for `#{missing}` input."
)}
missing ->
missing = sentence(missing, &"`#{&1}`", ", ", " and ")
{:error,
ComposeError.exception(
inner_reactor: inner_reactor,
arguments: arguments,
message: "Missing arguments for the following Reactor inputs; #{missing}"
)}
end
end
defp rewrite_steps(reactor, name, input_arguments) do
input_arguments = Map.new(input_arguments, &{&1.name, &1})
reactor
|> extract_steps()
|> map_while_ok(&rewrite_step(&1, name, input_arguments))
end
defp rewrite_step(step, name, input_arguments) do
with {:ok, arguments} <- rewrite_arguments(step.arguments, name, input_arguments) do
step = %{
step
| arguments: arguments,
name: {:__reactor__, :compose, name, step.name},
impl: {Step.ComposeWrapper, original: step.impl, prefix: [:__reactor__, :compose, name]}
}
{:ok, step}
end
end
defp extract_steps(reactor) when is_nil(reactor.plan), do: reactor.steps
defp extract_steps(reactor) do
reactor.plan
|> Graph.vertices()
|> Enum.concat(reactor.steps)
end
defp rewrite_arguments(arguments, name, input_arguments),
do: map_while_ok(arguments, &rewrite_argument(&1, name, input_arguments))
defp rewrite_argument(argument, _name, input_arguments) when is_from_input(argument) do
input_argument = Map.fetch!(input_arguments, argument.source.name)
{:ok, %{argument | source: input_argument.source}}
end
defp rewrite_argument(argument, name, _input_arguments) when is_from_result(argument) do
source = %{argument.source | name: {:__reactor__, :compose, name, argument.source.name}}
{:ok, %{argument | source: source}}
end
defp rewrite_argument(argument, _name, _input_arguments) when is_from_value(argument),
do: {:ok, argument}
end

View file

@ -0,0 +1,38 @@
defmodule Reactor.Builder.Input do
@moduledoc """
Handle adding inputs to Reactors for the builder.
You should not use this module directly, but instead use
`Reactor.Builder.add_input/3`.
"""
alias Reactor.{Argument, Step}
import Reactor.Utils
@doc """
Add a named input to the reactor.
"""
@spec add_input(Reactor.t(), any, nil | (any -> any) | {Step.step(), keyword}) ::
{:ok, Reactor.t()} | {:error, any}
def add_input(reactor, name, nil), do: {:ok, %{reactor | inputs: [name | reactor.inputs]}}
def add_input(reactor, name, transform) when is_function(transform, 1),
do: add_input(reactor, name, {Step.Transform, fun: transform})
def add_input(reactor, name, transform)
when tuple_size(transform) == 2 and is_atom(elem(transform, 0)) and
is_list(elem(transform, 1)) do
transform_step = %Step{
arguments: [Argument.from_input(:value, name)],
async?: true,
impl: transform,
name: {:__reactor__, :transform, :input, name},
max_retries: 0,
ref: make_ref()
}
{:ok, %{reactor | inputs: [name | reactor.inputs], steps: [transform_step | reactor.steps]}}
end
def add_input(_reactor, _name, transform),
do: {:error, argument_error(:transform, "Invalid transform function", transform)}
end

279
lib/reactor/builder/step.ex Normal file
View file

@ -0,0 +1,279 @@
defmodule Reactor.Builder.Step do
@moduledoc """
Handle building and adding steps to Reactors for the builder.
You should not use this module directly, but instead use
`Reactor.Builder.new_step/4` and `Reactor.Builder.add_step/5`.
"""
alias Reactor.{Argument, Builder, Step, Template}
import Reactor.Argument, only: :macros
import Reactor.Utils
import Reactor.Builder.Argument
@doc """
Build and add a new step to a Reactor.
"""
@spec add_step(
Reactor.t(),
any,
Builder.impl(),
[Builder.step_argument()],
Builder.step_options()
) :: {:ok, Reactor.t()} | {:error, any}
def add_step(reactor, name, impl, arguments, options) do
with {:ok, arguments} <- assert_all_are_arguments(arguments),
{:ok, arguments} <- maybe_rewrite_input_arguments(reactor, arguments),
:ok <- assert_is_step_impl(impl),
{:ok, {arguments, argument_transform_steps}} <-
build_argument_transform_steps(arguments, name),
{:ok, arguments, step_transform_step} <-
maybe_build_step_transform_all_step(arguments, name, options[:transform]),
{:ok, async} <- validate_async_option(options),
{:ok, context} <- validate_context_option(options),
{:ok, max_retries} <- validate_max_retries_option(options) do
context =
if step_transform_step do
deep_merge(context, %{private: %{replace_arguments: :value}})
else
context
end
steps =
[
%Step{
arguments: arguments,
async?: async,
context: context,
impl: impl,
name: name,
max_retries: max_retries,
ref: make_ref()
}
]
|> Enum.concat(argument_transform_steps)
|> maybe_append(step_transform_step)
|> Enum.concat(reactor.steps)
{:ok, %{reactor | steps: steps}}
end
end
@doc """
Dynamically build a new step for later use.
You're most likely to use this when dynamically returning new steps from an
existing step.
"""
@spec new_step(any, Builder.impl(), [Builder.step_argument()], Builder.step_options()) ::
{:ok, Step.t()} | {:error, any}
def new_step(name, impl, arguments, options) do
with {:ok, arguments} <- assert_all_are_arguments(arguments),
:ok <- assert_no_argument_transforms(arguments),
:ok <- assert_is_step_impl(impl),
{:ok, async} <- validate_async_option(options),
{:ok, context} <- validate_context_option(options),
{:ok, max_retries} <- validate_max_retries_option(options),
:ok <- validate_no_transform_option(options) do
step = %Step{
arguments: arguments,
async?: async,
context: context,
impl: impl,
name: name,
max_retries: max_retries,
ref: make_ref()
}
{:ok, step}
end
end
defp validate_async_option(options) do
options
|> Keyword.get(:async?, true)
|> case do
value when is_boolean(value) ->
{:ok, value}
value ->
{:error, argument_error(:options, "Invalid value for the `async?` option.", value)}
end
end
defp validate_context_option(options) do
options
|> Keyword.get(:context, %{})
|> case do
value when is_map(value) ->
{:ok, value}
value ->
{:error,
argument_error(:options, "Invalid value for the `context` option: must be a map.", value)}
end
end
defp validate_max_retries_option(options) do
options
|> Keyword.get(:max_retries, 100)
|> case do
:infinity ->
{:ok, :infinity}
value when is_integer(value) and value >= 0 ->
{:ok, value}
value ->
{:error,
argument_error(
:options,
"Invalid value for the `max_retries` option: must be a non-negative integer or `:infinity`.",
value
)}
end
end
defp validate_no_transform_option(options) do
if Keyword.has_key?(options, :transform) do
{:error,
argument_error(:options, "Adding transforms to dynamic steps is not supported.", options)}
else
:ok
end
end
defp maybe_rewrite_input_arguments(reactor, arguments) do
existing_step_names = MapSet.new(reactor.steps, & &1.name)
map_while_ok(arguments, fn
argument when is_from_input(argument) ->
potential_rewrite_step_name = {:__reactor__, :transform, :input, argument.source.name}
if MapSet.member?(existing_step_names, potential_rewrite_step_name) do
{:ok, Argument.from_result(argument.name, potential_rewrite_step_name)}
else
{:ok, argument}
end
argument when is_from_result(argument) or is_from_value(argument) ->
{:ok, argument}
end)
end
defp assert_is_step_impl({impl, opts}) when is_list(opts), do: assert_is_step_impl(impl)
defp assert_is_step_impl(impl) when is_atom(impl) do
if Spark.implements_behaviour?(impl, Step) do
:ok
else
{:error,
argument_error(:impl, "Module does not implement the `Reactor.Step` behaviour.", impl)}
end
end
defp build_argument_transform_steps(arguments, step_name) do
arguments
|> reduce_while_ok({[], []}, fn
argument, {arguments, steps} when is_from_input(argument) and has_transform(argument) ->
transform_step_name = {:__reactor__, :transform, argument.name, step_name}
step =
build_transform_step(
argument.source,
transform_step_name,
argument.transform
)
argument = %Argument{
name: argument.name,
source: %Template.Result{name: transform_step_name}
}
{:ok, {[argument | arguments], [%{step | transform: nil} | steps]}}
argument, {arguments, steps} when is_from_result(argument) and has_transform(argument) ->
transform_step_name = {:__reactor__, :transform, argument.name, step_name}
step =
build_transform_step(
argument.source,
transform_step_name,
argument.transform
)
argument = %Argument{
name: argument.name,
source: %Template.Result{name: transform_step_name}
}
{:ok, {[argument | arguments], [%{step | transform: nil} | steps]}}
argument, {arguments, steps} ->
{:ok, {[argument | arguments], steps}}
end)
end
defp maybe_build_step_transform_all_step(arguments, _name, nil), do: {:ok, arguments, nil}
defp maybe_build_step_transform_all_step(arguments, name, transform)
when is_function(transform, 1),
do:
maybe_build_step_transform_all_step(arguments, name, {Step.TransformAll, fun: transform})
defp maybe_build_step_transform_all_step(arguments, name, transform) do
step = %Step{
arguments: arguments,
async?: true,
impl: transform,
name: {:__reactor__, :transform, name},
max_retries: 0,
ref: make_ref()
}
{:ok, [Argument.from_result(:value, step.name)], step}
end
defp build_transform_step(argument_source, step_name, transform) when is_function(transform, 1),
do: build_transform_step(argument_source, step_name, {Step.Transform, fun: transform})
defp build_transform_step(argument_source, step_name, transform)
when tuple_size(transform) == 2 and is_atom(elem(transform, 0)) and
is_list(elem(transform, 1)) do
%Step{
arguments: [
%Argument{
name: :value,
source: argument_source
}
],
async?: true,
impl: transform,
name: step_name,
max_retries: 0,
ref: make_ref()
}
end
defp assert_no_argument_transforms(arguments) do
arguments
|> Enum.reject(&is_nil(&1.transform))
|> case do
[] ->
:ok
[argument] ->
{:error,
argument_error(
:arguments,
"Argument `#{argument.name}` has a transform attached.",
argument
)}
arguments ->
sentence = sentence(arguments, &"`#{&1.name}`", ", ", " and ")
{:error,
argument_error(:arguments, "Arguments #{sentence} have transforms attached.", arguments)}
end
end
end

View file

@ -72,6 +72,9 @@ defmodule Reactor.Dsl do
argument :user_id, result(:create_user) do
transform & &1.id
end
""",
"""
argument :three, value(3)
"""
],
args: [:name, {:optional, :source}],
@ -87,7 +90,9 @@ defmodule Reactor.Dsl do
"""
],
source: [
type: {:or, [{:struct, Template.Input}, {:struct, Template.Result}]},
type:
{:or,
[{:struct, Template.Input}, {:struct, Template.Result}, {:struct, Template.Value}]},
required: true,
doc: """
What to use as the source of the argument.
@ -191,6 +196,38 @@ defmodule Reactor.Dsl do
]
}
@compose %Entity{
name: :compose,
describe: """
Compose another Reactor into this one.
Allows place another Reactor into this one as if it were a single step.
""",
args: [:name, :reactor],
target: Dsl.Compose,
no_depend_modules: [:reactor],
entities: [arguments: [@argument]],
schema: [
name: [
type: :atom,
required: true,
doc: """
A unique name for the step.
Allows the result of the composed reactor to be depended upon by steps
in this reactor.
"""
],
reactor: [
type: {:or, [{:struct, Reactor}, {:spark, Reactor.Dsl}]},
required: true,
doc: """
The reactor module or struct to compose upon.
"""
]
]
}
@reactor %Section{
name: :reactor,
describe: "The top-level reactor DSL",
@ -203,11 +240,12 @@ defmodule Reactor.Dsl do
"""
]
],
entities: [@input, @step],
entities: [@input, @step, @compose],
top_level?: true
}
use Extension,
sections: [@reactor],
transformers: [Dsl.Transformer]
transformers: [Dsl.Transformer],
verifiers: [Dsl.PlanableVerifier]
end

View file

@ -0,0 +1,14 @@
defmodule Reactor.Dsl.Compose do
@moduledoc """
The `compose` DSL entity struct.
See `Reactor.Dsl`.
"""
defstruct arguments: [], name: nil, reactor: nil
@type t :: %__MODULE__{
arguments: [Reactor.Argument.t()],
name: any,
reactor: module | Reactor.t()
}
end

View file

@ -0,0 +1,32 @@
defmodule Reactor.Dsl.PlanableVerifier do
@moduledoc """
Verifies that the Reactor is not cyclic.
"""
use Spark.Dsl.Verifier
alias Spark.{Dsl, Dsl.Verifier, Error.DslError}
alias Reactor.{Info, Planner}
@doc """
Ensure that a DSL-based Reactor is not cyclic.
"""
@impl true
@spec verify(Dsl.t()) :: :ok | {:error, any}
def verify(dsl_state) do
with {:ok, reactor} <- Info.to_struct(dsl_state),
{:ok, _} <- Planner.plan(reactor) do
:ok
else
{:error, reason} when is_binary(reason) ->
{:error,
DslError.exception(
module: Verifier.get_persisted(dsl_state, :module),
path: [:reactor, :step],
message: reason
)}
{:error, reason} when is_exception(reason) ->
{:error, reason}
end
end
end

View file

@ -1,18 +1,38 @@
defmodule Reactor.Dsl.Transformer do
@moduledoc false
alias Reactor.Step
alias Reactor.{Dsl.Compose, Step}
alias Spark.{Dsl, Dsl.Transformer, Error.DslError}
use Transformer
@doc false
@spec transform(Dsl.t()) :: {:ok, Dsl.t()} | {:error, DslError.t()}
def transform(dsl_state) do
step_names =
dsl_state
|> Transformer.get_entities([:reactor])
|> Enum.filter(&is_struct(&1, Step))
|> Enum.map(& &1.name)
with {:ok, step_names} <- step_names(dsl_state),
{:ok, dsl_state} <- maybe_set_return(dsl_state, step_names) do
validate_return(dsl_state, step_names)
end
end
defp step_names(dsl_state) do
dsl_state
|> Transformer.get_entities([:reactor])
|> Enum.filter(&(is_struct(&1, Step) || is_struct(&1, Compose)))
|> Enum.map(& &1.name)
|> case do
[] ->
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:reactor],
message: "Reactor contains no steps"
)}
step_names ->
{:ok, step_names}
end
end
defp maybe_set_return(dsl_state, step_names) do
case Transformer.get_option(dsl_state, [:reactor], :return) do
nil ->
dsl_state =
@ -21,18 +41,23 @@ defmodule Reactor.Dsl.Transformer do
{:ok, dsl_state}
return_name ->
if return_name in step_names do
{:ok, dsl_state}
else
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:reactor],
message:
"Return value `#{inspect(return_name)}` does not correspond with an existing step"
)}
end
_ ->
{:ok, dsl_state}
end
end
defp validate_return(dsl_state, step_names) do
name = Transformer.get_option(dsl_state, [:reactor], :return)
if name in step_names do
{:ok, dsl_state}
else
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:reactor],
message: "Return value `#{inspect(name)}` does not correspond with an existing step"
)}
end
end
end

View file

@ -0,0 +1,52 @@
defmodule Reactor.Error.ComposeError do
defexception [:outer_reactor, :inner_reactor, :message, :arguments]
import Reactor.Utils
@impl true
def exception(attrs), do: struct(__MODULE__, attrs)
@impl true
def message(error) do
[
"""
# Unable to compose Reactors
#{error.message}
"""
]
|> maybe_append_result(fn ->
if error.arguments do
"""
## Arguments
```
#{inspect(error.arguments)}
```
"""
end
end)
|> maybe_append_result(fn ->
if error.inner_reactor do
"""
## Inner Reactor
```
#{inspect(error.inner_reactor)}
```
"""
end
end)
|> maybe_append_result(fn ->
if error.outer_reactor do
"""
## Outer Reactor
```
#{inspect(error.outer_reactor)}
```
"""
end
end)
|> Enum.join("\n")
end
end

View file

@ -0,0 +1,52 @@
defmodule Reactor.Error.PlanError do
defexception [:reactor, :graph, :step, :message]
import Reactor.Utils
@impl true
def exception(attrs), do: struct(__MODULE__, attrs)
@impl true
def message(error) do
[
"""
# Unable to plan Reactor
#{error.message}
"""
]
|> maybe_append_result(fn ->
if error.reactor do
"""
## Reactor
```
#{inspect(error.reactor)}
```
"""
end
end)
|> maybe_append_result(fn ->
if error.step do
"""
## Step
```
#{inspect(error.step)}
```
"""
end
end)
|> maybe_append_result(fn ->
if error.graph do
"""
## Graph
```
#{inspect(error.graph)}
```
"""
end
end)
|> Enum.join("\n")
end
end

View file

@ -0,0 +1,9 @@
defmodule Reactor.Error.TransformError do
defexception input: nil, output: nil, message: nil
@impl true
def exception(attrs), do: struct(__MODULE__, attrs)
@impl true
def message(error), do: error.message
end

View file

@ -48,6 +48,9 @@ defmodule Reactor.Executor do
{:ok, any} | {:halted, Reactor.t()} | {:error, any}
def run(reactor, inputs \\ %{}, context \\ %{}, options \\ [])
def run(reactor, _inputs, _context, _options) when is_nil(reactor.return),
do: {:error, ArgumentError.exception("`reactor` has no return value")}
def run(reactor, inputs, context, options) when reactor.state in ~w[pending halted]a do
case Executor.Init.init(reactor, inputs, context, options) do
{:ok, reactor, state} -> execute(reactor, state)

View file

@ -5,6 +5,7 @@ defmodule Reactor.Executor.Init do
alias Reactor.Executor
import Reactor, only: :macros
import Reactor.Utils
@doc false
@spec init(Reactor.t(), Reactor.inputs(), Reactor.context(), Reactor.options()) ::
@ -19,15 +20,10 @@ defmodule Reactor.Executor.Init do
{:ok, options} <- into_map(options) do
state = Executor.State.init(options)
private =
context
|> Map.get(:private, %{})
|> Map.put(:inputs, inputs)
context =
reactor.context
|> Map.merge(context)
|> Map.put(:private, private)
|> deep_merge(context)
|> deep_merge(%{private: %{inputs: inputs}})
{:ok, %{reactor | context: context}, state}
end
@ -53,11 +49,11 @@ defmodule Reactor.Executor.Init do
missing_inputs =
valid_input_names
|> MapSet.difference(provided_input_names)
|> Enum.map_join(", ", &"`#{inspect(&1)}`")
|> sentence(&"`#{inspect(&1)}`", ", ", " and ")
{:error,
ArgumentError.exception(
message: "Reactor is missing the following inputs: #{missing_inputs}"
message: "Reactor is missing the following inputs; #{missing_inputs}"
)}
end
end

View file

@ -2,8 +2,9 @@ defmodule Reactor.Executor.StepRunner do
@moduledoc """
Run an individual step, including compensation if possible.
"""
alias Reactor.{Step, Template}
alias Reactor.Step
import Reactor.Utils
import Reactor.Argument, only: :macros
require Logger
@max_undo_count 5
@ -88,25 +89,40 @@ defmodule Reactor.Executor.StepRunner do
end
defp get_step_arguments(reactor, step) do
Enum.reduce_while(step.arguments, {:ok, %{}}, fn argument, {:ok, arguments} ->
with %Template.Result{name: dependency_name} <- argument.source,
{:ok, value} <- Map.fetch(reactor.intermediate_results, dependency_name) do
{:cont, {:ok, Map.put(arguments, argument.name, value)}}
else
%Template.Input{} ->
{:halt,
{:error,
"Step `#{inspect(step.name)}` argument `#{inspect(argument.name)}` is invalid"}}
reduce_while_ok(step.arguments, %{}, fn
argument, arguments when is_from_input(argument) ->
case Map.fetch(reactor.context.private.inputs, argument.source.name) do
{:ok, value} ->
{:ok, Map.put(arguments, argument.name, value)}
:error ->
{:halt,
{:error,
"Step `#{inspect(step.name)}` argument `#{inspect(argument.name)}` is missing"}}
end
:error ->
{:error,
"Step `#{inspect(step.name)}` argument `#{inspect(argument.name)}` relies on missing input `#{argument.source.name}`"}
end
argument, arguments when is_from_result(argument) ->
case Map.fetch(reactor.intermediate_results, argument.source.name) do
{:ok, value} ->
{:ok, Map.put(arguments, argument.name, value)}
:error ->
{:error,
"Step `#{inspect(step.name)}` argument `#{inspect(argument.name)}` is missing"}
end
argument, arguments when is_from_value(argument) ->
{:ok, Map.put(arguments, argument.name, argument.source.value)}
end)
end
defp build_context(reactor, step), do: {:ok, deep_merge(step.context, reactor.context)}
defp build_context(reactor, step) do
context =
step.context
|> deep_merge(reactor.context)
|> Map.put(:current_step, step)
{:ok, context}
end
defp maybe_replace_arguments(arguments, context) when is_nil(context.private.replace_arguments),
do: {:ok, arguments}

View file

@ -4,54 +4,51 @@ defmodule Reactor.Info do
"""
use Spark.InfoGenerator, sections: [:reactor], extension: Reactor.Dsl
alias Reactor.{Builder, Input, Step}
alias Reactor.{Builder, Dsl.Compose, Input, Step}
import Reactor.Utils
@doc """
Convert a reactor DSL module into a reactor struct.
"""
@spec to_struct(module) :: {:ok, Reactor.t()} | {:error, any}
def to_struct(module) when is_atom(module) do
with :ok <- assert_is_reactor_module(module),
{:ok, reactor} <- entities_to_struct(module) do
@spec to_struct(module | Reactor.t() | Spark.Dsl.t()) :: {:ok, Reactor.t()} | {:error, any}
def to_struct(reactor) when is_struct(reactor, Reactor), do: {:ok, reactor}
def to_struct(module) do
with {:ok, reactor} <- entities_to_struct(module) do
maybe_set_return(module, reactor)
end
end
@doc """
Raising version of `to_struct/1`.
"""
@spec to_struct!(module | Reactor.t() | Spark.Dsl.t()) :: Reactor.t() | no_return
def to_struct!(reactor) do
case to_struct(reactor) do
{:ok, reactor} -> reactor
{:error, reason} -> raise reason
end
end
defp entities_to_struct(module) do
module
|> reactor()
|> Enum.reduce_while({:ok, Builder.new()}, fn
input, {:ok, reactor} when is_struct(input, Input) ->
case Builder.add_input(reactor, input.name, input.transform) do
{:ok, reactor} -> {:cont, {:ok, reactor}}
{:error, reason} -> {:halt, {:error, reason}}
end
|> reduce_while_ok(Builder.new(module), fn
input, reactor when is_struct(input, Input) ->
Builder.add_input(reactor, input.name, input.transform)
step, {:ok, reactor} when is_struct(step, Step) ->
case Builder.add_step(reactor, step.name, step.impl, step.arguments,
async?: step.async?,
max_retries: step.max_retries,
transform: step.transform
) do
{:ok, reactor} -> {:cont, {:ok, reactor}}
{:error, reason} -> {:halt, {:error, reason}}
end
step, reactor when is_struct(step, Step) ->
Builder.add_step(reactor, step.name, step.impl, step.arguments,
async?: step.async?,
max_retries: step.max_retries,
transform: step.transform
)
compose, reactor when is_struct(compose, Compose) ->
Builder.compose(reactor, compose.name, compose.reactor, compose.arguments)
end)
end
defp assert_is_reactor_module(reactor) when is_atom(reactor) do
Code.ensure_loaded!(reactor)
if reactor.spark_is() == Reactor do
:ok
else
{:error, "Module `#{inspect(reactor)}` is not a valid Reactor module"}
end
rescue
_error in [ArgumentError, UndefinedFunctionError] ->
{:error, "Module `#{inspect(reactor)}` is not a valid Reactor module"}
end
defp maybe_set_return(module, reactor) do
case reactor_return(module) do
{:ok, value} -> {:ok, %{reactor | return: value}}

View file

@ -6,9 +6,10 @@ defmodule Reactor.Planner do
between them representing their dependencies (arguments).
"""
alias Reactor.Step
alias Reactor.{Error.PlanError, Step}
import Reactor, only: :macros
import Reactor.Argument, only: :macros
import Reactor.Utils
@doc """
Build an execution plan for a Reactor.
@ -17,18 +18,29 @@ defmodule Reactor.Planner do
"""
@spec plan(Reactor.t()) :: {:ok, Reactor.t()} | {:error, any}
def plan(reactor) when not is_reactor(reactor),
do: {:error, ArgumentError.exception("`reactor`: not a Reactor")}
do: {:error, argument_error(:reactor, "not a Reactor", reactor)}
def plan(reactor) when is_nil(reactor.plan),
do: plan(%{reactor | plan: empty_graph()})
def plan(reactor) do
with {:ok, graph} <- reduce_steps_into_graph(reactor.plan, reactor.steps),
:ok <- assert_graph_not_cyclic(graph) do
:ok <- assert_graph_not_cyclic(reactor, graph) do
{:ok, %{reactor | steps: [], plan: graph}}
end
end
@doc """
Raising version of `plan/1`.
"""
@spec plan!(Reactor.t()) :: Reactor.t() | no_return
def plan!(reactor) do
case plan(reactor) do
{:ok, reactor} -> reactor
{:error, reason} -> raise reason
end
end
defp empty_graph, do: Graph.new(type: :directed, vertex_identifier: & &1.ref)
defp reduce_steps_into_graph(graph, steps) do
@ -38,60 +50,64 @@ defmodule Reactor.Planner do
|> Enum.concat(steps)
|> Map.new(&{&1.name, &1})
Enum.reduce_while(steps, {:ok, graph}, fn
step, {:ok, graph} when is_struct(step, Step) ->
steps
|> reduce_while_ok(graph, fn
step, graph when is_struct(step, Step) ->
graph
|> Graph.add_vertex(step, step.name)
|> reduce_arguments_into_graph(step, steps_by_name)
|> case do
{:ok, graph} -> {:cont, {:ok, graph}}
{:error, reason} -> {:halt, {:error, reason}}
end
not_step, _ ->
{:halt, {:error, "Value `#{inspect(not_step)}` is not a `Reactor.Step` struct."}}
{:error,
PlanError.exception(
graph: graph,
step: not_step,
message: "Value is not a `Reactor.Step` struct."
)}
end)
end
defp reduce_arguments_into_graph(graph, current_step, steps_by_name) do
Enum.reduce_while(current_step.arguments, {:ok, graph}, fn
argument, {:ok, graph} when is_argument(argument) ->
dependency_name =
case argument do
argument when is_from_result(argument) -> argument.source.name
argument when is_from_input(argument) -> {:input, argument.source.name}
end
reduce_while_ok(current_step.arguments, graph, fn
argument, graph when is_argument(argument) and is_from_result(argument) ->
dependency_name = argument.source.name
case Map.fetch(steps_by_name, dependency_name) do
{:ok, dependency} when dependency.name == current_step.name ->
{:cont, {:ok, graph}}
{:ok, graph}
{:ok, dependency} ->
{:cont,
{:ok,
Graph.add_edge(
graph,
dependency,
current_step,
label: {:argument, argument.name, :for, current_step.name}
)}}
{:ok,
Graph.add_edge(graph, dependency, current_step,
label: {:argument, argument.name, :for, current_step.name}
)}
:error ->
{:halt,
{:error,
"Step `#{inspect(current_step.name)}` depends on the result of a step named `#{inspect(argument.source.name)}` which cannot be found"}}
{:error,
PlanError.exception(
graph: graph,
step: current_step,
message:
"Step `#{inspect(current_step.name)}` depends on the result of a step named `#{inspect(argument.source.name)}` which cannot be found"
)}
end
_argument, _graph ->
{:halt, {:error, ArgumentError.exception("`argument` is not an argument.")}}
argument, graph
when is_argument(argument) and (is_from_input(argument) or is_from_value(argument)) ->
{:ok, graph}
end)
end
defp assert_graph_not_cyclic(graph) do
defp assert_graph_not_cyclic(reactor, graph) do
if Graph.is_acyclic?(graph) do
:ok
else
{:error, "Reactor contains cyclic dependencies."}
{:error,
PlanError.exception(
reactor: reactor,
graph: graph,
message: "Reactor contains cyclic dependencies."
)}
end
end
end

View file

@ -27,6 +27,8 @@ defmodule Reactor.Step do
transform: nil | (any -> any) | {module, keyword} | mfa
}
@type step :: module
@typedoc """
Optional capabilities which may be implemented by the step module.
@ -94,9 +96,6 @@ defmodule Reactor.Step do
This provides you the opportunity to handle the error in a number of ways and
direct the reactor as to what to do next.
This callback is only called if `c:can?/1` returns `true` for the
`:compensate` capability.
## Arguments
- `reason` - the error reason returned from `c:run/3`.
- `arguments` - the arguments passed to the step.
@ -127,9 +126,6 @@ defmodule Reactor.Step do
This callback is called when the reactor encounters an unhandled error later
in it's execution run and must undo the work previously done.
This callback is only called if `c:can?/1` returns `true` for the `:undo`
capability.
## Arguments
- `value` - the return value of the previously successful call to `c:run/3`.
@ -159,13 +155,11 @@ defmodule Reactor.Step do
Find out of a step has a capability.
"""
@spec can?(module | Step.t(), capability()) :: boolean
def can?(%Step{impl: {module, _opts}}, capability)
when is_atom(module) and capability in ~w[undo compensate]a,
do: function_exported?(module, capability, 4)
def can?(%Step{impl: {module, _opts}}, capability) when is_atom(module),
do: can?(module, capability)
def can?(%Step{impl: module}, capability)
when is_atom(module) and capability in ~w[undo compensate]a,
do: function_exported?(module, capability, 4)
def can?(%Step{impl: module}, capability) when is_atom(module),
do: can?(module, capability)
def can?(module, capability) when is_atom(module) and capability in ~w[undo compensate]a,
do: function_exported?(module, capability, 4)

View file

@ -13,6 +13,9 @@ defmodule Reactor.Step.AnonFn do
@spec run(Reactor.inputs(), Reactor.context(), keyword) :: {:ok | :error, any}
def run(arguments, context, options) do
case Keyword.pop(options, :fun) do
{fun, _opts} when is_function(fun, 1) ->
fun.(arguments)
{fun, _opts} when is_function(fun, 2) ->
fun.(arguments, context)

165
lib/reactor/step/compose.ex Normal file
View file

@ -0,0 +1,165 @@
defmodule Reactor.Step.Compose do
@moduledoc """
A built-in step which can embed one reactor inside another.
This is different to the `Builder.compose` and DSL `compose` methods. Those
methods build a new reactor by combining the steps of the two input reactors,
whereas this step expands the provided reactor at runtime and dynamically
inserts it's steps into the running reactor.
If emitting the reactor's steps into the current reactor would be recursive,
then the reactor is directly executed within the step using `Reactor.run/4`.
"""
use Reactor.Step
alias Reactor.{Argument, Builder, Error.ComposeError, Info, Step}
import Reactor, only: :macros
import Reactor.Argument, only: :macros
import Reactor.Utils
@doc false
@impl true
def run(arguments, context, options) do
reactor = Keyword.fetch!(options, :reactor)
reactor_id = get_reactor_id(reactor)
context
|> get_composed_reactors()
|> MapSet.member?(reactor_id)
|> if do
handle_recursive_reactor(reactor, arguments, context)
else
handle_non_recursive_reactor(reactor, arguments, context)
end
end
defp handle_recursive_reactor(reactor, arguments, context),
do: Reactor.run(reactor, arguments, context, [])
defp handle_non_recursive_reactor(reactor, arguments, context) when is_atom(reactor) do
with {:ok, reactor} <- Info.to_struct(reactor) do
handle_non_recursive_reactor(reactor, arguments, context)
end
end
defp handle_non_recursive_reactor(reactor, arguments, context) do
current_step = Map.fetch!(context, :current_step)
with :ok <- validate_arguments_match_inputs(arguments, reactor),
:ok <- validate_reactor_has_return(reactor),
{:ok, inner_steps} <- rewrite_steps(reactor, current_step.name, arguments),
{:ok, recursion_step} <- create_recursion_step(reactor, current_step.name) do
steps =
inner_steps
|> Enum.concat([recursion_step])
{:ok, nil, steps}
end
end
defp get_reactor_id(reactor) when is_atom(reactor), do: reactor
defp get_reactor_id(reactor) when is_reactor(reactor), do: reactor.id
defp get_composed_reactors(context) when not is_nil(context.private.composed_reactors),
do: context.private.composed_reactors
defp get_composed_reactors(_context), do: MapSet.new()
defp validate_reactor_has_return(reactor) when is_nil(reactor.return),
do:
{:error,
ComposeError.exception(
inner_reactor: reactor,
message: "The inner Reactor must have an explicit return value."
)}
defp validate_reactor_has_return(reactor) do
if Enum.any?(reactor.steps, &(&1.name == reactor.return)) do
:ok
else
{:error,
ComposeError.exception(
inner_reactor: reactor,
message:
"The inner Reactor return value does not correspond with an existing Reactor step."
)}
end
end
defp create_recursion_step(reactor, name) do
Builder.new_step(
name,
{Step.AnonFn, fun: fn args, _, _ -> {:ok, args.value} end},
[value: {:result, {__MODULE__, name, reactor.return}}],
max_retries: 0
)
end
defp validate_arguments_match_inputs(arguments, reactor) do
argument_names = arguments |> Map.keys() |> MapSet.new()
input_names = MapSet.new(reactor.inputs)
input_names
|> MapSet.difference(argument_names)
|> Enum.to_list()
|> case do
[] ->
:ok
[input] ->
{:error,
ComposeError.exception(
inner_reactor: reactor,
arguments: arguments,
message: "Missing argument for input `#{input}`"
)}
inputs ->
inputs = sentence(inputs, &"`#{&1}`", ", ", " and ")
{:error,
ComposeError.exception(
inner_reactor: reactor,
arguments: arguments,
message: "Missing arguments for inputs #{inputs}"
)}
end
end
defp rewrite_steps(reactor, name, inputs) when not is_nil(reactor.plan) do
steps =
reactor.plan
|> Graph.vertices()
|> Enum.concat(reactor.steps)
rewrite_steps(%{reactor | steps: steps, plan: nil}, name, inputs)
end
defp rewrite_steps(reactor, name, inputs) do
reactor.steps
|> map_while_ok(&rewrite_step(&1, name, inputs))
end
defp rewrite_step(step, name, inputs) do
with {:ok, arguments} <- map_while_ok(step.arguments, &rewrite_argument(&1, name, inputs)) do
{:ok,
%{
step
| arguments: arguments,
name: {__MODULE__, name, step.name},
impl: {Step.ComposeWrapper, original: step.impl, prefix: [__MODULE__, name]}
}}
end
end
defp rewrite_argument(argument, _name, inputs) when is_from_input(argument) do
value = Map.fetch!(inputs, argument.source.name)
{:ok, Argument.from_value(argument.name, value)}
end
defp rewrite_argument(argument, name, _inputs) when is_from_result(argument),
do: {:ok, Argument.from_result(argument.name, {__MODULE__, name, argument.source.name})}
defp rewrite_argument(argument, _name, _inputs) when is_from_value(argument),
do: {:ok, argument}
end

View file

@ -0,0 +1,143 @@
defmodule Reactor.Step.ComposeWrapper do
@moduledoc """
When doing run-time composition of Reactors we need to dynamically rewrite any
dynamically emitted steps to have the correct namespace.
Yes, this gets hairy, fast.
This is dynamically injected into steps by `Reactor.Step.Compose` - you
probably don't want to use this unless you're sure what you're doing.
## Options
* `original` - the original value of the Step's `impl` key.
* `prefix` - a list of values to be placed in the `name` before the original value.
"""
use Reactor.Step
alias Reactor.Argument
import Reactor.Argument, only: :macros
import Reactor.Utils
@doc false
@impl true
def run(arguments, context, options) do
with {:ok, impl} <- validate_original_option(options, context.current_step),
{:ok, prefix} <- validate_prefix_option(options, context.current_step) do
case do_run(impl, arguments, context) do
{:ok, value} -> {:ok, value}
{:ok, value, steps} -> {:ok, value, rewrite_steps(steps, prefix)}
{stop, reason} when stop in ~w[halt error]a -> {stop, reason}
end
end
end
@doc false
@impl true
def compensate(reason, arguments, context, options) do
case get_original_option(options, context.current_step) do
{:ok, {impl, opts}} -> impl.compensate(reason, arguments, context, opts)
{:ok, impl} -> impl.compensate(reason, arguments, context, [])
{:error, _} -> :ok
end
rescue
UndefinedFunctionError -> :ok
end
@doc false
@impl true
def undo(value, arguments, context, options) do
case get_original_option(options, context.current_step) do
{:ok, {impl, opts}} -> impl.undo(value, arguments, context, opts)
{:ok, impl} -> impl.undo(value, arguments, context, [])
{:error, reason} -> {:error, reason}
end
rescue
UndefinedFunctionError -> :ok
end
defp validate_original_option(options, current_step) do
with {:ok, original} <- get_original_option(options, current_step),
{module, opts} <- get_module_and_options(original) do
if Spark.implements_behaviour?(module, Reactor.Step) do
{:ok, {module, opts}}
else
{:error,
argument_error(
:options,
"Step `#{current_step.name}` module `#{inspect(module)}` does not implement the `Reactor.Step` behaviour.",
opts
)}
end
end
end
defp get_original_option(options, current_step) do
with :error <- Keyword.fetch(options, :original) do
{:error,
argument_error(
:options,
"Step `#{current_step.name}` is missing the `original` option.",
options
)}
end
end
defp validate_prefix_option(options, current_step) do
case Keyword.fetch(options, :prefix) do
{:ok, [_ | _] = prefix} ->
{:ok, prefix}
:error ->
{:error,
argument_error(
:options,
"Step `#{current_step.name}` has missing `prefix` option.",
options
)}
_ ->
{:error,
argument_error(
:options,
"Step `#{current_step.name}` has invalid `prefix` option.",
options
)}
end
end
defp get_module_and_options(impl) when is_atom(impl), do: {impl, []}
defp get_module_and_options({impl, options}) when is_atom(impl) and is_list(options),
do: {impl, options}
defp do_run({module, options}, arguments, context) when is_atom(module) and is_list(options),
do: module.run(arguments, context, options)
defp rewrite_steps(steps, prefix) do
steps
|> Enum.map(fn step ->
name =
prefix
|> Enum.concat([step.name])
|> List.to_tuple()
arguments = Enum.map(step.arguments, &rewrite_argument(&1, prefix))
%{step | name: name, arguments: arguments}
end)
end
defp rewrite_argument(argument, prefix) when is_from_result(argument) do
source =
prefix
|> Enum.concat([argument.source.name])
|> List.to_tuple()
Argument.from_result(argument.name, source)
end
defp rewrite_argument(argument, _prefix)
when is_from_input(argument) or is_from_value(argument),
do: argument
end

View file

@ -1,28 +0,0 @@
defmodule Reactor.Step.Input do
@moduledoc """
A built-in step which emits a reactor input.
"""
use Reactor.Step
@doc false
@impl true
@spec run(Reactor.inputs(), Reactor.context(), keyword) :: {:ok | :error, any}
def run(_arguments, context, options) do
case Keyword.fetch(options, :name) do
{:ok, name} ->
with {:ok, private} <- Map.fetch(context, :private),
{:ok, inputs} <- Map.fetch(private, :inputs),
{:ok, value} <- Map.fetch(inputs, name) do
{:ok, value}
else
:error ->
{:error,
ArgumentError.exception("Reactor is missing an input named `#{inspect(name)}`")}
end
:error ->
{:error, ArgumentError.exception("Missing `:name` option in `Input` step")}
end
end
end

View file

@ -29,9 +29,6 @@ defmodule Reactor.Step.Transform do
{{m, f, a}, _opts} when is_atom(m) and is_atom(f) and is_list(a) ->
{:ok, apply(m, f, [value | a])}
{nil, opts} ->
raise "Invalid options given to `run/3` callback: `#{inspect(opts)}`."
end
rescue
error -> {:error, error}

View file

@ -7,7 +7,7 @@ defmodule Reactor.Step.TransformAll do
"""
use Reactor.Step
alias Reactor.Step.Transform
alias Reactor.{Error.TransformError, Step.Transform}
@doc false
@impl true
@ -17,8 +17,13 @@ defmodule Reactor.Step.TransformAll do
{:ok, result} when is_map(result) ->
{:ok, result}
{:ok, _other} ->
{:error, "Step transformers must return a map to use as replacement arguments."}
{:ok, result} ->
{:error,
TransformError.exception(
input: arguments,
output: result,
message: "Step transformers must return a map to use as replacement arguments."
)}
{:error, reason} ->
{:error, reason}

View file

@ -1,9 +0,0 @@
defmodule Reactor.Template do
@moduledoc false
alias Reactor.Template
@typedoc """
An input or result template.
"""
@type t :: Template.Input.t() | Template.Result.t()
end

View file

@ -0,0 +1,9 @@
defmodule Reactor.Template.Value do
@moduledoc """
A statically `value` template.
"""
defstruct value: nil
@type t :: %__MODULE__{value: any}
end

View file

@ -21,4 +21,113 @@ defmodule Reactor.Utils do
@spec maybe_append(Enumerable.t(), any) :: Enumerable.t()
def maybe_append(collection, nil), do: collection
def maybe_append(collection, value), do: Enum.concat(collection, [value])
@doc """
Append a non-nil result of the callback function to the enumerable.
"""
@spec maybe_append_result(Enumerable.t(), (() -> any)) :: Enumerable.t()
def maybe_append_result(collection, callback) do
case callback.() do
nil -> collection
value -> Enum.concat(collection, [value])
end
end
@doc """
A joiner that replaces the last join value with a different one.
"""
@spec sentence(Enumerable.t(), (any -> binary), binary, binary) :: binary
def sentence(enumerable, mapper \\ &"`#{&1}`", joiner \\ ", ", last_joiner \\ " or ")
def sentence(enumerable, mapper, joiner, last_joiner),
do:
enumerable
|> Enum.to_list()
|> do_sentence([], mapper, joiner, last_joiner)
defp do_sentence([], [], _, _, _), do: []
defp do_sentence([value], [], mapper, _, _), do: [mapper.(value)]
defp do_sentence([value], zipped, mapper, _joiner, last_joiner) do
[mapper.(value), last_joiner | zipped]
|> Enum.reverse()
|> Enum.join()
end
defp do_sentence([head | tail], [], mapper, joiner, last_joiner),
do: do_sentence(tail, [mapper.(head)], mapper, joiner, last_joiner)
defp do_sentence([head | tail], zipped, mapper, joiner, last_joiner),
do: do_sentence(tail, [mapper.(head), joiner | zipped], mapper, joiner, last_joiner)
@doc """
Perform a map over an enumerable provided that the mapper function continues
to return ok tuples.
"""
@spec map_while_ok(Enumerable.t(input), (input -> {:ok, output} | {:error, any}), boolean) ::
{:ok, Enumerable.t(output)} | {:error, any}
when input: any, output: any
def map_while_ok(inputs, mapper, preserve_order? \\ false)
def map_while_ok(inputs, mapper, false) when is_function(mapper, 1) do
reduce_while_ok(inputs, [], fn input, acc ->
case mapper.(input) do
{:ok, value} -> {:ok, [value | acc]}
{:error, reason} -> {:error, reason}
end
end)
end
def map_while_ok(inputs, mapper, true) do
case map_while_ok(inputs, mapper, false) do
{:ok, outputs} -> {:ok, Enum.reverse(outputs)}
{:error, reason} -> {:error, reason}
end
end
@doc """
Perform a reduction over an enumerable provided that the reduction function
returns an ok tuple.
"""
@spec reduce_while_ok(Enumerable.t(input), acc, (input, acc -> {:ok, acc} | {:error, any})) ::
{:ok, acc} | {:error, any}
when input: any, acc: any
def reduce_while_ok(inputs, default \\ [], reducer) when is_function(reducer, 2) do
Enum.reduce_while(inputs, {:ok, default}, fn input, {:ok, acc} ->
case reducer.(input, acc) do
{:ok, acc} -> {:cont, {:ok, acc}}
{:error, reason} -> {:halt, {:error, reason}}
end
end)
end
@type argument_error :: %ArgumentError{
__exception__: true,
message: binary
}
@doc """
A wrapper for defining an ArgumentError with a consistent error message format.
"""
@spec argument_error(String.Chars.t(), String.Chars.t(), any) :: argument_error
def argument_error(argument_name, reason, value) do
message =
case value do
nil ->
"`#{argument_name}` #{reason}"
value ->
"""
`#{argument_name}` #{reason}
## Value of `#{argument_name}`
```
#{inspect(value)}
```
"""
end
ArgumentError.exception(message: message)
end
end

View file

@ -15,4 +15,10 @@ defmodule Reactor.Argument.TemplatesTest do
assert %Template.Result{name: :marty} = result(:marty)
end
end
describe "value/1" do
test "it creates a value template" do
assert %Template.Value{value: :marty} = value(:marty)
end
end
end

View file

@ -63,4 +63,34 @@ defmodule Reactor.ArgumentTest do
} = Argument.from_result(:argument_name, :step_name, transform)
end
end
describe "from_value/2" do
test "when given no transformation it creates an argument" do
assert %Argument{
name: :argument_name,
source: %Template.Value{value: 32},
transform: nil
} = Argument.from_value(:argument_name, 32)
end
test "when given a function transformation it creates an argument" do
transform = &Atom.to_string/1
assert %Argument{
name: :argument_name,
source: %Template.Value{value: 32},
transform: ^transform
} = Argument.from_value(:argument_name, 32, transform)
end
test "when given an MFA transformation it creates an argument" do
transform = {Atom, :to_string, []}
assert %Argument{
name: :argument_name,
source: %Template.Value{value: 32},
transform: ^transform
} = Argument.from_value(:argument_name, 32, transform)
end
end
end

View file

@ -0,0 +1,36 @@
defmodule Reactor.Builder.ArgumentTest do
@moduledoc false
use ExUnit.Case, async: true
import Reactor.Builder.Argument
alias Reactor.{Argument, Template}
describe "asset_all_are_arguments/1" do
test "when given an argument with an input tuple, it returns an input argument" do
assert {:ok, [%Argument{name: :marty, source: %Template.Input{name: :doc}}]} =
assert_all_are_arguments(marty: {:input, :doc})
end
test "when given an argument with an result tuple, it returns an result argument" do
assert {:ok, [%Argument{name: :marty, source: %Template.Result{name: :doc}}]} =
assert_all_are_arguments(marty: {:result, :doc})
end
test "when given an argument with a value, it returns a value argument" do
assert {:ok, [%Argument{name: :marty, source: %Template.Value{value: :doc}}]} =
assert_all_are_arguments(marty: :doc)
end
test "when given an argument struct, it returns the argument struct" do
argument = Argument.from_value(:marty, :doc)
assert {:ok, [^argument]} = assert_all_are_arguments([argument])
end
test "when given any other value, it returns an error" do
assert {:error, error} = assert_all_are_arguments([:marty])
assert Exception.message(error) =~ ~r/contains a non-argument value/
end
end
end

View file

@ -0,0 +1,222 @@
defmodule Reactor.Builder.ComposeTest do
use ExUnit.Case, async: true
alias Reactor.{Argument, Builder, Builder.Compose, Error.ComposeError, Planner, Step, Template}
require Reactor.Argument
describe "compose/4" do
defmodule ShoutStep do
@moduledoc false
use Reactor.Step
def run(%{message: message}, _, _) do
{:ok, String.upcase(message)}
end
end
defmodule InnerReactor do
@moduledoc false
use Reactor
input :message
step :shout, ShoutStep do
argument :message, input(:message)
end
end
test "when the inner reactor is a module and would be recursive, it adds a compose step" do
assert {:ok, reactor} =
InnerReactor
|> Builder.new()
|> Compose.compose(:recurse, InnerReactor, message: {:input, :message})
assert recurse_step =
reactor.steps
|> Enum.find(&(&1.name == :recurse))
assert {Step.Compose, [reactor: InnerReactor]} = recurse_step.impl
assert [%Argument{name: :message, source: %Template.Input{name: :message}}] =
recurse_step.arguments
end
test "when the inner reactor is a struct and would be recursive, it adds a compose step" do
inner_reactor =
Builder.new()
|> Builder.add_input!(:message)
|> Builder.add_step!(:shout, ShoutStep, message: {:input, :message})
|> Builder.return!(:shout)
assert {:ok, outer_reactor} =
inner_reactor
|> Builder.compose(:recurse, inner_reactor, message: {:input, :message})
assert recurse_step =
outer_reactor.steps
|> Enum.find(&(&1.name == :recurse))
assert {Step.Compose, [reactor: ^inner_reactor]} = recurse_step.impl
assert [%Argument{name: :message, source: %Template.Input{name: :message}}] =
recurse_step.arguments
end
test "when the inner reactor is already planned, steps are taken from the plan" do
inner_reactor =
Builder.new()
|> Builder.add_input!(:message)
|> Builder.add_step!(:shout, ShoutStep, message: {:input, :message})
|> Builder.return!(:shout)
|> Planner.plan!()
assert [] = inner_reactor.steps
assert is_struct(inner_reactor.plan, Graph)
assert {:ok, outer_reactor} =
Builder.new()
|> Builder.add_input!(:name)
|> Compose.compose(:shout_at, inner_reactor, message: {:input, :name})
assert {:__reactor__, :compose, :shout_at, :shout} in Enum.map(
outer_reactor.steps,
& &1.name
)
end
test "when the inner reactor is not already planned, steps are taken from the reactor" do
inner_reactor =
Builder.new()
|> Builder.add_input!(:message)
|> Builder.add_step!(:shout, ShoutStep, message: {:input, :message})
|> Builder.return!(:shout)
assert {:ok, outer_reactor} =
Builder.new()
|> Builder.add_input!(:name)
|> Compose.compose(:shout_at, inner_reactor, message: {:input, :name})
assert {:__reactor__, :compose, :shout_at, :shout} in Enum.map(
outer_reactor.steps,
& &1.name
)
end
test "when the inner reactor does not have a return value, it returns an error" do
inner_reactor =
Builder.new()
|> Builder.add_input!(:message)
|> Builder.add_step!(:shout, ShoutStep, message: {:input, :message})
assert {:error, %ComposeError{} = error} =
Builder.new()
|> Builder.add_input!(:name)
|> Compose.compose(:shout_at, inner_reactor, message: {:input, :name})
assert Exception.message(error) =~ ~r/must have an explicit return value/i
end
test "when provided an invalid argument, it returns an error" do
inner_reactor =
Builder.new()
|> Builder.add_input!(:message)
|> Builder.add_step!(:shout, ShoutStep, message: {:input, :message})
assert {:error, %ArgumentError{} = error} =
Builder.new()
|> Builder.add_input!(:name)
|> Compose.compose(:shout_at, inner_reactor, [:marty])
assert Exception.message(error) =~ ~r/contains a non-argument value/i
end
test "when not all inner reactor inputs are covered by the provided arguments, it returns an error" do
inner_reactor =
Builder.new()
|> Builder.add_input!(:message)
|> Builder.add_step!(:shout, ShoutStep, message: {:input, :message})
assert {:error, %ComposeError{} = error} =
Builder.new()
|> Builder.add_input!(:name)
|> Compose.compose(:shout_at, inner_reactor, [])
assert Exception.message(error) =~ ~r/missing argument for `message` input/i
end
test "inner steps are rewritten in the generated reactor" do
{inner_reactor, outer_reactor} = multi_step_composed_reactor()
steps_by_name = outer_reactor.steps |> Map.new(&{&1.name, &1})
for step <- inner_reactor.steps do
assert outer_step = Map.get(steps_by_name, {:__reactor__, :compose, :shout, step.name})
outer_arguments_by_name = outer_step.arguments |> Map.new(&{&1.name, &1})
for argument <- step.arguments do
assert outer_argument = Map.get(outer_arguments_by_name, argument.name)
if Argument.is_from_result(argument) do
assert outer_argument.source.name ==
{:__reactor__, :compose, :shout, argument.source.name}
end
end
end
end
test "a return step is generated" do
{inner_reactor, outer_reactor} = multi_step_composed_reactor()
steps_by_name = outer_reactor.steps |> Map.new(&{&1.name, &1})
assert return_step = Map.get(steps_by_name, :shout)
assert [return_value] = return_step.arguments
assert Argument.is_from_result(return_value)
assert return_value.name == :value
assert return_value.source.name == {:__reactor__, :compose, :shout, inner_reactor.return}
end
test "the ID of the inner reactor is stored in the outer reactor context" do
{inner_reactor, outer_reactor} = multi_step_composed_reactor()
assert inner_reactor.id in Enum.to_list(outer_reactor.context.private.composed_reactors)
end
end
defmodule GreeterStep do
@moduledoc false
use Reactor.Step
def run(%{first_name: first_name, last_name: last_name}, _, _) do
{:ok, "Hello #{first_name} #{last_name}"}
end
end
defp multi_step_composed_reactor do
shouty_reactor =
Builder.new()
|> Builder.add_input!(:first_name, &String.upcase/1)
|> Builder.add_input!(:last_name, &String.upcase/1)
|> Builder.add_step!(:greet, GreeterStep,
first_name: {:input, :first_name},
last_name: {:input, :last_name}
)
|> Builder.return!(:greet)
composed_reactor =
Builder.new()
|> Builder.add_input!(:user)
|> Builder.add_step!(:first_name, {Step.AnonFn, fun: &Map.fetch(&1.user, :first_name)},
user: {:input, :user}
)
|> Builder.add_step!(:last_name, {Step.AnonFn, fun: &Map.fetch(&1.user, :last_name)},
user: {:input, :user}
)
|> Builder.compose!(:shout, shouty_reactor,
first_name: {:result, :first_name},
last_name: {:result, :last_name}
)
|> Builder.return!(:shout)
{shouty_reactor, composed_reactor}
end
end

View file

@ -0,0 +1,48 @@
defmodule Reactor.Builder.InputTest do
@moduledoc false
use ExUnit.Case, async: true
alias Reactor.{Builder, Builder.Input, Step}
describe "add_input/3" do
test "when the input has no transform, it is added to the reactor" do
assert {:ok, reactor} =
Builder.new()
|> Input.add_input(:marty, nil)
assert :marty in reactor.inputs
assert [] = reactor.steps
end
test "when the input has a transform function the input and a transform step are added to the reactor" do
assert {:ok, reactor} =
Builder.new()
|> Input.add_input(:marty, &Function.identity/1)
assert :marty in reactor.inputs
assert [step] = reactor.steps
assert step.name == {:__reactor__, :transform, :input, :marty}
assert step.impl == {Step.Transform, fun: &Function.identity/1}
end
test "when the input has a transform impl the input and a transform step are added to the reactor" do
assert {:ok, reactor} =
Builder.new()
|> Input.add_input(:marty, {Step.Transform, fun: &Function.identity/1})
assert :marty in reactor.inputs
assert [step] = reactor.steps
assert step.name == {:__reactor__, :transform, :input, :marty}
assert step.impl == {Step.Transform, fun: &Function.identity/1}
end
test "when the transform is not valid, it returns an error" do
assert {:error, %ArgumentError{} = error} =
Builder.new()
|> Input.add_input(:marty, :doc)
assert Exception.message(error) =~ ~r/invalid transform function/i
end
end
end

View file

@ -0,0 +1,234 @@
defmodule Reactor.Builder.StepTest do
@moduledoc false
use ExUnit.Case, async: true
require Reactor.Argument
alias Reactor.{Argument, Builder, Step}
defmodule GreeterStep do
@moduledoc false
use Reactor.Step
def run(%{first_name: first_name, last_name: last_name}, _, _) do
{:ok, "Hello #{first_name} #{last_name}"}
end
end
describe "add_step/5" do
test "when given an invalid argument it returns an error" do
reactor = Builder.new()
assert {:error, %ArgumentError{} = error} =
Builder.Step.add_step(reactor, :greet, GreeterStep, [:marty], [])
assert Exception.message(error) =~ ~r/non-argument/i
end
test "when the impl is not a `Reactor.Step` it returns an error" do
reactor = Builder.new()
assert {:error, %ArgumentError{} = error} =
Builder.Step.add_step(reactor, :greet, Kernel, [], [])
assert Exception.message(error) =~ ~r/does not implement the `Reactor.Step` behaviour/i
end
test "when the step depends on a transformed input, its arguments are rewritten" do
reactor =
Builder.new()
|> Builder.add_input!(:first_name, &String.upcase/1)
assert {:ok, reactor} =
Builder.Step.add_step(
reactor,
:greet,
GreeterStep,
[first_name: {:input, :first_name}],
[]
)
steps_by_name = Map.new(reactor.steps, &{&1.name, &1})
assert %Step{arguments: [argument]} = steps_by_name[:greet]
assert argument.name == :first_name
assert Argument.is_from_result(argument)
assert argument.source.name == {:__reactor__, :transform, :input, :first_name}
end
test "when the step has argument transforms, it adds transformation steps" do
transform = &Function.identity/1
reactor =
Builder.new()
|> Builder.add_input!(:first_name)
assert {:ok, reactor} =
Builder.Step.add_step(
reactor,
:greet,
GreeterStep,
[Argument.from_input(:first_name, :first_name, transform)],
[]
)
steps_by_name = Map.new(reactor.steps, &{&1.name, &1})
assert %Step{arguments: [argument]} = steps_by_name[:greet]
assert argument.name == :first_name
assert Argument.is_from_result(argument)
assert argument.source.name == {:__reactor__, :transform, :first_name, :greet}
assert %Step{arguments: [argument], impl: {Step.Transform, fun: ^transform}} =
Map.get(steps_by_name, {:__reactor__, :transform, :first_name, :greet})
assert Argument.is_from_input(argument)
assert argument.source.name == :first_name
end
test "when the step has a transform, it adds a transform all step" do
transform = &Function.identity/1
reactor =
Builder.new()
|> Builder.add_input!(:first_name)
assert {:ok, reactor} =
Builder.Step.add_step(
reactor,
:greet,
GreeterStep,
[first_name: {:input, :first_name}],
transform: transform
)
steps_by_name = Map.new(reactor.steps, &{&1.name, &1})
assert %Step{arguments: [argument], context: context} = steps_by_name[:greet]
assert :value = context.private.replace_arguments
assert argument.name == :value
assert Argument.is_from_result(argument)
assert argument.source.name == {:__reactor__, :transform, :greet}
assert %Step{
arguments: [argument],
impl: {Step.TransformAll, fun: ^transform}
} = Map.get(steps_by_name, {:__reactor__, :transform, :greet})
assert argument.name == :first_name
assert Argument.is_from_input(argument)
assert argument.source.name == :first_name
end
test "it defaults to an async step" do
assert {:ok, reactor} =
Builder.new()
|> Builder.Step.add_step(
:greet,
GreeterStep,
[],
[]
)
steps_by_name = Map.new(reactor.steps, &{&1.name, &1})
assert %Step{async?: true} = steps_by_name[:greet]
end
test "synchronous steps can be asked for" do
assert {:ok, reactor} =
Builder.new()
|> Builder.Step.add_step(
:greet,
GreeterStep,
[],
async?: false
)
steps_by_name = Map.new(reactor.steps, &{&1.name, &1})
assert %Step{async?: false} = steps_by_name[:greet]
end
test "additional context can be provided" do
assert {:ok, reactor} =
Builder.new()
|> Builder.Step.add_step(
:greet,
GreeterStep,
[],
context: %{awesome?: true}
)
steps_by_name = Map.new(reactor.steps, &{&1.name, &1})
assert %Step{context: %{awesome?: true}} = steps_by_name[:greet]
end
test "max retries defaults to 100" do
assert {:ok, reactor} =
Builder.new()
|> Builder.Step.add_step(
:greet,
GreeterStep,
[],
[]
)
steps_by_name = Map.new(reactor.steps, &{&1.name, &1})
assert %Step{max_retries: 100} = steps_by_name[:greet]
end
test "max retries can be provided" do
assert {:ok, reactor} =
Builder.new()
|> Builder.Step.add_step(
:greet,
GreeterStep,
[],
max_retries: 99
)
steps_by_name = Map.new(reactor.steps, &{&1.name, &1})
assert %Step{max_retries: 99} = steps_by_name[:greet]
end
end
describe "new_step/4" do
test "it builds a step" do
assert {:ok, %Step{}} = Builder.Step.new_step(:marty, GreeterStep, [], [])
end
test "when given an invalid argument it returns an error" do
assert {:error, error} = Builder.Step.new_step(:marty, GreeterStep, [:doc], [])
assert Exception.message(error) =~ "non-argument"
end
test "when the impl is not a `Reactor.Step` it returns an error" do
assert {:error, %ArgumentError{} = error} = Builder.Step.new_step(:greet, Kernel, [], [])
assert Exception.message(error) =~ ~r/does not implement the `Reactor.Step` behaviour/i
end
test "when the step relies on transformed arguments, it returns an error" do
assert {:error, %ArgumentError{} = error} =
Builder.Step.new_step(
:greet,
GreeterStep,
[
Argument.from_input(:first_name, :name, &String.upcase/1)
],
[]
)
assert Exception.message(error) =~ ~r/has a transform attached/i
end
test "when the step wants a transform option, it returns an error" do
assert {:error, %ArgumentError{} = error} =
Builder.Step.new_step(
:greet,
GreeterStep,
[first_name: {:input, :first_name}],
transform: &Function.identity/1
)
assert Exception.message(error) =~ ~r/adding transforms to dynamic steps is not supported/i
end
end
end

View file

@ -16,26 +16,21 @@ defmodule Reactor.BuilderTest do
assert Exception.message(error) =~ "not a Reactor"
end
test "when the input doesn't have a transformer, it adds the input step directly" do
test "when the input doesn't have a transformer, it adds the input" do
{:ok, reactor} = add_input(new(), :marty)
assert :marty in reactor.inputs
[step] = reactor.steps
assert step.name == {:input, :marty}
assert step.impl == {Step.Input, name: :marty}
end
test "when the input has a transformer it adds a transform step and an input step" do
test "when the input has a transformer it adds a transform step and the input" do
{:ok, reactor} = add_input(new(), :marty, &String.upcase/1)
assert :marty in reactor.inputs
[input_step, transform_step] = reactor.steps
assert input_step.name == {:raw_input, :marty}
assert input_step.impl == {Step.Input, name: :marty}
[transform_step] = reactor.steps
assert transform_step.name == {:input, :marty}
assert transform_step.name == {:__reactor__, :transform, :input, :marty}
assert transform_step.impl == {Step.Transform, fun: &String.upcase/1}
assert [argument] = transform_step.arguments
assert argument.name == :value
assert argument.source == %Template.Result{name: {:raw_input, :marty}}
assert argument.source == %Template.Input{name: :marty}
end
end
@ -61,27 +56,29 @@ defmodule Reactor.BuilderTest do
test "when the arguments option is not a list, it returns an error" do
reactor = new()
assert {:error, error} = add_step(reactor, :marty, Noop, :wat)
assert Exception.message(error) =~ "is not a list"
assert Exception.message(error) =~ "not a list"
end
test "when the options option is not a list, it returns an error" do
reactor = new()
assert {:error, error} = add_step(reactor, :marty, Noop, [], :wat)
assert Exception.message(error) =~ "is not a list"
assert Exception.message(error) =~ "not a list"
end
test "when an argument is an input tuple, it is converted to a argument struct in the step" do
test "when an argument is an input tuple, it is converted into an argument struct" do
reactor = new()
assert {:ok, %{steps: [step]}} = add_step(reactor, :marty, Noop, mentor: {:input, :doc})
# this is a `result` not an `input` argument because it the input is
# emitted as a separate step.
assert [%Argument{name: :mentor, source: %Template.Result{name: {:input, :doc}}}] =
step.arguments
assert [
%Argument{
name: :mentor,
source: %Template.Input{name: :doc}
}
] = step.arguments
end
test "when an argument is a result tuple, it is converted to a argument struct in the step" do
test "when an argument is a result tuple, it is converted to a argument struct" do
reactor = new()
assert {:ok, %{steps: [step]}} =
@ -106,7 +103,7 @@ defmodule Reactor.BuilderTest do
test "when an argument is anything else, it is an error" do
reactor = new()
assert {:error, error} = add_step(reactor, :marty, Noop, [:wat])
assert Exception.message(error) =~ "is not a `Reactor.Argument` struct"
assert Exception.message(error) =~ "contains a non-argument value"
end
test "when an argument has a transformation function, it adds a transformation step to the reactor" do
@ -131,7 +128,7 @@ defmodule Reactor.BuilderTest do
} = steps[{:__reactor__, :transform, :mentor, :marty}]
end
test "when the step has an argument transformation function, it adds the collect and transformation step to the reactor" do
test "when the step has an argument transformation function, it adds a transformation step to the reactor" do
reactor = new()
assert {:ok, reactor} =
@ -156,13 +153,72 @@ defmodule Reactor.BuilderTest do
]
} = steps[:add_user_to_org]
assert %Step{
arguments: [
%Argument{name: :user, source: %Template.Result{name: :create_user}},
%Argument{name: :org, source: %Template.Result{name: :create_org}}
],
impl: {Step.TransformAll, [fun: _]}
} = steps[{:__reactor__, :transform, :add_user_to_org}]
assert %Step{arguments: arguments, impl: {Step.TransformAll, [fun: _]}} =
steps[{:__reactor__, :transform, :add_user_to_org}]
arguments = Enum.map(arguments, &{&1.name, &1.source.name})
assert {:user, :create_user} in arguments
assert {:org, :create_org} in arguments
end
end
describe "compose/2" do
defmodule GreeterStep do
@moduledoc false
use Reactor.Step
def run(%{first_name: first_name, last_name: last_name}, _, _) do
{:ok, "Hello #{first_name} #{last_name}"}
end
end
test "when the reactor argument is not a reactor struct" do
assert_raise ArgumentError, ~r/reactor.*not a Reactor/, fn ->
compose!(:marty, :doc, new(), [])
end
end
test "when the other reactor argument is not a reactor struct" do
assert_raise ArgumentError, ~r/inner_reactor.*not a Reactor/, fn ->
compose!(new(), :doc, 123, [])
end
end
test "when the other arguments argument is not a list" do
assert_raise ArgumentError, ~r/arguments.*not a list/, fn ->
compose!(new(), :doc, new(), :marty)
end
end
test "it can compose two reactors together" do
shouty_reactor =
new()
|> add_input!(:first_name, &String.upcase/1)
|> add_input!(:last_name, &String.upcase/1)
|> add_step!(:greet, GreeterStep,
first_name: {:input, :first_name},
last_name: {:input, :last_name}
)
|> return!(:greet)
composite_reactor =
new()
|> add_input!(:user)
|> add_step!(:first_name, {Step.AnonFn, fun: &Map.fetch(&1.user, :first_name)},
user: {:input, :user}
)
|> add_step!(:last_name, {Step.AnonFn, fun: &Map.fetch(&1.user, :last_name)},
user: {:input, :user}
)
|> compose!(:shout, shouty_reactor,
first_name: {:result, :first_name},
last_name: {:result, :last_name}
)
|> return!(:shout)
assert {:ok, "Hello MARTY MCFLY"} =
Reactor.run(composite_reactor, %{user: %{first_name: "Marty", last_name: "McFly"}})
end
end
end

View file

@ -0,0 +1,35 @@
defmodule Reactor.Dsl.PlanableVerifierTest do
@moduledoc false
use ExUnit.Case, async: true
alias Reactor.{Dsl.PlanableVerifier, Error.PlanError}
test "is a Spark verifier" do
assert Spark.implements_behaviour?(PlanableVerifier, Spark.Dsl.Verifier)
end
defmodule NoopStep do
@moduledoc false
use Reactor.Step
def run(_, _, _), do: {:ok, :noop}
end
describe "verify/1" do
test "refuses to compile cyclic reactors" do
assert_raise PlanError, ~r/cyclic/i, fn ->
defmodule DegenerateReactor do
@moduledoc false
use Reactor
step :a, NoopStep do
argument :b, result(:b)
end
step :b, NoopStep do
argument :a, result(:a)
end
end
end
end
end
end

View file

@ -0,0 +1,47 @@
defmodule Reactor.Dsl.TransformerTest do
@moduledoc false
use ExUnit.Case, async: true
alias Spark.{Dsl.Extension, Error.DslError}
defmodule Noop do
@moduledoc false
use Reactor.Step
def run(_, _, _), do: {:ok, :noop}
end
defmodule NoReturnReactor do
@moduledoc false
use Reactor
step :a, Noop
end
describe "transform/1" do
test "when the Reactor has no explicit return, it uses the last step" do
assert :a = Extension.get_opt(NoReturnReactor, [:reactor], :return)
end
test "when the Reactor has a return that is unknown, it raises a DSL error" do
assert_raise DslError, ~r/return value/i, fn ->
defmodule InvalidReturnReactor do
@moduledoc false
use Reactor
step :a, Noop
return :b
end
end
end
test "when the Reactor has no steps, it raises a DSL error" do
assert_raise DslError, ~r/no steps/i, fn ->
defmodule EmptyReactor do
@moduledoc false
use Reactor
end
end
end
end
end

View file

@ -0,0 +1,26 @@
defmodule Reactor.InfoTest do
@moduledoc false
use ExUnit.Case, async: true
alias Reactor.{Builder, Info}
describe "to_struct/1" do
test "when passed a DSL module, it generates a Reactor struct" do
assert {:ok, %Reactor{} = reactor} = Info.to_struct(Example.BasicReactor)
assert reactor.id == Example.BasicReactor
assert reactor.steps |> hd() |> Map.get(:name) == :verify
end
test "when passed a Reactor struct, it returns it unchanged" do
reactor =
Builder.new()
|> Builder.add_input!(:age)
|> Builder.add_input!(:country)
|> Builder.add_step!(:verify, Example.BasicReactor.DrinkingAgeVerifier,
age: {:input, :age},
country: {:input, :country}
)
assert {:ok, ^reactor} = Info.to_struct(reactor)
end
end
end

View file

@ -0,0 +1,95 @@
defmodule Reactor.PlannerTest do
@moduledoc false
use ExUnit.Case, async: true
alias Reactor.{Builder, Error.PlanError, Info, Planner}
describe "plan/1" do
test "when the argument is not a reactor, it returns an error" do
{:error, %ArgumentError{} = error} = Planner.plan(:marty)
assert Exception.message(error) =~ ~r/not a reactor/i
end
test "when the reactor has no existing plan, it creates one" do
{:ok, reactor} = Info.to_struct(Example.BasicReactor)
refute reactor.plan
{:ok, reactor} = Planner.plan(reactor)
assert reactor.plan
end
test "when the reactor already has a plan, it amends it" do
reactor =
Example.BasicReactor
|> Info.to_struct!()
|> Planner.plan!()
|> Builder.add_step!(:second_step, Example.BasicReactor.DrinkingAgeVerifier)
{:ok, reactor} = Planner.plan(reactor)
assert [] = reactor.steps
planned_step_names =
reactor.plan
|> Graph.vertices()
|> MapSet.new(& &1.name)
expected_step_names = MapSet.new([:verify, :second_step])
assert MapSet.equal?(expected_step_names, planned_step_names)
end
test "it converts steps and arguments into a DAG" do
{:ok, reactor} =
Builder.new()
|> Builder.add_step!(:a, Example.BasicReactor.DrinkingAgeVerifier)
|> Builder.add_step!(:b, Example.BasicReactor.DrinkingAgeVerifier, a: {:result, :a})
|> Planner.plan()
assert [] = reactor.steps
created_graph_vertices =
reactor.plan
|> Graph.vertices()
|> MapSet.new(& &1.name)
expected_graph_vertices = MapSet.new([:a, :b])
assert MapSet.equal?(created_graph_vertices, expected_graph_vertices)
created_graph_edges =
reactor.plan
|> Graph.edges()
|> Enum.map(& &1.label)
expected_graph_edges = [{:argument, :a, :for, :b}]
assert created_graph_edges == expected_graph_edges
end
test "when the created graph would be cyclic, it returns an error" do
assert {:error, %PlanError{} = error} =
Builder.new()
|> Builder.add_step!(:a, Example.BasicReactor.DrinkingAgeVerifier, b: {:result, :b})
|> Builder.add_step!(:b, Example.BasicReactor.DrinkingAgeVerifier, a: {:result, :a})
|> Planner.plan()
assert Exception.message(error) =~ ~r/cyclic/i
end
test "when given an invalid step, it returns an error" do
assert {:error, %PlanError{} = error} =
Builder.new()
|> Map.put(:steps, [%{name: :marty}])
|> Planner.plan()
assert Exception.message(error) =~ ~r/not a `Reactor.Step` struct/
end
test "when an argument depends on an unknown step, it returns an error" do
assert {:error, %PlanError{} = error} =
Builder.new()
|> Builder.add_step!(:a, Example.BasicReactor.DrinkingAgeVerifier, a: {:result, :b})
|> Planner.plan()
assert Exception.message(error) =~ ~r/cannot be found/i
end
end
end

View file

@ -8,6 +8,14 @@ defmodule Reactor.Step.AnonFnTest do
end
describe "run/3" do
test "it can handle 1 arity anonymous functions" do
fun = fn arguments ->
arguments.first_name
end
assert :marty = run(%{first_name: :marty}, %{}, fun: fun)
end
test "it can handle 2 arity anonymous functions" do
fun = fn arguments, _ ->
arguments.first_name

View file

@ -0,0 +1,72 @@
defmodule Reactor.Step.ComposeTest do
@moduledoc false
use ExUnit.Case, async: true
alias Reactor.{Argument, Step}
import Reactor.Builder
require Reactor.Argument
test "it is a step" do
assert Spark.implements_behaviour?(Step.Compose, Step)
end
defmodule GreeterStep do
@moduledoc false
use Reactor.Step
def run(%{whom: whom}, _, _) do
{:ok, "Hello #{whom}"}
end
end
describe "run/3" do
test "when the composition would be recursive, it just runs the reactor directly" do
inner_reactor =
new()
|> add_input!(:whom)
|> add_step!(:greet, GreeterStep, whom: {:input, :whom})
|> return!(:greet)
assert {:ok, "Hello Marty McFly"} =
Step.Compose.run(
%{whom: "Marty McFly"},
%{
current_step: %{name: :greet_marty},
private: %{composed_reactors: MapSet.new([inner_reactor.id])}
},
reactor: inner_reactor
)
end
test "when the composition is not recursive, it emits rewritten steps" do
inner_reactor =
new()
|> add_input!(:whom)
|> add_step!(:greet, GreeterStep, whom: {:input, :whom})
|> return!(:greet)
assert {:ok, nil, new_steps} =
Step.Compose.run(%{whom: "Marty McFly"}, %{current_step: %{name: :greet_marty}},
reactor: inner_reactor
)
new_steps_by_name = Map.new(new_steps, &{&1.name, &1})
assert %Step{arguments: [argument], impl: impl} =
Map.get(new_steps_by_name, {Step.Compose, :greet_marty, :greet})
assert Argument.is_from_value(argument)
assert argument.name == :whom
assert argument.source.value == "Marty McFly"
assert {Step.ComposeWrapper, [original: GreeterStep, prefix: [Step.Compose, :greet_marty]]} =
impl
assert %Step{arguments: [argument], impl: {Step.AnonFn, _}} =
Map.get(new_steps_by_name, :greet_marty)
assert Argument.is_from_result(argument)
assert argument.name == :value
assert argument.source.name == {Step.Compose, :greet_marty, :greet}
end
end
end

View file

@ -0,0 +1,106 @@
defmodule Reactor.Step.ComposeWrapperTest do
@moduledoc false
use ExUnit.Case, async: true
alias Reactor.{Builder, Step}
test "it is a step" do
assert Spark.implements_behaviour?(Step.ComposeWrapper, Step)
end
describe "run/3" do
test "when the `original` option is missing, it returns an error" do
assert {:error, error} =
Step.ComposeWrapper.run(%{}, %{current_step: %{name: :foo}}, prefix: [:a, :b])
assert Exception.message(error) =~ ~r/missing/i
end
test "when the `original` option is a non-step module, it returns an error" do
assert {:error, error} =
Step.ComposeWrapper.run(%{}, %{current_step: %{name: :foo}},
prefix: [:a, :b],
original: Kernel
)
assert Exception.message(error) =~ ~r/does not implement the `Reactor.Step` behaviour/i
end
test "when the `original` option refers to a non-step module, it returns an error" do
assert {:error, error} =
Step.ComposeWrapper.run(%{}, %{current_step: %{name: :foo}},
prefix: [:a, :b],
original: {Kernel, []}
)
assert Exception.message(error) =~ ~r/does not implement the `Reactor.Step` behaviour/i
end
test "when the `prefix` option is an empty list, it returns an error" do
assert {:error, error} =
Step.ComposeWrapper.run(%{}, %{current_step: %{name: :foo}},
prefix: [],
original: {Step.AnonFn, fun: fn args -> {:ok, args.a + 1} end}
)
assert Exception.message(error) =~ ~r/invalid `prefix` option/i
end
test "when the `prefix` option is not a list, it returns an error" do
assert {:error, error} =
Step.ComposeWrapper.run(%{}, %{current_step: %{name: :foo}},
prefix: :marty,
original: {Step.AnonFn, fun: fn args -> {:ok, args.a + 1} end}
)
assert Exception.message(error) =~ ~r/invalid `prefix` option/i
end
test "when the original step returns an ok tuple, it returns it" do
assert {:ok, 2} =
Step.ComposeWrapper.run(%{a: 1}, %{current_step: %{name: :foo}},
prefix: [:a, :b],
original: {Step.AnonFn, fun: fn args -> {:ok, args.a + 1} end}
)
end
test "when the original step returns an error tuple, it returns it" do
assert {:error, :wat} =
Step.ComposeWrapper.run(%{}, %{current_step: %{name: :foo}},
prefix: [:a, :b],
original: {Step.AnonFn, fun: fn _ -> {:error, :wat} end}
)
end
test "when the original step returns a halt tuple, it returns it" do
assert {:halt, :wat} =
Step.ComposeWrapper.run(%{}, %{current_step: %{name: :foo}},
prefix: [:a, :b],
original: {Step.AnonFn, fun: fn _ -> {:halt, :wat} end}
)
end
test "when the original step returns new dynamic steps, it rewrites them" do
[new_c, new_d] = [
Builder.new_step!(:c, {Step.AnonFn, fun: fn args -> {:ok, args.b} end}, b: {:input, :b}),
Builder.new_step!(:d, {Step.AnonFn, fun: fn args -> {:ok, args.c + 1} end},
c: {:result, :c}
)
]
assert {:ok, _, [rewritten_c, rewritten_d]} =
Step.ComposeWrapper.run(%{}, %{current_step: %{name: :foo}},
prefix: [:a, :b],
original: {Step.AnonFn, fun: fn _ -> {:ok, nil, [new_c, new_d]} end}
)
assert rewritten_c.name == {:a, :b, new_c.name}
assert [{rewritten_arg, new_arg}] = Enum.zip(rewritten_c.arguments, new_c.arguments)
assert rewritten_arg.source.name == new_arg.source.name
assert rewritten_d.name == {:a, :b, new_d.name}
assert [{rewritten_arg, new_arg}] = Enum.zip(rewritten_d.arguments, new_d.arguments)
assert rewritten_arg.source.name == {:a, :b, new_arg.source.name}
end
end
end

View file

@ -1,25 +0,0 @@
defmodule Reactor.Step.InputTest do
@moduledoc false
use ExUnit.Case, async: true
import Reactor.Step.Input
test "it is a step" do
assert Spark.implements_behaviour?(Reactor.Step.Input, Reactor.Step)
end
describe "run/3" do
test "when the input is present in the private context it returns it" do
assert {:ok, :marty} = run(%{}, %{private: %{inputs: %{name: :marty}}}, name: :name)
end
test "when the input is not present in the private context it returns an error" do
assert {:error, error} = run(%{}, %{}, name: :name)
assert Exception.message(error) =~ "missing an input"
end
test "when the name option is not present it returns an error" do
assert {:error, error} = run(%{}, %{}, [])
assert Exception.message(error) =~ "Missing `:name` option"
end
end
end

View file

@ -0,0 +1,26 @@
defmodule Reactor.Step.TransformAllTest do
@moduledoc false
use ExUnit.Case, async: true
alias Reactor.Step
test "it is a step" do
assert Spark.implements_behaviour?(Step.TransformAll, Step)
end
describe "run/3" do
test "it applies the function to the `value` argument" do
assert {:ok, %{a: 2}} =
Step.TransformAll.run(%{a: 1}, %{}, fun: &Map.update(&1, :a, 1, fn v -> v * 2 end))
end
test "when the function returns a non-map value it returns an error" do
assert {:error, error} = Step.TransformAll.run(%{a: 1}, %{}, fun: fn _ -> :wat end)
assert Exception.message(error) =~ ~r/must return a map/i
end
test "when the function raises, it returns an error" do
assert {:error, error} = Step.TransformAll.run(%{a: 1}, %{}, fun: fn _ -> raise "hell" end)
assert Exception.message(error) == "hell"
end
end
end

View file

@ -4,7 +4,7 @@ defmodule Reactor.Step.TransformTest do
import Reactor.Step.Transform
test "it is a step" do
assert Spark.implements_behaviour?(Reactor.Step.Input, Reactor.Step)
assert Spark.implements_behaviour?(Reactor.Step.Transform, Reactor.Step)
end
describe "run/3" do
@ -13,11 +13,6 @@ defmodule Reactor.Step.TransformTest do
assert Exception.message(error) =~ "argument is missing"
end
test "when the function option is missing" do
assert {:error, error} = run(%{value: :marty}, %{}, [])
assert Exception.message(error) =~ "Invalid options"
end
test "it applies the transform" do
assert {:ok, "marty"} = run(%{value: :marty}, %{}, fun: &Atom.to_string/1)
end

View file

@ -0,0 +1,47 @@
defmodule Reactor.StepTest do
@moduledoc false
use ExUnit.Case, async: true
alias Reactor.{Builder, Step}
describe "can/2" do
test "when the module defines `undo/4`, it can undo" do
assert Step.can?(Example.Step.Undoable, :undo)
end
test "when the module does not define `undo/4`, it cannot undo" do
refute Step.can?(Example.Step.Greeter, :undo)
end
test "when the module defines `compensate/4`, it can compensate" do
assert Step.can?(Example.Step.Compensable, :compensate)
end
test "when the module does not defined `compensate/4`, it cannot compensate" do
refute Step.can?(Example.Step.Greeter, :compensate)
end
end
describe "run/3" do
test "it runs the step" do
step = Builder.new_step!(:greet, Example.Step.Greeter, whom: {:input, :whom})
assert {:ok, "Hello, Marty!"} = Step.run(step, %{whom: "Marty"}, %{})
end
end
describe "compensate/4" do
test "it runs the step's compensation callback" do
step = Builder.new_step!(:compensate, Example.Step.Compensable)
assert :ok = Step.compensate(step, "No plutonium", %{}, %{})
end
end
describe "undo/4" do
test "it runs the step's undo callback" do
step = Builder.new_step!(:undo, Example.Step.Undoable)
assert :ok = Step.undo(step, :marty, %{}, %{})
end
end
end

View file

@ -0,0 +1,93 @@
defmodule Reactor.UtilsTest do
@moduledoc false
use ExUnit.Case, async: true
import Reactor.Utils
describe "deep_merge/2" do
test "it can deeply merge two maps" do
lhs = %{a: %{b: %{c: %{d: :e}}}}
rhs = %{a: %{b: %{c: %{e: :f}}, g: :h}}
assert deep_merge(lhs, rhs) == %{a: %{b: %{c: %{d: :e, e: :f}}, g: :h}}
end
end
describe "maybe_append/2" do
test "it appends non-nil values to the collection" do
assert [1, 2, 3] = maybe_append([1, 2], 3)
end
test "it does not append nil values to the collection" do
assert [1, 2] = maybe_append([1, 2], nil)
end
end
describe "maybe_append_result/2" do
test "when the function returns a non-nil value, it is appended to the collection" do
assert [1, 2, 3] = maybe_append_result([1, 2], fn -> 3 end)
end
test "when the function returns a nil value, it is not appended to the collection" do
assert [1, 2] = maybe_append_result([1, 2], fn -> nil end)
end
end
describe "sentence/4" do
test "it converts a list of values into a sentence" do
assert "a, b or c" = sentence(~w[a b c]a, &to_string/1, ", ", " or ")
end
end
describe "map_while_ok/3" do
test "when all the map functions return an ok tuple, it maps the collection" do
assert {:ok, [2, 4, 6]} = map_while_ok([1, 2, 3], &{:ok, &1 * 2}, true)
end
test "when one of the map functions returns an error tuple, it returns the error" do
assert {:error, :fail} =
map_while_ok(
[1, 2, 3],
fn
i when rem(i, 2) == 0 -> {:error, :fail}
i -> {:ok, i * 2}
end,
true
)
end
test "it doesn't preserve order by default" do
assert {:ok, [6, 4, 2]} = map_while_ok([1, 2, 3], &{:ok, &1 * 2})
end
end
describe "reduce_while_ok/3" do
test "when all the reduce functions return an ok tuple, it reduces into an ok tuple" do
assert {:ok, 12} = reduce_while_ok([1, 2, 3], 0, &{:ok, &2 + &1 * 2})
end
test "when one of the reduce functions returns an error tuple, it returns the error" do
assert {:error, :fail} =
reduce_while_ok([1, 2, 3], 0, fn
i, _acc when rem(i, 2) == 0 -> {:error, :fail}
i, acc -> {:ok, acc + i * 2}
end)
end
end
describe "argument_error/3" do
test "it consistently formats the argument error message" do
message = """
`fruit` is not fruit
## Value of `fruit`
```
:pepperoni
```
"""
assert %ArgumentError{message: ^message} =
argument_error(:fruit, "is not fruit", :pepperoni)
end
end
end

View file

@ -1,19 +0,0 @@
defmodule Example.CyclicReactor do
@moduledoc false
use Reactor
defmodule Noop do
use Reactor.Step
@moduledoc false
def run(_, _, _), do: {:ok, :noop}
end
step :a, Noop do
argument :b, result(:b)
end
step :b, Noop do
argument :a, result(:a)
end
end