improvement: Add step-wide argument transforms. (#9)

* improvement: Add step-wide argument transforms.

* docs: add more entity documentation.
This commit is contained in:
James Harton 2023-06-08 11:40:02 +12:00 committed by GitHub
parent d03deb0a57
commit b23e5ef688
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 488 additions and 40 deletions

View file

@ -10,13 +10,18 @@ defmodule Reactor.Argument do
@type t :: %Argument{
name: atom,
source: Template.t(),
transform: nil | (any -> any)
transform: nil | (any -> any) | {module, keyword} | mfa
}
defguardp is_spark_fun_behaviour(fun)
when tuple_size(fun) == 2 and is_atom(elem(fun, 0)) and is_list(elem(fun, 1))
defguardp is_mfa(fun)
when tuple_size(fun) == 3 and is_atom(elem(fun, 0)) and is_atom(elem(fun, 1)) and
is_list(elem(fun, 2))
defguardp is_transform(fun)
when is_function(fun, 1) or
(tuple_size(fun) == 3 and is_atom(elem(fun, 0)) and is_atom(elem(fun, 1)) and
is_list(elem(fun, 2)))
when is_function(fun, 1) or is_spark_fun_behaviour(fun) or is_mfa(fun)
defguardp maybe_transform(fun) when is_nil(fun) or is_transform(fun)

View file

@ -21,8 +21,13 @@ defmodule Reactor.Builder do
alias Reactor.{Argument, Step, Template}
import Argument, only: :macros
import Reactor, only: :macros
import Reactor.Utils
@type step_options :: [async? | max_retries()]
defguardp is_mfa(mfa)
when tuple_size(mfa) == 2 and is_atom(elem(mfa, 0)) and
is_list(elem(mfa, 1))
@type step_options :: [async? | max_retries() | arguments_transform | context]
@typedoc "Should the step be run asynchronously?"
@type async? :: {:async?, boolean}
@ -30,6 +35,12 @@ defmodule Reactor.Builder do
@typedoc "How many times is the step allowed to retry?"
@type max_retries :: {:max_retries, :infinity | non_neg_integer()}
@typedoc "Optionally transform all the arguments into new arguments"
@type arguments_transform :: {:transform, nil | (any -> map) | {module | keyword} | mfa}
@typedoc "Optional context which will be merged with the reactor context when calling this step."
@type context :: Reactor.context()
@type step_argument :: Argument.t() | {atom, {:input | :result, any}}
@type impl :: module | {module, keyword}
@ -115,19 +126,33 @@ defmodule Reactor.Builder do
def add_step(reactor, name, impl, arguments, options) do
with {:ok, arguments} <- assert_all_are_arguments(arguments),
:ok <- assert_is_step_impl(impl),
{:ok, arguments, transform_steps} <- build_transforms_steps(arguments, name) do
{:ok, arguments, argument_transform_steps} <-
build_argument_transform_steps(arguments, name),
{:ok, arguments, step_transform_step} <-
maybe_build_step_transform_step(arguments, name, options[:transform]) do
context =
if step_transform_step do
options
|> Keyword.get(:context, %{})
|> deep_merge(%{private: %{replace_arguments: :value}})
else
Keyword.get(options, :context, %{})
end
steps =
[
%Step{
arguments: arguments,
async?: Keyword.get(options, :async?, true),
context: context,
impl: impl,
name: name,
max_retries: Keyword.get(options, :max_retries, 100),
ref: make_ref()
}
]
|> Enum.concat(transform_steps)
|> Enum.concat(argument_transform_steps)
|> maybe_append(step_transform_step)
|> Enum.concat(reactor.steps)
{:ok, %{reactor | steps: steps}}
@ -157,6 +182,7 @@ defmodule Reactor.Builder do
step = %Step{
arguments: arguments,
async?: Keyword.get(options, :async?, true),
context: Keyword.get(options, :context, %{}),
impl: impl,
name: name,
max_retries: Keyword.get(options, :max_retries, 100),
@ -196,8 +222,12 @@ defmodule Reactor.Builder do
{name, {:result, source}}, {:ok, arguments} ->
{:cont, {:ok, [Argument.from_result(name, source) | arguments]}}
not_argument, :ok ->
{:halt, {:error, "Value `#{inspect(not_argument)}` is not an Argument struct."}}
not_argument, _ ->
{:halt,
{:error,
ArgumentError.exception(
"Value `#{inspect(not_argument)}` is not a `Reactor.Argument` struct."
)}}
end)
end
@ -207,11 +237,14 @@ defmodule Reactor.Builder do
if Spark.implements_behaviour?(impl, Step) do
:ok
else
{:error, {"Module `#{inspect(impl)}` does not implement the `Step` behaviour."}}
{:error,
ArgumentError.exception(
"Module `#{inspect(impl)}` does not implement the `Reactor.Step` behaviour."
)}
end
end
defp build_transforms_steps(arguments, step_name) do
defp build_argument_transform_steps(arguments, step_name) do
arguments
|> Enum.reduce_while({:ok, [], []}, fn
argument, {:ok, arguments, steps}
@ -228,13 +261,13 @@ defmodule Reactor.Builder do
source: %Template.Result{name: {:transform, argument.name, :for, step_name}}
}
{:cont, {:ok, [argument | arguments], [step | steps]}}
{:cont, {:ok, [argument | arguments], [%{step | transform: nil} | steps]}}
argument, {:ok, arguments, steps}
when is_from_result(argument) and has_transform(argument) ->
step =
build_transform_step(
argument.source,
argument.source.name,
{:transform, argument.name, :for, step_name},
argument.transform
)
@ -244,7 +277,7 @@ defmodule Reactor.Builder do
source: %Template.Result{name: {:transform, argument.name, :for, step_name}}
}
{:cont, {:ok, [argument | arguments], [step | steps]}}
{:cont, {:ok, [argument | arguments], [%{step | transform: nil} | steps]}}
argument, {:ok, arguments, steps} when is_from_input(argument) ->
argument = %{argument | source: %Template.Result{name: {:input, argument.source.name}}}
@ -256,16 +289,33 @@ defmodule Reactor.Builder do
end)
end
defp maybe_build_step_transform_step(arguments, _name, nil), do: {:ok, arguments, nil}
defp maybe_build_step_transform_step(arguments, name, transform)
when is_function(transform, 1),
do: maybe_build_step_transform_step(arguments, name, {Step.TransformAll, fun: transform})
defp maybe_build_step_transform_step(arguments, name, transform) do
step = %Step{
arguments: arguments,
async?: true,
impl: transform,
name: {:transform, :for, name},
max_retries: 0,
ref: make_ref()
}
{:ok, [Argument.from_result(:value, step.name)], step}
end
defp build_transform_step(input_name, step_name, transform) when is_function(transform, 1),
do: build_transform_step(input_name, step_name, {Step.Transform, fun: transform})
defp build_transform_step(input_name, step_name, transform)
when tuple_size(transform) == 2 and is_atom(elem(transform, 0)) and
is_list(elem(transform, 1)) do
defp build_transform_step(input_name, step_name, transform) when is_mfa(transform) do
%Step{
arguments: [
%Argument{
name: :input,
name: :value,
source: %Template.Result{name: input_name}
}
],

View file

@ -12,34 +12,125 @@ defmodule Reactor.Dsl do
@input %Entity{
name: :input,
describe: """
Specifies an input to the Reactor.
An input is a value passed in to the Reactor when executing.
If a Reactor were a function, these would be it's arguments.
Inputs can be transformed with an arbitrary function before being passed
to any steps.
""",
examples: [
"""
input :name
""",
"""
input :age do
transform &String.to_integer/1
end
"""
],
args: [:name],
target: Input,
schema: [
name: [type: :atom, required: true],
transform: @transform
name: [
type: :atom,
required: true,
doc: """
A unique name for this input.
The name is used to allow steps to depend on it.
"""
],
transform:
Keyword.put(@transform, :doc, """
An optional transformation function which can be used to modify the
input before it is passed to any steps.
""")
]
}
@argument %Entity{
name: :argument,
describe: """
Specifies an argument to a Reactor step.
Each argument is a value which is either the result of another step, or an input value.
Individual arguments can be transformed with an arbitrary function before
being passed to any steps.
""",
examples: [
"""
argument :name, input(:name)
""",
"""
argument :user, result(:create_user)
""",
"""
argument :user_id, result(:create_user) do
transform & &1.id
end
"""
],
args: [:name, {:optional, :source}],
target: Argument,
imports: [Argument.Templates],
schema: [
name: [
type: :atom,
required: true
required: true,
doc: """
The name of the argument which will be used as the key in the
`arguments` map passed to the implementation.
"""
],
source: [
type: {:or, [{:struct, Template.Input}, {:struct, Template.Result}]},
required: true
required: true,
doc: """
What to use as the source of the argument.
See `Reactor.Argument.Templates` for more information.
"""
],
transform: @transform
transform:
Keyword.put(@transform, :doc, """
An optional transformation function which can be used to modify the
argument before it is passed to the step.
""")
]
}
@step %Entity{
name: :step,
describe: """
Specifies a Reactor step.
Steps are the unit of work in a Reactor. Reactor will calculate the
dependencies graph between the steps and execute as many as it can in each
iteration.
See the `Reactor.Step` behaviour for more information.
""",
examples: [
"""
step :create_user, MyApp.Steps.CreateUser do
argument :username, input(:username)
argument :password_hash, result(:hash_password)
end
""",
"""
step :hash_password do
argument :password, input(:password)
impl fn %{password: password}, _ ->
{:ok, Bcrypt.hash_pwd_salt(password)}
end
end
"""
],
args: [:name, {:optional, :impl}],
target: Step,
no_depend_modules: [:impl],
@ -47,22 +138,56 @@ defmodule Reactor.Dsl do
schema: [
name: [
type: :atom,
required: true
required: true,
doc: """
A unique name for the step.
This is used when choosing the return value of the Reactor and for arguments into
another step.
"""
],
impl: [
type: {:spark_function_behaviour, Step, {Step.AnonFn, 3}},
required: true
required: true,
doc: """
The step implementation.
The implementation can be either a module which implements the
`Reactor.Step` behaviour or an anonymous function or function capture
with an arity of 2.
Note that steps which are implemented as functions cannot be
compensated or undone.
"""
],
max_retries: [
type: {:or, [{:in, [:infinity]}, :non_neg_integer]},
required: false,
default: :infinity
default: :infinity,
doc: """
The maximum number of times that the step can be retried before failing.
This is only used when the result of the `compensate/4` callback is
`:retry`.
"""
],
async?: [
type: :boolean,
required: false,
default: true
]
default: true,
doc: """
When set to true the step will be executed asynchronously via Reactor's
`TaskSupervisor`.
"""
],
transform:
Keyword.merge(@transform,
type: {:or, [{:spark_function_behaviour, Step, {Step.TransformAll, 1}}, nil]},
doc: """
An optional transformation function which can be used to modify the
entire argument map before it is passed to the step.
"""
)
]
}
@ -72,7 +197,10 @@ defmodule Reactor.Dsl do
schema: [
return: [
type: :atom,
required: false
required: false,
doc: """
Specify which step result to return upon completion.
"""
]
],
entities: [@input, @step],

View file

@ -3,6 +3,7 @@ defmodule Reactor.Executor.StepRunner do
Run an individual step, including compensation if possible.
"""
alias Reactor.{Step, Template}
import Reactor.Utils
require Logger
@max_undo_count 5
@ -13,8 +14,10 @@ defmodule Reactor.Executor.StepRunner do
@spec run(Reactor.t(), Step.t()) :: {:ok, any, [Step.t()]} | :retry | {:error | :halt, any}
def run(reactor, step) do
with {:ok, arguments} <- get_step_arguments(reactor, step),
{module, options} <- module_and_opts(step) do
do_run(module, options, arguments, reactor.context)
{module, options} <- module_and_opts(step),
{:ok, context} <- build_context(reactor, step),
{:ok, arguments} <- maybe_replace_arguments(arguments, context) do
do_run(module, options, arguments, context)
end
end
@ -24,8 +27,10 @@ defmodule Reactor.Executor.StepRunner do
@spec undo(Reactor.t(), Step.t(), any) :: :ok | {:error, any}
def undo(reactor, step, value) do
with {:ok, arguments} <- get_step_arguments(reactor, step),
{module, options} <- module_and_opts(step) do
do_undo(value, module, options, arguments, reactor.context)
{module, options} <- module_and_opts(step),
{:ok, context} <- build_context(reactor, step),
{:ok, arguments} <- maybe_replace_arguments(arguments, context) do
do_undo(value, module, options, arguments, context)
end
end
@ -100,4 +105,15 @@ defmodule Reactor.Executor.StepRunner do
end
end)
end
defp build_context(reactor, step), do: {:ok, deep_merge(step.context, reactor.context)}
defp maybe_replace_arguments(arguments, context) when is_nil(context.private.replace_arguments),
do: {:ok, arguments}
defp maybe_replace_arguments(arguments, context)
when is_map_key(arguments, context.private.replace_arguments),
do: {:ok, Map.get(arguments, context.private.replace_arguments)}
defp maybe_replace_arguments(arguments, _context), do: {:ok, arguments}
end

View file

@ -30,7 +30,8 @@ defmodule Reactor.Info do
step, {:ok, reactor} when is_struct(step, Step) ->
case Builder.add_step(reactor, step.name, step.impl, step.arguments,
async?: step.async?,
max_retries: step.max_retries
max_retries: step.max_retries,
transform: step.transform
) do
{:ok, reactor} -> {:cont, {:ok, reactor}}
{:error, reason} -> {:halt, {:error, reason}}

View file

@ -5,17 +5,26 @@ defmodule Reactor.Step do
Implement this behaviour to make steps for your Reactor.
"""
defstruct arguments: [], async?: true, impl: nil, name: nil, max_retries: :infinite, ref: nil
defstruct arguments: [],
async?: true,
context: %{},
impl: nil,
name: nil,
max_retries: :infinite,
ref: nil,
transform: nil
alias Reactor.{Argument, Step}
@type t :: %Step{
arguments: [Argument.t()],
async?: boolean,
context: %{optional(atom) => any},
impl: module | {module, keyword},
name: any,
max_retries: non_neg_integer() | :infinity,
ref: nil | reference()
ref: nil | reference(),
transform: nil | (any -> any) | {module, keyword} | mfa
}
@typedoc """

View file

@ -0,0 +1,27 @@
defmodule Reactor.Step.TransformAll do
@moduledoc """
A built-in step which applies a transformation function to all it's arguments.
The returned map is used as the arguments to the step, instead of the step's
defined arguments.
"""
use Reactor.Step
alias Reactor.Step.Transform
@doc false
@impl true
@spec run(Reactor.inputs(), Reactor.context(), keyword) :: {:ok | :error, any}
def run(arguments, context, options) do
case Transform.run(%{value: arguments}, context, options) do
{:ok, result} when is_map(result) ->
{:ok, result}
{:ok, _other} ->
{:error, "Step transformers must return a map to use as replacement arguments."}
{:error, reason} ->
{:error, reason}
end
end
end

24
lib/reactor/utils.ex Normal file
View file

@ -0,0 +1,24 @@
defmodule Reactor.Utils do
@moduledoc false
@doc """
Recursively merge maps.
"""
@spec deep_merge(map, map) :: map
def deep_merge(lhs, rhs) when is_map(lhs) and is_map(rhs) do
Map.merge(lhs, rhs, fn
_key, lvalue, rvalue when is_map(lvalue) and is_map(rvalue) ->
deep_merge(lvalue, rvalue)
_key, _lvalue, rvalue ->
rvalue
end)
end
@doc """
Append a non-nil value to the end of the enumerable.
"""
@spec maybe_append(Enumerable.t(), any) :: Enumerable.t()
def maybe_append(collection, nil), do: collection
def maybe_append(collection, value), do: Enum.concat(collection, [value])
end

View file

@ -19,6 +19,6 @@
"nimble_options": {:hex, :nimble_options, "1.0.2", "92098a74df0072ff37d0c12ace58574d26880e522c22801437151a159392270e", [:mix], [], "hexpm", "fd12a8db2021036ce12a309f26f564ec367373265b53e25403f0ee697380f1b8"},
"nimble_parsec": {:hex, :nimble_parsec, "1.3.0", "9e18a119d9efc3370a3ef2a937bf0b24c088d9c4bf0ba9d7c3751d49d347d035", [:mix], [], "hexpm", "7977f183127a7cbe9346981e2f480dc04c55ffddaef746bd58debd566070eef8"},
"sobelow": {:hex, :sobelow, "0.12.2", "45f4d500e09f95fdb5a7b94c2838d6b26625828751d9f1127174055a78542cf5", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "2f0b617dce551db651145662b84c8da4f158e7abe049a76daaaae2282df01c5d"},
"sourceror": {:hex, :sourceror, "0.12.2", "2ae55efd149193572e0eb723df7c7a1bda9ab33c43373c82642931dbb2f4e428", [:mix], [], "hexpm", "7ad74ade6fb079c71f29fae10c34bcf2323542d8c51ee1bcd77a546cfa89d59c"},
"spark": {:hex, :spark, "1.1.2", "827e6e3e5a303a3994f42217e9760ab310005801fa2263d9a2d581cbb97d26e4", [:mix], [{:nimble_options, "~> 0.5 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:sourceror, "~> 0.1", [hex: :sourceror, repo: "hexpm", optional: false]}], "hexpm", "9d7dc50528340187674e189e812d326c60cc3c8b0623e03ecc17f468fc8d2376"},
"sourceror": {:hex, :sourceror, "0.12.3", "a2ad3a1a4554b486d8a113ae7adad5646f938cad99bf8bfcef26dc0c88e8fade", [:mix], [], "hexpm", "4d4e78010ca046524e8194ffc4683422f34a96f6b82901abbb45acc79ace0316"},
"spark": {:hex, :spark, "1.1.15", "c0db345f030c928d2c9cf8dbf7574c635664d54b3afaf64ec9c1481d20c48b66", [:mix], [{:nimble_options, "~> 0.5 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:sourceror, "~> 0.1", [hex: :sourceror, repo: "hexpm", optional: false]}], "hexpm", "bd7da17b8af5acd39e49b9dbdc98a21132cade2ff70e6283e09f37a4657362b8"},
}

View file

@ -2,7 +2,7 @@ defmodule Reactor.BuilderTest do
@moduledoc false
use ExUnit.Case, async: true
import Reactor.Builder
alias Reactor.{Step, Template}
alias Reactor.{Argument, Step, Template}
describe "new/0" do
test "it returns an empty reactor struct" do
@ -34,8 +34,133 @@ defmodule Reactor.BuilderTest do
assert transform_step.name == {:input, :marty}
assert transform_step.impl == {Step.Transform, fun: &String.upcase/1}
assert [argument] = transform_step.arguments
assert argument.name == :input
assert argument.name == :value
assert argument.source == %Template.Result{name: {:raw_input, :marty}}
end
end
describe "add_step/3..5" do
defmodule Noop do
@moduledoc false
use Reactor.Step
def run(_arguments, _context, _options), do: {:ok, :noop}
end
test "when the reactor argument is not a reactor struct, it returns an error" do
assert {:error, error} = add_step(:reactor, :marty, Noop)
assert Exception.message(error) =~ "not a Reactor"
end
test "when the implementation does not implement the Reactor.Step behaviour, it returns an error" do
reactor = new()
assert {:error, error} = add_step(reactor, :marty, __MODULE__)
assert Exception.message(error) =~ "does not implement the `Reactor.Step` behaviour"
end
test "when the arguments option is not a list, it returns an error" do
reactor = new()
assert {:error, error} = add_step(reactor, :marty, Noop, :wat)
assert Exception.message(error) =~ "is not a list"
end
test "when the options option is not a list, it returns an error" do
reactor = new()
assert {:error, error} = add_step(reactor, :marty, Noop, [], :wat)
assert Exception.message(error) =~ "is not a list"
end
test "when an argument is an input tuple, it is converted to a argument struct in the step" do
reactor = new()
assert {:ok, %{steps: [step]}} = add_step(reactor, :marty, Noop, mentor: {:input, :doc})
# this is a `result` not an `input` argument because it the input is
# emitted as a separate step.
assert [%Argument{name: :mentor, source: %Template.Result{name: {:input, :doc}}}] =
step.arguments
end
test "when an argument is a result tuple, it is converted to a argument struct in the step" do
reactor = new()
assert {:ok, %{steps: [step]}} =
add_step(reactor, :marty, Noop, mentor: {:result, :find_mentor})
assert [%Argument{name: :mentor, source: %Template.Result{name: :find_mentor}}] =
step.arguments
end
test "when an argument is an argument struct, it is added to the step" do
reactor = new()
assert {:ok, %{steps: [step]}} =
add_step(reactor, :marty, Noop, [
%Argument{name: :mentor, source: %Template.Result{name: :find_mentor}}
])
assert [%Argument{name: :mentor, source: %Template.Result{name: :find_mentor}}] =
step.arguments
end
test "when an argument is anything else, it is an error" do
reactor = new()
assert {:error, error} = add_step(reactor, :marty, Noop, [:wat])
assert Exception.message(error) =~ "is not a `Reactor.Argument` struct"
end
test "when an argument has a transformation function, it adds a transformation step to the reactor" do
reactor = new()
argument = Argument.from_result(:mentor, :find_mentor, &String.to_existing_atom/1)
assert {:ok, reactor} = add_step(reactor, :marty, Noop, [argument])
steps = Map.new(reactor.steps, &{&1.name, &1})
assert %Step{
arguments: [
%Argument{
name: :mentor,
source: %Template.Result{name: {:transform, :mentor, :for, :marty}}
}
]
} = steps[:marty]
assert %Step{
arguments: [%Argument{name: :value, source: %Template.Result{name: :find_mentor}}],
impl: {Step.Transform, [fun: _]}
} = steps[{:transform, :mentor, :for, :marty}]
end
test "when the step has an argument transformation function, it adds the collect and transformation step to the reactor" do
reactor = new()
assert {:ok, reactor} =
add_step(
reactor,
:add_user_to_org,
Noop,
[user: {:result, :create_user}, org: {:result, :create_org}],
transform: &%{user_id: &1.user.id, org_id: &1.org.id}
)
steps = Map.new(reactor.steps, &{&1.name, &1})
assert %Step{
arguments: [
%Argument{
name: :value,
source: %Template.Result{name: {:transform, :for, :add_user_to_org}}
}
]
} = steps[:add_user_to_org]
assert %Step{
arguments: [
%Argument{name: :user, source: %Template.Result{name: :create_user}},
%Argument{name: :org, source: %Template.Result{name: :create_org}}
],
impl: {Step.TransformAll, [fun: _]}
} = steps[{:transform, :for, :add_user_to_org}]
end
end
end

View file

@ -234,4 +234,67 @@ defmodule Reactor.ExecutorTest do
Reactor.Executor.run(reactor, %{from: 7}, %{}, max_iterations: 100)
end
end
describe "argument transformation" do
defmodule ArgumentTransformReactor do
@moduledoc false
use Reactor
input :whom
input :when
step :blame do
argument :whom, input(:whom) do
transform &"#{&1.first_name} #{&1.last_name}"
end
argument :when, input(:when) do
transform & &1.year
end
impl(fn args, _, _ ->
{:ok, "#{args.whom} in #{args.when}"}
end)
end
return :blame
end
test "it correctly transforms the arguments" do
assert {:ok, "Marty McFly in 1985"} =
Reactor.run(ArgumentTransformReactor, %{
whom: %{first_name: "Marty", last_name: "McFly"},
when: ~N[1985-10-26 01:22:00]
})
end
defmodule AllArgumentsTransformReactor do
@moduledoc false
use Reactor
input :whom
input :when
step :blame do
argument :whom, input(:whom)
argument :when, input(:when)
transform &%{whom: "#{&1.whom.first_name} #{&1.whom.last_name}", when: &1.when.year}
impl(fn args, _, _ ->
{:ok, "#{args.whom} in #{args.when}"}
end)
end
return :blame
end
test "it correctly transforms all the arguments" do
assert {:ok, "Marty McFly in 1985"} =
Reactor.run(AllArgumentsTransformReactor, %{
whom: %{first_name: "Marty", last_name: "McFly"},
when: ~N[1985-10-26 01:22:00]
})
end
end
end