From 5af9dc66967bd230b5bfc2c99a94d47f48499732 Mon Sep 17 00:00:00 2001 From: Zach Daniel Date: Thu, 25 May 2023 09:46:12 -0400 Subject: [PATCH] fix: properly honor the `return_errors?` option --- lib/ash/actions/create/bulk.ex | 91 +++++++++++++++++++------- lib/ash/api/api.ex | 6 ++ lib/ash/bulk_result.ex | 6 +- test/actions/bulk/bulk_create_test.exs | 31 ++++++++- 4 files changed, 106 insertions(+), 28 deletions(-) diff --git a/lib/ash/actions/create/bulk.ex b/lib/ash/actions/create/bulk.ex index 402c017f..7d850b85 100644 --- a/lib/ash/actions/create/bulk.ex +++ b/lib/ash/actions/create/bulk.ex @@ -150,11 +150,7 @@ defmodule Ash.Actions.Create.Bulk do {[batch(state)], %{state | batch: [item], count: 0, must_return_records?: false}} {:error, error}, state -> - if opts[:return_errors?] do - {[{:error, error}], state} - else - {[], state} - end + {[{:error, error}], state} end, fn state -> {[batch(state)], state} @@ -338,19 +334,17 @@ defmodule Ash.Actions.Create.Bulk do if opts[:stop_on_error?] do {:halt, {[], error}} else - {error - |> List.wrap() - |> Stream.map(&{:error, &1}), nil} + {error_stream(error, opts), nil} end {:error, notifications, error} -> if opts[:stop_on_error?] do {:halt, {notify_stream([], notifications, resource, action, opts), error}} else - {error - |> List.wrap() - |> Stream.map(&{:error, &1}) - |> notify_stream(notifications, resource, action, opts), nil} + { + notify_stream(error_stream(error, opts), notifications, resource, action, opts), + nil + } end {:ok, invalid, notifications} -> @@ -358,26 +352,25 @@ defmodule Ash.Actions.Create.Bulk do {:halt, {[], invalid - |> Stream.map(&{:error, &1}) + |> error_stream(opts) |> notify_stream(notifications, resource, action, opts)}} else {invalid - |> Stream.map(&{:error, &1}) + |> error_stream(opts) |> notify_stream(notifications, resource, action, opts), nil} end {:ok, batch_result, invalid, notifications} -> if opts[:stop_on_error?] && !Enum.empty?(invalid) do - {:halt, {invalid, batch_result}} + {:halt, {error_stream(invalid, opts), batch_result}} else if opts[:return_records?] do {batch_result |> Stream.map(&{:ok, &1}) - |> Stream.concat(Stream.map(invalid, &{:error, &1})) + |> Stream.concat(error_stream(invalid, opts)) |> notify_stream(notifications, resource, action, opts), nil} else - {invalid - |> Stream.map(&{:error, &1}) + {error_stream(invalid, opts) |> notify_stream(notifications, resource, action, opts), nil} end end @@ -391,7 +384,7 @@ defmodule Ash.Actions.Create.Bulk do remaining_items = successes |> Stream.map(&{:ok, &1}) - |> Stream.concat(Stream.map(errors, &{:error, &1})) + |> Stream.concat(error_stream(errors, opts)) {remaining_items, nil} @@ -404,16 +397,22 @@ defmodule Ash.Actions.Create.Bulk do result |> Enum.reduce(%Ash.BulkResult{status: :empty, records: [], errors: []}, fn {:error, error}, result -> + {error_count, errors} = errors(result, error, opts) + %{ result - | errors: [error | result.errors], + | errors: errors, + error_count: error_count, status: errored(result.status) } {:error, notifications, error}, result -> + {error_count, errors} = errors(result, error, opts) + %{ result - | errors: [error | result.errors], + | errors: errors, + error_count: error_count, status: errored(result.status), notifications: (result.notifications || []) ++ notifications } @@ -426,9 +425,12 @@ defmodule Ash.Actions.Create.Bulk do } {:ok, invalid, notifications}, result -> + {error_count, errors} = errors(result, invalid, opts) + %{ result - | errors: invalid ++ result.errors, + | errors: errors, + error_count: error_count, status: :partial_success, notifications: (result.notifications || []) ++ notifications } @@ -447,10 +449,13 @@ defmodule Ash.Actions.Create.Bulk do result.records end + {error_count, errors} = errors(result, invalid, opts) + %{ result | records: records, - errors: invalid ++ result.errors, + errors: errors, + error_count: error_count, status: status, notifications: (result.notifications || []) ++ notifications } @@ -475,11 +480,47 @@ defmodule Ash.Actions.Create.Bulk do :error end - %Ash.BulkResult{ + result = %Ash.BulkResult{ status: status, - errors: List.wrap(error), notifications: notifications } + + {errors, error_count} = errors(result, error, opts) + + %{result | errors: errors, error_count: error_count} + end + + defp errors(result, invalid, opts) when is_list(invalid) do + Enum.reduce(invalid, {result.error_count, result.errors}, fn invalid, {error_count, errors} -> + errors(%{result | error_count: error_count, errors: errors}, invalid, opts) + end) + end + + defp errors(result, {:error, error}, opts) do + if opts[:return_errors?] do + {result.error_count + 1, [error | result.errors]} + else + {result.error_count + 1, []} + end + end + + defp errors(result, invalid, opts) do + if Enumerable.impl_for(invalid) do + invalid = Enum.to_list(invalid) + errors(result, invalid, opts) + else + errors(result, {:error, invalid}, opts) + end + end + + defp error_stream(error, opts) do + if opts[:return_errors?] do + error + |> List.wrap() + |> Stream.map(&{:error, &1}) + else + [] + end end defp run_bulk_before_batches( diff --git a/lib/ash/api/api.ex b/lib/ash/api/api.ex index 4c4e7105..0fdc2045 100644 --- a/lib/ash/api/api.ex +++ b/lib/ash/api/api.ex @@ -392,6 +392,12 @@ defmodule Ash.Api do doc: "Wether or not to return all of the records that were inserted. Defaults to false to account for large inserts." ], + return_errors?: [ + type: :boolean, + default: false, + doc: + "Wether or not to return all of the errors that occur. Defaults to false to account for large inserts." + ], batch_size: [ type: :pos_integer, doc: """ diff --git a/lib/ash/bulk_result.ex b/lib/ash/bulk_result.ex index 762979a3..9e67e4c6 100644 --- a/lib/ash/bulk_result.ex +++ b/lib/ash/bulk_result.ex @@ -7,13 +7,15 @@ defmodule Ash.BulkResult do status: :success | :partial_success | :error, notifications: list(Ash.Notifier.Notification.t()) | nil, records: list(Ash.Resource.record()) | nil, - errors: list(Ash.Error.t() | Ash.Changeset.t()) | nil + errors: list(Ash.Error.t() | Ash.Changeset.t()) | nil, + error_count: non_neg_integer() } defstruct [ :status, :errors, :records, - :notifications + :notifications, + error_count: 0 ] end diff --git a/test/actions/bulk/bulk_create_test.exs b/test/actions/bulk/bulk_create_test.exs index 509148e6..9a53fcef 100644 --- a/test/actions/bulk/bulk_create_test.exs +++ b/test/actions/bulk/bulk_create_test.exs @@ -97,6 +97,33 @@ defmodule Ash.Test.Actions.BulkCreateTest do ) end + test "will return error count" do + assert %Ash.BulkResult{records: [%{title: "title1_stuff"}], error_count: 1, errors: []} = + Api.bulk_create!( + [%{title: "title1"}, %{title: %{foo: :bar}}], + Post, + :create_with_change, + return_records?: true, + sorted?: true + ) + end + + test "will return errors on request" do + assert %Ash.BulkResult{ + records: [%{title: "title1_stuff"}], + error_count: 1, + errors: [%Ash.Changeset{}] + } = + Api.bulk_create!( + [%{title: "title1"}, %{title: %{foo: :bar}}], + Post, + :create_with_change, + return_records?: true, + return_errors?: true, + sorted?: true + ) + end + test "can upsert" do assert %Ash.BulkResult{ records: [ @@ -179,6 +206,7 @@ defmodule Ash.Test.Actions.BulkCreateTest do :create_with_policy, authorize?: true, return_records?: true, + return_errors?: true, sorted?: true ) end @@ -278,7 +306,8 @@ defmodule Ash.Test.Actions.BulkCreateTest do notify?: true, return_stream?: true, return_notifications?: true, - return_records?: true + return_records?: true, + return_errors?: true ) |> Enum.to_list() |> Enum.sort_by(fn