improvement: add iterative, streaming implementation for bulk updates

Zach Daniel 2023-12-20 17:33:43 -05:00
parent 1cb5e2860d
commit 8c17434803
11 changed files with 1977 additions and 142 deletions

File diff suppressed because it is too large


@ -387,6 +387,116 @@ defmodule Ash.Api do
"Shared create/update/destroy Options"
)
@shared_bulk_opts_schema [
assume_casted?: [
type: :boolean,
default: false,
doc:
"Whether or not to cast attributes and arguments as input. This is an optimization for cases where the input is already casted and/or not in need of casting"
],
context: [
type: :map,
doc: "Context to set on each changeset"
],
sorted?: [
type: :boolean,
default: false,
doc:
"Whether or not to sort results by their input position, in cases where `return_records?: true` was provided."
],
return_records?: [
type: :boolean,
default: false,
doc:
"Whether or not to return all of the records that were inserted. Defaults to false to account for large inserts."
],
return_errors?: [
type: :boolean,
default: false,
doc:
"Whether or not to return all of the errors that occur. Defaults to false to account for large inserts."
],
batch_size: [
type: :pos_integer,
doc: """
The number of records to include in each batch. Defaults to the `default_limit`
or `max_page_size` of the action, or 100.
"""
],
return_stream?: [
type: :boolean,
default: false,
doc: """
If set to `true`, instead of an `Ash.BulkResult`, a mixed stream is returned.
Potential elements:
`{:notification, notification}` - if `return_notifications?` is set to `true`
`{:ok, record}` - if `return_records?` is set to `true`
`{:error, error}` - an error that occurred. May be a changeset or an individual error.
"""
],
stop_on_error?: [
type: :boolean,
default: false,
doc: """
If true, the first encountered error will stop the action and be returned. Otherwise, errors
will be skipped.
"""
],
notify?: [
type: :boolean,
default: false,
doc: """
Whether or not to send notifications out. If this is set to `true` then the data layer must return
the results from each batch. This may be intensive for large bulk actions.
"""
],
transaction: [
type: {:one_of, [:all, :batch, false]},
default: :batch,
doc: """
Whether or not to wrap the entire execution in a transaction, each batch, or not at all.
Keep in mind:
`before_transaction` and `after_transaction` hooks attached to changesets will have to be run
*inside* the transaction if you choose `transaction: :all`.
"""
],
max_concurrency: [
type: :non_neg_integer,
default: 0,
doc:
"If set to a value greater than 0, up to that many tasks will be started to run batches asynchronously"
]
]
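Taken together, these shared options determine what a bulk action hands back. As a hedged sketch of consuming the mixed stream that `return_stream?: true` produces (`MyApp.Api`, `Post`, `:publish`, and the `:published` attribute are illustrative names, not from this commit):

```elixir
# A sketch only: assumes an api module MyApp.Api with a Post resource and a
# :publish update action. With return_stream?: true, a mixed stream is
# returned instead of an %Ash.BulkResult{}, with the element shapes
# documented above.
posts
|> MyApp.Api.bulk_update(:publish, %{published: true},
  resource: Post,
  return_stream?: true,
  return_records?: true,
  return_errors?: true,
  batch_size: 500
)
|> Enum.each(fn
  {:ok, record} -> IO.inspect(record, label: "updated")
  {:error, error} -> IO.inspect(error, label: "failed")
  {:notification, _notification} -> :ok
end)
```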
@bulk_update_opts_schema [
resource: [
type: {:spark, Ash.Resource},
doc:
"The resource being updated. This must be provided if the input given is a stream, so we know ahead of time what the resource being updated is."
],
atomic_update: [
type: :map,
doc:
"A map of atomic updates to apply. See `Ash.Changeset.atomic_update/3` for more."
]
]
|> merge_schemas(
Keyword.delete(@global_opts, :action),
"Global options"
)
|> merge_schemas(
@shared_created_update_and_destroy_opts_schema,
"Shared create/update/destroy options"
)
|> merge_schemas(
@shared_bulk_opts_schema,
"Shared bulk options"
)
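The `atomic_update` option above pushes the update expression down to the data layer. A minimal sketch, assuming a `Post` resource with an integer `:score` attribute (both names are illustrative):

```elixir
# require is needed because Ash.Expr.expr/1 is a macro.
require Ash.Expr

# Each matching record has its score incremented by the data layer itself,
# rather than being read, modified, and written back.
posts
|> MyApp.Api.bulk_update(:update, %{},
  resource: Post,
  atomic_update: %{score: Ash.Expr.expr(score + 1)},
  return_errors?: true
)
```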
@bulk_create_opts_schema [
upsert?: [
type: :boolean,
@ -394,27 +504,11 @@ defmodule Ash.Api do
doc:
"If a conflict is found based on the primary key, the record is updated in the database (requires upsert support)"
],
max_concurrency: [
type: :non_neg_integer,
default: 0,
doc:
"If set to a value greater than 0, up to that many tasks will be started to run batches asynchronously"
],
assume_casted?: [
type: :boolean,
default: false,
doc:
"Whether or not to cast attributes and arguments as input. This is an optimization for cases where the input is already casted and/or not in need of casting"
],
upsert_identity: [
type: :atom,
doc:
"The identity to use when detecting conflicts for `upsert?`, e.g. `upsert_identity: :full_name`. By default, the primary key is used. Has no effect if `upsert?: true` is not provided"
],
context: [
type: :map,
doc: "Context to set on each changeset"
],
upsert_fields: [
type:
{:or,
@ -427,81 +521,19 @@ defmodule Ash.Api do
]},
doc:
"The fields to upsert. If not set, the action's `upsert_fields` is used. Unlike singular `create`, `bulk_create` with `upsert?` requires that `upsert_fields` be specified explicitly in one of these two locations."
],
sorted?: [
type: :boolean,
default: false,
doc:
"Whether or not to sort results by their input position, in cases where `return_records?: true` was provided."
],
return_records?: [
type: :boolean,
default: false,
doc:
"Whether or not to return all of the records that were inserted. Defaults to false to account for large inserts."
],
return_errors?: [
type: :boolean,
default: false,
doc:
"Whether or not to return all of the errors that occur. Defaults to false to account for large inserts."
],
batch_size: [
type: :pos_integer,
doc: """
The number of records to include in each batch. Defaults to the `default_limit`
or `max_page_size` of the action, or 100.
"""
],
return_stream?: [
type: :boolean,
default: false,
doc: """
If set to `true`, instead of an `Ash.BulkResult`, a mixed stream is returned.
Potential elements:
`{:notification, notification}` - if `return_notifications?` is set to `true`
`{:ok, record}` - if `return_records?` is set to `true`
`{:error, error}` - an error that occurred. May be a changeset or an individual error.
"""
],
stop_on_error?: [
type: :boolean,
default: false,
doc: """
If true, the first encountered error will stop the action and be returned. Otherwise, errors
will be skipped.
"""
],
notify?: [
type: :boolean,
default: false,
doc: """
Whether or not to send notifications out. If this is set to `true` then the data layer must return
the results from each batch. This may be intensive for large bulk actions.
"""
],
transaction: [
type: {:one_of, [:all, :batch, false]},
default: :batch,
doc: """
Whether or not to wrap the entire execution in a transaction, each batch, or not at all.
Keep in mind:
`before_transaction` and `after_transaction` hooks attached to changesets will have to be run
*inside* the transaction if you choose `transaction: :all`.
"""
]
]
|> merge_schemas(
Keyword.delete(@global_opts, :action),
"Global Options"
"Global options"
)
|> merge_schemas(
@shared_created_update_and_destroy_opts_schema,
"Shared create/update/destroy Options"
"Shared create/update/destroy options"
)
|> merge_schemas(
@shared_bulk_opts_schema,
"Shared bulk options"
)
@doc false
@ -1264,24 +1296,19 @@ defmodule Ash.Api do
See `can/3` for more info.
"""
@callback can?(
query_or_changeset_or_action ::
Ash.Query.t()
| Ash.Changeset.t()
| {Ash.Resource.t(), atom | Ash.Resource.Actions.action()},
actor :: term,
opts :: Keyword.t()
) ::
boolean | no_return
@callback can?(
query_or_changeset_or_action ::
Ash.Query.t()
| Ash.Changeset.t()
| {Ash.Resource.t(), atom | Ash.Resource.Actions.action()},
actor :: term
| Ash.ActionInput.t()
| {Ash.Resource.t(), atom | Ash.Resource.Actions.action()}
| {Ash.Resource.t(), atom | Ash.Resource.Actions.action(), input :: map},
actor :: term,
opts :: Keyword.t()
) ::
boolean | no_return
@doc """
Returns whether or not the user can perform the action, or `:maybe`, returning any errors.
@ -1303,24 +1330,21 @@ defmodule Ash.Api do
this means adding field visibility calculations and altering the filter or the sort. For a changeset, this means only adding
field visibility calculations. The default value is `false`.
"""
@callback can(
action_or_query_or_changeset ::
Ash.Query.t()
| Ash.Changeset.t()
| {Ash.Resource.t(), atom | Ash.Resource.Actions.action()},
actor :: term,
opts :: Keyword.t()
) ::
{:ok, boolean | :maybe} | {:error, term}
@callback can(
action_or_query_or_changeset ::
Ash.Query.t()
| Ash.Changeset.t()
| {Ash.Resource.t(), atom | Ash.Resource.Actions.action()},
actor :: term
| Ash.ActionInput.t()
| {Ash.Resource.t(), atom | Ash.Resource.Actions.action()}
| {Ash.Resource.t(), atom | Ash.Resource.Actions.action(), input :: map},
actor :: term,
opts :: Keyword.t()
) ::
{:ok, boolean | :maybe} | {:error, term}
{:ok, boolean | :maybe}
| {:ok, true, Ash.Changeset.t() | Ash.Query.t()}
| {:ok, false, Exception.t()}
| {:error, term}
@callback calculate(resource :: Ash.Resource.t(), calculation :: atom, opts :: Keyword.t()) ::
{:ok, term} | {:error, term}
@ -2290,6 +2314,81 @@ defmodule Ash.Api do
end
end
@doc false
@spec bulk_update!(
Ash.Api.t(),
Enumerable.t(Ash.Resource.record()),
atom,
input :: map,
Keyword.t()
) ::
Ash.BulkResult.t() | no_return
def bulk_update!(api, stream_or_query, action, input, opts) do
api
|> bulk_update(stream_or_query, action, input, opts)
|> case do
%Ash.BulkResult{status: :error, errors: errors} when errors in [nil, []] ->
if opts[:return_errors?] do
raise Ash.Error.to_error_class(
Ash.Error.Unknown.UnknownError.exception(
error: "Something went wrong with bulk update, but no errors were produced."
)
)
else
raise Ash.Error.to_error_class(
Ash.Error.Unknown.UnknownError.exception(
error:
"Something went wrong with bulk update, but no errors were produced due to `return_errors?` being set to `false`."
)
)
end
%Ash.BulkResult{status: :error, errors: errors} ->
raise Ash.Error.to_error_class(errors)
bulk_result ->
bulk_result
end
end
@doc false
@spec bulk_update(
Ash.Api.t(),
Enumerable.t(Ash.Resource.record()),
atom,
input :: map,
Keyword.t()
) ::
Ash.BulkResult.t()
def bulk_update(api, query_or_stream, action, input, opts) do
case query_or_stream do
[] ->
result = %Ash.BulkResult{status: :success, errors: []}
result =
if opts[:return_records?] do
%{result | records: []}
else
result
end
if opts[:return_notifications?] do
%{result | notifications: []}
else
result
end
query_or_stream ->
case Spark.OptionsHelpers.validate(opts, @bulk_update_opts_schema) do
{:ok, opts} ->
Update.Bulk.run(api, query_or_stream, action, input, opts)
{:error, error} ->
%Ash.BulkResult{status: :error, errors: [Ash.Error.to_ash_error(error)]}
end
end
end
@doc false
def update!(api, changeset, opts) do
opts = Spark.OptionsHelpers.validate!(opts, @update_opts_schema)


@ -359,6 +359,20 @@ defmodule Ash.Api.Interface do
end
end
def bulk_update!(stream_or_query, action, input, opts \\ []) do
Api.bulk_update!(__MODULE__, stream_or_query, action, input, opts)
end
def bulk_update(stream_or_query, action, input, opts \\ []) do
case Api.bulk_update(__MODULE__, stream_or_query, action, input, opts) do
{:error, error} ->
{:error, Ash.Error.to_error_class(error)}
other ->
other
end
end
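Through an api module these read like the other interface helpers; a hedged usage sketch (`MyApp.Api`, `Post`, and the input are assumed names):

```elixir
# Hypothetical usage through a module that does `use Ash.Api`:
case MyApp.Api.bulk_update(posts, :update, %{title2: "new value"},
       resource: Post,
       return_errors?: true
     ) do
  %Ash.BulkResult{status: :success} = result ->
    {:ok, result}

  %Ash.BulkResult{} = result ->
    # :error (and any non-success status) lands here
    {:error, result.errors}
end
```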
def create!(changeset, params \\ []) do
Api.create!(__MODULE__, changeset, params)
end


@ -256,8 +256,6 @@ defmodule Ash.Changeset do
IO.warn("""
Changeset has already been validated for action #{inspect(changeset.__validated_for_action__)}.
In the future, this will become an error.
For safety, we prevent any changes after that point because they will bypass validations or other action logic. To proceed anyway,
you can use `#{unquote(alternative)}/#{unquote(arity)}`. However, you should prefer a pattern like the below, which makes
any custom changes *before* calling the action.
@ -277,8 +275,6 @@ defmodule Ash.Changeset do
IO.warn("""
Changeset has already been validated for action #{inspect(changeset.__validated_for_action__)}.
In the future, this will become an error.
For safety, we prevent any changes using `#{unquote(function)}/#{unquote(arity)}` after that point because they will bypass validations or other action logic.
Instead, you should change or set this value before calling the action, like so:
@ -528,6 +524,8 @@ defmodule Ash.Changeset do
%{change: {module, change_opts}, where: where}, changeset ->
with {:atomic, atomic_changes} <- module.atomic(changeset, change_opts, context),
{:atomic, condition} <- atomic_condition(where, changeset) do
changeset = add_after_atomic(changeset, module, change_opts)
case condition do
true ->
{:cont, atomic_update(changeset, atomic_changes)}
@ -558,8 +556,10 @@ defmodule Ash.Changeset do
end
%{validation: {module, validation_opts}, where: where}, changeset ->
with {:atomic, condition_expr, error_expr} <-
with {:atomic, fields, condition_expr, error_expr} <-
module.atomic(changeset, validation_opts),
{:changing?, true} <-
{:changing?, Enum.any?(fields, &changing_attribute?(changeset, &1))},
{:atomic, condition} <- atomic_condition(where, changeset) do
case condition do
true ->
@ -577,10 +577,28 @@ defmodule Ash.Changeset do
else
:not_atomic ->
{:halt, :not_atomic}
{:changing?, false} ->
{:cont, changeset}
end
end)
end
defp add_after_atomic(changeset, module, opts) do
# The callback is after_atomic/4 as of this commit, so check for that name
# and arity, and return the changeset unchanged when it isn't implemented.
if function_exported?(module, :after_atomic, 4) do
after_action(changeset, fn changeset, result ->
context = %{
actor: changeset.context[:private][:actor],
tenant: changeset.tenant,
authorize?: changeset.context[:private][:authorize?],
tracer: changeset.context[:private][:tracer]
}
module.after_atomic(changeset, opts, result, context)
end)
else
changeset
end
end
defp validate_atomically(changeset, condition_expr, error_expr) do
[first_pkey_field | _] = Ash.Resource.Info.primary_key(changeset.resource)
@ -612,7 +630,7 @@ defmodule Ash.Changeset do
Enum.reduce_while(where, {:atomic, true}, fn {module, validation_opts},
{:atomic, condition} ->
case module.atomic(changeset, validation_opts) do
{:atomic, expr, _as_error} ->
{:atomic, _, expr, _as_error} ->
new_expr =
if condition == true do
expr
@ -918,9 +936,9 @@ defmodule Ash.Changeset do
end
@doc """
Adds atomic changes to the changeset
Adds multiple atomic changes to the changeset
e.g. `Ash.Changeset.atomic_update(changeset, score: Ash.Expr.expr(score + 1))`
See `atomic_update/3` for more information.
"""
@spec atomic_update(t(), map() | Keyword.t()) :: t()
def atomic_update(changeset, atomics) when is_list(atomics) or is_map(atomics) do
@ -930,9 +948,44 @@ defmodule Ash.Changeset do
end
@doc """
Adds an atomic change to the changeset
Adds an atomic change to the changeset.
e.g. `Ash.Changeset.atomic_update(changeset, :score, Ash.Expr.expr(score + 1))`
Atomic changes are applied by the data layer, and as such have guarantees that are not
given by changes that are based on looking at the previous value and updating it. Here
is an example of a change that is not safe to do concurrently:
```elixir
change fn changeset, _ ->
Ash.Changeset.set_attribute(changeset, :score, changeset.data.score + 1)
end
```
If two processes run this concurrently, they will both read the same value of `score`, and
set the new score to the same value. This means that one of the increments will be lost.
If you were to instead do this using `atomic_update`, you would get the correct result:
```elixir
Ash.Changeset.atomic_update(changeset, :score, Ash.Expr.expr(score + 1))
```
There are drawbacks/things to consider, however. The first is that atomic update results
are not known until after the action is run. The following functional validation would not
be able to enforce the score being less than 10, because the atomic update is applied after the validation runs.
```elixir
validate fn changeset, _ ->
if Ash.Changeset.get_attribute(changeset, :score) < 10 do
:ok
else
{:error, field: :score, message: "must be less than 10"}
end
end
```
If you want to use atomic updates, it is suggested to write module-based validations & changes,
and implement the appropriate atomic callbacks on those modules. All builtin validations and changes
implement these callbacks in addition to the standard callbacks. Validations will only be run atomically
when the entire action is being run atomically or if one of the relevant fields is being updated atomically.
"""
@spec atomic_update(t(), atom(), {:atomic, Ash.Expr.t()} | Ash.Expr.t()) :: t()
def atomic_update(changeset, key, {:atomic, value}) do
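The moduledoc above recommends module-based changes with atomic callbacks. A minimal sketch of one, using the callback shapes visible elsewhere in this diff (`atomic/3` returning `{:atomic, changes}`); the module and attribute names are hypothetical:

```elixir
defmodule MyApp.Changes.IncrementScore do
  use Ash.Resource.Change
  require Ash.Expr

  # Non-atomic fallback: read-then-write, unsafe under concurrency, as
  # described in the moduledoc above.
  def change(changeset, _opts, _context) do
    score = Ash.Changeset.get_attribute(changeset, :score) || 0
    Ash.Changeset.force_change_attribute(changeset, :score, score + 1)
  end

  # Atomic version: the data layer applies the expression, so concurrent
  # updates cannot lose increments.
  def atomic(_changeset, _opts, _context) do
    {:atomic, %{score: Ash.Expr.expr(score + 1)}}
  end
end
```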
@ -1512,7 +1565,7 @@ defmodule Ash.Changeset do
changeset
else
changeset
|> unsafe_change_attribute(attribute.name, default(:create, attribute))
|> force_change_attribute(attribute.name, default(:create, attribute))
|> Map.update!(:defaults, fn defaults ->
[attribute.name | defaults]
end)
@ -1537,7 +1590,7 @@ defmodule Ash.Changeset do
changeset
else
changeset
|> unsafe_change_attribute(attribute.name, default(:update, attribute))
|> force_change_attribute(attribute.name, default(:update, attribute))
|> Map.update!(:defaults, fn defaults ->
[attribute.name | defaults]
end)
@ -1605,7 +1658,7 @@ defmodule Ash.Changeset do
changeset
else
changeset
|> unsafe_change_attribute(attribute.name, default_value)
|> force_change_attribute(attribute.name, default_value)
|> Map.update!(:defaults, fn defaults ->
[attribute.name | defaults]
end)
@ -3520,7 +3573,8 @@ defmodule Ash.Changeset do
@doc "Returns true if an attribute exists in the changes"
@spec changing_attribute?(t(), atom) :: boolean
def changing_attribute?(changeset, attribute) do
Map.has_key?(changeset.attributes, attribute)
Map.has_key?(changeset.attributes, attribute) ||
Keyword.has_key?(changeset.atomics, attribute)
end
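A quick sketch of the new behavior, with an atomic pending on the changeset (assumes `require Ash.Expr`):

```elixir
changeset = Ash.Changeset.atomic_update(changeset, :score, Ash.Expr.expr(score + 1))

# Atomics now count as changes:
Ash.Changeset.changing_attribute?(changeset, :score)
#=> true
```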
@doc "Returns true if a relationship exists in the changes"
@ -3866,20 +3920,6 @@ defmodule Ash.Changeset do
end
end
@doc "Calls `unsafe_change_attribute/3` for each key/value pair provided."
@spec unsafe_change_attributes(t(), map | Keyword.t()) :: t()
def unsafe_change_attributes(changeset, changes) do
Enum.reduce(changes, changeset, fn {key, value}, changeset ->
unsafe_change_attribute(changeset, key, value)
end)
end
@doc "Changes an attribute even if it isn't writable, doing no type casting or validation"
@spec unsafe_change_attribute(t(), atom, any) :: t()
def unsafe_change_attribute(changeset, attribute, value) do
%{changeset | attributes: Map.put(changeset.attributes, attribute, value)}
end
@doc """
Adds a before_action hook to the changeset.


@ -2,20 +2,31 @@ defmodule Ash.Error.Invalid.NonStreamableAction do
@moduledoc "Used when Api.stream is used with an action that does not support keyset pagination"
use Ash.Error.Exception
def_ash_error([:resource, :action], class: :invalid)
def_ash_error([:resource, :action, :for_bulk_update], class: :invalid)
defimpl Ash.ErrorKind do
def id(_), do: Ash.UUID.generate()
def code(_), do: "non_streamable_action"
def message(%{for_bulk_update: action} = error) when not is_nil(action) do
"""
You are attempting to pair read action #{error.action.name} with bulk update
action #{action}, but #{inspect(error.resource)}.#{error.action.name} does not support streaming.
To enable it, add keyset pagination to the action #{error.action.name}:
pagination keyset?: true, required?: false
"""
end
def message(error) do
"""
Action #{inspect(error.resource)}.#{error.action.name} does not support streaming.
To enable it, add keyset pagination to the action #{error.action.name}:
pagination keyset?: true
pagination keyset?: true, required?: false
"""
end
end
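The remedy the message points at is a one-line addition to the read action. A sketch on a hypothetical resource:

```elixir
actions do
  read :read do
    primary? true
    # required?: false keeps the action usable without pagination
    # parameters while still allowing it to be streamed.
    pagination keyset?: true, required?: false
  end
end
```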


@ -135,7 +135,7 @@ defmodule Ash.Resource.Change do
| {:non_atomic, Ash.Changeset.t()}
| {:error, term()}
@callback after_atomic(Ash.Changeset.t(), Keyword.t(), Ash.Resource.record()) ::
@callback after_atomic(Ash.Changeset.t(), Keyword.t(), Ash.Resource.record(), context()) ::
{:ok, Ash.Resource.record()} | {:error, term()}
@optional_callbacks before_batch: 3,
@ -143,7 +143,7 @@ defmodule Ash.Resource.Change do
batch_change: 3,
change: 3,
atomic: 3,
after_atomic: 3
after_atomic: 4
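A hedged sketch of implementing the widened callback in a change module; the context map mirrors the one built in `add_after_atomic/3` earlier in this diff, and the `_after` suffix is illustrative:

```elixir
# The fourth argument carries actor/tenant/authorize?/tracer.
def after_atomic(_changeset, _opts, record, _context) do
  {:ok, %{record | title: record.title <> "_after"}}
end
```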
defmacro __using__(_) do
quote do


@ -48,7 +48,8 @@ defmodule Ash.Resource.Validation do
@callback atomic?() :: boolean
@callback atomic(changeset :: Ash.Changeset.t(), opts :: Keyword.t()) ::
:ok
| {:atomic, condition_expr :: Ash.Expr.t(), error_expr :: Ash.Expr.t()}
| {:atomic, involved_fields :: list(atom), condition_expr :: Ash.Expr.t(),
error_expr :: Ash.Expr.t()}
| :not_atomic
| {:error, term()}


@ -22,7 +22,7 @@ defmodule Ash.Resource.Validation.AttributeEquals do
def atomic(changeset, opts) do
field_value = Ash.Changeset.atomic_ref(changeset, opts[:attribute])
{:atomic, Ash.Expr.expr(^field_value != ^opts[:value]),
{:atomic, [opts[:attribute]], Ash.Expr.expr(^field_value != ^opts[:value]),
Ash.Expr.expr(
error(^InvalidAttribute, %{
field: ^opts[:attribute],


@ -86,7 +86,7 @@ defmodule Ash.Test.Actions.BulkCreateTest do
identities do
identity :unique_title, :title do
pre_check_with Ash.Test.Actions.BulkCreateTest
pre_check_with Ash.Test.Actions.BulkCreateTest.Api
end
end


@ -0,0 +1,442 @@
defmodule Ash.Test.Actions.BulkUpdateTest do
@moduledoc false
use ExUnit.Case, async: true
defmodule AddAfterToTitle do
use Ash.Resource.Change
def change(changeset, _, %{bulk?: true}) do
changeset
end
def after_batch(results, _, _) do
Stream.map(results, fn {_changeset, result} ->
{:ok, %{result | title: result.title <> "_after"}}
end)
end
end
defmodule AddBeforeToTitle do
use Ash.Resource.Change
def change(changeset, _, %{bulk?: true}) do
changeset
end
def before_batch(changesets, _, _) do
changesets
|> Stream.map(fn changeset ->
title = Ash.Changeset.get_attribute(changeset, :title)
Ash.Changeset.force_change_attribute(changeset, :title, "before_" <> title)
end)
end
end
defmodule Post do
@moduledoc false
use Ash.Resource,
data_layer: Ash.DataLayer.Ets,
authorizers: [Ash.Policy.Authorizer]
ets do
private? true
end
actions do
defaults [:create, :read, :update, :destroy]
update :update_with_change do
change fn changeset, _ ->
title = Ash.Changeset.get_attribute(changeset, :title)
Ash.Changeset.force_change_attribute(changeset, :title, title <> "_stuff")
end
end
update :update_with_argument do
argument :a_title, :string do
allow_nil? false
end
change set_attribute(:title2, arg(:a_title))
end
update :update_with_after_action do
change after_action(fn _changeset, result ->
{:ok, %{result | title: result.title <> "_stuff"}}
end)
end
update :update_with_after_batch do
change AddAfterToTitle
change AddBeforeToTitle
end
update :update_with_after_transaction do
change after_transaction(fn _changeset, {:ok, result} ->
{:ok, %{result | title: result.title <> "_stuff"}}
end)
end
update :update_with_policy do
argument :authorize?, :boolean, allow_nil?: false
change set_context(%{authorize?: arg(:authorize?)})
end
end
identities do
identity :unique_title, :title do
pre_check_with Ash.Test.Actions.BulkUpdateTest.Api
end
end
policies do
policy action(:update_with_policy) do
authorize_if context_equals(:authorize?, true)
end
end
attributes do
uuid_primary_key :id
attribute :title, :string, allow_nil?: false
attribute :title2, :string
attribute :title3, :string
timestamps()
end
end
defmodule Api do
@moduledoc false
use Ash.Api
resources do
resource Post
end
end
test "returns updated records" do
assert %Ash.BulkResult{records: [%{title2: "updated value"}, %{title2: "updated value"}]} =
Api.bulk_create!([%{title: "title1"}, %{title: "title2"}], Post, :create,
return_stream?: true,
return_records?: true
)
|> Stream.map(fn {:ok, result} ->
result
end)
|> Api.bulk_update!(:update, %{title2: "updated value"},
resource: Post,
return_records?: true,
return_errors?: true
)
end
test "runs changes" do
assert %Ash.BulkResult{
records: [
%{title: "title1_stuff", title2: "updated value"},
%{title: "title2_stuff", title2: "updated value"}
]
} =
Api.bulk_create!([%{title: "title1"}, %{title: "title2"}], Post, :create,
return_stream?: true,
return_records?: true
)
|> Stream.map(fn {:ok, result} ->
result
end)
|> Api.bulk_update!(:update_with_change, %{title2: "updated value"},
resource: Post,
return_records?: true,
return_errors?: true
)
|> Map.update!(:records, fn records ->
Enum.sort_by(records, & &1.title)
end)
end
test "accepts arguments" do
assert %Ash.BulkResult{
records: [
%{title: "title1", title2: "updated value"},
%{title: "title2", title2: "updated value"}
]
} =
Api.bulk_create!([%{title: "title1"}, %{title: "title2"}], Post, :create,
return_stream?: true,
return_records?: true
)
|> Stream.map(fn {:ok, result} ->
result
end)
|> Api.bulk_update!(:update_with_argument, %{a_title: "updated value"},
resource: Post,
return_records?: true,
return_errors?: true
)
|> Map.update!(:records, fn records ->
Enum.sort_by(records, & &1.title)
end)
end
test "runs after batch hooks" do
assert %Ash.BulkResult{
records: [
%{title: "before_title1_after", title2: "updated value"},
%{title: "before_title2_after", title2: "updated value"}
]
} =
Api.bulk_create!([%{title: "title1"}, %{title: "title2"}], Post, :create,
return_stream?: true,
return_records?: true
)
|> Stream.map(fn {:ok, result} ->
result
end)
|> Api.bulk_update!(:update_with_after_batch, %{title2: "updated value"},
resource: Post,
return_records?: true,
return_errors?: true
)
|> Map.update!(:records, fn records ->
Enum.sort_by(records, & &1.title)
end)
end
test "will return error count" do
assert %Ash.BulkResult{
error_count: 2
} =
Api.bulk_create!([%{title: "title1"}, %{title: "title2"}], Post, :create,
return_stream?: true,
return_records?: true
)
|> Stream.map(fn {:ok, result} ->
result
end)
|> Api.bulk_update(:update, %{title2: %{invalid: :value}},
resource: Post,
return_records?: true
)
end
test "will return errors on request" do
assert %Ash.BulkResult{
error_count: 1,
errors: [%Ash.Changeset{}]
} =
Api.bulk_create!([%{title: "title1"}], Post, :create,
return_stream?: true,
return_records?: true
)
|> Stream.map(fn {:ok, result} ->
result
end)
|> Api.bulk_update(:update, %{title2: %{invalid: :value}},
resource: Post,
return_errors?: true
)
end
test "runs after action hooks" do
assert %Ash.BulkResult{
records: [
%{title: "title1_stuff", title2: "updated value"},
%{title: "title2_stuff", title2: "updated value"}
]
} =
Api.bulk_create!([%{title: "title1"}, %{title: "title2"}], Post, :create,
return_stream?: true,
return_records?: true
)
|> Stream.map(fn {:ok, result} ->
result
end)
|> Api.bulk_update!(:update_with_after_action, %{title2: "updated value"},
resource: Post,
return_records?: true,
return_errors?: true
)
|> Map.update!(:records, fn records ->
Enum.sort_by(records, & &1.title)
end)
end
test "runs after transaction hooks" do
assert %Ash.BulkResult{
records: [
%{title: "title1_stuff", title2: "updated value"},
%{title: "title2_stuff", title2: "updated value"}
]
} =
Api.bulk_create!([%{title: "title1"}, %{title: "title2"}], Post, :create,
return_stream?: true,
return_records?: true
)
|> Stream.map(fn {:ok, result} ->
result
end)
|> Api.bulk_update!(:update_with_after_transaction, %{title2: "updated value"},
resource: Post,
return_records?: true,
return_errors?: true
)
|> Map.update!(:records, fn records ->
Enum.sort_by(records, & &1.title)
end)
end
describe "authorization" do
test "policy success results in successes" do
assert %Ash.BulkResult{records: [_, _], errors: []} =
Api.bulk_create!([%{title: "title1"}, %{title: "title2"}], Post, :create,
return_stream?: true,
return_records?: true
)
|> Stream.map(fn {:ok, result} ->
result
end)
|> Api.bulk_update(
:update_with_policy,
%{title2: "updated value", authorize?: true},
authorize?: true,
resource: Post,
return_records?: true,
return_errors?: true
)
end
test "policy failure results in failures" do
assert %Ash.BulkResult{errors: [_, _], records: []} =
Api.bulk_create!([%{title: "title1"}, %{title: "title2"}], Post, :create,
return_stream?: true,
return_records?: true
)
|> Stream.map(fn {:ok, result} ->
result
end)
|> Api.bulk_update(
:update_with_policy,
%{title2: "updated value", authorize?: false},
authorize?: true,
resource: Post,
return_records?: true,
return_errors?: true
)
end
end
# describe "streaming" do
# test "by default nothing is returned in the stream" do
# assert [] =
# [%{title: "title1", authorize?: true}, %{title: "title2", authorize?: true}]
# |> Api.bulk_create!(
# Post,
# :create_with_policy,
# authorize?: true,
# return_stream?: true
# )
# |> Enum.to_list()
# end
# test "by returning notifications, you get the notifications in the stream" do
# assert [{:notification, _}, {:notification, _}] =
# [%{title: "title1", authorize?: true}, %{title: "title2", authorize?: true}]
# |> Api.bulk_create!(
# Post,
# :create_with_policy,
# authorize?: true,
# return_stream?: true,
# notify?: true,
# return_notifications?: true
# )
# |> Enum.to_list()
# end
# test "by returning records, you get the records in the stream" do
# assert [{:ok, %{title: "title1"}}, {:ok, %{title: "title2"}}] =
# [%{title: "title1", authorize?: true}, %{title: "title2", authorize?: true}]
# |> Api.bulk_create!(
# Post,
# :create_with_policy,
# authorize?: true,
# return_stream?: true,
# return_records?: true
# )
# |> Enum.to_list()
# |> Enum.sort_by(fn
# {:ok, v} ->
# v.title
# _ ->
# nil
# end)
# end
# test "by returning notifications and records, you get them both in the stream" do
# assert [
# {:notification, _},
# {:notification, _},
# {:ok, %{title: "title1"}},
# {:ok, %{title: "title2"}}
# ] =
# [%{title: "title1", authorize?: true}, %{title: "title2", authorize?: true}]
# |> Api.bulk_create!(
# Post,
# :create_with_policy,
# authorize?: true,
# notify?: true,
# return_stream?: true,
# return_notifications?: true,
# return_records?: true
# )
# |> Enum.to_list()
# |> Enum.sort_by(fn
# {:ok, v} ->
# v.title
# {:notification, _} ->
# true
# _ ->
# nil
# end)
# end
# test "any errors are also returned in the stream" do
# assert [
# {:error, %Ash.Changeset{}},
# {:notification, _},
# {:ok, %{title: "title1"}}
# ] =
# [
# %{title: "title1", authorize?: true},
# %{title: "title2", authorize?: false}
# ]
# |> Api.bulk_create!(
# Post,
# :create_with_policy,
# authorize?: true,
# notify?: true,
# return_stream?: true,
# return_notifications?: true,
# return_records?: true,
# return_errors?: true
# )
# |> Enum.to_list()
# |> Enum.sort_by(fn
# {:ok, v} ->
# v.title
# {:notification, _} ->
# true
# {:error, _} ->
# false
# _ ->
# nil
# end)
# end
# end
end