improvement: bulk create actions

This is not currently supported by any data layer other than ETS
Zach Daniel 2023-05-02 13:17:52 -04:00
parent 2843224f06
commit 80f696417d
14 changed files with 1400 additions and 174 deletions


@@ -8,11 +8,6 @@ The philosophy behind Ash allows us to build an extremely flexible and powerful
To this end, there are many prebuilt extensions to use, but there is also a rich suite of tools to build your _own_ extensions. In this way, you can make the framework work for you, instead of struggling to fit your application to a strictly prescribed pattern. Use as much of Ash as you can, and leverage the amazing Elixir ecosystem for everything else.
<!---
> long sentences
> last sentence sounds a bit either / or mutually exclusive, but we do both!
-->
## Declarative, Introspectable, Derivable
The real superpower behind Ash is the declarative design pattern. All behavior is driven by explicit, static declarations. A resource, for example, is really just a configuration file. On its own it does nothing. It is provided to code that reads that configuration and acts accordingly.


@@ -0,0 +1,83 @@
defmodule Ash.Actions.BulkHelpers do
# def halt_if(stream, condition, result) do
# Stream.transform(
# stream,
# fn -> :__halt_if_accumulator__ end,
# fn item, :__halt_if_accumulator__ ->
# if condition.(item) do
# {:halt, result.(item)}
# else
# {[item], :__halt_if_accumulator__}
# end
# end,
# fn
# :__halt_if_accumulator__ ->
# []
# result ->
# [result]
# end,
# fn _ -> :ok end
# )
# end
# def in_batches(changeset_stream, batch_size, stop_on_errors?, per_batch) do
# changeset_stream
# |> Stream.chunk_every(batch_size)
# |> Stream.with_index(1)
# |> Stream.map(fn {batch, batch_count} ->
# case per_batch.(batch, batch_count) do
# {:ok, }
# end
# end
# end)
# Enum.reduce_while(
# changeset_stream,
# {:ok, {0, 0, [], [], [], []}},
# fn
# changeset,
# {:ok, {^batch_size, batch_count, batch, processed_results, errors, notifications}} ->
# batch = [changeset | batch]
# case per_batch.(batch, batch_count) do
# {:ok, batch_result, new_errors} ->
# if new_errors != [] and stop_on_errors? do
# {:halt, {:ok, batch_result, new_errors, notifications}}
# else
# {:cont,
# {:ok,
# {0, batch_count + 1, [], batch_result ++ processed_results,
# errors ++ new_errors}}}
# end
# {:error, error} ->
# {:halt, {:error, error}}
# end
# changeset, {:ok, {i, batch_count, batch, processed_results, errors, notifications}} ->
# {:cont,
# {:ok,
# {i + 1, batch_count, [changeset | batch], processed_results, errors, notifications}}}
# end
# )
# |> case do
# {:ok, {_batch_size, _batch_count, [], processed_results, errors, notifications}} ->
# {:ok, processed_results, errors, notifications}
# {:ok, {_batch_size, batch_count, remaining_batch, processed_results, errors, notifications}} ->
# if errors != [] and stop_on_errors? do
# {:ok, processed_results, errors, notifications}
# else
# case per_batch.(remaining_batch, batch_count) do
# {:ok, batch_result, new_errors} ->
# {:ok, batch_result ++ processed_results, errors ++ new_errors, notifications}
# {:error, error} ->
# {:error, error}
# end
# end
# end
# end
end


@@ -0,0 +1,707 @@
defmodule Ash.Actions.Create.Bulk do
@moduledoc """
Bulk create
Outstanding issues:
transactions: In order to support `before_transaction` hooks, we have to run
each batch in its own transaction rather than the entire operation, because the
input might be infinite. If callers want to run the whole thing in a transaction,
the `before_transaction` hooks will warn that we are already in a transaction.
We probably just need to document this limitation.
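
For illustration, a hypothetical call that opts the whole operation into a
single transaction might look like this (`MyApi` and `Post` are assumed names,
and this assumes the `:transaction` option is accepted by the option schema):

    MyApi.bulk_create(
      [%{title: "foo"}, %{title: "bar"}],
      Post,
      :create,
      transaction: :all,
      return_records?: true
    )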
"""
@spec run(Ash.Api.t(), Ash.Resource.t(), atom(), Enumerable.t(map), Keyword.t()) ::
:ok
| {:ok, [Ash.Resource.record()]}
| {:ok, [Ash.Resource.record()], [Ash.Notifier.Notification.t()]}
| {:error, term}
def run(api, resource, action, inputs, opts) do
action = Ash.Resource.Info.action(resource, action)
if opts[:transaction] == :all &&
Ash.DataLayer.data_layer_can?(resource, :transact) do
notify? =
if opts[:notify?] do
if Process.get(:ash_started_transaction?) do
false
else
Process.put(:ash_started_transaction?, true)
true
end
else
false
end
Ash.DataLayer.transaction(
List.wrap(resource) ++ (action.touches_resources || []),
fn ->
do_run(api, resource, action, inputs, opts)
end,
opts[:timeout],
%{
type: :bulk_create,
metadata: %{
resource: resource,
action: action.name,
actor: opts[:actor]
}
}
)
|> case do
{:ok, bulk_result} ->
bulk_result =
if notify? do
%{
bulk_result
| notifications:
bulk_result.notifications ++ (Process.delete(:ash_notifications) || [])
}
else
bulk_result
end
handle_bulk_result(bulk_result, resource, action, opts)
{:error, error} ->
{:error, error}
end
else
api
|> do_run(resource, action, inputs, opts)
|> handle_bulk_result(resource, action, opts)
end
end
def do_run(api, resource, action, inputs, opts) do
opts = Ash.Actions.Helpers.set_opts(opts, api)
if action.manual? do
raise "Old style manual actions cannot be used with bulk creates"
end
if opts[:upsert?] || action.upsert? do
raise "Cannot upsert bulk actions currently"
end
# TODO: add process context without a changeset
{_, opts} = Ash.Actions.Helpers.add_process_context(api, Ash.Changeset.new(resource), opts)
batch_size = opts[:batch_size] || 100
all_changes =
action.changes
|> Enum.concat(Ash.Resource.Info.changes(resource, action.type))
|> Enum.with_index()
result =
inputs
|> Stream.with_index()
|> Stream.map(fn {input, index} ->
resource
|> Ash.Changeset.new()
|> Map.put(:api, api)
|> Ash.Actions.Helpers.add_context(opts)
|> Ash.Changeset.set_context(%{bulk_create: %{index: index}})
|> Ash.Changeset.prepare_changeset_for_action(action, opts, input)
|> Ash.Changeset.run_before_transaction_hooks()
end)
|> Stream.map(fn changeset ->
if changeset.valid? do
{:ok, changeset}
else
if opts[:stop_on_error?] do
throw({:error, Ash.Error.to_error_class(changeset.errors), 0, []})
else
{:error, changeset}
end
end
end)
|> Stream.transform(
fn -> %{batch: [], count: 0, must_return_records?: false} end,
fn
{:ok, item}, state when state.count < batch_size ->
must_return_records? = state.must_return_records? || !Enum.empty?(item.after_action)
{[],
%{
state
| batch: [item | state.batch],
count: state.count + 1,
must_return_records?: must_return_records?
}}
{:ok, item}, state ->
{[batch(state)],
%{state | batch: [item], count: 1, must_return_records?: !Enum.empty?(item.after_action)}}
{:error, error}, state ->
if opts[:return_errors?] do
{[{:error, error}], state}
else
{[], state}
end
end,
fn state ->
{[batch(state)], state}
end,
fn _ -> :ok end
)
|> Stream.map(fn
{:error, error} ->
{:error, error}
{:batch, batch_config} ->
%{count: count, batch: batch, must_return_records?: must_return_records?} = batch_config
batch =
Stream.map(batch, fn changeset ->
Ash.Changeset.set_defaults(changeset, :create, true)
end)
if opts[:transaction] == :batch &&
Ash.DataLayer.data_layer_can?(resource, :transact) do
notify? =
if opts[:notify?] do
if Process.get(:ash_started_transaction?) do
false
else
Process.put(:ash_started_transaction?, true)
true
end
else
false
end
try do
Ash.DataLayer.transaction(
List.wrap(resource) ++ (action.touches_resources || []),
fn ->
%{
must_return_records?: must_return_records_for_changes?,
batch: batch,
changes: changes
} =
batch
|> run_action_changes(
all_changes,
action,
opts[:actor],
opts[:authorize?],
opts[:tracer]
)
{batch, changesets_by_index} =
Enum.reduce(batch, {[], %{}}, fn changeset,
{changesets, changesets_by_index} ->
{[changeset | changesets],
Map.put(
changesets_by_index,
changeset.context.bulk_create.index,
changeset
)}
end)
run_batch(
resource,
batch,
action,
opts,
count,
must_return_records?,
must_return_records_for_changes?
)
|> run_after_action_hooks(changesets_by_index)
|> process_results(changes, all_changes, opts)
end,
opts[:timeout],
%{
type: :bulk_create,
metadata: %{
resource: resource,
action: action.name,
actor: opts[:actor]
}
}
)
|> case do
{:ok, result} ->
result
{:error, error} ->
{:error, error}
end
after
if notify? do
notifications = Process.get(:ash_notifications, [])
remaining_notifications = Ash.Notifier.notify(notifications)
Process.delete(:ash_notifications)
Ash.Actions.Helpers.warn_missed!(resource, action, %{
resource_notifications: remaining_notifications
})
end
end
else
%{
must_return_records?: must_return_records_for_changes?,
batch: batch,
changes: changes
} =
batch
|> run_action_changes(
all_changes,
action,
opts[:actor],
opts[:authorize?],
opts[:tracer]
)
{batch, changesets_by_index} =
Enum.reduce(batch, {[], %{}}, fn changeset, {changesets, changesets_by_index} ->
{[changeset | changesets],
Map.put(
changesets_by_index,
changeset.context.bulk_create.index,
changeset
)}
end)
run_batch(
resource,
batch,
action,
opts,
count,
must_return_records?,
must_return_records_for_changes?
)
|> run_after_action_hooks(changesets_by_index)
|> process_results(changes, all_changes, opts)
end
# |> case do
# {:ok, bulk_result} ->
# bulk_result =
# if notify? do
# %{
# bulk_result
# | notifications:
# bulk_result.notifications ++ Process.delete(:ash_notifications) || []
# }
# else
# bulk_result
# end
# handle_bulk_result(bulk_result, resource, action, opts)
# {:error, error} ->
# {:error, error}
# end
# |> process_results(changes, all_changes, opts)
# end)
end)
if opts[:return_stream?] do
raise "can't do this yet"
else
result
|> Enum.reduce(%Ash.BulkResult{status: :success, records: [], errors: []}, fn
{:error, notifications, error}, result ->
%{
result
| errors: [error | result.errors],
status: :partial_success,
notifications: (result.notifications || []) ++ notifications
}
{:ok, batch_result, notifications}, result ->
notifications = Ash.Notifier.notify(notifications)
records = Enum.concat(Enum.to_list(batch_result), result.records)
if opts[:return_notifications?] do
%{result | records: records, notifications: (result.notifications || []) ++ notifications}
else
notifications =
if Process.get(:ash_started_transaction?) do
current_notifications = Process.get(:ash_notifications, [])
Process.put(
:ash_notifications,
current_notifications ++ notifications
)
[]
else
notifications
end
%{result | records: records, notifications: (result.notifications || []) ++ notifications}
end
:ok, result ->
result
end)
end
catch
{:error, error, batch_number, notifications} ->
status =
if batch_number > 1 do
:partial_success
else
:error
end
%Ash.BulkResult{
status: status,
errors: List.wrap(error),
notifications: notifications
}
end
defp handle_bulk_result(bulk_result, resource, action, opts) do
bulk_result
|> notify(resource, action, opts)
|> sort(opts)
end
defp sort(%{records: records} = result, opts) when is_list(records) do
if opts[:sorted?] do
%{result | records: Enum.sort_by(records, & &1.__metadata__.bulk_create_index)}
else
result
end
end
defp sort(result, _), do: result
defp notify(%{notifications: []} = result, _resource, _action, _opts), do: result
defp notify(%{notifications: notifications} = result, resource, action, opts) do
if opts[:return_notifications?] do
result
else
result = %{result | notifications: Ash.Notifier.notify(notifications)}
Ash.Actions.Helpers.warn_missed!(resource, action, %{
resource_notifications: result.notifications
})
result
end
end
defp run_batch(
resource,
batch,
action,
opts,
count,
must_return_records?,
must_return_records_for_changes?
) do
{batch, notifications} =
Enum.reduce(batch, {[], []}, fn changeset, {changesets, notifications} ->
{changeset, %{notifications: new_notifications}} =
Ash.Changeset.run_before_actions(changeset)
{[changeset | changesets], notifications ++ new_notifications}
end)
case action.manual do
{mod, manual_opts} ->
if function_exported?(mod, :bulk_create, 3) do
mod.bulk_create(batch, manual_opts, %{
actor: opts[:actor],
authorize?: opts[:authorize?],
tracer: opts[:tracer],
batch_size: count,
return_records?:
opts[:return_records?] || must_return_records? ||
must_return_records_for_changes?,
tenant: opts[:tenant]
})
else
raise "Manual action doesn't support bulk operation. Must define `bulk_create/3` in #{inspect(mod)}"
end
_ ->
Ash.DataLayer.bulk_create(resource, action, batch, %{
batch_size: count,
return_records?:
opts[:return_records?] || must_return_records? ||
must_return_records_for_changes?,
tenant: opts[:tenant]
})
end
|> case do
{:ok, result} ->
{:ok, result, notifications}
other ->
other
end
end
defp run_after_action_hooks({:ok, batch_results, notifications}, changesets_by_index) do
batch_results
|> Enum.reduce_while(
{:ok, [], notifications, changesets_by_index},
fn result, {:ok, records, notifications, changesets_by_index} ->
changeset = changesets_by_index[result.__metadata__.bulk_create_index]
case Ash.Changeset.run_after_actions(result, changeset, []) do
{:error, error} ->
{:halt, {:error, error}}
{:ok, result, changeset, %{notifications: new_notifications}} ->
{:cont,
{:ok, [result | records], notifications ++ new_notifications,
Map.put(changesets_by_index, result.__metadata__.bulk_create_index, changeset)}}
end
end
)
|> case do
{:ok, results, notifications, changesets_by_index} ->
{:ok, results, notifications, changesets_by_index}
other ->
other
end
end
defp run_after_action_hooks(:ok, _changesets_by_index), do: :ok
defp run_after_action_hooks({:error, error}, _) do
{:error, error}
end
defp process_results(:ok, _, _, _), do: :ok
defp process_results(
{:ok, batch, notifications, changesets_by_index},
changes,
all_changes,
opts
) do
Enum.reduce(
batch,
{[], notifications, changesets_by_index, []},
fn result, {results, notifications, changesets_by_index, errors} ->
changeset = changesets_by_index[result.__metadata__.bulk_create_index]
notifications = notifications ++ [notification(changeset, result, opts)]
try do
case Ash.Changeset.run_after_transactions({:ok, result}, changeset) do
{:ok, result} ->
{[result | results], notifications, changesets_by_index, errors}
{:error, error} ->
{results, notifications, changesets_by_index, [error | errors]}
end
rescue
e ->
{results, notifications, changesets_by_index, [e | errors]}
end
end
)
|> case do
{results, notifications, changesets_by_index, []} ->
case run_bulk_after_changes(changes, all_changes, results, changesets_by_index, opts) do
{results, new_notifications, []} ->
{:ok, results, new_notifications ++ notifications}
{_results, _new_notifications, errors} ->
{:error, notifications, Ash.Error.to_ash_error(errors)}
end
{_result, notifications, _, errors} ->
{:error, notifications, Ash.Error.to_ash_error(errors)}
end
end
defp process_results({:error, error}, _, _, _), do: {:error, error}
defp run_bulk_after_changes(changes, all_changes, results, changesets_by_index, opts) do
results =
Stream.map(results, fn result ->
{:ok, result}
end)
all_changes
|> Enum.filter(fn
{%{change: {module, _opts}}, _} ->
function_exported?(module, :after_batch, 3)
_ ->
false
end)
|> Enum.reduce(results, fn {%{change: {module, change_opts}}, index}, results ->
{matches, non_matches} =
results
|> Enum.split_with(fn
{:ok, result} ->
changes[index] == :all or
result.__metadata__.bulk_create_index in List.wrap(changes[index])
_ ->
false
end)
matches =
Enum.map(matches, fn {:ok, result} ->
{changesets_by_index[result.__metadata__.bulk_create_index], result}
end)
after_batch_results =
module.after_batch(matches, change_opts, %{
actor: opts[:actor],
tracer: opts[:tracer],
authorize?: opts[:authorize?]
})
Enum.concat([after_batch_results, non_matches])
end)
|> Enum.reduce(
{[], [], []},
fn
%Ash.Notifier.Notification{} = notification, {results, notifications, errors} ->
{results, [notification | notifications], errors}
{:ok, result}, {results, notifications, errors} ->
{[result | results], notifications, errors}
{:error, error}, {results, notifications, errors} ->
{results, notifications, [error | errors]}
end
)
end
defp notification(changeset, result, opts) do
%Ash.Notifier.Notification{
resource: changeset.resource,
api: changeset.api,
actor: opts[:actor],
action: changeset.action,
data: result,
changeset: changeset
}
end
defp run_action_changes(batch, all_changes, _action, actor, authorize?, tracer) do
# TODO: support action.delay_global_validations?
Enum.reduce(
all_changes,
%{must_return_records?: false, batch: batch, changes: %{}, notifications: []},
fn
{%{validation: {module, opts}} = validation, _change_index}, state ->
batch =
Stream.map(batch, fn changeset ->
if Enum.all?(validation.where || [], fn {module, opts} ->
opts =
Ash.Filter.build_filter_from_template(
opts,
actor,
changeset.arguments,
changeset.context
)
module.validate(changeset, opts) == :ok
end) do
case module.validate(changeset, opts) do
:ok -> changeset
{:error, error} -> Ash.Changeset.add_error(changeset, error)
end
else
changeset
end
end)
%{
must_return_records?: state.must_return_records?,
batch: batch,
changes: state.changes
}
{%{change: {module, opts}} = change, change_index}, state ->
if Enum.empty?(change.where) && !change.only_when_valid? do
context = %{
actor: actor,
authorize?: authorize? || false,
tracer: tracer
}
batch = module.batch_change(batch, opts, context)
must_return_records? =
state.must_return_records? || function_exported?(module, :after_batch, 3)
%{
must_return_records?: must_return_records?,
batch: Enum.to_list(batch),
changes: Map.put(state.changes, change_index, :all)
}
else
{matches, non_matches} =
batch
|> Enum.split_with(fn changeset ->
applies_from_where? =
Enum.all?(change.where || [], fn {module, opts} ->
opts =
Ash.Filter.build_filter_from_template(
opts,
actor,
changeset.arguments,
changeset.context
)
module.validate(changeset, opts) == :ok
end)
applies_from_only_when_valid? =
if change.only_when_valid? do
changeset.valid?
else
true
end
applies_from_where? and applies_from_only_when_valid?
end)
if Enum.empty?(matches) do
%{
must_return_records?: state.must_return_records?,
batch: non_matches,
changes: state.changes
}
else
context = %{
actor: actor,
authorize?: authorize? || false,
tracer: tracer
}
matches = module.batch_change(matches, opts, context)
must_return_records? =
state.must_return_records? || function_exported?(module, :after_batch, 3)
%{
must_return_records?: must_return_records?,
batch: Enum.concat(matches, non_matches),
changes:
Map.put(
state.changes,
change_index,
Enum.map(matches, & &1.context.bulk_create.index)
)
}
end
end
end
)
end
defp batch(state) do
{:batch,
%{
count: state.count,
batch: state.batch,
must_return_records?: state.must_return_records?
}}
end
end


@@ -67,19 +67,22 @@ defmodule Ash.Actions.Helpers do
opts
end
opts =
opts
|> add_actor(query_or_changeset, api)
|> add_authorize?(api)
|> add_tenant()
|> add_tracer()
opts = set_opts(opts, api, query_or_changeset)
query_or_changeset = add_context(query_or_changeset, opts)
{query_or_changeset, opts}
end
defp add_context(query_or_changeset, opts) do
def set_opts(opts, api, query_or_changeset \\ nil) do
opts
|> add_actor(query_or_changeset, api)
|> add_authorize?(api)
|> add_tenant()
|> add_tracer()
end
def add_context(query_or_changeset, opts) do
context = Process.get(:ash_context, %{}) || %{}
private_context = Map.new(Keyword.take(opts, [:actor, :authorize?]))


@@ -358,6 +358,39 @@ defmodule Ash.Api do
"Shared create/update/destroy Options"
)
@bulk_create_opts_schema [
sorted?: [
type: :boolean,
default: false,
doc:
"Wether or not to sort results by their input position, in cases where `return_records?: true` was provided."
],
return_records?: [
type: :boolean,
default: false,
doc:
"Wether or not to return all of the records that were inserted. Defaults to false to account for large inserts."
],
stop_on_errored_changesets?: [
type: :boolean,
default: true,
doc: """
If false, any changesets with errors will be returned as errors, and all other changesets will still be processed.
The data layer may still fail to perform the operation in some way, which may still return an error.
See the specific data layer for more info on bulk action failure characteristics.
"""
]
]
|> merge_schemas(
Keyword.delete(@global_opts, :action),
"Global Options"
)
|> merge_schemas(
@shared_created_update_and_destroy_opts_schema,
"Shared create/update/destroy Options"
)
@doc false
def create_opts_schema, do: @create_opts_schema
@@ -1120,6 +1153,73 @@
| {:ok, Ash.Resource.record(), list(Ash.Notifier.Notification.t())}
| {:error, term}
@doc """
Creates many records.
## Assumptions
We assume that the input is a list of changesets, all for the same action, or a list of input maps for the
same action, with the `:resource` and `:action` options provided to indicate which action it is for.
## Performance/Feasibility
The performance of this operation depends on the data layer in question.
Data layers like AshPostgres will choose reasonable batch sizes in an attempt
to handle large bulk actions, but that does not mean that you can pass a list of
500k inputs and expect things to go off without a hitch (although it might).
If you need to do large data processing, you should look into projects like
GenStage and Broadway. With that said, if you want to do things like support CSV upload
and you place some reasonable limits on the size, this is a great tool. You'll need to
test it yourself; YMMV.
Passing `return_records?: true` can significantly increase the time it takes to perform the operation,
and can also make the operation completely unreasonable due to the memory requirement. If you want to
do very large bulk creates and display all of the results, the suggestion is to annotate them with a
"bulk_create_id" in the data layer, and then read the records with that `bulk_create_id` so that they can
be retrieved later if necessary.
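For illustration, a hypothetical sketch of that approach (`bulk_create_id` is an
assumed attribute on the resource, and `MyApi` is an assumed api module):

    bulk_create_id = Ash.UUID.generate()

    MyApi.bulk_create(
      Stream.map(inputs, &Map.put(&1, :bulk_create_id, bulk_create_id)),
      Post,
      :create
    )

    # later, with `require Ash.Query` in scope
    Post
    |> Ash.Query.filter(bulk_create_id == ^bulk_create_id)
    |> MyApi.read!()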
## Changes/Validations
Changes will be applied in the order they are given on the action, as normal. Any change that exposes
the `batch_change` or `batch_validate` callback will be applied on the entire list.
## After Action Hooks
The following requirements must be met for `after_action` hooks to function properly. If they are not met,
an error is returned when an `after_action` hook is applied to a changeset in a `change`.
1. `return_records?` must be set to `true`.
2. The changeset must be setting the primary key as part of its changes, so that we know which result applies to which
changeset.
It is possible to use `after_action` hooks with `batch_change/3`, but you need to return the hooks along with the changesets.
This allows for setting up `after_action` hooks that don't need access to the returned record,
or `after_action` hooks that can operate on the entire list at once. See the documentation for that callback for more on
how to accomplish that.
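## Example

A minimal usage sketch (`MyApi` and `Post` are assumed names; the options shown
are documented below):

    %Ash.BulkResult{records: records} =
      MyApi.bulk_create(
        [%{title: "foo"}, %{title: "bar"}],
        Post,
        :create,
        return_records?: true,
        sorted?: true
      )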
#{Spark.OptionsHelpers.docs(@bulk_create_opts_schema)}
"""
@callback bulk_create(
[map],
resource :: Ash.Resource.t(),
action :: atom,
params :: Keyword.t()
) ::
Ash.BulkResult.t()
@doc """
Creates many records, raising on any errors. See `c:bulk_create/4` for more.
#{Spark.OptionsHelpers.docs(@bulk_create_opts_schema)}
"""
@callback bulk_create!(
[map],
resource :: Ash.Resource.t(),
action :: atom,
params :: Keyword.t()
) ::
Ash.BulkResult.t() | no_return()
@doc """
Update a record. See `c:update/2` for more information.
"""
@@ -1675,6 +1775,63 @@
end
end
@doc false
@spec bulk_create!(Ash.Api.t(), [map], Ash.Resource.t(), atom, Keyword.t()) ::
Ash.BulkResult.t() | no_return
def bulk_create!(api, inputs, resource, action, opts) do
api
|> bulk_create(inputs, resource, action, opts)
|> case do
%Ash.BulkResult{status: :error, errors: errors} ->
raise Ash.Error.to_error_class(errors)
bulk_result ->
bulk_result
end
end
@doc false
@spec bulk_create(Ash.Api.t(), [map], Ash.Resource.t(), atom, Keyword.t()) :: Ash.BulkResult.t()
def bulk_create(api, inputs, resource, action, opts) do
case inputs do
[] ->
result = %Ash.BulkResult{status: :success, errors: []}
result =
if opts[:return_records?] do
%{result | records: []}
else
result
end
if opts[:return_notifications?] do
%{result | notifications: []}
else
result
end
inputs ->
with :ok <- check_can_bulk_insert(resource),
{:ok, opts} <- Spark.OptionsHelpers.validate(opts, @bulk_create_opts_schema) do
Create.Bulk.run(api, resource, action, inputs, opts)
else
:no_bulk ->
raise "Cannot synthesize bulk actions yet!"
{:error, error} ->
%Ash.BulkResult{status: :error, errors: [Ash.Error.to_ash_error(error)]}
end
end
end
defp check_can_bulk_insert(resource) do
if Ash.DataLayer.data_layer_can?(resource, :bulk_create) do
:ok
else
:no_bulk
end
end
@doc false
def update!(api, changeset, opts) do
opts = Spark.OptionsHelpers.validate!(opts, @update_opts_schema)


@@ -295,6 +295,20 @@ defmodule Ash.Api.Interface do
end
end
def bulk_create!(inputs, resource, action, opts \\ []) do
Api.bulk_create!(__MODULE__, inputs, resource, action, opts)
end
def bulk_create(inputs, resource, action, opts \\ []) do
case Api.bulk_create(__MODULE__, inputs, resource, action, opts) do
{:error, error} ->
{:error, Ash.Error.to_error_class(error)}
other ->
other
end
end
def create!(changeset, params \\ []) do
Api.create!(__MODULE__, changeset, params)
end

lib/ash/bulk_result.ex (new file)

@@ -0,0 +1,15 @@
defmodule Ash.BulkResult do
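@moduledoc """
The result of a bulk action.

A hypothetical handling sketch (`MyApi`, `Post`, and `handle_errors/1` are
assumed names for illustration):

    case MyApi.bulk_create(inputs, Post, :create, return_records?: true) do
      %Ash.BulkResult{status: :success, records: records} -> records
      %Ash.BulkResult{status: :partial_success, errors: errors} -> handle_errors(errors)
      %Ash.BulkResult{status: :error, errors: errors} -> handle_errors(errors)
    end
"""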
@type t :: %__MODULE__{
status: :success | :partial_success | :error,
notifications: list(Ash.Notifier.Notification.t()) | nil,
records: list(Ash.Resource.record()) | nil,
errors: list(term) | nil
}
defstruct [
:status,
:errors,
:records,
:notifications
]
end


@@ -126,27 +126,40 @@ defmodule Ash.Changeset do
end
end
@type after_action_fun ::
(t, Ash.Resource.record() ->
{:ok, Ash.Resource.record()}
| {:ok, Ash.Resource.record(), [Ash.Notifier.Notification.t()]}
| {:error, any})
# TODO: make these structs, e.g. %Changeset.AfterActionHook{}
@type after_transaction_fun ::
(t, {:ok, Ash.Resource.record()} | {:error, any} ->
{:ok, Ash.Resource.record()} | {:error, any})
@type before_action_fun :: (t -> t | {t, %{notifications: [Ash.Notifier.Notification.t()]}})
@type before_transaction_fun :: (t -> t)
@type around_result ::
{:ok, Ash.Resource.record(), t(), %{notifications: list(Ash.Notifier.Notification.t())}}
| {:error, Ash.Error.t()}
@type around_callback :: (t() -> around_result)
@type around_action_fun :: (t, around_callback -> around_result)
@type t :: %__MODULE__{
__validated_for_action__: atom | nil,
action: Ash.Resource.Actions.action() | nil,
action_failed?: boolean,
action_type: Ash.Resource.Actions.action_type() | nil,
after_action: [
(t, Ash.Resource.record() ->
{:ok, Ash.Resource.record()}
| {:ok, Ash.Resource.record(), [Ash.Notifier.Notification.t()]}
| {:error, any})
],
after_transaction: [
(t, {:ok, Ash.Resource.record()} | {:error, any} ->
{:ok, Ash.Resource.record()} | {:error, any})
],
after_action: [after_action_fun | {after_action_fun, map}],
after_transaction: [after_transaction_fun | {after_transaction_fun, map}],
api: module | nil,
arguments: %{optional(atom) => any},
around_action: [(t, around_callback -> around_result)],
around_action: [around_action_fun | {around_action_fun, map}],
attributes: %{optional(atom) => any},
before_action: [(t -> t | {t, %{notifications: [Ash.Notifier.Notification.t()]}})],
before_transaction: [(t -> t)],
before_action: [before_action_fun | {before_action_fun, map}],
before_transaction: [before_transaction_fun | {before_transaction_fun, map}],
context: map,
data: Ash.Resource.record() | nil,
defaults: [atom],
@@ -414,7 +427,7 @@
type: :boolean,
default: false,
doc:
"If set to `true`, values are only required when the action is run (instead of immediately)."
"If set to `false`, values are only required when the action is run (instead of immediately)."
],
actor: [
type: :any,
@@ -734,22 +747,7 @@
changeset =
changeset
|> Map.put(:action, action)
|> reset_arguments()
|> handle_errors(action.error_handler)
|> set_actor(opts)
|> set_authorize(opts)
|> set_tracer(opts)
|> timeout(changeset.timeout || opts[:timeout])
|> set_tenant(
opts[:tenant] || changeset.tenant || changeset.data.__metadata__[:tenant]
)
|> cast_params(action, params || %{})
|> set_argument_defaults(action)
|> require_arguments(action)
|> validate_attributes_accepted(action)
|> require_values(action.type, false, action.require_attributes)
|> set_defaults(changeset.action_type, false)
|> prepare_changeset_for_action(action, opts, params)
|> run_action_changes(
action,
opts[:actor],
@@ -777,6 +775,24 @@
end
end
def prepare_changeset_for_action(changeset, action, opts, params) do
changeset
|> Map.put(:action, action)
|> reset_arguments()
|> handle_errors(action.error_handler)
|> set_actor(opts)
|> set_authorize(opts)
|> set_tracer(opts)
|> timeout(changeset.timeout || opts[:timeout])
|> set_tenant(opts[:tenant] || changeset.tenant || changeset.data.__metadata__[:tenant])
|> cast_params(action, params || %{})
|> set_argument_defaults(action)
|> require_arguments(action)
|> validate_attributes_accepted(action)
|> require_values(action.type, false, action.require_attributes)
|> set_defaults(changeset.action_type, false)
end
defp get_action_entity(resource, name) when is_atom(name),
do: Ash.Resource.Info.action(resource, name)
@@ -1057,7 +1073,7 @@
resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
validation: inspect(module)
} do
Ash.Tracer.set_metadata(opts[:tracer], :validation, metadata)
Ash.Tracer.set_metadata(tracer, :validation, metadata)
opts =
Ash.Filter.build_filter_from_template(
@@ -1078,7 +1094,7 @@
} do
{:ok, opts} = module.init(opts)
Ash.Tracer.set_metadata(opts[:tracer], :change, metadata)
Ash.Tracer.set_metadata(tracer, :change, metadata)
opts =
Ash.Filter.build_filter_from_template(
@@ -1724,50 +1740,7 @@
defp transaction_hooks(changeset, func) do
warn_on_transaction_hooks(changeset, changeset.before_transaction, "before_transaction")
changeset =
Enum.reduce_while(
changeset.before_transaction,
set_phase(changeset, :before_transaction),
fn before_transaction, changeset ->
metadata = %{
api: changeset.api,
resource: changeset.resource,
resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
actor: changeset.context[:private][:actor],
tenant: changeset.context[:private][:actor],
action: changeset.action && changeset.action.name,
authorize?: changeset.context[:private][:authorize?]
}
tracer = changeset.context[:private][:tracer]
result =
Ash.Tracer.span :before_transaction,
"before_transaction",
tracer do
Ash.Tracer.set_metadata(tracer, :before_transaction, metadata)
Ash.Tracer.telemetry_span [:ash, :before_transaction], metadata do
before_transaction.(changeset)
end
end
case result do
{:error, error} ->
{:halt, {:error, error}}
changeset ->
cont =
if changeset.valid? do
:cont
else
:halt
end
{cont, changeset}
end
end
)
changeset = run_before_transaction_hooks(changeset)
result =
try do
@@ -1833,7 +1806,116 @@
end
end
defp run_after_transactions(result, changeset) do
def run_before_transaction_hooks(changeset) do
Enum.reduce_while(
changeset.before_transaction,
set_phase(changeset, :before_transaction),
fn before_transaction, changeset ->
metadata = %{
api: changeset.api,
resource: changeset.resource,
resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
actor: changeset.context[:private][:actor],
tenant: changeset.context[:private][:tenant],
action: changeset.action && changeset.action.name,
authorize?: changeset.context[:private][:authorize?]
}
tracer = changeset.context[:private][:tracer]
result =
Ash.Tracer.span :before_transaction,
"before_transaction",
tracer do
Ash.Tracer.set_metadata(tracer, :before_transaction, metadata)
Ash.Tracer.telemetry_span [:ash, :before_transaction], metadata do
before_transaction.(changeset)
end
end
case result do
{:error, error} ->
{:halt, {:error, error}}
changeset ->
cont =
if changeset.valid? do
:cont
else
:halt
end
{cont, changeset}
end
end
)
end
@doc false
def run_before_actions(changeset) do
Enum.reduce_while(
changeset.before_action,
{changeset, %{notifications: []}},
fn before_action, {changeset, instructions} ->
metadata = %{
api: changeset.api,
resource: changeset.resource,
resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
actor: changeset.context[:private][:actor],
tenant: changeset.context[:private][:tenant],
action: changeset.action && changeset.action.name,
authorize?: changeset.context[:private][:authorize?]
}
tracer = changeset.context[:private][:tracer]
result =
Ash.Tracer.span :before_action,
"before_action",
tracer do
Ash.Tracer.set_metadata(tracer, :before_action, metadata)
Ash.Tracer.telemetry_span [:ash, :before_action], metadata do
before_action.(changeset)
end
end
case result do
{:error, error} ->
{:halt, {:error, error}}
{changeset, %{notifications: notifications}} ->
cont =
if changeset.valid? do
:cont
else
:halt
end
{cont,
{changeset,
%{
instructions
| notifications: instructions.notifications ++ List.wrap(notifications)
}}}
changeset ->
cont =
if changeset.valid? do
:cont
else
:halt
end
{cont, {changeset, instructions}}
end
end
)
end
@doc false
def run_after_transactions(result, changeset) do
warn_on_transaction_hooks(changeset, changeset.after_transaction, "after_transaction")
changeset = set_phase(changeset, :after_transaction)
@@ -1880,65 +1962,7 @@
|> put_context(:private, %{in_before_action?: true})
|> set_phase(:before_action)
result =
Enum.reduce_while(
changeset.before_action,
{changeset, %{notifications: []}},
fn before_action, {changeset, instructions} ->
metadata = %{
api: changeset.api,
resource: changeset.resource,
resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
actor: changeset.context[:private][:actor],
tenant: changeset.context[:private][:actor],
action: changeset.action && changeset.action.name,
authorize?: changeset.context[:private][:authorize?]
}
tracer = changeset.context[:private][:tracer]
result =
Ash.Tracer.span :before_action,
"before_action",
tracer do
Ash.Tracer.set_metadata(tracer, :before_action, metadata)
Ash.Tracer.telemetry_span [:ash, :before_action], metadata do
before_action.(changeset)
end
end
case result do
{:error, error} ->
{:halt, {:error, error}}
{changeset, %{notifications: notifications}} ->
cont =
if changeset.valid? do
:cont
else
:halt
end
{cont,
{changeset,
%{
instructions
| notifications: instructions.notifications ++ List.wrap(notifications)
}}}
changeset ->
cont =
if changeset.valid? do
:cont
else
:halt
end
{cont, {changeset, instructions}}
end
end
)
result = run_before_actions(changeset)
case result do
{:error, error} ->
@@ -1976,7 +2000,8 @@
end)
end
defp run_after_actions(result, changeset, before_action_notifications) do
@doc false
def run_after_actions(result, changeset, before_action_notifications) do
changeset = set_phase(changeset, :after_action)
Enum.reduce_while(
@@ -3405,7 +3430,7 @@
"""
@spec before_action(
t(),
(t() -> t() | {t(), %{notifications: list(Ash.Notifier.Notification.t())}}),
before_action_fun(),
Keyword.t()
) ::
t()
@@ -3425,7 +3450,7 @@
"""
@spec before_transaction(
t(),
(t() -> t()),
before_transaction_fun(),
Keyword.t()
) :: t()
def before_transaction(changeset, func, opts \\ []) do
@@ -3444,10 +3469,7 @@
"""
@spec after_action(
t(),
(t(), Ash.Resource.record() ->
{:ok, Ash.Resource.record()}
| {:ok, Ash.Resource.record(), list(Ash.Notifier.Notification.t())}
| {:error, term}),
after_action_fun(),
Keyword.t()
) :: t()
def after_action(changeset, func, opts \\ []) do
@@ -3469,9 +3491,7 @@
"""
@spec after_transaction(
t(),
(t(), {:ok, Ash.Resource.record()} | {:error, Ash.Changeset.t()} ->
{:ok, Ash.Resource.record()}
| {:error, Ash.Changeset.t()}),
after_transaction_fun(),
Keyword.t()
) :: t()
def after_transaction(changeset, func, opts \\ []) do
@@ -3545,12 +3565,8 @@
You can almost always get the same effect by using `before_action`, setting some context on the changeset
and reading it out in an `after_action` hook.
"""
@type around_result ::
{:ok, Ash.Resource.record(), t(), %{notifications: list(Ash.Notifier.Notification.t())}}
| {:error, Ash.Error.t()}
@type around_callback :: (t() -> around_result)
@spec around_action(t(), (t(), around_callback() -> around_result)) :: t()
@spec around_action(t(), around_action_fun()) :: t()
def around_action(changeset, func) do
%{changeset | around_action: changeset.around_action ++ [func]}
end


@@ -127,13 +127,28 @@ defmodule Ash.DataLayer do
source_resource :: Ash.Resource.t(),
list(lateral_join_link())
) ::
{:ok, list(Ash.Resource.t())} | {:error, term}
{:ok, list(Ash.Resource.record())} | {:error, term}
@type bulk_options :: %{
batch_size: pos_integer,
return_records?: boolean,
tenant: String.t() | nil
}
@callback bulk_create(
Ash.Resource.t(),
action :: atom,
Enumerable.t(Ash.Changeset.t()),
options :: bulk_options
) ::
{:ok, Enumerable.t(:ok | {:ok, Ash.Resource.record()} | {:error, Ash.Error.t()})}
| {:error, Ash.Error.t()}
@callback create(Ash.Resource.t(), Ash.Changeset.t()) ::
{:ok, Ash.Resource.t()} | {:error, term}
{:ok, Ash.Resource.record()} | {:error, term}
@callback upsert(Ash.Resource.t(), Ash.Changeset.t(), list(atom)) ::
{:ok, Ash.Resource.t()} | {:error, term}
{:ok, Ash.Resource.record()} | {:error, term}
@callback update(Ash.Resource.t(), Ash.Changeset.t()) ::
{:ok, Ash.Resource.t()} | {:error, term}
{:ok, Ash.Resource.record()} | {:error, term}
@callback add_aggregate(
data_layer_query(),
Ash.Query.Aggregate.t(),
@@ -179,6 +194,7 @@
@optional_callbacks source: 1,
run_query: 2,
bulk_create: 4,
distinct: 3,
lock: 3,
run_query_with_lateral_join: 4,
@@ -311,6 +327,19 @@
Ash.DataLayer.data_layer(resource).create(resource, changeset)
end
@spec bulk_create(
Ash.Resource.t(),
action :: atom,
Enumerable.t(Ash.Changeset.t()),
options :: bulk_options
) ::
:ok
| {:ok, Enumerable.t(Ash.Resource.record())}
| {:error, Ash.Error.t()}
def bulk_create(resource, action, changesets, options) do
Ash.DataLayer.data_layer(resource).bulk_create(resource, action, changesets, options)
end
@spec destroy(Ash.Resource.t(), Ash.Changeset.t()) :: :ok | {:error, term}
def destroy(resource, changeset) do
Ash.DataLayer.data_layer(resource).destroy(resource, changeset)


@@ -190,6 +190,7 @@ defmodule Ash.DataLayer.Ets do
not private?(resource)
end
def can?(_, :bulk_create), do: true
def can?(_, :composite_primary_key), do: true
def can?(_, :expression_calculation), do: true
def can?(_, :expression_calculation_sort), do: true
@@ -679,6 +680,44 @@
end
end
@impl true
def bulk_create(resource, _action, stream, options) do
with {:ok, table} <- wrap_or_create_table(resource, options.tenant) do
Enum.reduce_while(stream, {:ok, []}, fn changeset, {:ok, results} ->
pkey =
resource
|> Ash.Resource.Info.primary_key()
|> Enum.into(%{}, fn attr ->
{attr, Ash.Changeset.get_attribute(changeset, attr)}
end)
with {:ok, record} <- Ash.Changeset.apply_attributes(changeset),
record <- unload_relationships(resource, record) do
{:cont, {:ok, [{pkey, changeset.context.bulk_create.index, record} | results]}}
else
{:error, error} ->
{:halt, {:error, error}}
end
end)
|> case do
{:ok, records} ->
case put_or_insert_new_batch(table, records, resource, options.return_records?) do
:ok ->
:ok
{:ok, records} ->
{:ok, Stream.map(records, &set_loaded/1)}
{:error, error} ->
{:error, error}
end
{:error, error} ->
{:error, error}
end
end
end
@doc false
@impl true
def create(resource, changeset) do
@@ -694,12 +733,16 @@
record <- unload_relationships(resource, record),
{:ok, record} <-
put_or_insert_new(table, {pkey, record}, resource) do
{:ok, %{record | __meta__: %Ecto.Schema.Metadata{state: :loaded, schema: resource}}}
{:ok, set_loaded(record)}
else
{:error, error} -> {:error, Ash.Error.to_ash_error(error)}
end
end
defp set_loaded(%resource{} = record) do
%{record | __meta__: %Ecto.Schema.Metadata{state: :loaded, schema: resource}}
end
defp put_or_insert_new(table, {pkey, record}, resource) do
attributes = resource |> Ash.Resource.Info.attributes()
@@ -719,6 +762,48 @@
end
end
defp put_or_insert_new_batch(table, records, resource, return_records?) do
attributes = resource |> Ash.Resource.Info.attributes()
Enum.reduce_while(records, {:ok, [], []}, fn {pkey, index, record}, {:ok, acc, indices} ->
case dump_to_native(record, attributes) do
{:ok, casted} ->
{:cont, {:ok, [{pkey, casted} | acc], [{pkey, index} | indices]}}
{:error, error} ->
{:halt, {:error, error}}
end
end)
|> case do
{:ok, batch, indices} ->
case ETS.Set.put(table, batch) do
{:ok, set} ->
if return_records? do
Enum.reduce_while(indices, {:ok, []}, fn {pkey, index}, {:ok, acc} ->
{_key, record} = ETS.Set.get!(set, pkey)
case cast_record(record, resource) do
{:ok, casted} ->
{:cont,
{:ok, [Ash.Resource.put_metadata(casted, :bulk_create_index, index) | acc]}}
{:error, error} ->
{:halt, {:error, error}}
end
end)
else
:ok
end
other ->
other
end
other ->
other
end
end
@doc false
def dump_to_native(record, attributes) do
Enum.reduce_while(attributes, {:ok, %{}}, fn attribute, {:ok, attrs} ->


@@ -101,8 +101,24 @@ defmodule Ash.Resource.Change do
optional(:actor) => Ash.Resource.record(),
optional(any) => any
}
@callback init(Keyword.t()) :: {:ok, Keyword.t()} | {:error, term}
@callback change(Ash.Changeset.t(), Keyword.t(), context) :: Ash.Changeset.t()
@callback batch_change([Ash.Changeset.t()], Keyword.t(), context) ::
Enumerable.t(Ash.Changeset.t() | Ash.Notifier.Notification.t())
@callback after_batch(
[{Ash.Changeset.t(), Ash.Resource.record()}],
Keyword.t(),
context
) ::
Enumerable.t(
{:ok, Ash.Resource.record()}
| {:error, Ash.Error.t()}
| Ash.Notifier.Notification.t()
)
@optional_callbacks after_batch: 3, batch_change: 3
defmacro __using__(_) do
quote do
@@ -110,7 +126,13 @@
def init(opts), do: {:ok, opts}
defoverridable init: 1
def batch_change(changesets, opts, context) do
Stream.map(changesets, fn changeset ->
change(changeset, opts, context)
end)
end
defoverridable init: 1, batch_change: 3
end
end
end
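
# A hypothetical sketch (not part of this commit) of a change module that
# overrides the new batch callbacks; the module name and slug logic are assumed
# for illustration.
defmodule MyApp.Changes.Slugify do
  use Ash.Resource.Change

  # Still used when the action runs outside of a bulk create.
  def change(changeset, _opts, _context) do
    title = Ash.Changeset.get_attribute(changeset, :title)
    Ash.Changeset.force_change_attribute(changeset, :slug, String.downcase(title || ""))
  end

  # Operates on the whole batch at once; may also emit notifications in the stream.
  def batch_change(changesets, opts, context) do
    Enum.map(changesets, &change(&1, opts, context))
  end

  # Runs after the batch is inserted; receives `{changeset, record}` pairs and
  # must return an enumerable of results and/or notifications.
  def after_batch(changesets_and_results, _opts, _context) do
    Enum.map(changesets_and_results, fn {_changeset, record} -> {:ok, record} end)
  end
end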


@@ -0,0 +1,103 @@
defmodule Ash.Test.Actions.BulkCreateTest do
@moduledoc false
use ExUnit.Case, async: true
defmodule Post do
@moduledoc false
use Ash.Resource, data_layer: Ash.DataLayer.Ets
ets do
private? true
end
actions do
defaults [:create, :read, :update, :destroy]
create :create_with_change do
change fn changeset, _ ->
title = Ash.Changeset.get_attribute(changeset, :title)
Ash.Changeset.force_change_attribute(changeset, :title, title <> "_stuff")
end
end
create :create_with_after_action do
change after_action(fn _changeset, result ->
{:ok, %{result | title: result.title <> "_stuff"}}
end)
end
create :create_with_after_transaction do
change after_transaction(fn _changeset, {:ok, result} ->
{:ok, %{result | title: result.title <> "_stuff"}}
end)
end
end
attributes do
uuid_primary_key :id
attribute :title, :string, allow_nil?: false
timestamps()
end
end
defmodule Registry do
@moduledoc false
use Ash.Registry
entries do
entry Post
end
end
defmodule Api do
@moduledoc false
use Ash.Api
resources do
registry Registry
end
end
test "returns an empty list when given an empty list" do
assert %Ash.BulkResult{records: []} = Api.bulk_create!([], Post, :create, return_records?: true)
end
test "returns created records" do
assert %Ash.BulkResult{records: [%{title: "title1"}, %{title: "title2"}]} =
Api.bulk_create!([%{title: "title1"}, %{title: "title2"}], Post, :create,
return_records?: true,
sorted?: true
)
end
test "runs changes" do
assert %Ash.BulkResult{records: [%{title: "title1_stuff"}, %{title: "title2_stuff"}]} =
Api.bulk_create!([%{title: "title1"}, %{title: "title2"}], Post, :create_with_change,
return_records?: true,
sorted?: true
)
end
test "runs after action hooks" do
assert %Ash.BulkResult{records: [%{title: "title1_stuff"}, %{title: "title2_stuff"}]} =
Api.bulk_create!(
[%{title: "title1"}, %{title: "title2"}],
Post,
:create_with_after_action,
return_records?: true,
sorted?: true
)
end
test "runs after transaction hooks" do
assert %Ash.BulkResult{records: [%{title: "title1_stuff"}, %{title: "title2_stuff"}]} =
Api.bulk_create!(
[%{title: "title1"}, %{title: "title2"}],
Post,
:create_with_after_transaction,
return_records?: true,
sorted?: true
)
end
end


@@ -49,11 +49,8 @@ defmodule Ash.Test.Actions.BulkCreateTest do
test "records can be streamed" do
1..10
|> Enum.each(fn i ->
Post
|> Ash.Changeset.for_create(:create, %{title: "title#{i}"})
|> Api.create!()
end)
|> Stream.map(&%{title: "title#{&1}"})
|> Api.bulk_create!(Post, :create)
count =
Post