diff --git a/.formatter.exs b/.formatter.exs index b61cc1e..f47b589 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -4,6 +4,8 @@ spark_locals_without_parens = [ argument: 2, argument: 3, async?: 1, + compose: 2, + compose: 3, input: 1, input: 2, max_retries: 1, diff --git a/.vscode/settings.json b/.vscode/settings.json index 9bbc6d6..c37723d 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -2,6 +2,7 @@ "cSpell.words": [ "backoff", "casted", - "mappish" + "mappish", + "Planable" ] } diff --git a/lib/reactor.ex b/lib/reactor.ex index f4dc3a8..c38b292 100644 --- a/lib/reactor.ex +++ b/lib/reactor.ex @@ -51,13 +51,14 @@ defmodule Reactor do """ defstruct context: %{}, + id: nil, inputs: [], intermediate_results: %{}, plan: nil, - undo: [], return: nil, state: :pending, - steps: [] + steps: [], + undo: [] use Spark.Dsl, default_extensions: [extensions: Dsl] @@ -75,6 +76,7 @@ defmodule Reactor do @type t :: %Reactor{ context: context, + id: any, inputs: [atom], intermediate_results: %{any => any}, plan: nil | Graph.t(), diff --git a/lib/reactor/argument.ex b/lib/reactor/argument.ex index 302cd1e..fdbd8b0 100644 --- a/lib/reactor/argument.ex +++ b/lib/reactor/argument.ex @@ -9,7 +9,7 @@ defmodule Reactor.Argument do @type t :: %Argument{ name: atom, - source: Template.t(), + source: Template.Input.t() | Template.Result.t() | Template.Value.t(), transform: nil | (any -> any) | {module, keyword} | mfa } @@ -45,7 +45,7 @@ defmodule Reactor.Argument do ## Example - iex> Argument.from_input(:argument_name, :step_name, &Atom.to_string/1) + iex> Argument.from_result(:argument_name, :step_name, &Atom.to_string/1) """ @spec from_result(atom, any, nil | (any -> any)) :: Argument.t() @@ -53,6 +53,17 @@ defmodule Reactor.Argument do when is_atom(name) and maybe_transform(transform), do: %Argument{name: name, source: %Template.Result{name: result_name}, transform: transform} + @doc """ + Build an argument which refers to a statically defined value. + + ## Example + + iex> Argument.from_value(:argument_name, 10) + """ + @spec from_value(atom, any, nil | (any -> any)) :: Argument.t() + def from_value(name, value, transform \\ nil) when is_atom(name) and maybe_transform(transform), + do: %Argument{name: name, source: %Template.Value{value: value}, transform: transform} + @doc """ Validate that the argument is an Argument struct. """ @@ -68,6 +79,11 @@ defmodule Reactor.Argument do """ defguard is_from_result(argument) when is_struct(argument.source, Template.Result) + @doc """ + Validate that the argument contains a static value. + """ + defguard is_from_value(argument) when is_struct(argument.source, Template.Value) + @doc """ Validate that the argument has a transform. """ diff --git a/lib/reactor/argument/templates.ex b/lib/reactor/argument/templates.ex index 4f83b6b..38b4593 100644 --- a/lib/reactor/argument/templates.ex +++ b/lib/reactor/argument/templates.ex @@ -62,4 +62,32 @@ defmodule Reactor.Argument.Templates do def result(link_name) do %Template.Result{name: link_name} end + + @doc ~S""" + The `value` template helper for the Reactor DSL. + + ## Example + + ```elixir + defmodule ExampleReactor do + use Reactor + + input :number + + step :times_three do + argument :lhs, input(:number) + # here: -------↓↓↓↓↓ + argument :rhs, value(3) + + impl fn args, _, _ -> + {:ok, args.lhs * args.rhs} + end + end + end + ``` + """ + @spec value(any) :: Template.Value.t() + def value(value) do + %Template.Value{value: value} + end end diff --git a/lib/reactor/builder.ex b/lib/reactor/builder.ex index dcb9a52..98e2cb5 100644 --- a/lib/reactor/builder.ex +++ b/lib/reactor/builder.ex @@ -18,15 +18,10 @@ defmodule Reactor.Builder do ``` """ - alias Reactor.{Argument, Step, Template} - import Argument, only: :macros + alias Reactor.{Argument, Builder, Step} import Reactor, only: :macros import Reactor.Utils - defguardp is_mfa(mfa) - when tuple_size(mfa) == 2 and is_atom(elem(mfa, 0)) and - is_list(elem(mfa, 1)) - @type step_options :: [async? | max_retries() | arguments_transform | context] @typedoc "Should the step be run asynchronously?" @@ -48,9 +43,13 @@ defmodule Reactor.Builder do @doc """ Build a new, empty Reactor. + + Optionally an identifier for the Reactor. This is primarily used for recursive + composition tracking. """ - @spec new :: Reactor.t() - def new, do: %Reactor{} + @spec new(any) :: Reactor.t() + def new(id \\ make_ref()), + do: %Reactor{id: id, context: %{private: %{composed_reactors: MapSet.new([id])}}} @doc """ Add a named input to the Reactor. @@ -62,42 +61,22 @@ defmodule Reactor.Builder do def add_input(reactor, name, transform \\ nil) def add_input(reactor, _name, _transform) when not is_reactor(reactor), - do: {:error, ArgumentError.exception("`reactor`: not a Reactor")} + do: {:error, argument_error(:reactor, "not a Reactor", reactor)} - def add_input(reactor, name, nil) do - step = %Step{ - arguments: [], - async?: true, - impl: {Step.Input, name: name}, - name: {:input, name}, - max_retries: 0, - ref: make_ref() - } + def add_input(reactor, name, transform), + do: Builder.Input.add_input(reactor, name, transform) - {:ok, %{reactor | inputs: [name | reactor.inputs], steps: [step | reactor.steps]}} - end + @doc """ + Raising version of `add_input/2..3`. + """ + @spec add_input!(Reactor.t(), any, nil | (any -> any)) :: Reactor.t() | no_return + def add_input!(reactor, name, transform \\ nil) - def add_input(reactor, name, transform) - when is_function(transform, 1) or - (tuple_size(transform) == 2 and is_atom(elem(transform, 0)) and - is_list(elem(transform, 1))) do - input_step = %Step{ - arguments: [], - async?: true, - impl: {Step.Input, name: name}, - name: {:raw_input, name}, - max_retries: 0, - ref: make_ref() - } - - transform_step = build_transform_step({:raw_input, name}, {:input, name}, transform) - - {:ok, - %{ - reactor - | inputs: [name | reactor.inputs], - steps: [input_step, transform_step | reactor.steps] - }} + def add_input!(reactor, name, transform) do + case add_input(reactor, name, transform) do + {:ok, reactor} -> reactor + {:error, reason} -> raise reason + end end @doc """ @@ -117,88 +96,67 @@ defmodule Reactor.Builder do def add_step(reactor, name, impl, arguments \\ [], options \\ []) def add_step(reactor, _name, _impl, _arguments, _options) when not is_reactor(reactor), - do: {:error, ArgumentError.exception("`reactor`: not a Reactor")} + do: {:error, argument_error(:reactor, "not a Reactor", reactor)} def add_step(_reactor, _name, _impl, arguments, _options) when not is_list(arguments), - do: {:error, ArgumentError.exception("`arguments` is not a list")} + do: {:error, argument_error(:arguments, "not a list", arguments)} def add_step(_reactor, _name, _impl, _arguments, options) when not is_list(options), - do: {:error, ArgumentError.exception("`options` is not a list")} + do: {:error, argument_error(:options, "not a list", options)} - def add_step(reactor, name, impl, arguments, options) do - with {:ok, arguments} <- assert_all_are_arguments(arguments), - :ok <- assert_is_step_impl(impl), - {:ok, arguments, argument_transform_steps} <- - build_argument_transform_steps(arguments, name), - {:ok, arguments, step_transform_step} <- - maybe_build_step_transform_step(arguments, name, options[:transform]) do - context = - if step_transform_step do - options - |> Keyword.get(:context, %{}) - |> deep_merge(%{private: %{replace_arguments: :value}}) - else - Keyword.get(options, :context, %{}) - end + def add_step(reactor, name, impl, arguments, options), + do: Builder.Step.add_step(reactor, name, impl, arguments, options) - steps = - [ - %Step{ - arguments: arguments, - async?: Keyword.get(options, :async?, true), - context: context, - impl: impl, - name: name, - max_retries: Keyword.get(options, :max_retries, 100), - ref: make_ref() - } - ] - |> Enum.concat(argument_transform_steps) - |> maybe_append(step_transform_step) - |> Enum.concat(reactor.steps) + @doc """ + Raising version of `add_step/3..5`. + """ + @spec add_step!(Reactor.t(), name :: any, impl, [step_argument], step_options) :: + Reactor.t() | no_return + def add_step!(reactor, name, impl, arguments \\ [], options \\ []) - {:ok, %{reactor | steps: steps}} + def add_step!(reactor, name, impl, arguments, options) do + case add_step(reactor, name, impl, arguments, options) do + {:ok, reactor} -> reactor + {:error, reason} -> raise reason end end @doc """ Build a step which can be added to a reactor at runtime. - Note that the built step doesn't support argument transformations - you should - add an additional step to do the transformation needed (this is what - `add_step/5` does anyway). + Note that the built step doesn't support transformations - you should add an + additional step to do the transformation needed (this is what `add_step/5` + does anyway). """ - @spec new_step(name :: any, impl, [step_argument], step_options) :: - {:ok, Step.t()} | {:error, any} + @spec new_step(any, impl, [step_argument], step_options) :: {:ok, Step.t()} | {:error, any} def new_step(name, impl, arguments \\ [], options \\ []) def new_step(_name, _impl, arguments, _options) when not is_list(arguments), - do: {:error, ArgumentError.exception("`arguments` is not a list")} + do: {:error, argument_error(:arguments, "not a list", arguments)} def new_step(_name, _impl, _arguments, options) when not is_list(options), - do: {:error, ArgumentError.exception("`options` is not a list")} + do: {:error, argument_error(:options, "not a list", options)} - def new_step(name, impl, arguments, options) do - with {:ok, arguments} <- assert_all_are_arguments(arguments), - :ok <- assert_is_step_impl(impl) do - step = %Step{ - arguments: arguments, - async?: Keyword.get(options, :async?, true), - context: Keyword.get(options, :context, %{}), - impl: impl, - name: name, - max_retries: Keyword.get(options, :max_retries, 100), - ref: make_ref() - } + def new_step(name, impl, arguments, options), + do: Builder.Step.new_step(name, impl, arguments, options) - {:ok, step} + @doc """ + Raising version of `new_step/2..4`. + """ + @spec new_step!(any, impl, [step_argument], step_options) :: Step.t() | no_return + def new_step!(name, impl, arguments \\ [], options \\ []) + + def new_step!(name, impl, arguments, options) do + case new_step(name, impl, arguments, options) do + {:ok, step} -> step + {:error, reason} -> raise reason end end @doc """ Specify the return value of the Reactor. - The return value must be the result of a completed step. + The return value must be the name of a step. """ @spec return(Reactor.t(), any) :: {:ok, Reactor.t()} | {:error, any} def return(reactor, name) do @@ -209,127 +167,59 @@ defmodule Reactor.Builder do if name in step_names do {:ok, %{reactor | return: name}} else - {:error, ArgumentError.exception("`#{inspect(name)}` is not an existing step name")} + {:error, argument_error(:name, "not an existing step name.", name)} end end - defp assert_all_are_arguments(arguments) do - Enum.reduce_while(arguments, {:ok, []}, fn - argument, {:ok, arguments} when is_argument(argument) -> - {:cont, {:ok, [argument | arguments]}} - - {name, {:input, source}}, {:ok, arguments} -> - {:cont, {:ok, [Argument.from_input(name, source) | arguments]}} - - {name, {:result, source}}, {:ok, arguments} -> - {:cont, {:ok, [Argument.from_result(name, source) | arguments]}} - - not_argument, _ -> - {:halt, - {:error, - ArgumentError.exception( - "Value `#{inspect(not_argument)}` is not a `Reactor.Argument` struct." - )}} - end) - end - - defp assert_is_step_impl({impl, opts}) when is_list(opts), do: assert_is_step_impl(impl) - - defp assert_is_step_impl(impl) when is_atom(impl) do - if Spark.implements_behaviour?(impl, Step) do - :ok - else - {:error, - ArgumentError.exception( - "Module `#{inspect(impl)}` does not implement the `Reactor.Step` behaviour." - )} + @doc """ + Raising version of `return/2`. + """ + @spec return!(Reactor.t(), any) :: Reactor.t() | no_return + def return!(reactor, name) do + case return(reactor, name) do + {:ok, reactor} -> reactor + {:error, reason} -> raise reason end end - defp build_argument_transform_steps(arguments, step_name) do - arguments - |> Enum.reduce_while({:ok, [], []}, fn - argument, {:ok, arguments, steps} - when is_from_input(argument) and has_transform(argument) -> - transform_step_name = {:__reactor__, :transform, argument.name, step_name} + @doc """ + Compose another Reactor inside this one. - step = - build_transform_step( - {:input, argument.source.name}, - transform_step_name, - argument.transform - ) + Whenever possible this function will extract the steps from inner Reactor and + place them inside the parent Reactor. In order to achieve this the composer + will rename the steps to ensure that there are no conflicts. - argument = %Argument{ - name: argument.name, - source: %Template.Result{name: transform_step_name} - } + If you're attempting to create a recursive Reactor (ie compose a Reactor + within itself) then this will be detected and runtime composition will be used + instead. See `Reactor.Step.Compose` for more details. + """ + @spec compose(Reactor.t(), atom, Reactor.t() | module, [step_argument]) :: + {:ok, Reactor.t()} | {:error, any} + def compose(reactor, _name, _inner_reactor, _arguments) when not is_reactor(reactor), + do: {:error, argument_error(:reactor, "not a Reactor", reactor)} - {:cont, {:ok, [argument | arguments], [%{step | transform: nil} | steps]}} + def compose(_reactor, name, _inner_reactor, _arguments) when not is_atom(name), + do: {:error, argument_error(:name, "not an atom", name)} - argument, {:ok, arguments, steps} - when is_from_result(argument) and has_transform(argument) -> - transform_step_name = {:__reactor__, :transform, argument.name, step_name} + def compose(_reactor, _name, inner_reactor, _arguments) + when not is_reactor(inner_reactor) and not is_atom(inner_reactor), + do: {:error, argument_error(:inner_reactor, "not a Reactor", inner_reactor)} - step = - build_transform_step( - argument.source.name, - transform_step_name, - argument.transform - ) + def compose(_reactor, _name, _inner_reactor, arguments) when not is_list(arguments), + do: {:error, argument_error(:arguments, "not a list", arguments)} - argument = %Argument{ - name: argument.name, - source: %Template.Result{name: transform_step_name} - } + def compose(reactor, name, inner_reactor, arguments), + do: Builder.Compose.compose(reactor, name, inner_reactor, arguments) - {:cont, {:ok, [argument | arguments], [%{step | transform: nil} | steps]}} - - argument, {:ok, arguments, steps} when is_from_input(argument) -> - argument = %{argument | source: %Template.Result{name: {:input, argument.source.name}}} - - {:cont, {:ok, [argument | arguments], steps}} - - argument, {:ok, arguments, steps} when is_from_result(argument) -> - {:cont, {:ok, [argument | arguments], steps}} - end) - end - - defp maybe_build_step_transform_step(arguments, _name, nil), do: {:ok, arguments, nil} - - defp maybe_build_step_transform_step(arguments, name, transform) - when is_function(transform, 1), - do: maybe_build_step_transform_step(arguments, name, {Step.TransformAll, fun: transform}) - - defp maybe_build_step_transform_step(arguments, name, transform) do - step = %Step{ - arguments: arguments, - async?: true, - impl: transform, - name: {:__reactor__, :transform, name}, - max_retries: 0, - ref: make_ref() - } - - {:ok, [Argument.from_result(:value, step.name)], step} - end - - defp build_transform_step(input_name, step_name, transform) when is_function(transform, 1), - do: build_transform_step(input_name, step_name, {Step.Transform, fun: transform}) - - defp build_transform_step(input_name, step_name, transform) when is_mfa(transform) do - %Step{ - arguments: [ - %Argument{ - name: :value, - source: %Template.Result{name: input_name} - } - ], - async?: true, - impl: transform, - name: step_name, - max_retries: 0, - ref: make_ref() - } + @doc """ + Raising version of `compose/4`. + """ + @spec compose!(Reactor.t(), atom, Reactor.t() | module, [step_argument]) :: + Reactor.t() | no_return + def compose!(reactor, name, inner_reactor, arguments) do + case compose(reactor, name, inner_reactor, arguments) do + {:ok, reactor} -> reactor + {:error, reason} -> raise reason + end end end diff --git a/lib/reactor/builder/argument.ex b/lib/reactor/builder/argument.ex new file mode 100644 index 0000000..bfd8c79 --- /dev/null +++ b/lib/reactor/builder/argument.ex @@ -0,0 +1,23 @@ +defmodule Reactor.Builder.Argument do + @moduledoc false + + import Reactor.Argument, only: :macros + import Reactor.Utils + alias Reactor.Argument + + @doc """ + Given a list of argument structs or keywords convert them all into Argument + structs if possible, otherwise error. + """ + @spec assert_all_are_arguments([Argument.t() | {atom, {:input | :result, any} | any}]) :: + {:ok, [Argument.t()]} | {:error, Exception.t()} + def assert_all_are_arguments(arguments) do + map_while_ok(arguments, fn + argument when is_argument(argument) -> {:ok, argument} + {name, {:input, source}} -> {:ok, Argument.from_input(name, source)} + {name, {:result, source}} -> {:ok, Argument.from_result(name, source)} + {name, value} -> {:ok, Argument.from_value(name, value)} + _ -> {:error, argument_error(:arguments, "contains a non-argument value.", arguments)} + end) + end +end diff --git a/lib/reactor/builder/compose.ex b/lib/reactor/builder/compose.ex new file mode 100644 index 0000000..2cc0662 --- /dev/null +++ b/lib/reactor/builder/compose.ex @@ -0,0 +1,209 @@ +defmodule Reactor.Builder.Compose do + @moduledoc """ + Handle composition of Reactors for the builder. + + The composition logic was getting complicated enough that it seemed sensible + to extract it from the builder - if only to aid readability. + + You should not use this module directly, but instead use + `Reactor.Builder.compose/4`. + """ + import Reactor, only: :macros + import Reactor.Argument, only: :macros + import Reactor.Builder.Argument + import Reactor.Utils + alias Reactor.{Argument, Builder, Error.ComposeError, Step} + + @doc """ + Compose another Reactor inside this one. + """ + @spec compose(Reactor.t(), atom, Reactor.t() | module, [Builder.step_argument()]) :: + {:ok, Reactor.t()} | {:error, any} + def compose(reactor, name, inner_reactor, arguments) when is_atom(inner_reactor) do + if compose_would_be_recursive?(reactor, inner_reactor) do + do_runtime_compose(reactor, name, inner_reactor, arguments) + else + case Reactor.Info.to_struct(inner_reactor) do + {:ok, inner_reactor} -> compose(reactor, name, inner_reactor, arguments) + {:error, reason} -> {:error, reason} + end + end + end + + def compose(reactor, name, inner_reactor, arguments) when not is_nil(inner_reactor.plan) do + steps = + inner_reactor.plan + |> Graph.vertices() + |> Enum.concat(inner_reactor.steps) + + compose( + reactor, + name, + %{inner_reactor | steps: steps, plan: nil}, + arguments + ) + end + + def compose(reactor, name, inner_reactor, arguments) + when is_reactor(reactor) and is_atom(name) and is_reactor(inner_reactor) and + is_list(arguments) do + if compose_would_be_recursive?(reactor, inner_reactor.id) do + do_runtime_compose(reactor, name, inner_reactor, arguments) + else + do_static_compose(reactor, name, inner_reactor, arguments) + end + end + + defp do_runtime_compose(reactor, name, inner_reactor, arguments) do + Builder.add_step(reactor, name, {Step.Compose, reactor: inner_reactor}, arguments, + max_retries: 0 + ) + end + + def do_static_compose(reactor, name, inner_reactor, arguments) do + with {:ok, arguments} <- assert_all_are_arguments(arguments), + :ok <- assert_arguments_match_inner_reactor_inputs(arguments, inner_reactor), + {:ok, steps} <- rewrite_steps(inner_reactor, name, arguments), + {:ok, return} <- build_return_step(reactor, inner_reactor, name) do + steps = + steps + |> Enum.concat(reactor.steps) + |> Enum.concat([return]) + + reactor = + reactor + |> Map.put(:steps, steps) + |> add_composed_reactor(inner_reactor) + + {:ok, reactor} + end + end + + defp get_composed_reactors(reactor) do + reactor + |> Map.get(:context, %{}) + |> Map.get(:private, %{}) + |> Map.get(:composed_reactors, MapSet.new()) + end + + defp add_composed_reactor(reactor, inner_reactor) do + composed_reactors = + reactor + |> get_composed_reactors() + |> MapSet.put(inner_reactor.id) + + %{ + reactor + | context: deep_merge(reactor.context, %{private: %{composed_reactors: composed_reactors}}) + } + end + + defp compose_would_be_recursive?(reactor, id) when reactor.id == id, do: true + + defp compose_would_be_recursive?(reactor, id) do + reactor + |> get_composed_reactors() + |> MapSet.member?(id) + end + + defp build_return_step(reactor, inner_reactor, _name) when is_nil(inner_reactor.return), + do: + {:error, + ComposeError.exception( + outer_reactor: reactor, + inner_reactor: inner_reactor, + message: "The inner Reactor must have an explicit return value." + )} + + defp build_return_step(_reactor, inner_reactor, name) do + {:ok, + %Step{ + arguments: [ + Argument.from_result(:value, {:__reactor__, :compose, name, inner_reactor.return}) + ], + name: name, + async?: true, + impl: {Step.AnonFn, fun: &{:ok, Map.fetch!(&1, :value)}}, + max_retries: 0, + ref: make_ref() + }} + end + + defp assert_arguments_match_inner_reactor_inputs(arguments, inner_reactor) do + required_arguments = MapSet.new(inner_reactor.inputs) + provided_arguments = MapSet.new(arguments, & &1.name) + + required_arguments + |> MapSet.difference(provided_arguments) + |> Enum.to_list() + |> case do + [] -> + :ok + + [missing] -> + {:error, + ComposeError.exception( + inner_reactor: inner_reactor, + arguments: arguments, + message: "Missing argument for `#{missing}` input." + )} + + missing -> + missing = sentence(missing, &"`#{&1}`", ", ", " and ") + + {:error, + ComposeError.exception( + inner_reactor: inner_reactor, + arguments: arguments, + message: "Missing arguments for the following Reactor inputs; #{missing}" + )} + end + end + + defp rewrite_steps(reactor, name, input_arguments) do + input_arguments = Map.new(input_arguments, &{&1.name, &1}) + + reactor + |> extract_steps() + |> map_while_ok(&rewrite_step(&1, name, input_arguments)) + end + + defp rewrite_step(step, name, input_arguments) do + with {:ok, arguments} <- rewrite_arguments(step.arguments, name, input_arguments) do + step = %{ + step + | arguments: arguments, + name: {:__reactor__, :compose, name, step.name}, + impl: {Step.ComposeWrapper, original: step.impl, prefix: [:__reactor__, :compose, name]} + } + + {:ok, step} + end + end + + defp extract_steps(reactor) when is_nil(reactor.plan), do: reactor.steps + + defp extract_steps(reactor) do + reactor.plan + |> Graph.vertices() + |> Enum.concat(reactor.steps) + end + + defp rewrite_arguments(arguments, name, input_arguments), + do: map_while_ok(arguments, &rewrite_argument(&1, name, input_arguments)) + + defp rewrite_argument(argument, _name, input_arguments) when is_from_input(argument) do + input_argument = Map.fetch!(input_arguments, argument.source.name) + + {:ok, %{argument | source: input_argument.source}} + end + + defp rewrite_argument(argument, name, _input_arguments) when is_from_result(argument) do + source = %{argument.source | name: {:__reactor__, :compose, name, argument.source.name}} + + {:ok, %{argument | source: source}} + end + + defp rewrite_argument(argument, _name, _input_arguments) when is_from_value(argument), + do: {:ok, argument} +end diff --git a/lib/reactor/builder/input.ex b/lib/reactor/builder/input.ex new file mode 100644 index 0000000..bc9dc42 --- /dev/null +++ b/lib/reactor/builder/input.ex @@ -0,0 +1,38 @@ +defmodule Reactor.Builder.Input do + @moduledoc """ + Handle adding inputs to Reactors for the builder. + + You should not use this module directly, but instead use + `Reactor.Builder.add_input/3`. + """ + alias Reactor.{Argument, Step} + import Reactor.Utils + + @doc """ + Add a named input to the reactor. + """ + @spec add_input(Reactor.t(), any, nil | (any -> any) | {Step.step(), keyword}) :: + {:ok, Reactor.t()} | {:error, any} + def add_input(reactor, name, nil), do: {:ok, %{reactor | inputs: [name | reactor.inputs]}} + + def add_input(reactor, name, transform) when is_function(transform, 1), + do: add_input(reactor, name, {Step.Transform, fun: transform}) + + def add_input(reactor, name, transform) + when tuple_size(transform) == 2 and is_atom(elem(transform, 0)) and + is_list(elem(transform, 1)) do + transform_step = %Step{ + arguments: [Argument.from_input(:value, name)], + async?: true, + impl: transform, + name: {:__reactor__, :transform, :input, name}, + max_retries: 0, + ref: make_ref() + } + + {:ok, %{reactor | inputs: [name | reactor.inputs], steps: [transform_step | reactor.steps]}} + end + + def add_input(_reactor, _name, transform), + do: {:error, argument_error(:transform, "Invalid transform function", transform)} +end diff --git a/lib/reactor/builder/step.ex b/lib/reactor/builder/step.ex new file mode 100644 index 0000000..22c8fc2 --- /dev/null +++ b/lib/reactor/builder/step.ex @@ -0,0 +1,279 @@ +defmodule Reactor.Builder.Step do + @moduledoc """ + Handle building and adding steps to Reactors for the builder. + + You should not use this module directly, but instead use + `Reactor.Builder.new_step/4` and `Reactor.Builder.add_step/5`. + """ + alias Reactor.{Argument, Builder, Step, Template} + import Reactor.Argument, only: :macros + import Reactor.Utils + import Reactor.Builder.Argument + + @doc """ + Build and add a new step to a Reactor. + """ + @spec add_step( + Reactor.t(), + any, + Builder.impl(), + [Builder.step_argument()], + Builder.step_options() + ) :: {:ok, Reactor.t()} | {:error, any} + def add_step(reactor, name, impl, arguments, options) do + with {:ok, arguments} <- assert_all_are_arguments(arguments), + {:ok, arguments} <- maybe_rewrite_input_arguments(reactor, arguments), + :ok <- assert_is_step_impl(impl), + {:ok, {arguments, argument_transform_steps}} <- + build_argument_transform_steps(arguments, name), + {:ok, arguments, step_transform_step} <- + maybe_build_step_transform_all_step(arguments, name, options[:transform]), + {:ok, async} <- validate_async_option(options), + {:ok, context} <- validate_context_option(options), + {:ok, max_retries} <- validate_max_retries_option(options) do + context = + if step_transform_step do + deep_merge(context, %{private: %{replace_arguments: :value}}) + else + context + end + + steps = + [ + %Step{ + arguments: arguments, + async?: async, + context: context, + impl: impl, + name: name, + max_retries: max_retries, + ref: make_ref() + } + ] + |> Enum.concat(argument_transform_steps) + |> maybe_append(step_transform_step) + |> Enum.concat(reactor.steps) + + {:ok, %{reactor | steps: steps}} + end + end + + @doc """ + Dynamically build a new step for later use. + + You're most likely to use this when dynamically returning new steps from an + existing step. + """ + @spec new_step(any, Builder.impl(), [Builder.step_argument()], Builder.step_options()) :: + {:ok, Step.t()} | {:error, any} + def new_step(name, impl, arguments, options) do + with {:ok, arguments} <- assert_all_are_arguments(arguments), + :ok <- assert_no_argument_transforms(arguments), + :ok <- assert_is_step_impl(impl), + {:ok, async} <- validate_async_option(options), + {:ok, context} <- validate_context_option(options), + {:ok, max_retries} <- validate_max_retries_option(options), + :ok <- validate_no_transform_option(options) do + step = %Step{ + arguments: arguments, + async?: async, + context: context, + impl: impl, + name: name, + max_retries: max_retries, + ref: make_ref() + } + + {:ok, step} + end + end + + defp validate_async_option(options) do + options + |> Keyword.get(:async?, true) + |> case do + value when is_boolean(value) -> + {:ok, value} + + value -> + {:error, argument_error(:options, "Invalid value for the `async?` option.", value)} + end + end + + defp validate_context_option(options) do + options + |> Keyword.get(:context, %{}) + |> case do + value when is_map(value) -> + {:ok, value} + + value -> + {:error, + argument_error(:options, "Invalid value for the `context` option: must be a map.", value)} + end + end + + defp validate_max_retries_option(options) do + options + |> Keyword.get(:max_retries, 100) + |> case do + :infinity -> + {:ok, :infinity} + + value when is_integer(value) and value >= 0 -> + {:ok, value} + + value -> + {:error, + argument_error( + :options, + "Invalid value for the `max_retries` option: must be a non-negative integer or `:infinity`.", + value + )} + end + end + + defp validate_no_transform_option(options) do + if Keyword.has_key?(options, :transform) do + {:error, + argument_error(:options, "Adding transforms to dynamic steps is not supported.", options)} + else + :ok + end + end + + defp maybe_rewrite_input_arguments(reactor, arguments) do + existing_step_names = MapSet.new(reactor.steps, & &1.name) + + map_while_ok(arguments, fn + argument when is_from_input(argument) -> + potential_rewrite_step_name = {:__reactor__, :transform, :input, argument.source.name} + + if MapSet.member?(existing_step_names, potential_rewrite_step_name) do + {:ok, Argument.from_result(argument.name, potential_rewrite_step_name)} + else + {:ok, argument} + end + + argument when is_from_result(argument) or is_from_value(argument) -> + {:ok, argument} + end) + end + + defp assert_is_step_impl({impl, opts}) when is_list(opts), do: assert_is_step_impl(impl) + + defp assert_is_step_impl(impl) when is_atom(impl) do + if Spark.implements_behaviour?(impl, Step) do + :ok + else + {:error, + argument_error(:impl, "Module does not implement the `Reactor.Step` behaviour.", impl)} + end + end + + defp build_argument_transform_steps(arguments, step_name) do + arguments + |> reduce_while_ok({[], []}, fn + argument, {arguments, steps} when is_from_input(argument) and has_transform(argument) -> + transform_step_name = {:__reactor__, :transform, argument.name, step_name} + + step = + build_transform_step( + argument.source, + transform_step_name, + argument.transform + ) + + argument = %Argument{ + name: argument.name, + source: %Template.Result{name: transform_step_name} + } + + {:ok, {[argument | arguments], [%{step | transform: nil} | steps]}} + + argument, {arguments, steps} when is_from_result(argument) and has_transform(argument) -> + transform_step_name = {:__reactor__, :transform, argument.name, step_name} + + step = + build_transform_step( + argument.source, + transform_step_name, + argument.transform + ) + + argument = %Argument{ + name: argument.name, + source: %Template.Result{name: transform_step_name} + } + + {:ok, {[argument | arguments], [%{step | transform: nil} | steps]}} + + argument, {arguments, steps} -> + {:ok, {[argument | arguments], steps}} + end) + end + + defp maybe_build_step_transform_all_step(arguments, _name, nil), do: {:ok, arguments, nil} + + defp maybe_build_step_transform_all_step(arguments, name, transform) + when is_function(transform, 1), + do: + maybe_build_step_transform_all_step(arguments, name, {Step.TransformAll, fun: transform}) + + defp maybe_build_step_transform_all_step(arguments, name, transform) do + step = %Step{ + arguments: arguments, + async?: true, + impl: transform, + name: {:__reactor__, :transform, name}, + max_retries: 0, + ref: make_ref() + } + + {:ok, [Argument.from_result(:value, step.name)], step} + end + + defp build_transform_step(argument_source, step_name, transform) when is_function(transform, 1), + do: build_transform_step(argument_source, step_name, {Step.Transform, fun: transform}) + + defp build_transform_step(argument_source, step_name, transform) + when tuple_size(transform) == 2 and is_atom(elem(transform, 0)) and + is_list(elem(transform, 1)) do + %Step{ + arguments: [ + %Argument{ + name: :value, + source: argument_source + } + ], + async?: true, + impl: transform, + name: step_name, + max_retries: 0, + ref: make_ref() + } + end + + defp assert_no_argument_transforms(arguments) do + arguments + |> Enum.reject(&is_nil(&1.transform)) + |> case do + [] -> + :ok + + [argument] -> + {:error, + argument_error( + :arguments, + "Argument `#{argument.name}` has a transform attached.", + argument + )} + + arguments -> + sentence = sentence(arguments, &"`#{&1.name}`", ", ", " and ") + + {:error, + argument_error(:arguments, "Arguments #{sentence} have transforms attached.", arguments)} + end + end +end diff --git a/lib/reactor/dsl.ex b/lib/reactor/dsl.ex index a065422..9fd3a67 100644 --- a/lib/reactor/dsl.ex +++ b/lib/reactor/dsl.ex @@ -72,6 +72,9 @@ defmodule Reactor.Dsl do argument :user_id, result(:create_user) do transform & &1.id end + """, + """ + argument :three, value(3) """ ], args: [:name, {:optional, :source}], @@ -87,7 +90,9 @@ defmodule Reactor.Dsl do """ ], source: [ - type: {:or, [{:struct, Template.Input}, {:struct, Template.Result}]}, + type: + {:or, + [{:struct, Template.Input}, {:struct, Template.Result}, {:struct, Template.Value}]}, required: true, doc: """ What to use as the source of the argument. @@ -191,6 +196,38 @@ defmodule Reactor.Dsl do ] } + @compose %Entity{ + name: :compose, + describe: """ + Compose another Reactor into this one. + + Allows place another Reactor into this one as if it were a single step. + """, + args: [:name, :reactor], + target: Dsl.Compose, + no_depend_modules: [:reactor], + entities: [arguments: [@argument]], + schema: [ + name: [ + type: :atom, + required: true, + doc: """ + A unique name for the step. + + Allows the result of the composed reactor to be depended upon by steps + in this reactor. + """ + ], + reactor: [ + type: {:or, [{:struct, Reactor}, {:spark, Reactor.Dsl}]}, + required: true, + doc: """ + The reactor module or struct to compose upon. + """ + ] + ] + } + @reactor %Section{ name: :reactor, describe: "The top-level reactor DSL", @@ -203,11 +240,12 @@ defmodule Reactor.Dsl do """ ] ], - entities: [@input, @step], + entities: [@input, @step, @compose], top_level?: true } use Extension, sections: [@reactor], - transformers: [Dsl.Transformer] + transformers: [Dsl.Transformer], + verifiers: [Dsl.PlanableVerifier] end diff --git a/lib/reactor/dsl/compose.ex b/lib/reactor/dsl/compose.ex new file mode 100644 index 0000000..5e50a60 --- /dev/null +++ b/lib/reactor/dsl/compose.ex @@ -0,0 +1,14 @@ +defmodule Reactor.Dsl.Compose do + @moduledoc """ + The `compose` DSL entity struct. + + See `Reactor.Dsl`. + """ + defstruct arguments: [], name: nil, reactor: nil + + @type t :: %__MODULE__{ + arguments: [Reactor.Argument.t()], + name: any, + reactor: module | Reactor.t() + } +end diff --git a/lib/reactor/dsl/planable_verifier.ex b/lib/reactor/dsl/planable_verifier.ex new file mode 100644 index 0000000..3a7a7bc --- /dev/null +++ b/lib/reactor/dsl/planable_verifier.ex @@ -0,0 +1,32 @@ +defmodule Reactor.Dsl.PlanableVerifier do + @moduledoc """ + Verifies that the Reactor is not cyclic. + """ + + use Spark.Dsl.Verifier + alias Spark.{Dsl, Dsl.Verifier, Error.DslError} + alias Reactor.{Info, Planner} + + @doc """ + Ensure that a DSL-based Reactor is not cyclic. + """ + @impl true + @spec verify(Dsl.t()) :: :ok | {:error, any} + def verify(dsl_state) do + with {:ok, reactor} <- Info.to_struct(dsl_state), + {:ok, _} <- Planner.plan(reactor) do + :ok + else + {:error, reason} when is_binary(reason) -> + {:error, + DslError.exception( + module: Verifier.get_persisted(dsl_state, :module), + path: [:reactor, :step], + message: reason + )} + + {:error, reason} when is_exception(reason) -> + {:error, reason} + end + end +end diff --git a/lib/reactor/dsl/transformer.ex b/lib/reactor/dsl/transformer.ex index 6504e49..06c2c97 100644 --- a/lib/reactor/dsl/transformer.ex +++ b/lib/reactor/dsl/transformer.ex @@ -1,18 +1,38 @@ defmodule Reactor.Dsl.Transformer do @moduledoc false - alias Reactor.Step + alias Reactor.{Dsl.Compose, Step} alias Spark.{Dsl, Dsl.Transformer, Error.DslError} use Transformer @doc false @spec transform(Dsl.t()) :: {:ok, Dsl.t()} | {:error, DslError.t()} def transform(dsl_state) do - step_names = - dsl_state - |> Transformer.get_entities([:reactor]) - |> Enum.filter(&is_struct(&1, Step)) - |> Enum.map(& &1.name) + with {:ok, step_names} <- step_names(dsl_state), + {:ok, dsl_state} <- maybe_set_return(dsl_state, step_names) do + validate_return(dsl_state, step_names) + end + end + defp step_names(dsl_state) do + dsl_state + |> Transformer.get_entities([:reactor]) + |> Enum.filter(&(is_struct(&1, Step) || is_struct(&1, Compose))) + |> Enum.map(& &1.name) + |> case do + [] -> + {:error, + DslError.exception( + module: Transformer.get_persisted(dsl_state, :module), + path: [:reactor], + message: "Reactor contains no steps" + )} + + step_names -> + {:ok, step_names} + end + end + + defp maybe_set_return(dsl_state, step_names) do case Transformer.get_option(dsl_state, [:reactor], :return) do nil -> dsl_state = @@ -21,18 +41,23 @@ defmodule Reactor.Dsl.Transformer do {:ok, dsl_state} - return_name -> - if return_name in step_names do - {:ok, dsl_state} - else - {:error, - DslError.exception( - module: Transformer.get_persisted(dsl_state, :module), - path: [:reactor], - message: - "Return value `#{inspect(return_name)}` does not correspond with an existing step" - )} - end + _ -> + {:ok, dsl_state} + end + end + + defp validate_return(dsl_state, step_names) do + name = Transformer.get_option(dsl_state, [:reactor], :return) + + if name in step_names do + {:ok, dsl_state} + else + {:error, + DslError.exception( + module: Transformer.get_persisted(dsl_state, :module), + path: [:reactor], + message: "Return value `#{inspect(name)}` does not correspond with an existing step" + )} end end end diff --git a/lib/reactor/errors/compose_error.ex b/lib/reactor/errors/compose_error.ex new file mode 100644 index 0000000..7c81cf7 --- /dev/null +++ b/lib/reactor/errors/compose_error.ex @@ -0,0 +1,52 @@ +defmodule Reactor.Error.ComposeError do + defexception [:outer_reactor, :inner_reactor, :message, :arguments] + import Reactor.Utils + + @impl true + def exception(attrs), do: struct(__MODULE__, attrs) + + @impl true + def message(error) do + [ + """ + # Unable to compose Reactors + + #{error.message} + """ + ] + |> maybe_append_result(fn -> + if error.arguments do + """ + ## Arguments + + ``` + #{inspect(error.arguments)} + ``` + """ + end + end) + |> maybe_append_result(fn -> + if error.inner_reactor do + """ + ## Inner Reactor + + ``` + #{inspect(error.inner_reactor)} + ``` + """ + end + end) + |> maybe_append_result(fn -> + if error.outer_reactor do + """ + ## Outer Reactor + + ``` + #{inspect(error.outer_reactor)} + ``` + """ + end + end) + |> Enum.join("\n") + end +end diff --git a/lib/reactor/errors/plan_error.ex b/lib/reactor/errors/plan_error.ex new file mode 100644 index 0000000..0640c71 --- /dev/null +++ b/lib/reactor/errors/plan_error.ex @@ -0,0 +1,52 @@ +defmodule Reactor.Error.PlanError do + defexception [:reactor, :graph, :step, :message] + import Reactor.Utils + + @impl true + def exception(attrs), do: struct(__MODULE__, attrs) + + @impl true + def message(error) do + [ + """ + # Unable to plan Reactor + + #{error.message} + """ + ] + |> maybe_append_result(fn -> + if error.reactor do + """ + ## Reactor + + ``` + #{inspect(error.reactor)} + ``` + """ + end + end) + |> maybe_append_result(fn -> + if error.step do + """ + ## Step + + ``` + #{inspect(error.step)} + ``` + """ + end + end) + |> maybe_append_result(fn -> + if error.graph do + """ + ## Graph + + ``` + #{inspect(error.graph)} + ``` + """ + end + end) + |> Enum.join("\n") + end +end diff --git a/lib/reactor/errors/transform_error.ex b/lib/reactor/errors/transform_error.ex new file mode 100644 index 0000000..b6f2ebc --- /dev/null +++ b/lib/reactor/errors/transform_error.ex @@ -0,0 +1,9 @@ +defmodule Reactor.Error.TransformError do + defexception input: nil, output: nil, message: nil + + @impl true + def exception(attrs), do: struct(__MODULE__, attrs) + + @impl true + def message(error), do: error.message +end diff --git a/lib/reactor/executor.ex b/lib/reactor/executor.ex index 396b223..4ff34c3 100644 --- a/lib/reactor/executor.ex +++ b/lib/reactor/executor.ex @@ -48,6 +48,9 @@ defmodule Reactor.Executor do {:ok, any} | {:halted, Reactor.t()} | {:error, any} def run(reactor, inputs \\ %{}, context \\ %{}, options \\ []) + def run(reactor, _inputs, _context, _options) when is_nil(reactor.return), + do: {:error, ArgumentError.exception("`reactor` has no return value")} + def run(reactor, inputs, context, options) when reactor.state in ~w[pending halted]a do case Executor.Init.init(reactor, inputs, context, options) do {:ok, reactor, state} -> execute(reactor, state) diff --git a/lib/reactor/executor/init.ex b/lib/reactor/executor/init.ex index 745ee31..cfcf3e8 100644 --- a/lib/reactor/executor/init.ex +++ b/lib/reactor/executor/init.ex @@ -5,6 +5,7 @@ defmodule Reactor.Executor.Init do alias Reactor.Executor import Reactor, only: :macros + import Reactor.Utils @doc false @spec init(Reactor.t(), Reactor.inputs(), Reactor.context(), Reactor.options()) :: @@ -19,15 +20,10 @@ defmodule Reactor.Executor.Init do {:ok, options} <- into_map(options) do state = Executor.State.init(options) - private = - context - |> Map.get(:private, %{}) - |> Map.put(:inputs, inputs) - context = reactor.context - |> Map.merge(context) - |> Map.put(:private, private) + |> deep_merge(context) + |> deep_merge(%{private: %{inputs: inputs}}) {:ok, %{reactor | context: context}, state} end @@ -53,11 +49,11 @@ defmodule Reactor.Executor.Init do missing_inputs = valid_input_names |> MapSet.difference(provided_input_names) - |> Enum.map_join(", ", &"`#{inspect(&1)}`") + |> sentence(&"`#{inspect(&1)}`", ", ", " and ") {:error, ArgumentError.exception( - message: "Reactor is missing the following inputs: #{missing_inputs}" + message: "Reactor is missing the following inputs; #{missing_inputs}" )} end end diff --git a/lib/reactor/executor/step_runner.ex b/lib/reactor/executor/step_runner.ex index b05e633..ce29076 100644 --- a/lib/reactor/executor/step_runner.ex +++ b/lib/reactor/executor/step_runner.ex @@ -2,8 +2,9 @@ defmodule Reactor.Executor.StepRunner do @moduledoc """ Run an individual step, including compensation if possible. """ - alias Reactor.{Step, Template} + alias Reactor.Step import Reactor.Utils + import Reactor.Argument, only: :macros require Logger @max_undo_count 5 @@ -88,25 +89,40 @@ defmodule Reactor.Executor.StepRunner do end defp get_step_arguments(reactor, step) do - Enum.reduce_while(step.arguments, {:ok, %{}}, fn argument, {:ok, arguments} -> - with %Template.Result{name: dependency_name} <- argument.source, - {:ok, value} <- Map.fetch(reactor.intermediate_results, dependency_name) do - {:cont, {:ok, Map.put(arguments, argument.name, value)}} - else - %Template.Input{} -> - {:halt, - {:error, - "Step `#{inspect(step.name)}` argument `#{inspect(argument.name)}` is invalid"}} + reduce_while_ok(step.arguments, %{}, fn + argument, arguments when is_from_input(argument) -> + case Map.fetch(reactor.context.private.inputs, argument.source.name) do + {:ok, value} -> + {:ok, Map.put(arguments, argument.name, value)} - :error -> - {:halt, - {:error, - "Step `#{inspect(step.name)}` argument `#{inspect(argument.name)}` is missing"}} - end + :error -> + {:error, + "Step `#{inspect(step.name)}` argument `#{inspect(argument.name)}` relies on missing input `#{argument.source.name}`"} + end + + argument, arguments when is_from_result(argument) -> + case Map.fetch(reactor.intermediate_results, argument.source.name) do + {:ok, value} -> + {:ok, Map.put(arguments, argument.name, value)} + + :error -> + {:error, + "Step `#{inspect(step.name)}` argument `#{inspect(argument.name)}` is missing"} + end + + argument, arguments when is_from_value(argument) -> + {:ok, Map.put(arguments, argument.name, argument.source.value)} end) end - defp build_context(reactor, step), do: {:ok, deep_merge(step.context, reactor.context)} + defp build_context(reactor, step) do + context = + step.context + |> deep_merge(reactor.context) + |> Map.put(:current_step, step) + + {:ok, context} + end defp maybe_replace_arguments(arguments, context) when is_nil(context.private.replace_arguments), do: {:ok, arguments} diff --git a/lib/reactor/info.ex b/lib/reactor/info.ex index 8ac5025..4f5c4df 100644 --- a/lib/reactor/info.ex +++ b/lib/reactor/info.ex @@ -4,54 +4,51 @@ defmodule Reactor.Info do """ use Spark.InfoGenerator, sections: [:reactor], extension: Reactor.Dsl - alias Reactor.{Builder, Input, Step} + alias Reactor.{Builder, Dsl.Compose, Input, Step} + import Reactor.Utils @doc """ Convert a reactor DSL module into a reactor struct. """ - @spec to_struct(module) :: {:ok, Reactor.t()} | {:error, any} - def to_struct(module) when is_atom(module) do - with :ok <- assert_is_reactor_module(module), - {:ok, reactor} <- entities_to_struct(module) do + @spec to_struct(module | Reactor.t() | Spark.Dsl.t()) :: {:ok, Reactor.t()} | {:error, any} + def to_struct(reactor) when is_struct(reactor, Reactor), do: {:ok, reactor} + + def to_struct(module) do + with {:ok, reactor} <- entities_to_struct(module) do maybe_set_return(module, reactor) end end + @doc """ + Raising version of `to_struct/1`. + """ + @spec to_struct!(module | Reactor.t() | Spark.Dsl.t()) :: Reactor.t() | no_return + def to_struct!(reactor) do + case to_struct(reactor) do + {:ok, reactor} -> reactor + {:error, reason} -> raise reason + end + end + defp entities_to_struct(module) do module |> reactor() - |> Enum.reduce_while({:ok, Builder.new()}, fn - input, {:ok, reactor} when is_struct(input, Input) -> - case Builder.add_input(reactor, input.name, input.transform) do - {:ok, reactor} -> {:cont, {:ok, reactor}} - {:error, reason} -> {:halt, {:error, reason}} - end + |> reduce_while_ok(Builder.new(module), fn + input, reactor when is_struct(input, Input) -> + Builder.add_input(reactor, input.name, input.transform) - step, {:ok, reactor} when is_struct(step, Step) -> - case Builder.add_step(reactor, step.name, step.impl, step.arguments, - async?: step.async?, - max_retries: step.max_retries, - transform: step.transform - ) do - {:ok, reactor} -> {:cont, {:ok, reactor}} - {:error, reason} -> {:halt, {:error, reason}} - end + step, reactor when is_struct(step, Step) -> + Builder.add_step(reactor, step.name, step.impl, step.arguments, + async?: step.async?, + max_retries: step.max_retries, + transform: step.transform + ) + + compose, reactor when is_struct(compose, Compose) -> + Builder.compose(reactor, compose.name, compose.reactor, compose.arguments) end) end - defp assert_is_reactor_module(reactor) when is_atom(reactor) do - Code.ensure_loaded!(reactor) - - if reactor.spark_is() == Reactor do - :ok - else - {:error, "Module `#{inspect(reactor)}` is not a valid Reactor module"} - end - rescue - _error in [ArgumentError, UndefinedFunctionError] -> - {:error, "Module `#{inspect(reactor)}` is not a valid Reactor module"} - end - defp maybe_set_return(module, reactor) do case reactor_return(module) do {:ok, value} -> {:ok, %{reactor | return: value}} diff --git a/lib/reactor/planner.ex b/lib/reactor/planner.ex index f31ced9..58c8734 100644 --- a/lib/reactor/planner.ex +++ b/lib/reactor/planner.ex @@ -6,9 +6,10 @@ defmodule Reactor.Planner do between them representing their dependencies (arguments). """ - alias Reactor.Step + alias Reactor.{Error.PlanError, Step} import Reactor, only: :macros import Reactor.Argument, only: :macros + import Reactor.Utils @doc """ Build an execution plan for a Reactor. @@ -17,18 +18,29 @@ defmodule Reactor.Planner do """ @spec plan(Reactor.t()) :: {:ok, Reactor.t()} | {:error, any} def plan(reactor) when not is_reactor(reactor), - do: {:error, ArgumentError.exception("`reactor`: not a Reactor")} + do: {:error, argument_error(:reactor, "not a Reactor", reactor)} def plan(reactor) when is_nil(reactor.plan), do: plan(%{reactor | plan: empty_graph()}) def plan(reactor) do with {:ok, graph} <- reduce_steps_into_graph(reactor.plan, reactor.steps), - :ok <- assert_graph_not_cyclic(graph) do + :ok <- assert_graph_not_cyclic(reactor, graph) do {:ok, %{reactor | steps: [], plan: graph}} end end + @doc """ + Raising version of `plan/1`. + """ + @spec plan!(Reactor.t()) :: Reactor.t() | no_return + def plan!(reactor) do + case plan(reactor) do + {:ok, reactor} -> reactor + {:error, reason} -> raise reason + end + end + defp empty_graph, do: Graph.new(type: :directed, vertex_identifier: & &1.ref) defp reduce_steps_into_graph(graph, steps) do @@ -38,60 +50,64 @@ defmodule Reactor.Planner do |> Enum.concat(steps) |> Map.new(&{&1.name, &1}) - Enum.reduce_while(steps, {:ok, graph}, fn - step, {:ok, graph} when is_struct(step, Step) -> + steps + |> reduce_while_ok(graph, fn + step, graph when is_struct(step, Step) -> graph |> Graph.add_vertex(step, step.name) |> reduce_arguments_into_graph(step, steps_by_name) - |> case do - {:ok, graph} -> {:cont, {:ok, graph}} - {:error, reason} -> {:halt, {:error, reason}} - end not_step, _ -> - {:halt, {:error, "Value `#{inspect(not_step)}` is not a `Reactor.Step` struct."}} + {:error, + PlanError.exception( + graph: graph, + step: not_step, + message: "Value is not a `Reactor.Step` struct." + )} end) end defp reduce_arguments_into_graph(graph, current_step, steps_by_name) do - Enum.reduce_while(current_step.arguments, {:ok, graph}, fn - argument, {:ok, graph} when is_argument(argument) -> - dependency_name = - case argument do - argument when is_from_result(argument) -> argument.source.name - argument when is_from_input(argument) -> {:input, argument.source.name} - end + reduce_while_ok(current_step.arguments, graph, fn + argument, graph when is_argument(argument) and is_from_result(argument) -> + dependency_name = argument.source.name case Map.fetch(steps_by_name, dependency_name) do {:ok, dependency} when dependency.name == current_step.name -> - {:cont, {:ok, graph}} + {:ok, graph} {:ok, dependency} -> - {:cont, - {:ok, - Graph.add_edge( - graph, - dependency, - current_step, - label: {:argument, argument.name, :for, current_step.name} - )}} + {:ok, + Graph.add_edge(graph, dependency, current_step, + label: {:argument, argument.name, :for, current_step.name} + )} :error -> - {:halt, - {:error, - "Step `#{inspect(current_step.name)}` depends on the result of a step named `#{inspect(argument.source.name)}` which cannot be found"}} + {:error, + PlanError.exception( + graph: graph, + step: current_step, + message: + "Step `#{inspect(current_step.name)}` depends on the result of a step named `#{inspect(argument.source.name)}` which cannot be found" + )} end - _argument, _graph -> - {:halt, {:error, ArgumentError.exception("`argument` is not an argument.")}} + argument, graph + when is_argument(argument) and (is_from_input(argument) or is_from_value(argument)) -> + {:ok, graph} end) end - defp assert_graph_not_cyclic(graph) do + defp assert_graph_not_cyclic(reactor, graph) do if Graph.is_acyclic?(graph) do :ok else - {:error, "Reactor contains cyclic dependencies."} + {:error, + PlanError.exception( + reactor: reactor, + graph: graph, + message: "Reactor contains cyclic dependencies." + )} end end end diff --git a/lib/reactor/step.ex b/lib/reactor/step.ex index 0ab06a7..648603e 100644 --- a/lib/reactor/step.ex +++ b/lib/reactor/step.ex @@ -27,6 +27,8 @@ defmodule Reactor.Step do transform: nil | (any -> any) | {module, keyword} | mfa } + @type step :: module + @typedoc """ Optional capabilities which may be implemented by the step module. @@ -94,9 +96,6 @@ defmodule Reactor.Step do This provides you the opportunity to handle the error in a number of ways and direct the reactor as to what to do next. - This callback is only called if `c:can?/1` returns `true` for the - `:compensate` capability. - ## Arguments - `reason` - the error reason returned from `c:run/3`. - `arguments` - the arguments passed to the step. @@ -127,9 +126,6 @@ defmodule Reactor.Step do This callback is called when the reactor encounters an unhandled error later in it's execution run and must undo the work previously done. - This callback is only called if `c:can?/1` returns `true` for the `:undo` - capability. - ## Arguments - `value` - the return value of the previously successful call to `c:run/3`. @@ -159,13 +155,11 @@ defmodule Reactor.Step do Find out of a step has a capability. """ @spec can?(module | Step.t(), capability()) :: boolean - def can?(%Step{impl: {module, _opts}}, capability) - when is_atom(module) and capability in ~w[undo compensate]a, - do: function_exported?(module, capability, 4) + def can?(%Step{impl: {module, _opts}}, capability) when is_atom(module), + do: can?(module, capability) - def can?(%Step{impl: module}, capability) - when is_atom(module) and capability in ~w[undo compensate]a, - do: function_exported?(module, capability, 4) + def can?(%Step{impl: module}, capability) when is_atom(module), + do: can?(module, capability) def can?(module, capability) when is_atom(module) and capability in ~w[undo compensate]a, do: function_exported?(module, capability, 4) diff --git a/lib/reactor/step/anon_fn.ex b/lib/reactor/step/anon_fn.ex index 8d679a1..7eed51c 100644 --- a/lib/reactor/step/anon_fn.ex +++ b/lib/reactor/step/anon_fn.ex @@ -13,6 +13,9 @@ defmodule Reactor.Step.AnonFn do @spec run(Reactor.inputs(), Reactor.context(), keyword) :: {:ok | :error, any} def run(arguments, context, options) do case Keyword.pop(options, :fun) do + {fun, _opts} when is_function(fun, 1) -> + fun.(arguments) + {fun, _opts} when is_function(fun, 2) -> fun.(arguments, context) diff --git a/lib/reactor/step/compose.ex b/lib/reactor/step/compose.ex new file mode 100644 index 0000000..851f069 --- /dev/null +++ b/lib/reactor/step/compose.ex @@ -0,0 +1,165 @@ +defmodule Reactor.Step.Compose do + @moduledoc """ + A built-in step which can embed one reactor inside another. + + This is different to the `Builder.compose` and DSL `compose` methods. Those + methods build a new reactor by combining the steps of the two input reactors, + whereas this step expands the provided reactor at runtime and dynamically + inserts it's steps into the running reactor. + + If emitting the reactor's steps into the current reactor would be recursive, + then the reactor is directly executed within the step using `Reactor.run/4`. + """ + + use Reactor.Step + alias Reactor.{Argument, Builder, Error.ComposeError, Info, Step} + import Reactor, only: :macros + import Reactor.Argument, only: :macros + import Reactor.Utils + + @doc false + @impl true + def run(arguments, context, options) do + reactor = Keyword.fetch!(options, :reactor) + reactor_id = get_reactor_id(reactor) + + context + |> get_composed_reactors() + |> MapSet.member?(reactor_id) + |> if do + handle_recursive_reactor(reactor, arguments, context) + else + handle_non_recursive_reactor(reactor, arguments, context) + end + end + + defp handle_recursive_reactor(reactor, arguments, context), + do: Reactor.run(reactor, arguments, context, []) + + defp handle_non_recursive_reactor(reactor, arguments, context) when is_atom(reactor) do + with {:ok, reactor} <- Info.to_struct(reactor) do + handle_non_recursive_reactor(reactor, arguments, context) + end + end + + defp handle_non_recursive_reactor(reactor, arguments, context) do + current_step = Map.fetch!(context, :current_step) + + with :ok <- validate_arguments_match_inputs(arguments, reactor), + :ok <- validate_reactor_has_return(reactor), + {:ok, inner_steps} <- rewrite_steps(reactor, current_step.name, arguments), + {:ok, recursion_step} <- create_recursion_step(reactor, current_step.name) do + steps = + inner_steps + |> Enum.concat([recursion_step]) + + {:ok, nil, steps} + end + end + + defp get_reactor_id(reactor) when is_atom(reactor), do: reactor + defp get_reactor_id(reactor) when is_reactor(reactor), do: reactor.id + + defp get_composed_reactors(context) when not is_nil(context.private.composed_reactors), + do: context.private.composed_reactors + + defp get_composed_reactors(_context), do: MapSet.new() + + defp validate_reactor_has_return(reactor) when is_nil(reactor.return), + do: + {:error, + ComposeError.exception( + inner_reactor: reactor, + message: "The inner Reactor must have an explicit return value." + )} + + defp validate_reactor_has_return(reactor) do + if Enum.any?(reactor.steps, &(&1.name == reactor.return)) do + :ok + else + {:error, + ComposeError.exception( + inner_reactor: reactor, + message: + "The inner Reactor return value does not correspond with an existing Reactor step." + )} + end + end + + defp create_recursion_step(reactor, name) do + Builder.new_step( + name, + {Step.AnonFn, fun: fn args, _, _ -> {:ok, args.value} end}, + [value: {:result, {__MODULE__, name, reactor.return}}], + max_retries: 0 + ) + end + + defp validate_arguments_match_inputs(arguments, reactor) do + argument_names = arguments |> Map.keys() |> MapSet.new() + input_names = MapSet.new(reactor.inputs) + + input_names + |> MapSet.difference(argument_names) + |> Enum.to_list() + |> case do + [] -> + :ok + + [input] -> + {:error, + ComposeError.exception( + inner_reactor: reactor, + arguments: arguments, + message: "Missing argument for input `#{input}`" + )} + + inputs -> + inputs = sentence(inputs, &"`#{&1}`", ", ", " and ") + + {:error, + ComposeError.exception( + inner_reactor: reactor, + arguments: arguments, + message: "Missing arguments for inputs #{inputs}" + )} + end + end + + defp rewrite_steps(reactor, name, inputs) when not is_nil(reactor.plan) do + steps = + reactor.plan + |> Graph.vertices() + |> Enum.concat(reactor.steps) + + rewrite_steps(%{reactor | steps: steps, plan: nil}, name, inputs) + end + + defp rewrite_steps(reactor, name, inputs) do + reactor.steps + |> map_while_ok(&rewrite_step(&1, name, inputs)) + end + + defp rewrite_step(step, name, inputs) do + with {:ok, arguments} <- map_while_ok(step.arguments, &rewrite_argument(&1, name, inputs)) do + {:ok, + %{ + step + | arguments: arguments, + name: {__MODULE__, name, step.name}, + impl: {Step.ComposeWrapper, original: step.impl, prefix: [__MODULE__, name]} + }} + end + end + + defp rewrite_argument(argument, _name, inputs) when is_from_input(argument) do + value = Map.fetch!(inputs, argument.source.name) + {:ok, Argument.from_value(argument.name, value)} + end + + defp rewrite_argument(argument, name, _inputs) when is_from_result(argument), + do: {:ok, Argument.from_result(argument.name, {__MODULE__, name, argument.source.name})} + + defp rewrite_argument(argument, _name, _inputs) when is_from_value(argument), + do: {:ok, argument} +end diff --git a/lib/reactor/step/compose_wrapper.ex b/lib/reactor/step/compose_wrapper.ex new file mode 100644 index 0000000..77c05c4 --- /dev/null +++ b/lib/reactor/step/compose_wrapper.ex @@ -0,0 +1,143 @@ +defmodule Reactor.Step.ComposeWrapper do + @moduledoc """ + When doing run-time composition of Reactors we need to dynamically rewrite any + dynamically emitted steps to have the correct namespace. + + Yes, this gets hairy, fast. + + This is dynamically injected into steps by `Reactor.Step.Compose` - you + probably don't want to use this unless you're sure what you're doing. + + ## Options + + * `original` - the original value of the Step's `impl` key. + * `prefix` - a list of values to be placed in the `name` before the original value. + """ + + use Reactor.Step + alias Reactor.Argument + import Reactor.Argument, only: :macros + import Reactor.Utils + + @doc false + @impl true + def run(arguments, context, options) do + with {:ok, impl} <- validate_original_option(options, context.current_step), + {:ok, prefix} <- validate_prefix_option(options, context.current_step) do + case do_run(impl, arguments, context) do + {:ok, value} -> {:ok, value} + {:ok, value, steps} -> {:ok, value, rewrite_steps(steps, prefix)} + {stop, reason} when stop in ~w[halt error]a -> {stop, reason} + end + end + end + + @doc false + @impl true + def compensate(reason, arguments, context, options) do + case get_original_option(options, context.current_step) do + {:ok, {impl, opts}} -> impl.compensate(reason, arguments, context, opts) + {:ok, impl} -> impl.compensate(reason, arguments, context, []) + {:error, _} -> :ok + end + rescue + UndefinedFunctionError -> :ok + end + + @doc false + @impl true + def undo(value, arguments, context, options) do + case get_original_option(options, context.current_step) do + {:ok, {impl, opts}} -> impl.undo(value, arguments, context, opts) + {:ok, impl} -> impl.undo(value, arguments, context, []) + {:error, reason} -> {:error, reason} + end + rescue + UndefinedFunctionError -> :ok + end + + defp validate_original_option(options, current_step) do + with {:ok, original} <- get_original_option(options, current_step), + {module, opts} <- get_module_and_options(original) do + if Spark.implements_behaviour?(module, Reactor.Step) do + {:ok, {module, opts}} + else + {:error, + argument_error( + :options, + "Step `#{current_step.name}` module `#{inspect(module)}` does not implement the `Reactor.Step` behaviour.", + opts + )} + end + end + end + + defp get_original_option(options, current_step) do + with :error <- Keyword.fetch(options, :original) do + {:error, + argument_error( + :options, + "Step `#{current_step.name}` is missing the `original` option.", + options + )} + end + end + + defp validate_prefix_option(options, current_step) do + case Keyword.fetch(options, :prefix) do + {:ok, [_ | _] = prefix} -> + {:ok, prefix} + + :error -> + {:error, + argument_error( + :options, + "Step `#{current_step.name}` has missing `prefix` option.", + options + )} + + _ -> + {:error, + argument_error( + :options, + "Step `#{current_step.name}` has invalid `prefix` option.", + options + )} + end + end + + defp get_module_and_options(impl) when is_atom(impl), do: {impl, []} + + defp get_module_and_options({impl, options}) when is_atom(impl) and is_list(options), + do: {impl, options} + + defp do_run({module, options}, arguments, context) when is_atom(module) and is_list(options), + do: module.run(arguments, context, options) + + defp rewrite_steps(steps, prefix) do + steps + |> Enum.map(fn step -> + name = + prefix + |> Enum.concat([step.name]) + |> List.to_tuple() + + arguments = Enum.map(step.arguments, &rewrite_argument(&1, prefix)) + + %{step | name: name, arguments: arguments} + end) + end + + defp rewrite_argument(argument, prefix) when is_from_result(argument) do + source = + prefix + |> Enum.concat([argument.source.name]) + |> List.to_tuple() + + Argument.from_result(argument.name, source) + end + + defp rewrite_argument(argument, _prefix) + when is_from_input(argument) or is_from_value(argument), + do: argument +end diff --git a/lib/reactor/step/input.ex b/lib/reactor/step/input.ex deleted file mode 100644 index 47bf3e4..0000000 --- a/lib/reactor/step/input.ex +++ /dev/null @@ -1,28 +0,0 @@ -defmodule Reactor.Step.Input do - @moduledoc """ - A built-in step which emits a reactor input. - """ - - use Reactor.Step - - @doc false - @impl true - @spec run(Reactor.inputs(), Reactor.context(), keyword) :: {:ok | :error, any} - def run(_arguments, context, options) do - case Keyword.fetch(options, :name) do - {:ok, name} -> - with {:ok, private} <- Map.fetch(context, :private), - {:ok, inputs} <- Map.fetch(private, :inputs), - {:ok, value} <- Map.fetch(inputs, name) do - {:ok, value} - else - :error -> - {:error, - ArgumentError.exception("Reactor is missing an input named `#{inspect(name)}`")} - end - - :error -> - {:error, ArgumentError.exception("Missing `:name` option in `Input` step")} - end - end -end diff --git a/lib/reactor/step/transform.ex b/lib/reactor/step/transform.ex index b01fb83..f428e5b 100644 --- a/lib/reactor/step/transform.ex +++ b/lib/reactor/step/transform.ex @@ -29,9 +29,6 @@ defmodule Reactor.Step.Transform do {{m, f, a}, _opts} when is_atom(m) and is_atom(f) and is_list(a) -> {:ok, apply(m, f, [value | a])} - - {nil, opts} -> - raise "Invalid options given to `run/3` callback: `#{inspect(opts)}`." end rescue error -> {:error, error} diff --git a/lib/reactor/step/transform_all.ex b/lib/reactor/step/transform_all.ex index 8e09867..e2e1e96 100644 --- a/lib/reactor/step/transform_all.ex +++ b/lib/reactor/step/transform_all.ex @@ -7,7 +7,7 @@ defmodule Reactor.Step.TransformAll do """ use Reactor.Step - alias Reactor.Step.Transform + alias Reactor.{Error.TransformError, Step.Transform} @doc false @impl true @@ -17,8 +17,13 @@ defmodule Reactor.Step.TransformAll do {:ok, result} when is_map(result) -> {:ok, result} - {:ok, _other} -> - {:error, "Step transformers must return a map to use as replacement arguments."} + {:ok, result} -> + {:error, + TransformError.exception( + input: arguments, + output: result, + message: "Step transformers must return a map to use as replacement arguments." + )} {:error, reason} -> {:error, reason} diff --git a/lib/reactor/template.ex b/lib/reactor/template.ex deleted file mode 100644 index 5738c91..0000000 --- a/lib/reactor/template.ex +++ /dev/null @@ -1,9 +0,0 @@ -defmodule Reactor.Template do - @moduledoc false - alias Reactor.Template - - @typedoc """ - An input or result template. - """ - @type t :: Template.Input.t() | Template.Result.t() -end diff --git a/lib/reactor/template/value.ex b/lib/reactor/template/value.ex new file mode 100644 index 0000000..cdbba44 --- /dev/null +++ b/lib/reactor/template/value.ex @@ -0,0 +1,9 @@ +defmodule Reactor.Template.Value do + @moduledoc """ + A statically `value` template. + """ + + defstruct value: nil + + @type t :: %__MODULE__{value: any} +end diff --git a/lib/reactor/utils.ex b/lib/reactor/utils.ex index ee2ac5d..c3a7a59 100644 --- a/lib/reactor/utils.ex +++ b/lib/reactor/utils.ex @@ -21,4 +21,113 @@ defmodule Reactor.Utils do @spec maybe_append(Enumerable.t(), any) :: Enumerable.t() def maybe_append(collection, nil), do: collection def maybe_append(collection, value), do: Enum.concat(collection, [value]) + + @doc """ + Append a non-nil result of the callback function to the enumerable. + """ + @spec maybe_append_result(Enumerable.t(), (() -> any)) :: Enumerable.t() + def maybe_append_result(collection, callback) do + case callback.() do + nil -> collection + value -> Enum.concat(collection, [value]) + end + end + + @doc """ + A joiner that replaces the last join value with a different one. + """ + @spec sentence(Enumerable.t(), (any -> binary), binary, binary) :: binary + def sentence(enumerable, mapper \\ &"`#{&1}`", joiner \\ ", ", last_joiner \\ " or ") + + def sentence(enumerable, mapper, joiner, last_joiner), + do: + enumerable + |> Enum.to_list() + |> do_sentence([], mapper, joiner, last_joiner) + + defp do_sentence([], [], _, _, _), do: [] + defp do_sentence([value], [], mapper, _, _), do: [mapper.(value)] + + defp do_sentence([value], zipped, mapper, _joiner, last_joiner) do + [mapper.(value), last_joiner | zipped] + |> Enum.reverse() + |> Enum.join() + end + + defp do_sentence([head | tail], [], mapper, joiner, last_joiner), + do: do_sentence(tail, [mapper.(head)], mapper, joiner, last_joiner) + + defp do_sentence([head | tail], zipped, mapper, joiner, last_joiner), + do: do_sentence(tail, [mapper.(head), joiner | zipped], mapper, joiner, last_joiner) + + @doc """ + Perform a map over an enumerable provided that the mapper function continues + to return ok tuples. + """ + @spec map_while_ok(Enumerable.t(input), (input -> {:ok, output} | {:error, any}), boolean) :: + {:ok, Enumerable.t(output)} | {:error, any} + when input: any, output: any + def map_while_ok(inputs, mapper, preserve_order? \\ false) + + def map_while_ok(inputs, mapper, false) when is_function(mapper, 1) do + reduce_while_ok(inputs, [], fn input, acc -> + case mapper.(input) do + {:ok, value} -> {:ok, [value | acc]} + {:error, reason} -> {:error, reason} + end + end) + end + + def map_while_ok(inputs, mapper, true) do + case map_while_ok(inputs, mapper, false) do + {:ok, outputs} -> {:ok, Enum.reverse(outputs)} + {:error, reason} -> {:error, reason} + end + end + + @doc """ + Perform a reduction over an enumerable provided that the reduction function + returns an ok tuple. + """ + @spec reduce_while_ok(Enumerable.t(input), acc, (input, acc -> {:ok, acc} | {:error, any})) :: + {:ok, acc} | {:error, any} + when input: any, acc: any + def reduce_while_ok(inputs, default \\ [], reducer) when is_function(reducer, 2) do + Enum.reduce_while(inputs, {:ok, default}, fn input, {:ok, acc} -> + case reducer.(input, acc) do + {:ok, acc} -> {:cont, {:ok, acc}} + {:error, reason} -> {:halt, {:error, reason}} + end + end) + end + + @type argument_error :: %ArgumentError{ + __exception__: true, + message: binary + } + + @doc """ + A wrapper for defining an ArgumentError with a consistent error message format. + """ + @spec argument_error(String.Chars.t(), String.Chars.t(), any) :: argument_error + def argument_error(argument_name, reason, value) do + message = + case value do + nil -> + "`#{argument_name}` #{reason}" + + value -> + """ + `#{argument_name}` #{reason} + + ## Value of `#{argument_name}` + + ``` + #{inspect(value)} + ``` + """ + end + + ArgumentError.exception(message: message) + end end diff --git a/test/reactor/argument/templates_test.exs b/test/reactor/argument/templates_test.exs index 51e2463..abfbd67 100644 --- a/test/reactor/argument/templates_test.exs +++ b/test/reactor/argument/templates_test.exs @@ -15,4 +15,10 @@ defmodule Reactor.Argument.TemplatesTest do assert %Template.Result{name: :marty} = result(:marty) end end + + describe "value/1" do + test "it creates a value template" do + assert %Template.Value{value: :marty} = value(:marty) + end + end end diff --git a/test/reactor/argument_test.exs b/test/reactor/argument_test.exs index 7195865..9bc9b58 100644 --- a/test/reactor/argument_test.exs +++ b/test/reactor/argument_test.exs @@ -63,4 +63,34 @@ defmodule Reactor.ArgumentTest do } = Argument.from_result(:argument_name, :step_name, transform) end end + + describe "from_value/2" do + test "when given no transformation it creates an argument" do + assert %Argument{ + name: :argument_name, + source: %Template.Value{value: 32}, + transform: nil + } = Argument.from_value(:argument_name, 32) + end + + test "when given a function transformation it creates an argument" do + transform = &Atom.to_string/1 + + assert %Argument{ + name: :argument_name, + source: %Template.Value{value: 32}, + transform: ^transform + } = Argument.from_value(:argument_name, 32, transform) + end + + test "when given an MFA transformation it creates an argument" do + transform = {Atom, :to_string, []} + + assert %Argument{ + name: :argument_name, + source: %Template.Value{value: 32}, + transform: ^transform + } = Argument.from_value(:argument_name, 32, transform) + end + end end diff --git a/test/reactor/builder/argument_test.exs b/test/reactor/builder/argument_test.exs new file mode 100644 index 0000000..94440e0 --- /dev/null +++ b/test/reactor/builder/argument_test.exs @@ -0,0 +1,36 @@ +defmodule Reactor.Builder.ArgumentTest do + @moduledoc false + use ExUnit.Case, async: true + + import Reactor.Builder.Argument + alias Reactor.{Argument, Template} + + describe "asset_all_are_arguments/1" do + test "when given an argument with an input tuple, it returns an input argument" do + assert {:ok, [%Argument{name: :marty, source: %Template.Input{name: :doc}}]} = + assert_all_are_arguments(marty: {:input, :doc}) + end + + test "when given an argument with an result tuple, it returns an result argument" do + assert {:ok, [%Argument{name: :marty, source: %Template.Result{name: :doc}}]} = + assert_all_are_arguments(marty: {:result, :doc}) + end + + test "when given an argument with a value, it returns a value argument" do + assert {:ok, [%Argument{name: :marty, source: %Template.Value{value: :doc}}]} = + assert_all_are_arguments(marty: :doc) + end + + test "when given an argument struct, it returns the argument struct" do + argument = Argument.from_value(:marty, :doc) + + assert {:ok, [^argument]} = assert_all_are_arguments([argument]) + end + + test "when given any other value, it returns an error" do + assert {:error, error} = assert_all_are_arguments([:marty]) + + assert Exception.message(error) =~ ~r/contains a non-argument value/ + end + end +end diff --git a/test/reactor/builder/compose_test.exs b/test/reactor/builder/compose_test.exs new file mode 100644 index 0000000..21952d8 --- /dev/null +++ b/test/reactor/builder/compose_test.exs @@ -0,0 +1,222 @@ +defmodule Reactor.Builder.ComposeTest do + use ExUnit.Case, async: true + alias Reactor.{Argument, Builder, Builder.Compose, Error.ComposeError, Planner, Step, Template} + require Reactor.Argument + + describe "compose/4" do + defmodule ShoutStep do + @moduledoc false + use Reactor.Step + + def run(%{message: message}, _, _) do + {:ok, String.upcase(message)} + end + end + + defmodule InnerReactor do + @moduledoc false + use Reactor + + input :message + + step :shout, ShoutStep do + argument :message, input(:message) + end + end + + test "when the inner reactor is a module and would be recursive, it adds a compose step" do + assert {:ok, reactor} = + InnerReactor + |> Builder.new() + |> Compose.compose(:recurse, InnerReactor, message: {:input, :message}) + + assert recurse_step = + reactor.steps + |> Enum.find(&(&1.name == :recurse)) + + assert {Step.Compose, [reactor: InnerReactor]} = recurse_step.impl + + assert [%Argument{name: :message, source: %Template.Input{name: :message}}] = + recurse_step.arguments + end + + test "when the inner reactor is a struct and would be recursive, it adds a compose step" do + inner_reactor = + Builder.new() + |> Builder.add_input!(:message) + |> Builder.add_step!(:shout, ShoutStep, message: {:input, :message}) + |> Builder.return!(:shout) + + assert {:ok, outer_reactor} = + inner_reactor + |> Builder.compose(:recurse, inner_reactor, message: {:input, :message}) + + assert recurse_step = + outer_reactor.steps + |> Enum.find(&(&1.name == :recurse)) + + assert {Step.Compose, [reactor: ^inner_reactor]} = recurse_step.impl + + assert [%Argument{name: :message, source: %Template.Input{name: :message}}] = + recurse_step.arguments + end + + test "when the inner reactor is already planned, steps are taken from the plan" do + inner_reactor = + Builder.new() + |> Builder.add_input!(:message) + |> Builder.add_step!(:shout, ShoutStep, message: {:input, :message}) + |> Builder.return!(:shout) + |> Planner.plan!() + + assert [] = inner_reactor.steps + assert is_struct(inner_reactor.plan, Graph) + + assert {:ok, outer_reactor} = + Builder.new() + |> Builder.add_input!(:name) + |> Compose.compose(:shout_at, inner_reactor, message: {:input, :name}) + + assert {:__reactor__, :compose, :shout_at, :shout} in Enum.map( + outer_reactor.steps, + & &1.name + ) + end + + test "when the inner reactor is not already planned, steps are taken from the reactor" do + inner_reactor = + Builder.new() + |> Builder.add_input!(:message) + |> Builder.add_step!(:shout, ShoutStep, message: {:input, :message}) + |> Builder.return!(:shout) + + assert {:ok, outer_reactor} = + Builder.new() + |> Builder.add_input!(:name) + |> Compose.compose(:shout_at, inner_reactor, message: {:input, :name}) + + assert {:__reactor__, :compose, :shout_at, :shout} in Enum.map( + outer_reactor.steps, + & &1.name + ) + end + + test "when the inner reactor does not have a return value, it returns an error" do + inner_reactor = + Builder.new() + |> Builder.add_input!(:message) + |> Builder.add_step!(:shout, ShoutStep, message: {:input, :message}) + + assert {:error, %ComposeError{} = error} = + Builder.new() + |> Builder.add_input!(:name) + |> Compose.compose(:shout_at, inner_reactor, message: {:input, :name}) + + assert Exception.message(error) =~ ~r/must have an explicit return value/i + end + + test "when provided an invalid argument, it returns an error" do + inner_reactor = + Builder.new() + |> Builder.add_input!(:message) + |> Builder.add_step!(:shout, ShoutStep, message: {:input, :message}) + + assert {:error, %ArgumentError{} = error} = + Builder.new() + |> Builder.add_input!(:name) + |> Compose.compose(:shout_at, inner_reactor, [:marty]) + + assert Exception.message(error) =~ ~r/contains a non-argument value/i + end + + test "when not all inner reactor inputs are covered by the provided arguments, it returns an error" do + inner_reactor = + Builder.new() + |> Builder.add_input!(:message) + |> Builder.add_step!(:shout, ShoutStep, message: {:input, :message}) + + assert {:error, %ComposeError{} = error} = + Builder.new() + |> Builder.add_input!(:name) + |> Compose.compose(:shout_at, inner_reactor, []) + + assert Exception.message(error) =~ ~r/missing argument for `message` input/i + end + + test "inner steps are rewritten in the generated reactor" do + {inner_reactor, outer_reactor} = multi_step_composed_reactor() + + steps_by_name = outer_reactor.steps |> Map.new(&{&1.name, &1}) + + for step <- inner_reactor.steps do + assert outer_step = Map.get(steps_by_name, {:__reactor__, :compose, :shout, step.name}) + outer_arguments_by_name = outer_step.arguments |> Map.new(&{&1.name, &1}) + + for argument <- step.arguments do + assert outer_argument = Map.get(outer_arguments_by_name, argument.name) + + if Argument.is_from_result(argument) do + assert outer_argument.source.name == + {:__reactor__, :compose, :shout, argument.source.name} + end + end + end + end + + test "a return step is generated" do + {inner_reactor, outer_reactor} = multi_step_composed_reactor() + + steps_by_name = outer_reactor.steps |> Map.new(&{&1.name, &1}) + + assert return_step = Map.get(steps_by_name, :shout) + assert [return_value] = return_step.arguments + assert Argument.is_from_result(return_value) + assert return_value.name == :value + assert return_value.source.name == {:__reactor__, :compose, :shout, inner_reactor.return} + end + + test "the ID of the inner reactor is stored in the outer reactor context" do + {inner_reactor, outer_reactor} = multi_step_composed_reactor() + + assert inner_reactor.id in Enum.to_list(outer_reactor.context.private.composed_reactors) + end + end + + defmodule GreeterStep do + @moduledoc false + use Reactor.Step + + def run(%{first_name: first_name, last_name: last_name}, _, _) do + {:ok, "Hello #{first_name} #{last_name}"} + end + end + + defp multi_step_composed_reactor do + shouty_reactor = + Builder.new() + |> Builder.add_input!(:first_name, &String.upcase/1) + |> Builder.add_input!(:last_name, &String.upcase/1) + |> Builder.add_step!(:greet, GreeterStep, + first_name: {:input, :first_name}, + last_name: {:input, :last_name} + ) + |> Builder.return!(:greet) + + composed_reactor = + Builder.new() + |> Builder.add_input!(:user) + |> Builder.add_step!(:first_name, {Step.AnonFn, fun: &Map.fetch(&1.user, :first_name)}, + user: {:input, :user} + ) + |> Builder.add_step!(:last_name, {Step.AnonFn, fun: &Map.fetch(&1.user, :last_name)}, + user: {:input, :user} + ) + |> Builder.compose!(:shout, shouty_reactor, + first_name: {:result, :first_name}, + last_name: {:result, :last_name} + ) + |> Builder.return!(:shout) + + {shouty_reactor, composed_reactor} + end +end diff --git a/test/reactor/builder/input_test.exs b/test/reactor/builder/input_test.exs new file mode 100644 index 0000000..5d739ad --- /dev/null +++ b/test/reactor/builder/input_test.exs @@ -0,0 +1,48 @@ +defmodule Reactor.Builder.InputTest do + @moduledoc false + use ExUnit.Case, async: true + alias Reactor.{Builder, Builder.Input, Step} + + describe "add_input/3" do + test "when the input has no transform, it is added to the reactor" do + assert {:ok, reactor} = + Builder.new() + |> Input.add_input(:marty, nil) + + assert :marty in reactor.inputs + assert [] = reactor.steps + end + + test "when the input has a transform function the input and a transform step are added to the reactor" do + assert {:ok, reactor} = + Builder.new() + |> Input.add_input(:marty, &Function.identity/1) + + assert :marty in reactor.inputs + assert [step] = reactor.steps + + assert step.name == {:__reactor__, :transform, :input, :marty} + assert step.impl == {Step.Transform, fun: &Function.identity/1} + end + + test "when the input has a transform impl the input and a transform step are added to the reactor" do + assert {:ok, reactor} = + Builder.new() + |> Input.add_input(:marty, {Step.Transform, fun: &Function.identity/1}) + + assert :marty in reactor.inputs + assert [step] = reactor.steps + + assert step.name == {:__reactor__, :transform, :input, :marty} + assert step.impl == {Step.Transform, fun: &Function.identity/1} + end + + test "when the transform is not valid, it returns an error" do + assert {:error, %ArgumentError{} = error} = + Builder.new() + |> Input.add_input(:marty, :doc) + + assert Exception.message(error) =~ ~r/invalid transform function/i + end + end +end diff --git a/test/reactor/builder/step_test.exs b/test/reactor/builder/step_test.exs new file mode 100644 index 0000000..6adefa2 --- /dev/null +++ b/test/reactor/builder/step_test.exs @@ -0,0 +1,234 @@ +defmodule Reactor.Builder.StepTest do + @moduledoc false + use ExUnit.Case, async: true + require Reactor.Argument + alias Reactor.{Argument, Builder, Step} + + defmodule GreeterStep do + @moduledoc false + use Reactor.Step + + def run(%{first_name: first_name, last_name: last_name}, _, _) do + {:ok, "Hello #{first_name} #{last_name}"} + end + end + + describe "add_step/5" do + test "when given an invalid argument it returns an error" do + reactor = Builder.new() + + assert {:error, %ArgumentError{} = error} = + Builder.Step.add_step(reactor, :greet, GreeterStep, [:marty], []) + + assert Exception.message(error) =~ ~r/non-argument/i + end + + test "when the impl is not a `Reactor.Step` it returns an error" do + reactor = Builder.new() + + assert {:error, %ArgumentError{} = error} = + Builder.Step.add_step(reactor, :greet, Kernel, [], []) + + assert Exception.message(error) =~ ~r/does not implement the `Reactor.Step` behaviour/i + end + + test "when the step depends on a transformed input, its arguments are rewritten" do + reactor = + Builder.new() + |> Builder.add_input!(:first_name, &String.upcase/1) + + assert {:ok, reactor} = + Builder.Step.add_step( + reactor, + :greet, + GreeterStep, + [first_name: {:input, :first_name}], + [] + ) + + steps_by_name = Map.new(reactor.steps, &{&1.name, &1}) + + assert %Step{arguments: [argument]} = steps_by_name[:greet] + assert argument.name == :first_name + assert Argument.is_from_result(argument) + assert argument.source.name == {:__reactor__, :transform, :input, :first_name} + end + + test "when the step has argument transforms, it adds transformation steps" do + transform = &Function.identity/1 + + reactor = + Builder.new() + |> Builder.add_input!(:first_name) + + assert {:ok, reactor} = + Builder.Step.add_step( + reactor, + :greet, + GreeterStep, + [Argument.from_input(:first_name, :first_name, transform)], + [] + ) + + steps_by_name = Map.new(reactor.steps, &{&1.name, &1}) + + assert %Step{arguments: [argument]} = steps_by_name[:greet] + assert argument.name == :first_name + assert Argument.is_from_result(argument) + assert argument.source.name == {:__reactor__, :transform, :first_name, :greet} + + assert %Step{arguments: [argument], impl: {Step.Transform, fun: ^transform}} = + Map.get(steps_by_name, {:__reactor__, :transform, :first_name, :greet}) + + assert Argument.is_from_input(argument) + assert argument.source.name == :first_name + end + + test "when the step has a transform, it adds a transform all step" do + transform = &Function.identity/1 + + reactor = + Builder.new() + |> Builder.add_input!(:first_name) + + assert {:ok, reactor} = + Builder.Step.add_step( + reactor, + :greet, + GreeterStep, + [first_name: {:input, :first_name}], + transform: transform + ) + + steps_by_name = Map.new(reactor.steps, &{&1.name, &1}) + + assert %Step{arguments: [argument], context: context} = steps_by_name[:greet] + assert :value = context.private.replace_arguments + assert argument.name == :value + assert Argument.is_from_result(argument) + assert argument.source.name == {:__reactor__, :transform, :greet} + + assert %Step{ + arguments: [argument], + impl: {Step.TransformAll, fun: ^transform} + } = Map.get(steps_by_name, {:__reactor__, :transform, :greet}) + + assert argument.name == :first_name + assert Argument.is_from_input(argument) + assert argument.source.name == :first_name + end + + test "it defaults to an async step" do + assert {:ok, reactor} = + Builder.new() + |> Builder.Step.add_step( + :greet, + GreeterStep, + [], + [] + ) + + steps_by_name = Map.new(reactor.steps, &{&1.name, &1}) + assert %Step{async?: true} = steps_by_name[:greet] + end + + test "synchronous steps can be asked for" do + assert {:ok, reactor} = + Builder.new() + |> Builder.Step.add_step( + :greet, + GreeterStep, + [], + async?: false + ) + + steps_by_name = Map.new(reactor.steps, &{&1.name, &1}) + assert %Step{async?: false} = steps_by_name[:greet] + end + + test "additional context can be provided" do + assert {:ok, reactor} = + Builder.new() + |> Builder.Step.add_step( + :greet, + GreeterStep, + [], + context: %{awesome?: true} + ) + + steps_by_name = Map.new(reactor.steps, &{&1.name, &1}) + assert %Step{context: %{awesome?: true}} = steps_by_name[:greet] + end + + test "max retries defaults to 100" do + assert {:ok, reactor} = + Builder.new() + |> Builder.Step.add_step( + :greet, + GreeterStep, + [], + [] + ) + + steps_by_name = Map.new(reactor.steps, &{&1.name, &1}) + assert %Step{max_retries: 100} = steps_by_name[:greet] + end + + test "max retries can be provided" do + assert {:ok, reactor} = + Builder.new() + |> Builder.Step.add_step( + :greet, + GreeterStep, + [], + max_retries: 99 + ) + + steps_by_name = Map.new(reactor.steps, &{&1.name, &1}) + assert %Step{max_retries: 99} = steps_by_name[:greet] + end + end + + describe "new_step/4" do + test "it builds a step" do + assert {:ok, %Step{}} = Builder.Step.new_step(:marty, GreeterStep, [], []) + end + + test "when given an invalid argument it returns an error" do + assert {:error, error} = Builder.Step.new_step(:marty, GreeterStep, [:doc], []) + assert Exception.message(error) =~ "non-argument" + end + + test "when the impl is not a `Reactor.Step` it returns an error" do + assert {:error, %ArgumentError{} = error} = Builder.Step.new_step(:greet, Kernel, [], []) + + assert Exception.message(error) =~ ~r/does not implement the `Reactor.Step` behaviour/i + end + + test "when the step relies on transformed arguments, it returns an error" do + assert {:error, %ArgumentError{} = error} = + Builder.Step.new_step( + :greet, + GreeterStep, + [ + Argument.from_input(:first_name, :name, &String.upcase/1) + ], + [] + ) + + assert Exception.message(error) =~ ~r/has a transform attached/i + end + + test "when the step wants a transform option, it returns an error" do + assert {:error, %ArgumentError{} = error} = + Builder.Step.new_step( + :greet, + GreeterStep, + [first_name: {:input, :first_name}], + transform: &Function.identity/1 + ) + + assert Exception.message(error) =~ ~r/adding transforms to dynamic steps is not supported/i + end + end +end diff --git a/test/reactor/builder_test.exs b/test/reactor/builder_test.exs index e3e7aa0..9b7fbfd 100644 --- a/test/reactor/builder_test.exs +++ b/test/reactor/builder_test.exs @@ -16,26 +16,21 @@ defmodule Reactor.BuilderTest do assert Exception.message(error) =~ "not a Reactor" end - test "when the input doesn't have a transformer, it adds the input step directly" do + test "when the input doesn't have a transformer, it adds the input" do {:ok, reactor} = add_input(new(), :marty) assert :marty in reactor.inputs - [step] = reactor.steps - assert step.name == {:input, :marty} - assert step.impl == {Step.Input, name: :marty} end - test "when the input has a transformer it adds a transform step and an input step" do + test "when the input has a transformer it adds a transform step and the input" do {:ok, reactor} = add_input(new(), :marty, &String.upcase/1) assert :marty in reactor.inputs - [input_step, transform_step] = reactor.steps - assert input_step.name == {:raw_input, :marty} - assert input_step.impl == {Step.Input, name: :marty} + [transform_step] = reactor.steps - assert transform_step.name == {:input, :marty} + assert transform_step.name == {:__reactor__, :transform, :input, :marty} assert transform_step.impl == {Step.Transform, fun: &String.upcase/1} assert [argument] = transform_step.arguments assert argument.name == :value - assert argument.source == %Template.Result{name: {:raw_input, :marty}} + assert argument.source == %Template.Input{name: :marty} end end @@ -61,27 +56,29 @@ defmodule Reactor.BuilderTest do test "when the arguments option is not a list, it returns an error" do reactor = new() assert {:error, error} = add_step(reactor, :marty, Noop, :wat) - assert Exception.message(error) =~ "is not a list" + assert Exception.message(error) =~ "not a list" end test "when the options option is not a list, it returns an error" do reactor = new() assert {:error, error} = add_step(reactor, :marty, Noop, [], :wat) - assert Exception.message(error) =~ "is not a list" + assert Exception.message(error) =~ "not a list" end - test "when an argument is an input tuple, it is converted to a argument struct in the step" do + test "when an argument is an input tuple, it is converted into an argument struct" do reactor = new() assert {:ok, %{steps: [step]}} = add_step(reactor, :marty, Noop, mentor: {:input, :doc}) - # this is a `result` not an `input` argument because it the input is - # emitted as a separate step. - assert [%Argument{name: :mentor, source: %Template.Result{name: {:input, :doc}}}] = - step.arguments + assert [ + %Argument{ + name: :mentor, + source: %Template.Input{name: :doc} + } + ] = step.arguments end - test "when an argument is a result tuple, it is converted to a argument struct in the step" do + test "when an argument is a result tuple, it is converted to a argument struct" do reactor = new() assert {:ok, %{steps: [step]}} = @@ -106,7 +103,7 @@ defmodule Reactor.BuilderTest do test "when an argument is anything else, it is an error" do reactor = new() assert {:error, error} = add_step(reactor, :marty, Noop, [:wat]) - assert Exception.message(error) =~ "is not a `Reactor.Argument` struct" + assert Exception.message(error) =~ "contains a non-argument value" end test "when an argument has a transformation function, it adds a transformation step to the reactor" do @@ -131,7 +128,7 @@ defmodule Reactor.BuilderTest do } = steps[{:__reactor__, :transform, :mentor, :marty}] end - test "when the step has an argument transformation function, it adds the collect and transformation step to the reactor" do + test "when the step has an argument transformation function, it adds a transformation step to the reactor" do reactor = new() assert {:ok, reactor} = @@ -156,13 +153,72 @@ defmodule Reactor.BuilderTest do ] } = steps[:add_user_to_org] - assert %Step{ - arguments: [ - %Argument{name: :user, source: %Template.Result{name: :create_user}}, - %Argument{name: :org, source: %Template.Result{name: :create_org}} - ], - impl: {Step.TransformAll, [fun: _]} - } = steps[{:__reactor__, :transform, :add_user_to_org}] + assert %Step{arguments: arguments, impl: {Step.TransformAll, [fun: _]}} = + steps[{:__reactor__, :transform, :add_user_to_org}] + + arguments = Enum.map(arguments, &{&1.name, &1.source.name}) + + assert {:user, :create_user} in arguments + assert {:org, :create_org} in arguments + end + end + + describe "compose/2" do + defmodule GreeterStep do + @moduledoc false + use Reactor.Step + + def run(%{first_name: first_name, last_name: last_name}, _, _) do + {:ok, "Hello #{first_name} #{last_name}"} + end + end + + test "when the reactor argument is not a reactor struct" do + assert_raise ArgumentError, ~r/reactor.*not a Reactor/, fn -> + compose!(:marty, :doc, new(), []) + end + end + + test "when the other reactor argument is not a reactor struct" do + assert_raise ArgumentError, ~r/inner_reactor.*not a Reactor/, fn -> + compose!(new(), :doc, 123, []) + end + end + + test "when the other arguments argument is not a list" do + assert_raise ArgumentError, ~r/arguments.*not a list/, fn -> + compose!(new(), :doc, new(), :marty) + end + end + + test "it can compose two reactors together" do + shouty_reactor = + new() + |> add_input!(:first_name, &String.upcase/1) + |> add_input!(:last_name, &String.upcase/1) + |> add_step!(:greet, GreeterStep, + first_name: {:input, :first_name}, + last_name: {:input, :last_name} + ) + |> return!(:greet) + + composite_reactor = + new() + |> add_input!(:user) + |> add_step!(:first_name, {Step.AnonFn, fun: &Map.fetch(&1.user, :first_name)}, + user: {:input, :user} + ) + |> add_step!(:last_name, {Step.AnonFn, fun: &Map.fetch(&1.user, :last_name)}, + user: {:input, :user} + ) + |> compose!(:shout, shouty_reactor, + first_name: {:result, :first_name}, + last_name: {:result, :last_name} + ) + |> return!(:shout) + + assert {:ok, "Hello MARTY MCFLY"} = + Reactor.run(composite_reactor, %{user: %{first_name: "Marty", last_name: "McFly"}}) end end end diff --git a/test/reactor/dsl/planable_verifier_test.exs b/test/reactor/dsl/planable_verifier_test.exs new file mode 100644 index 0000000..d60a592 --- /dev/null +++ b/test/reactor/dsl/planable_verifier_test.exs @@ -0,0 +1,35 @@ +defmodule Reactor.Dsl.PlanableVerifierTest do + @moduledoc false + use ExUnit.Case, async: true + alias Reactor.{Dsl.PlanableVerifier, Error.PlanError} + + test "is a Spark verifier" do + assert Spark.implements_behaviour?(PlanableVerifier, Spark.Dsl.Verifier) + end + + defmodule NoopStep do + @moduledoc false + use Reactor.Step + + def run(_, _, _), do: {:ok, :noop} + end + + describe "verify/1" do + test "refuses to compile cyclic reactors" do + assert_raise PlanError, ~r/cyclic/i, fn -> + defmodule DegenerateReactor do + @moduledoc false + use Reactor + + step :a, NoopStep do + argument :b, result(:b) + end + + step :b, NoopStep do + argument :a, result(:a) + end + end + end + end + end +end diff --git a/test/reactor/dsl/transformer_test.exs b/test/reactor/dsl/transformer_test.exs new file mode 100644 index 0000000..1f9f22c --- /dev/null +++ b/test/reactor/dsl/transformer_test.exs @@ -0,0 +1,47 @@ +defmodule Reactor.Dsl.TransformerTest do + @moduledoc false + use ExUnit.Case, async: true + alias Spark.{Dsl.Extension, Error.DslError} + + defmodule Noop do + @moduledoc false + use Reactor.Step + + def run(_, _, _), do: {:ok, :noop} + end + + defmodule NoReturnReactor do + @moduledoc false + use Reactor + + step :a, Noop + end + + describe "transform/1" do + test "when the Reactor has no explicit return, it uses the last step" do + assert :a = Extension.get_opt(NoReturnReactor, [:reactor], :return) + end + + test "when the Reactor has a return that is unknown, it raises a DSL error" do + assert_raise DslError, ~r/return value/i, fn -> + defmodule InvalidReturnReactor do + @moduledoc false + use Reactor + + step :a, Noop + + return :b + end + end + end + + test "when the Reactor has no steps, it raises a DSL error" do + assert_raise DslError, ~r/no steps/i, fn -> + defmodule EmptyReactor do + @moduledoc false + use Reactor + end + end + end + end +end diff --git a/test/reactor/info_test.exs b/test/reactor/info_test.exs new file mode 100644 index 0000000..b3832a1 --- /dev/null +++ b/test/reactor/info_test.exs @@ -0,0 +1,26 @@ +defmodule Reactor.InfoTest do + @moduledoc false + use ExUnit.Case, async: true + alias Reactor.{Builder, Info} + + describe "to_struct/1" do + test "when passed a DSL module, it generates a Reactor struct" do + assert {:ok, %Reactor{} = reactor} = Info.to_struct(Example.BasicReactor) + assert reactor.id == Example.BasicReactor + assert reactor.steps |> hd() |> Map.get(:name) == :verify + end + + test "when passed a Reactor struct, it returns it unchanged" do + reactor = + Builder.new() + |> Builder.add_input!(:age) + |> Builder.add_input!(:country) + |> Builder.add_step!(:verify, Example.BasicReactor.DrinkingAgeVerifier, + age: {:input, :age}, + country: {:input, :country} + ) + + assert {:ok, ^reactor} = Info.to_struct(reactor) + end + end +end diff --git a/test/reactor/planner_test.exs b/test/reactor/planner_test.exs new file mode 100644 index 0000000..4df12bc --- /dev/null +++ b/test/reactor/planner_test.exs @@ -0,0 +1,95 @@ +defmodule Reactor.PlannerTest do + @moduledoc false + use ExUnit.Case, async: true + alias Reactor.{Builder, Error.PlanError, Info, Planner} + + describe "plan/1" do + test "when the argument is not a reactor, it returns an error" do + {:error, %ArgumentError{} = error} = Planner.plan(:marty) + assert Exception.message(error) =~ ~r/not a reactor/i + end + + test "when the reactor has no existing plan, it creates one" do + {:ok, reactor} = Info.to_struct(Example.BasicReactor) + refute reactor.plan + + {:ok, reactor} = Planner.plan(reactor) + assert reactor.plan + end + + test "when the reactor already has a plan, it amends it" do + reactor = + Example.BasicReactor + |> Info.to_struct!() + |> Planner.plan!() + |> Builder.add_step!(:second_step, Example.BasicReactor.DrinkingAgeVerifier) + + {:ok, reactor} = Planner.plan(reactor) + assert [] = reactor.steps + + planned_step_names = + reactor.plan + |> Graph.vertices() + |> MapSet.new(& &1.name) + + expected_step_names = MapSet.new([:verify, :second_step]) + + assert MapSet.equal?(expected_step_names, planned_step_names) + end + + test "it converts steps and arguments into a DAG" do + {:ok, reactor} = + Builder.new() + |> Builder.add_step!(:a, Example.BasicReactor.DrinkingAgeVerifier) + |> Builder.add_step!(:b, Example.BasicReactor.DrinkingAgeVerifier, a: {:result, :a}) + |> Planner.plan() + + assert [] = reactor.steps + + created_graph_vertices = + reactor.plan + |> Graph.vertices() + |> MapSet.new(& &1.name) + + expected_graph_vertices = MapSet.new([:a, :b]) + assert MapSet.equal?(created_graph_vertices, expected_graph_vertices) + + created_graph_edges = + reactor.plan + |> Graph.edges() + |> Enum.map(& &1.label) + + expected_graph_edges = [{:argument, :a, :for, :b}] + + assert created_graph_edges == expected_graph_edges + end + + test "when the created graph would be cyclic, it returns an error" do + assert {:error, %PlanError{} = error} = + Builder.new() + |> Builder.add_step!(:a, Example.BasicReactor.DrinkingAgeVerifier, b: {:result, :b}) + |> Builder.add_step!(:b, Example.BasicReactor.DrinkingAgeVerifier, a: {:result, :a}) + |> Planner.plan() + + assert Exception.message(error) =~ ~r/cyclic/i + end + + test "when given an invalid step, it returns an error" do + assert {:error, %PlanError{} = error} = + Builder.new() + |> Map.put(:steps, [%{name: :marty}]) + |> Planner.plan() + + assert Exception.message(error) =~ ~r/not a `Reactor.Step` struct/ + end + + test "when an argument depends on an unknown step, it returns an error" do + assert {:error, %PlanError{} = error} = + Builder.new() + |> Builder.add_step!(:a, Example.BasicReactor.DrinkingAgeVerifier, a: {:result, :b}) + |> Planner.plan() + + assert Exception.message(error) =~ ~r/cannot be found/i + end + end +end diff --git a/test/reactor/step/anon_fn_test.exs b/test/reactor/step/anon_fn_test.exs index f4efb53..2acb7dd 100644 --- a/test/reactor/step/anon_fn_test.exs +++ b/test/reactor/step/anon_fn_test.exs @@ -8,6 +8,14 @@ defmodule Reactor.Step.AnonFnTest do end describe "run/3" do + test "it can handle 1 arity anonymous functions" do + fun = fn arguments -> + arguments.first_name + end + + assert :marty = run(%{first_name: :marty}, %{}, fun: fun) + end + test "it can handle 2 arity anonymous functions" do fun = fn arguments, _ -> arguments.first_name diff --git a/test/reactor/step/compose_test.exs b/test/reactor/step/compose_test.exs new file mode 100644 index 0000000..7ceabfc --- /dev/null +++ b/test/reactor/step/compose_test.exs @@ -0,0 +1,72 @@ +defmodule Reactor.Step.ComposeTest do + @moduledoc false + use ExUnit.Case, async: true + alias Reactor.{Argument, Step} + import Reactor.Builder + require Reactor.Argument + + test "it is a step" do + assert Spark.implements_behaviour?(Step.Compose, Step) + end + + defmodule GreeterStep do + @moduledoc false + use Reactor.Step + + def run(%{whom: whom}, _, _) do + {:ok, "Hello #{whom}"} + end + end + + describe "run/3" do + test "when the composition would be recursive, it just runs the reactor directly" do + inner_reactor = + new() + |> add_input!(:whom) + |> add_step!(:greet, GreeterStep, whom: {:input, :whom}) + |> return!(:greet) + + assert {:ok, "Hello Marty McFly"} = + Step.Compose.run( + %{whom: "Marty McFly"}, + %{ + current_step: %{name: :greet_marty}, + private: %{composed_reactors: MapSet.new([inner_reactor.id])} + }, + reactor: inner_reactor + ) + end + + test "when the composition is not recursive, it emits rewritten steps" do + inner_reactor = + new() + |> add_input!(:whom) + |> add_step!(:greet, GreeterStep, whom: {:input, :whom}) + |> return!(:greet) + + assert {:ok, nil, new_steps} = + Step.Compose.run(%{whom: "Marty McFly"}, %{current_step: %{name: :greet_marty}}, + reactor: inner_reactor + ) + + new_steps_by_name = Map.new(new_steps, &{&1.name, &1}) + + assert %Step{arguments: [argument], impl: impl} = + Map.get(new_steps_by_name, {Step.Compose, :greet_marty, :greet}) + + assert Argument.is_from_value(argument) + assert argument.name == :whom + assert argument.source.value == "Marty McFly" + + assert {Step.ComposeWrapper, [original: GreeterStep, prefix: [Step.Compose, :greet_marty]]} = + impl + + assert %Step{arguments: [argument], impl: {Step.AnonFn, _}} = + Map.get(new_steps_by_name, :greet_marty) + + assert Argument.is_from_result(argument) + assert argument.name == :value + assert argument.source.name == {Step.Compose, :greet_marty, :greet} + end + end +end diff --git a/test/reactor/step/compose_wrapper_test.exs b/test/reactor/step/compose_wrapper_test.exs new file mode 100644 index 0000000..faab911 --- /dev/null +++ b/test/reactor/step/compose_wrapper_test.exs @@ -0,0 +1,106 @@ +defmodule Reactor.Step.ComposeWrapperTest do + @moduledoc false + use ExUnit.Case, async: true + + alias Reactor.{Builder, Step} + + test "it is a step" do + assert Spark.implements_behaviour?(Step.ComposeWrapper, Step) + end + + describe "run/3" do + test "when the `original` option is missing, it returns an error" do + assert {:error, error} = + Step.ComposeWrapper.run(%{}, %{current_step: %{name: :foo}}, prefix: [:a, :b]) + + assert Exception.message(error) =~ ~r/missing/i + end + + test "when the `original` option is a non-step module, it returns an error" do + assert {:error, error} = + Step.ComposeWrapper.run(%{}, %{current_step: %{name: :foo}}, + prefix: [:a, :b], + original: Kernel + ) + + assert Exception.message(error) =~ ~r/does not implement the `Reactor.Step` behaviour/i + end + + test "when the `original` option refers to a non-step module, it returns an error" do + assert {:error, error} = + Step.ComposeWrapper.run(%{}, %{current_step: %{name: :foo}}, + prefix: [:a, :b], + original: {Kernel, []} + ) + + assert Exception.message(error) =~ ~r/does not implement the `Reactor.Step` behaviour/i + end + + test "when the `prefix` option is an empty list, it returns an error" do + assert {:error, error} = + Step.ComposeWrapper.run(%{}, %{current_step: %{name: :foo}}, + prefix: [], + original: {Step.AnonFn, fun: fn args -> {:ok, args.a + 1} end} + ) + + assert Exception.message(error) =~ ~r/invalid `prefix` option/i + end + + test "when the `prefix` option is not a list, it returns an error" do + assert {:error, error} = + Step.ComposeWrapper.run(%{}, %{current_step: %{name: :foo}}, + prefix: :marty, + original: {Step.AnonFn, fun: fn args -> {:ok, args.a + 1} end} + ) + + assert Exception.message(error) =~ ~r/invalid `prefix` option/i + end + + test "when the original step returns an ok tuple, it returns it" do + assert {:ok, 2} = + Step.ComposeWrapper.run(%{a: 1}, %{current_step: %{name: :foo}}, + prefix: [:a, :b], + original: {Step.AnonFn, fun: fn args -> {:ok, args.a + 1} end} + ) + end + + test "when the original step returns an error tuple, it returns it" do + assert {:error, :wat} = + Step.ComposeWrapper.run(%{}, %{current_step: %{name: :foo}}, + prefix: [:a, :b], + original: {Step.AnonFn, fun: fn _ -> {:error, :wat} end} + ) + end + + test "when the original step returns a halt tuple, it returns it" do + assert {:halt, :wat} = + Step.ComposeWrapper.run(%{}, %{current_step: %{name: :foo}}, + prefix: [:a, :b], + original: {Step.AnonFn, fun: fn _ -> {:halt, :wat} end} + ) + end + + test "when the original step returns new dynamic steps, it rewrites them" do + [new_c, new_d] = [ + Builder.new_step!(:c, {Step.AnonFn, fun: fn args -> {:ok, args.b} end}, b: {:input, :b}), + Builder.new_step!(:d, {Step.AnonFn, fun: fn args -> {:ok, args.c + 1} end}, + c: {:result, :c} + ) + ] + + assert {:ok, _, [rewritten_c, rewritten_d]} = + Step.ComposeWrapper.run(%{}, %{current_step: %{name: :foo}}, + prefix: [:a, :b], + original: {Step.AnonFn, fun: fn _ -> {:ok, nil, [new_c, new_d]} end} + ) + + assert rewritten_c.name == {:a, :b, new_c.name} + assert [{rewritten_arg, new_arg}] = Enum.zip(rewritten_c.arguments, new_c.arguments) + assert rewritten_arg.source.name == new_arg.source.name + + assert rewritten_d.name == {:a, :b, new_d.name} + assert [{rewritten_arg, new_arg}] = Enum.zip(rewritten_d.arguments, new_d.arguments) + assert rewritten_arg.source.name == {:a, :b, new_arg.source.name} + end + end +end diff --git a/test/reactor/step/input_test.exs b/test/reactor/step/input_test.exs deleted file mode 100644 index 3c8cb09..0000000 --- a/test/reactor/step/input_test.exs +++ /dev/null @@ -1,25 +0,0 @@ -defmodule Reactor.Step.InputTest do - @moduledoc false - use ExUnit.Case, async: true - import Reactor.Step.Input - - test "it is a step" do - assert Spark.implements_behaviour?(Reactor.Step.Input, Reactor.Step) - end - - describe "run/3" do - test "when the input is present in the private context it returns it" do - assert {:ok, :marty} = run(%{}, %{private: %{inputs: %{name: :marty}}}, name: :name) - end - - test "when the input is not present in the private context it returns an error" do - assert {:error, error} = run(%{}, %{}, name: :name) - assert Exception.message(error) =~ "missing an input" - end - - test "when the name option is not present it returns an error" do - assert {:error, error} = run(%{}, %{}, []) - assert Exception.message(error) =~ "Missing `:name` option" - end - end -end diff --git a/test/reactor/step/transform_all_test.exs b/test/reactor/step/transform_all_test.exs new file mode 100644 index 0000000..c2b48b9 --- /dev/null +++ b/test/reactor/step/transform_all_test.exs @@ -0,0 +1,26 @@ +defmodule Reactor.Step.TransformAllTest do + @moduledoc false + use ExUnit.Case, async: true + alias Reactor.Step + + test "it is a step" do + assert Spark.implements_behaviour?(Step.TransformAll, Step) + end + + describe "run/3" do + test "it applies the function to the `value` argument" do + assert {:ok, %{a: 2}} = + Step.TransformAll.run(%{a: 1}, %{}, fun: &Map.update(&1, :a, 1, fn v -> v * 2 end)) + end + + test "when the function returns a non-map value it returns an error" do + assert {:error, error} = Step.TransformAll.run(%{a: 1}, %{}, fun: fn _ -> :wat end) + assert Exception.message(error) =~ ~r/must return a map/i + end + + test "when the function raises, it returns an error" do + assert {:error, error} = Step.TransformAll.run(%{a: 1}, %{}, fun: fn _ -> raise "hell" end) + assert Exception.message(error) == "hell" + end + end +end diff --git a/test/reactor/step/transform_test.exs b/test/reactor/step/transform_test.exs index 28f0285..e2998bc 100644 --- a/test/reactor/step/transform_test.exs +++ b/test/reactor/step/transform_test.exs @@ -4,7 +4,7 @@ defmodule Reactor.Step.TransformTest do import Reactor.Step.Transform test "it is a step" do - assert Spark.implements_behaviour?(Reactor.Step.Input, Reactor.Step) + assert Spark.implements_behaviour?(Reactor.Step.Transform, Reactor.Step) end describe "run/3" do @@ -13,11 +13,6 @@ defmodule Reactor.Step.TransformTest do assert Exception.message(error) =~ "argument is missing" end - test "when the function option is missing" do - assert {:error, error} = run(%{value: :marty}, %{}, []) - assert Exception.message(error) =~ "Invalid options" - end - test "it applies the transform" do assert {:ok, "marty"} = run(%{value: :marty}, %{}, fun: &Atom.to_string/1) end diff --git a/test/reactor/step_test.exs b/test/reactor/step_test.exs new file mode 100644 index 0000000..055b0c3 --- /dev/null +++ b/test/reactor/step_test.exs @@ -0,0 +1,47 @@ +defmodule Reactor.StepTest do + @moduledoc false + use ExUnit.Case, async: true + alias Reactor.{Builder, Step} + + describe "can/2" do + test "when the module defines `undo/4`, it can undo" do + assert Step.can?(Example.Step.Undoable, :undo) + end + + test "when the module does not define `undo/4`, it cannot undo" do + refute Step.can?(Example.Step.Greeter, :undo) + end + + test "when the module defines `compensate/4`, it can compensate" do + assert Step.can?(Example.Step.Compensable, :compensate) + end + + test "when the module does not defined `compensate/4`, it cannot compensate" do + refute Step.can?(Example.Step.Greeter, :compensate) + end + end + + describe "run/3" do + test "it runs the step" do + step = Builder.new_step!(:greet, Example.Step.Greeter, whom: {:input, :whom}) + + assert {:ok, "Hello, Marty!"} = Step.run(step, %{whom: "Marty"}, %{}) + end + end + + describe "compensate/4" do + test "it runs the step's compensation callback" do + step = Builder.new_step!(:compensate, Example.Step.Compensable) + + assert :ok = Step.compensate(step, "No plutonium", %{}, %{}) + end + end + + describe "undo/4" do + test "it runs the step's undo callback" do + step = Builder.new_step!(:undo, Example.Step.Undoable) + + assert :ok = Step.undo(step, :marty, %{}, %{}) + end + end +end diff --git a/test/reactor/utils_test.exs b/test/reactor/utils_test.exs new file mode 100644 index 0000000..025f1c6 --- /dev/null +++ b/test/reactor/utils_test.exs @@ -0,0 +1,93 @@ +defmodule Reactor.UtilsTest do + @moduledoc false + use ExUnit.Case, async: true + import Reactor.Utils + + describe "deep_merge/2" do + test "it can deeply merge two maps" do + lhs = %{a: %{b: %{c: %{d: :e}}}} + rhs = %{a: %{b: %{c: %{e: :f}}, g: :h}} + + assert deep_merge(lhs, rhs) == %{a: %{b: %{c: %{d: :e, e: :f}}, g: :h}} + end + end + + describe "maybe_append/2" do + test "it appends non-nil values to the collection" do + assert [1, 2, 3] = maybe_append([1, 2], 3) + end + + test "it does not append nil values to the collection" do + assert [1, 2] = maybe_append([1, 2], nil) + end + end + + describe "maybe_append_result/2" do + test "when the function returns a non-nil value, it is appended to the collection" do + assert [1, 2, 3] = maybe_append_result([1, 2], fn -> 3 end) + end + + test "when the function returns a nil value, it is not appended to the collection" do + assert [1, 2] = maybe_append_result([1, 2], fn -> nil end) + end + end + + describe "sentence/4" do + test "it converts a list of values into a sentence" do + assert "a, b or c" = sentence(~w[a b c]a, &to_string/1, ", ", " or ") + end + end + + describe "map_while_ok/3" do + test "when all the map functions return an ok tuple, it maps the collection" do + assert {:ok, [2, 4, 6]} = map_while_ok([1, 2, 3], &{:ok, &1 * 2}, true) + end + + test "when one of the map functions returns an error tuple, it returns the error" do + assert {:error, :fail} = + map_while_ok( + [1, 2, 3], + fn + i when rem(i, 2) == 0 -> {:error, :fail} + i -> {:ok, i * 2} + end, + true + ) + end + + test "it doesn't preserve order by default" do + assert {:ok, [6, 4, 2]} = map_while_ok([1, 2, 3], &{:ok, &1 * 2}) + end + end + + describe "reduce_while_ok/3" do + test "when all the reduce functions return an ok tuple, it reduces into an ok tuple" do + assert {:ok, 12} = reduce_while_ok([1, 2, 3], 0, &{:ok, &2 + &1 * 2}) + end + + test "when one of the reduce functions returns an error tuple, it returns the error" do + assert {:error, :fail} = + reduce_while_ok([1, 2, 3], 0, fn + i, _acc when rem(i, 2) == 0 -> {:error, :fail} + i, acc -> {:ok, acc + i * 2} + end) + end + end + + describe "argument_error/3" do + test "it consistently formats the argument error message" do + message = """ + `fruit` is not fruit + + ## Value of `fruit` + + ``` + :pepperoni + ``` + """ + + assert %ArgumentError{message: ^message} = + argument_error(:fruit, "is not fruit", :pepperoni) + end + end +end diff --git a/test/support/example/cyclic_reactor.ex b/test/support/example/cyclic_reactor.ex deleted file mode 100644 index a6ae71f..0000000 --- a/test/support/example/cyclic_reactor.ex +++ /dev/null @@ -1,19 +0,0 @@ -defmodule Example.CyclicReactor do - @moduledoc false - use Reactor - - defmodule Noop do - use Reactor.Step - - @moduledoc false - def run(_, _, _), do: {:ok, :noop} - end - - step :a, Noop do - argument :b, result(:b) - end - - step :b, Noop do - argument :a, result(:a) - end -end