From 1d5d247d7ad2eb6f68551dfd015e08d17e2a5a52 Mon Sep 17 00:00:00 2001 From: James Harton Date: Mon, 13 May 2024 13:53:35 +1200 Subject: [PATCH] feat(Ash.Reactor): Add `bulk_create` step type. --- documentation/dsls/DSL:-Ash.Reactor.md | 195 +++++++++++++++ lib/ash/reactor/builders/bulk_create.ex | 246 ++++++++++++++++++ lib/ash/reactor/builders/create.ex | 6 +- lib/ash/reactor/dsl/action_transformer.ex | 17 +- lib/ash/reactor/dsl/bulk_create.ex | 290 ++++++++++++++++++++++ lib/ash/reactor/reactor.ex | 1 + lib/ash/reactor/steps/bulk_create_step.ex | 116 +++++++++ lib/ash/reactor/steps/create_step.ex | 8 +- test/reactor/bulk_create_test.exs | 46 ++++ 9 files changed, 914 insertions(+), 11 deletions(-) create mode 100644 lib/ash/reactor/builders/bulk_create.ex create mode 100644 lib/ash/reactor/dsl/bulk_create.ex create mode 100644 lib/ash/reactor/steps/bulk_create_step.ex create mode 100644 test/reactor/bulk_create_test.exs diff --git a/documentation/dsls/DSL:-Ash.Reactor.md b/documentation/dsls/DSL:-Ash.Reactor.md index ee9ceb78..4459fb24 100644 --- a/documentation/dsls/DSL:-Ash.Reactor.md +++ b/documentation/dsls/DSL:-Ash.Reactor.md @@ -226,6 +226,201 @@ Target: `Ash.Reactor.Dsl.Action` +## reactor.bulk_create +```elixir +bulk_create name, resource, action \\ nil +``` + + +Declares a step which will call a create action on a resource with a collection of inputs. + +> ### Check the docs! {: .warning} +> +> Make sure to thoroughly read and understand the documentation in `Ash.bulk_create/4` before using. Read each option and note the default values. By default, bulk creates don't return records or errors, and don't emit notifications. + +Caveats/differences from `Ash.bulk_create/4`: + +1. `max_concurrency` specifies the number of tasks that Ash will start to process batches, and has no effect on Reactor concurrency targets. It's could be possible to create a very large number of processes if a number of steps are running bulk actions with a high degree of concurrency. +2. Setting `notify?` to `true` will cause both `notify?` and `return_notifications?` to be set to true in the underlying call to `Ash.bulk_create/4`. Notifications will then be managed by the `Ash.Reactor.Notifications` Reactor middleware. +3. If you specify an undo action it must be a generic action which takes the bulk result as it's only argument. + +> #### Undo behaviour {: .tip} +> +> This step has three different modes of undo. +> +> * `never` - The result of the action is never undone. This is the default. +> * `always` - The `undo_action` will always be called. +> * `outside_transaction` - The `undo_action` will not be called when running inside a `transaction` block, but will be otherwise. + + + +### Nested DSLs + * [actor](#reactor-bulk_create-actor) + * [tenant](#reactor-bulk_create-tenant) + * [wait_for](#reactor-bulk_create-wait_for) + + +### Examples +``` +create :create_posts, MyApp.Post, :create do + initial inputs(:titles) + actor(result(:get_user)) + tenant(result(:get_organisation, [:id])) +end + +``` + + + +### Arguments + +| Name | Type | Default | Docs | +|------|------|---------|------| +| [`name`](#reactor-bulk_create-name){: #reactor-bulk_create-name .spark-required} | `atom` | | A unique name for the step. | +| [`resource`](#reactor-bulk_create-resource){: #reactor-bulk_create-resource .spark-required} | `module` | | The resource to call the action on. | +| [`action`](#reactor-bulk_create-action){: #reactor-bulk_create-action } | `atom` | | The name of the action to call on the resource. | +### Options + +| Name | Type | Default | Docs | +|------|------|---------|------| +| [`initial`](#reactor-bulk_create-initial){: #reactor-bulk_create-initial .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | A collection of inputs to pass to the create action. Must implement the `Enumerable` protocol. | +| [`assume_casted?`](#reactor-bulk_create-assume_casted?){: #reactor-bulk_create-assume_casted? } | `boolean` | `false` | Whether or not to cast attributes and arguments as input. This is an optimization for cases where the input is already casted and/or not in need of casting | +| [`authorize_changeset_with`](#reactor-bulk_create-authorize_changeset_with){: #reactor-bulk_create-authorize_changeset_with } | `:filter \| :error` | `:filter` | If set to `:error`, instead of filtering unauthorized changes, unauthorized changes will raise an appropriate forbidden error | +| [`authorize_query_with`](#reactor-bulk_create-authorize_query_with){: #reactor-bulk_create-authorize_query_with } | `:filter \| :error` | `:filter` | If set to `:error`, instead of filtering unauthorized query results, unauthorized query results will raise an appropriate forbidden error | +| [`batch_size`](#reactor-bulk_create-batch_size){: #reactor-bulk_create-batch_size } | `nil \| pos_integer` | | The number of records to include in each batch. Defaults to the `default_limit` or `max_page_size` of the action, or 100. | +| [`load`](#reactor-bulk_create-load){: #reactor-bulk_create-load } | `atom \| list(atom)` | `[]` | A load statement to apply to records. Ignored if `return_records?` is not true. | +| [`max_concurrency`](#reactor-bulk_create-max_concurrency){: #reactor-bulk_create-max_concurrency } | `non_neg_integer` | `0` | If set to a value greater than 0, up to that many tasks will be started to run batches asynchronously. | +| [`notification_metadata`](#reactor-bulk_create-notification_metadata){: #reactor-bulk_create-notification_metadata } | `map \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | `%{}` | Metadata to be merged into the metadata field for all notifications sent from this operation. | +| [`notify?`](#reactor-bulk_create-notify?){: #reactor-bulk_create-notify? } | `boolean` | `false` | Whether or not to generate any notifications. This may be intensive for large bulk actions. | +| [`read_action`](#reactor-bulk_create-read_action){: #reactor-bulk_create-read_action } | `atom` | | The action to use when building the read query. | +| [`return_errors?`](#reactor-bulk_create-return_errors?){: #reactor-bulk_create-return_errors? } | `boolean` | `false` | Whether or not to return all of the errors that occur. Defaults to false to account for large inserts. | +| [`return_records?`](#reactor-bulk_create-return_records?){: #reactor-bulk_create-return_records? } | `boolean` | `false` | Whether or not to return all of the records that were inserted. Defaults to false to account for large inserts. | +| [`return_stream?`](#reactor-bulk_create-return_stream?){: #reactor-bulk_create-return_stream? } | `boolean` | `false` | If set to `true`, instead of an `Ash.BulkResult`, a mixed stream is returned. | +| [`rollback_on_error?`](#reactor-bulk_create-rollback_on_error?){: #reactor-bulk_create-rollback_on_error? } | `boolean` | `true` | Whether or not to rollback the transaction on error, if the resource is in a transaction. | +| [`select`](#reactor-bulk_create-select){: #reactor-bulk_create-select } | `atom \| list(atom)` | | A select statement to apply to records. Ignored if `return_records?` is not `true`. | +| [`skip_unknown_inputs`](#reactor-bulk_create-skip_unknown_inputs){: #reactor-bulk_create-skip_unknown_inputs } | `atom \| list(atom)` | | A list of inputs that, if provided, will be ignored if they are not recognized by the action. | +| [`sorted?`](#reactor-bulk_create-sorted?){: #reactor-bulk_create-sorted? } | `boolean` | `false` | Whether or not to sort results by their input position, in cases where `return_records?` is set to `true`. | +| [`stop_on_error?`](#reactor-bulk_create-stop_on_error?){: #reactor-bulk_create-stop_on_error? } | `boolean` | `false` | If `true`, the first encountered error will stop the action and be returned. Otherwise, errors will be skipped. | +| [`success_state`](#reactor-bulk_create-success_state){: #reactor-bulk_create-success_state } | `:success \| :partial_success` | `:success` | Bulk results can be entirely or partially successful. Chooses the `Ash.BulkResult` state to consider the step a success. | +| [`timeout`](#reactor-bulk_create-timeout){: #reactor-bulk_create-timeout } | `timeout` | | If none is provided, the timeout configured on the domain is used (which defaults to `30_000`). | +| [`transaction`](#reactor-bulk_create-transaction){: #reactor-bulk_create-transaction } | `:all \| :batch \| false` | `:batch` | Whether or not to wrap the entire execution in a transaction, each batch, or not at all. | +| [`upsert_fields`](#reactor-bulk_create-upsert_fields){: #reactor-bulk_create-upsert_fields } | `atom \| list(atom)` | | The fields to upsert. If not set, the action's `upsert_fields` is used. | +| [`upsert_identity`](#reactor-bulk_create-upsert_identity){: #reactor-bulk_create-upsert_identity } | `atom` | | The identity to use for the upsert | +| [`upsert?`](#reactor-bulk_create-upsert?){: #reactor-bulk_create-upsert? } | `boolean` | `false` | Whether or not this action should be executed as an upsert. | +| [`domain`](#reactor-bulk_create-domain){: #reactor-bulk_create-domain } | `module` | | The Domain to use when calling the action. Defaults to the Domain set on the resource or in the `ash` section. | +| [`async?`](#reactor-bulk_create-async?){: #reactor-bulk_create-async? } | `boolean` | `true` | When set to true the step will be executed asynchronously via Reactor's `TaskSupervisor`. | +| [`authorize?`](#reactor-bulk_create-authorize?){: #reactor-bulk_create-authorize? } | `boolean \| nil` | | Explicitly enable or disable authorization for the action. | +| [`description`](#reactor-bulk_create-description){: #reactor-bulk_create-description } | `String.t` | | A description for the step | +| [`undo_action`](#reactor-bulk_create-undo_action){: #reactor-bulk_create-undo_action } | `atom` | | The name of the action to call on the resource when the step is to be undone. | +| [`undo`](#reactor-bulk_create-undo){: #reactor-bulk_create-undo } | `:always \| :never \| :outside_transaction` | `:never` | How to handle undoing this action | + + +## reactor.bulk_create.actor +```elixir +actor source +``` + + +Specifies the action actor + + + + + +### Arguments + +| Name | Type | Default | Docs | +|------|------|---------|------| +| [`source`](#reactor-bulk_create-actor-source){: #reactor-bulk_create-actor-source .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the actor. | +### Options + +| Name | Type | Default | Docs | +|------|------|---------|------| +| [`transform`](#reactor-bulk_create-actor-transform){: #reactor-bulk_create-actor-transform } | `(any -> any) \| module \| nil` | | An optional transformation function which can be used to modify the actor before it is passed to the action. | + + + + + +### Introspection + +Target: `Ash.Reactor.Dsl.Actor` + +## reactor.bulk_create.tenant +```elixir +tenant source +``` + + +Specifies the action tenant + + + + + +### Arguments + +| Name | Type | Default | Docs | +|------|------|---------|------| +| [`source`](#reactor-bulk_create-tenant-source){: #reactor-bulk_create-tenant-source .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the tenant. | +### Options + +| Name | Type | Default | Docs | +|------|------|---------|------| +| [`transform`](#reactor-bulk_create-tenant-transform){: #reactor-bulk_create-tenant-transform } | `(any -> any) \| module \| nil` | | An optional transformation function which can be used to modify the tenant before it is passed to the action. | + + + + + +### Introspection + +Target: `Ash.Reactor.Dsl.Tenant` + +## reactor.bulk_create.wait_for +```elixir +wait_for names +``` + + +Wait for the named step to complete before allowing this one to start. + +Desugars to `argument :_, result(step_to_wait_for)` + + + + +### Examples +``` +wait_for :create_user +``` + + + +### Arguments + +| Name | Type | Default | Docs | +|------|------|---------|------| +| [`names`](#reactor-bulk_create-wait_for-names){: #reactor-bulk_create-wait_for-names .spark-required} | `atom \| list(atom)` | | The name of the step to wait for. | + + + + + + +### Introspection + +Target: `Reactor.Dsl.WaitFor` + + + + +### Introspection + +Target: `Ash.Reactor.Dsl.BulkCreate` + + + ## reactor.change ```elixir change name, change diff --git a/lib/ash/reactor/builders/bulk_create.ex b/lib/ash/reactor/builders/bulk_create.ex new file mode 100644 index 00000000..7cbecde7 --- /dev/null +++ b/lib/ash/reactor/builders/bulk_create.ex @@ -0,0 +1,246 @@ +defimpl Reactor.Dsl.Build, for: Ash.Reactor.Dsl.BulkCreate do + @moduledoc false + + alias Ash.Reactor.BulkCreateStep + alias Ash.Resource.Info + alias Reactor.{Argument, Builder} + alias Spark.{Dsl.Transformer, Error.DslError} + import Ash.Reactor.BuilderUtils + import Reactor.Template, only: :macros + + @doc false + @impl true + def build(bulk_create, reactor) do + with {:ok, reactor} <- ensure_hooked(reactor) do + initial = %Argument{name: :initial, source: bulk_create.initial} + + notification_metadata = + case bulk_create.notification_metadata do + template when is_template(template) -> + %Argument{name: :notification_metadata, source: template} + + map when is_map(map) -> + Argument.from_value(:notification_metadata, map) + end + + arguments = + [initial, notification_metadata] + |> maybe_append(bulk_create.actor) + |> maybe_append(bulk_create.tenant) + |> Enum.concat(bulk_create.wait_for) + + action_options = + bulk_create + |> Map.take([ + :action, + :assume_casted?, + :authorize_changeset_with, + :authorize_query_with, + :authorize?, + :batch_size, + :domain, + :max_concurrency, + :notify?, + :read_action, + :resource, + :return_errors?, + :return_records?, + :return_stream?, + :rollback_on_error?, + :select, + :skip_unknown_inputs, + :sorted?, + :stop_on_error?, + :success_state, + :timeout, + :transaction, + :upsert_fields, + :upsert_fields, + :upsert_identity, + :upsert_identity, + :upsert?, + :undo_action, + :undo + ]) + |> Map.put(:return_notifications?, bulk_create.notify?) + |> Enum.reject(&is_nil(elem(&1, 1))) + + step_options = + bulk_create + |> Map.take([:async?]) + |> Map.put(:ref, :step_name) + |> Enum.to_list() + + Builder.add_step( + reactor, + bulk_create.name, + {BulkCreateStep, action_options}, + arguments, + step_options + ) + end + end + + @doc false + @impl true + def transform(_bulk_create, dsl_state), do: {:ok, dsl_state} + + @doc false + @impl true + def verify(bulk_create, dsl_state) do + action_error_path = [:bulk_create, bulk_create.name, :action] + + with {:ok, action} <- + get_action(dsl_state, bulk_create.resource, bulk_create.action, action_error_path), + :ok <- + verify_action_type(dsl_state, bulk_create.resource, action, :create, action_error_path), + :ok <- verify_undo(dsl_state, bulk_create), + :ok <- maybe_verify_undo_action(dsl_state, bulk_create), + :ok <- maybe_verify_upsert_fields(dsl_state, bulk_create, action, action_error_path), + :ok <- verify_select(dsl_state, bulk_create), + :ok <- verify_rollback_on_error(dsl_state, bulk_create), + :ok <- verify_sorted(dsl_state, bulk_create) do + verify_notify(dsl_state, bulk_create) + end + end + + defguardp is_falsy(value) when value in [nil, false] + + defp verify_notify(dsl_state, bulk_create) + when bulk_create.notify? == true and bulk_create.return_stream? == true, + do: + {:error, + DslError.exception( + module: Transformer.get_persisted(dsl_state, :module), + path: [:bulk_create, bulk_create.name, :notify?], + message: """ + Setting `notify?` has no effect when `return_stream?` is `true`. + + You must manually consume the resulting stream of records and notifications in a subsequent step. + """ + )} + + defp verify_notify(_dsl_state, _bulk_create), do: :ok + + defp verify_sorted(dsl_state, bulk_create) + when bulk_create.sorted? == true and is_falsy(bulk_create.return_records?), + do: + {:error, + DslError.exception( + module: Transformer.get_persisted(dsl_state, :module), + path: [:bulk_create, bulk_create.name, :sorted?], + message: "Setting `sorted?` has no effect with `return_records?` is not `true`." + )} + + defp verify_sorted(_dsl_state, _bulk_create), do: :ok + + defp verify_rollback_on_error(dsl_state, bulk_create) + when bulk_create.rollback_on_error? == true and is_falsy(bulk_create.transaction), + do: + {:error, + DslError.exception( + module: Transformer.get_persisted(dsl_state, :module), + path: [:bulk_create, bulk_create.name, :rollback_on_error?], + message: "Setting `rollback_on_error?` has no effect when `transaction` is `false`." + )} + + defp verify_rollback_on_error(_dsl_state, _bulk_create), do: :ok + + defp verify_select(_dsl_state, bulk_create) when bulk_create.select == [], do: :ok + defp verify_select(_dsl_state, bulk_create) when bulk_create.return_records? == true, do: :ok + + defp verify_select(dsl_state, bulk_create), + do: + {:error, + DslError.exception( + module: Transformer.get_persisted(dsl_state, :module), + path: [:bulk_create, bulk_create.name, :select], + message: "Setting `select` has no effect when `return_records?` is not `true`." + )} + + defp maybe_verify_upsert_fields(dsl_state, bulk_create, action, error_path) + when bulk_create.upsert? == true and bulk_create.upsert_fields == [] and + action.upsert_fields == [], + do: + {:error, + DslError.exception( + module: Transformer.get_persisted(dsl_state, :module), + path: error_path, + message: + "Expected `upsert_fields` to be set on either the bulk create step or the underlying action." + )} + + defp maybe_verify_upsert_fields(_dsl_state, _bulk_create, _action, _error_path), do: :ok + + defp verify_undo(dsl_state, bulk_create) + when bulk_create.undo != :never and bulk_create.return_stream? == true, + do: + {:error, + DslError.exception( + module: Transformer.get_persisted(dsl_state, :module), + path: [:bulk_create, bulk_create.name, :undo], + message: + "Cannot set undo to anything other than `:never` when `return_stream?` is `true`." + )} + + defp verify_undo(_dsl_state, _bulk_create), do: :ok + + defp maybe_verify_undo_action(_dsl_state, bulk_create) when bulk_create.undo == :never, do: :ok + + defp maybe_verify_undo_action(dsl_state, bulk_create) do + error_path = [:bulk_create, bulk_create.name, :undo_action] + + with {:ok, action} <- + get_action(dsl_state, bulk_create.resource, bulk_create.undo_action, error_path), + :ok <- verify_action_type(dsl_state, bulk_create.resource, action, :action, error_path) do + verify_action_takes_bulk_result(dsl_state, bulk_create.resource, action, error_path) + end + end + + defp get_action(dsl_state, resource, action_name, error_path) do + case Info.action(resource, action_name) do + nil -> + {:error, + DslError.exception( + module: Transformer.get_persisted(dsl_state, :module), + path: error_path, + message: + "No action found matching the name `#{action_name}` on resource `#{inspect(resource)}`." + )} + + action when is_struct(action) -> + {:ok, action} + end + end + + defp verify_action_type(_dsl_state, _resource, action, action_type, _error_path) + when action.type == action_type, + do: :ok + + defp verify_action_type(dsl_state, resource, action, action_type, error_path) do + {:error, + DslError.exception( + module: Transformer.get_persisted(dsl_state, :module), + path: error_path, + message: + "Expected the action `#{inspect(action.name)}` on `#{inspect(resource)}` to be a #{action_type}, however it is a #{action.type}" + )} + end + + defp verify_action_takes_bulk_result( + _dsl_state, + _resource, + %{arguments: [%{name: :bulk_result}]}, + _error_path + ), + do: :ok + + defp verify_action_takes_bulk_result(dsl_state, _resource, _action, error_path) do + {:error, + DslError.exception( + module: Transformer.get_persisted(dsl_state, :module), + path: error_path, + message: "The undo action for an create step should take a single `bulk_result` argument." + )} + end +end diff --git a/lib/ash/reactor/builders/create.ex b/lib/ash/reactor/builders/create.ex index 438e1548..96b7827d 100644 --- a/lib/ash/reactor/builders/create.ex +++ b/lib/ash/reactor/builders/create.ex @@ -6,6 +6,7 @@ defimpl Reactor.Dsl.Build, for: Ash.Reactor.Dsl.Create do alias Reactor.{Argument, Builder} alias Spark.{Dsl.Transformer, Error.DslError} import Ash.Reactor.BuilderUtils + import Reactor.Template, only: :macros @doc false @impl true @@ -20,10 +21,7 @@ defimpl Reactor.Dsl.Build, for: Ash.Reactor.Dsl.Create do module when is_atom(module) -> Argument.from_value(:initial, module) - template - when is_struct(template, Reactor.Template.Input) or - is_struct(template, Reactor.Template.Result) or - is_struct(template, Reactor.Template.Value) -> + template when is_template(template) -> %Argument{name: :initial, source: template} end diff --git a/lib/ash/reactor/dsl/action_transformer.ex b/lib/ash/reactor/dsl/action_transformer.ex index eadc8f49..ecb588f3 100644 --- a/lib/ash/reactor/dsl/action_transformer.ex +++ b/lib/ash/reactor/dsl/action_transformer.ex @@ -189,7 +189,7 @@ defmodule Ash.Reactor.Dsl.ActionTransformer do defp validate_entity_input_dupes(_entity, _dsl_state), do: :ok - defp validate_entity_input_empty(entity, dsl_state) do + defp validate_entity_input_empty(entity, dsl_state) when is_map_key(entity, :inputs) do entity.inputs |> Enum.filter(&Enum.empty?(&1.template)) |> case do @@ -222,7 +222,9 @@ defmodule Ash.Reactor.Dsl.ActionTransformer do end end - defp validate_entity_input_names(entity, action, dsl_state) do + defp validate_entity_input_empty(_, _), do: :ok + + defp validate_entity_input_names(entity, action, dsl_state) when is_map_key(entity, :inputs) do argument_names = Enum.map(action.arguments, & &1.name) allowed_input_names = @@ -291,6 +293,8 @@ defmodule Ash.Reactor.Dsl.ActionTransformer do end end + defp validate_entity_input_names(_entity, _action, _dsl_state), do: :ok + defp maybe_accept_inputs(input_names, action) when length(action.accepts) > 0, do: Enum.filter(input_names, &(&1 in action.accepts)) @@ -333,7 +337,7 @@ defmodule Ash.Reactor.Dsl.ActionTransformer do end defp get_entity_resource_action(entity, dsl_state) do - case Resource.Info.action(entity.resource, entity.action, entity.type) do + case Resource.Info.action(entity.resource, entity.action, action_type(entity.type)) do nil -> suggestions = entity.resource @@ -356,6 +360,13 @@ defmodule Ash.Reactor.Dsl.ActionTransformer do end end + defp action_type(:bulk_create), do: :create + defp action_type(:create), do: :create + defp action_type(:read), do: :read + defp action_type(:update), do: :update + defp action_type(:destroy), do: :destroy + defp action_type(:action), do: :action + defp maybe_validate_upsert_identity(entity, dsl_state) when entity.upsert? and entity.upsert_identity do if Resource.Info.identity(entity.resource, entity.upsert_identity) do diff --git a/lib/ash/reactor/dsl/bulk_create.ex b/lib/ash/reactor/dsl/bulk_create.ex new file mode 100644 index 00000000..012a9b2c --- /dev/null +++ b/lib/ash/reactor/dsl/bulk_create.ex @@ -0,0 +1,290 @@ +defmodule Ash.Reactor.Dsl.BulkCreate do + @moduledoc """ + The `bulk_create` entity for the `Ash.Reactor` reactor extension. + """ + + defstruct __identifier__: nil, + action_step?: true, + action: nil, + actor: [], + assume_casted?: false, + async?: true, + authorize_changeset_with: :filter, + authorize_query_with: :filter, + authorize?: nil, + batch_size: nil, + description: nil, + domain: nil, + initial: nil, + load: [], + max_concurrency: 0, + name: nil, + notification_metadata: %{}, + notify?: false, + read_action: nil, + resource: nil, + return_errors?: false, + return_records?: false, + return_stream?: false, + rollback_on_error?: true, + select: [], + skip_unknown_inputs: [], + sorted?: false, + stop_on_error?: false, + success_state: :success, + tenant: [], + timeout: 30_000, + transaction: false, + transform: nil, + type: :bulk_create, + undo_action: nil, + undo: :never, + upsert_fields: [], + upsert_identity: nil, + upsert?: false, + wait_for: [] + + @type t :: %__MODULE__{ + __identifier__: any, + action_step?: true, + action: atom, + actor: [Ash.Reactor.Dsl.Actor.t()], + assume_casted?: boolean, + async?: boolean, + authorize_changeset_with: :filter | :error, + authorize_query_with: :filter | :error, + authorize?: boolean | nil, + batch_size: nil | pos_integer(), + description: String.t() | nil, + domain: Ash.Domain.t(), + initial: Reactor.Template.t(), + load: [atom], + max_concurrency: non_neg_integer(), + name: atom, + notification_metadata: map, + notify?: boolean, + read_action: atom, + resource: module, + return_errors?: boolean, + return_records?: boolean, + return_stream?: boolean, + rollback_on_error?: boolean, + select: [atom], + skip_unknown_inputs: [atom], + sorted?: boolean, + stop_on_error?: boolean, + success_state: :success | :partial_success, + tenant: [Ash.Reactor.Dsl.Tenant.t()], + timeout: nil | timeout, + transaction: :all | :batch | false, + type: :bulk_create, + undo_action: nil, + undo: :never, + upsert_fields: [], + upsert_identity: nil + } + + @doc false + def __entity__, + do: %Spark.Dsl.Entity{ + name: :bulk_create, + describe: """ + Declares a step which will call a create action on a resource with a collection of inputs. + + > ### Check the docs! {: .warning} + > + > Make sure to thoroughly read and understand the documentation in `Ash.bulk_create/4` before using. Read each option and note the default values. By default, bulk creates don't return records or errors, and don't emit notifications. + + Caveats/differences from `Ash.bulk_create/4`: + + 1. `max_concurrency` specifies the number of tasks that Ash will start to process batches, and has no effect on Reactor concurrency targets. It's could be possible to create a very large number of processes if a number of steps are running bulk actions with a high degree of concurrency. + 2. Setting `notify?` to `true` will cause both `notify?` and `return_notifications?` to be set to true in the underlying call to `Ash.bulk_create/4`. Notifications will then be managed by the `Ash.Reactor.Notifications` Reactor middleware. + 3. If you specify an undo action it must be a generic action which takes the bulk result as it's only argument. + + #{Ash.Reactor.Dsl.Action.__shared_undo_docs__()} + """, + examples: [ + """ + create :create_posts, MyApp.Post, :create do + initial inputs(:titles) + actor(result(:get_user)) + tenant(result(:get_organisation, [:id])) + end + """ + ], + no_depend_modules: [:domain, :resource], + target: __MODULE__, + args: [:name, :resource, {:optional, :action}], + identifier: :name, + imports: [Reactor.Dsl.Argument], + entities: [ + actor: [Ash.Reactor.Dsl.Actor.__entity__()], + tenant: [Ash.Reactor.Dsl.Tenant.__entity__()], + wait_for: [Reactor.Dsl.WaitFor.__entity__()] + ], + singleton_entity_keys: [:actor, :tenant], + recursive_as: :steps, + schema: + [ + assume_casted?: [ + type: :boolean, + doc: + "Whether or not to cast attributes and arguments as input. This is an optimization for cases where the input is already casted and/or not in need of casting", + required: false, + default: false + ], + authorize_changeset_with: [ + type: {:in, [:filter, :error]}, + doc: + "If set to `:error`, instead of filtering unauthorized changes, unauthorized changes will raise an appropriate forbidden error", + required: false, + default: :filter + ], + authorize_query_with: [ + type: {:in, [:filter, :error]}, + doc: + "If set to `:error`, instead of filtering unauthorized query results, unauthorized query results will raise an appropriate forbidden error", + required: false, + default: :filter + ], + batch_size: [ + type: {:or, [nil, :pos_integer]}, + doc: + "The number of records to include in each batch. Defaults to the `default_limit` or `max_page_size` of the action, or 100.", + required: false + ], + initial: [ + type: Reactor.Template.type(), + required: true, + doc: + "A collection of inputs to pass to the create action. Must implement the `Enumerable` protocol." + ], + load: [ + type: {:wrap_list, :atom}, + doc: + "A load statement to apply to records. Ignored if `return_records?` is not true.", + required: false, + default: [] + ], + max_concurrency: [ + type: :non_neg_integer, + doc: + "If set to a value greater than 0, up to that many tasks will be started to run batches asynchronously.", + required: false, + default: 0 + ], + notification_metadata: [ + type: {:or, [:map, Reactor.Template.type()]}, + doc: + "Metadata to be merged into the metadata field for all notifications sent from this operation.", + required: false, + default: %{} + ], + notify?: [ + type: :boolean, + doc: + "Whether or not to generate any notifications. This may be intensive for large bulk actions.", + required: false, + default: false + ], + read_action: [ + type: :atom, + doc: "The action to use when building the read query.", + required: false + ], + return_errors?: [ + type: :boolean, + doc: + "Whether or not to return all of the errors that occur. Defaults to false to account for large inserts.", + required: false, + default: false + ], + return_records?: [ + type: :boolean, + doc: + "Whether or not to return all of the records that were inserted. Defaults to false to account for large inserts.", + required: false, + default: false + ], + return_stream?: [ + type: :boolean, + doc: "If set to `true`, instead of an `Ash.BulkResult`, a mixed stream is returned.", + required: false, + default: false + ], + rollback_on_error?: [ + type: :boolean, + doc: + "Whether or not to rollback the transaction on error, if the resource is in a transaction.", + required: false, + default: true + ], + select: [ + type: {:wrap_list, :atom}, + doc: + "A select statement to apply to records. Ignored if `return_records?` is not `true`.", + required: false + ], + skip_unknown_inputs: [ + type: {:wrap_list, :atom}, + doc: + "A list of inputs that, if provided, will be ignored if they are not recognized by the action.", + required: false + ], + sorted?: [ + type: :boolean, + doc: + "Whether or not to sort results by their input position, in cases where `return_records?` is set to `true`.", + required: false, + default: false + ], + stop_on_error?: [ + type: :boolean, + doc: + "If `true`, the first encountered error will stop the action and be returned. Otherwise, errors will be skipped.", + required: false, + default: false + ], + success_state: [ + type: {:in, [:success, :partial_success]}, + doc: + "Bulk results can be entirely or partially successful. Chooses the `Ash.BulkResult` state to consider the step a success.", + required: false, + default: :success + ], + timeout: [ + type: :timeout, + doc: + "If none is provided, the timeout configured on the domain is used (which defaults to `30_000`).", + required: false + ], + transaction: [ + type: {:in, [:all, :batch, false]}, + doc: + "Whether or not to wrap the entire execution in a transaction, each batch, or not at all.", + required: false, + default: :batch + ], + upsert_fields: [ + type: {:wrap_list, :atom}, + doc: "The fields to upsert. If not set, the action's `upsert_fields` is used.", + required: false + ], + upsert_identity: [ + type: :atom, + required: false, + doc: "The identity to use for the upsert" + ], + upsert?: [ + type: :boolean, + required: false, + default: false, + doc: "Whether or not this action should be executed as an upsert." + ] + ] + |> Spark.Options.merge( + Ash.Reactor.Dsl.Action.__shared_action_option_schema__(), + "Shared action options" + ) + } +end diff --git a/lib/ash/reactor/reactor.ex b/lib/ash/reactor/reactor.ex index 5c50e976..507aeb0e 100644 --- a/lib/ash/reactor/reactor.ex +++ b/lib/ash/reactor/reactor.ex @@ -26,6 +26,7 @@ defmodule Ash.Reactor do dsl_patches: [ Ash.Reactor.Dsl.Action, + Ash.Reactor.Dsl.BulkCreate, Ash.Reactor.Dsl.Change, Ash.Reactor.Dsl.Create, Ash.Reactor.Dsl.Destroy, diff --git a/lib/ash/reactor/steps/bulk_create_step.ex b/lib/ash/reactor/steps/bulk_create_step.ex new file mode 100644 index 00000000..6541aec3 --- /dev/null +++ b/lib/ash/reactor/steps/bulk_create_step.ex @@ -0,0 +1,116 @@ +defmodule Ash.Reactor.BulkCreateStep do + @moduledoc """ + The Reactor stop which is used to execute create actions in bulk. + """ + + use Reactor.Step + import Ash.Reactor.StepUtils + alias Ash.{BulkResult, DataLayer} + + @doc false + @impl true + def run(arguments, context, options) do + bulk_create_options = + options + |> Keyword.take([ + :assume_casted?, + :authorize_changeset_with, + :authorize_query_with, + :authorize?, + :batch_size, + :domain, + :load, + :max_concurrency, + :notify?, + :read_action, + :return_errors?, + :return_notifications?, + :return_records?, + :return_stream?, + :rollback_on_error?, + :select, + :skip_unknown_inputs, + :sorted?, + :stop_on_error?, + :timeout, + :transaction, + :upsert_fields, + :upsert_identity, + :upsert? + ]) + |> maybe_set_kw(:actor, arguments[:actor]) + |> maybe_set_kw(:tenant, arguments[:tenant]) + |> maybe_set_kw(:notification_metadata, arguments[:notification_metadata]) + + success_states = + options[:success_state] + |> case do + :partial_success -> [:success, :partial_success] + _ -> [:success] + end + + return_stream? = options[:return_stream?] + + arguments.initial + |> Ash.bulk_create(options[:resource], options[:action], bulk_create_options) + |> case do + result when is_struct(result, BulkResult) -> + if result.status in success_states do + maybe_queue_notifications(result, context) + else + {:error, result} + end + + stream when return_stream? == true -> + {:ok, stream} + end + end + + @doc false + @impl true + def undo(bulk_result, arguments, _context, options) when is_struct(bulk_result, BulkResult) do + action_options = + options + |> Keyword.take([:authorize?, :domain]) + |> maybe_set_kw(:actor, arguments[:actor]) + |> maybe_set_kw(:tenant, arguments[:tenant]) + + options[:resource] + |> Ash.ActionInput.for_action(options[:undo_action], %{bulk_result: bulk_result}) + |> Ash.run_action(action_options) + |> case do + :ok -> :ok + {:ok, _} -> :ok + {:error, reason} -> {:error, reason} + end + end + + @doc false + @impl true + def can?(%{impl: {_, options}}, :undo) do + case options[:undo] do + :always -> + true + + :never -> + false + + :outside_transaction -> + !DataLayer.in_transaction?(options[:resource]) || options[:transaction] != false + end + end + + def can?(_, :compensate), do: false + + defp maybe_queue_notifications(result, _context) when is_nil(result.notifications), + do: {:ok, result} + + defp maybe_queue_notifications(result, _context) when result.notifications == [], + do: {:ok, result} + + defp maybe_queue_notifications(result, context) do + with :ok <- Ash.Reactor.Notifications.enqueue_notifications(context, result.notifications) do + {:ok, %{result | notifications: nil}} + end + end +end diff --git a/lib/ash/reactor/steps/create_step.ex b/lib/ash/reactor/steps/create_step.ex index 5613899a..a8edf401 100644 --- a/lib/ash/reactor/steps/create_step.ex +++ b/lib/ash/reactor/steps/create_step.ex @@ -19,7 +19,7 @@ defmodule Ash.Reactor.CreateStep do |> maybe_set_kw(:tenant, arguments[:tenant]) action_options = - [return_notifications?: true] + [return_notifications?: true, domain: options[:domain]] |> maybe_set_kw(:authorize?, options[:authorize?]) changeset = @@ -38,7 +38,7 @@ defmodule Ash.Reactor.CreateStep do end changeset - |> options[:domain].create(action_options) + |> Ash.create(action_options) |> case do {:ok, record} -> {:ok, store_changeset_in_metadata(context.current_step.name, record, changeset)} @@ -62,7 +62,7 @@ defmodule Ash.Reactor.CreateStep do |> maybe_set_kw(:tenant, arguments[:tenant]) action_options = - [return_notifications?: true, return_destroyed?: false] + [return_notifications?: true, return_destroyed?: false, domain: options[:domain]] |> maybe_set_kw(:authorize?, options[:authorize?]) attributes = @@ -70,7 +70,7 @@ defmodule Ash.Reactor.CreateStep do record |> Changeset.for_destroy(options[:undo_action], attributes, changeset_options) - |> options[:domain].destroy(action_options) + |> Ash.destroy(action_options) # We always want to discard the notifications. |> case do :ok -> :ok diff --git a/test/reactor/bulk_create_test.exs b/test/reactor/bulk_create_test.exs new file mode 100644 index 00000000..2a87279e --- /dev/null +++ b/test/reactor/bulk_create_test.exs @@ -0,0 +1,46 @@ +defmodule Ash.Test.Reactor.BulkCreateTest do + @moduledoc false + use ExUnit.Case, async: true + + alias Ash.Test.Domain + + defmodule Post do + @moduledoc false + use Ash.Resource, data_layer: Ash.DataLayer.Ets, domain: Domain + + attributes do + uuid_primary_key :id + attribute :title, :string, allow_nil?: false, public?: true + end + + actions do + defaults [:read, create: :*] + end + end + + defmodule BulkCreateReactor do + @moduledoc false + use Reactor, extensions: [Ash.Reactor] + + input :post_attrs + + bulk_create :create_posts, Post, :create do + initial(input(:post_attrs)) + end + end + + test "it can create a bunch of records all at once" do + how_many = :rand.uniform(99) + :rand.uniform(99) + + post_attrs = + 1..how_many + |> Enum.map(&%{title: "Post number #{&1}"}) + + assert {:ok, _} = + Reactor.run(BulkCreateReactor, %{post_attrs: post_attrs}, %{}, async?: false) + + created_posts = Ash.read!(Post, action: :read) + + assert length(created_posts) == how_many + end +end