diff --git a/.formatter.exs b/.formatter.exs index 8b251d3..549d28d 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -9,6 +9,7 @@ spark_locals_without_parens = [ around: 2, around: 3, async?: 1, + batch_size: 1, before_all: 1, collect: 1, collect: 2, @@ -24,6 +25,8 @@ spark_locals_without_parens = [ input: 1, input: 2, level: 1, + map: 1, + map: 2, matches?: 1, matches?: 2, max_retries: 1, @@ -32,9 +35,11 @@ spark_locals_without_parens = [ on: 1, return: 1, run: 1, + source: 1, step: 1, step: 2, step: 3, + strict_ordering?: 1, switch: 1, switch: 2, transform: 1, diff --git a/.vscode/settings.json b/.vscode/settings.json index d3e22f1..c9c0d16 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -4,9 +4,11 @@ "backoff", "casted", "Desugars", + "lvalue", "mappish", "noreply", "Planable", + "rvalue", "splode", "Splode" ] diff --git a/documentation/dsls/DSL:-Reactor.md b/documentation/dsls/DSL:-Reactor.md index 1cad18a..444eb49 100644 --- a/documentation/dsls/DSL:-Reactor.md +++ b/documentation/dsls/DSL:-Reactor.md @@ -27,6 +27,9 @@ The top-level reactor DSL * argument * wait_for * [input](#reactor-input) + * [map](#reactor-map) + * argument + * wait_for * [step](#reactor-step) * argument * wait_for @@ -106,7 +109,7 @@ Wrap a function around a group of steps. | Name | Type | Default | Docs | |------|------|---------|------| -| [`name`](#reactor-around-name){: #reactor-around-name .spark-required} | `atom` | | A unique name of the group of steps. | +| [`name`](#reactor-around-name){: #reactor-around-name .spark-required} | `atom` | | A unique name for the group of steps. | | [`fun`](#reactor-around-fun){: #reactor-around-fun .spark-required} | `(any, any, any, any -> any) \| mfa` | | The around function. See `Reactor.Step.Around` for more information. | ### Options @@ -171,7 +174,7 @@ argument :three, value(3) | Name | Type | Default | Docs | |------|------|---------|------| | [`name`](#reactor-around-argument-name){: #reactor-around-argument-name .spark-required} | `atom` | | The name of the argument which will be used as the key in the `arguments` map passed to the implementation. | -| [`source`](#reactor-around-argument-source){: #reactor-around-argument-source .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. | +| [`source`](#reactor-around-argument-source){: #reactor-around-argument-source .spark-required} | `Reactor.Template.Element \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. | ### Options | Name | Type | Default | Docs | @@ -327,7 +330,7 @@ argument :three, value(3) | Name | Type | Default | Docs | |------|------|---------|------| | [`name`](#reactor-collect-argument-name){: #reactor-collect-argument-name .spark-required} | `atom` | | The name of the argument which will be used as the key in the `arguments` map passed to the implementation. | -| [`source`](#reactor-collect-argument-source){: #reactor-collect-argument-source .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. | +| [`source`](#reactor-collect-argument-source){: #reactor-collect-argument-source .spark-required} | `Reactor.Template.Element \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. | ### Options | Name | Type | Default | Docs | @@ -467,7 +470,7 @@ argument :three, value(3) | Name | Type | Default | Docs | |------|------|---------|------| | [`name`](#reactor-compose-argument-name){: #reactor-compose-argument-name .spark-required} | `atom` | | The name of the argument which will be used as the key in the `arguments` map passed to the implementation. | -| [`source`](#reactor-compose-argument-source){: #reactor-compose-argument-source .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. | +| [`source`](#reactor-compose-argument-source){: #reactor-compose-argument-source .spark-required} | `Reactor.Template.Element \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. | ### Options | Name | Type | Default | Docs | @@ -616,7 +619,7 @@ argument :three, value(3) | Name | Type | Default | Docs | |------|------|---------|------| | [`name`](#reactor-debug-argument-name){: #reactor-debug-argument-name .spark-required} | `atom` | | The name of the argument which will be used as the key in the `arguments` map passed to the implementation. | -| [`source`](#reactor-debug-argument-source){: #reactor-debug-argument-source .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. | +| [`source`](#reactor-debug-argument-source){: #reactor-debug-argument-source .spark-required} | `Reactor.Template.Element \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. | ### Options | Name | Type | Default | Docs | @@ -759,7 +762,7 @@ argument :three, value(3) | Name | Type | Default | Docs | |------|------|---------|------| | [`name`](#reactor-group-argument-name){: #reactor-group-argument-name .spark-required} | `atom` | | The name of the argument which will be used as the key in the `arguments` map passed to the implementation. | -| [`source`](#reactor-group-argument-source){: #reactor-group-argument-source .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. | +| [`source`](#reactor-group-argument-source){: #reactor-group-argument-source .spark-required} | `Reactor.Template.Element \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. | ### Options | Name | Type | Default | Docs | @@ -867,6 +870,206 @@ end Target: `Reactor.Dsl.Input` +## reactor.map +```elixir +map name +``` + + +Execute nested steps for every item of an iterator. + +Allows you to "map over" a collection applying a some steps to each element, +returning a list of new values. The input collection must be bounded - ie you +cannot pass infinite streams into this step or it will just loop forever - and +because it has to keep the results from each batch will eventually just use up +all available RAM. + +Under the hood we use `Iter` to handle lazy iteration of the collection. This +means that you can pass an `Iter.t` or any value for which `Iter.IntoIterable` +is implemented. + +> #### A note on ordering {: .tip} +> +> If your application doesn't need the results back in the same order that they +> were provided then setting `strict_ordering?` to `false` will increase +> performance - especially on large input sets. + + +### Nested DSLs + * [argument](#reactor-map-argument) + * [wait_for](#reactor-map-wait_for) + + +### Examples +``` +map :double_numbers do + input input(:numbers) + + step :double do + argument :number, element(:double_numbers) + + run %{number: number}, _, _ -> + {:ok, number * 2} + end + end +end + +``` + +``` +step :get_subscriptions do + run _, _, _ -> + Stripe.Subscription.list() + end +end + +map :cancel_subscriptions do + input result(:get_subscriptions) + + step :cancel do + argument :sub_id, element(:cancel_subscriptions, [:id]) + + run fn args, _, _ -> + Stripe.Subscription.cancel(arg.sub_id, %{prorate: true, invoice_now: true}) + end + end + + return :cancel +end + +``` + + + +### Arguments + +| Name | Type | Default | Docs | +|------|------|---------|------| +| [`name`](#reactor-map-name){: #reactor-map-name .spark-required} | `atom` | | A unique name for the step. | +### Options + +| Name | Type | Default | Docs | +|------|------|---------|------| +| [`source`](#reactor-map-source){: #reactor-map-source .spark-required} | `Reactor.Template.Element \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | The iterator or enumerable to use as the source of the iteration. | +| [`allow_async?`](#reactor-map-allow_async?){: #reactor-map-allow_async? } | `boolean` | `false` | Whether the emitted steps should be allowed to run asynchronously. | +| [`batch_size`](#reactor-map-batch_size){: #reactor-map-batch_size } | `pos_integer` | `100` | The number of items to consume off the source when emitting steps. | +| [`return`](#reactor-map-return){: #reactor-map-return } | `atom` | | The name of the nested step to use as the return value. | +| [`strict_ordering?`](#reactor-map-strict_ordering?){: #reactor-map-strict_ordering? } | `boolean` | `true` | Whether the mapped values must be returned in the same order that they were provided. | + + +## reactor.map.argument +```elixir +argument name, source \\ nil +``` + + +Specifies an argument to a Reactor step. + +Each argument is a value which is either the result of another step, or an input value. + +Individual arguments can be transformed with an arbitrary function before +being passed to any steps. + + + + +### Examples +``` +argument :name, input(:name) + +``` + +``` +argument :year, input(:date, [:year]) + +``` + +``` +argument :user, result(:create_user) + +``` + +``` +argument :user_id, result(:create_user) do + transform & &1.id +end + +``` + +``` +argument :user_id, result(:create_user, [:id]) + +``` + +``` +argument :three, value(3) + +``` + + + +### Arguments + +| Name | Type | Default | Docs | +|------|------|---------|------| +| [`name`](#reactor-map-argument-name){: #reactor-map-argument-name .spark-required} | `atom` | | The name of the argument which will be used as the key in the `arguments` map passed to the implementation. | +| [`source`](#reactor-map-argument-source){: #reactor-map-argument-source .spark-required} | `Reactor.Template.Element \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. | +### Options + +| Name | Type | Default | Docs | +|------|------|---------|------| +| [`transform`](#reactor-map-argument-transform){: #reactor-map-argument-transform } | `(any -> any) \| module \| nil` | | An optional transformation function which can be used to modify the argument before it is passed to the step. | + + + + + +### Introspection + +Target: `Reactor.Dsl.Argument` + +## reactor.map.wait_for +```elixir +wait_for names +``` + + +Wait for the named step to complete before allowing this one to start. + +Desugars to `argument :_, result(step_to_wait_for)` + + + + +### Examples +``` +wait_for :create_user +``` + + + +### Arguments + +| Name | Type | Default | Docs | +|------|------|---------|------| +| [`names`](#reactor-map-wait_for-names){: #reactor-map-wait_for-names .spark-required} | `atom \| list(atom)` | | The name of the step to wait for. | + + + + + + +### Introspection + +Target: `Reactor.Dsl.WaitFor` + + + + +### Introspection + +Target: `Reactor.Dsl.Map` + ## reactor.step ```elixir step name, impl \\ nil @@ -983,7 +1186,7 @@ argument :three, value(3) | Name | Type | Default | Docs | |------|------|---------|------| | [`name`](#reactor-step-argument-name){: #reactor-step-argument-name .spark-required} | `atom` | | The name of the argument which will be used as the key in the `arguments` map passed to the implementation. | -| [`source`](#reactor-step-argument-source){: #reactor-step-argument-source .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. | +| [`source`](#reactor-step-argument-source){: #reactor-step-argument-source .spark-required} | `Reactor.Template.Element \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the argument. See `Reactor.Dsl.Argument` for more information. | ### Options | Name | Type | Default | Docs | @@ -1067,7 +1270,7 @@ Use a predicate to determine which steps should be executed. | Name | Type | Default | Docs | |------|------|---------|------| -| [`on`](#reactor-switch-on){: #reactor-switch-on .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | The value to match against. | +| [`on`](#reactor-switch-on){: #reactor-switch-on .spark-required} | `Reactor.Template.Element \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | The value to match against. | | [`allow_async?`](#reactor-switch-allow_async?){: #reactor-switch-allow_async? } | `boolean` | `true` | Whether the emitted steps should be allowed to run asynchronously. | diff --git a/lib/reactor/argument.ex b/lib/reactor/argument.ex index d651d48..9b381c8 100644 --- a/lib/reactor/argument.ex +++ b/lib/reactor/argument.ex @@ -6,6 +6,7 @@ defmodule Reactor.Argument do defstruct name: nil, source: nil, transform: nil alias Reactor.{Argument, Template} + import Reactor.Template, only: :macros @type t :: %Argument{ name: atom, @@ -64,6 +65,49 @@ defmodule Reactor.Argument do def from_value(name, value, transform \\ nil) when is_atom(name) and maybe_transform(transform), do: %Argument{name: name, source: %Template.Value{value: value}, transform: transform} + @doc """ + Build an argument which refers to to an element within a map step with an optional transformation applied. + + ## Example + + iex> Argument.from_element(:argument_name, &Atom.to_string/1) + + """ + @spec from_element(atom, atom, nil | (any -> any)) :: Argument.t() + def from_element(name, element_name, transform \\ nil) + when is_atom(name) and maybe_transform(transform), + do: %Argument{ + name: name, + source: %Template.Element{name: element_name}, + transform: transform + } + + @doc """ + Build an argument directly from a template. + + ## Example + + iex> Argument.from_template(:argument_name, Reactor.Dsl.Argument.input(:input_name)) + + """ + @spec from_template(atom, Template.t(), nil | (any -> any)) :: Argument.t() + def from_template(name, template, transform \\ nil) + when is_atom(name) and is_template(template) and maybe_transform(transform), + do: %Argument{name: name, source: template, transform: transform} + + @doc """ + Set a sub-path on the argument. + + ## Example + + iex> Argument.from_value(:example, :value) + ...> |> Argument.sub_path([:nested, :values]) + + """ + @spec sub_path(Argument.t(), [any]) :: Argument.t() + def sub_path(argument, sub_path), + do: %{argument | source: %{argument.source | sub_path: sub_path}} + @doc """ Validate that the argument is an Argument struct. """ @@ -72,17 +116,26 @@ defmodule Reactor.Argument do @doc """ Validate that the argument refers to a reactor input. """ - defguard is_from_input(argument) when is_struct(argument.source, Template.Input) + defguard is_from_input(argument) + when is_argument(argument) and is_input_template(argument.source) @doc """ Validate that the argument refers to a step result. """ - defguard is_from_result(argument) when is_struct(argument.source, Template.Result) + defguard is_from_result(argument) + when is_argument(argument) and is_result_template(argument.source) @doc """ Validate that the argument contains a static value. """ - defguard is_from_value(argument) when is_struct(argument.source, Template.Value) + defguard is_from_value(argument) + when is_argument(argument) and is_value_template(argument.source) + + @doc """ + Validate that the argument contains an element. + """ + defguard is_from_element(argument) + when is_argument(argument) and is_element_template(argument.source) @doc """ Validate that the argument has a transform. diff --git a/lib/reactor/builder/step.ex b/lib/reactor/builder/step.ex index c5b74e3..5f560d4 100644 --- a/lib/reactor/builder/step.ex +++ b/lib/reactor/builder/step.ex @@ -224,7 +224,8 @@ defmodule Reactor.Builder.Step do {:ok, argument} end - argument when is_from_result(argument) or is_from_value(argument) -> + argument + when is_from_result(argument) or is_from_value(argument) or is_from_element(argument) -> {:ok, argument} end) end diff --git a/lib/reactor/dsl.ex b/lib/reactor/dsl.ex index 4dc3d2d..b213f4a 100644 --- a/lib/reactor/dsl.ex +++ b/lib/reactor/dsl.ex @@ -30,6 +30,7 @@ defmodule Reactor.Dsl do Dsl.Debug.__entity__(), Dsl.Group.__entity__(), Dsl.Input.__entity__(), + Dsl.Map.__entity__(), Dsl.Step.__entity__(), Dsl.Switch.__entity__() ], diff --git a/lib/reactor/dsl/argument.ex b/lib/reactor/dsl/argument.ex index 64f6de1..5c74902 100644 --- a/lib/reactor/dsl/argument.ex +++ b/lib/reactor/dsl/argument.ex @@ -117,6 +117,37 @@ defmodule Reactor.Dsl.Argument do @spec value(any) :: Template.Value.t() def value(value), do: %Template.Value{value: value} + @doc ~S""" + The `element` template helper for the Reactor DSL. + + ## Example + + ```elixir + defmodule ExampleReactor do + use Reactor + + input :numbers + + map :double_numbers do + source input(:numbers) + + step :double do + argument :number, element(:double_numbers) + + run fn args, _, _ -> + {:ok, args.number * 2} + end + end + + return :double + end + end + ``` + """ + @spec element(any, [any]) :: Template.Element.t() + def element(name, sub_path \\ []) + def element(name, sub_path), do: %Template.Element{name: name, sub_path: List.wrap(sub_path)} + @doc false def __entity__, do: %Spark.Dsl.Entity{ diff --git a/lib/reactor/dsl/around.ex b/lib/reactor/dsl/around.ex index 9e49280..fd8d913 100644 --- a/lib/reactor/dsl/around.ex +++ b/lib/reactor/dsl/around.ex @@ -39,7 +39,7 @@ defmodule Reactor.Dsl.Around do type: :atom, required: true, doc: """ - A unique name of the group of steps. + A unique name for the group of steps. """ ], fun: [ @@ -91,9 +91,15 @@ defmodule Reactor.Dsl.Around do )} end - def verify(_around, _dsl_state), do: :ok - - def transform(_around, dsl_state), do: {:ok, dsl_state} + def verify(around, dsl_state) do + around.steps + |> Enum.reduce_while(:ok, fn step, :ok -> + case Dsl.Build.verify(step, dsl_state) do + :ok -> {:cont, :ok} + {:error, reason} -> {:halt, {:error, reason}} + end + end) + end defp build_inputs(reactor, around) do around.arguments diff --git a/lib/reactor/dsl/build.ex b/lib/reactor/dsl/build.ex index a68e8df..ffc69f8 100644 --- a/lib/reactor/dsl/build.ex +++ b/lib/reactor/dsl/build.ex @@ -13,15 +13,6 @@ defprotocol Reactor.Dsl.Build do @spec build(t, Reactor.t()) :: {:ok, Reactor.t()} | {:error, any} def build(entity, reactor) - @doc """ - Perform any transformation that is needed to make the entity work in the - system before building. - - See `Spark.Dsl.Transformer` for more information. - """ - @spec transform(t, Spark.Dsl.t()) :: {:ok, Spark.Dsl.t()} | {:error, any} - def transform(entity, dsl_state) - @doc """ Perform any after-compilation verification that is needed to make the entity work. diff --git a/lib/reactor/dsl/group.ex b/lib/reactor/dsl/group.ex index 5c4928e..700fd39 100644 --- a/lib/reactor/dsl/group.ex +++ b/lib/reactor/dsl/group.ex @@ -103,12 +103,18 @@ defmodule Reactor.Dsl.Group do )} end - def verify(_group, _dsl_state), do: :ok + def verify(group, dsl_state) do + group.steps + |> Enum.reduce_while(:ok, fn step, :ok -> + case Dsl.Build.verify(step, dsl_state) do + :ok -> {:cont, :ok} + {:error, reason} -> {:halt, {:error, reason}} + end + end) + end - def transform(_around, dsl_state), do: {:ok, dsl_state} - - defp build_inputs(reactor, around) do - around.arguments + defp build_inputs(reactor, group) do + group.arguments |> Enum.map(& &1.name) |> reduce_while_ok(reactor, &Builder.add_input(&2, &1)) end diff --git a/lib/reactor/dsl/info.ex b/lib/reactor/dsl/info.ex new file mode 100644 index 0000000..dd1f3e5 --- /dev/null +++ b/lib/reactor/dsl/info.ex @@ -0,0 +1,61 @@ +defmodule Reactor.Dsl.Info do + @moduledoc """ + Introspection for the Reactor DSL. + """ + use Spark.InfoGenerator, sections: [:reactor], extension: Reactor.Dsl + + alias Reactor.{Builder, Dsl} + alias Spark.Dsl.Extension + import Reactor.Utils + + @doc """ + Convert a reactor DSL module into a reactor struct. + """ + @spec to_struct(module | Reactor.t() | Spark.Dsl.t()) :: {:ok, Reactor.t()} | {:error, any} + def to_struct(reactor) when is_struct(reactor, Reactor), do: {:ok, reactor} + + def to_struct(module) do + with {:ok, reactor} <- entities_to_struct(module), + {:ok, reactor} <- maybe_set_return(module, reactor) do + add_middleware(module, reactor) + end + end + + @doc """ + Raising version of `to_struct/1`. + """ + @spec to_struct!(module | Reactor.t() | Spark.Dsl.t()) :: Reactor.t() | no_return + def to_struct!(reactor) do + case to_struct(reactor) do + {:ok, reactor} -> reactor + {:error, reason} -> raise reason + end + end + + defp entities_to_struct(module) when is_atom(module) do + module + |> reactor() + |> reduce_while_ok(Builder.new(module), &Dsl.Build.build/2) + end + + defp entities_to_struct(dsl_state) when is_map(dsl_state) do + module = Extension.get_persisted(dsl_state, :module) + + dsl_state + |> reactor() + |> reduce_while_ok(Builder.new(module), &Dsl.Build.build/2) + end + + defp maybe_set_return(module, reactor) do + case reactor_return(module) do + {:ok, value} -> {:ok, %{reactor | return: value}} + :error -> {:ok, reactor} + end + end + + defp add_middleware(module, reactor) do + module + |> reactor_middlewares() + |> reduce_while_ok(reactor, &Dsl.Build.build/2) + end +end diff --git a/lib/reactor/dsl/map.ex b/lib/reactor/dsl/map.ex new file mode 100644 index 0000000..e47d2dd --- /dev/null +++ b/lib/reactor/dsl/map.ex @@ -0,0 +1,235 @@ +defmodule Reactor.Dsl.Map do + @moduledoc """ + The `map` DSL entity struct. + + See `d:Reactor.map`. + """ + + defstruct __identifier__: nil, + allow_async?: false, + arguments: [], + batch_size: 100, + iterable?: true, + name: nil, + return: nil, + source: nil, + steps: [], + strict_ordering?: true + + alias Reactor.{Builder, Dsl, Step, Template} + + @type t :: %Dsl.Map{ + __identifier__: any, + allow_async?: boolean, + arguments: [Dsl.Argument.t()], + batch_size: pos_integer(), + iterable?: true, + name: atom, + return: atom, + source: Template.t(), + steps: [Dsl.Step.t()], + strict_ordering?: boolean + } + + @doc false + def __entity__, + do: %Spark.Dsl.Entity{ + name: :map, + describe: """ + Execute nested steps for every item of an iterator. + + Allows you to "map over" a collection applying a some steps to each element, + returning a list of new values. The input collection must be bounded - ie you + cannot pass infinite streams into this step or it will just loop forever - and + because it has to keep the results from each batch will eventually just use up + all available RAM. + + Under the hood we use `Iter` to handle lazy iteration of the collection. This + means that you can pass an `Iter.t` or any value for which `Iter.IntoIterable` + is implemented. + + > #### A note on ordering {: .tip} + > + > If your application doesn't need the results back in the same order that they + > were provided then setting `strict_ordering?` to `false` will increase + > performance - especially on large input sets. + """, + examples: [ + """ + map :double_numbers do + input input(:numbers) + + step :double do + argument :number, element(:double_numbers) + + run %{number: number}, _, _ -> + {:ok, number * 2} + end + end + end + """, + """ + step :get_subscriptions do + run _, _, _ -> + Stripe.Subscription.list() + end + end + + map :cancel_subscriptions do + input result(:get_subscriptions) + + step :cancel do + argument :sub_id, element(:cancel_subscriptions, [:id]) + + run fn args, _, _ -> + Stripe.Subscription.cancel(arg.sub_id, %{prorate: true, invoice_now: true}) + end + end + + return :cancel + end + """ + ], + target: Dsl.Map, + args: [:name], + identifier: :name, + imports: [Dsl.Argument], + entities: [steps: [], arguments: [Dsl.Argument.__entity__(), Dsl.WaitFor.__entity__()]], + recursive_as: :steps, + schema: [ + name: [ + type: :atom, + required: true, + doc: """ + A unique name for the step. + """ + ], + allow_async?: [ + type: :boolean, + required: false, + default: false, + doc: """ + Whether the emitted steps should be allowed to run asynchronously. + """ + ], + batch_size: [ + type: :pos_integer, + required: false, + default: 100, + doc: """ + The number of items to consume off the source when emitting steps. + """ + ], + source: [ + type: Template.type(), + required: true, + doc: """ + The iterator or enumerable to use as the source of the iteration. + """ + ], + return: [ + type: :atom, + required: false, + doc: """ + The name of the nested step to use as the return value. + """ + ], + strict_ordering?: [ + type: :boolean, + required: false, + default: true, + doc: """ + Whether the mapped values must be returned in the same order that they were provided. + """ + ] + ] + } + + defimpl Dsl.Build do + import Reactor.Utils + require Reactor.Argument + alias Reactor.Argument + alias Spark.{Dsl.Verifier, Error.DslError} + + def build(map, reactor) do + sub_reactor = Builder.new(reactor.id) + + with {:ok, sub_reactor} <- build_steps(sub_reactor, map) do + arguments = + map.arguments + |> Enum.concat([Argument.from_template(:source, map.source)]) + + step_options = + map + |> Map.take([:allow_async?, :batch_size, :return, :strict_ordering]) + |> Map.put(:state, :init) + |> Map.put(:steps, sub_reactor.steps) + |> Map.update!(:return, fn + nil -> + sub_reactor.steps + |> List.first() + |> Map.fetch!(:name) + + return -> + return + end) + |> Enum.to_list() + + Builder.add_step( + reactor, + map.name, + {Step.Map, step_options}, + arguments, + max_retries: 0, + ref: :step_name + ) + end + end + + @spec verify(Reactor.Dsl.Map.t(), any()) :: :ok | struct() + def verify(step, dsl_state) do + with :ok <- verify_at_least_one_step(step, dsl_state) do + verify_return(step, dsl_state) + end + end + + defp verify_at_least_one_step(%{steps: [], name: name}, dsl_state) do + DslError.exception( + module: Verifier.get_persisted(dsl_state, :module), + path: [:reactor, :map, name], + message: "You must provide at least one child step to execute." + ) + end + + defp verify_at_least_one_step(_step, _dsl_state), do: :ok + + defp verify_return(%{return: nil, steps: [_]}, _dsl_state), do: :ok + + defp verify_return(%{return: nil, name: name}, dsl_state), + do: + DslError.exception( + module: Verifier.get_persisted(dsl_state, :module), + path: [:reactor, :map, name], + message: + "You must specify which step to use as the return value when more than one nested step is present in a map." + ) + + defp verify_return(%{steps: steps, return: return, name: name}, dsl_state) do + if Enum.any?(steps, &(&1.name == return)) do + :ok + else + DslError.exception( + module: Verifier.get_persisted(dsl_state, :module), + path: [:reactor, :map, name], + message: + "The name `#{inspect(return)}` does not refer to a direct descendant of the `#{inspect(name)}` step." + ) + end + end + + defp build_steps(reactor, map) do + map.steps + |> reduce_while_ok(reactor, &Dsl.Build.build/2) + end + end +end diff --git a/lib/reactor/dsl/switch.ex b/lib/reactor/dsl/switch.ex index a44a73f..bdb4833 100644 --- a/lib/reactor/dsl/switch.ex +++ b/lib/reactor/dsl/switch.ex @@ -152,7 +152,17 @@ defmodule Reactor.Dsl.Switch do )} end - def verify(_switch, _dsl_state), do: :ok + def verify(switch, dsl_state) do + switch.matches + |> Enum.flat_map(& &1.steps) + |> Enum.concat(switch.default.steps) + |> Enum.reduce_while(:ok, fn step, :ok -> + case Build.verify(step, dsl_state) do + :ok -> {:cont, :ok} + {:error, reason} -> {:halt, {:error, reason}} + end + end) + end def transform(_switch, dsl_state), do: {:ok, dsl_state} diff --git a/lib/reactor/dsl/transformer.ex b/lib/reactor/dsl/transformer.ex index 718d168..4cc4a22 100644 --- a/lib/reactor/dsl/transformer.ex +++ b/lib/reactor/dsl/transformer.ex @@ -11,7 +11,6 @@ defmodule Reactor.Dsl.Transformer do with {:ok, step_names} <- step_names(dsl_state), {:ok, dsl_state} <- maybe_set_return(dsl_state, step_names), {:ok, dsl_state} <- validate_return(dsl_state, step_names), - {:ok, dsl_state} <- do_entity_transform(dsl_state), {:ok, reactor} <- Info.to_struct(dsl_state), {:ok, reactor} <- Planner.plan(reactor) do dsl_state = @@ -53,12 +52,6 @@ defmodule Reactor.Dsl.Transformer do end end - defp do_entity_transform(dsl_state) do - dsl_state - |> Transformer.get_entities([:reactor]) - |> reduce_while_ok(dsl_state, &Dsl.Build.transform/2) - end - defp maybe_set_return(dsl_state, step_names) do case Transformer.get_option(dsl_state, [:reactor], :return) do nil -> diff --git a/lib/reactor/dsl/verifier.ex b/lib/reactor/dsl/verifier.ex index bb9c059..68239a6 100644 --- a/lib/reactor/dsl/verifier.ex +++ b/lib/reactor/dsl/verifier.ex @@ -3,8 +3,9 @@ defmodule Reactor.Dsl.Verifier do Runs `Reactor.Dsl.Build.verify/2` for all the entities in the reactor. """ use Spark.Dsl.Verifier - alias Reactor.Dsl.Build - alias Spark.Dsl.Verifier + alias Reactor.{Argument, Dsl.Build, Dsl.Input} + alias Spark.{Dsl.Verifier, Error.DslError} + require Argument @doc false @impl true @@ -12,11 +13,89 @@ defmodule Reactor.Dsl.Verifier do def verify(dsl_state) do dsl_state |> Verifier.get_entities([:reactor]) - |> Enum.reduce_while(:ok, fn entity, :ok -> - case Build.verify(entity, dsl_state) do + |> Enum.reject(&is_struct(&1, Input)) + |> Enum.reduce_while(:ok, fn step, :ok -> + case recursively_verify_step(step, nil, dsl_state) do :ok -> {:cont, :ok} {:error, reason} -> {:halt, {:error, reason}} end end) end + + defp recursively_verify_step(%{step: [_ | _]} = step, parent_step, dsl_state) do + with :ok <- verify_step(step, parent_step, dsl_state) do + Enum.reduce_while(step.steps, :ok, fn child, :ok -> + case recursively_verify_step(child, step, dsl_state) do + :ok -> {:cont, :ok} + {:error, reason} -> {:halt, {:error, reason}} + end + end) + end + end + + defp recursively_verify_step(step, parent_step, dsl_state), + do: verify_step(step, parent_step, dsl_state) + + defp verify_step(step, parent_step, dsl_state) do + with :ok <- maybe_verify_element_arguments(step, parent_step, dsl_state) do + Build.verify(step, dsl_state) + end + end + + defp maybe_verify_element_arguments(step, parent_step, dsl_state) + when parent_step.iterable? == true do + step.arguments + |> Enum.reduce_while(:ok, fn + argument, :ok + when Argument.is_from_element(argument) and argument.source.name == parent_step.name -> + {:cont, :ok} + + argument, :ok when Argument.is_from_element(argument) -> + {:halt, + {:error, + DslError.exception( + module: Verifier.get_persisted(dsl_state, :module), + path: [:reactor, step.name, :argument, argument.name], + message: """ + Element template refers to non-parent step. + + The argument `#{inspect(argument.name)}` is sourced from an element template, + however this template refers to a step which is not it's immediate parent. This + is an unsupported configuration. + """ + )}} + + _argument, :ok -> + {:cont, :ok} + end) + end + + defp maybe_verify_element_arguments(step, _parent_step, dsl_state) + when is_map_key(step, :arguments) do + step.arguments + |> Enum.reduce_while(:ok, fn + argument, :ok when Argument.is_from_element(argument) -> + {:halt, + {:error, + DslError.exception( + module: Verifier.get_persisted(dsl_state, :module), + path: [:reactor, step.name, :argument, argument.name], + message: """ + Unsupported element template in argument. + + The argument `#{inspect(argument.name)}` is sourced from an element template, + this is fine if it's being passed to a step which is an iterator however the + step type `#{inspect(step.__struct__)}` isn't an iterator. + + If you're defining your own iterable step type then you need to add the + `iterable?` field to its struct with its value set to `true`. + """ + )}} + + _argument, :ok -> + {:cont, :ok} + end) + end + + defp maybe_verify_element_arguments(_step, _parent_step, _dsl_state), do: :ok end diff --git a/lib/reactor/error/internal/unreachable_error.ex b/lib/reactor/error/internal/unreachable_error.ex new file mode 100644 index 0000000..81bb1de --- /dev/null +++ b/lib/reactor/error/internal/unreachable_error.ex @@ -0,0 +1,161 @@ +defmodule Reactor.Error.Internal.UnreachableError do + @moduledoc """ + An error that should never happen. + """ + + use Reactor.Error, + fields: [:bindings, :message, :file, :line], + class: :reactor + + @doc false + @impl true + def message(error) do + [ + """ + # Unreachable Error + + You should _never_ see this error in the wild. If you do please raise an issue on + the Reactor repository: + + https://github.com/ash-project/reactor/issues/new + + And paste the following information: + + --- BEGIN COPY --- + + Reached unreachable code at #{error.file}:#{error.line}: + + #{error.message} + """ + ] + |> maybe_append(maybe_format_bindings(error)) + |> maybe_append(maybe_format_stacktrace(error)) + |> maybe_append(format_system_info()) + |> maybe_append(format_running_applications()) + |> Enum.join("\n") + end + + @doc """ + Create an unreachable error. + """ + @spec unreachable(String.t()) :: Macro.output() + defmacro unreachable(message) do + quote do + unquote(__MODULE__).exception( + bindings: binding(), + line: __ENV__.line, + file: __ENV__.file, + message: unquote(message) + ) + end + end + + @doc """ + Bang version of `unreachable/1`. + """ + @spec unreachable!(String.t()) :: Macro.output() + defmacro unreachable!(message) do + quote do + raise unquote(__MODULE__).exception( + bindings: binding(), + line: __ENV__.line, + file: __ENV__.file, + message: unquote(message) + ) + end + end + + defp maybe_format_bindings(error) do + if Enum.any?(error.bindings) do + bindings = + error.bindings + |> Enum.map_join("\n", fn {name, value} -> + " - `#{inspect(name)}`: `#{inspect(value)}`" + end) + + """ + Bindings: + + #{bindings} + """ + end + end + + defp maybe_format_stacktrace(error) do + if error.stacktrace do + stacktrace = + error.stacktrace.stacktrace + |> Enum.drop(2) + |> Exception.format_stacktrace() + + """ + Backtrace: + + #{stacktrace} + """ + end + end + + # sobelow_skip ["Traversal.FileModule"] + defp format_system_info do + elixir = System.build_info() + + erlang_vsn = + [ + :code.root_dir(), + "releases", + :erlang.system_info(:otp_release), + "OTP_VERSION" + ] + |> Path.join() + |> File.read!() + |> String.trim() + + system = + with {_, code} when code > 0 <- System.cmd("uname", ["-a"]), + {_, code} when code > 0 <- System.cmd("ver", []) do + {family, name} = :os.type() + + version = + case :os.version() do + version when is_tuple(version) -> + version + |> Tuple.to_list() + |> Enum.map_join(".", &to_string/1) + + version when is_list(version) -> + to_string(version) + end + + "#{name} #{family} / #{version}" + else + {uname, 0} -> uname + end + + """ + System: + + Elixir #{elixir["version"]} (#{elixir[:revision]}) compiled with Erlang/OTP #{elixir[:otp_release]} + Erlang/OTP #{erlang_vsn} [erts-#{:erlang.system_info(:version)}] + #{system} + """ + end + + defp format_running_applications do + applications = + Application.loaded_applications() + |> Enum.map_join("\n", fn {app, _, vsn} -> + " - #{app} #{vsn}" + end) + + """ + Running applications: + + #{applications} + + --- END COPY --- + + Please carefully read all of the above and redact any sensitive information. + """ + end +end diff --git a/lib/reactor/error/invalid/invalid_result_error.ex b/lib/reactor/error/invalid/invalid_result_error.ex new file mode 100644 index 0000000..b51f3c5 --- /dev/null +++ b/lib/reactor/error/invalid/invalid_result_error.ex @@ -0,0 +1,38 @@ +defmodule Reactor.Error.Invalid.InvalidResultError do + @moduledoc """ + This error is returned when a step returns an invalid result. + """ + + use Reactor.Error, fields: [:reactor, :step, :result, :arguments], class: :invalid + + @doc false + def message(error) do + """ + # Invalid Result Error + + The step `#{inspect(error.step.name)}` returned an invalid result. + + Valid return types from the `c:Reactor.Step.run/3` callback are: + + - `{:ok, any}` - a successful result. + - `{:ok, any, [Reactor.Step.t]}` - a successful result with additional steps to + add to the running reactor. + - `:retry` - the step wants to be retried. + - `{:retry, Exception.t | any}` - the step wants to be retried, and here's why. + - `{:error, Exception.t | any}` - the step failed, and here's why. + - `{:halt, any}` - the step wants the Reactor to stop. + + ## `result`: + + #{inspect(error.result)} + + ## `step`: + + #{inspect(error.step)} + + ## `arguments`: + + #{inspect(error.arguments)} + """ + end +end diff --git a/lib/reactor/executor.ex b/lib/reactor/executor.ex index acfecb4..173a2cf 100644 --- a/lib/reactor/executor.ex +++ b/lib/reactor/executor.ex @@ -270,15 +270,16 @@ defmodule Reactor.Executor do end defp find_ready_steps(reactor, _state) do - step = - reactor.plan - |> Graph.vertices() - |> Enum.find(fn - step when is_struct(step, Step) -> Graph.in_degree(reactor.plan, step) == 0 - _ -> false - end) - - {:continue, [step]} + reactor.plan + |> Graph.vertices() + |> Enum.find(fn + step when is_struct(step, Step) -> Graph.in_degree(reactor.plan, step) == 0 + _ -> false + end) + |> case do + nil -> {:continue, []} + step -> {:continue, [step]} + end end defp maybe_release_pool(state) when state.pool_owner == true do diff --git a/lib/reactor/executor/async.ex b/lib/reactor/executor/async.ex index 35616c5..bc05ad5 100644 --- a/lib/reactor/executor/async.ex +++ b/lib/reactor/executor/async.ex @@ -71,105 +71,125 @@ defmodule Reactor.Executor.Async do end @doc """ - Check to see if any steps are completed, and if so handle them. + Handle zero or one completed async steps and then decide what to do. """ @spec handle_completed_steps(Reactor.t(), Executor.State.t()) :: {:recurse | :continue | :undo | :halt, Reactor.t(), Executor.State.t()} def handle_completed_steps(reactor, state) do - completed_task_results = get_normalised_task_results(state.current_tasks, 100) - - reactor - |> delete_vertices(Map.keys(completed_task_results)) - |> handle_completed_steps(state, completed_task_results) + completed_task_results = get_normalised_task_results(state, timeout: 100) + handle_completed_task_results(reactor, state, completed_task_results) end - defp handle_completed_steps(reactor, state, completed_task_results) - when map_size(completed_task_results) == 0, - do: {:continue, reactor, state} + defp handle_completed_task_results(reactor, state, []), + do: {:continue, reactor, state} - defp handle_completed_steps(reactor, state, completed_task_results) do - release_concurrency_resources_to_pool(state.concurrency_key, map_size(completed_task_results)) + defp handle_completed_task_results(reactor, state, completed_task_results) do + Enum.reduce( + completed_task_results, + {:recurse, reactor, state}, + fn task_result, {status, reactor, state} -> + {new_status, reactor, state} = handle_completed_step(reactor, state, task_result) - new_current_tasks = Map.drop(state.current_tasks, Map.keys(completed_task_results)) + if got_worse?(status, new_status) do + {new_status, reactor, state} + else + {status, reactor, state} + end + end + ) + end - completed_step_results = - completed_task_results - |> Map.values() - |> Map.new() + defp got_worse?(:recurse, :undo), do: true + defp got_worse?(:recurse, :halt), do: true + defp got_worse?(:undo, :halt), do: true + defp got_worse?(_old, _new), do: false - retry_steps = - completed_step_results - |> Enum.filter(fn - {_, :retry} -> true - {_, {:retry, _}} -> true - _ -> false - end) - |> Enum.map(&elem(&1, 0)) - - steps_to_remove = - completed_step_results - |> Map.keys() - |> MapSet.new() - |> MapSet.difference(MapSet.new(retry_steps)) - |> Enum.to_list() - - steps_to_append = - completed_step_results - |> Map.values() - |> Enum.flat_map(fn - {:ok, _, steps} -> steps - _ -> [] - end) + defp handle_completed_step(reactor, state, {task, step, {:ok, value, new_steps}}) do + state = + state + |> drop_task(task) reactor = reactor - |> store_successful_results_in_the_undo_stack(completed_step_results) - |> store_intermediate_results(completed_step_results) - |> delete_vertices(steps_to_remove) - |> append_steps(steps_to_append) + |> drop_from_plan(task) + |> maybe_store_undo(step, value) + |> maybe_store_intermediate_result(step, value) + reactor = + case Enum.split_with(new_steps, &(&1.name == step.name)) do + {[], new_steps} -> + reactor + |> drop_from_plan(step) + |> append_steps(new_steps) + + {recursive_steps, new_steps} -> + recursive_steps = Enum.map(recursive_steps, &%{&1 | ref: step.ref}) + + reactor + |> append_steps(recursive_steps) + |> append_steps(new_steps) + end + + {:recurse, reactor, state} + end + + defp handle_completed_step(reactor, state, {task, step, {:retry, error}}) do state = state - |> increment_retry_counts(retry_steps) - |> collect_errors(completed_step_results) + |> increment_retries(step) + |> drop_task(task) - status = - completed_task_results - |> Enum.find_value(:ok, fn - {_task, {_step, {:halt, _}}} -> - :halt + reactor = + reactor + |> drop_from_plan(task) - {_task, {_step, {:error, _}}} -> - :undo + if Map.get(state.retries, step.ref) >= step.max_retries do + error = + error || + RetriesExceededError.exception( + step: step, + retry_count: Map.get(state.retries, step.ref) + ) - {_task, {step, :retry}} -> - if Map.get(state.retries, step.ref) >= step.max_retries, - do: :undo - - _ -> - nil - end) - - state = %{state | current_tasks: new_current_tasks} - - case status do - :ok -> - {:recurse, reactor, state} - - :undo -> - {reactor, state} = collect_remaining_tasks_for_shutdown(reactor, state) - {:undo, reactor, state} - - :halt -> - {reactor, state} = collect_remaining_tasks_for_shutdown(reactor, state) - {:halt, reactor, state} + reactor = drop_from_plan(reactor, step) + {:undo, reactor, add_error(state, error)} + else + {:recurse, reactor, state} end end - defp get_normalised_task_results(current_tasks, timeout) do + defp handle_completed_step(reactor, state, {task, step, {:error, error}}) do + state = + state + |> drop_task(task) + |> add_error(error) + + reactor = + reactor + |> drop_from_plan(task) + |> drop_from_plan(step) + + {:undo, reactor, state} + end + + defp handle_completed_step(reactor, state, {task, step, {:halt, value}}) do + state = + state + |> drop_task(task) + + reactor = + reactor + |> drop_from_plan(task) + |> drop_from_plan(step) + |> store_intermediate_result(step, value) + + {:halt, reactor, state} + end + + defp get_normalised_task_results(%{current_tasks: current_tasks}, opts) do current_tasks |> Map.keys() - |> Task.yield_many(timeout) + |> Task.yield_many(opts) |> Stream.reject(&is_nil(elem(&1, 1))) |> Stream.map(fn {task, {:ok, {:error, reason}}} -> @@ -179,7 +199,7 @@ defmodule Reactor.Executor.Async do {task, {:halt, reason}} {task, {:ok, :retry}} -> - {task, :retry} + {task, {:retry, nil}} {task, {:ok, {:retry, reason}}} -> {task, {:retry, reason}} @@ -190,11 +210,53 @@ defmodule Reactor.Executor.Async do {task, {:exit, reason}} -> {task, {:error, reason}} end) - |> Map.new(fn {task, result} -> - {task, {Map.fetch!(current_tasks, task), result}} + |> Enum.map(fn {task, result} -> + {task, Map.fetch!(current_tasks, task), result} end) end + defp drop_task(state, task) do + ConcurrencyTracker.release(state.concurrency_key, 1) + + %{state | current_tasks: Map.delete(state.current_tasks, task)} + end + + defp increment_retries(state, step) do + %{state | retries: Map.update(state.retries, step.ref, 0, &(&1 + 1))} + end + + defp drop_from_plan(reactor, step) do + %{reactor | plan: Graph.delete_vertex(reactor.plan, step)} + end + + defp add_error(state, error) do + %{state | errors: [error | state.errors]} + end + + defp store_intermediate_result(reactor, step, value) do + %{reactor | intermediate_results: Map.put(reactor.intermediate_results, step.name, value)} + end + + defp maybe_store_undo(reactor, step, value) do + if Step.can?(step, :undo) do + %{reactor | undo: [{step, value} | reactor.undo]} + else + reactor + end + end + + defp maybe_store_intermediate_result(reactor, step, value) when reactor.return == step.name do + store_intermediate_result(reactor, step, value) + end + + defp maybe_store_intermediate_result(reactor, step, value) do + if Graph.out_degree(reactor.plan, step) > 0 do + store_intermediate_result(reactor, step, value) + else + reactor + end + end + defp store_successful_results_in_the_undo_stack(reactor, completed_step_results) when map_size(completed_step_results) == 0, do: reactor @@ -246,47 +308,6 @@ defmodule Reactor.Executor.Async do } end - defp increment_retry_counts(state, retry_steps) do - retries = - retry_steps - |> Enum.reduce(state.retries, fn step, retries -> - Map.update(retries, step.ref, 1, &(&1 + 1)) - end) - - %{state | retries: retries} - end - - defp collect_errors(state, completed_step_results) do - errors = - completed_step_results - |> Enum.filter(fn - {_step, {:error, _}} -> - true - - {step, {:retry, _}} -> - Map.get(state.retries, step.ref) >= step.max_retries - - {step, :retry} -> - Map.get(state.retries, step.ref) >= step.max_retries - - _ -> - false - end) - |> Enum.map(fn - {_step, {_, reason}} -> - reason - - {step, :retry} -> - RetriesExceededError.exception( - step: step, - retry_count: Map.get(state.retries, step.ref) - ) - end) - |> Enum.concat(state.errors) - - %{state | errors: errors} - end - @doc """ When the Reactor needs to shut down for any reason, we need to await all the currently running asynchronous steps and delete any task vertices. @@ -299,14 +320,16 @@ defmodule Reactor.Executor.Async do end def collect_remaining_tasks_for_shutdown(reactor, state) do - remaining_task_results = get_normalised_task_results(state.current_tasks, state.halt_timeout) + remaining_task_results = + get_normalised_task_results(state, timeout: state.halt_timeout, on_timeout: :ignore) - release_concurrency_resources_to_pool(state.concurrency_key, map_size(remaining_task_results)) + release_concurrency_resources_to_pool(state.concurrency_key, length(remaining_task_results)) remaining_step_results = remaining_task_results - |> Map.values() - |> Map.new() + |> Map.new(fn {_task, step, result} -> {step, result} end) + + finished_tasks = remaining_step_results |> Enum.map(&elem(&1, 0)) reactor = reactor @@ -315,7 +338,7 @@ defmodule Reactor.Executor.Async do unfinished_tasks = state.current_tasks - |> Map.delete(Map.keys(remaining_task_results)) + |> Map.delete(finished_tasks) unfinished_task_count = map_size(unfinished_tasks) @@ -327,7 +350,7 @@ defmodule Reactor.Executor.Async do |> Enum.map_join("\n * ", &inspect/1) """ - Waited #{state.halt_timeout}ms for async steps to complete, however #{unfinished_task_count} are still running and will be abandoned and cannot be undone. + Waited #{state.halt_timeout}ms for async steps to complete, however #{unfinished_task_count} are still running, will be abandoned and cannot be undone. * #{unfinished_steps} """ diff --git a/lib/reactor/executor/state.ex b/lib/reactor/executor/state.ex index 761dc17..dcd79de 100644 --- a/lib/reactor/executor/state.ex +++ b/lib/reactor/executor/state.ex @@ -53,7 +53,7 @@ defmodule Reactor.Executor.State do |> maybe_set_max_concurrency() |> maybe_allocate_concurrency_pool() |> Map.put(:started_at, DateTime.utc_now()) - |> then(&struct(__MODULE__, &1)) + |> then(&struct!(__MODULE__, &1)) end defp maybe_set_max_concurrency(attrs) diff --git a/lib/reactor/executor/step_runner.ex b/lib/reactor/executor/step_runner.ex index e0b32bd..27fc451 100644 --- a/lib/reactor/executor/step_runner.ex +++ b/lib/reactor/executor/step_runner.ex @@ -5,6 +5,7 @@ defmodule Reactor.Executor.StepRunner do alias Reactor.{ Error.Invalid.ArgumentSubpathError, Error.Invalid.CompensateStepError, + Error.Invalid.InvalidResultError, Error.Invalid.MissingInputError, Error.Invalid.MissingResultError, Error.Invalid.RunStepError, @@ -20,6 +21,8 @@ defmodule Reactor.Executor.StepRunner do import Reactor.Argument, only: :macros require Logger + @dialyzer {:nowarn_function, handle_run_result: 5} + # In the future this could be moved into a step property. @max_undo_count 5 @@ -161,6 +164,16 @@ defmodule Reactor.Executor.StepRunner do {:halt, value} end + defp handle_run_result(result, reactor, step, arguments, _context) do + {:error, + InvalidResultError.exception( + reactor: reactor, + step: step, + result: result, + arguments: arguments + )} + end + defp maybe_compensate(reactor, step, error, arguments, context) do if Step.can?(step, :compensate) do compensate(reactor, step, error, arguments, context) diff --git a/lib/reactor/executor/sync.ex b/lib/reactor/executor/sync.ex index f94bc6d..bc24ef6 100644 --- a/lib/reactor/executor/sync.ex +++ b/lib/reactor/executor/sync.ex @@ -15,57 +15,72 @@ defmodule Reactor.Executor.Sync do def run(reactor, state, nil), do: {:continue, reactor, state} def run(reactor, state, step) do - case Executor.StepRunner.run(reactor, state, step, state.concurrency_key) do - :retry -> - state = increment_retries(state, step) + result = Executor.StepRunner.run(reactor, state, step, state.concurrency_key) - if Map.get(state.retries, step.ref) >= step.max_retries do - reactor = drop_from_plan(reactor, step) + handle_completed_step(reactor, state, step, result) + end - error = - RetriesExceededError.exception( - step: step, - retry_count: Map.get(state.retries, step.ref) - ) + defp handle_completed_step(reactor, state, step, :retry) do + handle_completed_step(reactor, state, step, {:retry, nil}) + end - {:undo, reactor, %{state | errors: [error | state.errors]}} - else - {:recurse, reactor, state} - end + defp handle_completed_step(reactor, state, step, {:retry, error}) do + state = increment_retries(state, step) - {:retry, reason} -> - state = increment_retries(state, step) + if Map.get(state.retries, step.ref) >= step.max_retries do + reactor = drop_from_plan(reactor, step) - if Map.get(state.retries, step.ref) >= step.max_retries do - reactor = drop_from_plan(reactor, step) - {:undo, reactor, %{state | errors: [reason | state.errors]}} - else - {:recurse, reactor, state} - end + error = + error || + RetriesExceededError.exception( + step: step, + retry_count: Map.get(state.retries, step.ref) + ) - {:ok, value, new_steps} -> - reactor = + {:undo, reactor, %{state | errors: [error | state.errors]}} + else + {:recurse, reactor, state} + end + end + + defp handle_completed_step(reactor, state, step, {:ok, value, new_steps}) do + reactor = + reactor + |> maybe_store_undo(step, value) + |> maybe_store_intermediate_result(step, value) + + reactor = + case Enum.split_with(new_steps, &(&1.name == step.name)) do + {[], new_steps} -> reactor - |> maybe_store_undo(step, value) - |> maybe_store_intermediate_result(step, value) |> drop_from_plan(step) |> append_steps(new_steps) - {:recurse, reactor, state} + {recursive_steps, new_steps} -> + recursive_steps = Enum.map(recursive_steps, &%{&1 | ref: step.ref}) - {:error, reason} -> - state = %{state | errors: [reason | state.errors]} - reactor = drop_from_plan(reactor, step) - {:undo, reactor, state} - - {:halt, value} -> - reactor = reactor - |> drop_from_plan(step) |> store_intermediate_result(step, value) + |> append_steps(recursive_steps) + |> append_steps(new_steps) + end - {:halt, reactor, state} - end + {:recurse, reactor, state} + end + + defp handle_completed_step(reactor, state, step, {:error, reason}) do + state = %{state | errors: [reason | state.errors]} + reactor = drop_from_plan(reactor, step) + {:undo, reactor, state} + end + + defp handle_completed_step(reactor, state, step, {:halt, value}) do + reactor = + reactor + |> drop_from_plan(step) + |> store_intermediate_result(step, value) + + {:halt, reactor, state} end defp increment_retries(state, step) do @@ -84,16 +99,15 @@ defmodule Reactor.Executor.Sync do end end + defp maybe_store_intermediate_result(reactor, step, value) when reactor.return == step.name do + store_intermediate_result(reactor, step, value) + end + defp maybe_store_intermediate_result(reactor, step, value) do - cond do - Graph.out_degree(reactor.plan, step) > 0 -> - store_intermediate_result(reactor, step, value) - - reactor.return == step.name -> - store_intermediate_result(reactor, step, value) - - true -> - reactor + if Graph.out_degree(reactor.plan, step) > 0 do + store_intermediate_result(reactor, step, value) + else + reactor end end diff --git a/lib/reactor/planner.ex b/lib/reactor/planner.ex index dd51df7..03cf58e 100644 --- a/lib/reactor/planner.ex +++ b/lib/reactor/planner.ex @@ -50,15 +50,22 @@ defmodule Reactor.Planner do steps_by_name = graph |> Graph.vertices() - |> Enum.concat(steps) + |> Stream.filter(&is_struct(&1, Step)) + |> Stream.concat(steps) |> Map.new(&{&1.name, &1}) steps |> reduce_while_ok(graph, fn step, graph when is_struct(step, Step) -> - graph - |> Graph.add_vertex(step, step.name) - |> reduce_arguments_into_graph(step, steps_by_name) + if Graph.has_vertex?(graph, step) do + graph + |> Graph.replace_vertex(step, step) + |> reduce_arguments_into_graph(step, steps_by_name) + else + graph + |> Graph.add_vertex(step, step.name) + |> reduce_arguments_into_graph(step, steps_by_name) + end not_step, _ -> {:error, @@ -72,7 +79,7 @@ defmodule Reactor.Planner do defp reduce_arguments_into_graph(graph, current_step, steps_by_name) do reduce_while_ok(current_step.arguments, graph, fn - argument, graph when is_argument(argument) and is_from_result(argument) -> + argument, graph when is_from_result(argument) -> dependency_name = argument.source.name case Map.fetch(steps_by_name, dependency_name) do @@ -95,8 +102,7 @@ defmodule Reactor.Planner do )} end - argument, graph - when is_argument(argument) and (is_from_input(argument) or is_from_value(argument)) -> + argument, graph when is_from_input(argument) or is_from_value(argument) -> {:ok, graph} end) end diff --git a/lib/reactor/step/map.ex b/lib/reactor/step/map.ex new file mode 100644 index 0000000..67a0d1f --- /dev/null +++ b/lib/reactor/step/map.ex @@ -0,0 +1,352 @@ +defmodule Reactor.Step.Map do + use Reactor.Step + require Reactor.Argument + require Reactor.Error.Internal.UnreachableError + require Iter + alias Reactor.{Argument, Builder, Error.Internal.UnreachableError, Step, Template} + alias Spark.Options + import Reactor.Utils + + @option_schema [ + state: [ + type: {:in, [:init, :iterating]}, + required: true, + doc: """ + The current execution state of the map. This is required because it's recursive. + """ + ], + batch_size: [ + type: :pos_integer, + required: false, + default: 100, + doc: """ + The number of elements to consume off the source when emitting steps. + """ + ], + steps: [ + type: {:list, {:struct, Step}}, + required: true, + doc: """ + The steps to use when mapping source elements. + """ + ], + return: [ + type: :atom, + required: true, + doc: """ + The name of the step whose result will be used as the new value for each element. + """ + ], + strict_ordering?: [ + type: :boolean, + required: false, + default: true, + doc: """ + Whether the mapped values must be returned in the same order that they were provided. + """ + ], + allow_async?: [ + type: :boolean, + required: false, + default: true, + doc: """ + Whether the emitted steps should be allowed to run asynchronously. + """ + ], + descendant_step_names: [ + type: {:struct, MapSet}, + required: false, + doc: """ + The cached names of all descendant steps to aid rewriting. You don't need to provide this value - it is calculated by the init pass. + """ + ], + extra_arguments: [ + type: {:list, {:struct, Argument}}, + required: false, + doc: """ + Extra arguments to be passed by to every nested step. + """ + ] + ] + + @moduledoc """ + Given an iterable input run the provided steps for each element and collect + the results into a new value. + + > #### A note on ordering {: .tip} + > + > If your application doesn't need the results back in the same order that + > they were provided then setting `strict_ordering?` to `false` will increase + > performance - especially on large input sets. + + ## Options + + #{Options.docs(@option_schema)} + """ + + @doc false + @impl true + def run(arguments, context, options) do + with {:ok, options} <- Options.validate(options, @option_schema) do + case options[:state] do + :init -> do_init(arguments.source, arguments, options, context.current_step) + :iterating -> do_iterate(arguments, options, context.current_step) + end + end + end + + defp do_init(source, arguments, options, map_step) when Iter.is_iter(source) do + source = + source + |> Iter.with_index() + + extra_arguments = + arguments + |> Map.drop([:source, :result]) + |> Enum.map(fn {name, value} -> + Argument.from_value(name, value) + end) + + options = + options + |> Keyword.put_new_lazy(:descendant_step_names, fn -> + collect_all_step_names(options[:steps]) + end) + |> Keyword.put(:state, :iterating) + |> Keyword.put(:extra_arguments, extra_arguments) + + emit_batch(source, options, map_step, []) + end + + defp do_init(source, arguments, options, map_step) do + source + |> Iter.from() + |> do_init(arguments, options, map_step) + end + + defp do_iterate(arguments, options, map_step) do + {source, arguments} = Map.pop!(arguments, :source) + {result, arguments} = Map.pop!(arguments, :result) + + map_step_name = map_step.name + + result = + Enum.reduce(arguments, result, fn {{__MODULE__, ^map_step_name, :element, index}, value}, + result -> + [{index, value} | result] + end) + + emit_batch(source, options, map_step, result) + end + + defp collect_all_step_names(steps, into \\ MapSet.new()) + defp collect_all_step_names([], into), do: into + + defp collect_all_step_names([%{steps: [_ | _] = child_steps} = step | steps], into) do + into = collect_all_step_names(child_steps, MapSet.put(into, step.name)) + collect_all_step_names(steps, into) + end + + defp collect_all_step_names([step | steps], into), + do: collect_all_step_names(steps, MapSet.put(into, step.name)) + + defp emit_batch(source, options, map_step, result) do + with {:done, batch} <- Iter.take_chunk(source, options[:batch_size]), + {:done, []} <- {:done, Iter.to_list(batch)} do + finalise_result(result, options) + else + {:ok, batch, remainder} -> do_emit_batch(batch, remainder, options, map_step, result) + {:done, batch} -> do_emit_batch(batch, Iter.empty(), options, map_step, result) + end + end + + defp do_emit_batch(batch, remainder, options, map_step, result) do + with {:ok, arguments} <- arguments_for_batch(batch, options, map_step), + {:ok, recursive_step} <- + Builder.new_step( + map_step.name, + {__MODULE__, options}, + Enum.concat(arguments, [ + Argument.from_value(:source, remainder), + Argument.from_result(:result, map_step.name) + ]) + ), + {:ok, steps} <- steps_for_batch(batch, options, map_step) do + steps = Enum.concat(steps, [recursive_step]) + + {:ok, result, steps} + end + end + + defp finalise_result(result, options) do + if options[:strict_ordering?] do + result = + result + |> Enum.sort_by(&elem(&1, 0)) + |> Enum.map(&elem(&1, 1)) + + {:ok, result} + else + {:ok, Map.values(result)} + end + end + + # generate a whole heap of arguments for the recursive step so that it can + # collect up the whole batch. + defp arguments_for_batch(batch, options, map_step) do + arguments = + Enum.map(batch, fn {_element, index} -> + %Argument{ + name: {__MODULE__, map_step.name, :element, index}, + source: %Template.Result{name: {__MODULE__, map_step.name, options[:return], index}} + } + end) + + {:ok, arguments} + end + + defp steps_for_batch(batch, options, map_step) do + steps = options[:steps] + descendant_step_names = options[:descendant_step_names] + extra_arguments = options[:extra_arguments] + + reduce_while_ok(batch, [], fn {element, index}, result -> + case rewrite_steps_for_element( + {element, index}, + steps, + descendant_step_names, + map_step, + extra_arguments, + options[:allow_async?] + ) do + {:ok, steps} -> reduce_while_ok(steps, result, &{:ok, [&1 | &2]}) + {:error, reason} -> {:error, reason} + end + end) + end + + defp rewrite_steps_for_element( + {element, index}, + steps, + descendant_step_names, + map_step, + extra_arguments, + allow_async? + ) do + map_while_ok( + steps, + &rewrite_step_for_element( + &1, + {element, index}, + descendant_step_names, + map_step, + extra_arguments, + allow_async? + ) + ) + end + + defp rewrite_step_for_element( + step, + {element, index}, + descendant_step_names, + map_step, + extra_arguments, + allow_async? + ) do + with {:ok, step} <- + rewrite_arguments( + step, + {element, index}, + descendant_step_names, + map_step + ), + {:ok, step} <- + rewrite_nested_steps_for_element( + step, + {element, index}, + descendant_step_names, + map_step, + extra_arguments, + allow_async? + ) do + {:ok, + %{ + step + | arguments: Enum.concat(step.arguments, extra_arguments), + name: {__MODULE__, map_step.name, step.name, index}, + ref: {__MODULE__, map_step.name, step.ref, index}, + async?: allow_async? + }} + end + end + + defp rewrite_arguments(step, {element, index}, descendant_step_names, map_step) do + map_while_ok(step.arguments, fn + argument + when Argument.is_from_element(argument) and argument.source.name != map_step.name -> + {:error, + UnreachableError.unreachable( + "Attempted to retrieve an element whose source doesn't match the current map step: #{inspect(argument.source.name)} vs #{inspect(map_step.name)}" + )} + + argument when Argument.is_from_element(argument) -> + argument = + argument.name + |> Argument.from_value(element) + |> Argument.sub_path(argument.source.sub_path) + + {:ok, argument} + + argument when Argument.is_from_result(argument) -> + if MapSet.member?(descendant_step_names, argument.source.name) do + argument = %{ + argument + | source: %{ + argument.source + | name: {__MODULE__, map_step.name, argument.source.name, index} + } + } + + {:ok, argument} + else + {:ok, argument} + end + + argument -> + {:ok, argument} + end) + |> and_then(&{:ok, %{step | arguments: &1}}) + end + + defp rewrite_nested_steps_for_element( + %{steps: [_ | _] = steps} = step, + {element, index}, + descendant_step_names, + map_step, + extra_arguments, + allow_async? + ) do + with {:ok, steps} <- + rewrite_steps_for_element( + {element, index}, + steps, + descendant_step_names, + map_step, + extra_arguments, + allow_async? + ) do + {:ok, %{step | steps: steps}} + end + end + + defp rewrite_nested_steps_for_element( + step, + _element_index, + _descendant_step_names, + _map_step, + _extra_arguments, + _allow_async? + ), + do: {:ok, step} +end diff --git a/lib/reactor/template.ex b/lib/reactor/template.ex index 639a71e..ded0070 100644 --- a/lib/reactor/template.ex +++ b/lib/reactor/template.ex @@ -2,14 +2,13 @@ defmodule Reactor.Template do @moduledoc """ Templates used to refer to some sort of computed value. """ + alias __MODULE__.{Element, Input, Result, Value} - alias __MODULE__.{Input, Result, Value} - - @type t :: Input.t() | Result.t() | Value.t() + @type t :: Element.t() | Input.t() | Result.t() | Value.t() @doc "The type for use in option schemas" @spec type :: Spark.Options.type() - def type, do: {:or, [{:struct, Input}, {:struct, Result}, {:struct, Value}]} + def type, do: {:or, [{:struct, Element}, {:struct, Input}, {:struct, Result}, {:struct, Value}]} @doc "A guard for input templates" @spec is_input_template(any) :: Macro.output() @@ -23,9 +22,13 @@ defmodule Reactor.Template do @spec is_value_template(any) :: Macro.output() defguard is_value_template(template) when is_struct(template, Value) + @doc "A guard for element templates" + @spec is_element_template(any) :: Macro.output() + defguard is_element_template(template) when is_struct(template, Element) + @doc "A guard to detect all template types" @spec is_template(any) :: Macro.output() defguard is_template(template) when is_input_template(template) or is_result_template(template) or - is_value_template(template) + is_value_template(template) or is_element_template(template) end diff --git a/lib/reactor/template/element.ex b/lib/reactor/template/element.ex new file mode 100644 index 0000000..16ec104 --- /dev/null +++ b/lib/reactor/template/element.ex @@ -0,0 +1,9 @@ +defmodule Reactor.Template.Element do + @moduledoc """ + The `element` template. + """ + + defstruct name: nil, sub_path: [] + + @type t :: %__MODULE__{name: atom, sub_path: [atom]} +end diff --git a/lib/reactor/template/value.ex b/lib/reactor/template/value.ex index cdbba44..f58c9ed 100644 --- a/lib/reactor/template/value.ex +++ b/lib/reactor/template/value.ex @@ -3,7 +3,7 @@ defmodule Reactor.Template.Value do A statically `value` template. """ - defstruct value: nil + defstruct value: nil, sub_path: [] - @type t :: %__MODULE__{value: any} + @type t :: %__MODULE__{value: any, sub_path: [any]} end diff --git a/lib/reactor/utils.ex b/lib/reactor/utils.ex index 370211e..07327cc 100644 --- a/lib/reactor/utils.ex +++ b/lib/reactor/utils.ex @@ -98,6 +98,41 @@ defmodule Reactor.Utils do end end + @doc """ + Perform a flat map over an enumerable provided that the mapper function + continues to return ok tuples. + """ + @spec flat_map_while_ok(Enumerable.t(input), (input -> {:ok, output} | {:error, any}), boolean) :: + {:ok, Enumerable.t(output)} | {:error, any} + when input: any, output: any + def flat_map_while_ok(inputs, mapper, preserve_order? \\ false) + + def flat_map_while_ok(inputs, mapper, false) do + reduce_while_ok(inputs, [], fn input, acc -> + case mapper.(input) do + {:ok, result} -> reduce_while_ok(result, acc, &[&1 | &2]) + {:error, reason} -> {:error, reason} + end + end) + end + + def flat_map_while_ok(inputs, mapper, true) do + inputs + |> flat_map_while_ok(mapper, false) + |> and_then(&{:ok, Enum.reverse(&1)}) + end + + @doc "Raising version of `flat_map_while_ok/3`" + @spec flat_map_while_ok!(Enumerable.t(input), (input -> {:ok, output} | {:error, any}), boolean) :: + Enumerable.t(output) | no_return + when input: any, output: any + def flat_map_while_ok!(inputs, mapper, preserve_order? \\ false) do + case flat_map_while_ok(inputs, mapper, preserve_order?) do + {:ok, result} -> result + {:error, reason} -> raise reason + end + end + @doc """ Perform a reduction over an enumerable provided that the reduction function returns an ok tuple. diff --git a/mix.exs b/mix.exs index 53b804c..42993de 100644 --- a/mix.exs +++ b/mix.exs @@ -95,6 +95,7 @@ defmodule Reactor.MixProject do {:splode, "~> 0.2"}, {:libgraph, "~> 0.16"}, {:igniter, "~> 0.2"}, + {:iterex, "~> 0.1"}, {:telemetry, "~> 1.2"}, # Dev/Test dependencies diff --git a/mix.lock b/mix.lock index 368f7c2..baa7fb9 100644 --- a/mix.lock +++ b/mix.lock @@ -16,6 +16,7 @@ "glob_ex": {:hex, :glob_ex, "0.1.7", "eae6b6377147fb712ac45b360e6dbba00346689a87f996672fe07e97d70597b1", [:mix], [], "hexpm", "decc1c21c0c73df3c9c994412716345c1692477b9470e337f628a7e08da0da6a"}, "hpax": {:hex, :hpax, "1.0.0", "28dcf54509fe2152a3d040e4e3df5b265dcb6cb532029ecbacf4ce52caea3fd2", [:mix], [], "hexpm", "7f1314731d711e2ca5fdc7fd361296593fc2542570b3105595bb0bc6d0fad601"}, "igniter": {:hex, :igniter, "0.2.10", "078a1308924e2cffce7956b00e01794ec218ed09cea969d0a9911ee91f885c99", [:mix], [{:glob_ex, "~> 0.1.7", [hex: :glob_ex, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:req, "~> 0.4", [hex: :req, repo: "hexpm", optional: false]}, {:rewrite, "~> 0.9", [hex: :rewrite, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.4", [hex: :sourceror, repo: "hexpm", optional: false]}, {:spitfire, ">= 0.1.3 and < 1.0.0-0", [hex: :spitfire, repo: "hexpm", optional: false]}], "hexpm", "071080c4676226764b9513138f185e842993ff95bbf483e78536db50396986eb"}, + "iterex": {:hex, :iterex, "0.1.1", "90378a9561ad87da46737dceaf02e68a0b3023746216a4de34a0c509f5f505d4", [:mix], [], "hexpm", "c4f5916a6dbb03aa4c3d5c480069e13075ca6a57bd0c28d643da3891962440ad"}, "jason": {:hex, :jason, "1.4.3", "d3f984eeb96fe53b85d20e0b049f03e57d075b5acda3ac8d465c969a2536c17b", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "9a90e868927f7c777689baa16d86f4d0e086d968db5c05d917ccff6d443e58a3"}, "libgraph": {:hex, :libgraph, "0.16.0", "3936f3eca6ef826e08880230f806bfea13193e49bf153f93edcf0239d4fd1d07", [:mix], [], "hexpm", "41ca92240e8a4138c30a7e06466acc709b0cbb795c643e9e17174a178982d6bf"}, "makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"}, diff --git a/test/reactor/dsl/map_test.exs b/test/reactor/dsl/map_test.exs new file mode 100644 index 0000000..4bc1455 --- /dev/null +++ b/test/reactor/dsl/map_test.exs @@ -0,0 +1,36 @@ +defmodule Reactor.Dsl.MapTest do + @moduledoc false + use ExUnit.Case, async: true + + defmodule MapOverNumbersReactor do + @moduledoc false + use Reactor + + input :numbers + + step :multiplier do + run fn _ -> {:ok, 2} end + end + + map :map_over_numbers do + source(input(:numbers)) + argument :multiplier, result(:multiplier) + batch_size(2) + + step :double do + argument :input, element(:map_over_numbers) + + run fn %{input: input, multiplier: multiplier}, _ -> + {:ok, input * multiplier} + end + end + end + end + + test "it maps over it's inputs" do + numbers = [0, 2, 4, 6, 8, 10] + + assert {:ok, [0, 4, 8, 12, 16, 20]} = + Reactor.run!(MapOverNumbersReactor, %{numbers: numbers}, %{}, async?: false) + end +end diff --git a/test/reactor/executor/async_test.exs b/test/reactor/executor/async_test.exs index 5b6507f..b7ef75d 100644 --- a/test/reactor/executor/async_test.exs +++ b/test/reactor/executor/async_test.exs @@ -204,12 +204,14 @@ defmodule Reactor.Executor.AsyncTest do assert Graph.has_vertex?(reactor.plan, step) end - test "when one of the steps asks to retry, it increments the retry count for the step", + test "when one of the steps asks to retry, it sets the retry count for the step", %{reactor: reactor, state: state, doable: doable, supervisor: supervisor} do task = Task.Supervisor.async_nolink(supervisor, fn -> :retry end) state = %{state | current_tasks: %{task => doable}} + refute is_map_key(state.retries, doable.ref) + assert {_, _reactor, state} = handle_completed_steps(reactor, state) - assert state.retries[doable.ref] == 1 + assert state.retries[doable.ref] == 0 end test "when one of the steps asks to retry (again), it increments the retry count for the step", diff --git a/test/reactor/executor/init_test.exs b/test/reactor/executor/init_test.exs index 9bfe97e..8dbc6f2 100644 --- a/test/reactor/executor/init_test.exs +++ b/test/reactor/executor/init_test.exs @@ -2,7 +2,7 @@ defmodule Reactor.Executor.InitTest do @moduledoc false use ExUnit.Case, async: true import Reactor.Executor.Init - alias Reactor.{Builder, Executor} + alias Reactor.Builder use Mimic describe "init/4" do @@ -52,14 +52,5 @@ defmodule Reactor.Executor.InitTest do assert {:error, error} = init(reactor, [], [], [:wat]) assert Exception.message(error) =~ "cannot be converted into a map" end - - test "options are passed into `State.init/1`", %{reactor: reactor} do - Executor.State - |> expect(:init, fn options -> - assert options.hello == :marty - end) - - assert {:ok, _reactor, _state} = init(reactor, [], [], hello: :marty) - end end end diff --git a/test/test_helper.exs b/test/test_helper.exs index d9bfe99..a523dca 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1,5 +1,4 @@ Mimic.copy(Example.Step.Compensable) Mimic.copy(Example.Step.Doable) Mimic.copy(Example.Step.Undoable) -Mimic.copy(Reactor.Executor.State) ExUnit.start()