fix: ensure that statuses are set correctly on bulk actions

fix: properly transfer process context (tracers) for bulk actions

closes #1332
Author: Zach Daniel
Date:   2024-07-26 11:38:41 -04:00
Parent: 3de985ccc5
Commit: dc4a63c86c

4 changed files with 96 additions and 17 deletions
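For context, the status semantics this commit repairs are observable from any caller of the bulk APIs. A minimal sketch follows (the MyApp.Post resource and its :create action are hypothetical); before this fix, a mixed batch run without return_records?: true was reported as :error.

    # Sketch, assuming a hypothetical MyApp.Post resource with a :create action.
    # Without return_records?: true, records is no longer an empty list, so a
    # mixed batch now reports :partial_success instead of :error.
    inputs = [%{title: "a"}, %{title: "b"}]

    case Ash.bulk_create(inputs, MyApp.Post, :create, return_errors?: true) do
      %Ash.BulkResult{status: :success} -> :all_good
      %Ash.BulkResult{status: :partial_success, errors: errors} -> {:some_failed, errors}
      %Ash.BulkResult{status: :error, errors: errors} -> {:all_failed, errors}
    end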


@@ -189,7 +189,7 @@ defmodule Ash.Actions.Create.Bulk do
         Enum.to_list(Stream.concat(changeset_stream))
       else
         Stream.run(changeset_stream)
-        []
+        nil
       end
 
     notifications =
@@ -213,9 +213,18 @@ defmodule Ash.Actions.Create.Bulk do
         }
 
       case bulk_result do
-        %{records: _, error_count: 0} -> %{bulk_result | status: :success}
-        %{records: [], error_count: _} -> %{bulk_result | status: :error}
-        _ -> %{bulk_result | status: :partial_success}
+        %{records: _, error_count: 0} ->
+          %{bulk_result | status: :success}
+
+        %{records: [], error_count: _} ->
+          %{bulk_result | status: :error}
+
+        _ ->
+          if Process.get({:any_success?, ref}) do
+            %{bulk_result | status: :partial_success}
+          else
+            %{bulk_result | status: :error}
+          end
       end
     catch
       {:error, error, batch_number} ->
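Taken together with the [] → nil change in the first hunk, the new fallback clause can actually be reached: when records are not collected (records: nil), neither of the first two clauses decides the outcome, and a process-local flag records whether any row succeeded. A self-contained sketch of that resolution logic, outside of Ash:

    # ref scopes the process-dictionary flag to one bulk run, so concurrent
    # runs in the same process don't collide.
    ref = make_ref()
    Process.put({:any_success?, ref}, true)

    bulk_result = %{records: nil, error_count: 2, status: nil}

    status =
      case bulk_result do
        %{records: _, error_count: 0} -> :success
        # records is now nil (not []) when not returned, so this clause no
        # longer swallows partial successes:
        %{records: [], error_count: _} -> :error
        _ -> if Process.get({:any_success?, ref}), do: :partial_success, else: :error
      end
    # status == :partial_success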
@ -399,6 +408,8 @@ defmodule Ash.Actions.Create.Bulk do
Enum.flat_map(must_be_simple, fn changeset ->
case Ash.Actions.Create.run(domain, changeset, action, opts) do
{:ok, result} ->
Process.put({:any_success?, ref}, true)
[
Ash.Resource.set_metadata(result, %{
bulk_create_index: changeset.context.bulk_create.index
@@ -627,9 +638,13 @@ defmodule Ash.Actions.Create.Bulk do
     end
 
     if max_concurrency && max_concurrency > 1 do
+      ash_context = Ash.ProcessHelpers.get_context_for_transfer(opts)
+
       Task.async_stream(
         stream,
         fn batch ->
+          Ash.ProcessHelpers.transfer_context(ash_context, opts)
+
           try do
             Process.put(:ash_started_transaction?, true)
             batch_result = callback.(batch)
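This hunk is the second half of the fix: tracers live in process-local state, so batches processed under max_concurrency previously ran without the caller's tracer context. The pattern is to capture the context once in the calling process and re-install it inside every task. A minimal sketch, with a hypothetical BatchRunner module and per-batch callback (both helper calls are the ones used in the diff):

    defmodule BatchRunner do
      # opts are Ash action options; callback is the per-batch worker.
      def run(batches, opts, callback) do
        # Capture tracer/process context while still in the calling process.
        ash_context = Ash.ProcessHelpers.get_context_for_transfer(opts)

        batches
        |> Task.async_stream(fn batch ->
          # Re-install the caller's context (tracers etc.) in the task process.
          Ash.ProcessHelpers.transfer_context(ash_context, opts)
          callback.(batch)
        end)
        |> Enum.map(fn {:ok, result} -> result end)
      end
    end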
@@ -652,7 +667,7 @@ defmodule Ash.Actions.Create.Bulk do
             end
           end
 
-          {batch_result, notifications, errors}
+          {batch_result, notifications, errors, Process.get({:any_success?, ref})}
         catch
           value ->
             {:throw, value}
@@ -665,7 +680,9 @@ defmodule Ash.Actions.Create.Bulk do
       {:ok, {:throw, value}} ->
         throw(value)
 
-      {:ok, {result, notifications, errors}} ->
+      {:ok, {result, notifications, errors, any_success?}} ->
+        Process.put({:any_success?, ref}, any_success?)
+
         store_notification(ref, notifications, opts)
         store_error(ref, errors, opts)
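Because each Task.async_stream worker has its own process dictionary, a {:any_success?, ref} flag set inside a batch would otherwise die with the task. The two hunks above thread it back explicitly: the worker appends its flag to the result tuple, and the coordinating process re-stores it. A toy version of that round trip (the work function is hypothetical):

    # Process-dictionary state does not cross process boundaries, so it is
    # returned from the worker and re-stored by the coordinator.
    ref = make_ref()

    [1, 2, 3]
    |> Task.async_stream(fn n ->
      # Set the flag in the *worker's* dictionary when work succeeds...
      if n != 2, do: Process.put({:any_success?, ref}, true)
      {n, Process.get({:any_success?, ref})}
    end)
    |> Enum.each(fn {:ok, {_n, any_success?}} ->
      # ...and copy it back into the coordinator's dictionary.
      if any_success?, do: Process.put({:any_success?, ref}, true)
    end)

    Process.get({:any_success?, ref})
    # => true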
@ -1165,9 +1182,11 @@ defmodule Ash.Actions.Create.Bulk do
case result do
{:ok, result} ->
Process.put({:any_success?, ref}, true)
result
:ok ->
Process.put({:any_success?, ref}, true)
[]
{:error, :no_rollback, error} ->


@@ -935,15 +935,24 @@ defmodule Ash.Actions.Destroy.Bulk do
           not_atomic_reason
         )
         |> case do
-          %Ash.BulkResult{error_count: 0, records: records, notifications: notifications} ->
+          %Ash.BulkResult{
+            error_count: 0,
+            records: records,
+            notifications: notifications,
+            status: status
+          } ->
+            Process.put({:any_success?, ref}, status != :error)
             store_notification(ref, notifications, opts)
             List.wrap(records)
 
           %Ash.BulkResult{
             errors: errors,
             notifications: notifications,
-            error_count: error_count
+            error_count: error_count,
+            status: status
           } ->
+            Process.put({:any_success?, ref}, status != :error)
             store_notification(ref, notifications, opts)
             store_error(ref, errors, opts, error_count)
             []
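In these streaming destroy/update paths, a nested bulk action reports back only through its aggregate status, so the flag is derived from it: anything other than :error implies at least one record went through. A condensed sketch of that derivation:

    # Condensed sketch: :success and :partial_success both set the flag.
    ref = make_ref()

    for status <- [:success, :partial_success, :error] do
      Process.put({:any_success?, ref}, status != :error)
      {status, Process.get({:any_success?, ref})}
    end
    # => [success: true, partial_success: true, error: false]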
@@ -1023,7 +1032,7 @@ defmodule Ash.Actions.Destroy.Bulk do
         Enum.to_list(Stream.concat(changeset_stream))
       else
         Stream.run(changeset_stream)
-        []
+        nil
       end
 
     notifications =
@@ -1277,9 +1286,13 @@ defmodule Ash.Actions.Destroy.Bulk do
             Keyword.put(opts, :return_destroyed?, opts[:return_records?])
           ) do
        :ok ->
+         Process.put({:any_success?, ref}, true)
+
         []
 
       {:ok, result} when not is_list(result) ->
+        Process.put({:any_success?, ref}, true)
+
         [
           Ash.Resource.set_metadata(result, %{
             bulk_destroy_index: changeset.context.bulk_destroy.index
@@ -1500,10 +1513,13 @@ defmodule Ash.Actions.Destroy.Bulk do
     end
 
     if max_concurrency && max_concurrency > 1 do
+      ash_context = Ash.ProcessHelpers.get_context_for_transfer(opts)
+
       Task.async_stream(
         stream,
         fn batch ->
           try do
+            Ash.ProcessHelpers.transfer_context(ash_context, opts)
             Process.put(:ash_started_transaction?, true)
             batch_result = callback.(batch)
             {errors, _} = Process.get({:bulk_destroy_errors, ref}) || {[], 0}
@@ -1525,7 +1541,7 @@ defmodule Ash.Actions.Destroy.Bulk do
             end
           end
 
-          {batch_result, notifications, errors}
+          {batch_result, notifications, errors, Process.get({:any_success?, ref})}
         after
           Process.delete(:ash_started_transaction?)
         end
@@ -1537,7 +1553,8 @@ defmodule Ash.Actions.Destroy.Bulk do
       {:ok, {:throw, value}} ->
         throw(value)
 
-      {:ok, {result, notifications, errors}} ->
+      {:ok, {result, notifications, errors, any_success?}} ->
+        Process.put({:any_success?, ref}, any_success?)
         store_notification(ref, notifications, opts)
         store_error(ref, errors, opts)
@@ -1921,6 +1938,8 @@ defmodule Ash.Actions.Destroy.Bulk do
           []
 
         {:ok, result} ->
+          Process.put({:any_success?, ref}, true)
+
           result
 
         {:error, error} ->


@@ -1089,15 +1089,24 @@ defmodule Ash.Actions.Update.Bulk do
           strategy: [:atomic]
         )
         |> case do
-          %Ash.BulkResult{error_count: 0, records: records, notifications: notifications} ->
+          %Ash.BulkResult{
+            error_count: 0,
+            records: records,
+            notifications: notifications,
+            status: status
+          } ->
+            Process.put({:any_success?, ref}, status != :error)
             store_notification(ref, notifications, opts)
             List.wrap(records)
 
           %Ash.BulkResult{
             errors: errors,
             notifications: notifications,
-            error_count: error_count
+            error_count: error_count,
+            status: status
           } ->
+            Process.put({:any_success?, ref}, status != :error)
             store_notification(ref, notifications, opts)
             store_error(ref, errors, opts, error_count)
             []
@@ -1188,7 +1197,7 @@ defmodule Ash.Actions.Update.Bulk do
         Enum.to_list(Stream.concat(changeset_stream))
       else
         Stream.run(changeset_stream)
-        []
+        nil
       end
 
     notifications =
@@ -1490,6 +1499,8 @@ defmodule Ash.Actions.Update.Bulk do
             Keyword.put(opts, :atomic_upgrade?, false)
           ) do
       {:ok, result} ->
+        Process.put({:any_success?, ref}, true)
+
         [
           Ash.Resource.set_metadata(result, %{
             metadata_key => changeset.context |> Map.get(context_key) |> Map.get(:index)
@@ -1721,9 +1732,12 @@ defmodule Ash.Actions.Update.Bulk do
     end
 
     if max_concurrency && max_concurrency > 1 do
+      ash_context = Ash.ProcessHelpers.get_context_for_transfer(opts)
+
       Task.async_stream(
         stream,
         fn batch ->
+          Ash.ProcessHelpers.transfer_context(ash_context, opts)
           Process.put(:ash_started_transaction?, true)
           batch_result = callback.(batch)
           {errors, _} = Process.get({:bulk_update_errors, ref}) || {[], 0}
@@ -1745,7 +1759,7 @@ defmodule Ash.Actions.Update.Bulk do
           end
         end
 
-        {batch_result, notifications, errors}
+        {batch_result, notifications, errors, Process.get({:any_success?, ref})}
       end,
       timeout: :infinity,
       max_concurrency: max_concurrency
@@ -1760,12 +1774,15 @@ defmodule Ash.Actions.Update.Bulk do
              notifications: notifications,
              errors: errors,
              error_count: error_count
-           }, _, _}} ->
+           }, _, _, any_success?}} ->
+        Process.put({:any_success?, ref}, any_success?)
         store_notification(ref, notifications, opts)
         store_error(ref, errors, opts, error_count)
         records
 
-      {:ok, {result, notifications, errors}} ->
+      {:ok, {result, notifications, errors, any_success?}} ->
+        Process.put({:any_success?, ref}, any_success?)
         store_notification(ref, notifications, opts)
         store_error(ref, errors, opts)
@@ -2212,6 +2229,8 @@ defmodule Ash.Actions.Update.Bulk do
       case result do
         {:ok, result} ->
+          Process.put({:any_success?, ref}, true)
+
           result
 
         {:error, error} ->


@@ -510,6 +510,28 @@ defmodule Ash.Test.Actions.BulkCreateTest do
       )
     end
 
+    test "properly sets the status to `:partial_success` without `return_records?`" do
+      org =
+        Org
+        |> Ash.Changeset.for_create(:create, %{})
+        |> Ash.create!()
+
+      assert %Ash.BulkResult{
+               error_count: 1,
+               status: :partial_success,
+               errors: [%Ash.Error.Invalid{}]
+             } =
+               Ash.bulk_create(
+                 [%{title: "title1"}, %{title: %{foo: :bar}}],
+                 Post,
+                 :create_with_change,
+                 tenant: org.id,
+                 return_errors?: true,
+                 sorted?: true,
+                 authorize?: false
+               )
+    end
+
     test "can upsert with list" do
       org =
         Org