mirror of
https://github.com/ash-project/reactor.git
synced 2024-09-19 21:03:28 +12:00
220 lines
6.2 KiB
Elixir
220 lines
6.2 KiB
Elixir
defmodule Reactor do
|
|
alias Reactor.{Dsl, Error.Validation.StateError, Executor, Step}
|
|
|
|
@moduledoc """
|
|
Reactor is a dynamic, concurrent, dependency resolving saga orchestrator.
|
|
|
|
## Usage
|
|
|
|
You can construct a reactor using the `Reactor` Spark DSL:
|
|
|
|
```elixir
|
|
defmodule HelloWorldReactor do
|
|
@moduledoc false
|
|
use Reactor
|
|
|
|
input :whom
|
|
|
|
step :greet, Greeter do
|
|
argument :whom, input(:whom)
|
|
end
|
|
|
|
return :greet
|
|
end
|
|
```
|
|
|
|
iex> Reactor.run(HelloWorldReactor, %{whom: "Dear Reader"})
|
|
{:ok, "Hello, Dear Reader!"}
|
|
|
|
or you can build it programmatically:
|
|
|
|
iex> reactor = Builder.new()
|
|
...> {:ok, reactor} = Builder.add_input(reactor, :whom)
|
|
...> {:ok, reactor} = Builder.add_step(reactor, :greet, Greeter, whom: {:input, :whom})
|
|
...> {:ok, reactor} = Builder.return(reactor, :greet)
|
|
...> Reactor.run(reactor, %{whom: nil})
|
|
{:ok, "Hello, World!"}
|
|
"""
|
|
|
|
defstruct context: %{},
|
|
id: nil,
|
|
inputs: [],
|
|
intermediate_results: %{},
|
|
middleware: [],
|
|
plan: nil,
|
|
return: nil,
|
|
state: :pending,
|
|
steps: [],
|
|
undo: []
|
|
|
|
use Spark.Dsl, default_extensions: [extensions: [Dsl]]
|
|
|
|
@type context :: %{optional(atom) => any}
|
|
@type context_arg :: Enumerable.t({atom, any})
|
|
|
|
@typedoc """
|
|
Specify the maximum number of asynchronous steps which can be run in parallel.
|
|
|
|
Defaults to the result of `System.schedulers_online/0`. Only used if
|
|
`async?` is set to `true`.
|
|
"""
|
|
@type max_concurrency_option :: {:max_concurrency, pos_integer()}
|
|
|
|
@typedoc """
|
|
Specify the amount of execution time after which to halt processing.
|
|
|
|
Note that this is not a hard limit. The Reactor will stop when the first step
|
|
completes _after_ the timeout has expired.
|
|
|
|
Defaults to `:infinity`.
|
|
"""
|
|
@type timeout_option :: {:timeout, pos_integer() | :infinity}
|
|
|
|
@typedoc """
|
|
The maximum number of iterations which after which the Reactor will halt.
|
|
|
|
Defaults to `:infinity`.
|
|
"""
|
|
@type max_iterations_option :: {:max_iterations, pos_integer() | :infinity}
|
|
|
|
@typedoc """
|
|
How long to wait for asynchronous steps to complete when halting.
|
|
|
|
Defaults to 5000ms.
|
|
"""
|
|
@type halt_timeout_option :: {:halt_timeout, pos_integer() | :infinity}
|
|
|
|
@typedoc """
|
|
When set to `false` forces the Reactor to run every step synchronously,
|
|
regardless of the step configuration.
|
|
|
|
Defaults to `true`.
|
|
"""
|
|
@type async_option :: {:async?, boolean}
|
|
|
|
@typedoc """
|
|
Use a `Reactor.Executor.ConcurrencyTracker.pool_key` to allow this Reactor to
|
|
share it's concurrency pool with other Reactor instances.
|
|
|
|
If you do not specify one then the Reactor will initialise a new pool and
|
|
place it in it's context for any child Reactors to re-use.
|
|
|
|
Only used if `async?` is set to `true`.
|
|
"""
|
|
@type concurrency_key_option :: {:concurrency_key, reference()}
|
|
|
|
@type options ::
|
|
Enumerable.t(
|
|
max_concurrency_option
|
|
| timeout_option
|
|
| max_iterations_option
|
|
| halt_timeout_option
|
|
| async_option
|
|
| concurrency_key_option
|
|
)
|
|
|
|
@type state :: :pending | :executing | :halted | :failed | :successful
|
|
@type inputs :: %{optional(atom) => any}
|
|
|
|
@type t :: %Reactor{
|
|
context: context,
|
|
id: any,
|
|
inputs: [atom],
|
|
intermediate_results: %{any => any},
|
|
middleware: [Reactor.Middleware.t()],
|
|
plan: nil | Graph.t(),
|
|
undo: [{Step.t(), any}],
|
|
return: any,
|
|
state: state,
|
|
steps: [Step.t()]
|
|
}
|
|
|
|
@doc "A guard which returns true if the value is a Reactor struct"
|
|
@spec is_reactor(any) :: Macro.t()
|
|
defguard is_reactor(reactor) when is_struct(reactor, __MODULE__)
|
|
|
|
@option_schema [
|
|
max_concurrency: [
|
|
type: :pos_integer,
|
|
required: false,
|
|
doc: "The maximum number of processes to use to run the Reactor"
|
|
],
|
|
timeout: [
|
|
type: {:or, [:pos_integer, {:literal, :infinity}]},
|
|
required: false,
|
|
default: :infinity,
|
|
doc: "How long to allow the Reactor to run for"
|
|
],
|
|
max_iterations: [
|
|
type: {:or, [:pos_integer, {:literal, :infinity}]},
|
|
required: false,
|
|
default: :infinity,
|
|
doc: "The maximum number of times to allow the Reactor to loop"
|
|
],
|
|
async_option: [
|
|
type: :boolean,
|
|
required: false,
|
|
default: true,
|
|
doc: "Whether to allow the Reactor to start processes"
|
|
],
|
|
concurrency_key_option: [
|
|
type: :reference,
|
|
required: false,
|
|
hide: true
|
|
]
|
|
]
|
|
|
|
@doc """
|
|
Attempt to run a Reactor.
|
|
|
|
## Arguments
|
|
|
|
* `reactor` - The Reactor to run, either a Reactor DSL module, or a Reactor
|
|
struct.
|
|
* `inputs` - A map of values passed in to satisfy the Reactor's expected
|
|
inputs.
|
|
* `context` - An arbitrary map that will be merged into the Reactor context
|
|
and passed into each step.
|
|
|
|
## Options
|
|
|
|
#{Spark.Options.docs(@option_schema)}
|
|
"""
|
|
@doc spark_opts: [{4, @option_schema}]
|
|
@spec run(t | module, inputs, context_arg, options) :: {:ok, any} | {:error, any} | {:halted, t}
|
|
def run(reactor, inputs \\ %{}, context \\ %{}, options \\ [])
|
|
|
|
def run(reactor, inputs, context, options) when is_atom(reactor) do
|
|
if Spark.Dsl.is?(reactor, Reactor) do
|
|
run(reactor.reactor(), inputs, context, options)
|
|
else
|
|
{:error,
|
|
ArgumentError.exception(message: "Module `#{inspect(reactor)}` is not a Reactor module")}
|
|
end
|
|
end
|
|
|
|
def run(reactor, inputs, context, options)
|
|
when is_reactor(reactor) and reactor.state in ~w[pending halted]a do
|
|
Executor.run(reactor, inputs, context, options)
|
|
end
|
|
|
|
def run(reactor, _inputs, _context, _options) do
|
|
{:error,
|
|
StateError.exception(
|
|
reactor: reactor,
|
|
state: reactor.state,
|
|
expected: ~w[pending halted]a
|
|
)}
|
|
end
|
|
|
|
@doc "Raising version of `run/4`."
|
|
@spec run!(t | module, inputs, context_arg, options) :: any | no_return
|
|
def run!(reactor, inputs \\ %{}, context \\ %{}, options \\ [])
|
|
|
|
def run!(reactor, inputs, context, options) do
|
|
case run(reactor, inputs, context, options) do
|
|
{:ok, value} -> {:ok, value}
|
|
{:error, reason} -> raise reason
|
|
end
|
|
end
|
|
end
|