improvement: Support timeout and async? Reactor options. (#20)

This commit is contained in:
James Harton 2023-06-16 16:16:40 +12:00 committed by GitHub
parent 614e6725e2
commit 611bf314f0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 179 additions and 14 deletions

View file

@ -62,13 +62,55 @@ defmodule Reactor do
use Spark.Dsl, default_extensions: [extensions: Dsl]
@type context :: Enumerable.t({any, any})
@type context :: %{optional(atom) => any}
@type context_arg :: Enumerable.t({atom, any})
@typedoc """
Specify the maximum number of asynchronous steps which can be run in parallel.
Defaults to the result of `System.schedulers_online/0`.
"""
@type max_concurrency_option :: {:max_concurrency, pos_integer()}
@typedoc """
Specify the amount of execution time after which to halt processing.
Note that this is not a hard limit. The Reactor will stop when the first step
completes _after_ the timeout has expired.
Defaults to `:infinity`.
"""
@type timeout_option :: {:timeout, pos_integer() | :infinity}
@typedoc """
The maximum number of iterations which after which the Reactor will halt.
Defaults to `:infinity`.
"""
@type max_iterations_option :: {:max_iterations, pos_integer() | :infinity}
@typedoc """
How long to wait for asynchronous steps to complete when halting.
Defaults to 5000ms.
"""
@type halt_timeout_option :: {:halt_timeout, pos_integer() | :infinity}
@typedoc """
When set to `false` forces the Reactor to run every step synchronously,
regardless of the step configuration.
Defaults to `true`.
"""
@type async_option :: {:async?, boolean}
@type options ::
Enumerable.t(
{:max_concurrency, pos_integer()}
| {:timeout, pos_integer() | :infinity}
| {:max_iterations, pos_integer() | :infinity}
| {:halt_timeout, pos_integer() | :infinity}
max_concurrency_option
| timeout_option
| max_iterations_option
| halt_timeout_option
| async_option
)
@type state :: :pending | :executing | :halted | :failed | :successful
@ -93,7 +135,7 @@ defmodule Reactor do
@doc """
Run a reactor.
"""
@spec run(t | module, inputs, context, options) :: {:ok, any} | {:error, any} | {:halted, t}
@spec run(t | module, inputs, context_arg, options) :: {:ok, any} | {:error, any} | {:halted, t}
def run(reactor, inputs \\ %{}, context \\ %{}, options \\ [])
def run(reactor, inputs, context, options) when is_atom(reactor) do

View file

@ -67,7 +67,8 @@ defmodule Reactor.Executor do
end
defp execute(reactor, state) do
with {:continue, reactor, state} <- handle_unplanned_steps(reactor, state),
with {:continue, reactor, state} <- maybe_timeout(reactor, state),
{:continue, reactor, state} <- handle_unplanned_steps(reactor, state),
{:continue, reactor, state} <- handle_completed_async_steps(reactor, state),
{:continue, reactor, state} <- start_ready_async_steps(reactor, state),
{:continue, reactor, state} <- run_ready_sync_step(reactor, state),
@ -91,6 +92,18 @@ defmodule Reactor.Executor do
end
end
defp maybe_timeout(reactor, state) when state.timeout == :infinity,
do: {:continue, reactor, state}
defp maybe_timeout(reactor, state) do
if DateTime.diff(DateTime.utc_now(), state.started_at, :millisecond) >= state.timeout do
{reactor, _status} = Executor.Async.collect_remaining_tasks_for_shutdown(reactor, state)
{:halt, reactor, state}
else
{:continue, reactor, state}
end
end
defp handle_unplanned_steps(reactor, state) when reactor.steps == [],
do: {:continue, reactor, state}
@ -101,9 +114,14 @@ defmodule Reactor.Executor do
end
end
defp handle_completed_async_steps(reactor, state) do
Executor.Async.handle_completed_steps(reactor, state)
end
defp handle_completed_async_steps(reactor, state) when state.async? == false,
do: {:continue, reactor, state}
defp handle_completed_async_steps(reactor, state),
do: Executor.Async.handle_completed_steps(reactor, state)
defp start_ready_async_steps(reactor, state) when state.async? == false,
do: {:continue, reactor, state}
defp start_ready_async_steps(reactor, state)
when map_size(state.current_tasks) == state.max_concurrency,
@ -115,6 +133,15 @@ defmodule Reactor.Executor do
Executor.Async.start_steps(reactor, state, steps)
end
defp run_ready_sync_step(reactor, state) when state.async? == false do
step =
reactor
|> find_ready_steps()
|> Enum.at(0)
Executor.Sync.run(reactor, state, step)
end
defp run_ready_sync_step(reactor, state) do
step = find_ready_sync_step(reactor)
@ -161,11 +188,16 @@ defmodule Reactor.Executor do
|> Enum.at(0)
end
defp find_ready_steps(reactor, predicate) when is_function(predicate, 1) do
defp find_ready_steps(reactor) do
reactor.plan
|> Graph.vertices()
|> Stream.filter(&(Graph.in_degree(reactor.plan, &1) == 0))
|> Stream.reject(&is_struct(&1, Task))
end
defp find_ready_steps(reactor, predicate) when is_function(predicate, 1) do
reactor
|> find_ready_steps()
|> Stream.filter(predicate)
end
end

View file

@ -5,24 +5,28 @@ defmodule Reactor.Executor.State do
This is run-time only information.
"""
defstruct current_tasks: %{},
defstruct async?: true,
current_tasks: %{},
errors: [],
halt_timeout: 5000,
max_concurrency: nil,
max_iterations: :infinity,
retries: %{},
timeout: 5000
timeout: :infinity,
started_at: nil
alias Reactor.Step
@type t :: %__MODULE__{
async?: boolean,
current_tasks: %{Task.t() => Step.t()},
errors: [any],
halt_timeout: pos_integer() | :infinity,
max_concurrency: pos_integer(),
max_iterations: pos_integer() | :infinity,
retries: %{reference() => pos_integer()},
timeout: pos_integer() | :infinity
timeout: pos_integer() | :infinity,
started_at: DateTime.t()
}
@doc false
@ -30,6 +34,7 @@ defmodule Reactor.Executor.State do
def init(attrs \\ %{}) do
attrs
|> Map.put_new_lazy(:max_concurrency, &System.schedulers_online/0)
|> Map.put(:started_at, DateTime.utc_now())
|> then(&struct(__MODULE__, &1))
end
end

View file

@ -1,5 +1,6 @@
defmodule Reactor.ExecutorTest do
@moduledoc false
alias Reactor.ExecutorTest.SleepyReactor
use ExUnit.Case, async: true
describe "synchronous execution" do
@ -297,4 +298,89 @@ defmodule Reactor.ExecutorTest do
})
end
end
describe "async? option" do
defmodule SleepyReactor do
@moduledoc false
use Reactor
step :a do
run fn _, _ ->
Process.sleep(100)
{:ok, 1}
end
end
step :b do
run fn _, _ ->
Process.sleep(100)
{:ok, 2}
end
end
step :c do
run fn _, _ ->
Process.sleep(100)
{:ok, 3}
end
end
step :d do
run fn _, _ ->
Process.sleep(100)
{:ok, 4}
end
end
step :e do
run fn _, _ ->
Process.sleep(100)
{:ok, 5}
end
end
end
# Yes I know this is a dumb methodology, but what my theory presupposes is -
# maybe it isn't?
test "it can be run synchronously" do
elapsed =
measure_elapsed(fn ->
assert {:ok, _} = Reactor.run(SleepyReactor, %{}, %{}, async?: false)
end)
assert elapsed >= 500 and elapsed <= 600
end
test "it can be run asynchronously" do
elapsed =
measure_elapsed(fn ->
assert {:ok, _} = Reactor.run(SleepyReactor, %{}, %{}, async?: true)
end)
assert elapsed >= 100 and elapsed <= 500
end
defp measure_elapsed(fun) do
started_at = DateTime.utc_now()
fun.()
DateTime.diff(DateTime.utc_now(), started_at, :millisecond)
end
end
describe "reactor timeout" do
test "when the timeout is elapsed, it halts the reactor" do
elapsed =
measure_elapsed(fn ->
assert {:halted, reactor} =
Reactor.run(SleepyReactor, %{}, %{}, async?: false, timeout: 200)
assert Graph.num_vertices(reactor.plan) == 3
end)
assert elapsed <= 300
end
end
end