fix(Executor): don't double-iterate the graph each time through the loop.

This commit is contained in:
James Harton 2023-06-20 13:34:49 +12:00
parent 611bf314f0
commit 85bd7b77a2
Signed by: james
GPG key ID: 90E82DAA13F624F4

View file

@ -34,7 +34,7 @@ defmodule Reactor.Executor do
3. When a step or compensation asks for a retry then the step is placed back
in the graph to be run again next iteration.
"""
alias Reactor.{Executor, Planner}
alias Reactor.{Executor, Planner, Step}
@doc """
Run a reactor.
@ -70,8 +70,9 @@ defmodule Reactor.Executor do
with {:continue, reactor, state} <- maybe_timeout(reactor, state),
{:continue, reactor, state} <- handle_unplanned_steps(reactor, state),
{:continue, reactor, state} <- handle_completed_async_steps(reactor, state),
{:continue, reactor, state} <- start_ready_async_steps(reactor, state),
{:continue, reactor, state} <- run_ready_sync_step(reactor, state),
{:continue, ready_steps} <- find_ready_steps(reactor),
{:continue, reactor, state} <- start_ready_async_steps(reactor, state, ready_steps),
{:continue, reactor, state} <- run_ready_sync_step(reactor, state, ready_steps),
{:continue, reactor} <- all_done(reactor) do
execute(reactor, subtract_iteration(state))
else
@ -120,30 +121,29 @@ defmodule Reactor.Executor do
defp handle_completed_async_steps(reactor, state),
do: Executor.Async.handle_completed_steps(reactor, state)
defp start_ready_async_steps(reactor, state) when state.async? == false,
defp start_ready_async_steps(reactor, state, _) when state.async? == false,
do: {:continue, reactor, state}
defp start_ready_async_steps(reactor, state)
defp start_ready_async_steps(reactor, state, []), do: {:continue, reactor, state}
defp start_ready_async_steps(reactor, state, _steps)
when map_size(state.current_tasks) == state.max_concurrency,
do: {:continue, reactor, state}
defp start_ready_async_steps(reactor, state) do
steps = find_ready_async_steps(reactor)
defp start_ready_async_steps(reactor, state, steps) do
steps = Enum.filter(steps, &(&1.async? == true))
Executor.Async.start_steps(reactor, state, steps)
end
defp run_ready_sync_step(reactor, state) when state.async? == false do
step =
reactor
|> find_ready_steps()
|> Enum.at(0)
defp run_ready_sync_step(reactor, state, []), do: {:continue, reactor, state}
defp run_ready_sync_step(reactor, state, [step | _]) when state.async? == false do
Executor.Sync.run(reactor, state, step)
end
defp run_ready_sync_step(reactor, state) do
step = find_ready_sync_step(reactor)
defp run_ready_sync_step(reactor, state, steps) do
step = Enum.find(steps, &(&1.async? == false))
Executor.Sync.run(reactor, state, step)
end
@ -176,28 +176,15 @@ defmodule Reactor.Executor do
end
end
defp find_ready_async_steps(reactor) do
reactor
|> find_ready_steps(& &1.async?)
|> Enum.to_list()
end
defp find_ready_sync_step(reactor) do
reactor
|> find_ready_steps(&(&1.async? == false))
|> Enum.at(0)
end
defp find_ready_steps(reactor) do
reactor.plan
|> Graph.vertices()
|> Stream.filter(&(Graph.in_degree(reactor.plan, &1) == 0))
|> Stream.reject(&is_struct(&1, Task))
end
steps =
reactor.plan
|> Graph.vertices()
|> Enum.filter(fn
step when is_struct(step, Step) -> Graph.in_degree(reactor.plan, step) == 0
_ -> false
end)
defp find_ready_steps(reactor, predicate) when is_function(predicate, 1) do
reactor
|> find_ready_steps()
|> Stream.filter(predicate)
{:continue, steps}
end
end