fix: honor soft destroys for atomic bulk destroys

This commit is contained in:
Zach Daniel 2024-02-22 08:01:47 -05:00
parent 7b5a3267be
commit 701f9b4cc6
2 changed files with 142 additions and 69 deletions

View file

@ -7,6 +7,61 @@ defmodule Ash.Actions.Destroy.Bulk do
def run(api, resource, action, input, opts, not_atomic_reason \\ nil)
def run(api, stream, action, input, opts, not_atomic_reason) when is_atom(action) do
resource =
opts[:resource] ||
case stream do
[%resource{} | _] ->
resource
%Ash.Query{resource: resource} ->
resource
_ ->
nil
end
if !resource do
raise ArgumentError,
"Could not determine resource for bulk destroy. Please provide the `resource` option if providing a stream of inputs."
end
run(api, stream, Ash.Resource.Info.action(resource, action), input, opts, not_atomic_reason)
end
def run(api, stream, nil, input, opts, not_atomic_reason) do
resource =
opts[:resource] ||
case stream do
[%resource{} | _] ->
resource
%Ash.Query{resource: resource} ->
resource
_ ->
nil
end
if !resource do
raise ArgumentError,
"Could not determine resource for bulk destroy. Please provide the `resource` option if providing a stream of inputs."
end
run(
api,
stream,
Ash.Resource.Info.primary_action!(resource, :destroy),
input,
opts,
not_atomic_reason
)
end
def run(api, stream, %{soft?: true} = action, input, opts, not_atomic_reason) do
Ash.Actions.Update.Bulk.run(api, stream, action, input, opts, not_atomic_reason)
end
def run(api, resource, action, input, opts, not_atomic_reason) when is_atom(resource) do
run(api, Ash.Query.new(resource), action, input, opts, not_atomic_reason)
end
@ -253,72 +308,68 @@ defmodule Ash.Actions.Destroy.Bulk do
action
end
if action.soft? do
Ash.Actions.Update.Bulk.run(api, stream, action.name, input, opts)
else
if opts[:transaction] == :all && opts[:return_stream?] do
raise ArgumentError,
"Cannot specify `transaction: :all` and `return_stream?: true` together"
end
if opts[:transaction] == :all && opts[:return_stream?] do
raise ArgumentError,
"Cannot specify `transaction: :all` and `return_stream?: true` together"
end
if opts[:return_stream?] && opts[:sorted?] do
raise ArgumentError, "Cannot specify `sorted?: true` and `return_stream?: true` together"
end
if opts[:return_stream?] && opts[:sorted?] do
raise ArgumentError, "Cannot specify `sorted?: true` and `return_stream?: true` together"
end
if opts[:transaction] == :all &&
Ash.DataLayer.data_layer_can?(resource, :transact) do
notify? =
if opts[:notify?] do
if Process.get(:ash_started_transaction?) do
false
else
Process.put(:ash_started_transaction?, true)
true
end
else
if opts[:transaction] == :all &&
Ash.DataLayer.data_layer_can?(resource, :transact) do
notify? =
if opts[:notify?] do
if Process.get(:ash_started_transaction?) do
false
else
Process.put(:ash_started_transaction?, true)
true
end
Ash.DataLayer.transaction(
List.wrap(resource) ++ action.touches_resources,
fn ->
do_run(api, stream, action, input, opts, not_atomic_reason)
end,
opts[:timeout],
%{
type: :bulk_destroy,
metadata: %{
resource: resource,
action: action.name,
actor: opts[:actor]
},
data_layer_context: opts[:data_layer_context] || %{}
}
)
|> case do
{:ok, bulk_result} ->
bulk_result =
if notify? do
%{
bulk_result
| notifications:
(bulk_result.notifications || []) ++ Process.delete(:ash_notifications) ||
[]
}
else
bulk_result
end
handle_bulk_result(bulk_result, resource, action, opts)
{:error, error} ->
{:error, error}
else
false
end
else
api
|> do_run(stream, action, input, opts, not_atomic_reason)
|> handle_bulk_result(resource, action, opts)
Ash.DataLayer.transaction(
List.wrap(resource) ++ action.touches_resources,
fn ->
do_run(api, stream, action, input, opts, not_atomic_reason)
end,
opts[:timeout],
%{
type: :bulk_destroy,
metadata: %{
resource: resource,
action: action.name,
actor: opts[:actor]
},
data_layer_context: opts[:data_layer_context] || %{}
}
)
|> case do
{:ok, bulk_result} ->
bulk_result =
if notify? do
%{
bulk_result
| notifications:
(bulk_result.notifications || []) ++ Process.delete(:ash_notifications) ||
[]
}
else
bulk_result
end
handle_bulk_result(bulk_result, resource, action, opts)
{:error, error} ->
{:error, error}
end
else
api
|> do_run(stream, action, input, opts, not_atomic_reason)
|> handle_bulk_result(resource, action, opts)
end
end

View file

@ -3,6 +3,7 @@ defmodule Ash.Test.Actions.BulkDestroyTest do
use ExUnit.Case, async: true
require Ash.Query
alias Ash.Test.AnyApi, as: Api
defmodule AddAfterToTitle do
use Ash.Resource.Change
@ -37,6 +38,7 @@ defmodule Ash.Test.Actions.BulkDestroyTest do
defmodule Post do
@moduledoc false
use Ash.Resource,
api: Api,
data_layer: Ash.DataLayer.Ets,
authorizers: [Ash.Policy.Authorizer]
@ -89,6 +91,11 @@ defmodule Ash.Test.Actions.BulkDestroyTest do
change set_context(%{authorize?: arg(:authorize?)})
end
destroy :soft do
soft? true
change set_attribute(:title2, "archived")
end
end
identities do
@ -117,15 +124,6 @@ defmodule Ash.Test.Actions.BulkDestroyTest do
end
end
defmodule Api do
@moduledoc false
use Ash.Api
resources do
resource Post
end
end
test "returns destroyed records" do
assert %Ash.BulkResult{records: [%{}, %{}]} =
Api.bulk_create!([%{title: "title1"}, %{title: "title2"}], Post, :create,
@ -313,6 +311,30 @@ defmodule Ash.Test.Actions.BulkDestroyTest do
assert [] = Api.read!(Post)
end
test "soft destroys" do
assert %Ash.BulkResult{
records: [
%{title2: "archived"},
%{title2: "archived"}
]
} =
Api.bulk_create!([%{title: "title1"}, %{title: "title2"}], Post, :create,
return_stream?: true,
return_records?: true
)
|> Stream.map(fn {:ok, result} ->
result
end)
|> Api.bulk_destroy!(:soft, %{},
resource: Post,
return_records?: true,
return_errors?: true
)
|> Map.update!(:records, fn records ->
Enum.sort_by(records, & &1.title)
end)
end
describe "authorization" do
test "policy success results in successes" do
assert %Ash.BulkResult{records: [_, _], errors: []} =