diff --git a/lib/ash/actions/update/bulk.ex b/lib/ash/actions/update/bulk.ex index 5f2a19c8..76ee0e6a 100644 --- a/lib/ash/actions/update/bulk.ex +++ b/lib/ash/actions/update/bulk.ex @@ -63,39 +63,87 @@ defmodule Ash.Actions.Update.Bulk do } atomic_changeset -> - with {:ok, query} <- authorize_bulk_query(query, opts), - {:ok, atomic_changeset, query} <- - authorize_changeset(query, atomic_changeset, opts), - {:ok, data_layer_query} <- Ash.Query.data_layer_query(query) do - case Ash.DataLayer.update_query( - data_layer_query, - atomic_changeset, - Map.new(Keyword.take(opts, [:return_records?, :tenant])) - ) do - :ok -> - %Ash.BulkResult{ - status: :success - } - - {:ok, results} -> - %Ash.BulkResult{ - status: :success, - records: results - } - - {:error, error} -> - %Ash.BulkResult{ - status: :error, - errors: [Ash.Error.to_error_class(error)] - } + has_after_action_hooks? = not(Enum.empty?(atomic_changeset.after_action || [])) + # There are performance implications here. We probably need to explicitly enable + # having after action hooks. Or perhaps we need to stream the ids and then bulk update + # them. + opts = + if has_after_action_hooks? || opts[:notify?] do + Keyword.put(opts, :return_records?, true) + else + opts end - else - {:error, error} -> - %Ash.BulkResult{ - status: :error, - errors: [Ash.Error.to_error_class(error)] + + notify? = + if opts[:notify?] do + if Process.get(:ash_started_transaction?) do + false + else + Process.put(:ash_started_transaction?, true) + true + end + else + false + end + + context_key = + case action.type do + :update -> + :bulk_update + + :destroy -> + :bulk_destroy + end + + + bulk_result = + if has_after_action_hooks? do + Ash.DataLayer.transaction( + List.wrap(atomic_changeset.resource) ++ action.touches_resources, + fn -> + do_atomic_update(query, atomic_changeset, has_after_action_hooks?, opts) + end, + opts[:timeout], + %{ + type: context_key, + metadata: %{ + resource: query.resource, + action: atomic_changeset.action.name, + actor: opts[:actor] + }, + data_layer_context: opts[:data_layer_context] || %{} } + ) + else + do_atomic_update(query, atomic_changeset, has_after_action_hooks?, opts) end + + notifications = + if notify? do + List.wrap(bulk_result.notifications) ++ Process.delete(:ash_notifications) + else + List.wrap(bulk_result.notifications) + end + + if opts[:return_notifications?] do + %{bulk_result | notifications: notifications} + else + if opts[:return_notifications?] do + bulk_result + else + if notify? do + notifications = bulk_result.notifications ++ Process.get(:ash_notifications, []) + remaining_notifications = Ash.Notifier.notify(notifications) + Process.delete(:ash_notifications) || [] + + Ash.Actions.Helpers.warn_missed!(atomic_changeset.resource, action, %{ + resource_notifications: remaining_notifications + }) + + %{bulk_result | notifications: notifications} + end + end + end end end @@ -142,15 +190,11 @@ defmodule Ash.Actions.Update.Bulk do if opts[:transaction] == :all && Ash.DataLayer.data_layer_can?(resource, :transact) do notify? = - if opts[:notify?] do - if Process.get(:ash_started_transaction?) do - false - else - Process.put(:ash_started_transaction?, true) - true - end - else + if Process.get(:ash_started_transaction?) do false + else + Process.put(:ash_started_transaction?, true) + true end Ash.DataLayer.transaction( @@ -194,6 +238,84 @@ defmodule Ash.Actions.Update.Bulk do end end + defp do_atomic_update(query, atomic_changeset, has_atomics?, opts) do + with {:ok, query} <- authorize_bulk_query(query, opts), + {:ok, atomic_changeset, query} <- + authorize_changeset(query, atomic_changeset, opts), + {:ok, data_layer_query} <- Ash.Query.data_layer_query(query) do + case Ash.DataLayer.update_query( + data_layer_query, + atomic_changeset, + Map.new(Keyword.take(opts, [:return_records?, :tenant])) + ) do + :ok -> + %Ash.BulkResult{ + status: :success + } + + {:ok, results} -> + {errors, results, notifications, error_count} = + if has_atomics? do + results + |> Enum.reduce({[], [], [], 0}, fn result, {errors, successes, notifications, error_count} -> + case Ash.Changeset.run_after_actions(result, atomic_changeset, []) do + {:error, error} -> + {[error | errors], successes, error_count + 1} + + {:ok, result, _changeset, %{notifications: new_notifications}} -> + {errors, [result | successes], notifications ++ new_notifications, error_count} + end + end) + |> then(fn {errors, successes, error_count} -> + {Enum.reverse(errors), Enum.reverse(successes), error_count} + end) + else + {[], results, 0} + end + + status = + case {error_count, results} do + {0, []} -> + :success + + {0, _results} -> + :success + + {_error_count, []} -> + :error + + + {0, _results} -> + :success + end + + %Ash.BulkResult{ + status: status, + error_count: error_count, + notifications: notifications, + errors: errors, + records: results + } + + {:error, error} -> + %Ash.BulkResult{ + status: :error, + error_count: 1, + notifications: [], + errors: [Ash.Error.to_error_class(error)] + } + end + else + {:error, error} -> + %Ash.BulkResult{ + status: :error, + error_count: 1, + notifications: [], + errors: [Ash.Error.to_error_class(error)] + } + end + end + def do_run(api, stream, action, input, opts, metadata_key, context_key) do resource = opts[:resource] opts = Ash.Actions.Helpers.set_opts(opts, api) diff --git a/lib/ash/changeset/changeset.ex b/lib/ash/changeset/changeset.ex index a6a56139..1e2eb776 100644 --- a/lib/ash/changeset/changeset.ex +++ b/lib/ash/changeset/changeset.ex @@ -1649,25 +1649,25 @@ defmodule Ash.Changeset do @doc false def hydrate_atomic_refs(changeset, actor, opts \\ []) do hydrated_changeset = - %{ - changeset - | atomics: - Enum.map(changeset.atomics, fn {key, expr} -> - expr = - Ash.Filter.build_filter_from_template( - expr, - actor, - changeset.arguments, - changeset.context, - changeset - ) + %{ + changeset + | atomics: + Enum.map(changeset.atomics, fn {key, expr} -> + expr = + Ash.Filter.build_filter_from_template( + expr, + actor, + changeset.arguments, + changeset.context, + changeset + ) - {:ok, expr} = - Ash.Filter.hydrate_refs(expr, %{resource: changeset.resource, public?: false}) + {:ok, expr} = + Ash.Filter.hydrate_refs(expr, %{resource: changeset.resource, public?: false}) - {key, expr} - end) - } + {key, expr} + end) + } if Keyword.get(opts, :eager?, true) do add_known_atomic_errors(hydrated_changeset) diff --git a/test/actions/atomic_update_test.exs b/test/actions/atomic_update_test.exs index cb61e599..74051e44 100644 --- a/test/actions/atomic_update_test.exs +++ b/test/actions/atomic_update_test.exs @@ -72,7 +72,9 @@ defmodule Ash.Test.Actions.AtomicUpdateTest do test "a changeset can be fully atomic" do changeset = - Ash.Changeset.fully_atomic_changeset(Author, :with_validation, %{name: "fred weasly"}, eager?: false) + Ash.Changeset.fully_atomic_changeset(Author, :with_validation, %{name: "fred weasly"}, + eager?: false + ) assert changeset.valid? assert changeset.atomics[:name] diff --git a/test/actions/update_test.exs b/test/actions/update_test.exs index af1c11fe..0cf536d7 100644 --- a/test/actions/update_test.exs +++ b/test/actions/update_test.exs @@ -363,7 +363,9 @@ defmodule Ash.Test.Actions.UpdateTest do test "a changeset can be fully atomic" do changeset = - Ash.Changeset.fully_atomic_changeset(Author, :with_validation, %{name: "fred weasly"}, eager?: false) + Ash.Changeset.fully_atomic_changeset(Author, :with_validation, %{name: "fred weasly"}, + eager?: false + ) assert changeset.valid? assert changeset.atomics[:name]