fix: don't revisit nodes in the ash engine

fix: properly map to `:destroy` step names
This commit is contained in:
Zach Daniel 2022-11-25 14:09:30 -05:00
parent 49693669dd
commit 52e7c133a6
6 changed files with 96 additions and 37 deletions

View file

@ -106,11 +106,11 @@ defmodule Ash.Actions.Destroy do
{:ok, %{data: data} = engine_result} -> {:ok, %{data: data} = engine_result} ->
resource resource
|> add_notifications(action, engine_result, return_notifications?) |> add_notifications(action, engine_result, return_notifications?)
|> add_destroyed(return_destroyed?, Map.get(data, :destroy)) |> add_destroyed(return_destroyed?, data)
{:error, %Ash.Engine{errors: errors, requests: requests}} -> {:error, %Ash.Engine{errors: errors, requests: requests}} ->
case Enum.find_value(requests, fn request -> case Enum.find_value(requests, fn request ->
if request.path == [:destroy] && match?(%Ash.Changeset{}, request.changeset) do if request.path == [:commit] && match?(%Ash.Changeset{}, request.changeset) do
request.changeset request.changeset
end end
end) do end) do
@ -251,7 +251,7 @@ defmodule Ash.Actions.Destroy do
Request.new( Request.new(
resource: resource, resource: resource,
api: api, api: api,
path: path ++ [:destroy], path: path ++ [:commit],
action: action, action: action,
authorize?: false, authorize?: false,
error_path: error_path, error_path: error_path,
@ -264,9 +264,9 @@ defmodule Ash.Actions.Destroy do
name: "commit #{inspect(resource)}.#{action.name}", name: "commit #{inspect(resource)}.#{action.name}",
data: data:
Request.resolve( Request.resolve(
[path ++ [:data, :data], path ++ [:destroy, :changeset]], [path ++ [:data, :data], path ++ [:commit, :changeset]],
fn %{actor: actor, authorize?: authorize?} = context -> fn %{actor: actor, authorize?: authorize?} = context ->
changeset = get_in(context, path ++ [:destroy, :changeset]) changeset = get_in(context, path ++ [:commit, :changeset])
record = changeset.data record = changeset.data
changeset changeset
@ -323,11 +323,11 @@ defmodule Ash.Actions.Destroy do
end end
end end
defp add_destroyed(:ok, true, destroyed) do defp add_destroyed(:ok, true, %{commit: destroyed}) do
{:ok, destroyed} {:ok, destroyed}
end end
defp add_destroyed({:ok, notifications}, true, destroyed) do defp add_destroyed({:ok, notifications}, true, %{commit: destroyed}) do
{:ok, destroyed, notifications} {:ok, destroyed, notifications}
end end

View file

@ -225,12 +225,16 @@ defmodule Ash.Engine do
|> case do |> case do
^state -> ^state ->
if state.tasks == [] && state.pending_tasks == [] do if state.tasks == [] && state.pending_tasks == [] do
if state.errors == [] do
detect_deadlocks(state) detect_deadlocks(state)
raise """ raise """
Engine Deadlock! No async tasks and state is the same after iteration. Engine Deadlock! No async tasks and state is the same after iteration.
#{long_breakdown(state)} #{long_breakdown(state)}
""" """
else
state
end
else else
state state
|> start_pending_tasks() |> start_pending_tasks()
@ -400,22 +404,12 @@ defmodule Ash.Engine do
end) end)
|> Enum.split_with(& &1.async?) |> Enum.split_with(& &1.async?)
{state, do_sync?} = new_state = Enum.reduce(async, state, &do_run_iteration(&2, &1))
Enum.reduce(async, {state, true}, fn
request, {state, false} ->
{do_run_iteration(state, request), false}
request, {state, true} -> if state == new_state do
new_state = do_run_iteration(state, request) Enum.reduce(sync, new_state, &do_run_iteration(&2, &1))
# We only want to process synchronous requests once all asynchronous requests
# have done all of their work.
{new_state, state == new_state}
end)
if do_sync? do
Enum.reduce(sync, state, &do_run_iteration(&2, &1))
else else
state new_state
end end
request -> request ->
@ -850,16 +844,16 @@ defmodule Ash.Engine do
|> Enum.split_with(& &1.async?) |> Enum.split_with(& &1.async?)
%{state | requests: async ++ state.requests ++ non_async} %{state | requests: async ++ state.requests ++ non_async}
|> add_dependencies_waiting_on_request() |> add_dependencies_waiting_on_request(requests)
end end
defp add_dependencies_waiting_on_request(state) do defp add_dependencies_waiting_on_request(state, new_requests) do
state.dependencies_waiting_on_request state.dependencies_waiting_on_request
|> Enum.reduce(state, fn |> Enum.reduce(state, fn
{request_path, dep}, state -> {request_path, dep}, state ->
dep_path = :lists.droplast(dep) dep_path = :lists.droplast(dep)
if Enum.any?(state.requests, &(&1.path == dep_path)) do if Enum.any?(new_requests, &(&1.path == dep_path)) do
%{ %{
state state
| unsent_dependencies: [{request_path, dep} | state.unsent_dependencies], | unsent_dependencies: [{request_path, dep} | state.unsent_dependencies],

View file

@ -462,18 +462,32 @@ defmodule Ash.Engine.Request do
{:waiting, new_request, notifications, waiting} {:waiting, new_request, notifications, waiting}
{:ok, new_request, _, _} -> {:ok, new_request, notifications, []} ->
case Map.get(new_request, field) do case Map.get(new_request, field) do
%UnresolvedField{} -> %UnresolvedField{} ->
log(request, fn -> "Field could not be resolved #{field}, registering dependency" end) log(request, fn -> "Field could not be resolved #{field}, registering dependency" end)
{:ok, new_request, []} {:ok, new_request, notifications}
value -> value ->
log(request, fn -> "Field #{field}, was resolved and provided" end) log(request, fn -> "Field #{field}, was resolved and provided" end)
{new_request, notifications} = notifications(request, field, value) {new_request, new_notifications} = notifications(new_request, field, value)
{:ok, new_request, notifications} {:ok, new_request, new_notifications ++ notifications}
end
{:ok, new_request, notifications, waiting} ->
case Map.get(new_request, field) do
%UnresolvedField{} ->
log(request, fn -> "Field could not be resolved #{field}, registering dependency" end)
{:waiting, new_request, notifications, waiting}
value ->
log(request, fn -> "Field #{field}, was resolved and provided" end)
{new_request, new_notifications} = notifications(new_request, field, value)
{:ok, new_request, new_notifications ++ notifications, waiting}
end end
{:error, error} -> {:error, error} ->

View file

@ -323,6 +323,7 @@ defmodule Ash.Flow.Executor.AshEngine do
get_dep_paths(all_steps, deps, transaction_name, wait_for_deps ++ halt_if_deps) get_dep_paths(all_steps, deps, transaction_name, wait_for_deps ++ halt_if_deps)
request_deps = dependable_request_paths(dep_paths) request_deps = dependable_request_paths(dep_paths)
id = System.unique_integer()
[ [
Ash.Engine.Request.new( Ash.Engine.Request.new(
@ -345,13 +346,15 @@ defmodule Ash.Flow.Executor.AshEngine do
}) })
|> Ash.Flow.handle_modifiers() |> Ash.Flow.handle_modifiers()
IO.puts(""" output = """
Debug Output for: #{inspect(name)} Debug Output for: #{inspect(name)} | #{id}
#{inspect(input)} #{inspect(input)}
""") """
{:ok, input} IO.puts(output)
{:ok, output}
end) end)
end) end)
) )

View file

@ -7,6 +7,7 @@ defmodule Ash.FlowTest.SimpleFlowTest do
alias Ash.Test.Flow.Flows.{ alias Ash.Test.Flow.Flows.{
GetOrgAndUsers, GetOrgAndUsers,
GetOrgAndUsersAndDestroyThem,
GetOrgByName, GetOrgByName,
SignUpAndDeleteUser, SignUpAndDeleteUser,
SignUpUser SignUpUser
@ -58,6 +59,25 @@ defmodule Ash.FlowTest.SimpleFlowTest do
assert users |> Enum.map(& &1.first_name) |> Enum.sort() == ["abc", "def"] assert users |> Enum.map(& &1.first_name) |> Enum.sort() == ["abc", "def"]
end end
test "a flow with a destroy inside of a map works" do
org =
Org
|> Ash.Changeset.for_create(:create, %{name: "Org 1"})
|> Api.create!()
User
|> Ash.Changeset.for_create(:create, %{first_name: "abc", org: org.id})
|> Api.create!()
User
|> Ash.Changeset.for_create(:create, %{first_name: "def", org: org.id})
|> Api.create!()
GetOrgAndUsersAndDestroyThem.run!("Org 1")
assert User |> Api.read!() |> Enum.empty?()
end
test "a flow with a create and an update step works" do test "a flow with a create and an update step works" do
org = org =
Org Org

View file

@ -0,0 +1,28 @@
defmodule Ash.Test.Flow.Flows.GetOrgAndUsersAndDestroyThem do
@moduledoc false
use Ash.Flow
flow do
api Ash.Test.Flow.Api
argument :org_name, :string do
allow_nil? false
end
returns :unapprove_users
end
steps do
run_flow :get_org_and_users, Ash.Test.Flow.Flows.GetOrgAndUsers do
input %{
org_name: arg(:org_name)
}
end
map :unapprove_users, path(result(:get_org_and_users), :users) do
destroy :destroy_user, Ash.Test.Flow.User, :destroy do
record element(:unapprove_users)
end
end
end
end