finalize engine refactor

This commit is contained in:
Zach Daniel 2020-05-27 00:29:25 -04:00
parent 9d76bb1d9b
commit 65aa5215fe
No known key found for this signature in database
GPG key ID: C377365383138D4B
4 changed files with 62 additions and 64 deletions

View file

@ -53,7 +53,7 @@ defmodule Ash.Actions.Update do
%Ecto.Changeset{} = changeset ->
{:error, Ash.Error.Changeset.changeset_to_errors(resource, changeset)}
%Ash.Engine{errors: errors} ->
%{errors: errors} ->
{:error, Ash.to_ash_error(errors)}
{:error, error} ->

View file

@ -114,23 +114,19 @@ defmodule Ash.Engine.Request do
add_initial_authorizer_state(request)
end
@spec next(%{state: :check | :complete | :fetch_data | :strict_check}) ::
{:already_complete, any, any}
| {:complete, any, any}
| {:continue, any, any}
| {:error, any, any}
| {:wait, any, any, any}
def next(request) do
case do_next(request) do
{:complete, new_request, notifications} ->
{:complete, new_request, notifications, dependencies} ->
dependencies = new_dependencies(dependencies)
if request.state != :complete do
{:complete, new_request, notifications}
{:complete, new_request, notifications, dependencies}
else
{:already_complete, new_request, notifications}
{:already_complete, new_request, notifications, dependencies}
end
{:waiting, new_request, notifications, dependencies} ->
{new_request, dependencies} = new_dependencies(new_request, dependencies)
dependencies = new_dependencies(dependencies)
{:wait, new_request, notifications, dependencies}
@ -186,21 +182,21 @@ defmodule Ash.Engine.Request do
def do_next(%{state: :check, authorize?: false} = request) do
log(request, "Skipping check due to `authorize?: false`")
{:complete, %{request | state: :complete}, []}
{:complete, %{request | state: :complete}, [], []}
end
def do_next(%{state: :check} = request) do
case Ash.authorizers(request.resource) do
[] ->
log(request, "No authorizers found, skipping check")
{:complete, %{request | state: :complete}, []}
{:complete, %{request | state: :complete}, [], []}
authorizers ->
case check(authorizers, request) do
{:ok, new_request, notifications, []} ->
log(new_request, "Check complete")
{:complete, %{request | state: :complete}, notifications}
{:complete, %{request | state: :complete}, notifications, []}
{:ok, new_request, notifications, waiting} ->
log(request, "Check incomplete, waiting on dependencies")
@ -215,7 +211,7 @@ defmodule Ash.Engine.Request do
def do_next(%{state: :complete} = request) do
if request.dependencies_to_send == %{} do
{:complete, request, []}
{:complete, request, [], []}
else
Enum.reduce_while(request.dependencies_to_send, {:complete, request, [], []}, fn
{field, _paths}, {:complete, request, notifications, deps} ->
@ -255,7 +251,7 @@ defmodule Ash.Engine.Request do
{:ok, new_request, notifications}
{:waiting, new_request, notifications, waiting_for} ->
{new_request, dependency_requests} = new_dependencies(new_request, waiting_for)
dependency_requests = new_dependencies(waiting_for)
{:waiting, new_request, notifications, dependency_requests}
@ -273,52 +269,20 @@ defmodule Ash.Engine.Request do
{:continue, new_request}
end
defp new_dependencies(request, waiting_for) do
{non_optional, optional} = Enum.split_with(waiting_for, &is_list/1)
defp new_dependencies(waiting_for) do
Enum.map(
waiting_for,
fn
{:optional, dep} ->
{dep, false}
{new_request, deps} =
Enum.reduce(non_optional, {request, []}, fn dep, {request, deps} ->
{new_request, new_dep} = new_dependency(request, dep, false)
{:required, dep} ->
{dep, true}
if new_dep do
{new_request, [new_dep | deps]}
else
{new_request, deps}
end
end)
Enum.reduce(optional, {new_request, deps}, fn dep, {request, deps} ->
{new_request, new_dep} = new_dependency(request, dep, true)
if new_dep do
{new_request, [new_dep | deps]}
else
{new_request, deps}
end
end)
end
defp new_dependency(request, dep, optional?) do
if Enum.any?(request.dependencies_requested, fn {dep_requested, _} -> dep_requested == dep end) do
new_request = %{
request
| dependencies_requested:
Enum.map(request.dependencies_requested, fn {dep_requested, requested_optional?} ->
if dep_requested == dep do
{dep_requested, optional? || requested_optional?}
else
{dep_requested, requested_optional?}
end
end)
}
{new_request, nil}
else
dependency_request = {dep, optional?}
{%{request | dependencies_requested: [dependency_request | request.dependencies_requested]},
dependency_request}
other ->
{other, true}
end
)
end
def put_dependency_data(request, dep, value) do
@ -630,6 +594,7 @@ defmodule Ash.Engine.Request do
defp notifications(request, field, value) do
case Map.fetch(request.dependencies_to_send, field) do
# TODO: This logic could technically cause double sends?
{:ok, paths} ->
new_request = %{
request

View file

@ -44,16 +44,19 @@ defmodule Ash.Engine.RequestHandler do
{:error, error, new_request} ->
{:stop, {:error, error, %{new_request | state: :error}}, state}
{:already_complete, new_request, notifications} ->
{:already_complete, new_request, notifications, dependencies} ->
new_state = %{state | request: new_request}
Enum.each(dependencies, &register_dependency(new_state, &1))
notify(new_state, notifications)
{:noreply, new_state}
{:complete, new_request, notifications} ->
{:complete, new_request, notifications, dependencies} ->
new_state = %{state | request: new_request}
notify(new_state, notifications)
Enum.each(dependencies, &register_dependency(new_state, &1))
complete(new_state)
{:noreply, new_state}

View file

@ -55,7 +55,7 @@ defmodule Ash.Engine.Runner do
if new_state.engine_pid do
wait_for_engine(new_state, false)
else
log(state, "Synchronous engine stuck.")
log(state, "Synchronous engine stuck:\n\n#{stuck_report(state)}")
add_error(state, :__engine__, "Synchronous engine stuck")
end
@ -65,6 +65,36 @@ defmodule Ash.Engine.Runner do
end
end
defp stuck_report(state) do
Enum.map_join(state.requests, "\n", fn request ->
if request.state in [:complete, :error] do
request.name <> ": " <> "#{request.state}"
else
case Request.next(request) do
{:wait, _, _, []} ->
request.name <> ": Waiting on nothing in state #{inspect(request.state)}"
{:wait, _, _, dependencies} ->
request.name <> ": Waiting on #{dependency_names(dependencies, state)}"
_other ->
request.name <> ": Not waiting, not complete"
end
end
end)
end
defp dependency_names(dependencies, state) do
Enum.map_join(dependencies, ", ", fn dependency ->
path = :lists.droplast(dependency)
field = List.last(dependency)
request = Enum.find(state.requests, &(&1.path == path))
request.name <> ": " <> to_string(field)
end)
end
defp wait_for_engine(state, complete?) do
engine_pid = state.engine_pid
log(state, "waiting for engine")
@ -310,9 +340,9 @@ defmodule Ash.Engine.Runner do
defp advance_request(request, notifications \\ [], dependencies \\ []) do
case Request.next(request) do
{complete, new_request, new_notifications}
{complete, new_request, new_notifications, new_dependencies}
when complete in [:complete, :already_complete] ->
{:ok, new_request, new_notifications ++ notifications, dependencies}
{:ok, new_request, new_notifications ++ notifications, new_dependencies ++ dependencies}
{:continue, new_request, new_notifications} ->
{:ok, new_request, new_notifications ++ notifications, []}