feat(map): Add the ability to map over elements of a collection inside a reactor. (#123)

* feat(map): Add the ability to map over elements of a collection inside a reactor.

* improvement: throw a more helpful error when a step returns an invalid result.

* fix: automatically pass extra arguments from the map step to nested steps.

They can't be referred to directly because they may be added to the graph
at runtime, and may depend on steps which have been completed and their
results discarded because they have no dependents at that time.

* fix: spurious test failures seemingly caused by `Mimic`.
This commit is contained in:
James Harton 2024-07-18 21:56:46 +12:00 committed by GitHub
parent d380f2af4b
commit e8ac9a1d90
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
34 changed files with 1612 additions and 250 deletions

View file

@ -9,6 +9,7 @@ spark_locals_without_parens = [
around: 2,
around: 3,
async?: 1,
batch_size: 1,
before_all: 1,
collect: 1,
collect: 2,
@ -24,6 +25,8 @@ spark_locals_without_parens = [
input: 1,
input: 2,
level: 1,
map: 1,
map: 2,
matches?: 1,
matches?: 2,
max_retries: 1,
@ -32,9 +35,11 @@ spark_locals_without_parens = [
on: 1,
return: 1,
run: 1,
source: 1,
step: 1,
step: 2,
step: 3,
strict_ordering?: 1,
switch: 1,
switch: 2,
transform: 1,

View file

@ -4,9 +4,11 @@
"backoff",
"casted",
"Desugars",
"lvalue",
"mappish",
"noreply",
"Planable",
"rvalue",
"splode",
"Splode"
]

View file

@ -27,6 +27,9 @@ The top-level reactor DSL
* argument
* wait_for
* [input](#reactor-input)
* [map](#reactor-map)
* argument
* wait_for
* [step](#reactor-step)
* argument
* wait_for
@ -106,7 +109,7 @@ Wrap a function around a group of steps.
| Name | Type | Default | Docs |
|------|------|---------|------|
| [`name`](#reactor-around-name){: #reactor-around-name .spark-required} | `atom` | | A unique name of the group of steps. |
| [`name`](#reactor-around-name){: #reactor-around-name .spark-required} | `atom` | | A unique name for the group of steps. |
| [`fun`](#reactor-around-fun){: #reactor-around-fun .spark-required} | `(any, any, any, any -> any) \| mfa` | | The around function. See `Reactor.Step.Around` for more information. |
### Options
@ -171,7 +174,7 @@ argument :three, value(3)
| Name | Type | Default | Docs |
|------|------|---------|------|
| [`name`](#reactor-around-argument-name){: #reactor-around-argument-name .spark-required} | `atom` | | The name of the argument which will be used as the key in the `arguments` map passed to the implementation. |
| [`source`](#reactor-around-argument-source){: #reactor-around-argument-source .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. |
| [`source`](#reactor-around-argument-source){: #reactor-around-argument-source .spark-required} | `Reactor.Template.Element \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. |
### Options
| Name | Type | Default | Docs |
@ -327,7 +330,7 @@ argument :three, value(3)
| Name | Type | Default | Docs |
|------|------|---------|------|
| [`name`](#reactor-collect-argument-name){: #reactor-collect-argument-name .spark-required} | `atom` | | The name of the argument which will be used as the key in the `arguments` map passed to the implementation. |
| [`source`](#reactor-collect-argument-source){: #reactor-collect-argument-source .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. |
| [`source`](#reactor-collect-argument-source){: #reactor-collect-argument-source .spark-required} | `Reactor.Template.Element \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. |
### Options
| Name | Type | Default | Docs |
@ -467,7 +470,7 @@ argument :three, value(3)
| Name | Type | Default | Docs |
|------|------|---------|------|
| [`name`](#reactor-compose-argument-name){: #reactor-compose-argument-name .spark-required} | `atom` | | The name of the argument which will be used as the key in the `arguments` map passed to the implementation. |
| [`source`](#reactor-compose-argument-source){: #reactor-compose-argument-source .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. |
| [`source`](#reactor-compose-argument-source){: #reactor-compose-argument-source .spark-required} | `Reactor.Template.Element \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. |
### Options
| Name | Type | Default | Docs |
@ -616,7 +619,7 @@ argument :three, value(3)
| Name | Type | Default | Docs |
|------|------|---------|------|
| [`name`](#reactor-debug-argument-name){: #reactor-debug-argument-name .spark-required} | `atom` | | The name of the argument which will be used as the key in the `arguments` map passed to the implementation. |
| [`source`](#reactor-debug-argument-source){: #reactor-debug-argument-source .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. |
| [`source`](#reactor-debug-argument-source){: #reactor-debug-argument-source .spark-required} | `Reactor.Template.Element \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. |
### Options
| Name | Type | Default | Docs |
@ -759,7 +762,7 @@ argument :three, value(3)
| Name | Type | Default | Docs |
|------|------|---------|------|
| [`name`](#reactor-group-argument-name){: #reactor-group-argument-name .spark-required} | `atom` | | The name of the argument which will be used as the key in the `arguments` map passed to the implementation. |
| [`source`](#reactor-group-argument-source){: #reactor-group-argument-source .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. |
| [`source`](#reactor-group-argument-source){: #reactor-group-argument-source .spark-required} | `Reactor.Template.Element \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. |
### Options
| Name | Type | Default | Docs |
@ -867,6 +870,206 @@ end
Target: `Reactor.Dsl.Input`
## reactor.map
```elixir
map name
```
Execute nested steps for every item of an iterator.
Allows you to "map over" a collection applying some steps to each element,
returning a list of new values. The input collection must be bounded - ie you
cannot pass infinite streams into this step or it will just loop forever - and
because it has to keep the results from each batch it will eventually just use
up all available RAM.
Under the hood we use `Iter` to handle lazy iteration of the collection. This
means that you can pass an `Iter.t` or any value for which `Iter.IntoIterable`
is implemented.
> #### A note on ordering {: .tip}
>
> If your application doesn't need the results back in the same order that they
> were provided then setting `strict_ordering?` to `false` will increase
> performance - especially on large input sets.
### Nested DSLs
* [argument](#reactor-map-argument)
* [wait_for](#reactor-map-wait_for)
### Examples
```
map :double_numbers do
  source input(:numbers)

  step :double do
    argument :number, element(:double_numbers)

    run fn %{number: number}, _, _ ->
      {:ok, number * 2}
    end
  end
end
```
```
step :get_subscriptions do
  run fn _, _, _ ->
    Stripe.Subscription.list()
  end
end

map :cancel_subscriptions do
  source result(:get_subscriptions)

  step :cancel do
    argument :sub_id, element(:cancel_subscriptions, [:id])

    run fn args, _, _ ->
      Stripe.Subscription.cancel(args.sub_id, %{prorate: true, invoice_now: true})
    end
  end

  return :cancel
end
```
### Arguments
| Name | Type | Default | Docs |
|------|------|---------|------|
| [`name`](#reactor-map-name){: #reactor-map-name .spark-required} | `atom` | | A unique name for the step. |
### Options
| Name | Type | Default | Docs |
|------|------|---------|------|
| [`source`](#reactor-map-source){: #reactor-map-source .spark-required} | `Reactor.Template.Element \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | The iterator or enumerable to use as the source of the iteration. |
| [`allow_async?`](#reactor-map-allow_async?){: #reactor-map-allow_async? } | `boolean` | `false` | Whether the emitted steps should be allowed to run asynchronously. |
| [`batch_size`](#reactor-map-batch_size){: #reactor-map-batch_size } | `pos_integer` | `100` | The number of items to consume off the source when emitting steps. |
| [`return`](#reactor-map-return){: #reactor-map-return } | `atom` | | The name of the nested step to use as the return value. |
| [`strict_ordering?`](#reactor-map-strict_ordering?){: #reactor-map-strict_ordering? } | `boolean` | `true` | Whether the mapped values must be returned in the same order that they were provided. |
## reactor.map.argument
```elixir
argument name, source \\ nil
```
Specifies an argument to a Reactor step.
Each argument is a value which is either the result of another step, or an input value.
Individual arguments can be transformed with an arbitrary function before
being passed to any steps.
### Examples
```
argument :name, input(:name)
```
```
argument :year, input(:date, [:year])
```
```
argument :user, result(:create_user)
```
```
argument :user_id, result(:create_user) do
transform & &1.id
end
```
```
argument :user_id, result(:create_user, [:id])
```
```
argument :three, value(3)
```
### Arguments
| Name | Type | Default | Docs |
|------|------|---------|------|
| [`name`](#reactor-map-argument-name){: #reactor-map-argument-name .spark-required} | `atom` | | The name of the argument which will be used as the key in the `arguments` map passed to the implementation. |
| [`source`](#reactor-map-argument-source){: #reactor-map-argument-source .spark-required} | `Reactor.Template.Element \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. |
### Options
| Name | Type | Default | Docs |
|------|------|---------|------|
| [`transform`](#reactor-map-argument-transform){: #reactor-map-argument-transform } | `(any -> any) \| module \| nil` | | An optional transformation function which can be used to modify the argument before it is passed to the step. |
### Introspection
Target: `Reactor.Dsl.Argument`
## reactor.map.wait_for
```elixir
wait_for names
```
Wait for the named step to complete before allowing this one to start.
Desugars to `argument :_, result(step_to_wait_for)`
### Examples
```
wait_for :create_user
```
### Arguments
| Name | Type | Default | Docs |
|------|------|---------|------|
| [`names`](#reactor-map-wait_for-names){: #reactor-map-wait_for-names .spark-required} | `atom \| list(atom)` | | The name of the step to wait for. |
### Introspection
Target: `Reactor.Dsl.WaitFor`
### Introspection
Target: `Reactor.Dsl.Map`
## reactor.step
```elixir
step name, impl \\ nil
@ -983,7 +1186,7 @@ argument :three, value(3)
| Name | Type | Default | Docs |
|------|------|---------|------|
| [`name`](#reactor-step-argument-name){: #reactor-step-argument-name .spark-required} | `atom` | | The name of the argument which will be used as the key in the `arguments` map passed to the implementation. |
| [`source`](#reactor-step-argument-source){: #reactor-step-argument-source .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. |
| [`source`](#reactor-step-argument-source){: #reactor-step-argument-source .spark-required} | `Reactor.Template.Element \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. |
### Options
| Name | Type | Default | Docs |
@ -1067,7 +1270,7 @@ Use a predicate to determine which steps should be executed.
| Name | Type | Default | Docs |
|------|------|---------|------|
| [`on`](#reactor-switch-on){: #reactor-switch-on .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | The value to match against. |
| [`on`](#reactor-switch-on){: #reactor-switch-on .spark-required} | `Reactor.Template.Element \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | The value to match against. |
| [`allow_async?`](#reactor-switch-allow_async?){: #reactor-switch-allow_async? } | `boolean` | `true` | Whether the emitted steps should be allowed to run asynchronously. |

View file

@ -6,6 +6,7 @@ defmodule Reactor.Argument do
defstruct name: nil, source: nil, transform: nil
alias Reactor.{Argument, Template}
import Reactor.Template, only: :macros
@type t :: %Argument{
name: atom,
@ -64,6 +65,49 @@ defmodule Reactor.Argument do
def from_value(name, value, transform \\ nil) when is_atom(name) and maybe_transform(transform),
do: %Argument{name: name, source: %Template.Value{value: value}, transform: transform}
@doc """
Build an argument which refers to an element within a map step with an optional transformation applied.
## Example
iex> Argument.from_element(:argument_name, &Atom.to_string/1)
"""
@spec from_element(atom, atom, nil | (any -> any)) :: Argument.t()
def from_element(name, element_name, transform \\ nil)
when is_atom(name) and maybe_transform(transform),
do: %Argument{
name: name,
source: %Template.Element{name: element_name},
transform: transform
}
@doc """
Build an argument directly from a template.
## Example
iex> Argument.from_template(:argument_name, Reactor.Dsl.Argument.input(:input_name))
"""
@spec from_template(atom, Template.t(), nil | (any -> any)) :: Argument.t()
def from_template(name, template, transform \\ nil)
when is_atom(name) and is_template(template) and maybe_transform(transform),
do: %Argument{name: name, source: template, transform: transform}
@doc """
Set a sub-path on the argument.
## Example
iex> Argument.from_value(:example, :value)
...> |> Argument.sub_path([:nested, :values])
"""
@spec sub_path(Argument.t(), [any]) :: Argument.t()
def sub_path(argument, sub_path),
do: %{argument | source: %{argument.source | sub_path: sub_path}}
@doc """
Validate that the argument is an Argument struct.
"""
@ -72,17 +116,26 @@ defmodule Reactor.Argument do
@doc """
Validate that the argument refers to a reactor input.
"""
defguard is_from_input(argument) when is_struct(argument.source, Template.Input)
defguard is_from_input(argument)
when is_argument(argument) and is_input_template(argument.source)
@doc """
Validate that the argument refers to a step result.
"""
defguard is_from_result(argument) when is_struct(argument.source, Template.Result)
defguard is_from_result(argument)
when is_argument(argument) and is_result_template(argument.source)
@doc """
Validate that the argument contains a static value.
"""
defguard is_from_value(argument) when is_struct(argument.source, Template.Value)
defguard is_from_value(argument)
when is_argument(argument) and is_value_template(argument.source)
@doc """
Validate that the argument contains an element.
"""
defguard is_from_element(argument)
when is_argument(argument) and is_element_template(argument.source)
@doc """
Validate that the argument has a transform.

View file

@ -224,7 +224,8 @@ defmodule Reactor.Builder.Step do
{:ok, argument}
end
argument when is_from_result(argument) or is_from_value(argument) ->
argument
when is_from_result(argument) or is_from_value(argument) or is_from_element(argument) ->
{:ok, argument}
end)
end

View file

@ -30,6 +30,7 @@ defmodule Reactor.Dsl do
Dsl.Debug.__entity__(),
Dsl.Group.__entity__(),
Dsl.Input.__entity__(),
Dsl.Map.__entity__(),
Dsl.Step.__entity__(),
Dsl.Switch.__entity__()
],

View file

@ -117,6 +117,37 @@ defmodule Reactor.Dsl.Argument do
@spec value(any) :: Template.Value.t()
def value(value), do: %Template.Value{value: value}
@doc ~S"""
The `element` template helper for the Reactor DSL.
## Example
```elixir
defmodule ExampleReactor do
use Reactor
input :numbers
map :double_numbers do
source input(:numbers)
step :double do
argument :number, element(:double_numbers)
run fn args, _, _ ->
{:ok, args.number * 2}
end
end
return :double
end
end
```
"""
@spec element(any, [any]) :: Template.Element.t()
def element(name, sub_path \\ [])
def element(name, sub_path), do: %Template.Element{name: name, sub_path: List.wrap(sub_path)}
@doc false
def __entity__,
do: %Spark.Dsl.Entity{

View file

@ -39,7 +39,7 @@ defmodule Reactor.Dsl.Around do
type: :atom,
required: true,
doc: """
A unique name of the group of steps.
A unique name for the group of steps.
"""
],
fun: [
@ -91,9 +91,15 @@ defmodule Reactor.Dsl.Around do
)}
end
def verify(_around, _dsl_state), do: :ok
def transform(_around, dsl_state), do: {:ok, dsl_state}
def verify(around, dsl_state) do
around.steps
|> Enum.reduce_while(:ok, fn step, :ok ->
case Dsl.Build.verify(step, dsl_state) do
:ok -> {:cont, :ok}
{:error, reason} -> {:halt, {:error, reason}}
end
end)
end
defp build_inputs(reactor, around) do
around.arguments

View file

@ -13,15 +13,6 @@ defprotocol Reactor.Dsl.Build do
@spec build(t, Reactor.t()) :: {:ok, Reactor.t()} | {:error, any}
def build(entity, reactor)
@doc """
Perform any transformation that is needed to make the entity work in the
system before building.
See `Spark.Dsl.Transformer` for more information.
"""
@spec transform(t, Spark.Dsl.t()) :: {:ok, Spark.Dsl.t()} | {:error, any}
def transform(entity, dsl_state)
@doc """
Perform any after-compilation verification that is needed to make the entity
work.

View file

@ -103,12 +103,18 @@ defmodule Reactor.Dsl.Group do
)}
end
def verify(_group, _dsl_state), do: :ok
def verify(group, dsl_state) do
group.steps
|> Enum.reduce_while(:ok, fn step, :ok ->
case Dsl.Build.verify(step, dsl_state) do
:ok -> {:cont, :ok}
{:error, reason} -> {:halt, {:error, reason}}
end
end)
end
def transform(_around, dsl_state), do: {:ok, dsl_state}
defp build_inputs(reactor, around) do
around.arguments
defp build_inputs(reactor, group) do
group.arguments
|> Enum.map(& &1.name)
|> reduce_while_ok(reactor, &Builder.add_input(&2, &1))
end

61
lib/reactor/dsl/info.ex Normal file
View file

@ -0,0 +1,61 @@
defmodule Reactor.Dsl.Info do
  @moduledoc """
  Introspection for the Reactor DSL.
  """
  use Spark.InfoGenerator, sections: [:reactor], extension: Reactor.Dsl
  alias Reactor.{Builder, Dsl}
  alias Spark.Dsl.Extension
  import Reactor.Utils

  @doc """
  Convert a reactor DSL module into a reactor struct.

  Accepts an already-built `Reactor` struct (returned unchanged), a DSL
  module, or a Spark DSL state map.
  """
  @spec to_struct(module | Reactor.t() | Spark.Dsl.t()) :: {:ok, Reactor.t()} | {:error, any}
  def to_struct(reactor) when is_struct(reactor, Reactor), do: {:ok, reactor}

  def to_struct(module) do
    with {:ok, built} <- build_from_entities(module),
         {:ok, with_return} <- apply_return(module, built) do
      attach_middleware(module, with_return)
    end
  end

  @doc """
  Like `to_struct/1`, but raises the error reason on failure.
  """
  @spec to_struct!(module | Reactor.t() | Spark.Dsl.t()) :: Reactor.t() | no_return
  def to_struct!(reactor) do
    case to_struct(reactor) do
      {:ok, built} -> built
      {:error, reason} -> raise reason
    end
  end

  # Builds every top-level DSL entity into a fresh reactor. The builder needs
  # the owning module's name, which is either the argument itself (when given
  # a module) or persisted in the DSL state map.
  defp build_from_entities(module) when is_atom(module) do
    module
    |> reactor()
    |> reduce_while_ok(Builder.new(module), &Dsl.Build.build/2)
  end

  defp build_from_entities(dsl_state) when is_map(dsl_state) do
    owner = Extension.get_persisted(dsl_state, :module)

    dsl_state
    |> reactor()
    |> reduce_while_ok(Builder.new(owner), &Dsl.Build.build/2)
  end

  # Copies the DSL-level `return` option onto the struct when one was set;
  # otherwise leaves the reactor untouched.
  defp apply_return(module, reactor) do
    case reactor_return(module) do
      {:ok, value} -> {:ok, %{reactor | return: value}}
      :error -> {:ok, reactor}
    end
  end

  # Builds each configured middleware into the reactor.
  defp attach_middleware(module, reactor) do
    module
    |> reactor_middlewares()
    |> reduce_while_ok(reactor, &Dsl.Build.build/2)
  end
end

235
lib/reactor/dsl/map.ex Normal file
View file

@ -0,0 +1,235 @@
defmodule Reactor.Dsl.Map do
  @moduledoc """
  The `map` DSL entity struct.

  See `d:Reactor.map`.
  """

  defstruct __identifier__: nil,
            allow_async?: false,
            arguments: [],
            batch_size: 100,
            iterable?: true,
            name: nil,
            return: nil,
            source: nil,
            steps: [],
            strict_ordering?: true

  alias Reactor.{Builder, Dsl, Step, Template}

  @type t :: %Dsl.Map{
          __identifier__: any,
          allow_async?: boolean,
          arguments: [Dsl.Argument.t()],
          batch_size: pos_integer(),
          iterable?: true,
          name: atom,
          return: atom,
          source: Template.t(),
          steps: [Dsl.Step.t()],
          strict_ordering?: boolean
        }

  @doc false
  def __entity__,
    do: %Spark.Dsl.Entity{
      name: :map,
      describe: """
      Execute nested steps for every item of an iterator.

      Allows you to "map over" a collection applying some steps to each element,
      returning a list of new values. The input collection must be bounded - ie you
      cannot pass infinite streams into this step or it will just loop forever - and
      because it has to keep the results from each batch it will eventually just use
      up all available RAM.

      Under the hood we use `Iter` to handle lazy iteration of the collection. This
      means that you can pass an `Iter.t` or any value for which `Iter.IntoIterable`
      is implemented.

      > #### A note on ordering {: .tip}
      >
      > If your application doesn't need the results back in the same order that they
      > were provided then setting `strict_ordering?` to `false` will increase
      > performance - especially on large input sets.
      """,
      examples: [
        """
        map :double_numbers do
          source input(:numbers)

          step :double do
            argument :number, element(:double_numbers)

            run fn %{number: number}, _, _ ->
              {:ok, number * 2}
            end
          end
        end
        """,
        """
        step :get_subscriptions do
          run fn _, _, _ ->
            Stripe.Subscription.list()
          end
        end

        map :cancel_subscriptions do
          source result(:get_subscriptions)

          step :cancel do
            argument :sub_id, element(:cancel_subscriptions, [:id])

            run fn args, _, _ ->
              Stripe.Subscription.cancel(args.sub_id, %{prorate: true, invoice_now: true})
            end
          end

          return :cancel
        end
        """
      ],
      target: Dsl.Map,
      args: [:name],
      identifier: :name,
      imports: [Dsl.Argument],
      entities: [steps: [], arguments: [Dsl.Argument.__entity__(), Dsl.WaitFor.__entity__()]],
      recursive_as: :steps,
      schema: [
        name: [
          type: :atom,
          required: true,
          doc: """
          A unique name for the step.
          """
        ],
        allow_async?: [
          type: :boolean,
          required: false,
          default: false,
          doc: """
          Whether the emitted steps should be allowed to run asynchronously.
          """
        ],
        batch_size: [
          type: :pos_integer,
          required: false,
          default: 100,
          doc: """
          The number of items to consume off the source when emitting steps.
          """
        ],
        source: [
          type: Template.type(),
          required: true,
          doc: """
          The iterator or enumerable to use as the source of the iteration.
          """
        ],
        return: [
          type: :atom,
          required: false,
          doc: """
          The name of the nested step to use as the return value.
          """
        ],
        strict_ordering?: [
          type: :boolean,
          required: false,
          default: true,
          doc: """
          Whether the mapped values must be returned in the same order that they were provided.
          """
        ]
      ]
    }

  defimpl Dsl.Build do
    import Reactor.Utils
    require Reactor.Argument
    alias Reactor.Argument
    alias Spark.{Dsl.Verifier, Error.DslError}

    # Builds the nested steps into a throwaway sub-reactor and then adds a
    # single `Reactor.Step.Map` step (carrying those steps in its options) to
    # the parent reactor.
    def build(map, reactor) do
      sub_reactor = Builder.new(reactor.id)

      with {:ok, sub_reactor} <- build_steps(sub_reactor, map) do
        # The source template is passed to the map step as an extra `:source`
        # argument alongside any user-supplied arguments.
        arguments =
          map.arguments
          |> Enum.concat([Argument.from_template(:source, map.source)])

        step_options =
          map
          # Bug fix: the option key is `:strict_ordering?` (with the `?`) —
          # taking `:strict_ordering` silently dropped the user's setting.
          |> Map.take([:allow_async?, :batch_size, :return, :strict_ordering?])
          |> Map.put(:state, :init)
          |> Map.put(:steps, sub_reactor.steps)
          |> Map.update!(:return, fn
            nil ->
              # No explicit return: `verify/2` guarantees there is exactly one
              # nested step in this case, so default to its name.
              sub_reactor.steps
              |> List.first()
              |> Map.fetch!(:name)

            return ->
              return
          end)
          |> Enum.to_list()

        Builder.add_step(
          reactor,
          map.name,
          {Step.Map, step_options},
          arguments,
          max_retries: 0,
          ref: :step_name
        )
      end
    end

    # Verifies the entity at compile time: there must be at least one nested
    # step, and an explicit `return` is required when more than one exists.
    @spec verify(Reactor.Dsl.Map.t(), any()) :: :ok | {:error, any}
    def verify(step, dsl_state) do
      with :ok <- verify_at_least_one_step(step, dsl_state) do
        verify_return(step, dsl_state)
      end
    end

    defp verify_at_least_one_step(%{steps: [], name: name}, dsl_state) do
      # Bug fix: callers (`Reactor.Dsl.Verifier` et al) match on
      # `:ok | {:error, reason}`, so the error must be wrapped in a tuple
      # rather than returned as a bare exception struct.
      {:error,
       DslError.exception(
         module: Verifier.get_persisted(dsl_state, :module),
         path: [:reactor, :map, name],
         message: "You must provide at least one child step to execute."
       )}
    end

    defp verify_at_least_one_step(_step, _dsl_state), do: :ok

    defp verify_return(%{return: nil, steps: [_]}, _dsl_state), do: :ok

    defp verify_return(%{return: nil, name: name}, dsl_state),
      do:
        {:error,
         DslError.exception(
           module: Verifier.get_persisted(dsl_state, :module),
           path: [:reactor, :map, name],
           message:
             "You must specify which step to use as the return value when more than one nested step is present in a map."
         )}

    defp verify_return(%{steps: steps, return: return, name: name}, dsl_state) do
      if Enum.any?(steps, &(&1.name == return)) do
        :ok
      else
        {:error,
         DslError.exception(
           module: Verifier.get_persisted(dsl_state, :module),
           path: [:reactor, :map, name],
           message:
             "The name `#{inspect(return)}` does not refer to a direct descendant of the `#{inspect(name)}` step."
         )}
      end
    end

    # Builds each nested DSL step into the sub-reactor.
    defp build_steps(reactor, map) do
      map.steps
      |> reduce_while_ok(reactor, &Dsl.Build.build/2)
    end
  end
end

View file

@ -152,7 +152,17 @@ defmodule Reactor.Dsl.Switch do
)}
end
def verify(_switch, _dsl_state), do: :ok
def verify(switch, dsl_state) do
switch.matches
|> Enum.flat_map(& &1.steps)
|> Enum.concat(switch.default.steps)
|> Enum.reduce_while(:ok, fn step, :ok ->
case Build.verify(step, dsl_state) do
:ok -> {:cont, :ok}
{:error, reason} -> {:halt, {:error, reason}}
end
end)
end
def transform(_switch, dsl_state), do: {:ok, dsl_state}

View file

@ -11,7 +11,6 @@ defmodule Reactor.Dsl.Transformer do
with {:ok, step_names} <- step_names(dsl_state),
{:ok, dsl_state} <- maybe_set_return(dsl_state, step_names),
{:ok, dsl_state} <- validate_return(dsl_state, step_names),
{:ok, dsl_state} <- do_entity_transform(dsl_state),
{:ok, reactor} <- Info.to_struct(dsl_state),
{:ok, reactor} <- Planner.plan(reactor) do
dsl_state =
@ -53,12 +52,6 @@ defmodule Reactor.Dsl.Transformer do
end
end
defp do_entity_transform(dsl_state) do
dsl_state
|> Transformer.get_entities([:reactor])
|> reduce_while_ok(dsl_state, &Dsl.Build.transform/2)
end
defp maybe_set_return(dsl_state, step_names) do
case Transformer.get_option(dsl_state, [:reactor], :return) do
nil ->

View file

@ -3,8 +3,9 @@ defmodule Reactor.Dsl.Verifier do
Runs `Reactor.Dsl.Build.verify/2` for all the entities in the reactor.
"""
use Spark.Dsl.Verifier
alias Reactor.Dsl.Build
alias Spark.Dsl.Verifier
alias Reactor.{Argument, Dsl.Build, Dsl.Input}
alias Spark.{Dsl.Verifier, Error.DslError}
require Argument
@doc false
@impl true
@ -12,11 +13,89 @@ defmodule Reactor.Dsl.Verifier do
def verify(dsl_state) do
dsl_state
|> Verifier.get_entities([:reactor])
|> Enum.reduce_while(:ok, fn entity, :ok ->
case Build.verify(entity, dsl_state) do
|> Enum.reject(&is_struct(&1, Input))
|> Enum.reduce_while(:ok, fn step, :ok ->
case recursively_verify_step(step, nil, dsl_state) do
:ok -> {:cont, :ok}
{:error, reason} -> {:halt, {:error, reason}}
end
end)
end
defp recursively_verify_step(%{step: [_ | _]} = step, parent_step, dsl_state) do
with :ok <- verify_step(step, parent_step, dsl_state) do
Enum.reduce_while(step.steps, :ok, fn child, :ok ->
case recursively_verify_step(child, step, dsl_state) do
:ok -> {:cont, :ok}
{:error, reason} -> {:halt, {:error, reason}}
end
end)
end
end
defp recursively_verify_step(step, parent_step, dsl_state),
do: verify_step(step, parent_step, dsl_state)
defp verify_step(step, parent_step, dsl_state) do
with :ok <- maybe_verify_element_arguments(step, parent_step, dsl_state) do
Build.verify(step, dsl_state)
end
end
defp maybe_verify_element_arguments(step, parent_step, dsl_state)
when parent_step.iterable? == true do
step.arguments
|> Enum.reduce_while(:ok, fn
argument, :ok
when Argument.is_from_element(argument) and argument.source.name == parent_step.name ->
{:cont, :ok}
argument, :ok when Argument.is_from_element(argument) ->
{:halt,
{:error,
DslError.exception(
module: Verifier.get_persisted(dsl_state, :module),
path: [:reactor, step.name, :argument, argument.name],
message: """
Element template refers to non-parent step.
The argument `#{inspect(argument.name)}` is sourced from an element template,
however this template refers to a step which is not its immediate parent. This
is an unsupported configuration.
"""
)}}
_argument, :ok ->
{:cont, :ok}
end)
end
defp maybe_verify_element_arguments(step, _parent_step, dsl_state)
when is_map_key(step, :arguments) do
step.arguments
|> Enum.reduce_while(:ok, fn
argument, :ok when Argument.is_from_element(argument) ->
{:halt,
{:error,
DslError.exception(
module: Verifier.get_persisted(dsl_state, :module),
path: [:reactor, step.name, :argument, argument.name],
message: """
Unsupported element template in argument.
The argument `#{inspect(argument.name)}` is sourced from an element template,
this is fine if it's being passed to a step which is an iterator however the
step type `#{inspect(step.__struct__)}` isn't an iterator.
If you're defining your own iterable step type then you need to add the
`iterable?` field to its struct with its value set to `true`.
"""
)}}
_argument, :ok ->
{:cont, :ok}
end)
end
defp maybe_verify_element_arguments(_step, _parent_step, _dsl_state), do: :ok
end

View file

@ -0,0 +1,161 @@
defmodule Reactor.Error.Internal.UnreachableError do
@moduledoc """
An error that should never happen.
"""
use Reactor.Error,
fields: [:bindings, :message, :file, :line],
class: :reactor
# Renders a complete, copy-pasteable bug report: the message with file/line,
# the caller's local bindings, the stacktrace, system info and the list of
# loaded applications.
@doc false
@impl true
def message(error) do
[
"""
# Unreachable Error
You should _never_ see this error in the wild. If you do please raise an issue on
the Reactor repository:
https://github.com/ash-project/reactor/issues/new
And paste the following information:
--- BEGIN COPY ---
Reached unreachable code at #{error.file}:#{error.line}:
#{error.message}
"""
]
# `maybe_append/2` (presumably imported via `use Reactor.Error` — confirm)
# skips `nil` values, so optional sections simply drop out of the report.
|> maybe_append(maybe_format_bindings(error))
|> maybe_append(maybe_format_stacktrace(error))
|> maybe_append(format_system_info())
|> maybe_append(format_running_applications())
|> Enum.join("\n")
end
@doc """
Create an unreachable error.

Expands to an `exception/1` call that captures the caller's local bindings,
file and line at the point of use.
"""
@spec unreachable(String.t()) :: Macro.output()
defmacro unreachable(message) do
quote do
unquote(__MODULE__).exception(
bindings: binding(),
line: __ENV__.line,
file: __ENV__.file,
message: unquote(message)
)
end
end
@doc """
Bang version of `unreachable/1`.

Raises the error instead of returning it.
"""
@spec unreachable!(String.t()) :: Macro.output()
defmacro unreachable!(message) do
quote do
raise unquote(__MODULE__).exception(
bindings: binding(),
line: __ENV__.line,
file: __ENV__.file,
message: unquote(message)
)
end
end
# Formats the captured bindings as a bullet list, or returns `nil` (section
# omitted) when there were none.
defp maybe_format_bindings(error) do
if Enum.any?(error.bindings) do
bindings =
error.bindings
|> Enum.map_join("\n", fn {name, value} ->
" - `#{inspect(name)}`: `#{inspect(value)}`"
end)
"""
Bindings:
#{bindings}
"""
end
end
# Formats the stored stacktrace, dropping the first two frames (the error
# machinery itself). Returns `nil` when no stacktrace was captured.
defp maybe_format_stacktrace(error) do
if error.stacktrace do
stacktrace =
error.stacktrace.stacktrace
|> Enum.drop(2)
|> Exception.format_stacktrace()
"""
Backtrace:
#{stacktrace}
"""
end
end
# Collects Elixir/OTP version details and an OS description for the report.
# The File.read! below only touches the OTP release metadata file, hence the
# sobelow skip.
# sobelow_skip ["Traversal.FileModule"]
defp format_system_info do
elixir = System.build_info()
# Read the installed OTP version from the release's OTP_VERSION file.
erlang_vsn =
[
:code.root_dir(),
"releases",
:erlang.system_info(:otp_release),
"OTP_VERSION"
]
|> Path.join()
|> File.read!()
|> String.trim()
# Try `uname -a` first, then `ver` (Windows); only when both exit non-zero
# fall back to `:os.type/0` / `:os.version/0`.
# NOTE(review): `System.cmd/3` raises when the executable is missing (e.g.
# `ver` on unix) rather than returning a non-zero exit code — confirm this
# fallback chain works on all target platforms.
system =
with {_, code} when code > 0 <- System.cmd("uname", ["-a"]),
{_, code} when code > 0 <- System.cmd("ver", []) do
{family, name} = :os.type()
version =
case :os.version() do
version when is_tuple(version) ->
version
|> Tuple.to_list()
|> Enum.map_join(".", &to_string/1)
version when is_list(version) ->
to_string(version)
end
"#{name} #{family} / #{version}"
else
{uname, 0} -> uname
end
# NOTE(review): `System.build_info/0` returns a map with atom keys, so
# `elixir["version"]` below likely evaluates to `nil` — should probably be
# `elixir[:version]`. Confirm and fix.
"""
System:
Elixir #{elixir["version"]} (#{elixir[:revision]}) compiled with Erlang/OTP #{elixir[:otp_release]}
Erlang/OTP #{erlang_vsn} [erts-#{:erlang.system_info(:version)}]
#{system}
"""
end
# Lists every loaded application with its version and closes the copy-paste
# section of the report.
defp format_running_applications do
applications =
Application.loaded_applications()
|> Enum.map_join("\n", fn {app, _, vsn} ->
" - #{app} #{vsn}"
end)
"""
Running applications:
#{applications}
--- END COPY ---
Please carefully read all of the above and redact any sensitive information.
"""
end
end

View file

@ -0,0 +1,38 @@
defmodule Reactor.Error.Invalid.InvalidResultError do
  @moduledoc """
  This error is returned when a step returns an invalid result.

  "Invalid" means the value returned from the step's `c:Reactor.Step.run/3`
  callback did not match any of the shapes the executor understands — the
  accepted shapes are listed in the rendered message below.
  """

  use Reactor.Error, fields: [:reactor, :step, :result, :arguments], class: :invalid

  @doc false
  # Renders a markdown-formatted explanation listing every valid return
  # shape alongside the offending result, step and arguments.
  def message(error) do
    """
    # Invalid Result Error
    The step `#{inspect(error.step.name)}` returned an invalid result.
    Valid return types from the `c:Reactor.Step.run/3` callback are:
    - `{:ok, any}` - a successful result.
    - `{:ok, any, [Reactor.Step.t]}` - a successful result with additional steps to
      add to the running reactor.
    - `:retry` - the step wants to be retried.
    - `{:retry, Exception.t | any}` - the step wants to be retried, and here's why.
    - `{:error, Exception.t | any}` - the step failed, and here's why.
    - `{:halt, any}` - the step wants the Reactor to stop.
    ## `result`:
    #{inspect(error.result)}
    ## `step`:
    #{inspect(error.step)}
    ## `arguments`:
    #{inspect(error.arguments)}
    """
  end
end

View file

@ -270,15 +270,16 @@ defmodule Reactor.Executor do
end
defp find_ready_steps(reactor, _state) do
step =
reactor.plan
|> Graph.vertices()
|> Enum.find(fn
step when is_struct(step, Step) -> Graph.in_degree(reactor.plan, step) == 0
_ -> false
end)
{:continue, [step]}
reactor.plan
|> Graph.vertices()
|> Enum.find(fn
step when is_struct(step, Step) -> Graph.in_degree(reactor.plan, step) == 0
_ -> false
end)
|> case do
nil -> {:continue, []}
step -> {:continue, [step]}
end
end
defp maybe_release_pool(state) when state.pool_owner == true do

View file

@ -71,105 +71,125 @@ defmodule Reactor.Executor.Async do
end
@doc """
Check to see if any steps are completed, and if so handle them.
Handle zero or one completed async steps and then decide what to do.
"""
@spec handle_completed_steps(Reactor.t(), Executor.State.t()) ::
{:recurse | :continue | :undo | :halt, Reactor.t(), Executor.State.t()}
def handle_completed_steps(reactor, state) do
completed_task_results = get_normalised_task_results(state.current_tasks, 100)
reactor
|> delete_vertices(Map.keys(completed_task_results))
|> handle_completed_steps(state, completed_task_results)
completed_task_results = get_normalised_task_results(state, timeout: 100)
handle_completed_task_results(reactor, state, completed_task_results)
end
defp handle_completed_steps(reactor, state, completed_task_results)
when map_size(completed_task_results) == 0,
do: {:continue, reactor, state}
defp handle_completed_task_results(reactor, state, []),
do: {:continue, reactor, state}
defp handle_completed_steps(reactor, state, completed_task_results) do
release_concurrency_resources_to_pool(state.concurrency_key, map_size(completed_task_results))
defp handle_completed_task_results(reactor, state, completed_task_results) do
Enum.reduce(
completed_task_results,
{:recurse, reactor, state},
fn task_result, {status, reactor, state} ->
{new_status, reactor, state} = handle_completed_step(reactor, state, task_result)
new_current_tasks = Map.drop(state.current_tasks, Map.keys(completed_task_results))
if got_worse?(status, new_status) do
{new_status, reactor, state}
else
{status, reactor, state}
end
end
)
end
completed_step_results =
completed_task_results
|> Map.values()
|> Map.new()
defp got_worse?(:recurse, :undo), do: true
defp got_worse?(:recurse, :halt), do: true
defp got_worse?(:undo, :halt), do: true
defp got_worse?(_old, _new), do: false
retry_steps =
completed_step_results
|> Enum.filter(fn
{_, :retry} -> true
{_, {:retry, _}} -> true
_ -> false
end)
|> Enum.map(&elem(&1, 0))
steps_to_remove =
completed_step_results
|> Map.keys()
|> MapSet.new()
|> MapSet.difference(MapSet.new(retry_steps))
|> Enum.to_list()
steps_to_append =
completed_step_results
|> Map.values()
|> Enum.flat_map(fn
{:ok, _, steps} -> steps
_ -> []
end)
defp handle_completed_step(reactor, state, {task, step, {:ok, value, new_steps}}) do
state =
state
|> drop_task(task)
reactor =
reactor
|> store_successful_results_in_the_undo_stack(completed_step_results)
|> store_intermediate_results(completed_step_results)
|> delete_vertices(steps_to_remove)
|> append_steps(steps_to_append)
|> drop_from_plan(task)
|> maybe_store_undo(step, value)
|> maybe_store_intermediate_result(step, value)
reactor =
case Enum.split_with(new_steps, &(&1.name == step.name)) do
{[], new_steps} ->
reactor
|> drop_from_plan(step)
|> append_steps(new_steps)
{recursive_steps, new_steps} ->
recursive_steps = Enum.map(recursive_steps, &%{&1 | ref: step.ref})
reactor
|> append_steps(recursive_steps)
|> append_steps(new_steps)
end
{:recurse, reactor, state}
end
defp handle_completed_step(reactor, state, {task, step, {:retry, error}}) do
state =
state
|> increment_retry_counts(retry_steps)
|> collect_errors(completed_step_results)
|> increment_retries(step)
|> drop_task(task)
status =
completed_task_results
|> Enum.find_value(:ok, fn
{_task, {_step, {:halt, _}}} ->
:halt
reactor =
reactor
|> drop_from_plan(task)
{_task, {_step, {:error, _}}} ->
:undo
if Map.get(state.retries, step.ref) >= step.max_retries do
error =
error ||
RetriesExceededError.exception(
step: step,
retry_count: Map.get(state.retries, step.ref)
)
{_task, {step, :retry}} ->
if Map.get(state.retries, step.ref) >= step.max_retries,
do: :undo
_ ->
nil
end)
state = %{state | current_tasks: new_current_tasks}
case status do
:ok ->
{:recurse, reactor, state}
:undo ->
{reactor, state} = collect_remaining_tasks_for_shutdown(reactor, state)
{:undo, reactor, state}
:halt ->
{reactor, state} = collect_remaining_tasks_for_shutdown(reactor, state)
{:halt, reactor, state}
reactor = drop_from_plan(reactor, step)
{:undo, reactor, add_error(state, error)}
else
{:recurse, reactor, state}
end
end
defp get_normalised_task_results(current_tasks, timeout) do
defp handle_completed_step(reactor, state, {task, step, {:error, error}}) do
state =
state
|> drop_task(task)
|> add_error(error)
reactor =
reactor
|> drop_from_plan(task)
|> drop_from_plan(step)
{:undo, reactor, state}
end
defp handle_completed_step(reactor, state, {task, step, {:halt, value}}) do
state =
state
|> drop_task(task)
reactor =
reactor
|> drop_from_plan(task)
|> drop_from_plan(step)
|> store_intermediate_result(step, value)
{:halt, reactor, state}
end
defp get_normalised_task_results(%{current_tasks: current_tasks}, opts) do
current_tasks
|> Map.keys()
|> Task.yield_many(timeout)
|> Task.yield_many(opts)
|> Stream.reject(&is_nil(elem(&1, 1)))
|> Stream.map(fn
{task, {:ok, {:error, reason}}} ->
@ -179,7 +199,7 @@ defmodule Reactor.Executor.Async do
{task, {:halt, reason}}
{task, {:ok, :retry}} ->
{task, :retry}
{task, {:retry, nil}}
{task, {:ok, {:retry, reason}}} ->
{task, {:retry, reason}}
@ -190,11 +210,53 @@ defmodule Reactor.Executor.Async do
{task, {:exit, reason}} ->
{task, {:error, reason}}
end)
|> Map.new(fn {task, result} ->
{task, {Map.fetch!(current_tasks, task), result}}
|> Enum.map(fn {task, result} ->
{task, Map.fetch!(current_tasks, task), result}
end)
end
defp drop_task(state, task) do
ConcurrencyTracker.release(state.concurrency_key, 1)
%{state | current_tasks: Map.delete(state.current_tasks, task)}
end
defp increment_retries(state, step) do
%{state | retries: Map.update(state.retries, step.ref, 0, &(&1 + 1))}
end
defp drop_from_plan(reactor, step) do
%{reactor | plan: Graph.delete_vertex(reactor.plan, step)}
end
defp add_error(state, error) do
%{state | errors: [error | state.errors]}
end
defp store_intermediate_result(reactor, step, value) do
%{reactor | intermediate_results: Map.put(reactor.intermediate_results, step.name, value)}
end
defp maybe_store_undo(reactor, step, value) do
if Step.can?(step, :undo) do
%{reactor | undo: [{step, value} | reactor.undo]}
else
reactor
end
end
defp maybe_store_intermediate_result(reactor, step, value) when reactor.return == step.name do
store_intermediate_result(reactor, step, value)
end
defp maybe_store_intermediate_result(reactor, step, value) do
if Graph.out_degree(reactor.plan, step) > 0 do
store_intermediate_result(reactor, step, value)
else
reactor
end
end
defp store_successful_results_in_the_undo_stack(reactor, completed_step_results)
when map_size(completed_step_results) == 0,
do: reactor
@ -246,47 +308,6 @@ defmodule Reactor.Executor.Async do
}
end
defp increment_retry_counts(state, retry_steps) do
retries =
retry_steps
|> Enum.reduce(state.retries, fn step, retries ->
Map.update(retries, step.ref, 1, &(&1 + 1))
end)
%{state | retries: retries}
end
defp collect_errors(state, completed_step_results) do
errors =
completed_step_results
|> Enum.filter(fn
{_step, {:error, _}} ->
true
{step, {:retry, _}} ->
Map.get(state.retries, step.ref) >= step.max_retries
{step, :retry} ->
Map.get(state.retries, step.ref) >= step.max_retries
_ ->
false
end)
|> Enum.map(fn
{_step, {_, reason}} ->
reason
{step, :retry} ->
RetriesExceededError.exception(
step: step,
retry_count: Map.get(state.retries, step.ref)
)
end)
|> Enum.concat(state.errors)
%{state | errors: errors}
end
@doc """
When the Reactor needs to shut down for any reason, we need to await all the
currently running asynchronous steps and delete any task vertices.
@ -299,14 +320,16 @@ defmodule Reactor.Executor.Async do
end
def collect_remaining_tasks_for_shutdown(reactor, state) do
remaining_task_results = get_normalised_task_results(state.current_tasks, state.halt_timeout)
remaining_task_results =
get_normalised_task_results(state, timeout: state.halt_timeout, on_timeout: :ignore)
release_concurrency_resources_to_pool(state.concurrency_key, map_size(remaining_task_results))
release_concurrency_resources_to_pool(state.concurrency_key, length(remaining_task_results))
remaining_step_results =
remaining_task_results
|> Map.values()
|> Map.new()
|> Map.new(fn {_task, step, result} -> {step, result} end)
finished_tasks = remaining_step_results |> Enum.map(&elem(&1, 0))
reactor =
reactor
@ -315,7 +338,7 @@ defmodule Reactor.Executor.Async do
unfinished_tasks =
state.current_tasks
|> Map.delete(Map.keys(remaining_task_results))
|> Map.delete(finished_tasks)
unfinished_task_count = map_size(unfinished_tasks)
@ -327,7 +350,7 @@ defmodule Reactor.Executor.Async do
|> Enum.map_join("\n * ", &inspect/1)
"""
Waited #{state.halt_timeout}ms for async steps to complete, however #{unfinished_task_count} are still running and will be abandoned and cannot be undone.
Waited #{state.halt_timeout}ms for async steps to complete, however #{unfinished_task_count} are still running, will be abandoned and cannot be undone.
* #{unfinished_steps}
"""

View file

@ -53,7 +53,7 @@ defmodule Reactor.Executor.State do
|> maybe_set_max_concurrency()
|> maybe_allocate_concurrency_pool()
|> Map.put(:started_at, DateTime.utc_now())
|> then(&struct(__MODULE__, &1))
|> then(&struct!(__MODULE__, &1))
end
defp maybe_set_max_concurrency(attrs)

View file

@ -5,6 +5,7 @@ defmodule Reactor.Executor.StepRunner do
alias Reactor.{
Error.Invalid.ArgumentSubpathError,
Error.Invalid.CompensateStepError,
Error.Invalid.InvalidResultError,
Error.Invalid.MissingInputError,
Error.Invalid.MissingResultError,
Error.Invalid.RunStepError,
@ -20,6 +21,8 @@ defmodule Reactor.Executor.StepRunner do
import Reactor.Argument, only: :macros
require Logger
@dialyzer {:nowarn_function, handle_run_result: 5}
# In the future this could be moved into a step property.
@max_undo_count 5
@ -161,6 +164,16 @@ defmodule Reactor.Executor.StepRunner do
{:halt, value}
end
defp handle_run_result(result, reactor, step, arguments, _context) do
{:error,
InvalidResultError.exception(
reactor: reactor,
step: step,
result: result,
arguments: arguments
)}
end
defp maybe_compensate(reactor, step, error, arguments, context) do
if Step.can?(step, :compensate) do
compensate(reactor, step, error, arguments, context)

View file

@ -15,57 +15,72 @@ defmodule Reactor.Executor.Sync do
def run(reactor, state, nil), do: {:continue, reactor, state}
def run(reactor, state, step) do
case Executor.StepRunner.run(reactor, state, step, state.concurrency_key) do
:retry ->
state = increment_retries(state, step)
result = Executor.StepRunner.run(reactor, state, step, state.concurrency_key)
if Map.get(state.retries, step.ref) >= step.max_retries do
reactor = drop_from_plan(reactor, step)
handle_completed_step(reactor, state, step, result)
end
error =
RetriesExceededError.exception(
step: step,
retry_count: Map.get(state.retries, step.ref)
)
defp handle_completed_step(reactor, state, step, :retry) do
handle_completed_step(reactor, state, step, {:retry, nil})
end
{:undo, reactor, %{state | errors: [error | state.errors]}}
else
{:recurse, reactor, state}
end
defp handle_completed_step(reactor, state, step, {:retry, error}) do
state = increment_retries(state, step)
{:retry, reason} ->
state = increment_retries(state, step)
if Map.get(state.retries, step.ref) >= step.max_retries do
reactor = drop_from_plan(reactor, step)
if Map.get(state.retries, step.ref) >= step.max_retries do
reactor = drop_from_plan(reactor, step)
{:undo, reactor, %{state | errors: [reason | state.errors]}}
else
{:recurse, reactor, state}
end
error =
error ||
RetriesExceededError.exception(
step: step,
retry_count: Map.get(state.retries, step.ref)
)
{:ok, value, new_steps} ->
reactor =
{:undo, reactor, %{state | errors: [error | state.errors]}}
else
{:recurse, reactor, state}
end
end
defp handle_completed_step(reactor, state, step, {:ok, value, new_steps}) do
reactor =
reactor
|> maybe_store_undo(step, value)
|> maybe_store_intermediate_result(step, value)
reactor =
case Enum.split_with(new_steps, &(&1.name == step.name)) do
{[], new_steps} ->
reactor
|> maybe_store_undo(step, value)
|> maybe_store_intermediate_result(step, value)
|> drop_from_plan(step)
|> append_steps(new_steps)
{:recurse, reactor, state}
{recursive_steps, new_steps} ->
recursive_steps = Enum.map(recursive_steps, &%{&1 | ref: step.ref})
{:error, reason} ->
state = %{state | errors: [reason | state.errors]}
reactor = drop_from_plan(reactor, step)
{:undo, reactor, state}
{:halt, value} ->
reactor =
reactor
|> drop_from_plan(step)
|> store_intermediate_result(step, value)
|> append_steps(recursive_steps)
|> append_steps(new_steps)
end
{:halt, reactor, state}
end
{:recurse, reactor, state}
end
defp handle_completed_step(reactor, state, step, {:error, reason}) do
state = %{state | errors: [reason | state.errors]}
reactor = drop_from_plan(reactor, step)
{:undo, reactor, state}
end
defp handle_completed_step(reactor, state, step, {:halt, value}) do
reactor =
reactor
|> drop_from_plan(step)
|> store_intermediate_result(step, value)
{:halt, reactor, state}
end
defp increment_retries(state, step) do
@ -84,16 +99,15 @@ defmodule Reactor.Executor.Sync do
end
end
defp maybe_store_intermediate_result(reactor, step, value) when reactor.return == step.name do
store_intermediate_result(reactor, step, value)
end
defp maybe_store_intermediate_result(reactor, step, value) do
cond do
Graph.out_degree(reactor.plan, step) > 0 ->
store_intermediate_result(reactor, step, value)
reactor.return == step.name ->
store_intermediate_result(reactor, step, value)
true ->
reactor
if Graph.out_degree(reactor.plan, step) > 0 do
store_intermediate_result(reactor, step, value)
else
reactor
end
end

View file

@ -50,15 +50,22 @@ defmodule Reactor.Planner do
steps_by_name =
graph
|> Graph.vertices()
|> Enum.concat(steps)
|> Stream.filter(&is_struct(&1, Step))
|> Stream.concat(steps)
|> Map.new(&{&1.name, &1})
steps
|> reduce_while_ok(graph, fn
step, graph when is_struct(step, Step) ->
graph
|> Graph.add_vertex(step, step.name)
|> reduce_arguments_into_graph(step, steps_by_name)
if Graph.has_vertex?(graph, step) do
graph
|> Graph.replace_vertex(step, step)
|> reduce_arguments_into_graph(step, steps_by_name)
else
graph
|> Graph.add_vertex(step, step.name)
|> reduce_arguments_into_graph(step, steps_by_name)
end
not_step, _ ->
{:error,
@ -72,7 +79,7 @@ defmodule Reactor.Planner do
defp reduce_arguments_into_graph(graph, current_step, steps_by_name) do
reduce_while_ok(current_step.arguments, graph, fn
argument, graph when is_argument(argument) and is_from_result(argument) ->
argument, graph when is_from_result(argument) ->
dependency_name = argument.source.name
case Map.fetch(steps_by_name, dependency_name) do
@ -95,8 +102,7 @@ defmodule Reactor.Planner do
)}
end
argument, graph
when is_argument(argument) and (is_from_input(argument) or is_from_value(argument)) ->
argument, graph when is_from_input(argument) or is_from_value(argument) ->
{:ok, graph}
end)
end

352
lib/reactor/step/map.ex Normal file
View file

@ -0,0 +1,352 @@
defmodule Reactor.Step.Map do
  use Reactor.Step
  require Reactor.Argument
  require Reactor.Error.Internal.UnreachableError
  require Iter
  alias Reactor.{Argument, Builder, Error.Internal.UnreachableError, Step, Template}
  alias Spark.Options
  import Reactor.Utils

  # Option schema for `run/3`.  Declared before `@moduledoc` so that the
  # module docs can interpolate `Options.docs(@option_schema)`.
  @option_schema [
    state: [
      type: {:in, [:init, :iterating]},
      required: true,
      doc: """
      The current execution state of the map. This is required because it's recursive.
      """
    ],
    batch_size: [
      type: :pos_integer,
      required: false,
      default: 100,
      doc: """
      The number of elements to consume off the source when emitting steps.
      """
    ],
    steps: [
      type: {:list, {:struct, Step}},
      required: true,
      doc: """
      The steps to use when mapping source elements.
      """
    ],
    return: [
      type: :atom,
      required: true,
      doc: """
      The name of the step whose result will be used as the new value for each element.
      """
    ],
    strict_ordering?: [
      type: :boolean,
      required: false,
      default: true,
      doc: """
      Whether the mapped values must be returned in the same order that they were provided.
      """
    ],
    allow_async?: [
      type: :boolean,
      required: false,
      default: true,
      doc: """
      Whether the emitted steps should be allowed to run asynchronously.
      """
    ],
    descendant_step_names: [
      type: {:struct, MapSet},
      required: false,
      doc: """
      The cached names of all descendant steps to aid rewriting. You don't need to provide this value - it is calculated by the init pass.
      """
    ],
    extra_arguments: [
      type: {:list, {:struct, Argument}},
      required: false,
      doc: """
      Extra arguments to be passed by to every nested step.
      """
    ]
  ]

  @moduledoc """
  Given an iterable input run the provided steps for each element and collect
  the results into a new value.

  > #### A note on ordering {: .tip}
  >
  > If your application doesn't need the results back in the same order that
  > they were provided then setting `strict_ordering?` to `false` will increase
  > performance - especially on large input sets.

  ## Options

  #{Options.docs(@option_schema)}
  """

  @doc false
  @impl true
  # Entry point.  Validates `options` against the schema then dispatches on
  # `:state`: `:init` performs one-off setup, `:iterating` consumes the
  # results of the previous batch and emits the next one.
  def run(arguments, context, options) do
    with {:ok, options} <- Options.validate(options, @option_schema) do
      case options[:state] do
        :init -> do_init(arguments.source, arguments, options, context.current_step)
        :iterating -> do_iterate(arguments, options, context.current_step)
      end
    end
  end

  # First pass: index the source, capture the extra arguments by value and
  # start iterating with an empty result accumulator.
  defp do_init(source, arguments, options, map_step) when Iter.is_iter(source) do
    # Pair each element with its index so results can be re-ordered when
    # `strict_ordering?` is enabled.
    source =
      source
      |> Iter.with_index()

    # Everything other than `:source` and `:result` is passed verbatim to
    # every nested step as a value argument — nested steps cannot refer to
    # outside steps directly because those may have completed and had their
    # results discarded by the time the nested steps are added to the graph.
    extra_arguments =
      arguments
      |> Map.drop([:source, :result])
      |> Enum.map(fn {name, value} ->
        Argument.from_value(name, value)
      end)

    options =
      options
      |> Keyword.put_new_lazy(:descendant_step_names, fn ->
        collect_all_step_names(options[:steps])
      end)
      |> Keyword.put(:state, :iterating)
      |> Keyword.put(:extra_arguments, extra_arguments)

    emit_batch(source, options, map_step, [])
  end

  # Coerce any other iterable into an `Iter` and retry.
  defp do_init(source, arguments, options, map_step) do
    source
    |> Iter.from()
    |> do_init(arguments, options, map_step)
  end

  # Subsequent passes: fold the per-element results of the previous batch
  # into the accumulator, then emit the next batch.
  defp do_iterate(arguments, options, map_step) do
    {source, arguments} = Map.pop!(arguments, :source)
    {result, arguments} = Map.pop!(arguments, :result)
    map_step_name = map_step.name

    # The remaining arguments are keyed `{__MODULE__, map_step_name,
    # :element, index}` (see `arguments_for_batch/3`); accumulate them as
    # `{index, value}` pairs.
    result =
      Enum.reduce(arguments, result, fn {{__MODULE__, ^map_step_name, :element, index}, value},
                                        result ->
        [{index, value} | result]
      end)

    emit_batch(source, options, map_step, result)
  end

  # Recursively walk the (possibly nested) step tree collecting every step
  # name into a `MapSet`.  Used to decide which result templates refer to
  # steps inside the map and therefore need rewriting per element.
  defp collect_all_step_names(steps, into \\ MapSet.new())
  defp collect_all_step_names([], into), do: into

  defp collect_all_step_names([%{steps: [_ | _] = child_steps} = step | steps], into) do
    into = collect_all_step_names(child_steps, MapSet.put(into, step.name))
    collect_all_step_names(steps, into)
  end

  defp collect_all_step_names([step | steps], into),
    do: collect_all_step_names(steps, MapSet.put(into, step.name))

  # Take up to `batch_size` elements off the source.  When the source is
  # exhausted and the final chunk is empty, finalise the result; otherwise
  # emit steps for the batch (the remainder travels with the recursive
  # step).
  defp emit_batch(source, options, map_step, result) do
    with {:done, batch} <- Iter.take_chunk(source, options[:batch_size]),
         {:done, []} <- {:done, Iter.to_list(batch)} do
      finalise_result(result, options)
    else
      {:ok, batch, remainder} -> do_emit_batch(batch, remainder, options, map_step, result)
      {:done, batch} -> do_emit_batch(batch, Iter.empty(), options, map_step, result)
    end
  end

  # Emit rewritten copies of the template steps for each element of the
  # batch, plus a recursive step (named after the map step itself) which
  # re-enters this module with the remaining source and the accumulated
  # result.
  defp do_emit_batch(batch, remainder, options, map_step, result) do
    with {:ok, arguments} <- arguments_for_batch(batch, options, map_step),
         {:ok, recursive_step} <-
           Builder.new_step(
             map_step.name,
             {__MODULE__, options},
             Enum.concat(arguments, [
               Argument.from_value(:source, remainder),
               Argument.from_result(:result, map_step.name)
             ])
           ),
         {:ok, steps} <- steps_for_batch(batch, options, map_step) do
      steps = Enum.concat(steps, [recursive_step])
      {:ok, result, steps}
    end
  end

  # Convert the accumulated `[{index, value}]` list into the final list of
  # values, sorting by index when `strict_ordering?` is enabled.
  defp finalise_result(result, options) do
    if options[:strict_ordering?] do
      result =
        result
        |> Enum.sort_by(&elem(&1, 0))
        |> Enum.map(&elem(&1, 1))

      {:ok, result}
    else
      # BUGFIX: the accumulator is a list of `{index, value}` tuples (built
      # in `do_iterate/3`), not a map — the previous `Map.values(result)`
      # would raise `BadMapError`.  Strip the indices without sorting.
      {:ok, Enum.map(result, &elem(&1, 1))}
    end
  end

  # generate a whole heap of arguments for the recursive step so that it can
  # collect up the whole batch.
  defp arguments_for_batch(batch, options, map_step) do
    arguments =
      Enum.map(batch, fn {_element, index} ->
        %Argument{
          name: {__MODULE__, map_step.name, :element, index},
          source: %Template.Result{name: {__MODULE__, map_step.name, options[:return], index}}
        }
      end)

    {:ok, arguments}
  end

  # Rewrite the template steps once per element in the batch, flattening
  # the per-element step lists into one list.
  defp steps_for_batch(batch, options, map_step) do
    steps = options[:steps]
    descendant_step_names = options[:descendant_step_names]
    extra_arguments = options[:extra_arguments]

    reduce_while_ok(batch, [], fn {element, index}, result ->
      case rewrite_steps_for_element(
             {element, index},
             steps,
             descendant_step_names,
             map_step,
             extra_arguments,
             options[:allow_async?]
           ) do
        {:ok, steps} -> reduce_while_ok(steps, result, &{:ok, [&1 | &2]})
        {:error, reason} -> {:error, reason}
      end
    end)
  end

  # Rewrite every step in `steps` for a single `{element, index}` pair.
  defp rewrite_steps_for_element(
         {element, index},
         steps,
         descendant_step_names,
         map_step,
         extra_arguments,
         allow_async?
       ) do
    map_while_ok(
      steps,
      &rewrite_step_for_element(
        &1,
        {element, index},
        descendant_step_names,
        map_step,
        extra_arguments,
        allow_async?
      )
    )
  end

  # Rewrite a single step for a single element: fix up its arguments and
  # nested steps, append the extra arguments, and namespace its name/ref by
  # the map step and element index so each per-element copy is unique in
  # the graph.
  defp rewrite_step_for_element(
         step,
         {element, index},
         descendant_step_names,
         map_step,
         extra_arguments,
         allow_async?
       ) do
    with {:ok, step} <-
           rewrite_arguments(
             step,
             {element, index},
             descendant_step_names,
             map_step
           ),
         {:ok, step} <-
           rewrite_nested_steps_for_element(
             step,
             {element, index},
             descendant_step_names,
             map_step,
             extra_arguments,
             allow_async?
           ) do
      {:ok,
       %{
         step
         | arguments: Enum.concat(step.arguments, extra_arguments),
           name: {__MODULE__, map_step.name, step.name, index},
           ref: {__MODULE__, map_step.name, step.ref, index},
           async?: allow_async?
       }}
    end
  end

  # Rewrite a step's arguments for one element:
  #   - element arguments referring to another map step are unreachable;
  #   - element arguments become value arguments holding the element itself
  #     (preserving any sub-path);
  #   - result arguments referring to a step inside this map are renamed to
  #     the element-namespaced copy;
  #   - anything else passes through untouched.
  defp rewrite_arguments(step, {element, index}, descendant_step_names, map_step) do
    map_while_ok(step.arguments, fn
      argument
      when Argument.is_from_element(argument) and argument.source.name != map_step.name ->
        {:error,
         UnreachableError.unreachable(
           "Attempted to retrieve an element whose source doesn't match the current map step: #{inspect(argument.source.name)} vs #{inspect(map_step.name)}"
         )}

      argument when Argument.is_from_element(argument) ->
        argument =
          argument.name
          |> Argument.from_value(element)
          |> Argument.sub_path(argument.source.sub_path)

        {:ok, argument}

      argument when Argument.is_from_result(argument) ->
        if MapSet.member?(descendant_step_names, argument.source.name) do
          argument = %{
            argument
            | source: %{
                argument.source
                | name: {__MODULE__, map_step.name, argument.source.name, index}
              }
          }

          {:ok, argument}
        else
          {:ok, argument}
        end

      argument ->
        {:ok, argument}
    end)
    |> and_then(&{:ok, %{step | arguments: &1}})
  end

  # Recurse into composite steps (those with a non-empty `steps` list) so
  # their children are rewritten for the element too.
  defp rewrite_nested_steps_for_element(
         %{steps: [_ | _] = steps} = step,
         {element, index},
         descendant_step_names,
         map_step,
         extra_arguments,
         allow_async?
       ) do
    with {:ok, steps} <-
           rewrite_steps_for_element(
             {element, index},
             steps,
             descendant_step_names,
             map_step,
             extra_arguments,
             allow_async?
           ) do
      {:ok, %{step | steps: steps}}
    end
  end

  # Steps without children need no nested rewriting.
  defp rewrite_nested_steps_for_element(
         step,
         _element_index,
         _descendant_step_names,
         _map_step,
         _extra_arguments,
         _allow_async?
       ),
       do: {:ok, step}
end

View file

@ -2,14 +2,13 @@ defmodule Reactor.Template do
@moduledoc """
Templates used to refer to some sort of computed value.
"""
alias __MODULE__.{Element, Input, Result, Value}
alias __MODULE__.{Input, Result, Value}
@type t :: Input.t() | Result.t() | Value.t()
@type t :: Element.t() | Input.t() | Result.t() | Value.t()
@doc "The type for use in option schemas"
@spec type :: Spark.Options.type()
def type, do: {:or, [{:struct, Input}, {:struct, Result}, {:struct, Value}]}
def type, do: {:or, [{:struct, Element}, {:struct, Input}, {:struct, Result}, {:struct, Value}]}
@doc "A guard for input templates"
@spec is_input_template(any) :: Macro.output()
@ -23,9 +22,13 @@ defmodule Reactor.Template do
@spec is_value_template(any) :: Macro.output()
defguard is_value_template(template) when is_struct(template, Value)
@doc "A guard for element templates"
@spec is_element_template(any) :: Macro.output()
defguard is_element_template(template) when is_struct(template, Element)
@doc "A guard to detect all template types"
@spec is_template(any) :: Macro.output()
defguard is_template(template)
when is_input_template(template) or is_result_template(template) or
is_value_template(template)
is_value_template(template) or is_element_template(template)
end

View file

@ -0,0 +1,9 @@
defmodule Reactor.Template.Element do
  @moduledoc """
  The `element` template.

  Holds the name of the source it refers to and an optional `sub_path`
  into the element's value.
  """

  # `:name` defaults to nil; `sub_path` defaults to the empty path.
  defstruct [:name, sub_path: []]

  @type t :: %__MODULE__{name: atom, sub_path: [atom]}
end

View file

@ -3,7 +3,7 @@ defmodule Reactor.Template.Value do
A statically `value` template.
"""
defstruct value: nil
defstruct value: nil, sub_path: []
@type t :: %__MODULE__{value: any}
@type t :: %__MODULE__{value: any, sub_path: [any]}
end

View file

@ -98,6 +98,41 @@ defmodule Reactor.Utils do
end
end
@doc """
Perform a flat map over an enumerable provided that the mapper function
continues to return ok tuples.

A mapper returning `{:error, reason}` is propagated as-is.  By default the
output order is unspecified (elements are accumulated by prepending); pass
`true` as the third argument to preserve the input order at the cost of a
final reverse.
"""
@spec flat_map_while_ok(Enumerable.t(input), (input -> {:ok, output} | {:error, any}), boolean) ::
        {:ok, Enumerable.t(output)} | {:error, any}
      when input: any, output: any
def flat_map_while_ok(inputs, mapper, preserve_order? \\ false)

# Fast path: cons each element of every mapped result onto the shared
# accumulator, so the output comes back reversed relative to the input.
def flat_map_while_ok(inputs, mapper, false) do
  reduce_while_ok(inputs, [], fn input, acc ->
    case mapper.(input) do
      {:ok, result} -> reduce_while_ok(result, acc, &[&1 | &2])
      {:error, reason} -> {:error, reason}
    end
  end)
end

# Order-preserving variant: build in reverse then reverse once at the end.
def flat_map_while_ok(inputs, mapper, true) do
  inputs
  |> flat_map_while_ok(mapper, false)
  |> and_then(&{:ok, Enum.reverse(&1)})
end
@doc "Raising version of `flat_map_while_ok/3`"
@spec flat_map_while_ok!(Enumerable.t(input), (input -> {:ok, output} | {:error, any}), boolean) ::
        Enumerable.t(output) | no_return
      when input: any, output: any
def flat_map_while_ok!(inputs, mapper, preserve_order? \\ false) do
  # Unwrap the ok tuple or raise the error reason.  NOTE(review): `raise`
  # requires `reason` to be an exception struct or module — presumably
  # callers only return exceptions here; TODO confirm.
  case flat_map_while_ok(inputs, mapper, preserve_order?) do
    {:ok, result} -> result
    {:error, reason} -> raise reason
  end
end
@doc """
Perform a reduction over an enumerable provided that the reduction function
returns an ok tuple.

View file

@ -95,6 +95,7 @@ defmodule Reactor.MixProject do
{:splode, "~> 0.2"},
{:libgraph, "~> 0.16"},
{:igniter, "~> 0.2"},
{:iterex, "~> 0.1"},
{:telemetry, "~> 1.2"},
# Dev/Test dependencies

View file

@ -16,6 +16,7 @@
"glob_ex": {:hex, :glob_ex, "0.1.7", "eae6b6377147fb712ac45b360e6dbba00346689a87f996672fe07e97d70597b1", [:mix], [], "hexpm", "decc1c21c0c73df3c9c994412716345c1692477b9470e337f628a7e08da0da6a"},
"hpax": {:hex, :hpax, "1.0.0", "28dcf54509fe2152a3d040e4e3df5b265dcb6cb532029ecbacf4ce52caea3fd2", [:mix], [], "hexpm", "7f1314731d711e2ca5fdc7fd361296593fc2542570b3105595bb0bc6d0fad601"},
"igniter": {:hex, :igniter, "0.2.10", "078a1308924e2cffce7956b00e01794ec218ed09cea969d0a9911ee91f885c99", [:mix], [{:glob_ex, "~> 0.1.7", [hex: :glob_ex, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:req, "~> 0.4", [hex: :req, repo: "hexpm", optional: false]}, {:rewrite, "~> 0.9", [hex: :rewrite, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.4", [hex: :sourceror, repo: "hexpm", optional: false]}, {:spitfire, ">= 0.1.3 and < 1.0.0-0", [hex: :spitfire, repo: "hexpm", optional: false]}], "hexpm", "071080c4676226764b9513138f185e842993ff95bbf483e78536db50396986eb"},
"iterex": {:hex, :iterex, "0.1.1", "90378a9561ad87da46737dceaf02e68a0b3023746216a4de34a0c509f5f505d4", [:mix], [], "hexpm", "c4f5916a6dbb03aa4c3d5c480069e13075ca6a57bd0c28d643da3891962440ad"},
"jason": {:hex, :jason, "1.4.3", "d3f984eeb96fe53b85d20e0b049f03e57d075b5acda3ac8d465c969a2536c17b", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "9a90e868927f7c777689baa16d86f4d0e086d968db5c05d917ccff6d443e58a3"},
"libgraph": {:hex, :libgraph, "0.16.0", "3936f3eca6ef826e08880230f806bfea13193e49bf153f93edcf0239d4fd1d07", [:mix], [], "hexpm", "41ca92240e8a4138c30a7e06466acc709b0cbb795c643e9e17174a178982d6bf"},
"makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"},

View file

@ -0,0 +1,36 @@
defmodule Reactor.Dsl.MapTest do
  @moduledoc false
  use ExUnit.Case, async: true

  # A reactor that doubles every number in its input, exercising both the
  # `element/1` template and extra-argument passing (`:multiplier`) into
  # the nested step.
  defmodule MapOverNumbersReactor do
    @moduledoc false
    use Reactor

    input :numbers

    step :multiplier do
      run fn _ -> {:ok, 2} end
    end

    map :map_over_numbers do
      source(input(:numbers))
      argument :multiplier, result(:multiplier)
      # Small batch size so the test covers multiple recursive iterations.
      batch_size(2)

      step :double do
        argument :input, element(:map_over_numbers)

        run fn %{input: input, multiplier: multiplier}, _ ->
          {:ok, input * multiplier}
        end
      end
    end
  end

  # Fixed grammar in the test name ("it's" -> "its").
  test "it maps over its inputs" do
    numbers = [0, 2, 4, 6, 8, 10]

    assert {:ok, [0, 4, 8, 12, 16, 20]} =
             Reactor.run!(MapOverNumbersReactor, %{numbers: numbers}, %{}, async?: false)
  end
end

View file

@ -204,12 +204,14 @@ defmodule Reactor.Executor.AsyncTest do
assert Graph.has_vertex?(reactor.plan, step)
end
test "when one of the steps asks to retry, it increments the retry count for the step",
test "when one of the steps asks to retry, it sets the retry count for the step",
%{reactor: reactor, state: state, doable: doable, supervisor: supervisor} do
task = Task.Supervisor.async_nolink(supervisor, fn -> :retry end)
state = %{state | current_tasks: %{task => doable}}
refute is_map_key(state.retries, doable.ref)
assert {_, _reactor, state} = handle_completed_steps(reactor, state)
assert state.retries[doable.ref] == 1
assert state.retries[doable.ref] == 0
end
test "when one of the steps asks to retry (again), it increments the retry count for the step",

View file

@ -2,7 +2,7 @@ defmodule Reactor.Executor.InitTest do
@moduledoc false
use ExUnit.Case, async: true
import Reactor.Executor.Init
alias Reactor.{Builder, Executor}
alias Reactor.Builder
use Mimic
describe "init/4" do
@ -52,14 +52,5 @@ defmodule Reactor.Executor.InitTest do
assert {:error, error} = init(reactor, [], [], [:wat])
assert Exception.message(error) =~ "cannot be converted into a map"
end
test "options are passed into `State.init/1`", %{reactor: reactor} do
Executor.State
|> expect(:init, fn options ->
assert options.hello == :marty
end)
assert {:ok, _reactor, _state} = init(reactor, [], [], hello: :marty)
end
end
end

View file

@ -1,5 +1,4 @@
Mimic.copy(Example.Step.Compensable)
Mimic.copy(Example.Step.Doable)
Mimic.copy(Example.Step.Undoable)
Mimic.copy(Reactor.Executor.State)
ExUnit.start()