feat(Ash.Reactor): Add bulk_create step type.

This commit is contained in:
James Harton 2024-05-13 13:53:35 +12:00 committed by James Harton
parent 720f08f2eb
commit 1d5d247d7a
9 changed files with 914 additions and 11 deletions

View file

@ -226,6 +226,201 @@ Target: `Ash.Reactor.Dsl.Action`
## reactor.bulk_create
```elixir
bulk_create name, resource, action \\ nil
```
Declares a step which will call a create action on a resource with a collection of inputs.
> ### Check the docs! {: .warning}
>
> Make sure to thoroughly read and understand the documentation in `Ash.bulk_create/4` before using. Read each option and note the default values. By default, bulk creates don't return records or errors, and don't emit notifications.
Caveats/differences from `Ash.bulk_create/4`:
1. `max_concurrency` specifies the number of tasks that Ash will start to process batches, and has no effect on Reactor concurrency targets. It's could be possible to create a very large number of processes if a number of steps are running bulk actions with a high degree of concurrency.
2. Setting `notify?` to `true` will cause both `notify?` and `return_notifications?` to be set to true in the underlying call to `Ash.bulk_create/4`. Notifications will then be managed by the `Ash.Reactor.Notifications` Reactor middleware.
3. If you specify an undo action it must be a generic action which takes the bulk result as it's only argument.
> #### Undo behaviour {: .tip}
>
> This step has three different modes of undo.
>
> * `never` - The result of the action is never undone. This is the default.
> * `always` - The `undo_action` will always be called.
> * `outside_transaction` - The `undo_action` will not be called when running inside a `transaction` block, but will be otherwise.
### Nested DSLs
* [actor](#reactor-bulk_create-actor)
* [tenant](#reactor-bulk_create-tenant)
* [wait_for](#reactor-bulk_create-wait_for)
### Examples
```
create :create_posts, MyApp.Post, :create do
initial inputs(:titles)
actor(result(:get_user))
tenant(result(:get_organisation, [:id]))
end
```
### Arguments
| Name | Type | Default | Docs |
|------|------|---------|------|
| [`name`](#reactor-bulk_create-name){: #reactor-bulk_create-name .spark-required} | `atom` | | A unique name for the step. |
| [`resource`](#reactor-bulk_create-resource){: #reactor-bulk_create-resource .spark-required} | `module` | | The resource to call the action on. |
| [`action`](#reactor-bulk_create-action){: #reactor-bulk_create-action } | `atom` | | The name of the action to call on the resource. |
### Options
| Name | Type | Default | Docs |
|------|------|---------|------|
| [`initial`](#reactor-bulk_create-initial){: #reactor-bulk_create-initial .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | A collection of inputs to pass to the create action. Must implement the `Enumerable` protocol. |
| [`assume_casted?`](#reactor-bulk_create-assume_casted?){: #reactor-bulk_create-assume_casted? } | `boolean` | `false` | Whether or not to cast attributes and arguments as input. This is an optimization for cases where the input is already casted and/or not in need of casting |
| [`authorize_changeset_with`](#reactor-bulk_create-authorize_changeset_with){: #reactor-bulk_create-authorize_changeset_with } | `:filter \| :error` | `:filter` | If set to `:error`, instead of filtering unauthorized changes, unauthorized changes will raise an appropriate forbidden error |
| [`authorize_query_with`](#reactor-bulk_create-authorize_query_with){: #reactor-bulk_create-authorize_query_with } | `:filter \| :error` | `:filter` | If set to `:error`, instead of filtering unauthorized query results, unauthorized query results will raise an appropriate forbidden error |
| [`batch_size`](#reactor-bulk_create-batch_size){: #reactor-bulk_create-batch_size } | `nil \| pos_integer` | | The number of records to include in each batch. Defaults to the `default_limit` or `max_page_size` of the action, or 100. |
| [`load`](#reactor-bulk_create-load){: #reactor-bulk_create-load } | `atom \| list(atom)` | `[]` | A load statement to apply to records. Ignored if `return_records?` is not true. |
| [`max_concurrency`](#reactor-bulk_create-max_concurrency){: #reactor-bulk_create-max_concurrency } | `non_neg_integer` | `0` | If set to a value greater than 0, up to that many tasks will be started to run batches asynchronously. |
| [`notification_metadata`](#reactor-bulk_create-notification_metadata){: #reactor-bulk_create-notification_metadata } | `map \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | `%{}` | Metadata to be merged into the metadata field for all notifications sent from this operation. |
| [`notify?`](#reactor-bulk_create-notify?){: #reactor-bulk_create-notify? } | `boolean` | `false` | Whether or not to generate any notifications. This may be intensive for large bulk actions. |
| [`read_action`](#reactor-bulk_create-read_action){: #reactor-bulk_create-read_action } | `atom` | | The action to use when building the read query. |
| [`return_errors?`](#reactor-bulk_create-return_errors?){: #reactor-bulk_create-return_errors? } | `boolean` | `false` | Whether or not to return all of the errors that occur. Defaults to false to account for large inserts. |
| [`return_records?`](#reactor-bulk_create-return_records?){: #reactor-bulk_create-return_records? } | `boolean` | `false` | Whether or not to return all of the records that were inserted. Defaults to false to account for large inserts. |
| [`return_stream?`](#reactor-bulk_create-return_stream?){: #reactor-bulk_create-return_stream? } | `boolean` | `false` | If set to `true`, instead of an `Ash.BulkResult`, a mixed stream is returned. |
| [`rollback_on_error?`](#reactor-bulk_create-rollback_on_error?){: #reactor-bulk_create-rollback_on_error? } | `boolean` | `true` | Whether or not to rollback the transaction on error, if the resource is in a transaction. |
| [`select`](#reactor-bulk_create-select){: #reactor-bulk_create-select } | `atom \| list(atom)` | | A select statement to apply to records. Ignored if `return_records?` is not `true`. |
| [`skip_unknown_inputs`](#reactor-bulk_create-skip_unknown_inputs){: #reactor-bulk_create-skip_unknown_inputs } | `atom \| list(atom)` | | A list of inputs that, if provided, will be ignored if they are not recognized by the action. |
| [`sorted?`](#reactor-bulk_create-sorted?){: #reactor-bulk_create-sorted? } | `boolean` | `false` | Whether or not to sort results by their input position, in cases where `return_records?` is set to `true`. |
| [`stop_on_error?`](#reactor-bulk_create-stop_on_error?){: #reactor-bulk_create-stop_on_error? } | `boolean` | `false` | If `true`, the first encountered error will stop the action and be returned. Otherwise, errors will be skipped. |
| [`success_state`](#reactor-bulk_create-success_state){: #reactor-bulk_create-success_state } | `:success \| :partial_success` | `:success` | Bulk results can be entirely or partially successful. Chooses the `Ash.BulkResult` state to consider the step a success. |
| [`timeout`](#reactor-bulk_create-timeout){: #reactor-bulk_create-timeout } | `timeout` | | If none is provided, the timeout configured on the domain is used (which defaults to `30_000`). |
| [`transaction`](#reactor-bulk_create-transaction){: #reactor-bulk_create-transaction } | `:all \| :batch \| false` | `:batch` | Whether or not to wrap the entire execution in a transaction, each batch, or not at all. |
| [`upsert_fields`](#reactor-bulk_create-upsert_fields){: #reactor-bulk_create-upsert_fields } | `atom \| list(atom)` | | The fields to upsert. If not set, the action's `upsert_fields` is used. |
| [`upsert_identity`](#reactor-bulk_create-upsert_identity){: #reactor-bulk_create-upsert_identity } | `atom` | | The identity to use for the upsert |
| [`upsert?`](#reactor-bulk_create-upsert?){: #reactor-bulk_create-upsert? } | `boolean` | `false` | Whether or not this action should be executed as an upsert. |
| [`domain`](#reactor-bulk_create-domain){: #reactor-bulk_create-domain } | `module` | | The Domain to use when calling the action. Defaults to the Domain set on the resource or in the `ash` section. |
| [`async?`](#reactor-bulk_create-async?){: #reactor-bulk_create-async? } | `boolean` | `true` | When set to true the step will be executed asynchronously via Reactor's `TaskSupervisor`. |
| [`authorize?`](#reactor-bulk_create-authorize?){: #reactor-bulk_create-authorize? } | `boolean \| nil` | | Explicitly enable or disable authorization for the action. |
| [`description`](#reactor-bulk_create-description){: #reactor-bulk_create-description } | `String.t` | | A description for the step |
| [`undo_action`](#reactor-bulk_create-undo_action){: #reactor-bulk_create-undo_action } | `atom` | | The name of the action to call on the resource when the step is to be undone. |
| [`undo`](#reactor-bulk_create-undo){: #reactor-bulk_create-undo } | `:always \| :never \| :outside_transaction` | `:never` | How to handle undoing this action |
## reactor.bulk_create.actor
```elixir
actor source
```
Specifies the action actor
### Arguments
| Name | Type | Default | Docs |
|------|------|---------|------|
| [`source`](#reactor-bulk_create-actor-source){: #reactor-bulk_create-actor-source .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the actor. |
### Options
| Name | Type | Default | Docs |
|------|------|---------|------|
| [`transform`](#reactor-bulk_create-actor-transform){: #reactor-bulk_create-actor-transform } | `(any -> any) \| module \| nil` | | An optional transformation function which can be used to modify the actor before it is passed to the action. |
### Introspection
Target: `Ash.Reactor.Dsl.Actor`
## reactor.bulk_create.tenant
```elixir
tenant source
```
Specifies the action tenant
### Arguments
| Name | Type | Default | Docs |
|------|------|---------|------|
| [`source`](#reactor-bulk_create-tenant-source){: #reactor-bulk_create-tenant-source .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the tenant. |
### Options
| Name | Type | Default | Docs |
|------|------|---------|------|
| [`transform`](#reactor-bulk_create-tenant-transform){: #reactor-bulk_create-tenant-transform } | `(any -> any) \| module \| nil` | | An optional transformation function which can be used to modify the tenant before it is passed to the action. |
### Introspection
Target: `Ash.Reactor.Dsl.Tenant`
## reactor.bulk_create.wait_for
```elixir
wait_for names
```
Wait for the named step to complete before allowing this one to start.
Desugars to `argument :_, result(step_to_wait_for)`
### Examples
```
wait_for :create_user
```
### Arguments
| Name | Type | Default | Docs |
|------|------|---------|------|
| [`names`](#reactor-bulk_create-wait_for-names){: #reactor-bulk_create-wait_for-names .spark-required} | `atom \| list(atom)` | | The name of the step to wait for. |
### Introspection
Target: `Reactor.Dsl.WaitFor`
### Introspection
Target: `Ash.Reactor.Dsl.BulkCreate`
## reactor.change ## reactor.change
```elixir ```elixir
change name, change change name, change

View file

@ -0,0 +1,246 @@
defimpl Reactor.Dsl.Build, for: Ash.Reactor.Dsl.BulkCreate do
@moduledoc false
alias Ash.Reactor.BulkCreateStep
alias Ash.Resource.Info
alias Reactor.{Argument, Builder}
alias Spark.{Dsl.Transformer, Error.DslError}
import Ash.Reactor.BuilderUtils
import Reactor.Template, only: :macros
@doc false
@impl true
def build(bulk_create, reactor) do
with {:ok, reactor} <- ensure_hooked(reactor) do
initial = %Argument{name: :initial, source: bulk_create.initial}
notification_metadata =
case bulk_create.notification_metadata do
template when is_template(template) ->
%Argument{name: :notification_metadata, source: template}
map when is_map(map) ->
Argument.from_value(:notification_metadata, map)
end
arguments =
[initial, notification_metadata]
|> maybe_append(bulk_create.actor)
|> maybe_append(bulk_create.tenant)
|> Enum.concat(bulk_create.wait_for)
action_options =
bulk_create
|> Map.take([
:action,
:assume_casted?,
:authorize_changeset_with,
:authorize_query_with,
:authorize?,
:batch_size,
:domain,
:max_concurrency,
:notify?,
:read_action,
:resource,
:return_errors?,
:return_records?,
:return_stream?,
:rollback_on_error?,
:select,
:skip_unknown_inputs,
:sorted?,
:stop_on_error?,
:success_state,
:timeout,
:transaction,
:upsert_fields,
:upsert_fields,
:upsert_identity,
:upsert_identity,
:upsert?,
:undo_action,
:undo
])
|> Map.put(:return_notifications?, bulk_create.notify?)
|> Enum.reject(&is_nil(elem(&1, 1)))
step_options =
bulk_create
|> Map.take([:async?])
|> Map.put(:ref, :step_name)
|> Enum.to_list()
Builder.add_step(
reactor,
bulk_create.name,
{BulkCreateStep, action_options},
arguments,
step_options
)
end
end
@doc false
@impl true
def transform(_bulk_create, dsl_state), do: {:ok, dsl_state}
@doc false
@impl true
def verify(bulk_create, dsl_state) do
action_error_path = [:bulk_create, bulk_create.name, :action]
with {:ok, action} <-
get_action(dsl_state, bulk_create.resource, bulk_create.action, action_error_path),
:ok <-
verify_action_type(dsl_state, bulk_create.resource, action, :create, action_error_path),
:ok <- verify_undo(dsl_state, bulk_create),
:ok <- maybe_verify_undo_action(dsl_state, bulk_create),
:ok <- maybe_verify_upsert_fields(dsl_state, bulk_create, action, action_error_path),
:ok <- verify_select(dsl_state, bulk_create),
:ok <- verify_rollback_on_error(dsl_state, bulk_create),
:ok <- verify_sorted(dsl_state, bulk_create) do
verify_notify(dsl_state, bulk_create)
end
end
defguardp is_falsy(value) when value in [nil, false]
defp verify_notify(dsl_state, bulk_create)
when bulk_create.notify? == true and bulk_create.return_stream? == true,
do:
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:bulk_create, bulk_create.name, :notify?],
message: """
Setting `notify?` has no effect when `return_stream?` is `true`.
You must manually consume the resulting stream of records and notifications in a subsequent step.
"""
)}
defp verify_notify(_dsl_state, _bulk_create), do: :ok
defp verify_sorted(dsl_state, bulk_create)
when bulk_create.sorted? == true and is_falsy(bulk_create.return_records?),
do:
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:bulk_create, bulk_create.name, :sorted?],
message: "Setting `sorted?` has no effect with `return_records?` is not `true`."
)}
defp verify_sorted(_dsl_state, _bulk_create), do: :ok
defp verify_rollback_on_error(dsl_state, bulk_create)
when bulk_create.rollback_on_error? == true and is_falsy(bulk_create.transaction),
do:
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:bulk_create, bulk_create.name, :rollback_on_error?],
message: "Setting `rollback_on_error?` has no effect when `transaction` is `false`."
)}
defp verify_rollback_on_error(_dsl_state, _bulk_create), do: :ok
defp verify_select(_dsl_state, bulk_create) when bulk_create.select == [], do: :ok
defp verify_select(_dsl_state, bulk_create) when bulk_create.return_records? == true, do: :ok
defp verify_select(dsl_state, bulk_create),
do:
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:bulk_create, bulk_create.name, :select],
message: "Setting `select` has no effect when `return_records?` is not `true`."
)}
defp maybe_verify_upsert_fields(dsl_state, bulk_create, action, error_path)
when bulk_create.upsert? == true and bulk_create.upsert_fields == [] and
action.upsert_fields == [],
do:
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: error_path,
message:
"Expected `upsert_fields` to be set on either the bulk create step or the underlying action."
)}
defp maybe_verify_upsert_fields(_dsl_state, _bulk_create, _action, _error_path), do: :ok
defp verify_undo(dsl_state, bulk_create)
when bulk_create.undo != :never and bulk_create.return_stream? == true,
do:
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:bulk_create, bulk_create.name, :undo],
message:
"Cannot set undo to anything other than `:never` when `return_stream?` is `true`."
)}
defp verify_undo(_dsl_state, _bulk_create), do: :ok
defp maybe_verify_undo_action(_dsl_state, bulk_create) when bulk_create.undo == :never, do: :ok
defp maybe_verify_undo_action(dsl_state, bulk_create) do
error_path = [:bulk_create, bulk_create.name, :undo_action]
with {:ok, action} <-
get_action(dsl_state, bulk_create.resource, bulk_create.undo_action, error_path),
:ok <- verify_action_type(dsl_state, bulk_create.resource, action, :action, error_path) do
verify_action_takes_bulk_result(dsl_state, bulk_create.resource, action, error_path)
end
end
defp get_action(dsl_state, resource, action_name, error_path) do
case Info.action(resource, action_name) do
nil ->
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: error_path,
message:
"No action found matching the name `#{action_name}` on resource `#{inspect(resource)}`."
)}
action when is_struct(action) ->
{:ok, action}
end
end
defp verify_action_type(_dsl_state, _resource, action, action_type, _error_path)
when action.type == action_type,
do: :ok
defp verify_action_type(dsl_state, resource, action, action_type, error_path) do
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: error_path,
message:
"Expected the action `#{inspect(action.name)}` on `#{inspect(resource)}` to be a #{action_type}, however it is a #{action.type}"
)}
end
defp verify_action_takes_bulk_result(
_dsl_state,
_resource,
%{arguments: [%{name: :bulk_result}]},
_error_path
),
do: :ok
defp verify_action_takes_bulk_result(dsl_state, _resource, _action, error_path) do
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: error_path,
message: "The undo action for an create step should take a single `bulk_result` argument."
)}
end
end

View file

@ -6,6 +6,7 @@ defimpl Reactor.Dsl.Build, for: Ash.Reactor.Dsl.Create do
alias Reactor.{Argument, Builder} alias Reactor.{Argument, Builder}
alias Spark.{Dsl.Transformer, Error.DslError} alias Spark.{Dsl.Transformer, Error.DslError}
import Ash.Reactor.BuilderUtils import Ash.Reactor.BuilderUtils
import Reactor.Template, only: :macros
@doc false @doc false
@impl true @impl true
@ -20,10 +21,7 @@ defimpl Reactor.Dsl.Build, for: Ash.Reactor.Dsl.Create do
module when is_atom(module) -> module when is_atom(module) ->
Argument.from_value(:initial, module) Argument.from_value(:initial, module)
template template when is_template(template) ->
when is_struct(template, Reactor.Template.Input) or
is_struct(template, Reactor.Template.Result) or
is_struct(template, Reactor.Template.Value) ->
%Argument{name: :initial, source: template} %Argument{name: :initial, source: template}
end end

View file

@ -189,7 +189,7 @@ defmodule Ash.Reactor.Dsl.ActionTransformer do
defp validate_entity_input_dupes(_entity, _dsl_state), do: :ok defp validate_entity_input_dupes(_entity, _dsl_state), do: :ok
defp validate_entity_input_empty(entity, dsl_state) do defp validate_entity_input_empty(entity, dsl_state) when is_map_key(entity, :inputs) do
entity.inputs entity.inputs
|> Enum.filter(&Enum.empty?(&1.template)) |> Enum.filter(&Enum.empty?(&1.template))
|> case do |> case do
@ -222,7 +222,9 @@ defmodule Ash.Reactor.Dsl.ActionTransformer do
end end
end end
defp validate_entity_input_names(entity, action, dsl_state) do defp validate_entity_input_empty(_, _), do: :ok
defp validate_entity_input_names(entity, action, dsl_state) when is_map_key(entity, :inputs) do
argument_names = Enum.map(action.arguments, & &1.name) argument_names = Enum.map(action.arguments, & &1.name)
allowed_input_names = allowed_input_names =
@ -291,6 +293,8 @@ defmodule Ash.Reactor.Dsl.ActionTransformer do
end end
end end
defp validate_entity_input_names(_entity, _action, _dsl_state), do: :ok
defp maybe_accept_inputs(input_names, action) when length(action.accepts) > 0, defp maybe_accept_inputs(input_names, action) when length(action.accepts) > 0,
do: Enum.filter(input_names, &(&1 in action.accepts)) do: Enum.filter(input_names, &(&1 in action.accepts))
@ -333,7 +337,7 @@ defmodule Ash.Reactor.Dsl.ActionTransformer do
end end
defp get_entity_resource_action(entity, dsl_state) do defp get_entity_resource_action(entity, dsl_state) do
case Resource.Info.action(entity.resource, entity.action, entity.type) do case Resource.Info.action(entity.resource, entity.action, action_type(entity.type)) do
nil -> nil ->
suggestions = suggestions =
entity.resource entity.resource
@ -356,6 +360,13 @@ defmodule Ash.Reactor.Dsl.ActionTransformer do
end end
end end
defp action_type(:bulk_create), do: :create
defp action_type(:create), do: :create
defp action_type(:read), do: :read
defp action_type(:update), do: :update
defp action_type(:destroy), do: :destroy
defp action_type(:action), do: :action
defp maybe_validate_upsert_identity(entity, dsl_state) defp maybe_validate_upsert_identity(entity, dsl_state)
when entity.upsert? and entity.upsert_identity do when entity.upsert? and entity.upsert_identity do
if Resource.Info.identity(entity.resource, entity.upsert_identity) do if Resource.Info.identity(entity.resource, entity.upsert_identity) do

View file

@ -0,0 +1,290 @@
defmodule Ash.Reactor.Dsl.BulkCreate do
@moduledoc """
The `bulk_create` entity for the `Ash.Reactor` reactor extension.
"""
defstruct __identifier__: nil,
action_step?: true,
action: nil,
actor: [],
assume_casted?: false,
async?: true,
authorize_changeset_with: :filter,
authorize_query_with: :filter,
authorize?: nil,
batch_size: nil,
description: nil,
domain: nil,
initial: nil,
load: [],
max_concurrency: 0,
name: nil,
notification_metadata: %{},
notify?: false,
read_action: nil,
resource: nil,
return_errors?: false,
return_records?: false,
return_stream?: false,
rollback_on_error?: true,
select: [],
skip_unknown_inputs: [],
sorted?: false,
stop_on_error?: false,
success_state: :success,
tenant: [],
timeout: 30_000,
transaction: false,
transform: nil,
type: :bulk_create,
undo_action: nil,
undo: :never,
upsert_fields: [],
upsert_identity: nil,
upsert?: false,
wait_for: []
@type t :: %__MODULE__{
__identifier__: any,
action_step?: true,
action: atom,
actor: [Ash.Reactor.Dsl.Actor.t()],
assume_casted?: boolean,
async?: boolean,
authorize_changeset_with: :filter | :error,
authorize_query_with: :filter | :error,
authorize?: boolean | nil,
batch_size: nil | pos_integer(),
description: String.t() | nil,
domain: Ash.Domain.t(),
initial: Reactor.Template.t(),
load: [atom],
max_concurrency: non_neg_integer(),
name: atom,
notification_metadata: map,
notify?: boolean,
read_action: atom,
resource: module,
return_errors?: boolean,
return_records?: boolean,
return_stream?: boolean,
rollback_on_error?: boolean,
select: [atom],
skip_unknown_inputs: [atom],
sorted?: boolean,
stop_on_error?: boolean,
success_state: :success | :partial_success,
tenant: [Ash.Reactor.Dsl.Tenant.t()],
timeout: nil | timeout,
transaction: :all | :batch | false,
type: :bulk_create,
undo_action: nil,
undo: :never,
upsert_fields: [],
upsert_identity: nil
}
@doc false
def __entity__,
do: %Spark.Dsl.Entity{
name: :bulk_create,
describe: """
Declares a step which will call a create action on a resource with a collection of inputs.
> ### Check the docs! {: .warning}
>
> Make sure to thoroughly read and understand the documentation in `Ash.bulk_create/4` before using. Read each option and note the default values. By default, bulk creates don't return records or errors, and don't emit notifications.
Caveats/differences from `Ash.bulk_create/4`:
1. `max_concurrency` specifies the number of tasks that Ash will start to process batches, and has no effect on Reactor concurrency targets. It's could be possible to create a very large number of processes if a number of steps are running bulk actions with a high degree of concurrency.
2. Setting `notify?` to `true` will cause both `notify?` and `return_notifications?` to be set to true in the underlying call to `Ash.bulk_create/4`. Notifications will then be managed by the `Ash.Reactor.Notifications` Reactor middleware.
3. If you specify an undo action it must be a generic action which takes the bulk result as it's only argument.
#{Ash.Reactor.Dsl.Action.__shared_undo_docs__()}
""",
examples: [
"""
create :create_posts, MyApp.Post, :create do
initial inputs(:titles)
actor(result(:get_user))
tenant(result(:get_organisation, [:id]))
end
"""
],
no_depend_modules: [:domain, :resource],
target: __MODULE__,
args: [:name, :resource, {:optional, :action}],
identifier: :name,
imports: [Reactor.Dsl.Argument],
entities: [
actor: [Ash.Reactor.Dsl.Actor.__entity__()],
tenant: [Ash.Reactor.Dsl.Tenant.__entity__()],
wait_for: [Reactor.Dsl.WaitFor.__entity__()]
],
singleton_entity_keys: [:actor, :tenant],
recursive_as: :steps,
schema:
[
assume_casted?: [
type: :boolean,
doc:
"Whether or not to cast attributes and arguments as input. This is an optimization for cases where the input is already casted and/or not in need of casting",
required: false,
default: false
],
authorize_changeset_with: [
type: {:in, [:filter, :error]},
doc:
"If set to `:error`, instead of filtering unauthorized changes, unauthorized changes will raise an appropriate forbidden error",
required: false,
default: :filter
],
authorize_query_with: [
type: {:in, [:filter, :error]},
doc:
"If set to `:error`, instead of filtering unauthorized query results, unauthorized query results will raise an appropriate forbidden error",
required: false,
default: :filter
],
batch_size: [
type: {:or, [nil, :pos_integer]},
doc:
"The number of records to include in each batch. Defaults to the `default_limit` or `max_page_size` of the action, or 100.",
required: false
],
initial: [
type: Reactor.Template.type(),
required: true,
doc:
"A collection of inputs to pass to the create action. Must implement the `Enumerable` protocol."
],
load: [
type: {:wrap_list, :atom},
doc:
"A load statement to apply to records. Ignored if `return_records?` is not true.",
required: false,
default: []
],
max_concurrency: [
type: :non_neg_integer,
doc:
"If set to a value greater than 0, up to that many tasks will be started to run batches asynchronously.",
required: false,
default: 0
],
notification_metadata: [
type: {:or, [:map, Reactor.Template.type()]},
doc:
"Metadata to be merged into the metadata field for all notifications sent from this operation.",
required: false,
default: %{}
],
notify?: [
type: :boolean,
doc:
"Whether or not to generate any notifications. This may be intensive for large bulk actions.",
required: false,
default: false
],
read_action: [
type: :atom,
doc: "The action to use when building the read query.",
required: false
],
return_errors?: [
type: :boolean,
doc:
"Whether or not to return all of the errors that occur. Defaults to false to account for large inserts.",
required: false,
default: false
],
return_records?: [
type: :boolean,
doc:
"Whether or not to return all of the records that were inserted. Defaults to false to account for large inserts.",
required: false,
default: false
],
return_stream?: [
type: :boolean,
doc: "If set to `true`, instead of an `Ash.BulkResult`, a mixed stream is returned.",
required: false,
default: false
],
rollback_on_error?: [
type: :boolean,
doc:
"Whether or not to rollback the transaction on error, if the resource is in a transaction.",
required: false,
default: true
],
select: [
type: {:wrap_list, :atom},
doc:
"A select statement to apply to records. Ignored if `return_records?` is not `true`.",
required: false
],
skip_unknown_inputs: [
type: {:wrap_list, :atom},
doc:
"A list of inputs that, if provided, will be ignored if they are not recognized by the action.",
required: false
],
sorted?: [
type: :boolean,
doc:
"Whether or not to sort results by their input position, in cases where `return_records?` is set to `true`.",
required: false,
default: false
],
stop_on_error?: [
type: :boolean,
doc:
"If `true`, the first encountered error will stop the action and be returned. Otherwise, errors will be skipped.",
required: false,
default: false
],
success_state: [
type: {:in, [:success, :partial_success]},
doc:
"Bulk results can be entirely or partially successful. Chooses the `Ash.BulkResult` state to consider the step a success.",
required: false,
default: :success
],
timeout: [
type: :timeout,
doc:
"If none is provided, the timeout configured on the domain is used (which defaults to `30_000`).",
required: false
],
transaction: [
type: {:in, [:all, :batch, false]},
doc:
"Whether or not to wrap the entire execution in a transaction, each batch, or not at all.",
required: false,
default: :batch
],
upsert_fields: [
type: {:wrap_list, :atom},
doc: "The fields to upsert. If not set, the action's `upsert_fields` is used.",
required: false
],
upsert_identity: [
type: :atom,
required: false,
doc: "The identity to use for the upsert"
],
upsert?: [
type: :boolean,
required: false,
default: false,
doc: "Whether or not this action should be executed as an upsert."
]
]
|> Spark.Options.merge(
Ash.Reactor.Dsl.Action.__shared_action_option_schema__(),
"Shared action options"
)
}
end

View file

@ -26,6 +26,7 @@ defmodule Ash.Reactor do
dsl_patches: dsl_patches:
[ [
Ash.Reactor.Dsl.Action, Ash.Reactor.Dsl.Action,
Ash.Reactor.Dsl.BulkCreate,
Ash.Reactor.Dsl.Change, Ash.Reactor.Dsl.Change,
Ash.Reactor.Dsl.Create, Ash.Reactor.Dsl.Create,
Ash.Reactor.Dsl.Destroy, Ash.Reactor.Dsl.Destroy,

View file

@ -0,0 +1,116 @@
defmodule Ash.Reactor.BulkCreateStep do
@moduledoc """
The Reactor stop which is used to execute create actions in bulk.
"""
use Reactor.Step
import Ash.Reactor.StepUtils
alias Ash.{BulkResult, DataLayer}
@doc false
@impl true
def run(arguments, context, options) do
bulk_create_options =
options
|> Keyword.take([
:assume_casted?,
:authorize_changeset_with,
:authorize_query_with,
:authorize?,
:batch_size,
:domain,
:load,
:max_concurrency,
:notify?,
:read_action,
:return_errors?,
:return_notifications?,
:return_records?,
:return_stream?,
:rollback_on_error?,
:select,
:skip_unknown_inputs,
:sorted?,
:stop_on_error?,
:timeout,
:transaction,
:upsert_fields,
:upsert_identity,
:upsert?
])
|> maybe_set_kw(:actor, arguments[:actor])
|> maybe_set_kw(:tenant, arguments[:tenant])
|> maybe_set_kw(:notification_metadata, arguments[:notification_metadata])
success_states =
options[:success_state]
|> case do
:partial_success -> [:success, :partial_success]
_ -> [:success]
end
return_stream? = options[:return_stream?]
arguments.initial
|> Ash.bulk_create(options[:resource], options[:action], bulk_create_options)
|> case do
result when is_struct(result, BulkResult) ->
if result.status in success_states do
maybe_queue_notifications(result, context)
else
{:error, result}
end
stream when return_stream? == true ->
{:ok, stream}
end
end
@doc false
@impl true
def undo(bulk_result, arguments, _context, options) when is_struct(bulk_result, BulkResult) do
action_options =
options
|> Keyword.take([:authorize?, :domain])
|> maybe_set_kw(:actor, arguments[:actor])
|> maybe_set_kw(:tenant, arguments[:tenant])
options[:resource]
|> Ash.ActionInput.for_action(options[:undo_action], %{bulk_result: bulk_result})
|> Ash.run_action(action_options)
|> case do
:ok -> :ok
{:ok, _} -> :ok
{:error, reason} -> {:error, reason}
end
end
@doc false
@impl true
def can?(%{impl: {_, options}}, :undo) do
case options[:undo] do
:always ->
true
:never ->
false
:outside_transaction ->
!DataLayer.in_transaction?(options[:resource]) || options[:transaction] != false
end
end
def can?(_, :compensate), do: false
defp maybe_queue_notifications(result, _context) when is_nil(result.notifications),
do: {:ok, result}
defp maybe_queue_notifications(result, _context) when result.notifications == [],
do: {:ok, result}
defp maybe_queue_notifications(result, context) do
with :ok <- Ash.Reactor.Notifications.enqueue_notifications(context, result.notifications) do
{:ok, %{result | notifications: nil}}
end
end
end

View file

@ -19,7 +19,7 @@ defmodule Ash.Reactor.CreateStep do
|> maybe_set_kw(:tenant, arguments[:tenant]) |> maybe_set_kw(:tenant, arguments[:tenant])
action_options = action_options =
[return_notifications?: true] [return_notifications?: true, domain: options[:domain]]
|> maybe_set_kw(:authorize?, options[:authorize?]) |> maybe_set_kw(:authorize?, options[:authorize?])
changeset = changeset =
@ -38,7 +38,7 @@ defmodule Ash.Reactor.CreateStep do
end end
changeset changeset
|> options[:domain].create(action_options) |> Ash.create(action_options)
|> case do |> case do
{:ok, record} -> {:ok, record} ->
{:ok, store_changeset_in_metadata(context.current_step.name, record, changeset)} {:ok, store_changeset_in_metadata(context.current_step.name, record, changeset)}
@ -62,7 +62,7 @@ defmodule Ash.Reactor.CreateStep do
|> maybe_set_kw(:tenant, arguments[:tenant]) |> maybe_set_kw(:tenant, arguments[:tenant])
action_options = action_options =
[return_notifications?: true, return_destroyed?: false] [return_notifications?: true, return_destroyed?: false, domain: options[:domain]]
|> maybe_set_kw(:authorize?, options[:authorize?]) |> maybe_set_kw(:authorize?, options[:authorize?])
attributes = attributes =
@ -70,7 +70,7 @@ defmodule Ash.Reactor.CreateStep do
record record
|> Changeset.for_destroy(options[:undo_action], attributes, changeset_options) |> Changeset.for_destroy(options[:undo_action], attributes, changeset_options)
|> options[:domain].destroy(action_options) |> Ash.destroy(action_options)
# We always want to discard the notifications. # We always want to discard the notifications.
|> case do |> case do
:ok -> :ok :ok -> :ok

View file

@ -0,0 +1,46 @@
defmodule Ash.Test.Reactor.BulkCreateTest do
@moduledoc false
use ExUnit.Case, async: true
alias Ash.Test.Domain
defmodule Post do
@moduledoc false
use Ash.Resource, data_layer: Ash.DataLayer.Ets, domain: Domain
attributes do
uuid_primary_key :id
attribute :title, :string, allow_nil?: false, public?: true
end
actions do
defaults [:read, create: :*]
end
end
defmodule BulkCreateReactor do
@moduledoc false
use Reactor, extensions: [Ash.Reactor]
input :post_attrs
bulk_create :create_posts, Post, :create do
initial(input(:post_attrs))
end
end
test "it can create a bunch of records all at once" do
how_many = :rand.uniform(99) + :rand.uniform(99)
post_attrs =
1..how_many
|> Enum.map(&%{title: "Post number #{&1}"})
assert {:ok, _} =
Reactor.run(BulkCreateReactor, %{post_attrs: post_attrs}, %{}, async?: false)
created_posts = Ash.read!(Post, action: :read)
assert length(created_posts) == how_many
end
end