fix: ensure notifications are dispatched from bulk actions

This commit is contained in:
Zach Daniel 2024-06-28 17:10:29 -04:00
parent 66ebb1422e
commit 659a061b16
6 changed files with 116 additions and 137 deletions

View file

@ -196,15 +196,11 @@ defmodule Ash.Actions.Destroy.Bulk do
end end
notify? = notify? =
if opts[:notify?] do if Process.get(:ash_started_transaction?) do
if Process.get(:ash_started_transaction?) do
false
else
Process.put(:ash_started_transaction?, true)
true
end
else
false false
else
Process.put(:ash_started_transaction?, true)
true
end end
try do try do
@ -250,71 +246,65 @@ defmodule Ash.Actions.Destroy.Bulk do
:bulk_destroy :bulk_destroy
end end
bulk_result = if (has_after_batch_hooks? || !Enum.empty?(atomic_changeset.after_action)) &&
if has_after_batch_hooks? && opts[:transaction] do Keyword.get(opts, :transaction, true) do
Ash.DataLayer.transaction( Ash.DataLayer.transaction(
List.wrap(atomic_changeset.resource) ++ action.touches_resources, List.wrap(atomic_changeset.resource) ++ action.touches_resources,
fn -> fn ->
do_atomic_destroy(query, atomic_changeset, has_after_batch_hooks?, input, opts) do_atomic_destroy(query, atomic_changeset, has_after_batch_hooks?, input, opts)
end, end,
opts[:timeout], opts[:timeout],
%{ %{
type: context_key, type: context_key,
metadata: %{ metadata: %{
resource: query.resource, resource: query.resource,
action: atomic_changeset.action.name, action: atomic_changeset.action.name,
actor: opts[:actor] actor: opts[:actor]
}, },
data_layer_context: opts[:data_layer_context] || %{} data_layer_context: opts[:data_layer_context] || %{}
} }
) )
|> case do
{:ok, bulk_result} ->
bulk_result
{:error, error} ->
%Ash.BulkResult{
status: :error,
error_count: 1,
errors: [Ash.Error.to_error_class(error)]
}
end
else
do_atomic_destroy(query, atomic_changeset, has_after_batch_hooks?, input, opts)
end
notifications =
if notify? do
List.wrap(bulk_result.notifications) ++
List.wrap(Process.delete(:ash_notifications))
else
List.wrap(bulk_result.notifications)
end
if opts[:return_notifications?] do
%{bulk_result | notifications: notifications}
else else
if notify? do {:ok, do_atomic_destroy(query, atomic_changeset, has_after_batch_hooks?, input, opts)}
notifications = end
(bulk_result.notifications || []) ++ Process.get(:ash_notifications, []) |> case do
{:ok, bulk_result} ->
if opts[:return_notifications?] do
bulk_result
else
if notify? do
notifications =
List.wrap(Process.delete(:ash_notifications)) ++ bulk_result.notifications
remaining_notifications = Ash.Notifier.notify(notifications) if opts[:notify?] do
Process.delete(:ash_notifications) || [] remaining_notifications = Ash.Notifier.notify(notifications)
Ash.Actions.Helpers.warn_missed!(atomic_changeset.resource, action, %{ Ash.Actions.Helpers.warn_missed!(atomic_changeset.resource, action, %{
resource_notifications: remaining_notifications resource_notifications: remaining_notifications
}) })
%{bulk_result | notifications: notifications} %{bulk_result | notifications: notifications}
else else
if opts[:notify?] do %{bulk_result | notifications: []}
Ash.Actions.Helpers.warn_missed!(atomic_changeset.resource, action, %{ end
resource_notifications: notifications else
}) process_notifications = List.wrap(Process.get(:ash_notifications, []))
Process.put(
:ash_notifications,
process_notifications ++ bulk_result.notifications
)
%{bulk_result | notifications: []}
end
end end
%{bulk_result | notifications: []} {:error, error} ->
end %Ash.BulkResult{
status: :error,
errors: [Ash.Error.to_ash_error(error)],
error_count: 1
}
end end
after after
if notify? do if notify? do
@ -452,7 +442,7 @@ defmodule Ash.Actions.Destroy.Bulk do
end end
after after
if notify? do if notify? do
Process.put(:ash_started_transaction?, false) Process.delete(:ash_started_transaction?)
end end
end end
else else
@ -1295,8 +1285,6 @@ defmodule Ash.Actions.Destroy.Bulk do
end end
after after
if notify? do if notify? do
Process.put(:ash_started_transaction?, false)
notifications = Process.get(:ash_notifications, []) notifications = Process.get(:ash_notifications, [])
remaining_notifications = Ash.Notifier.notify(notifications) remaining_notifications = Ash.Notifier.notify(notifications)
Process.delete(:ash_notifications) || [] Process.delete(:ash_notifications) || []
@ -1304,6 +1292,8 @@ defmodule Ash.Actions.Destroy.Bulk do
Ash.Actions.Helpers.warn_missed!(resource, action, %{ Ash.Actions.Helpers.warn_missed!(resource, action, %{
resource_notifications: remaining_notifications resource_notifications: remaining_notifications
}) })
Process.delete(:ash_started_transaction?)
end end
end end
else else
@ -1462,7 +1452,7 @@ defmodule Ash.Actions.Destroy.Bulk do
{batch_result, notifications, errors} {batch_result, notifications, errors}
after after
Process.put(:ash_started_transaction?, false) Process.delete(:ash_started_transaction?)
end end
end, end,
timeout: :infinity, timeout: :infinity,

View file

@ -804,7 +804,7 @@ defmodule Ash.Actions.Read do
end end
after after
if notify? do if notify? do
Process.put(:ash_started_transaction?, false) Process.delete(:ash_started_transaction?)
end end
end end
end end

View file

@ -152,15 +152,11 @@ defmodule Ash.Actions.Update.Bulk do
atomic_changeset = %{atomic_changeset | domain: domain} atomic_changeset = %{atomic_changeset | domain: domain}
notify? = notify? =
if opts[:notify?] do if Process.get(:ash_started_transaction?) do
if Process.get(:ash_started_transaction?) do
false
else
Process.put(:ash_started_transaction?, true)
true
end
else
false false
else
Process.put(:ash_started_transaction?, true)
true
end end
try do try do
@ -233,21 +229,14 @@ defmodule Ash.Actions.Update.Bulk do
end end
|> case do |> case do
{:ok, bulk_result} -> {:ok, bulk_result} ->
notifications =
if notify? do
List.wrap(bulk_result.notifications) ++
List.wrap(Process.delete(:ash_notifications))
else
List.wrap(bulk_result.notifications)
end
if opts[:return_notifications?] do if opts[:return_notifications?] do
%{bulk_result | notifications: notifications} bulk_result
else else
if opts[:return_notifications?] do if notify? do
bulk_result notifications =
else List.wrap(Process.delete(:ash_notifications)) ++ bulk_result.notifications
if notify? do
if opts[:notify?] do
remaining_notifications = Ash.Notifier.notify(notifications) remaining_notifications = Ash.Notifier.notify(notifications)
Ash.Actions.Helpers.warn_missed!(atomic_changeset.resource, action, %{ Ash.Actions.Helpers.warn_missed!(atomic_changeset.resource, action, %{
@ -256,8 +245,17 @@ defmodule Ash.Actions.Update.Bulk do
%{bulk_result | notifications: notifications} %{bulk_result | notifications: notifications}
else else
bulk_result %{bulk_result | notifications: []}
end end
else
process_notifications = List.wrap(Process.get(:ash_notifications, []))
Process.put(
:ash_notifications,
process_notifications ++ bulk_result.notifications
)
%{bulk_result | notifications: []}
end end
end end
@ -270,7 +268,7 @@ defmodule Ash.Actions.Update.Bulk do
end end
after after
if notify? do if notify? do
Process.put(:ash_started_transaction?, false) Process.delete(:ash_started_transaction?)
end end
end end
end end
@ -409,7 +407,7 @@ defmodule Ash.Actions.Update.Bulk do
end end
after after
if notify? do if notify? do
Process.put(:ash_started_transaction?, false) Process.delete(:ash_started_transaction?)
end end
end end
else else
@ -1517,7 +1515,7 @@ defmodule Ash.Actions.Update.Bulk do
end end
after after
if notify? do if notify? do
Process.put(:ash_started_transaction?, false) Process.delete(:ash_started_transaction?)
notifications = Process.get(:ash_notifications, []) notifications = Process.get(:ash_notifications, [])
remaining_notifications = Ash.Notifier.notify(notifications) remaining_notifications = Ash.Notifier.notify(notifications)
Process.delete(:ash_notifications) || [] Process.delete(:ash_notifications) || []
@ -1666,32 +1664,28 @@ defmodule Ash.Actions.Update.Bulk do
Task.async_stream( Task.async_stream(
stream, stream,
fn batch -> fn batch ->
try do Process.put(:ash_started_transaction?, true)
Process.put(:ash_started_transaction?, true) batch_result = callback.(batch)
batch_result = callback.(batch) {errors, _} = Process.get({:bulk_update_errors, ref}) || {[], 0}
{errors, _} = Process.get({:bulk_update_errors, ref}) || {[], 0}
notifications = notifications =
if opts[:notify?] do if opts[:notify?] do
process_notifications = Process.get(:ash_notifications, []) process_notifications = Process.get(:ash_notifications, [])
bulk_notifications = Process.get({:bulk_update_notifications, ref}) || [] bulk_notifications = Process.get({:bulk_update_notifications, ref}) || []
if opts[:return_notifications?] do if opts[:return_notifications?] do
process_notifications ++ bulk_notifications process_notifications ++ bulk_notifications
else else
if opts[:transaction] && opts[:transaction] != :all do if opts[:transaction] && opts[:transaction] != :all do
Ash.Notifier.notify(bulk_notifications) Ash.Notifier.notify(bulk_notifications)
Ash.Notifier.notify(process_notifications) Ash.Notifier.notify(process_notifications)
end
[]
end end
end
{batch_result, notifications, errors} []
after end
Process.put(:ash_started_transaction?, false) end
end
{batch_result, notifications, errors}
end, end,
timeout: :infinity, timeout: :infinity,
max_concurrency: max_concurrency max_concurrency: max_concurrency

View file

@ -75,7 +75,7 @@ defmodule Ash.Actions.Update do
nil} nil}
opts[:atomic_upgrade?] == false -> opts[:atomic_upgrade?] == false ->
{{:not_atomic, "atomic upgrade was disabled"}, nil} {{:not_atomic, "atomic upgrade was disabled with opts"}, nil}
true -> true ->
params = params =

View file

@ -2981,17 +2981,7 @@ defmodule Ash.Changeset do
) )
|> case do |> case do
{:ok, {:ok, value, changeset, instructions}} -> {:ok, {:ok, value, changeset, instructions}} ->
notifications = {:ok, value, changeset, instructions}
if notify? && !opts[:return_notifications?] do
Enum.concat(
instructions[:notifications] || [],
Process.delete(:ash_notifications) || []
)
else
instructions[:notifications] || []
end
{:ok, value, changeset, Map.put(instructions, :notifications, notifications)}
{:ok, {:error, error}} -> {:ok, {:error, error}} ->
{:error, error} {:error, error}
@ -3039,11 +3029,15 @@ defmodule Ash.Changeset do
{:ok, value, changeset, Map.put(instructions, :notifications, [])} {:ok, value, changeset, Map.put(instructions, :notifications, [])}
else else
notifications =
List.wrap(Process.delete(:ash_notifications)) ++
(instructions[:notifications] || [])
notifications = notifications =
if opts[:return_notifications?] do if opts[:return_notifications?] do
instructions[:notifications] || [] notifications
else else
Ash.Notifier.notify(instructions[:notifications] || []) Ash.Notifier.notify(notifications)
end end
{:ok, value, changeset, Map.put(instructions, :notifications, notifications)} {:ok, value, changeset, Map.put(instructions, :notifications, notifications)}

View file

@ -2905,10 +2905,11 @@ defmodule Ash.Query do
def data_layer_query(%{resource: resource, domain: domain} = ash_query, opts) do def data_layer_query(%{resource: resource, domain: domain} = ash_query, opts) do
query = opts[:initial_query] || Ash.DataLayer.resource_to_query(resource, domain) query = opts[:initial_query] || Ash.DataLayer.resource_to_query(resource, domain)
context = ash_query.context context =
|> Map.put(:action, ash_query.action) ash_query.context
|> Map.put_new(:private, %{}) |> Map.put(:action, ash_query.action)
|> put_in([:private, :tenant], ash_query.tenant) |> Map.put_new(:private, %{})
|> put_in([:private, :tenant], ash_query.tenant)
with {:ok, query} <- with {:ok, query} <-
Ash.DataLayer.set_context( Ash.DataLayer.set_context(