mirror of
https://github.com/ash-project/reactor.git
synced 2024-09-19 12:53:19 +12:00
improvement(Reactor.Executor): track concurrent process usage across multiple reactors. (#26)
This commit is contained in:
parent
cb51823f83
commit
b985126894
15 changed files with 375 additions and 62 deletions
|
@ -1,5 +1,5 @@
|
||||||
defmodule Reactor do
|
defmodule Reactor do
|
||||||
alias Reactor.{Dsl, Executor, Planner, Step}
|
alias Reactor.{Dsl, Executor, Step}
|
||||||
|
|
||||||
@moduledoc """
|
@moduledoc """
|
||||||
Reactor is a dynamic, concurrent, dependency resolving saga orchestrator.
|
Reactor is a dynamic, concurrent, dependency resolving saga orchestrator.
|
||||||
|
@ -68,7 +68,8 @@ defmodule Reactor do
|
||||||
@typedoc """
|
@typedoc """
|
||||||
Specify the maximum number of asynchronous steps which can be run in parallel.
|
Specify the maximum number of asynchronous steps which can be run in parallel.
|
||||||
|
|
||||||
Defaults to the result of `System.schedulers_online/0`.
|
Defaults to the result of `System.schedulers_online/0`. Only used if
|
||||||
|
`async?` is set to `true`.
|
||||||
"""
|
"""
|
||||||
@type max_concurrency_option :: {:max_concurrency, pos_integer()}
|
@type max_concurrency_option :: {:max_concurrency, pos_integer()}
|
||||||
|
|
||||||
|
@ -104,6 +105,17 @@ defmodule Reactor do
|
||||||
"""
|
"""
|
||||||
@type async_option :: {:async?, boolean}
|
@type async_option :: {:async?, boolean}
|
||||||
|
|
||||||
|
@typedoc """
|
||||||
|
Use a `Reactor.Executor.ConcurrencyTracker.pool_key` to allow this Reactor to
|
||||||
|
share it's concurrency pool with other Reactor instances.
|
||||||
|
|
||||||
|
If you do not specify one then the Reactor will initialise a new pool and
|
||||||
|
place it in it's context for any child Reactors to re-use.
|
||||||
|
|
||||||
|
Only used if `async?` is set to `true`.
|
||||||
|
"""
|
||||||
|
@type concurrency_key_option :: {:concurrency_key, reference()}
|
||||||
|
|
||||||
@type options ::
|
@type options ::
|
||||||
Enumerable.t(
|
Enumerable.t(
|
||||||
max_concurrency_option
|
max_concurrency_option
|
||||||
|
@ -111,6 +123,7 @@ defmodule Reactor do
|
||||||
| max_iterations_option
|
| max_iterations_option
|
||||||
| halt_timeout_option
|
| halt_timeout_option
|
||||||
| async_option
|
| async_option
|
||||||
|
| concurrency_key_option
|
||||||
)
|
)
|
||||||
|
|
||||||
@type state :: :pending | :executing | :halted | :failed | :successful
|
@type state :: :pending | :executing | :halted | :failed | :successful
|
||||||
|
@ -148,11 +161,6 @@ defmodule Reactor do
|
||||||
|
|
||||||
def run(reactor, inputs, context, options)
|
def run(reactor, inputs, context, options)
|
||||||
when is_reactor(reactor) and reactor.state in ~w[pending halted]a do
|
when is_reactor(reactor) and reactor.state in ~w[pending halted]a do
|
||||||
with {:ok, reactor} <- maybe_plan(reactor) do
|
Executor.run(reactor, inputs, context, options)
|
||||||
Executor.run(reactor, inputs, context, options)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
defp maybe_plan(reactor) when reactor.steps == [], do: {:ok, reactor}
|
|
||||||
defp maybe_plan(reactor), do: Planner.plan(reactor)
|
|
||||||
end
|
end
|
||||||
|
|
|
@ -6,7 +6,10 @@ defmodule Reactor.Application do
|
||||||
@impl true
|
@impl true
|
||||||
@spec start(any, any) :: {:error, any} | {:ok, pid}
|
@spec start(any, any) :: {:error, any} | {:ok, pid}
|
||||||
def start(_type, _args) do
|
def start(_type, _args) do
|
||||||
[{PartitionSupervisor, child_spec: Task.Supervisor, name: Reactor.TaskSupervisor}]
|
[
|
||||||
|
{PartitionSupervisor, child_spec: Task.Supervisor, name: Reactor.TaskSupervisor},
|
||||||
|
Reactor.Executor.ConcurrencyTracker
|
||||||
|
]
|
||||||
|> Supervisor.start_link(strategy: :one_for_one, name: __MODULE__.Supervisor)
|
|> Supervisor.start_link(strategy: :one_for_one, name: __MODULE__.Supervisor)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -2,7 +2,7 @@ defmodule Reactor.Dsl.Compose do
|
||||||
@moduledoc """
|
@moduledoc """
|
||||||
The `compose` DSL entity struct.
|
The `compose` DSL entity struct.
|
||||||
|
|
||||||
See `Reactor.Dsl`.
|
See the `Reactor` DSL docs.
|
||||||
"""
|
"""
|
||||||
defstruct arguments: [], name: nil, reactor: nil
|
defstruct arguments: [], name: nil, reactor: nil
|
||||||
|
|
||||||
|
|
|
@ -10,7 +10,8 @@ defmodule Reactor.Executor do
|
||||||
recurse or continue if none are found.
|
recurse or continue if none are found.
|
||||||
2. Find any async steps in the plan which are ready to run (they have no
|
2. Find any async steps in the plan which are ready to run (they have no
|
||||||
in-edges in the graph) and start as many as possible (given the constraints
|
in-edges in the graph) and start as many as possible (given the constraints
|
||||||
of `max_concurrency`). Either start over, or continue if none are found.
|
of `max_concurrency` and the state of the concurrency pool). Either start
|
||||||
|
over, or continue if none are found.
|
||||||
3. Find a single synchronous step which is ready to run and execute it. If
|
3. Find a single synchronous step which is ready to run and execute it. If
|
||||||
there was one then recurse, otherwise continue.
|
there was one then recurse, otherwise continue.
|
||||||
4. Check if there are no more steps left in the plan (there are zero
|
4. Check if there are no more steps left in the plan (there are zero
|
||||||
|
@ -20,20 +21,21 @@ defmodule Reactor.Executor do
|
||||||
following happens:
|
following happens:
|
||||||
|
|
||||||
1. When the step is successful:
|
1. When the step is successful:
|
||||||
a. If the step is undoable (ie `Step.can?(module, :undo)?` returns `true`)
|
a. If the step is undoable (ie `Step.can?(module, :undo)?` returns `true`)
|
||||||
then the step and the result are stored in the Reactor's undo stack.
|
then the step and the result are stored in the Reactor's undo stack.
|
||||||
b. If the result is depended upon by another step (the graph has out-edges
|
b. If the result is depended upon by another step (the graph has out-edges
|
||||||
for the step) _or_ the step is asking the reactor to halt then the
|
for the step) _or_ the step is asking the reactor to halt then the
|
||||||
result is stored in the Reactor's intermediate results.
|
result is stored in the Reactor's intermediate results.
|
||||||
c. The step is removed from the graph (along with it's out-edges, freeing
|
c. The step is removed from the graph (along with it's out-edges, freeing
|
||||||
up it's dependents to run).
|
up it's dependents to run).
|
||||||
2. When the step is unsuccessful (returns an error tuple or raises):
|
2. When the step is unsuccessful (returns an error tuple or raises):
|
||||||
a. If the step can be compensated then compensation is attempted up to
|
a. If the step can be compensated then compensation is attempted up to five
|
||||||
five times before giving up.
|
times before giving up.
|
||||||
b. The reactor iterates it's undo stack calling undo on each step.
|
b. The reactor iterates it's undo stack calling undo on each step.
|
||||||
3. When a step or compensation asks for a retry then the step is placed back
|
3. When a step or compensation asks for a retry then the step is placed back
|
||||||
in the graph to be run again next iteration.
|
in the graph to be run again next iteration.
|
||||||
"""
|
"""
|
||||||
|
alias Reactor.Executor.ConcurrencyTracker
|
||||||
alias Reactor.{Executor, Planner, Step}
|
alias Reactor.{Executor, Planner, Step}
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
|
@ -63,6 +65,7 @@ defmodule Reactor.Executor do
|
||||||
|
|
||||||
defp execute(reactor, state) when state.max_iterations == 0 do
|
defp execute(reactor, state) when state.max_iterations == 0 do
|
||||||
{reactor, _status} = Executor.Async.collect_remaining_tasks_for_shutdown(reactor, state)
|
{reactor, _status} = Executor.Async.collect_remaining_tasks_for_shutdown(reactor, state)
|
||||||
|
maybe_release_pool(state)
|
||||||
{:halted, %{reactor | state: :halted}}
|
{:halted, %{reactor | state: :halted}}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -83,12 +86,15 @@ defmodule Reactor.Executor do
|
||||||
handle_undo(reactor, state)
|
handle_undo(reactor, state)
|
||||||
|
|
||||||
{:halt, reactor, _state} ->
|
{:halt, reactor, _state} ->
|
||||||
|
maybe_release_pool(state)
|
||||||
{:halted, %{reactor | state: :halted}}
|
{:halted, %{reactor | state: :halted}}
|
||||||
|
|
||||||
{:ok, result} ->
|
{:ok, result} ->
|
||||||
|
maybe_release_pool(state)
|
||||||
{:ok, result}
|
{:ok, result}
|
||||||
|
|
||||||
{:error, reason} ->
|
{:error, reason} ->
|
||||||
|
maybe_release_pool(state)
|
||||||
{:error, reason}
|
{:error, reason}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -160,7 +166,7 @@ defmodule Reactor.Executor do
|
||||||
defp handle_undo(_reactor, state, []), do: {:error, state.errors}
|
defp handle_undo(_reactor, state, []), do: {:error, state.errors}
|
||||||
|
|
||||||
defp handle_undo(reactor, state, [{step, value} | tail]) do
|
defp handle_undo(reactor, state, [{step, value} | tail]) do
|
||||||
case Executor.StepRunner.undo(reactor, step, value) do
|
case Executor.StepRunner.undo(reactor, step, value, state.concurrency_key) do
|
||||||
:ok -> handle_undo(reactor, state, tail)
|
:ok -> handle_undo(reactor, state, tail)
|
||||||
{:error, reason} -> handle_undo(reactor, %{state | errors: [reason | state.errors]}, tail)
|
{:error, reason} -> handle_undo(reactor, %{state | errors: [reason | state.errors]}, tail)
|
||||||
end
|
end
|
||||||
|
@ -187,4 +193,10 @@ defmodule Reactor.Executor do
|
||||||
|
|
||||||
{:continue, steps}
|
{:continue, steps}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
defp maybe_release_pool(state) when state.pool_owner == true do
|
||||||
|
ConcurrencyTracker.release_pool(state.concurrency_key)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp maybe_release_pool(_), do: :ok
|
||||||
end
|
end
|
||||||
|
|
|
@ -3,6 +3,7 @@ defmodule Reactor.Executor.Async do
|
||||||
Handle the asynchronous execution of a batch of steps, along with any
|
Handle the asynchronous execution of a batch of steps, along with any
|
||||||
mutations to the reactor or execution state.
|
mutations to the reactor or execution state.
|
||||||
"""
|
"""
|
||||||
|
alias Reactor.Executor.ConcurrencyTracker
|
||||||
alias Reactor.{Executor, Step}
|
alias Reactor.{Executor, Step}
|
||||||
require Logger
|
require Logger
|
||||||
|
|
||||||
|
@ -34,25 +35,30 @@ defmodule Reactor.Executor.Async do
|
||||||
started =
|
started =
|
||||||
steps
|
steps
|
||||||
|> Enum.take(available_concurrency)
|
|> Enum.take(available_concurrency)
|
||||||
|
|> Enum.take_while(&acquire_concurrency_resource_from_pool(state.concurrency_key, &1))
|
||||||
|> Enum.reduce_while(%{}, fn step, started ->
|
|> Enum.reduce_while(%{}, fn step, started ->
|
||||||
case start_task_for_step(reactor, step, supervisor) do
|
case start_task_for_step(reactor, step, supervisor, state.concurrency_key) do
|
||||||
{:ok, task} -> {:cont, Map.put(started, task, step)}
|
{:ok, task} -> {:cont, Map.put(started, task, step)}
|
||||||
{:error, reason} -> {:halt, {:error, reason}}
|
{:error, reason} -> {:halt, {:error, reason}}
|
||||||
end
|
end
|
||||||
end)
|
end)
|
||||||
|
|
||||||
reactor = add_task_edges(reactor, started)
|
if map_size(started) > 0 do
|
||||||
state = %{state | current_tasks: Map.merge(state.current_tasks, started)}
|
reactor = add_task_edges(reactor, started)
|
||||||
{:recurse, reactor, state}
|
state = %{state | current_tasks: Map.merge(state.current_tasks, started)}
|
||||||
|
{:recurse, reactor, state}
|
||||||
|
else
|
||||||
|
{:continue, reactor, state}
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
defp start_task_for_step(reactor, step, supervisor) do
|
defp start_task_for_step(reactor, step, supervisor, pool_key) do
|
||||||
{:ok,
|
{:ok,
|
||||||
Task.Supervisor.async_nolink(
|
Task.Supervisor.async_nolink(
|
||||||
supervisor,
|
supervisor,
|
||||||
Executor.StepRunner,
|
Executor.StepRunner,
|
||||||
:run,
|
:run,
|
||||||
[reactor, step]
|
[reactor, step, pool_key]
|
||||||
)}
|
)}
|
||||||
rescue
|
rescue
|
||||||
error -> {:error, error}
|
error -> {:error, error}
|
||||||
|
@ -76,6 +82,8 @@ defmodule Reactor.Executor.Async do
|
||||||
do: {:continue, reactor, state}
|
do: {:continue, reactor, state}
|
||||||
|
|
||||||
defp handle_completed_steps(reactor, state, completed_task_results) do
|
defp handle_completed_steps(reactor, state, completed_task_results) do
|
||||||
|
release_concurrency_resources_to_pool(state.concurrency_key, map_size(completed_task_results))
|
||||||
|
|
||||||
new_current_tasks = Map.drop(state.current_tasks, Map.keys(completed_task_results))
|
new_current_tasks = Map.drop(state.current_tasks, Map.keys(completed_task_results))
|
||||||
|
|
||||||
completed_step_results =
|
completed_step_results =
|
||||||
|
@ -262,6 +270,8 @@ defmodule Reactor.Executor.Async do
|
||||||
def collect_remaining_tasks_for_shutdown(reactor, state) do
|
def collect_remaining_tasks_for_shutdown(reactor, state) do
|
||||||
remaining_task_results = get_normalised_task_results(state.current_tasks, state.halt_timeout)
|
remaining_task_results = get_normalised_task_results(state.current_tasks, state.halt_timeout)
|
||||||
|
|
||||||
|
release_concurrency_resources_to_pool(state.concurrency_key, map_size(remaining_task_results))
|
||||||
|
|
||||||
remaining_step_results =
|
remaining_step_results =
|
||||||
remaining_task_results
|
remaining_task_results
|
||||||
|> Map.values()
|
|> Map.values()
|
||||||
|
@ -326,4 +336,18 @@ defmodule Reactor.Executor.Async do
|
||||||
defp append_steps(reactor, steps) do
|
defp append_steps(reactor, steps) do
|
||||||
%{reactor | steps: Enum.concat(steps, reactor.steps)}
|
%{reactor | steps: Enum.concat(steps, reactor.steps)}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
defp release_concurrency_resources_to_pool(_pool_key, 0), do: :ok
|
||||||
|
|
||||||
|
defp release_concurrency_resources_to_pool(pool_key, n) when n > 0 do
|
||||||
|
ConcurrencyTracker.release(pool_key)
|
||||||
|
release_concurrency_resources_to_pool(pool_key, n - 1)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp acquire_concurrency_resource_from_pool(pool_key, _) do
|
||||||
|
case ConcurrencyTracker.acquire(pool_key) do
|
||||||
|
:ok -> true
|
||||||
|
:error -> false
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
112
lib/reactor/executor/concurrency_tracker.ex
Normal file
112
lib/reactor/executor/concurrency_tracker.ex
Normal file
|
@ -0,0 +1,112 @@
|
||||||
|
defmodule Reactor.Executor.ConcurrencyTracker do
|
||||||
|
@moduledoc """
|
||||||
|
Manage shared concurrency pools for multiple Reactors.
|
||||||
|
|
||||||
|
When running a Reactor you can pass the `concurrency_key` option, which will
|
||||||
|
cause the Reactor to use the specified pool to ensure that the combined
|
||||||
|
Reactors never exceed the pool's available concurrency limit.
|
||||||
|
|
||||||
|
This avoids nested Reactors spawning too many workers and thrashing the
|
||||||
|
system.
|
||||||
|
"""
|
||||||
|
|
||||||
|
use GenServer
|
||||||
|
|
||||||
|
@type pool_key :: reference()
|
||||||
|
|
||||||
|
@doc false
|
||||||
|
@spec start_link(any) :: GenServer.on_start()
|
||||||
|
def start_link(_), do: GenServer.start_link(__MODULE__, [], name: __MODULE__)
|
||||||
|
|
||||||
|
@doc false
|
||||||
|
@impl true
|
||||||
|
@spec init(any) :: {:ok, atom | :ets.tid()}
|
||||||
|
def init(_) do
|
||||||
|
table = :ets.new(__MODULE__, ~w[set named_table public]a)
|
||||||
|
{:ok, table}
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc false
|
||||||
|
@impl true
|
||||||
|
def handle_cast({:monitor, pid}, table) do
|
||||||
|
Process.monitor(pid)
|
||||||
|
{:noreply, table}
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc false
|
||||||
|
@impl true
|
||||||
|
def handle_info({:DOWN, _ref, :process, pid, _reason}, table) do
|
||||||
|
:ets.select_delete(table, [{{:_, :_, :_, :"$1"}, [], [{:==, :"$1", pid}]}])
|
||||||
|
{:noreply, table}
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
Allocate a new concurrency pool and set the maximum limit.
|
||||||
|
"""
|
||||||
|
@spec allocate_pool(non_neg_integer) :: pool_key
|
||||||
|
def allocate_pool(concurrency_limit) do
|
||||||
|
key = make_ref()
|
||||||
|
caller = self()
|
||||||
|
:ets.insert(__MODULE__, {key, concurrency_limit, concurrency_limit, caller})
|
||||||
|
GenServer.cast(__MODULE__, {:monitor, caller})
|
||||||
|
key
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
Release the concurrency pool.
|
||||||
|
|
||||||
|
This deletes the pool, however doesn't affect any processes currently using
|
||||||
|
it. No more resources can be acquired by users of the pool key.
|
||||||
|
"""
|
||||||
|
@spec release_pool(pool_key) :: :ok
|
||||||
|
def release_pool(pool_key) do
|
||||||
|
:ets.delete(__MODULE__, pool_key)
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
Release a concurrency allocation back to the pool.
|
||||||
|
"""
|
||||||
|
@spec release(pool_key) :: :ok
|
||||||
|
def release(key) do
|
||||||
|
:ets.select_replace(__MODULE__, [
|
||||||
|
{{:"$1", :"$2", :"$3", :"$4"},
|
||||||
|
[{:andalso, {:"=<", {:+, :"$2", 1}, :"$3"}, {:==, :"$1", key}}],
|
||||||
|
[{{:"$1", {:+, :"$2", 1}, :"$3", :"$4"}}]}
|
||||||
|
])
|
||||||
|
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
Attempt to acquire a concurrency allocation from the pool.
|
||||||
|
|
||||||
|
Returns `:ok` if the allocation was successful, otherwise `:error`.
|
||||||
|
"""
|
||||||
|
@spec acquire(pool_key) :: :ok | :error
|
||||||
|
def acquire(key) do
|
||||||
|
__MODULE__
|
||||||
|
|> :ets.select_replace([
|
||||||
|
{{:"$1", :"$2", :"$3", :"$4"}, [{:andalso, {:>=, {:-, :"$2", 1}, 0}, {:==, :"$1", key}}],
|
||||||
|
[{{:"$1", {:-, :"$2", 1}, :"$3", :"$4"}}]}
|
||||||
|
])
|
||||||
|
|> case do
|
||||||
|
0 -> :error
|
||||||
|
1 -> :ok
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
Report the available and maximum concurrency for a pool.
|
||||||
|
"""
|
||||||
|
@spec status(pool_key) :: {:ok, available, limit} | {:error, any}
|
||||||
|
when available: non_neg_integer(), limit: pos_integer()
|
||||||
|
def status(key) do
|
||||||
|
__MODULE__
|
||||||
|
|> :ets.lookup(key)
|
||||||
|
|> case do
|
||||||
|
[{_, available, limit, _}] -> {:ok, available, limit}
|
||||||
|
[] -> {:error, "Unknown concurrency pool"}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
|
@ -5,36 +5,75 @@ defmodule Reactor.Executor.State do
|
||||||
This is run-time only information.
|
This is run-time only information.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
defstruct async?: true,
|
@defaults %{
|
||||||
|
async?: true,
|
||||||
|
halt_timeout: 5000,
|
||||||
|
max_iterations: :infinity,
|
||||||
|
timeout: :infinity
|
||||||
|
}
|
||||||
|
|
||||||
|
defstruct async?: @defaults.async?,
|
||||||
|
concurrency_key: nil,
|
||||||
current_tasks: %{},
|
current_tasks: %{},
|
||||||
errors: [],
|
errors: [],
|
||||||
halt_timeout: 5000,
|
halt_timeout: @defaults.halt_timeout,
|
||||||
max_concurrency: nil,
|
max_concurrency: nil,
|
||||||
max_iterations: :infinity,
|
max_iterations: @defaults.max_iterations,
|
||||||
|
pool_owner: false,
|
||||||
retries: %{},
|
retries: %{},
|
||||||
timeout: :infinity,
|
started_at: nil,
|
||||||
started_at: nil
|
timeout: @defaults.timeout
|
||||||
|
|
||||||
alias Reactor.Step
|
alias Reactor.{Executor.ConcurrencyTracker, Step}
|
||||||
|
|
||||||
@type t :: %__MODULE__{
|
@type t :: %__MODULE__{
|
||||||
async?: boolean,
|
async?: boolean,
|
||||||
|
concurrency_key: ConcurrencyTracker.pool_key(),
|
||||||
current_tasks: %{Task.t() => Step.t()},
|
current_tasks: %{Task.t() => Step.t()},
|
||||||
errors: [any],
|
errors: [any],
|
||||||
halt_timeout: pos_integer() | :infinity,
|
halt_timeout: pos_integer() | :infinity,
|
||||||
max_concurrency: pos_integer(),
|
max_concurrency: pos_integer(),
|
||||||
max_iterations: pos_integer() | :infinity,
|
max_iterations: pos_integer() | :infinity,
|
||||||
|
pool_owner: boolean,
|
||||||
retries: %{reference() => pos_integer()},
|
retries: %{reference() => pos_integer()},
|
||||||
timeout: pos_integer() | :infinity,
|
started_at: DateTime.t(),
|
||||||
started_at: DateTime.t()
|
timeout: pos_integer() | :infinity
|
||||||
}
|
}
|
||||||
|
|
||||||
@doc false
|
@doc false
|
||||||
@spec init(map) :: t
|
@spec init(map) :: t
|
||||||
def init(attrs \\ %{}) do
|
def init(attrs \\ %{}) do
|
||||||
|
@defaults
|
||||||
|
|> Map.merge(attrs)
|
||||||
|
|> do_init()
|
||||||
|
end
|
||||||
|
|
||||||
|
defp do_init(attrs) do
|
||||||
attrs
|
attrs
|
||||||
|> Map.put_new_lazy(:max_concurrency, &System.schedulers_online/0)
|
|> maybe_set_max_concurrency()
|
||||||
|
|> maybe_allocate_concurrency_pool()
|
||||||
|> Map.put(:started_at, DateTime.utc_now())
|
|> Map.put(:started_at, DateTime.utc_now())
|
||||||
|> then(&struct(__MODULE__, &1))
|
|> then(&struct(__MODULE__, &1))
|
||||||
end
|
end
|
||||||
|
|
||||||
|
defp maybe_set_max_concurrency(attrs)
|
||||||
|
when is_integer(attrs.max_concurrency) and attrs.max_concurrency > 0,
|
||||||
|
do: attrs
|
||||||
|
|
||||||
|
defp maybe_set_max_concurrency(attrs) when attrs.async? == false,
|
||||||
|
do: Map.put(attrs, :max_concurrency, 0)
|
||||||
|
|
||||||
|
defp maybe_set_max_concurrency(attrs),
|
||||||
|
do: Map.put(attrs, :max_concurrency, System.schedulers_online())
|
||||||
|
|
||||||
|
defp maybe_allocate_concurrency_pool(attrs) when is_reference(attrs.concurrency_key) do
|
||||||
|
attrs
|
||||||
|
|> Map.put(:pool_owner, false)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp maybe_allocate_concurrency_pool(attrs) do
|
||||||
|
attrs
|
||||||
|
|> Map.put(:concurrency_key, ConcurrencyTracker.allocate_pool(attrs.max_concurrency))
|
||||||
|
|> Map.put(:pool_owner, true)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -2,7 +2,7 @@ defmodule Reactor.Executor.StepRunner do
|
||||||
@moduledoc """
|
@moduledoc """
|
||||||
Run an individual step, including compensation if possible.
|
Run an individual step, including compensation if possible.
|
||||||
"""
|
"""
|
||||||
alias Reactor.Step
|
alias Reactor.{Executor.ConcurrencyTracker, Step}
|
||||||
import Reactor.Utils
|
import Reactor.Utils
|
||||||
import Reactor.Argument, only: :macros
|
import Reactor.Argument, only: :macros
|
||||||
require Logger
|
require Logger
|
||||||
|
@ -12,11 +12,12 @@ defmodule Reactor.Executor.StepRunner do
|
||||||
@doc """
|
@doc """
|
||||||
Collect the arguments and and run a step, with compensation if required.
|
Collect the arguments and and run a step, with compensation if required.
|
||||||
"""
|
"""
|
||||||
@spec run(Reactor.t(), Step.t()) :: {:ok, any, [Step.t()]} | :retry | {:error | :halt, any}
|
@spec run(Reactor.t(), Step.t(), ConcurrencyTracker.pool_key()) ::
|
||||||
def run(reactor, step) do
|
{:ok, any, [Step.t()]} | :retry | {:error | :halt, any}
|
||||||
|
def run(reactor, step, concurrency_key) do
|
||||||
with {:ok, arguments} <- get_step_arguments(reactor, step),
|
with {:ok, arguments} <- get_step_arguments(reactor, step),
|
||||||
{module, options} <- module_and_opts(step),
|
{module, options} <- module_and_opts(step),
|
||||||
{:ok, context} <- build_context(reactor, step),
|
{:ok, context} <- build_context(reactor, step, concurrency_key),
|
||||||
{:ok, arguments} <- maybe_replace_arguments(arguments, context) do
|
{:ok, arguments} <- maybe_replace_arguments(arguments, context) do
|
||||||
do_run(module, options, arguments, context)
|
do_run(module, options, arguments, context)
|
||||||
end
|
end
|
||||||
|
@ -25,11 +26,11 @@ defmodule Reactor.Executor.StepRunner do
|
||||||
@doc """
|
@doc """
|
||||||
Undo a step if possible.
|
Undo a step if possible.
|
||||||
"""
|
"""
|
||||||
@spec undo(Reactor.t(), Step.t(), any) :: :ok | {:error, any}
|
@spec undo(Reactor.t(), Step.t(), any, ConcurrencyTracker.pool_key()) :: :ok | {:error, any}
|
||||||
def undo(reactor, step, value) do
|
def undo(reactor, step, value, concurrency_key) do
|
||||||
with {:ok, arguments} <- get_step_arguments(reactor, step),
|
with {:ok, arguments} <- get_step_arguments(reactor, step),
|
||||||
{module, options} <- module_and_opts(step),
|
{module, options} <- module_and_opts(step),
|
||||||
{:ok, context} <- build_context(reactor, step),
|
{:ok, context} <- build_context(reactor, step, concurrency_key),
|
||||||
{:ok, arguments} <- maybe_replace_arguments(arguments, context) do
|
{:ok, arguments} <- maybe_replace_arguments(arguments, context) do
|
||||||
do_undo(value, module, options, arguments, context)
|
do_undo(value, module, options, arguments, context)
|
||||||
end
|
end
|
||||||
|
@ -115,11 +116,12 @@ defmodule Reactor.Executor.StepRunner do
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
defp build_context(reactor, step) do
|
defp build_context(reactor, step, concurrency_key) do
|
||||||
context =
|
context =
|
||||||
step.context
|
step.context
|
||||||
|> deep_merge(reactor.context)
|
|> deep_merge(reactor.context)
|
||||||
|> Map.put(:current_step, step)
|
|> Map.put(:current_step, step)
|
||||||
|
|> Map.put(:concurrency_key, concurrency_key)
|
||||||
|
|
||||||
{:ok, context}
|
{:ok, context}
|
||||||
end
|
end
|
||||||
|
|
|
@ -14,7 +14,7 @@ defmodule Reactor.Executor.Sync do
|
||||||
def run(reactor, state, nil), do: {:continue, reactor, state}
|
def run(reactor, state, nil), do: {:continue, reactor, state}
|
||||||
|
|
||||||
def run(reactor, state, step) do
|
def run(reactor, state, step) do
|
||||||
case Executor.StepRunner.run(reactor, step) do
|
case Executor.StepRunner.run(reactor, step, state.concurrency_key) do
|
||||||
:retry ->
|
:retry ->
|
||||||
state = increment_retries(state, step)
|
state = increment_retries(state, step)
|
||||||
|
|
||||||
|
|
|
@ -121,7 +121,11 @@ defmodule Reactor.Step.Around do
|
||||||
with {:ok, reactor} <- build_inputs(Builder.new(), arguments),
|
with {:ok, reactor} <- build_inputs(Builder.new(), arguments),
|
||||||
{:ok, reactor} <- build_steps(reactor, steps),
|
{:ok, reactor} <- build_steps(reactor, steps),
|
||||||
{:ok, reactor} <- build_return_step(reactor, steps),
|
{:ok, reactor} <- build_return_step(reactor, steps),
|
||||||
{:ok, result} <- Reactor.run(reactor, context, async?: allow_async?) do
|
{:ok, result} <-
|
||||||
|
Reactor.run(reactor, context,
|
||||||
|
async?: allow_async?,
|
||||||
|
concurrency_key: context.concurrency_key
|
||||||
|
) do
|
||||||
{:ok, result}
|
{:ok, result}
|
||||||
else
|
else
|
||||||
{:error, reason} -> {:error, reason}
|
{:error, reason} -> {:error, reason}
|
||||||
|
|
|
@ -34,7 +34,7 @@ defmodule Reactor.Step.Compose do
|
||||||
end
|
end
|
||||||
|
|
||||||
defp handle_recursive_reactor(reactor, arguments, context),
|
defp handle_recursive_reactor(reactor, arguments, context),
|
||||||
do: Reactor.run(reactor, arguments, context, [])
|
do: Reactor.run(reactor, arguments, context, concurrency_key: context.concurrency_key)
|
||||||
|
|
||||||
defp handle_non_recursive_reactor(reactor, arguments, context) when is_atom(reactor) do
|
defp handle_non_recursive_reactor(reactor, arguments, context) when is_atom(reactor) do
|
||||||
with {:ok, reactor} <- Info.to_struct(reactor) do
|
with {:ok, reactor} <- Info.to_struct(reactor) do
|
||||||
|
|
64
test/reactor/executor/concurrency_tracker_test.exs
Normal file
64
test/reactor/executor/concurrency_tracker_test.exs
Normal file
|
@ -0,0 +1,64 @@
|
||||||
|
defmodule Reactor.Executor.ConcurrencyTrackerTest do
|
||||||
|
@moduledoc false
|
||||||
|
use ExUnit.Case, async: true
|
||||||
|
import Reactor.Executor.ConcurrencyTracker
|
||||||
|
|
||||||
|
describe "allocate_pool/1" do
|
||||||
|
test "it returns a new pool key" do
|
||||||
|
assert pool = allocate_pool(16)
|
||||||
|
assert is_reference(pool)
|
||||||
|
end
|
||||||
|
|
||||||
|
test "it monitors the requesting process and destroys the pool when it exits" do
|
||||||
|
pool =
|
||||||
|
fn ->
|
||||||
|
pool = allocate_pool(16)
|
||||||
|
assert {:ok, _, _} = status(pool)
|
||||||
|
pool
|
||||||
|
end
|
||||||
|
|> Task.async()
|
||||||
|
|> Task.await()
|
||||||
|
|
||||||
|
# We have to wait for the concurrency tracker to process the request.
|
||||||
|
Process.sleep(10)
|
||||||
|
|
||||||
|
assert {:error, _} = status(pool)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "acquire/1" do
|
||||||
|
test "when there is available concurrency in the pool, it returns ok" do
|
||||||
|
pool = allocate_pool(16)
|
||||||
|
assert :ok = acquire(pool)
|
||||||
|
assert {:ok, 15, 16} = status(pool)
|
||||||
|
end
|
||||||
|
|
||||||
|
test "when there is no available concurrency in the pool, it returns error" do
|
||||||
|
pool = allocate_pool(0)
|
||||||
|
assert :error = acquire(pool)
|
||||||
|
end
|
||||||
|
|
||||||
|
test "when there is 1 slot left, it can be acquired" do
|
||||||
|
pool = allocate_pool(1)
|
||||||
|
assert :ok = acquire(pool)
|
||||||
|
assert {:ok, 0, 1} = status(pool)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "release/1" do
|
||||||
|
test "it increments the available concurrency in the pool when possible" do
|
||||||
|
pool = allocate_pool(16)
|
||||||
|
:ok = acquire(pool)
|
||||||
|
assert {:ok, 15, 16} = status(pool)
|
||||||
|
assert :ok = release(pool)
|
||||||
|
assert {:ok, 16, 16} = status(pool)
|
||||||
|
end
|
||||||
|
|
||||||
|
test "it doesn't allow the pool to grow" do
|
||||||
|
pool = allocate_pool(16)
|
||||||
|
assert {:ok, 16, 16} = status(pool)
|
||||||
|
assert :ok = release(pool)
|
||||||
|
assert {:ok, 16, 16} = status(pool)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
|
@ -7,6 +7,7 @@ defmodule Reactor.Executor.StepRunnerTest do
|
||||||
|
|
||||||
setup do
|
setup do
|
||||||
reactor = Builder.new()
|
reactor = Builder.new()
|
||||||
|
|
||||||
{:ok, reactor: reactor}
|
{:ok, reactor: reactor}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -18,7 +19,7 @@ defmodule Reactor.Executor.StepRunnerTest do
|
||||||
{:ok, reactor} = Builder.add_step(reactor, :marty, Example.Step.Doable, [argument])
|
{:ok, reactor} = Builder.add_step(reactor, :marty, Example.Step.Doable, [argument])
|
||||||
step = reactor.steps |> hd()
|
step = reactor.steps |> hd()
|
||||||
|
|
||||||
assert {:error, reason} = run(reactor, step)
|
assert {:error, reason} = run(reactor, step, nil)
|
||||||
assert reason =~ "argument `:current_year` is missing"
|
assert reason =~ "argument `:current_year` is missing"
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -37,7 +38,7 @@ defmodule Reactor.Executor.StepRunnerTest do
|
||||||
{:ok, :marty}
|
{:ok, :marty}
|
||||||
end)
|
end)
|
||||||
|
|
||||||
assert {:ok, :marty, []} = run(reactor, marty)
|
assert {:ok, :marty, []} = run(reactor, marty, nil)
|
||||||
end
|
end
|
||||||
|
|
||||||
test "when the step is successful, it returns an ok tuple", %{reactor: reactor} do
|
test "when the step is successful, it returns an ok tuple", %{reactor: reactor} do
|
||||||
|
@ -49,7 +50,7 @@ defmodule Reactor.Executor.StepRunnerTest do
|
||||||
{:ok, :marty, []}
|
{:ok, :marty, []}
|
||||||
end)
|
end)
|
||||||
|
|
||||||
assert {:ok, :marty, []} = run(reactor, step)
|
assert {:ok, :marty, []} = run(reactor, step, nil)
|
||||||
end
|
end
|
||||||
|
|
||||||
test "when the step asks to be retried, it returns a retry atom", %{reactor: reactor} do
|
test "when the step asks to be retried, it returns a retry atom", %{reactor: reactor} do
|
||||||
|
@ -61,7 +62,7 @@ defmodule Reactor.Executor.StepRunnerTest do
|
||||||
:retry
|
:retry
|
||||||
end)
|
end)
|
||||||
|
|
||||||
assert :retry = run(reactor, step)
|
assert :retry = run(reactor, step, nil)
|
||||||
end
|
end
|
||||||
|
|
||||||
test "when a step returns an error and cannot be compensated, it returns an error tuple", %{
|
test "when a step returns an error and cannot be compensated, it returns an error tuple", %{
|
||||||
|
@ -75,7 +76,7 @@ defmodule Reactor.Executor.StepRunnerTest do
|
||||||
{:error, :doc}
|
{:error, :doc}
|
||||||
end)
|
end)
|
||||||
|
|
||||||
assert {:error, :doc} = run(reactor, step)
|
assert {:error, :doc} = run(reactor, step, nil)
|
||||||
end
|
end
|
||||||
|
|
||||||
test "when a step raises an error it returns an error tuple", %{reactor: reactor} do
|
test "when a step raises an error it returns an error tuple", %{reactor: reactor} do
|
||||||
|
@ -87,7 +88,7 @@ defmodule Reactor.Executor.StepRunnerTest do
|
||||||
raise RuntimeError, "Not enough plutonium!"
|
raise RuntimeError, "Not enough plutonium!"
|
||||||
end)
|
end)
|
||||||
|
|
||||||
assert {:error, error} = run(reactor, step)
|
assert {:error, error} = run(reactor, step, nil)
|
||||||
assert is_struct(error, RuntimeError)
|
assert is_struct(error, RuntimeError)
|
||||||
assert Exception.message(error) == "Not enough plutonium!"
|
assert Exception.message(error) == "Not enough plutonium!"
|
||||||
end
|
end
|
||||||
|
@ -101,7 +102,7 @@ defmodule Reactor.Executor.StepRunnerTest do
|
||||||
|> stub(:run, fn _, _, _ -> {:error, :doc} end)
|
|> stub(:run, fn _, _, _ -> {:error, :doc} end)
|
||||||
|> stub(:compensate, fn :doc, _, _, _ -> {:continue, :marty} end)
|
|> stub(:compensate, fn :doc, _, _, _ -> {:continue, :marty} end)
|
||||||
|
|
||||||
assert {:ok, :marty} = run(reactor, step)
|
assert {:ok, :marty} = run(reactor, step, nil)
|
||||||
end
|
end
|
||||||
|
|
||||||
test "when a step returns an error and can be compensated and the compensation succeed it returns an error tuple",
|
test "when a step returns an error and can be compensated and the compensation succeed it returns an error tuple",
|
||||||
|
@ -113,7 +114,7 @@ defmodule Reactor.Executor.StepRunnerTest do
|
||||||
|> stub(:run, fn _, _, _ -> {:error, :doc} end)
|
|> stub(:run, fn _, _, _ -> {:error, :doc} end)
|
||||||
|> stub(:compensate, fn :doc, _, _, _ -> :ok end)
|
|> stub(:compensate, fn :doc, _, _, _ -> :ok end)
|
||||||
|
|
||||||
assert {:error, :doc} = run(reactor, step)
|
assert {:error, :doc} = run(reactor, step, nil)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -129,7 +130,7 @@ defmodule Reactor.Executor.StepRunnerTest do
|
||||||
:ok
|
:ok
|
||||||
end)
|
end)
|
||||||
|
|
||||||
undo(reactor, step, :marty)
|
undo(reactor, step, :marty, nil)
|
||||||
end
|
end
|
||||||
|
|
||||||
test "when the step can be undone it returns ok", %{reactor: reactor} do
|
test "when the step can be undone it returns ok", %{reactor: reactor} do
|
||||||
|
@ -139,7 +140,7 @@ defmodule Reactor.Executor.StepRunnerTest do
|
||||||
Example.Step.Undoable
|
Example.Step.Undoable
|
||||||
|> stub(:undo, fn _, _, _, _ -> :ok end)
|
|> stub(:undo, fn _, _, _, _ -> :ok end)
|
||||||
|
|
||||||
assert :ok = undo(reactor, step, :marty)
|
assert :ok = undo(reactor, step, :marty, nil)
|
||||||
end
|
end
|
||||||
|
|
||||||
test "when the step undo needs to be retried it eventually returns ok", %{reactor: reactor} do
|
test "when the step undo needs to be retried it eventually returns ok", %{reactor: reactor} do
|
||||||
|
@ -152,7 +153,7 @@ defmodule Reactor.Executor.StepRunnerTest do
|
||||||
|> expect(:undo, fn _, _, _, _ -> :retry end)
|
|> expect(:undo, fn _, _, _, _ -> :retry end)
|
||||||
|> expect(:undo, fn _, _, _, _ -> :ok end)
|
|> expect(:undo, fn _, _, _, _ -> :ok end)
|
||||||
|
|
||||||
assert :ok = undo(reactor, step, :marty)
|
assert :ok = undo(reactor, step, :marty, nil)
|
||||||
end
|
end
|
||||||
|
|
||||||
test "when the step undo is stuck in a retry loop, it eventually returns an error", %{
|
test "when the step undo is stuck in a retry loop, it eventually returns an error", %{
|
||||||
|
@ -164,7 +165,7 @@ defmodule Reactor.Executor.StepRunnerTest do
|
||||||
Example.Step.Undoable
|
Example.Step.Undoable
|
||||||
|> stub(:undo, fn _, _, _, _ -> :retry end)
|
|> stub(:undo, fn _, _, _, _ -> :retry end)
|
||||||
|
|
||||||
assert {:error, message} = undo(reactor, step, :marty)
|
assert {:error, message} = undo(reactor, step, :marty, nil)
|
||||||
assert message =~ "retried 5 times"
|
assert message =~ "retried 5 times"
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
defmodule Reactor.ExecutorTest do
|
defmodule Reactor.ExecutorTest do
|
||||||
@moduledoc false
|
@moduledoc false
|
||||||
|
alias Reactor.Executor.ConcurrencyTracker
|
||||||
use ExUnit.Case, async: true
|
use ExUnit.Case, async: true
|
||||||
|
|
||||||
describe "synchronous execution" do
|
describe "synchronous execution" do
|
||||||
|
@ -379,7 +380,49 @@ defmodule Reactor.ExecutorTest do
|
||||||
assert Graph.num_vertices(reactor.plan) == 3
|
assert Graph.num_vertices(reactor.plan) == 3
|
||||||
end)
|
end)
|
||||||
|
|
||||||
assert elapsed <= 300
|
assert elapsed <= 500
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "shared concurrency pools" do
|
||||||
|
test "when multiple reactors share a concurrency pool, it limits the simultaneous number of processes" do
|
||||||
|
defmodule ShortSleepReactor do
|
||||||
|
@moduledoc false
|
||||||
|
use Reactor
|
||||||
|
|
||||||
|
step :a do
|
||||||
|
run fn _ ->
|
||||||
|
{:ok, Process.sleep(100)}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
concurrency_key = ConcurrencyTracker.allocate_pool(1)
|
||||||
|
|
||||||
|
assert {:ok, _} = Reactor.run(ShortSleepReactor, %{}, %{}, concurrency_key: concurrency_key)
|
||||||
|
|
||||||
|
elapsed =
|
||||||
|
:timer.tc(fn ->
|
||||||
|
t0 =
|
||||||
|
Task.async(fn ->
|
||||||
|
{:ok, _} =
|
||||||
|
Reactor.run(ShortSleepReactor, %{}, %{}, concurrency_key: concurrency_key)
|
||||||
|
end)
|
||||||
|
|
||||||
|
t1 =
|
||||||
|
Task.async(fn ->
|
||||||
|
{:ok, _} =
|
||||||
|
Reactor.run(ShortSleepReactor, %{}, %{}, concurrency_key: concurrency_key)
|
||||||
|
end)
|
||||||
|
|
||||||
|
Task.await(t0)
|
||||||
|
Task.await(t1)
|
||||||
|
end)
|
||||||
|
|> elem(0)
|
||||||
|
|> System.convert_time_unit(:microsecond, :millisecond)
|
||||||
|
|
||||||
|
assert elapsed >= 200
|
||||||
|
assert elapsed < 300
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
defmodule Reactor.Step.ComposeTest do
|
defmodule Reactor.Step.ComposeTest do
|
||||||
@moduledoc false
|
@moduledoc false
|
||||||
use ExUnit.Case, async: true
|
use ExUnit.Case, async: true
|
||||||
alias Reactor.{Argument, Step}
|
alias Reactor.{Argument, Executor.ConcurrencyTracker, Step}
|
||||||
import Reactor.Builder
|
import Reactor.Builder
|
||||||
require Reactor.Argument
|
require Reactor.Argument
|
||||||
|
|
||||||
|
@ -31,6 +31,7 @@ defmodule Reactor.Step.ComposeTest do
|
||||||
%{whom: "Marty McFly"},
|
%{whom: "Marty McFly"},
|
||||||
%{
|
%{
|
||||||
current_step: %{name: :greet_marty},
|
current_step: %{name: :greet_marty},
|
||||||
|
concurrency_key: ConcurrencyTracker.allocate_pool(16),
|
||||||
private: %{composed_reactors: MapSet.new([inner_reactor.id])}
|
private: %{composed_reactors: MapSet.new([inner_reactor.id])}
|
||||||
},
|
},
|
||||||
reactor: inner_reactor
|
reactor: inner_reactor
|
||||||
|
|
Loading…
Reference in a new issue