fix: assorted bulk create fixes

Zach Daniel 2023-05-02 14:04:08 -04:00
parent cb99faf918
commit 0ced9c7639
10 changed files with 134 additions and 166 deletions

View file

@@ -1,83 +0,0 @@
defmodule Ash.Actions.BulkHelpers do
# def halt_if(stream, condition, result) do
# Stream.transform(
# stream,
# fn -> :__halt_if_accumulator__ end,
# fn item, :__halt_if_accumulator__ ->
# if condition.(item) do
# {:halt, result.(item)}
# else
# {[item], :__halt_if_accumulator__}
# end
# end,
# fn
# :__halt_if_accumulator__ ->
# []
# result ->
# [result]
# end,
# fn _ -> :ok end
# )
# end
# def in_batches(changeset_stream, batch_size, stop_on_errors?, per_batch) do
# changeset_stream
# |> Stream.chunk_every(batch_size)
# |> Stream.with_index(1)
# |> Stream.map(fn {batch, batch_count} ->
# case per_batch.(batch, batch_count) do
# case per_batch.(batch, batch_count) do
# {:ok, }
# end
# end
# end)
# Enum.reduce_while(
# changeset_stream,
# {:ok, {0, 0, [], [], [], []}},
# fn
# changeset,
# {:ok, {^batch_size, batch_count, batch, processed_results, errors, notifications}} ->
# batch = [changeset | batch]
# case per_batch.(batch, batch_count) do
# {:ok, batch_result, new_errors} ->
# if new_errors != [] and stop_on_errors? do
# {:halt, {:ok, batch_result, new_errors, notifications}}
# else
# {:cont,
# {:ok,
# {0, batch_count + 1, [], batch_result ++ processed_results,
# errors ++ new_errors}}}
# end
# {:error, error} ->
# {:halt, {:error, error}}
# end
# changeset, {:ok, {i, batch_count, batch, processed_results, errors, notifications}} ->
# {:cont,
# {:ok,
# {i + 1, batch_count, [changeset | batch], processed_results, errors, notifications}}}
# end
# )
# |> case do
# {:ok, {_batch_size, _batch_count, [], processed_results, errors, notifications}} ->
# {:ok, processed_results, errors, notifications}
# {:ok, {_batch_size, batch_count, remaining_batch, processed_results, errors, notifications}} ->
# if errors != [] and stop_on_errors? do
# {:ok, processed_results, errors, notifications}
# else
# case per_batch.(remaining_batch, batch_count) do
# {:ok, batch_result, new_errors} ->
# {:ok, batch_result ++ processed_results, errors ++ new_errors, notifications}
# {:error, error} ->
# {:error, error}
# end
# end
# end
# end
end

View file

@@ -1,14 +1,5 @@
defmodule Ash.Actions.Create.Bulk do
@moduledoc """
Bulk create
Outstanding issues:
transactions: In order to support `before_transaction` hooks, we
have to do each batch in a transaction, not the entire operation, because the input might be infinite.
If they want to run the whole thing in a transaction, the `before_transaction` hooks
will warn that we are currently in a transaction. Probably just need to message
this limitation.
"""
@moduledoc false
@spec run(Ash.Api.t(), Ash.Resource.t(), atom(), Enumerable.t(map), Keyword.t()) ::
:ok
| {:ok, [Ash.Resource.record()]}
@@ -32,7 +23,7 @@ defmodule Ash.Actions.Create.Bulk do
end
Ash.DataLayer.transaction(
List.wrap(resource) ++ (action.touches_resources || []),
List.wrap(resource) ++ action.touches_resources,
fn ->
do_run(api, resource, action, inputs, opts)
end,
@@ -85,7 +76,23 @@ defmodule Ash.Actions.Create.Bulk do
# TODO: add process context without a changeset
{_, opts} = Ash.Actions.Helpers.add_process_context(api, Ash.Changeset.new(resource), opts)
batch_size = opts[:batch_size] || 100
manual_action_can_bulk? =
case action.manual do
{mod, _opts} ->
function_exported?(mod, :bulk_create, 3)
_ ->
false
end
data_layer_can_bulk? = Ash.DataLayer.data_layer_can?(resource, :bulk_create)
batch_size =
if data_layer_can_bulk? || manual_action_can_bulk? do
opts[:batch_size] || 100
else
1
end
all_changes =
action.changes
@@ -116,7 +123,7 @@ defmodule Ash.Actions.Create.Bulk do
end
end)
|> Stream.transform(
fn -> %{batch: [], count: 0, must_return_records?: false} end,
fn -> %{batch: [], count: 0, must_return_records?: opts[:notify?]} end,
fn
{:ok, item}, state when state.count < batch_size ->
must_return_records? = state.must_return_records? || !Enum.empty?(item.after_action)
@@ -206,7 +213,9 @@ defmodule Ash.Actions.Create.Bulk do
opts,
count,
must_return_records?,
must_return_records_for_changes?
must_return_records_for_changes?,
data_layer_can_bulk?,
api
)
|> run_after_action_hooks(changesets_by_index)
|> process_results(changes, all_changes, opts)
@@ -271,32 +280,13 @@ defmodule Ash.Actions.Create.Bulk do
opts,
count,
must_return_records?,
must_return_records_for_changes?
must_return_records_for_changes?,
data_layer_can_bulk?,
api
)
|> run_after_action_hooks(changesets_by_index)
|> process_results(changes, all_changes, opts)
end
# |> case do
# {:ok, bulk_result} ->
# bulk_result =
# if notify? do
# %{
# bulk_result
# | notifications:
# bulk_result.notifications ++ Process.delete(:ash_notifications) || []
# }
# else
# bulk_result
# end
# handle_bulk_result(bulk_result, resource, action, opts)
# {:error, error} ->
# {:error, error}
# end
# |> process_results(changes, all_changes, opts)
# end)
end)
if opts[:return_stream?] do
@@ -313,7 +303,12 @@ defmodule Ash.Actions.Create.Bulk do
}
{:ok, batch_result, notifications}, result ->
notifications = Ash.Notifier.notify(notifications)
notifications =
if opts[:notify?] do
Ash.Notifier.notify(notifications)
else
[]
end
records = Enum.concat(Enum.to_list(batch_result), result.records)
@@ -379,13 +374,17 @@ defmodule Ash.Actions.Create.Bulk do
if opts[:return_notifications?] do
result
else
result = %{result | notifications: Ash.Notifier.notify(notifications)}
if opts[:notify?] do
result = %{result | notifications: Ash.Notifier.notify(notifications)}
Ash.Actions.Helpers.warn_missed!(resource, action, %{
resource_notifications: result.notifications
})
Ash.Actions.Helpers.warn_missed!(resource, action, %{
resource_notifications: result.notifications
})
result
result
else
result
end
end
end
@@ -396,7 +395,9 @@ defmodule Ash.Actions.Create.Bulk do
opts,
count,
must_return_records?,
must_return_records_for_changes?
must_return_records_for_changes?,
data_layer_can_bulk?,
api
) do
{batch, notifications} =
Enum.reduce(batch, {[], []}, fn changeset, {changesets, notifications} ->
@@ -413,6 +414,7 @@ defmodule Ash.Actions.Create.Bulk do
actor: opts[:actor],
authorize?: opts[:authorize?],
tracer: opts[:tracer],
api: api,
batch_size: count,
return_records?:
opts[:return_records?] || must_return_records? ||
@@ -420,17 +422,53 @@ defmodule Ash.Actions.Create.Bulk do
tenant: opts[:tenant]
})
else
raise "Manual action doesn't support bulk operation. Must define `bulk_create/3` in #{inspect(mod)}"
[changeset] = batch
case mod.create(changeset, opts, %{
actor: opts[:actor],
tenant: opts[:tenant],
authorize?: opts[:authorize?],
tracer: opts[:tracer],
api: api
}) do
{:ok, result} ->
{:ok,
Ash.Resource.put_metadata(
result,
:bulk_create_index,
changeset.context.bulk_create.index
)}
{:error, error} ->
{:error, error}
end
end
_ ->
Ash.DataLayer.bulk_create(resource, action, batch, %{
batch_size: count,
return_records?:
opts[:return_records?] || must_return_records? ||
must_return_records_for_changes?,
tenant: opts[:tenant]
})
if data_layer_can_bulk? do
Ash.DataLayer.bulk_create(resource, batch, %{
batch_size: count,
return_records?:
opts[:return_records?] || must_return_records? ||
must_return_records_for_changes?,
tenant: opts[:tenant]
})
else
[changeset] = batch
case Ash.DataLayer.create(resource, changeset) do
{:ok, result} ->
{:ok,
Ash.Resource.put_metadata(
result,
:bulk_create_index,
changeset.context.bulk_create.index
)}
{:error, error} ->
{:error, error}
end
end
end
|> case do
{:ok, result} ->
@@ -441,6 +479,8 @@ defmodule Ash.Actions.Create.Bulk do
end
end
defp run_after_action_hooks(:ok, _), do: :ok
defp run_after_action_hooks({:ok, batch_results, notifications}, changesets_by_index) do
batch_results
|> Enum.reduce_while(

View file

@@ -371,14 +371,32 @@ defmodule Ash.Api do
doc:
"Wether or not to return all of the records that were inserted. Defaults to false to account for large inserts."
],
stop_on_errored_changesets?: [
stop_on_error?: [
type: :boolean,
default: true,
default: false,
doc: """
If false, any changesets with errors will be returned and all other changesets will be honored.
If true, the first encountered error will stop the action and be returned. Otherwise, errors
will be skipped.
"""
],
notify?: [
type: :boolean,
default: false,
doc: """
Whether or not to send notifications out. If this is set to `true` then the data layer must return
the results from each batch. This may be intensive for large bulk actions.
"""
],
transaction: [
type: {:one_of, [:all, :batch, false]},
default: :batch,
doc: """
Whether or not to wrap the entire execution in a transaction, each batch, or not at all.
The data layer may still fail to perform the operation in some way, in which case an error is still returned.
See the specific data layer for more info on bulk action failure characteristics.
Keep in mind:
`before_transaction` and `after_transaction` hooks attached to changesets will have to be run
*inside* the transaction if you choose `transaction: :all`.
"""
]
]
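Taken together, the new options look roughly like this at a call site. This is a hedged sketch only: `Api`, `Post`, and `:create` are borrowed from the test module at the bottom of this diff, and the particular option combination is illustrative rather than prescriptive.

    Api.bulk_create!(
      [%{title: "title1"}, %{title: "title2"}],
      Post,
      :create,
      # return the inserted records; notify?: true additionally requires the
      # data layer to return each batch's results so notifications can be built
      return_records?: true,
      notify?: true,
      # :batch (the default) wraps each batch in its own transaction,
      # :all wraps the entire run, and false skips transactions entirely
      transaction: :batch,
      # false (the default) skips errored changesets; true halts on the first error
      stop_on_error?: false
    )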
@@ -1811,12 +1829,9 @@
end
inputs ->
with :ok <- check_can_bulk_insert(resource),
{:ok, opts} <- Spark.OptionsHelpers.validate(opts, @bulk_create_opts_schema) do
Create.Bulk.run(api, resource, action, inputs, opts)
else
{:no_bulk, _resource, _actions} ->
raise "Cannot synthesize bulk actions yet!"
case Spark.OptionsHelpers.validate(opts, @bulk_create_opts_schema) do
{:ok, opts} ->
Create.Bulk.run(api, resource, action, inputs, opts)
{:error, error} ->
%Ash.BulkResult{status: :error, errors: [Ash.Error.to_ash_error(error)]}
@@ -1824,14 +1839,6 @@
end
end
defp check_can_bulk_insert(resource) do
if Ash.DataLayer.data_layer_can?(resource, :bulk_create) do
:ok
else
:no_bulk
end
end
@doc false
def update!(api, changeset, opts) do
opts = Spark.OptionsHelpers.validate!(opts, @update_opts_schema)

View file

@@ -1,4 +1,8 @@
defmodule Ash.BulkResult do
@moduledoc """
The return value for bulk actions.
"""
@type t :: %__MODULE__{
status: :success | :partial_success | :error,
notifications: list(Ash.Notifier.Notification.t()) | nil,
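Callers are expected to pattern match on this struct. A minimal sketch, using only fields visible elsewhere in this commit (`status`, `records`, `errors`, `notifications`); `records` is only populated when records are returned (e.g. `return_records?: true`):

    case result do
      # every changeset was inserted
      %Ash.BulkResult{status: :success, records: records} ->
        {:ok, records}

      # some changesets errored, the rest were honored
      %Ash.BulkResult{status: :partial_success, records: records, errors: errors} ->
        {:ok, records, errors}

      # e.g. option validation failed, or stop_on_error? halted the run
      %Ash.BulkResult{status: :error, errors: errors} ->
        {:error, errors}
    end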

View file

@@ -506,11 +506,17 @@ defmodule Ash.Changeset do
"""
end
action =
case action do
name when is_atom(name) -> Ash.Resource.Info.action(changeset.resource, action)
action -> action
end
changeset
|> set_context(%{
private: %{
upsert?: opts[:upsert?] || action.upsert? || false,
upsert_identity: opts[:upsert_identity] || action.upsert_identity
upsert?: opts[:upsert?] || (action && action.upsert?) || false,
upsert_identity: opts[:upsert_identity] || (action && action.upsert_identity)
}
})
|> do_for_action(action, params, opts)

View file

@@ -137,7 +137,6 @@ defmodule Ash.DataLayer do
@callback bulk_create(
Ash.Resource.t(),
action :: atom,
Enumerable.t(Ash.Changeset.t()),
options :: bulk_options
) ::
@@ -194,7 +193,7 @@ defmodule Ash.DataLayer do
@optional_callbacks source: 1,
run_query: 2,
bulk_create: 4,
bulk_create: 3,
distinct: 3,
lock: 3,
run_query_with_lateral_join: 4,
@@ -329,15 +328,14 @@ defmodule Ash.DataLayer do
@spec bulk_create(
Ash.Resource.t(),
action :: atom,
Enumerable.t(Ash.Changeset.t()),
options :: bulk_options
) ::
:ok
| {:ok, Enumerable.t(Ash.Resource.record())}
| {:error, Ash.Error.t()}
def bulk_create(resource, action, changesets, options) do
Ash.DataLayer.data_layer(resource).bulk_create(resource, action, changesets, options)
def bulk_create(resource, changesets, options) do
Ash.DataLayer.data_layer(resource).bulk_create(resource, changesets, options)
end
@spec destroy(Ash.Resource.t(), Ash.Changeset.t()) :: :ok | {:error, term}

View file

@@ -681,7 +681,7 @@ defmodule Ash.DataLayer.Ets do
end
@impl true
def bulk_create(resource, _action, stream, options) do
def bulk_create(resource, stream, options) do
with {:ok, table} <- wrap_or_create_table(resource, options.tenant) do
Enum.reduce_while(stream, {:ok, []}, fn changeset, {:ok, results} ->
pkey =

View file

@@ -74,7 +74,6 @@ defmodule Ash.Resource.Actions.Create do
],
upsert_identity: [
type: :atom,
default: false,
doc: """
The identity to use for the upsert. Cannot be overridden by the caller. Ignored if `upsert?` is not set to `true`.
"""

View file

@@ -6,6 +6,7 @@ defmodule Ash.Resource.ManualCreate do
@type context :: %{
optional(:actor) => term,
optional(:tenant) => term,
optional(:tracer) => term,
optional(:authorize?) => boolean,
optional(:api) => module,
optional(any) => any
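The manual-create context now carries the tracer, and the bulk runner earlier in this diff uses `function_exported?(mod, :bulk_create, 3)` to hand whole batches to a manual action. A loose sketch of the implied shape; the module name is invented and the exact return contract of `bulk_create/3` is an assumption based on how its result is consumed above:

    defmodule MyApp.ManualCreatePost do
      @behaviour Ash.Resource.ManualCreate

      # fallback path: called one changeset at a time with a context containing
      # actor, tenant, authorize?, tracer and api, as built in this commit
      def create(changeset, _opts, _context) do
        {:ok, struct(changeset.resource, changeset.attributes)}
      end

      # optional: when exported, receives the whole batch, with batch_size,
      # return_records? and tenant added to the context
      def bulk_create(changesets, _opts, _context) do
        {:ok, Enum.map(changesets, &struct(&1.resource, &1.attributes))}
      end
    end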

View file

@@ -59,10 +59,6 @@ defmodule Ash.Test.Actions.BulkCreateTest do
end
end
test "returns an empty list when given an empty list" do
Api.bulk_create!([], Post, :create, return_records?: true)
end
test "returns created records" do
assert %Ash.BulkResult{records: [%{title: "title1"}, %{title: "title2"}]} =
Api.bulk_create!([%{title: "title1"}, %{title: "title2"}], Post, :create,