improvement: bulk update in batches if not given a query

Zach Daniel 2024-01-31 10:26:08 -05:00
parent 9749b75e3b
commit 107e6c53f5

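This commit makes a stream-based bulk update chunk its input and, when the data layer allows it, turn each chunk into a single atomic query update. A rough sketch of the call path it improves, assuming the `Ash.Api.bulk_update/4` callback of this era and hypothetical `MyApp.Api` / `MyApp.Post` names (not part of this commit):

# Hypothetical usage; MyApp.Api and MyApp.Post are placeholder names.
# Passing a stream of records rather than an Ash.Query now updates in
# batches, provided the :resource option identifies the resource.
MyApp.Post
|> MyApp.Api.read!()
|> MyApp.Api.bulk_update(:publish, %{published: true},
  resource: MyApp.Post,
  batch_size: 100,
  return_errors?: true
)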

@@ -1,5 +1,8 @@
defmodule Ash.Actions.Update.Bulk do
  @moduledoc false
  require Ash.Query

  @spec run(Ash.Api.t(), Enumerable.t() | Ash.Query.t(), atom(), input :: map, Keyword.t()) ::
          Ash.BulkResult.t()
  def run(api, resource, action, input, opts) when is_atom(resource) do
@@ -166,6 +169,8 @@ defmodule Ash.Actions.Update.Bulk do
          nil
      end

    opts = Keyword.put(opts, :resource, resource)

    if !resource do
      raise ArgumentError,
            "Could not determine resource for bulk #{action.type}. Please provide the `resource` option if providing a stream of inputs."
@@ -374,6 +379,104 @@ defmodule Ash.Actions.Update.Bulk do
  def do_run(api, stream, action, input, opts, metadata_key, context_key) do
    resource = opts[:resource]

    fully_atomic_changeset =
      cond do
        Enum.empty?(Ash.Resource.Info.primary_key(resource)) ->
          {:not_atomic, "cannot atomically update a stream without a primary key"}

        !Ash.Resource.Info.primary_action(resource, :read) ->
          {:not_atomic, "cannot atomically update a stream without a primary read action"}

        Ash.DataLayer.data_layer_can?(resource, :update_query) ->
          Ash.Changeset.fully_atomic_changeset(resource, action, input, opts)

        true ->
          {:not_atomic, "data layer does not support updating a query"}
      end

    case fully_atomic_changeset do
      %Ash.Changeset{} = atomic_changeset ->
        do_atomic_batches(
          atomic_changeset,
          api,
          stream,
          action,
          input,
          opts
        )

      {:not_atomic, _} ->
        do_stream_batches(api, stream, action, input, opts, metadata_key, context_key)
    end
  end
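`do_run` decides per call whether batches can be made atomic: the resource needs a primary key (so each chunk can be re-queried), a primary read action, and a data layer that can update a query. The same test as a standalone predicate, for illustration only (`can_bulk_update_atomically?` is hypothetical, not part of this commit):

# Hypothetical check, mirroring the cond in do_run above.
can_bulk_update_atomically? = fn resource ->
  Ash.Resource.Info.primary_key(resource) != [] and
    Ash.Resource.Info.primary_action(resource, :read) != nil and
    Ash.DataLayer.data_layer_can?(resource, :update_query)
end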
  defp do_atomic_batches(
         atomic_changeset,
         api,
         stream,
         action,
         input,
         opts
       ) do
    batch_size = opts[:batch_size] || 100
    resource = opts[:resource]
    ref = make_ref()
    pkey = Ash.Resource.Info.primary_key(resource)

    stream
    |> Stream.chunk_every(batch_size)
    |> map_batches(
      resource,
      opts,
      ref,
      fn batch ->
        pkeys = [or: Enum.map(batch, &Map.take(&1, pkey))]

        resource
        |> Ash.Query.for_read(Ash.Resource.Info.primary_action!(resource, :read).name,
          actor: opts[:actor],
          authorize?: false,
          tenant: atomic_changeset.tenant,
          tracer: opts[:tracer]
        )
        |> Ash.Query.filter(^pkeys)
        |> Ash.Query.set_context(%{private: %{internal?: true}})
        |> Ash.Query.select([])
        |> then(fn query ->
          run(api, query, action.name, input,
            actor: opts[:actor],
            authorize?: false,
            tenant: atomic_changeset.tenant,
            tracer: opts[:tracer],
            atomic_changeset: atomic_changeset,
            return_errors?: opts[:return_errors?],
            return_notifications?: opts[:return_notifications?],
            notify?: opts[:notify?],
            return_records?: opts[:return_records?]
          )
          |> case do
            %Ash.BulkResult{error_count: 0, records: records, notifications: notifications} ->
              store_notification(ref, notifications, opts)
              List.wrap(records)

            %Ash.BulkResult{
              errors: errors,
              notifications: notifications,
              error_count: error_count
            } ->
              store_notification(ref, notifications, opts)
              store_error(ref, errors, opts, error_count)
              {:error, Ash.Error.to_error_class(errors)}
          end
        end)
      end
    )
    |> run_batches(ref, opts)
  end
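`do_atomic_batches` turns each chunk of records back into a query: it takes each record's primary key, combines them under an `or:` filter, and re-enters `run/5` with the precomputed atomic changeset, so the data layer applies one `update_query` per batch. Roughly how that filter statement is assembled, assuming a resource whose primary key is `[:id]`:

# Sketch of the per-batch filter construction used above.
pkey = [:id]
batch = [%{id: 1}, %{id: 2}, %{id: 3}]
pkeys = [or: Enum.map(batch, &Map.take(&1, pkey))]
# => [or: [%{id: 1}, %{id: 2}, %{id: 3}]]
# Ash.Query.filter(query, ^pkeys) then selects exactly the records in
# this chunk, so each chunk of batch_size records becomes one query.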
  defp do_stream_batches(api, stream, action, input, opts, metadata_key, context_key) do
    resource = opts[:resource]
    opts = Ash.Actions.Helpers.set_opts(opts, api)
    {_, opts} = Ash.Actions.Helpers.add_process_context(api, Ash.Changeset.new(resource), opts)
@@ -403,48 +506,50 @@ defmodule Ash.Actions.Update.Bulk do
    argument_names = Enum.map(action.arguments, & &1.name)

    stream
    |> Stream.with_index()
    |> Stream.chunk_every(batch_size)
    |> map_batches(
      resource,
      opts,
      ref,
      fn batch ->
        try do
          batch
          |> Enum.map(
            &setup_changeset(
              &1,
              action,
              opts,
              input,
              argument_names,
              api,
              context_key
            )
          )
          |> reject_and_maybe_store_errors(ref, opts)
          |> handle_batch(
            api,
            resource,
            action,
            all_changes,
            opts,
            ref,
            context_key,
            metadata_key,
            base_changeset
          )
        after
          if opts[:notify?] && !opts[:return_notifications?] do
            Ash.Notifier.notify(Process.delete({:bulk_update_notifications, ref}))
          end
        end
      end
    )
    |> run_batches(ref, opts)
  end
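Each batch runs inside `try/after` so that, unless the caller asked for notifications back, anything accumulated under `{:bulk_update_notifications, ref}` is delivered even when a batch raises. The shape of that pattern on its own:

# Sketch of the flush-on-exit pattern used above.
ref = make_ref()

try do
  # ... process a batch, storing notifications under
  # {:bulk_update_notifications, ref} as you go ...
  :ok
after
  # Process.delete/1 returns nil when nothing was stored; the diff
  # passes that result straight to Ash.Notifier.notify/1.
  Ash.Notifier.notify(Process.delete({:bulk_update_notifications, ref}))
end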
  defp run_batches(changeset_stream, ref, opts) do
    if opts[:return_stream?] do
      Stream.concat(changeset_stream)
    else
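With `return_stream?: true` the caller gets the batches back as one flat, lazy stream; `Stream.concat/1` does the flattening:

# Each batch yields a list of results; concat flattens them lazily.
[[1, 2], [3]] |> Stream.concat() |> Enum.to_list()
# => [1, 2, 3]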
@@ -944,6 +1049,17 @@ defmodule Ash.Actions.Update.Bulk do
      {:ok, {:throw, value}} ->
        throw(value)

      {:ok,
       {%Ash.BulkResult{
          records: records,
          notifications: notifications,
          errors: errors,
          error_count: error_count
        }, _, _}} ->
        store_notification(ref, notifications, opts)
        store_error(ref, errors, opts, error_count)
        records

      {:ok, {result, notifications, errors}} ->
        store_notification(ref, notifications, opts)
        store_error(ref, errors, opts)
@@ -1063,9 +1179,18 @@ defmodule Ash.Actions.Update.Bulk do
    end)
  end

  defp store_error(ref, errors, opts, count \\ nil)

  defp store_error(_ref, empty, _opts, 0) when empty in [[], nil], do: :ok

  defp store_error(ref, empty, _opts, error_count) when empty in [[], nil] do
    {errors, count} = Process.get({:bulk_update_errors, ref}) || {[], 0}
    Process.put({:bulk_update_errors, ref}, {errors, count + error_count})
  end

  defp store_error(ref, error, opts, count) do
    add = count || Enum.count(List.wrap(error))

    if opts[:stop_on_error?] && !opts[:return_stream?] do
      throw({:error, Ash.Error.to_error_class(error), 0, []})
    else
@@ -1083,11 +1208,11 @@ defmodule Ash.Actions.Update.Bulk do
        Process.put(
          {:bulk_update_errors, ref},
          {[error | errors], count + add}
        )
      else
        {errors, count} = Process.get({:bulk_update_errors, ref}) || {[], 0}
        Process.put({:bulk_update_errors, ref}, {errors, count + add})
      end
    end
  end
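The error bookkeeping lives in the process dictionary, keyed by the run's ref, so a nested bulk run can feed its error_count back even when the error structs themselves are not returned. The bump performed by the new 4-arity clause, in isolation:

# Minimal sketch of the counter bump for an empty error list but a
# known error_count (here 3) from a nested Ash.BulkResult:
ref = make_ref()
{errors, count} = Process.get({:bulk_update_errors, ref}) || {[], 0}
Process.put({:bulk_update_errors, ref}, {errors, count + 3})
Process.get({:bulk_update_errors, ref})
# => {[], 3}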