improvement: support transactions in bulk updates

This commit is contained in:
Zach Daniel 2024-01-23 21:36:17 -05:00
parent 878011bd3e
commit 1807523ba2
4 changed files with 183 additions and 57 deletions

View file

@ -63,38 +63,86 @@ defmodule Ash.Actions.Update.Bulk do
}
atomic_changeset ->
with {:ok, query} <- authorize_bulk_query(query, opts),
{:ok, atomic_changeset, query} <-
authorize_changeset(query, atomic_changeset, opts),
{:ok, data_layer_query} <- Ash.Query.data_layer_query(query) do
case Ash.DataLayer.update_query(
data_layer_query,
atomic_changeset,
Map.new(Keyword.take(opts, [:return_records?, :tenant]))
) do
:ok ->
%Ash.BulkResult{
status: :success
}
has_after_action_hooks? = not(Enum.empty?(atomic_changeset.after_action || []))
# There are performance implications here. We probably need to explicitly enable
# having after action hooks. Or perhaps we need to stream the ids and then bulk update
# them.
opts =
if has_after_action_hooks? || opts[:notify?] do
Keyword.put(opts, :return_records?, true)
else
opts
end
{:ok, results} ->
%Ash.BulkResult{
status: :success,
records: results
}
{:error, error} ->
%Ash.BulkResult{
status: :error,
errors: [Ash.Error.to_error_class(error)]
}
notify? =
if opts[:notify?] do
if Process.get(:ash_started_transaction?) do
false
else
Process.put(:ash_started_transaction?, true)
true
end
else
{:error, error} ->
%Ash.BulkResult{
status: :error,
errors: [Ash.Error.to_error_class(error)]
false
end
context_key =
case action.type do
:update ->
:bulk_update
:destroy ->
:bulk_destroy
end
bulk_result =
if has_after_action_hooks? do
Ash.DataLayer.transaction(
List.wrap(atomic_changeset.resource) ++ action.touches_resources,
fn ->
do_atomic_update(query, atomic_changeset, has_after_action_hooks?, opts)
end,
opts[:timeout],
%{
type: context_key,
metadata: %{
resource: query.resource,
action: atomic_changeset.action.name,
actor: opts[:actor]
},
data_layer_context: opts[:data_layer_context] || %{}
}
)
else
do_atomic_update(query, atomic_changeset, has_after_action_hooks?, opts)
end
notifications =
if notify? do
List.wrap(bulk_result.notifications) ++ Process.delete(:ash_notifications)
else
List.wrap(bulk_result.notifications)
end
if opts[:return_notifications?] do
%{bulk_result | notifications: notifications}
else
if opts[:return_notifications?] do
bulk_result
else
if notify? do
notifications = bulk_result.notifications ++ Process.get(:ash_notifications, [])
remaining_notifications = Ash.Notifier.notify(notifications)
Process.delete(:ash_notifications) || []
Ash.Actions.Helpers.warn_missed!(atomic_changeset.resource, action, %{
resource_notifications: remaining_notifications
})
%{bulk_result | notifications: notifications}
end
end
end
end
end
@ -142,16 +190,12 @@ defmodule Ash.Actions.Update.Bulk do
if opts[:transaction] == :all &&
Ash.DataLayer.data_layer_can?(resource, :transact) do
notify? =
if opts[:notify?] do
if Process.get(:ash_started_transaction?) do
false
else
Process.put(:ash_started_transaction?, true)
true
end
else
false
end
Ash.DataLayer.transaction(
List.wrap(resource) ++ action.touches_resources,
@ -194,6 +238,84 @@ defmodule Ash.Actions.Update.Bulk do
end
end
defp do_atomic_update(query, atomic_changeset, has_atomics?, opts) do
with {:ok, query} <- authorize_bulk_query(query, opts),
{:ok, atomic_changeset, query} <-
authorize_changeset(query, atomic_changeset, opts),
{:ok, data_layer_query} <- Ash.Query.data_layer_query(query) do
case Ash.DataLayer.update_query(
data_layer_query,
atomic_changeset,
Map.new(Keyword.take(opts, [:return_records?, :tenant]))
) do
:ok ->
%Ash.BulkResult{
status: :success
}
{:ok, results} ->
{errors, results, notifications, error_count} =
if has_atomics? do
results
|> Enum.reduce({[], [], [], 0}, fn result, {errors, successes, notifications, error_count} ->
case Ash.Changeset.run_after_actions(result, atomic_changeset, []) do
{:error, error} ->
{[error | errors], successes, error_count + 1}
{:ok, result, _changeset, %{notifications: new_notifications}} ->
{errors, [result | successes], notifications ++ new_notifications, error_count}
end
end)
|> then(fn {errors, successes, error_count} ->
{Enum.reverse(errors), Enum.reverse(successes), error_count}
end)
else
{[], results, 0}
end
status =
case {error_count, results} do
{0, []} ->
:success
{0, _results} ->
:success
{_error_count, []} ->
:error
{0, _results} ->
:success
end
%Ash.BulkResult{
status: status,
error_count: error_count,
notifications: notifications,
errors: errors,
records: results
}
{:error, error} ->
%Ash.BulkResult{
status: :error,
error_count: 1,
notifications: [],
errors: [Ash.Error.to_error_class(error)]
}
end
else
{:error, error} ->
%Ash.BulkResult{
status: :error,
error_count: 1,
notifications: [],
errors: [Ash.Error.to_error_class(error)]
}
end
end
def do_run(api, stream, action, input, opts, metadata_key, context_key) do
resource = opts[:resource]
opts = Ash.Actions.Helpers.set_opts(opts, api)

View file

@ -72,7 +72,9 @@ defmodule Ash.Test.Actions.AtomicUpdateTest do
test "a changeset can be fully atomic" do
changeset =
Ash.Changeset.fully_atomic_changeset(Author, :with_validation, %{name: "fred weasly"}, eager?: false)
Ash.Changeset.fully_atomic_changeset(Author, :with_validation, %{name: "fred weasly"},
eager?: false
)
assert changeset.valid?
assert changeset.atomics[:name]

View file

@ -363,7 +363,9 @@ defmodule Ash.Test.Actions.UpdateTest do
test "a changeset can be fully atomic" do
changeset =
Ash.Changeset.fully_atomic_changeset(Author, :with_validation, %{name: "fred weasly"}, eager?: false)
Ash.Changeset.fully_atomic_changeset(Author, :with_validation, %{name: "fred weasly"},
eager?: false
)
assert changeset.valid?
assert changeset.atomics[:name]