fix: roll back bulk update/destroy on before action error

This commit is contained in:
Zach Daniel 2024-08-01 12:09:01 -04:00
parent f8fc4cd521
commit 28f73546cc
4 changed files with 119 additions and 3 deletions

View file

@ -1041,9 +1041,17 @@ defmodule Ash.Actions.Destroy.Bulk do
else else
if opts[:notify?] do if opts[:notify?] do
Ash.Notifier.notify(Process.delete({:bulk_destroy_notifications, ref})) Ash.Notifier.notify(Process.delete({:bulk_destroy_notifications, ref}))
else
[]
end end
end
notifications =
if Process.get(:ash_started_transaction?) && !opts[:return_notifications?] do
Process.put(:ash_notifications, Process.get(:ash_notifications, []) ++ notifications)
[] []
else
notifications
end end
{errors, error_count} = Process.get({:bulk_destroy_errors, ref}) || {[], 0} {errors, error_count} = Process.get({:bulk_destroy_errors, ref}) || {[], 0}
@ -1804,6 +1812,14 @@ defmodule Ash.Actions.Destroy.Bulk do
{changeset, %{notifications: new_notifications}} = {changeset, %{notifications: new_notifications}} =
Ash.Changeset.run_before_actions(changeset) Ash.Changeset.run_before_actions(changeset)
if !changeset.valid? && opts[:rollback_on_error?] do
Ash.Actions.Helpers.rollback_if_in_transaction(
{:error, changeset.errors},
resource,
nil
)
end
new_notifications = store_notification(ref, new_notifications, opts) new_notifications = store_notification(ref, new_notifications, opts)
{changeset, manage_notifications} = {changeset, manage_notifications} =

View file

@ -1179,7 +1179,10 @@ defmodule Ash.Actions.Update.Bulk do
) )
after after
if opts[:notify?] && !opts[:return_notifications?] do if opts[:notify?] && !opts[:return_notifications?] do
Ash.Notifier.notify(Process.delete({:bulk_update_notifications, ref})) Process.put(
{:bulk_update_notifications, ref},
Ash.Notifier.notify(Process.delete({:bulk_update_notifications, ref}) || [])
)
end end
end end
end end
@ -1206,9 +1209,17 @@ defmodule Ash.Actions.Update.Bulk do
else else
if opts[:notify?] do if opts[:notify?] do
Ash.Notifier.notify(Process.delete({:bulk_update_notifications, ref})) Ash.Notifier.notify(Process.delete({:bulk_update_notifications, ref}))
else
[]
end end
end
notifications =
if Process.get(:ash_started_transaction?) && !opts[:return_notifications?] do
Process.put(:ash_notifications, Process.get(:ash_notifications, []) ++ notifications)
[] []
else
notifications
end end
{errors, error_count} = Process.get({:bulk_update_errors, ref}) || {[], 0} {errors, error_count} = Process.get({:bulk_update_errors, ref}) || {[], 0}
@ -2055,6 +2066,14 @@ defmodule Ash.Actions.Update.Bulk do
{changeset, %{notifications: new_notifications}} = {changeset, %{notifications: new_notifications}} =
Ash.Changeset.run_before_actions(changeset) Ash.Changeset.run_before_actions(changeset)
if !changeset.valid? && opts[:rollback_on_error?] do
Ash.Actions.Helpers.rollback_if_in_transaction(
{:error, changeset.errors},
resource,
nil
)
end
changeset = changeset =
changeset changeset
|> Ash.Changeset.handle_allow_nil_atomics(opts[:actor]) |> Ash.Changeset.handle_allow_nil_atomics(opts[:actor])

View file

@ -2194,8 +2194,8 @@ defmodule Ash.Query do
#{Spark.Options.docs(@build_opts)} #{Spark.Options.docs(@build_opts)}
""" """
@spec build(Ash.Resource.t(), Ash.Domain.t() | nil, Keyword.t()) :: t() @spec build(Ash.Resource.t() | t(), Ash.Domain.t() | nil, Keyword.t()) :: t()
@spec build(Ash.Resource.t(), Keyword.t()) :: t() @spec build(Ash.Resource.t() | t(), Keyword.t()) :: t()
def build(resource, domain \\ nil, keyword) do def build(resource, domain \\ nil, keyword) do
query = query =
resource resource

View file

@ -382,6 +382,18 @@ defmodule Ash.Test.Actions.BulkUpdateTest do
default_accept :* default_accept :*
defaults [:read, :destroy, :create, :update] defaults [:read, :destroy, :create, :update]
update :error_in_before_action_on_title2 do
atomic_upgrade? false
change before_action(fn changeset, context ->
if changeset.data.title == "title2" do
Ash.Changeset.add_error(changeset, "not good")
else
changeset
end
end)
end
update :update_with_after_action do update :update_with_after_action do
atomic_upgrade? false atomic_upgrade? false
@ -605,6 +617,75 @@ defmodule Ash.Test.Actions.BulkUpdateTest do
}} }}
end end
test "sends only the correct amount notifications with stream strategy in transactions, when run in a before action hook" do
MnesiaPost
|> Ash.Changeset.for_create(:create, %{title: "title_pre"})
|> Ash.Changeset.before_action(fn changeset ->
Ash.bulk_create!([%{title: "title1"}, %{title: "title2"}], MnesiaPost, :create,
return_stream?: true,
return_records?: true,
authorize?: false
)
|> Stream.map(fn {:ok, result} ->
result
end)
|> Ash.bulk_update!(:update, %{title2: "updated value"},
resource: MnesiaPost,
strategy: :stream,
allow_stream_with: :full_read,
return_records?: true,
notify?: true,
return_errors?: true,
authorize?: false
)
changeset
end)
|> Ash.create!()
assert_received {:notification, %{changeset: %{action: %{type: :create}}}}
assert_received {:notification,
%{
data: %{title: "title1"},
changeset: %{attributes: %{title2: "updated value"}}
}}
assert_received {:notification,
%{
data: %{title: "title2"},
changeset: %{attributes: %{title2: "updated value"}}
}}
refute_received {:notification, _}
end
test "rolls back on errors in a `before_action` hook" do
Ash.bulk_create!([%{title: "title1"}, %{title: "title2"}], MnesiaPost, :create,
return_stream?: true,
return_records?: true,
authorize?: false
)
|> Stream.map(fn {:ok, result} ->
result
end)
|> Ash.bulk_update(:error_in_before_action_on_title2, %{title2: "updated value"},
resource: MnesiaPost,
strategy: :stream,
allow_stream_with: :full_read,
return_records?: true,
notify?: true,
return_errors?: true,
authorize?: false
)
# the first mutation didn't happen
assert MnesiaPost |> Ash.read!() |> Enum.map(& &1.title) |> Enum.sort() == [
"title1",
"title2"
]
end
test "sends notifications with after action hooks in stream strategy in transactions" do test "sends notifications with after action hooks in stream strategy in transactions" do
assert %Ash.BulkResult{records: [%{title2: "updated value"}, %{title2: "updated value"}]} = assert %Ash.BulkResult{records: [%{title2: "updated value"}, %{title2: "updated value"}]} =
Ash.bulk_create!([%{title: "title1"}, %{title: "title2"}], MnesiaPost, :create, Ash.bulk_create!([%{title: "title1"}, %{title: "title2"}], MnesiaPost, :create,