From dc4a63c86cc55cfc821cc8a4e2c90d5e8a4a1f50 Mon Sep 17 00:00:00 2001 From: Zach Daniel Date: Fri, 26 Jul 2024 11:38:41 -0400 Subject: [PATCH] fix: ensure that statuses are set correctly on bulk actions fix: properly transfer process context(tracers) for bulk actions closes #1332 --- lib/ash/actions/create/bulk.ex | 31 +++++++++++++++++++++----- lib/ash/actions/destroy/bulk.ex | 29 +++++++++++++++++++----- lib/ash/actions/update/bulk.ex | 31 +++++++++++++++++++++----- test/actions/bulk/bulk_create_test.exs | 22 ++++++++++++++++++ 4 files changed, 96 insertions(+), 17 deletions(-) diff --git a/lib/ash/actions/create/bulk.ex b/lib/ash/actions/create/bulk.ex index 4cf94961..90d37e5d 100644 --- a/lib/ash/actions/create/bulk.ex +++ b/lib/ash/actions/create/bulk.ex @@ -189,7 +189,7 @@ defmodule Ash.Actions.Create.Bulk do Enum.to_list(Stream.concat(changeset_stream)) else Stream.run(changeset_stream) - [] + nil end notifications = @@ -213,9 +213,18 @@ defmodule Ash.Actions.Create.Bulk do } case bulk_result do - %{records: _, error_count: 0} -> %{bulk_result | status: :success} - %{records: [], error_count: _} -> %{bulk_result | status: :error} - _ -> %{bulk_result | status: :partial_success} + %{records: _, error_count: 0} -> + %{bulk_result | status: :success} + + %{records: [], error_count: _} -> + %{bulk_result | status: :error} + + _ -> + if Process.get({:any_success?, ref}) do + %{bulk_result | status: :partial_success} + else + %{bulk_result | status: :error} + end end catch {:error, error, batch_number} -> @@ -399,6 +408,8 @@ defmodule Ash.Actions.Create.Bulk do Enum.flat_map(must_be_simple, fn changeset -> case Ash.Actions.Create.run(domain, changeset, action, opts) do {:ok, result} -> + Process.put({:any_success?, ref}, true) + [ Ash.Resource.set_metadata(result, %{ bulk_create_index: changeset.context.bulk_create.index @@ -627,9 +638,13 @@ defmodule Ash.Actions.Create.Bulk do end if max_concurrency && max_concurrency > 1 do + ash_context = Ash.ProcessHelpers.get_context_for_transfer(opts) + Task.async_stream( stream, fn batch -> + Ash.ProcessHelpers.transfer_context(ash_context, opts) + try do Process.put(:ash_started_transaction?, true) batch_result = callback.(batch) @@ -652,7 +667,7 @@ defmodule Ash.Actions.Create.Bulk do end end - {batch_result, notifications, errors} + {batch_result, notifications, errors, Process.get({:any_success?, ref})} catch value -> {:throw, value} @@ -665,7 +680,9 @@ defmodule Ash.Actions.Create.Bulk do {:ok, {:throw, value}} -> throw(value) - {:ok, {result, notifications, errors}} -> + {:ok, {result, notifications, errors, any_success?}} -> + Process.put({:any_success?, ref}, any_success?) + store_notification(ref, notifications, opts) store_error(ref, errors, opts) @@ -1165,9 +1182,11 @@ defmodule Ash.Actions.Create.Bulk do case result do {:ok, result} -> + Process.put({:any_success?, ref}, true) result :ok -> + Process.put({:any_success?, ref}, true) [] {:error, :no_rollback, error} -> diff --git a/lib/ash/actions/destroy/bulk.ex b/lib/ash/actions/destroy/bulk.ex index df63c709..598f8069 100644 --- a/lib/ash/actions/destroy/bulk.ex +++ b/lib/ash/actions/destroy/bulk.ex @@ -935,15 +935,24 @@ defmodule Ash.Actions.Destroy.Bulk do not_atomic_reason ) |> case do - %Ash.BulkResult{error_count: 0, records: records, notifications: notifications} -> + %Ash.BulkResult{ + error_count: 0, + records: records, + notifications: notifications, + status: status + } -> + Process.put({:any_success?, ref}, status != :error) + store_notification(ref, notifications, opts) List.wrap(records) %Ash.BulkResult{ errors: errors, notifications: notifications, - error_count: error_count + error_count: error_count, + status: status } -> + Process.put({:any_success?, ref}, status != :error) store_notification(ref, notifications, opts) store_error(ref, errors, opts, error_count) [] @@ -1023,7 +1032,7 @@ defmodule Ash.Actions.Destroy.Bulk do Enum.to_list(Stream.concat(changeset_stream)) else Stream.run(changeset_stream) - [] + nil end notifications = @@ -1277,9 +1286,13 @@ defmodule Ash.Actions.Destroy.Bulk do Keyword.put(opts, :return_destroyed?, opts[:return_records?]) ) do :ok -> + Process.put({:any_success?, ref}, true) + [] {:ok, result} when not is_list(result) -> + Process.put({:any_success?, ref}, true) + [ Ash.Resource.set_metadata(result, %{ bulk_destroy_index: changeset.context.bulk_destroy.index @@ -1500,10 +1513,13 @@ defmodule Ash.Actions.Destroy.Bulk do end if max_concurrency && max_concurrency > 1 do + ash_context = Ash.ProcessHelpers.get_context_for_transfer(opts) + Task.async_stream( stream, fn batch -> try do + Ash.ProcessHelpers.transfer_context(ash_context, opts) Process.put(:ash_started_transaction?, true) batch_result = callback.(batch) {errors, _} = Process.get({:bulk_destroy_errors, ref}) || {[], 0} @@ -1525,7 +1541,7 @@ defmodule Ash.Actions.Destroy.Bulk do end end - {batch_result, notifications, errors} + {batch_result, notifications, errors, Process.get({:any_success?, ref})} after Process.delete(:ash_started_transaction?) end @@ -1537,7 +1553,8 @@ defmodule Ash.Actions.Destroy.Bulk do {:ok, {:throw, value}} -> throw(value) - {:ok, {result, notifications, errors}} -> + {:ok, {result, notifications, errors, any_success?}} -> + Process.put({:any_success?, ref}, any_success?) store_notification(ref, notifications, opts) store_error(ref, errors, opts) @@ -1921,6 +1938,8 @@ defmodule Ash.Actions.Destroy.Bulk do [] {:ok, result} -> + Process.put({:any_success?, ref}, true) + result {:error, error} -> diff --git a/lib/ash/actions/update/bulk.ex b/lib/ash/actions/update/bulk.ex index bb3de3b8..42f50ac4 100644 --- a/lib/ash/actions/update/bulk.ex +++ b/lib/ash/actions/update/bulk.ex @@ -1089,15 +1089,24 @@ defmodule Ash.Actions.Update.Bulk do strategy: [:atomic] ) |> case do - %Ash.BulkResult{error_count: 0, records: records, notifications: notifications} -> + %Ash.BulkResult{ + error_count: 0, + records: records, + notifications: notifications, + status: status + } -> + Process.put({:any_success?, ref}, status != :error) + store_notification(ref, notifications, opts) List.wrap(records) %Ash.BulkResult{ errors: errors, notifications: notifications, - error_count: error_count + error_count: error_count, + status: status } -> + Process.put({:any_success?, ref}, status != :error) store_notification(ref, notifications, opts) store_error(ref, errors, opts, error_count) [] @@ -1188,7 +1197,7 @@ defmodule Ash.Actions.Update.Bulk do Enum.to_list(Stream.concat(changeset_stream)) else Stream.run(changeset_stream) - [] + nil end notifications = @@ -1490,6 +1499,8 @@ defmodule Ash.Actions.Update.Bulk do Keyword.put(opts, :atomic_upgrade?, false) ) do {:ok, result} -> + Process.put({:any_success?, ref}, true) + [ Ash.Resource.set_metadata(result, %{ metadata_key => changeset.context |> Map.get(context_key) |> Map.get(:index) @@ -1721,9 +1732,12 @@ defmodule Ash.Actions.Update.Bulk do end if max_concurrency && max_concurrency > 1 do + ash_context = Ash.ProcessHelpers.get_context_for_transfer(opts) + Task.async_stream( stream, fn batch -> + Ash.ProcessHelpers.transfer_context(ash_context, opts) Process.put(:ash_started_transaction?, true) batch_result = callback.(batch) {errors, _} = Process.get({:bulk_update_errors, ref}) || {[], 0} @@ -1745,7 +1759,7 @@ defmodule Ash.Actions.Update.Bulk do end end - {batch_result, notifications, errors} + {batch_result, notifications, errors, Process.get({:any_success?, ref})} end, timeout: :infinity, max_concurrency: max_concurrency @@ -1760,12 +1774,15 @@ defmodule Ash.Actions.Update.Bulk do notifications: notifications, errors: errors, error_count: error_count - }, _, _}} -> + }, _, _, any_success?}} -> + Process.put({:any_success?, ref}, any_success?) + store_notification(ref, notifications, opts) store_error(ref, errors, opts, error_count) records - {:ok, {result, notifications, errors}} -> + {:ok, {result, notifications, errors, any_success?}} -> + Process.put({:any_success?, ref}, any_success?) store_notification(ref, notifications, opts) store_error(ref, errors, opts) @@ -2212,6 +2229,8 @@ defmodule Ash.Actions.Update.Bulk do case result do {:ok, result} -> + Process.put({:any_success?, ref}, true) + result {:error, error} -> diff --git a/test/actions/bulk/bulk_create_test.exs b/test/actions/bulk/bulk_create_test.exs index 989403d4..10c9a033 100644 --- a/test/actions/bulk/bulk_create_test.exs +++ b/test/actions/bulk/bulk_create_test.exs @@ -510,6 +510,28 @@ defmodule Ash.Test.Actions.BulkCreateTest do ) end + test "properly sets the status to `:partial_success` without `return_records?`" do + org = + Org + |> Ash.Changeset.for_create(:create, %{}) + |> Ash.create!() + + assert %Ash.BulkResult{ + error_count: 1, + status: :partial_success, + errors: [%Ash.Error.Invalid{}] + } = + Ash.bulk_create( + [%{title: "title1"}, %{title: %{foo: :bar}}], + Post, + :create_with_change, + tenant: org.id, + return_errors?: true, + sorted?: true, + authorize?: false + ) + end + test "can upsert with list" do org = Org