From e8ac9a1d90a19ae09a7a10caea51f11cdf336d23 Mon Sep 17 00:00:00 2001 From: James Harton Date: Thu, 18 Jul 2024 21:56:46 +1200 Subject: [PATCH] feat(map): Add the ability to map over elements of a collection inside a reactor. (#123) * feat(map): Add the ability to map over elements of a collection inside a reactor. * improvement: throw a more helpful error when a step returns an invalid result. * fix: automatically pass extra arguments from the map step to nested steps. They can't be referred to directly because they may be added to the graph at runtime, and may depend on steps which have been completed and their results discarded because they have no dependents at that time. * fix: spurious test failures seemingly caused by `Mimic`. --- .formatter.exs | 5 + .vscode/settings.json | 2 + documentation/dsls/DSL:-Reactor.md | 219 ++++++++++- lib/reactor/argument.ex | 59 ++- lib/reactor/builder/step.ex | 3 +- lib/reactor/dsl.ex | 1 + lib/reactor/dsl/argument.ex | 31 ++ lib/reactor/dsl/around.ex | 14 +- lib/reactor/dsl/build.ex | 9 - lib/reactor/dsl/group.ex | 16 +- lib/reactor/dsl/info.ex | 61 +++ lib/reactor/dsl/map.ex | 235 ++++++++++++ lib/reactor/dsl/switch.ex | 12 +- lib/reactor/dsl/transformer.ex | 7 - lib/reactor/dsl/verifier.ex | 87 ++++- .../error/internal/unreachable_error.ex | 161 ++++++++ .../error/invalid/invalid_result_error.ex | 38 ++ lib/reactor/executor.ex | 19 +- lib/reactor/executor/async.ex | 273 +++++++------- lib/reactor/executor/state.ex | 2 +- lib/reactor/executor/step_runner.ex | 13 + lib/reactor/executor/sync.ex | 106 +++--- lib/reactor/planner.ex | 20 +- lib/reactor/step/map.ex | 352 ++++++++++++++++++ lib/reactor/template.ex | 13 +- lib/reactor/template/element.ex | 9 + lib/reactor/template/value.ex | 4 +- lib/reactor/utils.ex | 35 ++ mix.exs | 1 + mix.lock | 1 + test/reactor/dsl/map_test.exs | 36 ++ test/reactor/executor/async_test.exs | 6 +- test/reactor/executor/init_test.exs | 11 +- test/test_helper.exs | 1 - 34 files changed, 1612 insertions(+), 250 deletions(-) create mode 100644 lib/reactor/dsl/info.ex create mode 100644 lib/reactor/dsl/map.ex create mode 100644 lib/reactor/error/internal/unreachable_error.ex create mode 100644 lib/reactor/error/invalid/invalid_result_error.ex create mode 100644 lib/reactor/step/map.ex create mode 100644 lib/reactor/template/element.ex create mode 100644 test/reactor/dsl/map_test.exs diff --git a/.formatter.exs b/.formatter.exs index 8b251d3..549d28d 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -9,6 +9,7 @@ spark_locals_without_parens = [ around: 2, around: 3, async?: 1, + batch_size: 1, before_all: 1, collect: 1, collect: 2, @@ -24,6 +25,8 @@ spark_locals_without_parens = [ input: 1, input: 2, level: 1, + map: 1, + map: 2, matches?: 1, matches?: 2, max_retries: 1, @@ -32,9 +35,11 @@ spark_locals_without_parens = [ on: 1, return: 1, run: 1, + source: 1, step: 1, step: 2, step: 3, + strict_ordering?: 1, switch: 1, switch: 2, transform: 1, diff --git a/.vscode/settings.json b/.vscode/settings.json index d3e22f1..c9c0d16 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -4,9 +4,11 @@ "backoff", "casted", "Desugars", + "lvalue", "mappish", "noreply", "Planable", + "rvalue", "splode", "Splode" ] diff --git a/documentation/dsls/DSL:-Reactor.md b/documentation/dsls/DSL:-Reactor.md index 1cad18a..444eb49 100644 --- a/documentation/dsls/DSL:-Reactor.md +++ b/documentation/dsls/DSL:-Reactor.md @@ -27,6 +27,9 @@ The top-level reactor DSL * argument * wait_for * [input](#reactor-input) + * [map](#reactor-map) + * argument + * wait_for * [step](#reactor-step) * argument * wait_for @@ -106,7 +109,7 @@ Wrap a function around a group of steps. | Name | Type | Default | Docs | |------|------|---------|------| -| [`name`](#reactor-around-name){: #reactor-around-name .spark-required} | `atom` | | A unique name of the group of steps. | +| [`name`](#reactor-around-name){: #reactor-around-name .spark-required} | `atom` | | A unique name for the group of steps. | | [`fun`](#reactor-around-fun){: #reactor-around-fun .spark-required} | `(any, any, any, any -> any) \| mfa` | | The around function. See `Reactor.Step.Around` for more information. | ### Options @@ -171,7 +174,7 @@ argument :three, value(3) | Name | Type | Default | Docs | |------|------|---------|------| | [`name`](#reactor-around-argument-name){: #reactor-around-argument-name .spark-required} | `atom` | | The name of the argument which will be used as the key in the `arguments` map passed to the implementation. | -| [`source`](#reactor-around-argument-source){: #reactor-around-argument-source .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. | +| [`source`](#reactor-around-argument-source){: #reactor-around-argument-source .spark-required} | `Reactor.Template.Element \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. | ### Options | Name | Type | Default | Docs | @@ -327,7 +330,7 @@ argument :three, value(3) | Name | Type | Default | Docs | |------|------|---------|------| | [`name`](#reactor-collect-argument-name){: #reactor-collect-argument-name .spark-required} | `atom` | | The name of the argument which will be used as the key in the `arguments` map passed to the implementation. | -| [`source`](#reactor-collect-argument-source){: #reactor-collect-argument-source .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. | +| [`source`](#reactor-collect-argument-source){: #reactor-collect-argument-source .spark-required} | `Reactor.Template.Element \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. | ### Options | Name | Type | Default | Docs | @@ -467,7 +470,7 @@ argument :three, value(3) | Name | Type | Default | Docs | |------|------|---------|------| | [`name`](#reactor-compose-argument-name){: #reactor-compose-argument-name .spark-required} | `atom` | | The name of the argument which will be used as the key in the `arguments` map passed to the implementation. | -| [`source`](#reactor-compose-argument-source){: #reactor-compose-argument-source .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. | +| [`source`](#reactor-compose-argument-source){: #reactor-compose-argument-source .spark-required} | `Reactor.Template.Element \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. | ### Options | Name | Type | Default | Docs | @@ -616,7 +619,7 @@ argument :three, value(3) | Name | Type | Default | Docs | |------|------|---------|------| | [`name`](#reactor-debug-argument-name){: #reactor-debug-argument-name .spark-required} | `atom` | | The name of the argument which will be used as the key in the `arguments` map passed to the implementation. | -| [`source`](#reactor-debug-argument-source){: #reactor-debug-argument-source .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. | +| [`source`](#reactor-debug-argument-source){: #reactor-debug-argument-source .spark-required} | `Reactor.Template.Element \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. | ### Options | Name | Type | Default | Docs | @@ -759,7 +762,7 @@ argument :three, value(3) | Name | Type | Default | Docs | |------|------|---------|------| | [`name`](#reactor-group-argument-name){: #reactor-group-argument-name .spark-required} | `atom` | | The name of the argument which will be used as the key in the `arguments` map passed to the implementation. | -| [`source`](#reactor-group-argument-source){: #reactor-group-argument-source .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. | +| [`source`](#reactor-group-argument-source){: #reactor-group-argument-source .spark-required} | `Reactor.Template.Element \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. | ### Options | Name | Type | Default | Docs | @@ -867,6 +870,206 @@ end Target: `Reactor.Dsl.Input` +## reactor.map +```elixir +map name +``` + + +Execute nested steps for every item of an iterator. + +Allows you to "map over" a collection applying a some steps to each element, +returning a list of new values. The input collection must be bounded - ie you +cannot pass infinite streams into this step or it will just loop forever - and +because it has to keep the results from each batch will eventually just use up +all available RAM. + +Under the hood we use `Iter` to handle lazy iteration of the collection. This +means that you can pass an `Iter.t` or any value for which `Iter.IntoIterable` +is implemented. + +> #### A note on ordering {: .tip} +> +> If your application doesn't need the results back in the same order that they +> were provided then setting `strict_ordering?` to `false` will increase +> performance - especially on large input sets. + + +### Nested DSLs + * [argument](#reactor-map-argument) + * [wait_for](#reactor-map-wait_for) + + +### Examples +``` +map :double_numbers do + input input(:numbers) + + step :double do + argument :number, element(:double_numbers) + + run %{number: number}, _, _ -> + {:ok, number * 2} + end + end +end + +``` + +``` +step :get_subscriptions do + run _, _, _ -> + Stripe.Subscription.list() + end +end + +map :cancel_subscriptions do + input result(:get_subscriptions) + + step :cancel do + argument :sub_id, element(:cancel_subscriptions, [:id]) + + run fn args, _, _ -> + Stripe.Subscription.cancel(arg.sub_id, %{prorate: true, invoice_now: true}) + end + end + + return :cancel +end + +``` + + + +### Arguments + +| Name | Type | Default | Docs | +|------|------|---------|------| +| [`name`](#reactor-map-name){: #reactor-map-name .spark-required} | `atom` | | A unique name for the step. | +### Options + +| Name | Type | Default | Docs | +|------|------|---------|------| +| [`source`](#reactor-map-source){: #reactor-map-source .spark-required} | `Reactor.Template.Element \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | The iterator or enumerable to use as the source of the iteration. | +| [`allow_async?`](#reactor-map-allow_async?){: #reactor-map-allow_async? } | `boolean` | `false` | Whether the emitted steps should be allowed to run asynchronously. | +| [`batch_size`](#reactor-map-batch_size){: #reactor-map-batch_size } | `pos_integer` | `100` | The number of items to consume off the source when emitting steps. | +| [`return`](#reactor-map-return){: #reactor-map-return } | `atom` | | The name of the nested step to use as the return value. | +| [`strict_ordering?`](#reactor-map-strict_ordering?){: #reactor-map-strict_ordering? } | `boolean` | `true` | Whether the mapped values must be returned in the same order that they were provided. | + + +## reactor.map.argument +```elixir +argument name, source \\ nil +``` + + +Specifies an argument to a Reactor step. + +Each argument is a value which is either the result of another step, or an input value. + +Individual arguments can be transformed with an arbitrary function before +being passed to any steps. + + + + +### Examples +``` +argument :name, input(:name) + +``` + +``` +argument :year, input(:date, [:year]) + +``` + +``` +argument :user, result(:create_user) + +``` + +``` +argument :user_id, result(:create_user) do + transform & &1.id +end + +``` + +``` +argument :user_id, result(:create_user, [:id]) + +``` + +``` +argument :three, value(3) + +``` + + + +### Arguments + +| Name | Type | Default | Docs | +|------|------|---------|------| +| [`name`](#reactor-map-argument-name){: #reactor-map-argument-name .spark-required} | `atom` | | The name of the argument which will be used as the key in the `arguments` map passed to the implementation. | +| [`source`](#reactor-map-argument-source){: #reactor-map-argument-source .spark-required} | `Reactor.Template.Element \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. | +### Options + +| Name | Type | Default | Docs | +|------|------|---------|------| +| [`transform`](#reactor-map-argument-transform){: #reactor-map-argument-transform } | `(any -> any) \| module \| nil` | | An optional transformation function which can be used to modify the argument before it is passed to the step. | + + + + + +### Introspection + +Target: `Reactor.Dsl.Argument` + +## reactor.map.wait_for +```elixir +wait_for names +``` + + +Wait for the named step to complete before allowing this one to start. + +Desugars to `argument :_, result(step_to_wait_for)` + + + + +### Examples +``` +wait_for :create_user +``` + + + +### Arguments + +| Name | Type | Default | Docs | +|------|------|---------|------| +| [`names`](#reactor-map-wait_for-names){: #reactor-map-wait_for-names .spark-required} | `atom \| list(atom)` | | The name of the step to wait for. | + + + + + + +### Introspection + +Target: `Reactor.Dsl.WaitFor` + + + + +### Introspection + +Target: `Reactor.Dsl.Map` + ## reactor.step ```elixir step name, impl \\ nil @@ -983,7 +1186,7 @@ argument :three, value(3) | Name | Type | Default | Docs | |------|------|---------|------| | [`name`](#reactor-step-argument-name){: #reactor-step-argument-name .spark-required} | `atom` | | The name of the argument which will be used as the key in the `arguments` map passed to the implementation. | -| [`source`](#reactor-step-argument-source){: #reactor-step-argument-source .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. | +| [`source`](#reactor-step-argument-source){: #reactor-step-argument-source .spark-required} | `Reactor.Template.Element \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. | ### Options | Name | Type | Default | Docs | @@ -1067,7 +1270,7 @@ Use a predicate to determine which steps should be executed. | Name | Type | Default | Docs | |------|------|---------|------| -| [`on`](#reactor-switch-on){: #reactor-switch-on .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | The value to match against. | +| [`on`](#reactor-switch-on){: #reactor-switch-on .spark-required} | `Reactor.Template.Element \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | The value to match against. | | [`allow_async?`](#reactor-switch-allow_async?){: #reactor-switch-allow_async? } | `boolean` | `true` | Whether the emitted steps should be allowed to run asynchronously. | diff --git a/lib/reactor/argument.ex b/lib/reactor/argument.ex index d651d48..9b381c8 100644 --- a/lib/reactor/argument.ex +++ b/lib/reactor/argument.ex @@ -6,6 +6,7 @@ defmodule Reactor.Argument do defstruct name: nil, source: nil, transform: nil alias Reactor.{Argument, Template} + import Reactor.Template, only: :macros @type t :: %Argument{ name: atom, @@ -64,6 +65,49 @@ defmodule Reactor.Argument do def from_value(name, value, transform \\ nil) when is_atom(name) and maybe_transform(transform), do: %Argument{name: name, source: %Template.Value{value: value}, transform: transform} + @doc """ + Build an argument which refers to to an element within a map step with an optional transformation applied. + + ## Example + + iex> Argument.from_element(:argument_name, &Atom.to_string/1) + + """ + @spec from_element(atom, atom, nil | (any -> any)) :: Argument.t() + def from_element(name, element_name, transform \\ nil) + when is_atom(name) and maybe_transform(transform), + do: %Argument{ + name: name, + source: %Template.Element{name: element_name}, + transform: transform + } + + @doc """ + Build an argument directly from a template. + + ## Example + + iex> Argument.from_template(:argument_name, Reactor.Dsl.Argument.input(:input_name)) + + """ + @spec from_template(atom, Template.t(), nil | (any -> any)) :: Argument.t() + def from_template(name, template, transform \\ nil) + when is_atom(name) and is_template(template) and maybe_transform(transform), + do: %Argument{name: name, source: template, transform: transform} + + @doc """ + Set a sub-path on the argument. + + ## Example + + iex> Argument.from_value(:example, :value) + ...> |> Argument.sub_path([:nested, :values]) + + """ + @spec sub_path(Argument.t(), [any]) :: Argument.t() + def sub_path(argument, sub_path), + do: %{argument | source: %{argument.source | sub_path: sub_path}} + @doc """ Validate that the argument is an Argument struct. """ @@ -72,17 +116,26 @@ defmodule Reactor.Argument do @doc """ Validate that the argument refers to a reactor input. """ - defguard is_from_input(argument) when is_struct(argument.source, Template.Input) + defguard is_from_input(argument) + when is_argument(argument) and is_input_template(argument.source) @doc """ Validate that the argument refers to a step result. """ - defguard is_from_result(argument) when is_struct(argument.source, Template.Result) + defguard is_from_result(argument) + when is_argument(argument) and is_result_template(argument.source) @doc """ Validate that the argument contains a static value. """ - defguard is_from_value(argument) when is_struct(argument.source, Template.Value) + defguard is_from_value(argument) + when is_argument(argument) and is_value_template(argument.source) + + @doc """ + Validate that the argument contains an element. + """ + defguard is_from_element(argument) + when is_argument(argument) and is_element_template(argument.source) @doc """ Validate that the argument has a transform. diff --git a/lib/reactor/builder/step.ex b/lib/reactor/builder/step.ex index c5b74e3..5f560d4 100644 --- a/lib/reactor/builder/step.ex +++ b/lib/reactor/builder/step.ex @@ -224,7 +224,8 @@ defmodule Reactor.Builder.Step do {:ok, argument} end - argument when is_from_result(argument) or is_from_value(argument) -> + argument + when is_from_result(argument) or is_from_value(argument) or is_from_element(argument) -> {:ok, argument} end) end diff --git a/lib/reactor/dsl.ex b/lib/reactor/dsl.ex index 4dc3d2d..b213f4a 100644 --- a/lib/reactor/dsl.ex +++ b/lib/reactor/dsl.ex @@ -30,6 +30,7 @@ defmodule Reactor.Dsl do Dsl.Debug.__entity__(), Dsl.Group.__entity__(), Dsl.Input.__entity__(), + Dsl.Map.__entity__(), Dsl.Step.__entity__(), Dsl.Switch.__entity__() ], diff --git a/lib/reactor/dsl/argument.ex b/lib/reactor/dsl/argument.ex index 64f6de1..5c74902 100644 --- a/lib/reactor/dsl/argument.ex +++ b/lib/reactor/dsl/argument.ex @@ -117,6 +117,37 @@ defmodule Reactor.Dsl.Argument do @spec value(any) :: Template.Value.t() def value(value), do: %Template.Value{value: value} + @doc ~S""" + The `element` template helper for the Reactor DSL. + + ## Example + + ```elixir + defmodule ExampleReactor do + use Reactor + + input :numbers + + map :double_numbers do + source input(:numbers) + + step :double do + argument :number, element(:double_numbers) + + run fn args, _, _ -> + {:ok, args.number * 2} + end + end + + return :double + end + end + ``` + """ + @spec element(any, [any]) :: Template.Element.t() + def element(name, sub_path \\ []) + def element(name, sub_path), do: %Template.Element{name: name, sub_path: List.wrap(sub_path)} + @doc false def __entity__, do: %Spark.Dsl.Entity{ diff --git a/lib/reactor/dsl/around.ex b/lib/reactor/dsl/around.ex index 9e49280..fd8d913 100644 --- a/lib/reactor/dsl/around.ex +++ b/lib/reactor/dsl/around.ex @@ -39,7 +39,7 @@ defmodule Reactor.Dsl.Around do type: :atom, required: true, doc: """ - A unique name of the group of steps. + A unique name for the group of steps. """ ], fun: [ @@ -91,9 +91,15 @@ defmodule Reactor.Dsl.Around do )} end - def verify(_around, _dsl_state), do: :ok - - def transform(_around, dsl_state), do: {:ok, dsl_state} + def verify(around, dsl_state) do + around.steps + |> Enum.reduce_while(:ok, fn step, :ok -> + case Dsl.Build.verify(step, dsl_state) do + :ok -> {:cont, :ok} + {:error, reason} -> {:halt, {:error, reason}} + end + end) + end defp build_inputs(reactor, around) do around.arguments diff --git a/lib/reactor/dsl/build.ex b/lib/reactor/dsl/build.ex index a68e8df..ffc69f8 100644 --- a/lib/reactor/dsl/build.ex +++ b/lib/reactor/dsl/build.ex @@ -13,15 +13,6 @@ defprotocol Reactor.Dsl.Build do @spec build(t, Reactor.t()) :: {:ok, Reactor.t()} | {:error, any} def build(entity, reactor) - @doc """ - Perform any transformation that is needed to make the entity work in the - system before building. - - See `Spark.Dsl.Transformer` for more information. - """ - @spec transform(t, Spark.Dsl.t()) :: {:ok, Spark.Dsl.t()} | {:error, any} - def transform(entity, dsl_state) - @doc """ Perform any after-compilation verification that is needed to make the entity work. diff --git a/lib/reactor/dsl/group.ex b/lib/reactor/dsl/group.ex index 5c4928e..700fd39 100644 --- a/lib/reactor/dsl/group.ex +++ b/lib/reactor/dsl/group.ex @@ -103,12 +103,18 @@ defmodule Reactor.Dsl.Group do )} end - def verify(_group, _dsl_state), do: :ok + def verify(group, dsl_state) do + group.steps + |> Enum.reduce_while(:ok, fn step, :ok -> + case Dsl.Build.verify(step, dsl_state) do + :ok -> {:cont, :ok} + {:error, reason} -> {:halt, {:error, reason}} + end + end) + end - def transform(_around, dsl_state), do: {:ok, dsl_state} - - defp build_inputs(reactor, around) do - around.arguments + defp build_inputs(reactor, group) do + group.arguments |> Enum.map(& &1.name) |> reduce_while_ok(reactor, &Builder.add_input(&2, &1)) end diff --git a/lib/reactor/dsl/info.ex b/lib/reactor/dsl/info.ex new file mode 100644 index 0000000..dd1f3e5 --- /dev/null +++ b/lib/reactor/dsl/info.ex @@ -0,0 +1,61 @@ +defmodule Reactor.Dsl.Info do + @moduledoc """ + Introspection for the Reactor DSL. + """ + use Spark.InfoGenerator, sections: [:reactor], extension: Reactor.Dsl + + alias Reactor.{Builder, Dsl} + alias Spark.Dsl.Extension + import Reactor.Utils + + @doc """ + Convert a reactor DSL module into a reactor struct. + """ + @spec to_struct(module | Reactor.t() | Spark.Dsl.t()) :: {:ok, Reactor.t()} | {:error, any} + def to_struct(reactor) when is_struct(reactor, Reactor), do: {:ok, reactor} + + def to_struct(module) do + with {:ok, reactor} <- entities_to_struct(module), + {:ok, reactor} <- maybe_set_return(module, reactor) do + add_middleware(module, reactor) + end + end + + @doc """ + Raising version of `to_struct/1`. + """ + @spec to_struct!(module | Reactor.t() | Spark.Dsl.t()) :: Reactor.t() | no_return + def to_struct!(reactor) do + case to_struct(reactor) do + {:ok, reactor} -> reactor + {:error, reason} -> raise reason + end + end + + defp entities_to_struct(module) when is_atom(module) do + module + |> reactor() + |> reduce_while_ok(Builder.new(module), &Dsl.Build.build/2) + end + + defp entities_to_struct(dsl_state) when is_map(dsl_state) do + module = Extension.get_persisted(dsl_state, :module) + + dsl_state + |> reactor() + |> reduce_while_ok(Builder.new(module), &Dsl.Build.build/2) + end + + defp maybe_set_return(module, reactor) do + case reactor_return(module) do + {:ok, value} -> {:ok, %{reactor | return: value}} + :error -> {:ok, reactor} + end + end + + defp add_middleware(module, reactor) do + module + |> reactor_middlewares() + |> reduce_while_ok(reactor, &Dsl.Build.build/2) + end +end diff --git a/lib/reactor/dsl/map.ex b/lib/reactor/dsl/map.ex new file mode 100644 index 0000000..e47d2dd --- /dev/null +++ b/lib/reactor/dsl/map.ex @@ -0,0 +1,235 @@ +defmodule Reactor.Dsl.Map do + @moduledoc """ + The `map` DSL entity struct. + + See `d:Reactor.map`. + """ + + defstruct __identifier__: nil, + allow_async?: false, + arguments: [], + batch_size: 100, + iterable?: true, + name: nil, + return: nil, + source: nil, + steps: [], + strict_ordering?: true + + alias Reactor.{Builder, Dsl, Step, Template} + + @type t :: %Dsl.Map{ + __identifier__: any, + allow_async?: boolean, + arguments: [Dsl.Argument.t()], + batch_size: pos_integer(), + iterable?: true, + name: atom, + return: atom, + source: Template.t(), + steps: [Dsl.Step.t()], + strict_ordering?: boolean + } + + @doc false + def __entity__, + do: %Spark.Dsl.Entity{ + name: :map, + describe: """ + Execute nested steps for every item of an iterator. + + Allows you to "map over" a collection applying a some steps to each element, + returning a list of new values. The input collection must be bounded - ie you + cannot pass infinite streams into this step or it will just loop forever - and + because it has to keep the results from each batch will eventually just use up + all available RAM. + + Under the hood we use `Iter` to handle lazy iteration of the collection. This + means that you can pass an `Iter.t` or any value for which `Iter.IntoIterable` + is implemented. + + > #### A note on ordering {: .tip} + > + > If your application doesn't need the results back in the same order that they + > were provided then setting `strict_ordering?` to `false` will increase + > performance - especially on large input sets. + """, + examples: [ + """ + map :double_numbers do + input input(:numbers) + + step :double do + argument :number, element(:double_numbers) + + run %{number: number}, _, _ -> + {:ok, number * 2} + end + end + end + """, + """ + step :get_subscriptions do + run _, _, _ -> + Stripe.Subscription.list() + end + end + + map :cancel_subscriptions do + input result(:get_subscriptions) + + step :cancel do + argument :sub_id, element(:cancel_subscriptions, [:id]) + + run fn args, _, _ -> + Stripe.Subscription.cancel(arg.sub_id, %{prorate: true, invoice_now: true}) + end + end + + return :cancel + end + """ + ], + target: Dsl.Map, + args: [:name], + identifier: :name, + imports: [Dsl.Argument], + entities: [steps: [], arguments: [Dsl.Argument.__entity__(), Dsl.WaitFor.__entity__()]], + recursive_as: :steps, + schema: [ + name: [ + type: :atom, + required: true, + doc: """ + A unique name for the step. + """ + ], + allow_async?: [ + type: :boolean, + required: false, + default: false, + doc: """ + Whether the emitted steps should be allowed to run asynchronously. + """ + ], + batch_size: [ + type: :pos_integer, + required: false, + default: 100, + doc: """ + The number of items to consume off the source when emitting steps. + """ + ], + source: [ + type: Template.type(), + required: true, + doc: """ + The iterator or enumerable to use as the source of the iteration. + """ + ], + return: [ + type: :atom, + required: false, + doc: """ + The name of the nested step to use as the return value. + """ + ], + strict_ordering?: [ + type: :boolean, + required: false, + default: true, + doc: """ + Whether the mapped values must be returned in the same order that they were provided. + """ + ] + ] + } + + defimpl Dsl.Build do + import Reactor.Utils + require Reactor.Argument + alias Reactor.Argument + alias Spark.{Dsl.Verifier, Error.DslError} + + def build(map, reactor) do + sub_reactor = Builder.new(reactor.id) + + with {:ok, sub_reactor} <- build_steps(sub_reactor, map) do + arguments = + map.arguments + |> Enum.concat([Argument.from_template(:source, map.source)]) + + step_options = + map + |> Map.take([:allow_async?, :batch_size, :return, :strict_ordering]) + |> Map.put(:state, :init) + |> Map.put(:steps, sub_reactor.steps) + |> Map.update!(:return, fn + nil -> + sub_reactor.steps + |> List.first() + |> Map.fetch!(:name) + + return -> + return + end) + |> Enum.to_list() + + Builder.add_step( + reactor, + map.name, + {Step.Map, step_options}, + arguments, + max_retries: 0, + ref: :step_name + ) + end + end + + @spec verify(Reactor.Dsl.Map.t(), any()) :: :ok | struct() + def verify(step, dsl_state) do + with :ok <- verify_at_least_one_step(step, dsl_state) do + verify_return(step, dsl_state) + end + end + + defp verify_at_least_one_step(%{steps: [], name: name}, dsl_state) do + DslError.exception( + module: Verifier.get_persisted(dsl_state, :module), + path: [:reactor, :map, name], + message: "You must provide at least one child step to execute." + ) + end + + defp verify_at_least_one_step(_step, _dsl_state), do: :ok + + defp verify_return(%{return: nil, steps: [_]}, _dsl_state), do: :ok + + defp verify_return(%{return: nil, name: name}, dsl_state), + do: + DslError.exception( + module: Verifier.get_persisted(dsl_state, :module), + path: [:reactor, :map, name], + message: + "You must specify which step to use as the return value when more than one nested step is present in a map." + ) + + defp verify_return(%{steps: steps, return: return, name: name}, dsl_state) do + if Enum.any?(steps, &(&1.name == return)) do + :ok + else + DslError.exception( + module: Verifier.get_persisted(dsl_state, :module), + path: [:reactor, :map, name], + message: + "The name `#{inspect(return)}` does not refer to a direct descendant of the `#{inspect(name)}` step." + ) + end + end + + defp build_steps(reactor, map) do + map.steps + |> reduce_while_ok(reactor, &Dsl.Build.build/2) + end + end +end diff --git a/lib/reactor/dsl/switch.ex b/lib/reactor/dsl/switch.ex index a44a73f..bdb4833 100644 --- a/lib/reactor/dsl/switch.ex +++ b/lib/reactor/dsl/switch.ex @@ -152,7 +152,17 @@ defmodule Reactor.Dsl.Switch do )} end - def verify(_switch, _dsl_state), do: :ok + def verify(switch, dsl_state) do + switch.matches + |> Enum.flat_map(& &1.steps) + |> Enum.concat(switch.default.steps) + |> Enum.reduce_while(:ok, fn step, :ok -> + case Build.verify(step, dsl_state) do + :ok -> {:cont, :ok} + {:error, reason} -> {:halt, {:error, reason}} + end + end) + end def transform(_switch, dsl_state), do: {:ok, dsl_state} diff --git a/lib/reactor/dsl/transformer.ex b/lib/reactor/dsl/transformer.ex index 718d168..4cc4a22 100644 --- a/lib/reactor/dsl/transformer.ex +++ b/lib/reactor/dsl/transformer.ex @@ -11,7 +11,6 @@ defmodule Reactor.Dsl.Transformer do with {:ok, step_names} <- step_names(dsl_state), {:ok, dsl_state} <- maybe_set_return(dsl_state, step_names), {:ok, dsl_state} <- validate_return(dsl_state, step_names), - {:ok, dsl_state} <- do_entity_transform(dsl_state), {:ok, reactor} <- Info.to_struct(dsl_state), {:ok, reactor} <- Planner.plan(reactor) do dsl_state = @@ -53,12 +52,6 @@ defmodule Reactor.Dsl.Transformer do end end - defp do_entity_transform(dsl_state) do - dsl_state - |> Transformer.get_entities([:reactor]) - |> reduce_while_ok(dsl_state, &Dsl.Build.transform/2) - end - defp maybe_set_return(dsl_state, step_names) do case Transformer.get_option(dsl_state, [:reactor], :return) do nil -> diff --git a/lib/reactor/dsl/verifier.ex b/lib/reactor/dsl/verifier.ex index bb9c059..68239a6 100644 --- a/lib/reactor/dsl/verifier.ex +++ b/lib/reactor/dsl/verifier.ex @@ -3,8 +3,9 @@ defmodule Reactor.Dsl.Verifier do Runs `Reactor.Dsl.Build.verify/2` for all the entities in the reactor. """ use Spark.Dsl.Verifier - alias Reactor.Dsl.Build - alias Spark.Dsl.Verifier + alias Reactor.{Argument, Dsl.Build, Dsl.Input} + alias Spark.{Dsl.Verifier, Error.DslError} + require Argument @doc false @impl true @@ -12,11 +13,89 @@ defmodule Reactor.Dsl.Verifier do def verify(dsl_state) do dsl_state |> Verifier.get_entities([:reactor]) - |> Enum.reduce_while(:ok, fn entity, :ok -> - case Build.verify(entity, dsl_state) do + |> Enum.reject(&is_struct(&1, Input)) + |> Enum.reduce_while(:ok, fn step, :ok -> + case recursively_verify_step(step, nil, dsl_state) do :ok -> {:cont, :ok} {:error, reason} -> {:halt, {:error, reason}} end end) end + + defp recursively_verify_step(%{step: [_ | _]} = step, parent_step, dsl_state) do + with :ok <- verify_step(step, parent_step, dsl_state) do + Enum.reduce_while(step.steps, :ok, fn child, :ok -> + case recursively_verify_step(child, step, dsl_state) do + :ok -> {:cont, :ok} + {:error, reason} -> {:halt, {:error, reason}} + end + end) + end + end + + defp recursively_verify_step(step, parent_step, dsl_state), + do: verify_step(step, parent_step, dsl_state) + + defp verify_step(step, parent_step, dsl_state) do + with :ok <- maybe_verify_element_arguments(step, parent_step, dsl_state) do + Build.verify(step, dsl_state) + end + end + + defp maybe_verify_element_arguments(step, parent_step, dsl_state) + when parent_step.iterable? == true do + step.arguments + |> Enum.reduce_while(:ok, fn + argument, :ok + when Argument.is_from_element(argument) and argument.source.name == parent_step.name -> + {:cont, :ok} + + argument, :ok when Argument.is_from_element(argument) -> + {:halt, + {:error, + DslError.exception( + module: Verifier.get_persisted(dsl_state, :module), + path: [:reactor, step.name, :argument, argument.name], + message: """ + Element template refers to non-parent step. + + The argument `#{inspect(argument.name)}` is sourced from an element template, + however this template refers to a step which is not it's immediate parent. This + is an unsupported configuration. + """ + )}} + + _argument, :ok -> + {:cont, :ok} + end) + end + + defp maybe_verify_element_arguments(step, _parent_step, dsl_state) + when is_map_key(step, :arguments) do + step.arguments + |> Enum.reduce_while(:ok, fn + argument, :ok when Argument.is_from_element(argument) -> + {:halt, + {:error, + DslError.exception( + module: Verifier.get_persisted(dsl_state, :module), + path: [:reactor, step.name, :argument, argument.name], + message: """ + Unsupported element template in argument. + + The argument `#{inspect(argument.name)}` is sourced from an element template, + this is fine if it's being passed to a step which is an iterator however the + step type `#{inspect(step.__struct__)}` isn't an iterator. + + If you're defining your own iterable step type then you need to add the + `iterable?` field to its struct with its value set to `true`. + """ + )}} + + _argument, :ok -> + {:cont, :ok} + end) + end + + defp maybe_verify_element_arguments(_step, _parent_step, _dsl_state), do: :ok end diff --git a/lib/reactor/error/internal/unreachable_error.ex b/lib/reactor/error/internal/unreachable_error.ex new file mode 100644 index 0000000..81bb1de --- /dev/null +++ b/lib/reactor/error/internal/unreachable_error.ex @@ -0,0 +1,161 @@ +defmodule Reactor.Error.Internal.UnreachableError do + @moduledoc """ + An error that should never happen. + """ + + use Reactor.Error, + fields: [:bindings, :message, :file, :line], + class: :reactor + + @doc false + @impl true + def message(error) do + [ + """ + # Unreachable Error + + You should _never_ see this error in the wild. If you do please raise an issue on + the Reactor repository: + + https://github.com/ash-project/reactor/issues/new + + And paste the following information: + + --- BEGIN COPY --- + + Reached unreachable code at #{error.file}:#{error.line}: + + #{error.message} + """ + ] + |> maybe_append(maybe_format_bindings(error)) + |> maybe_append(maybe_format_stacktrace(error)) + |> maybe_append(format_system_info()) + |> maybe_append(format_running_applications()) + |> Enum.join("\n") + end + + @doc """ + Create an unreachable error. + """ + @spec unreachable(String.t()) :: Macro.output() + defmacro unreachable(message) do + quote do + unquote(__MODULE__).exception( + bindings: binding(), + line: __ENV__.line, + file: __ENV__.file, + message: unquote(message) + ) + end + end + + @doc """ + Bang version of `unreachable/1`. + """ + @spec unreachable!(String.t()) :: Macro.output() + defmacro unreachable!(message) do + quote do + raise unquote(__MODULE__).exception( + bindings: binding(), + line: __ENV__.line, + file: __ENV__.file, + message: unquote(message) + ) + end + end + + defp maybe_format_bindings(error) do + if Enum.any?(error.bindings) do + bindings = + error.bindings + |> Enum.map_join("\n", fn {name, value} -> + " - `#{inspect(name)}`: `#{inspect(value)}`" + end) + + """ + Bindings: + + #{bindings} + """ + end + end + + defp maybe_format_stacktrace(error) do + if error.stacktrace do + stacktrace = + error.stacktrace.stacktrace + |> Enum.drop(2) + |> Exception.format_stacktrace() + + """ + Backtrace: + + #{stacktrace} + """ + end + end + + # sobelow_skip ["Traversal.FileModule"] + defp format_system_info do + elixir = System.build_info() + + erlang_vsn = + [ + :code.root_dir(), + "releases", + :erlang.system_info(:otp_release), + "OTP_VERSION" + ] + |> Path.join() + |> File.read!() + |> String.trim() + + system = + with {_, code} when code > 0 <- System.cmd("uname", ["-a"]), + {_, code} when code > 0 <- System.cmd("ver", []) do + {family, name} = :os.type() + + version = + case :os.version() do + version when is_tuple(version) -> + version + |> Tuple.to_list() + |> Enum.map_join(".", &to_string/1) + + version when is_list(version) -> + to_string(version) + end + + "#{name} #{family} / #{version}" + else + {uname, 0} -> uname + end + + """ + System: + + Elixir #{elixir["version"]} (#{elixir[:revision]}) compiled with Erlang/OTP #{elixir[:otp_release]} + Erlang/OTP #{erlang_vsn} [erts-#{:erlang.system_info(:version)}] + #{system} + """ + end + + defp format_running_applications do + applications = + Application.loaded_applications() + |> Enum.map_join("\n", fn {app, _, vsn} -> + " - #{app} #{vsn}" + end) + + """ + Running applications: + + #{applications} + + --- END COPY --- + + Please carefully read all of the above and redact any sensitive information. + """ + end +end diff --git a/lib/reactor/error/invalid/invalid_result_error.ex b/lib/reactor/error/invalid/invalid_result_error.ex new file mode 100644 index 0000000..b51f3c5 --- /dev/null +++ b/lib/reactor/error/invalid/invalid_result_error.ex @@ -0,0 +1,38 @@ +defmodule Reactor.Error.Invalid.InvalidResultError do + @moduledoc """ + This error is returned when a step returns an invalid result. + """ + + use Reactor.Error, fields: [:reactor, :step, :result, :arguments], class: :invalid + + @doc false + def message(error) do + """ + # Invalid Result Error + + The step `#{inspect(error.step.name)}` returned an invalid result. + + Valid return types from the `c:Reactor.Step.run/3` callback are: + + - `{:ok, any}` - a successful result. + - `{:ok, any, [Reactor.Step.t]}` - a successful result with additional steps to + add to the running reactor. + - `:retry` - the step wants to be retried. + - `{:retry, Exception.t | any}` - the step wants to be retried, and here's why. + - `{:error, Exception.t | any}` - the step failed, and here's why. + - `{:halt, any}` - the step wants the Reactor to stop. + + ## `result`: + + #{inspect(error.result)} + + ## `step`: + + #{inspect(error.step)} + + ## `arguments`: + + #{inspect(error.arguments)} + """ + end +end diff --git a/lib/reactor/executor.ex b/lib/reactor/executor.ex index acfecb4..173a2cf 100644 --- a/lib/reactor/executor.ex +++ b/lib/reactor/executor.ex @@ -270,15 +270,16 @@ defmodule Reactor.Executor do end defp find_ready_steps(reactor, _state) do - step = - reactor.plan - |> Graph.vertices() - |> Enum.find(fn - step when is_struct(step, Step) -> Graph.in_degree(reactor.plan, step) == 0 - _ -> false - end) - - {:continue, [step]} + reactor.plan + |> Graph.vertices() + |> Enum.find(fn + step when is_struct(step, Step) -> Graph.in_degree(reactor.plan, step) == 0 + _ -> false + end) + |> case do + nil -> {:continue, []} + step -> {:continue, [step]} + end end defp maybe_release_pool(state) when state.pool_owner == true do diff --git a/lib/reactor/executor/async.ex b/lib/reactor/executor/async.ex index 35616c5..bc05ad5 100644 --- a/lib/reactor/executor/async.ex +++ b/lib/reactor/executor/async.ex @@ -71,105 +71,125 @@ defmodule Reactor.Executor.Async do end @doc """ - Check to see if any steps are completed, and if so handle them. + Handle zero or one completed async steps and then decide what to do. """ @spec handle_completed_steps(Reactor.t(), Executor.State.t()) :: {:recurse | :continue | :undo | :halt, Reactor.t(), Executor.State.t()} def handle_completed_steps(reactor, state) do - completed_task_results = get_normalised_task_results(state.current_tasks, 100) - - reactor - |> delete_vertices(Map.keys(completed_task_results)) - |> handle_completed_steps(state, completed_task_results) + completed_task_results = get_normalised_task_results(state, timeout: 100) + handle_completed_task_results(reactor, state, completed_task_results) end - defp handle_completed_steps(reactor, state, completed_task_results) - when map_size(completed_task_results) == 0, - do: {:continue, reactor, state} + defp handle_completed_task_results(reactor, state, []), + do: {:continue, reactor, state} - defp handle_completed_steps(reactor, state, completed_task_results) do - release_concurrency_resources_to_pool(state.concurrency_key, map_size(completed_task_results)) + defp handle_completed_task_results(reactor, state, completed_task_results) do + Enum.reduce( + completed_task_results, + {:recurse, reactor, state}, + fn task_result, {status, reactor, state} -> + {new_status, reactor, state} = handle_completed_step(reactor, state, task_result) - new_current_tasks = Map.drop(state.current_tasks, Map.keys(completed_task_results)) + if got_worse?(status, new_status) do + {new_status, reactor, state} + else + {status, reactor, state} + end + end + ) + end - completed_step_results = - completed_task_results - |> Map.values() - |> Map.new() + defp got_worse?(:recurse, :undo), do: true + defp got_worse?(:recurse, :halt), do: true + defp got_worse?(:undo, :halt), do: true + defp got_worse?(_old, _new), do: false - retry_steps = - completed_step_results - |> Enum.filter(fn - {_, :retry} -> true - {_, {:retry, _}} -> true - _ -> false - end) - |> Enum.map(&elem(&1, 0)) - - steps_to_remove = - completed_step_results - |> Map.keys() - |> MapSet.new() - |> MapSet.difference(MapSet.new(retry_steps)) - |> Enum.to_list() - - steps_to_append = - completed_step_results - |> Map.values() - |> Enum.flat_map(fn - {:ok, _, steps} -> steps - _ -> [] - end) + defp handle_completed_step(reactor, state, {task, step, {:ok, value, new_steps}}) do + state = + state + |> drop_task(task) reactor = reactor - |> store_successful_results_in_the_undo_stack(completed_step_results) - |> store_intermediate_results(completed_step_results) - |> delete_vertices(steps_to_remove) - |> append_steps(steps_to_append) + |> drop_from_plan(task) + |> maybe_store_undo(step, value) + |> maybe_store_intermediate_result(step, value) + reactor = + case Enum.split_with(new_steps, &(&1.name == step.name)) do + {[], new_steps} -> + reactor + |> drop_from_plan(step) + |> append_steps(new_steps) + + {recursive_steps, new_steps} -> + recursive_steps = Enum.map(recursive_steps, &%{&1 | ref: step.ref}) + + reactor + |> append_steps(recursive_steps) + |> append_steps(new_steps) + end + + {:recurse, reactor, state} + end + + defp handle_completed_step(reactor, state, {task, step, {:retry, error}}) do state = state - |> increment_retry_counts(retry_steps) - |> collect_errors(completed_step_results) + |> increment_retries(step) + |> drop_task(task) - status = - completed_task_results - |> Enum.find_value(:ok, fn - {_task, {_step, {:halt, _}}} -> - :halt + reactor = + reactor + |> drop_from_plan(task) - {_task, {_step, {:error, _}}} -> - :undo + if Map.get(state.retries, step.ref) >= step.max_retries do + error = + error || + RetriesExceededError.exception( + step: step, + retry_count: Map.get(state.retries, step.ref) + ) - {_task, {step, :retry}} -> - if Map.get(state.retries, step.ref) >= step.max_retries, - do: :undo - - _ -> - nil - end) - - state = %{state | current_tasks: new_current_tasks} - - case status do - :ok -> - {:recurse, reactor, state} - - :undo -> - {reactor, state} = collect_remaining_tasks_for_shutdown(reactor, state) - {:undo, reactor, state} - - :halt -> - {reactor, state} = collect_remaining_tasks_for_shutdown(reactor, state) - {:halt, reactor, state} + reactor = drop_from_plan(reactor, step) + {:undo, reactor, add_error(state, error)} + else + {:recurse, reactor, state} end end - defp get_normalised_task_results(current_tasks, timeout) do + defp handle_completed_step(reactor, state, {task, step, {:error, error}}) do + state = + state + |> drop_task(task) + |> add_error(error) + + reactor = + reactor + |> drop_from_plan(task) + |> drop_from_plan(step) + + {:undo, reactor, state} + end + + defp handle_completed_step(reactor, state, {task, step, {:halt, value}}) do + state = + state + |> drop_task(task) + + reactor = + reactor + |> drop_from_plan(task) + |> drop_from_plan(step) + |> store_intermediate_result(step, value) + + {:halt, reactor, state} + end + + defp get_normalised_task_results(%{current_tasks: current_tasks}, opts) do current_tasks |> Map.keys() - |> Task.yield_many(timeout) + |> Task.yield_many(opts) |> Stream.reject(&is_nil(elem(&1, 1))) |> Stream.map(fn {task, {:ok, {:error, reason}}} -> @@ -179,7 +199,7 @@ defmodule Reactor.Executor.Async do {task, {:halt, reason}} {task, {:ok, :retry}} -> - {task, :retry} + {task, {:retry, nil}} {task, {:ok, {:retry, reason}}} -> {task, {:retry, reason}} @@ -190,11 +210,53 @@ defmodule Reactor.Executor.Async do {task, {:exit, reason}} -> {task, {:error, reason}} end) - |> Map.new(fn {task, result} -> - {task, {Map.fetch!(current_tasks, task), result}} + |> Enum.map(fn {task, result} -> + {task, Map.fetch!(current_tasks, task), result} end) end + defp drop_task(state, task) do + ConcurrencyTracker.release(state.concurrency_key, 1) + + %{state | current_tasks: Map.delete(state.current_tasks, task)} + end + + defp increment_retries(state, step) do + %{state | retries: Map.update(state.retries, step.ref, 0, &(&1 + 1))} + end + + defp drop_from_plan(reactor, step) do + %{reactor | plan: Graph.delete_vertex(reactor.plan, step)} + end + + defp add_error(state, error) do + %{state | errors: [error | state.errors]} + end + + defp store_intermediate_result(reactor, step, value) do + %{reactor | intermediate_results: Map.put(reactor.intermediate_results, step.name, value)} + end + + defp maybe_store_undo(reactor, step, value) do + if Step.can?(step, :undo) do + %{reactor | undo: [{step, value} | reactor.undo]} + else + reactor + end + end + + defp maybe_store_intermediate_result(reactor, step, value) when reactor.return == step.name do + store_intermediate_result(reactor, step, value) + end + + defp maybe_store_intermediate_result(reactor, step, value) do + if Graph.out_degree(reactor.plan, step) > 0 do + store_intermediate_result(reactor, step, value) + else + reactor + end + end + defp store_successful_results_in_the_undo_stack(reactor, completed_step_results) when map_size(completed_step_results) == 0, do: reactor @@ -246,47 +308,6 @@ defmodule Reactor.Executor.Async do } end - defp increment_retry_counts(state, retry_steps) do - retries = - retry_steps - |> Enum.reduce(state.retries, fn step, retries -> - Map.update(retries, step.ref, 1, &(&1 + 1)) - end) - - %{state | retries: retries} - end - - defp collect_errors(state, completed_step_results) do - errors = - completed_step_results - |> Enum.filter(fn - {_step, {:error, _}} -> - true - - {step, {:retry, _}} -> - Map.get(state.retries, step.ref) >= step.max_retries - - {step, :retry} -> - Map.get(state.retries, step.ref) >= step.max_retries - - _ -> - false - end) - |> Enum.map(fn - {_step, {_, reason}} -> - reason - - {step, :retry} -> - RetriesExceededError.exception( - step: step, - retry_count: Map.get(state.retries, step.ref) - ) - end) - |> Enum.concat(state.errors) - - %{state | errors: errors} - end - @doc """ When the Reactor needs to shut down for any reason, we need to await all the currently running asynchronous steps and delete any task vertices. @@ -299,14 +320,16 @@ defmodule Reactor.Executor.Async do end def collect_remaining_tasks_for_shutdown(reactor, state) do - remaining_task_results = get_normalised_task_results(state.current_tasks, state.halt_timeout) + remaining_task_results = + get_normalised_task_results(state, timeout: state.halt_timeout, on_timeout: :ignore) - release_concurrency_resources_to_pool(state.concurrency_key, map_size(remaining_task_results)) + release_concurrency_resources_to_pool(state.concurrency_key, length(remaining_task_results)) remaining_step_results = remaining_task_results - |> Map.values() - |> Map.new() + |> Map.new(fn {_task, step, result} -> {step, result} end) + + finished_tasks = remaining_step_results |> Enum.map(&elem(&1, 0)) reactor = reactor @@ -315,7 +338,7 @@ defmodule Reactor.Executor.Async do unfinished_tasks = state.current_tasks - |> Map.delete(Map.keys(remaining_task_results)) + |> Map.delete(finished_tasks) unfinished_task_count = map_size(unfinished_tasks) @@ -327,7 +350,7 @@ defmodule Reactor.Executor.Async do |> Enum.map_join("\n * ", &inspect/1) """ - Waited #{state.halt_timeout}ms for async steps to complete, however #{unfinished_task_count} are still running and will be abandoned and cannot be undone. + Waited #{state.halt_timeout}ms for async steps to complete, however #{unfinished_task_count} are still running, will be abandoned and cannot be undone. * #{unfinished_steps} """ diff --git a/lib/reactor/executor/state.ex b/lib/reactor/executor/state.ex index 761dc17..dcd79de 100644 --- a/lib/reactor/executor/state.ex +++ b/lib/reactor/executor/state.ex @@ -53,7 +53,7 @@ defmodule Reactor.Executor.State do |> maybe_set_max_concurrency() |> maybe_allocate_concurrency_pool() |> Map.put(:started_at, DateTime.utc_now()) - |> then(&struct(__MODULE__, &1)) + |> then(&struct!(__MODULE__, &1)) end defp maybe_set_max_concurrency(attrs) diff --git a/lib/reactor/executor/step_runner.ex b/lib/reactor/executor/step_runner.ex index e0b32bd..27fc451 100644 --- a/lib/reactor/executor/step_runner.ex +++ b/lib/reactor/executor/step_runner.ex @@ -5,6 +5,7 @@ defmodule Reactor.Executor.StepRunner do alias Reactor.{ Error.Invalid.ArgumentSubpathError, Error.Invalid.CompensateStepError, + Error.Invalid.InvalidResultError, Error.Invalid.MissingInputError, Error.Invalid.MissingResultError, Error.Invalid.RunStepError, @@ -20,6 +21,8 @@ defmodule Reactor.Executor.StepRunner do import Reactor.Argument, only: :macros require Logger + @dialyzer {:nowarn_function, handle_run_result: 5} + # In the future this could be moved into a step property. @max_undo_count 5 @@ -161,6 +164,16 @@ defmodule Reactor.Executor.StepRunner do {:halt, value} end + defp handle_run_result(result, reactor, step, arguments, _context) do + {:error, + InvalidResultError.exception( + reactor: reactor, + step: step, + result: result, + arguments: arguments + )} + end + defp maybe_compensate(reactor, step, error, arguments, context) do if Step.can?(step, :compensate) do compensate(reactor, step, error, arguments, context) diff --git a/lib/reactor/executor/sync.ex b/lib/reactor/executor/sync.ex index f94bc6d..bc24ef6 100644 --- a/lib/reactor/executor/sync.ex +++ b/lib/reactor/executor/sync.ex @@ -15,57 +15,72 @@ defmodule Reactor.Executor.Sync do def run(reactor, state, nil), do: {:continue, reactor, state} def run(reactor, state, step) do - case Executor.StepRunner.run(reactor, state, step, state.concurrency_key) do - :retry -> - state = increment_retries(state, step) + result = Executor.StepRunner.run(reactor, state, step, state.concurrency_key) - if Map.get(state.retries, step.ref) >= step.max_retries do - reactor = drop_from_plan(reactor, step) + handle_completed_step(reactor, state, step, result) + end - error = - RetriesExceededError.exception( - step: step, - retry_count: Map.get(state.retries, step.ref) - ) + defp handle_completed_step(reactor, state, step, :retry) do + handle_completed_step(reactor, state, step, {:retry, nil}) + end - {:undo, reactor, %{state | errors: [error | state.errors]}} - else - {:recurse, reactor, state} - end + defp handle_completed_step(reactor, state, step, {:retry, error}) do + state = increment_retries(state, step) - {:retry, reason} -> - state = increment_retries(state, step) + if Map.get(state.retries, step.ref) >= step.max_retries do + reactor = drop_from_plan(reactor, step) - if Map.get(state.retries, step.ref) >= step.max_retries do - reactor = drop_from_plan(reactor, step) - {:undo, reactor, %{state | errors: [reason | state.errors]}} - else - {:recurse, reactor, state} - end + error = + error || + RetriesExceededError.exception( + step: step, + retry_count: Map.get(state.retries, step.ref) + ) - {:ok, value, new_steps} -> - reactor = + {:undo, reactor, %{state | errors: [error | state.errors]}} + else + {:recurse, reactor, state} + end + end + + defp handle_completed_step(reactor, state, step, {:ok, value, new_steps}) do + reactor = + reactor + |> maybe_store_undo(step, value) + |> maybe_store_intermediate_result(step, value) + + reactor = + case Enum.split_with(new_steps, &(&1.name == step.name)) do + {[], new_steps} -> reactor - |> maybe_store_undo(step, value) - |> maybe_store_intermediate_result(step, value) |> drop_from_plan(step) |> append_steps(new_steps) - {:recurse, reactor, state} + {recursive_steps, new_steps} -> + recursive_steps = Enum.map(recursive_steps, &%{&1 | ref: step.ref}) - {:error, reason} -> - state = %{state | errors: [reason | state.errors]} - reactor = drop_from_plan(reactor, step) - {:undo, reactor, state} - - {:halt, value} -> - reactor = reactor - |> drop_from_plan(step) |> store_intermediate_result(step, value) + |> append_steps(recursive_steps) + |> append_steps(new_steps) + end - {:halt, reactor, state} - end + {:recurse, reactor, state} + end + + defp handle_completed_step(reactor, state, step, {:error, reason}) do + state = %{state | errors: [reason | state.errors]} + reactor = drop_from_plan(reactor, step) + {:undo, reactor, state} + end + + defp handle_completed_step(reactor, state, step, {:halt, value}) do + reactor = + reactor + |> drop_from_plan(step) + |> store_intermediate_result(step, value) + + {:halt, reactor, state} end defp increment_retries(state, step) do @@ -84,16 +99,15 @@ defmodule Reactor.Executor.Sync do end end + defp maybe_store_intermediate_result(reactor, step, value) when reactor.return == step.name do + store_intermediate_result(reactor, step, value) + end + defp maybe_store_intermediate_result(reactor, step, value) do - cond do - Graph.out_degree(reactor.plan, step) > 0 -> - store_intermediate_result(reactor, step, value) - - reactor.return == step.name -> - store_intermediate_result(reactor, step, value) - - true -> - reactor + if Graph.out_degree(reactor.plan, step) > 0 do + store_intermediate_result(reactor, step, value) + else + reactor end end diff --git a/lib/reactor/planner.ex b/lib/reactor/planner.ex index dd51df7..03cf58e 100644 --- a/lib/reactor/planner.ex +++ b/lib/reactor/planner.ex @@ -50,15 +50,22 @@ defmodule Reactor.Planner do steps_by_name = graph |> Graph.vertices() - |> Enum.concat(steps) + |> Stream.filter(&is_struct(&1, Step)) + |> Stream.concat(steps) |> Map.new(&{&1.name, &1}) steps |> reduce_while_ok(graph, fn step, graph when is_struct(step, Step) -> - graph - |> Graph.add_vertex(step, step.name) - |> reduce_arguments_into_graph(step, steps_by_name) + if Graph.has_vertex?(graph, step) do + graph + |> Graph.replace_vertex(step, step) + |> reduce_arguments_into_graph(step, steps_by_name) + else + graph + |> Graph.add_vertex(step, step.name) + |> reduce_arguments_into_graph(step, steps_by_name) + end not_step, _ -> {:error, @@ -72,7 +79,7 @@ defmodule Reactor.Planner do defp reduce_arguments_into_graph(graph, current_step, steps_by_name) do reduce_while_ok(current_step.arguments, graph, fn - argument, graph when is_argument(argument) and is_from_result(argument) -> + argument, graph when is_from_result(argument) -> dependency_name = argument.source.name case Map.fetch(steps_by_name, dependency_name) do @@ -95,8 +102,7 @@ defmodule Reactor.Planner do )} end - argument, graph - when is_argument(argument) and (is_from_input(argument) or is_from_value(argument)) -> + argument, graph when is_from_input(argument) or is_from_value(argument) -> {:ok, graph} end) end diff --git a/lib/reactor/step/map.ex b/lib/reactor/step/map.ex new file mode 100644 index 0000000..67a0d1f --- /dev/null +++ b/lib/reactor/step/map.ex @@ -0,0 +1,352 @@ +defmodule Reactor.Step.Map do + use Reactor.Step + require Reactor.Argument + require Reactor.Error.Internal.UnreachableError + require Iter + alias Reactor.{Argument, Builder, Error.Internal.UnreachableError, Step, Template} + alias Spark.Options + import Reactor.Utils + + @option_schema [ + state: [ + type: {:in, [:init, :iterating]}, + required: true, + doc: """ + The current execution state of the map. This is required because it's recursive. + """ + ], + batch_size: [ + type: :pos_integer, + required: false, + default: 100, + doc: """ + The number of elements to consume off the source when emitting steps. + """ + ], + steps: [ + type: {:list, {:struct, Step}}, + required: true, + doc: """ + The steps to use when mapping source elements. + """ + ], + return: [ + type: :atom, + required: true, + doc: """ + The name of the step whose result will be used as the new value for each element. + """ + ], + strict_ordering?: [ + type: :boolean, + required: false, + default: true, + doc: """ + Whether the mapped values must be returned in the same order that they were provided. + """ + ], + allow_async?: [ + type: :boolean, + required: false, + default: true, + doc: """ + Whether the emitted steps should be allowed to run asynchronously. + """ + ], + descendant_step_names: [ + type: {:struct, MapSet}, + required: false, + doc: """ + The cached names of all descendant steps to aid rewriting. You don't need to provide this value - it is calculated by the init pass. + """ + ], + extra_arguments: [ + type: {:list, {:struct, Argument}}, + required: false, + doc: """ + Extra arguments to be passed by to every nested step. + """ + ] + ] + + @moduledoc """ + Given an iterable input run the provided steps for each element and collect + the results into a new value. + + > #### A note on ordering {: .tip} + > + > If your application doesn't need the results back in the same order that + > they were provided then setting `strict_ordering?` to `false` will increase + > performance - especially on large input sets. + + ## Options + + #{Options.docs(@option_schema)} + """ + + @doc false + @impl true + def run(arguments, context, options) do + with {:ok, options} <- Options.validate(options, @option_schema) do + case options[:state] do + :init -> do_init(arguments.source, arguments, options, context.current_step) + :iterating -> do_iterate(arguments, options, context.current_step) + end + end + end + + defp do_init(source, arguments, options, map_step) when Iter.is_iter(source) do + source = + source + |> Iter.with_index() + + extra_arguments = + arguments + |> Map.drop([:source, :result]) + |> Enum.map(fn {name, value} -> + Argument.from_value(name, value) + end) + + options = + options + |> Keyword.put_new_lazy(:descendant_step_names, fn -> + collect_all_step_names(options[:steps]) + end) + |> Keyword.put(:state, :iterating) + |> Keyword.put(:extra_arguments, extra_arguments) + + emit_batch(source, options, map_step, []) + end + + defp do_init(source, arguments, options, map_step) do + source + |> Iter.from() + |> do_init(arguments, options, map_step) + end + + defp do_iterate(arguments, options, map_step) do + {source, arguments} = Map.pop!(arguments, :source) + {result, arguments} = Map.pop!(arguments, :result) + + map_step_name = map_step.name + + result = + Enum.reduce(arguments, result, fn {{__MODULE__, ^map_step_name, :element, index}, value}, + result -> + [{index, value} | result] + end) + + emit_batch(source, options, map_step, result) + end + + defp collect_all_step_names(steps, into \\ MapSet.new()) + defp collect_all_step_names([], into), do: into + + defp collect_all_step_names([%{steps: [_ | _] = child_steps} = step | steps], into) do + into = collect_all_step_names(child_steps, MapSet.put(into, step.name)) + collect_all_step_names(steps, into) + end + + defp collect_all_step_names([step | steps], into), + do: collect_all_step_names(steps, MapSet.put(into, step.name)) + + defp emit_batch(source, options, map_step, result) do + with {:done, batch} <- Iter.take_chunk(source, options[:batch_size]), + {:done, []} <- {:done, Iter.to_list(batch)} do + finalise_result(result, options) + else + {:ok, batch, remainder} -> do_emit_batch(batch, remainder, options, map_step, result) + {:done, batch} -> do_emit_batch(batch, Iter.empty(), options, map_step, result) + end + end + + defp do_emit_batch(batch, remainder, options, map_step, result) do + with {:ok, arguments} <- arguments_for_batch(batch, options, map_step), + {:ok, recursive_step} <- + Builder.new_step( + map_step.name, + {__MODULE__, options}, + Enum.concat(arguments, [ + Argument.from_value(:source, remainder), + Argument.from_result(:result, map_step.name) + ]) + ), + {:ok, steps} <- steps_for_batch(batch, options, map_step) do + steps = Enum.concat(steps, [recursive_step]) + + {:ok, result, steps} + end + end + + defp finalise_result(result, options) do + if options[:strict_ordering?] do + result = + result + |> Enum.sort_by(&elem(&1, 0)) + |> Enum.map(&elem(&1, 1)) + + {:ok, result} + else + {:ok, Map.values(result)} + end + end + + # generate a whole heap of arguments for the recursive step so that it can + # collect up the whole batch. + defp arguments_for_batch(batch, options, map_step) do + arguments = + Enum.map(batch, fn {_element, index} -> + %Argument{ + name: {__MODULE__, map_step.name, :element, index}, + source: %Template.Result{name: {__MODULE__, map_step.name, options[:return], index}} + } + end) + + {:ok, arguments} + end + + defp steps_for_batch(batch, options, map_step) do + steps = options[:steps] + descendant_step_names = options[:descendant_step_names] + extra_arguments = options[:extra_arguments] + + reduce_while_ok(batch, [], fn {element, index}, result -> + case rewrite_steps_for_element( + {element, index}, + steps, + descendant_step_names, + map_step, + extra_arguments, + options[:allow_async?] + ) do + {:ok, steps} -> reduce_while_ok(steps, result, &{:ok, [&1 | &2]}) + {:error, reason} -> {:error, reason} + end + end) + end + + defp rewrite_steps_for_element( + {element, index}, + steps, + descendant_step_names, + map_step, + extra_arguments, + allow_async? + ) do + map_while_ok( + steps, + &rewrite_step_for_element( + &1, + {element, index}, + descendant_step_names, + map_step, + extra_arguments, + allow_async? + ) + ) + end + + defp rewrite_step_for_element( + step, + {element, index}, + descendant_step_names, + map_step, + extra_arguments, + allow_async? + ) do + with {:ok, step} <- + rewrite_arguments( + step, + {element, index}, + descendant_step_names, + map_step + ), + {:ok, step} <- + rewrite_nested_steps_for_element( + step, + {element, index}, + descendant_step_names, + map_step, + extra_arguments, + allow_async? + ) do + {:ok, + %{ + step + | arguments: Enum.concat(step.arguments, extra_arguments), + name: {__MODULE__, map_step.name, step.name, index}, + ref: {__MODULE__, map_step.name, step.ref, index}, + async?: allow_async? + }} + end + end + + defp rewrite_arguments(step, {element, index}, descendant_step_names, map_step) do + map_while_ok(step.arguments, fn + argument + when Argument.is_from_element(argument) and argument.source.name != map_step.name -> + {:error, + UnreachableError.unreachable( + "Attempted to retrieve an element whose source doesn't match the current map step: #{inspect(argument.source.name)} vs #{inspect(map_step.name)}" + )} + + argument when Argument.is_from_element(argument) -> + argument = + argument.name + |> Argument.from_value(element) + |> Argument.sub_path(argument.source.sub_path) + + {:ok, argument} + + argument when Argument.is_from_result(argument) -> + if MapSet.member?(descendant_step_names, argument.source.name) do + argument = %{ + argument + | source: %{ + argument.source + | name: {__MODULE__, map_step.name, argument.source.name, index} + } + } + + {:ok, argument} + else + {:ok, argument} + end + + argument -> + {:ok, argument} + end) + |> and_then(&{:ok, %{step | arguments: &1}}) + end + + defp rewrite_nested_steps_for_element( + %{steps: [_ | _] = steps} = step, + {element, index}, + descendant_step_names, + map_step, + extra_arguments, + allow_async? + ) do + with {:ok, steps} <- + rewrite_steps_for_element( + {element, index}, + steps, + descendant_step_names, + map_step, + extra_arguments, + allow_async? + ) do + {:ok, %{step | steps: steps}} + end + end + + defp rewrite_nested_steps_for_element( + step, + _element_index, + _descendant_step_names, + _map_step, + _extra_arguments, + _allow_async? + ), + do: {:ok, step} +end diff --git a/lib/reactor/template.ex b/lib/reactor/template.ex index 639a71e..ded0070 100644 --- a/lib/reactor/template.ex +++ b/lib/reactor/template.ex @@ -2,14 +2,13 @@ defmodule Reactor.Template do @moduledoc """ Templates used to refer to some sort of computed value. """ + alias __MODULE__.{Element, Input, Result, Value} - alias __MODULE__.{Input, Result, Value} - - @type t :: Input.t() | Result.t() | Value.t() + @type t :: Element.t() | Input.t() | Result.t() | Value.t() @doc "The type for use in option schemas" @spec type :: Spark.Options.type() - def type, do: {:or, [{:struct, Input}, {:struct, Result}, {:struct, Value}]} + def type, do: {:or, [{:struct, Element}, {:struct, Input}, {:struct, Result}, {:struct, Value}]} @doc "A guard for input templates" @spec is_input_template(any) :: Macro.output() @@ -23,9 +22,13 @@ defmodule Reactor.Template do @spec is_value_template(any) :: Macro.output() defguard is_value_template(template) when is_struct(template, Value) + @doc "A guard for element templates" + @spec is_element_template(any) :: Macro.output() + defguard is_element_template(template) when is_struct(template, Element) + @doc "A guard to detect all template types" @spec is_template(any) :: Macro.output() defguard is_template(template) when is_input_template(template) or is_result_template(template) or - is_value_template(template) + is_value_template(template) or is_element_template(template) end diff --git a/lib/reactor/template/element.ex b/lib/reactor/template/element.ex new file mode 100644 index 0000000..16ec104 --- /dev/null +++ b/lib/reactor/template/element.ex @@ -0,0 +1,9 @@ +defmodule Reactor.Template.Element do + @moduledoc """ + The `element` template. + """ + + defstruct name: nil, sub_path: [] + + @type t :: %__MODULE__{name: atom, sub_path: [atom]} +end diff --git a/lib/reactor/template/value.ex b/lib/reactor/template/value.ex index cdbba44..f58c9ed 100644 --- a/lib/reactor/template/value.ex +++ b/lib/reactor/template/value.ex @@ -3,7 +3,7 @@ defmodule Reactor.Template.Value do A statically `value` template. """ - defstruct value: nil + defstruct value: nil, sub_path: [] - @type t :: %__MODULE__{value: any} + @type t :: %__MODULE__{value: any, sub_path: [any]} end diff --git a/lib/reactor/utils.ex b/lib/reactor/utils.ex index 370211e..07327cc 100644 --- a/lib/reactor/utils.ex +++ b/lib/reactor/utils.ex @@ -98,6 +98,41 @@ defmodule Reactor.Utils do end end + @doc """ + Perform a flat map over an enumerable provided that the mapper function + continues to return ok tuples. + """ + @spec flat_map_while_ok(Enumerable.t(input), (input -> {:ok, output} | {:error, any}), boolean) :: + {:ok, Enumerable.t(output)} | {:error, any} + when input: any, output: any + def flat_map_while_ok(inputs, mapper, preserve_order? \\ false) + + def flat_map_while_ok(inputs, mapper, false) do + reduce_while_ok(inputs, [], fn input, acc -> + case mapper.(input) do + {:ok, result} -> reduce_while_ok(result, acc, &[&1 | &2]) + {:error, reason} -> {:error, reason} + end + end) + end + + def flat_map_while_ok(inputs, mapper, true) do + inputs + |> flat_map_while_ok(mapper, false) + |> and_then(&{:ok, Enum.reverse(&1)}) + end + + @doc "Raising version of `flat_map_while_ok/3`" + @spec flat_map_while_ok!(Enumerable.t(input), (input -> {:ok, output} | {:error, any}), boolean) :: + Enumerable.t(output) | no_return + when input: any, output: any + def flat_map_while_ok!(inputs, mapper, preserve_order? \\ false) do + case flat_map_while_ok(inputs, mapper, preserve_order?) do + {:ok, result} -> result + {:error, reason} -> raise reason + end + end + @doc """ Perform a reduction over an enumerable provided that the reduction function returns an ok tuple. diff --git a/mix.exs b/mix.exs index 53b804c..42993de 100644 --- a/mix.exs +++ b/mix.exs @@ -95,6 +95,7 @@ defmodule Reactor.MixProject do {:splode, "~> 0.2"}, {:libgraph, "~> 0.16"}, {:igniter, "~> 0.2"}, + {:iterex, "~> 0.1"}, {:telemetry, "~> 1.2"}, # Dev/Test dependencies diff --git a/mix.lock b/mix.lock index 368f7c2..baa7fb9 100644 --- a/mix.lock +++ b/mix.lock @@ -16,6 +16,7 @@ "glob_ex": {:hex, :glob_ex, "0.1.7", "eae6b6377147fb712ac45b360e6dbba00346689a87f996672fe07e97d70597b1", [:mix], [], "hexpm", "decc1c21c0c73df3c9c994412716345c1692477b9470e337f628a7e08da0da6a"}, "hpax": {:hex, :hpax, "1.0.0", "28dcf54509fe2152a3d040e4e3df5b265dcb6cb532029ecbacf4ce52caea3fd2", [:mix], [], "hexpm", "7f1314731d711e2ca5fdc7fd361296593fc2542570b3105595bb0bc6d0fad601"}, "igniter": {:hex, :igniter, "0.2.10", "078a1308924e2cffce7956b00e01794ec218ed09cea969d0a9911ee91f885c99", [:mix], [{:glob_ex, "~> 0.1.7", [hex: :glob_ex, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:req, "~> 0.4", [hex: :req, repo: "hexpm", optional: false]}, {:rewrite, "~> 0.9", [hex: :rewrite, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.4", [hex: :sourceror, repo: "hexpm", optional: false]}, {:spitfire, ">= 0.1.3 and < 1.0.0-0", [hex: :spitfire, repo: "hexpm", optional: false]}], "hexpm", "071080c4676226764b9513138f185e842993ff95bbf483e78536db50396986eb"}, + "iterex": {:hex, :iterex, "0.1.1", "90378a9561ad87da46737dceaf02e68a0b3023746216a4de34a0c509f5f505d4", [:mix], [], "hexpm", "c4f5916a6dbb03aa4c3d5c480069e13075ca6a57bd0c28d643da3891962440ad"}, "jason": {:hex, :jason, "1.4.3", "d3f984eeb96fe53b85d20e0b049f03e57d075b5acda3ac8d465c969a2536c17b", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "9a90e868927f7c777689baa16d86f4d0e086d968db5c05d917ccff6d443e58a3"}, "libgraph": {:hex, :libgraph, "0.16.0", "3936f3eca6ef826e08880230f806bfea13193e49bf153f93edcf0239d4fd1d07", [:mix], [], "hexpm", "41ca92240e8a4138c30a7e06466acc709b0cbb795c643e9e17174a178982d6bf"}, "makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"}, diff --git a/test/reactor/dsl/map_test.exs b/test/reactor/dsl/map_test.exs new file mode 100644 index 0000000..4bc1455 --- /dev/null +++ b/test/reactor/dsl/map_test.exs @@ -0,0 +1,36 @@ +defmodule Reactor.Dsl.MapTest do + @moduledoc false + use ExUnit.Case, async: true + + defmodule MapOverNumbersReactor do + @moduledoc false + use Reactor + + input :numbers + + step :multiplier do + run fn _ -> {:ok, 2} end + end + + map :map_over_numbers do + source(input(:numbers)) + argument :multiplier, result(:multiplier) + batch_size(2) + + step :double do + argument :input, element(:map_over_numbers) + + run fn %{input: input, multiplier: multiplier}, _ -> + {:ok, input * multiplier} + end + end + end + end + + test "it maps over it's inputs" do + numbers = [0, 2, 4, 6, 8, 10] + + assert {:ok, [0, 4, 8, 12, 16, 20]} = + Reactor.run!(MapOverNumbersReactor, %{numbers: numbers}, %{}, async?: false) + end +end diff --git a/test/reactor/executor/async_test.exs b/test/reactor/executor/async_test.exs index 5b6507f..b7ef75d 100644 --- a/test/reactor/executor/async_test.exs +++ b/test/reactor/executor/async_test.exs @@ -204,12 +204,14 @@ defmodule Reactor.Executor.AsyncTest do assert Graph.has_vertex?(reactor.plan, step) end - test "when one of the steps asks to retry, it increments the retry count for the step", + test "when one of the steps asks to retry, it sets the retry count for the step", %{reactor: reactor, state: state, doable: doable, supervisor: supervisor} do task = Task.Supervisor.async_nolink(supervisor, fn -> :retry end) state = %{state | current_tasks: %{task => doable}} + refute is_map_key(state.retries, doable.ref) + assert {_, _reactor, state} = handle_completed_steps(reactor, state) - assert state.retries[doable.ref] == 1 + assert state.retries[doable.ref] == 0 end test "when one of the steps asks to retry (again), it increments the retry count for the step", diff --git a/test/reactor/executor/init_test.exs b/test/reactor/executor/init_test.exs index 9bfe97e..8dbc6f2 100644 --- a/test/reactor/executor/init_test.exs +++ b/test/reactor/executor/init_test.exs @@ -2,7 +2,7 @@ defmodule Reactor.Executor.InitTest do @moduledoc false use ExUnit.Case, async: true import Reactor.Executor.Init - alias Reactor.{Builder, Executor} + alias Reactor.Builder use Mimic describe "init/4" do @@ -52,14 +52,5 @@ defmodule Reactor.Executor.InitTest do assert {:error, error} = init(reactor, [], [], [:wat]) assert Exception.message(error) =~ "cannot be converted into a map" end - - test "options are passed into `State.init/1`", %{reactor: reactor} do - Executor.State - |> expect(:init, fn options -> - assert options.hello == :marty - end) - - assert {:ok, _reactor, _state} = init(reactor, [], [], hello: :marty) - end end end diff --git a/test/test_helper.exs b/test/test_helper.exs index d9bfe99..a523dca 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1,5 +1,4 @@ Mimic.copy(Example.Step.Compensable) Mimic.copy(Example.Step.Doable) Mimic.copy(Example.Step.Undoable) -Mimic.copy(Reactor.Executor.State) ExUnit.start()