improvement: support bulk action callbacks in the data layer

This commit is contained in:
Zach Daniel 2023-12-31 00:49:22 -05:00
parent 56387d40e0
commit fe2156a9ac
15 changed files with 399 additions and 57 deletions

View file

@ -57,7 +57,7 @@ See `Ash.Changeset` for more information.
A packaged bundle of code that can be included in a resource to provide additional functionality. Built-in functionality such as the resource DSL itself is provided by an extension, and libraries like AshPostgres and AshAdmin also provide extensions that you can add to your resources with just one line of code.
See [Extending Resources](/documentation/tutorials/4-extending-resources.md) for more information.
See [Extending Resources](/documentation/tutorials/extending-resources.md) for more information.
## Filter

View file

@ -4,7 +4,7 @@
> #### HexDocs {: .tip}
>
> Hexdocs does not support multi-package search. To assist with this, we provide a mirror of this documentation at [ash-hq.org](https://ash-hq.org). Use Ctrl+K or Cmd+K to search all packages on that site. For the best way to use the hex documentation, see the [hexdocs guide](/documentation/tutorials/5-using-hexdocs.md).
> Hexdocs does not support multi-package search. To assist with this, we provide a mirror of this documentation at [ash-hq.org](https://ash-hq.org). Use Ctrl+K or Cmd+K to search all packages on that site. For the best way to use the hex documentation, see the [hexdocs guide](/documentation/tutorials/using-hexdocs.md).
<!--- ash-hq-hide-stop --> <!--- -->
@ -31,8 +31,8 @@ In this guide we will:
## Things you may want to read first
- [Install Elixir](https://elixir-lang.org/install.html)
- [Philosophy Guide](/documentation/tutorials/2-philosophy.md)
- [Using Hexdocs](/documentation/tutorials/5-using-hexdocs.md)
- [Philosophy Guide](/documentation/tutorials/philosophy.md)
- [Using Hexdocs](/documentation/tutorials/using-hexdocs.md)
## Requirements

View file

@ -5,6 +5,11 @@ defmodule Ash.Actions.Destroy.Bulk do
| {:ok, [Ash.Resource.record()]}
| {:ok, [Ash.Resource.record()], [Ash.Notifier.Notification.t()]}
| {:error, term}
# Convenience clause: when called with a bare resource module, wrap it in a
# fresh query so the `%Ash.Query{}` clause below handles both entry points.
def run(api, resource, action, input, opts) when is_atom(resource) do
  run(api, Ash.Query.new(resource), action, input, opts)
end
def run(api, %Ash.Query{} = query, action, input, opts) do
query =
if query.action do
@ -21,25 +26,77 @@ defmodule Ash.Actions.Destroy.Bulk do
query
end
if !query.action.pagination || !query.action.pagination.keyset? do
raise Ash.Error.Invalid.NonStreamableAction,
resource: query.resource,
action: query.action,
for_bulk_destroy: action.name
query = %{query | api: api}
fully_atomic_changeset =
if Ash.DataLayer.data_layer_can?(query.resource, :destroy_query) do
Ash.Changeset.fully_atomic_changeset(query.resource, action, input, opts)
else
:not_atomic
end
case fully_atomic_changeset do
:not_atomic ->
read_opts =
opts
|> Keyword.drop([
:resource,
:stream_batch_size,
:batch_size,
:stream_with,
:allow_stream_with
])
|> Keyword.put(:batch_size, opts[:stream_batch_size])
run(
api,
api.stream!(query, read_opts),
action,
input,
Keyword.put(opts, :resource, query.resource)
)
%Ash.Changeset{valid?: false, errors: errors} ->
%Ash.BulkResult{
status: :error,
errors: [Ash.Error.to_error_class(errors)]
}
atomic_changeset ->
with {:ok, query} <- authorize_bulk_query(query, opts),
{:ok, atomic_changeset, query} <-
authorize_atomic_changeset(query, atomic_changeset, opts),
{:ok, data_layer_query} <- Ash.Query.data_layer_query(query) do
case Ash.DataLayer.destroy_query(
data_layer_query,
atomic_changeset,
Map.new(Keyword.take(opts, [:return_records?, :tenant]))
) do
:ok ->
%Ash.BulkResult{
status: :success
}
{:ok, results} ->
%Ash.BulkResult{
status: :success,
records: results
}
{:error, error} ->
%Ash.BulkResult{
status: :error,
errors: [Ash.Error.to_error_class(error)]
}
end
else
{:error, error} ->
%Ash.BulkResult{
status: :error,
errors: [Ash.Error.to_error_class(error)]
}
end
end
read_opts =
opts
|> Keyword.drop([:resource, :stream_batch_size, :batch_size])
|> Keyword.put(:batch_size, opts[:stream_batch_size])
run(
api,
api.stream!(query, read_opts),
action,
input,
Keyword.put(opts, :resource, query.resource)
)
end
def run(api, stream, action, input, opts) do
@ -285,6 +342,58 @@ defmodule Ash.Actions.Destroy.Bulk do
|> Ash.Changeset.set_arguments(arguments)
end
# Runs the api-level authorization check for the bulk destroy query.
#
# Returns `{:ok, query}` — possibly a query altered by the authorizer, since
# `modify_source?: true` is passed — or `{:error, error}`. When the
# `:authorize?` option is not set, authorization is skipped entirely and the
# query passes through unchanged.
defp authorize_bulk_query(query, opts) do
  if opts[:authorize?] do
    case query.api.can(query, opts[:actor],
           return_forbidden_error?: true,
           maybe_is: false,
           modify_source?: true
         ) do
      {:ok, true} ->
        {:ok, query}

      # Authorizer approved and returned a modified query; use it.
      {:ok, true, query} ->
        {:ok, query}

      {:ok, false, error} ->
        {:error, error}

      {:error, error} ->
        {:error, error}
    end
  else
    {:ok, query}
  end
end
# Authorizes the fully-atomic changeset against the (already authorized)
# base query.
#
# Passing `base_query: query` lets the authorizer narrow the existing query
# rather than building a new one. The authorizer may return the original
# pair, an altered query, or an altered changeset *and* query — all three
# success shapes normalize to `{:ok, changeset, query}`. Skipped entirely
# when the `:authorize?` option is not set.
defp authorize_atomic_changeset(query, changeset, opts) do
  if opts[:authorize?] do
    case query.api.can(query, opts[:actor],
           return_forbidden_error?: true,
           maybe_is: false,
           modify_source?: true,
           base_query: query
         ) do
      {:ok, true} ->
        {:ok, changeset, query}

      {:ok, true, %Ash.Query{} = query} ->
        {:ok, changeset, query}

      {:ok, true, %Ash.Changeset{} = changeset, %Ash.Query{} = query} ->
        {:ok, changeset, query}

      {:ok, false, error} ->
        {:error, error}

      {:error, error} ->
        {:error, error}
    end
  else
    {:ok, changeset, query}
  end
end
defp pre_template_all_changes(action, resource, :destroy, base, actor) do
action.changes
|> Enum.concat(Ash.Resource.Info.validations(resource, action.type))

View file

@ -5,6 +5,10 @@ defmodule Ash.Actions.Update.Bulk do
| {:ok, [Ash.Resource.record()]}
| {:ok, [Ash.Resource.record()], [Ash.Notifier.Notification.t()]}
| {:error, term}
# Convenience clause: when called with a bare resource module, wrap it in a
# fresh query so the `%Ash.Query{}` clause below handles both entry points.
def run(api, resource, action, input, opts) when is_atom(resource) do
  run(api, Ash.Query.new(resource), action, input, opts)
end
def run(api, %Ash.Query{} = query, action, input, opts) do
query =
if query.action do
@ -21,25 +25,78 @@ defmodule Ash.Actions.Update.Bulk do
query
end
if !query.action.pagination || !query.action.pagination.keyset? do
raise Ash.Error.Invalid.NonStreamableAction,
resource: query.resource,
action: query.action,
for_bulk_update: action.name
query = %{query | api: api}
fully_atomic_changeset =
if Ash.DataLayer.data_layer_can?(query.resource, :update_query) do
Ash.Changeset.fully_atomic_changeset(query.resource, action, input, opts)
else
:not_atomic
end
case fully_atomic_changeset do
:not_atomic ->
read_opts =
opts
|> Keyword.drop([
:resource,
:atomic_update,
:stream_batch_size,
:batch_size,
:stream_with,
:allow_stream_with
])
|> Keyword.put(:batch_size, opts[:stream_batch_size])
run(
api,
api.stream!(query, read_opts),
action,
input,
Keyword.put(opts, :resource, query.resource)
)
%Ash.Changeset{valid?: false, errors: errors} ->
%Ash.BulkResult{
status: :error,
errors: [Ash.Error.to_error_class(errors)]
}
atomic_changeset ->
with {:ok, query} <- authorize_bulk_query(query, opts),
{:ok, atomic_changeset, query} <-
authorize_atomic_changeset(query, atomic_changeset, opts),
{:ok, data_layer_query} <- Ash.Query.data_layer_query(query) do
case Ash.DataLayer.update_query(
data_layer_query,
atomic_changeset,
Map.new(Keyword.take(opts, [:return_records?, :tenant]))
) do
:ok ->
%Ash.BulkResult{
status: :success
}
{:ok, results} ->
%Ash.BulkResult{
status: :success,
records: results
}
{:error, error} ->
%Ash.BulkResult{
status: :error,
errors: [Ash.Error.to_error_class(error)]
}
end
else
{:error, error} ->
%Ash.BulkResult{
status: :error,
errors: [Ash.Error.to_error_class(error)]
}
end
end
read_opts =
opts
|> Keyword.drop([:resource, :atomic_update, :stream_batch_size, :batch_size])
|> Keyword.put(:batch_size, opts[:stream_batch_size])
run(
api,
api.stream!(query, read_opts),
action,
input,
Keyword.put(opts, :resource, query.resource)
)
end
def run(api, stream, action, input, opts) do
@ -270,6 +327,58 @@ defmodule Ash.Actions.Update.Bulk do
end
end
# Runs the api-level authorization check for the bulk update query.
#
# Returns `{:ok, query}` — possibly a query altered by the authorizer, since
# `modify_source?: true` is passed — or `{:error, error}`. When the
# `:authorize?` option is not set, authorization is skipped entirely and the
# query passes through unchanged.
defp authorize_bulk_query(query, opts) do
  if opts[:authorize?] do
    case query.api.can(query, opts[:actor],
           return_forbidden_error?: true,
           maybe_is: false,
           modify_source?: true
         ) do
      {:ok, true} ->
        {:ok, query}

      # Authorizer approved and returned a modified query; use it.
      {:ok, true, query} ->
        {:ok, query}

      {:ok, false, error} ->
        {:error, error}

      {:error, error} ->
        {:error, error}
    end
  else
    {:ok, query}
  end
end
# Authorizes the fully-atomic changeset against the (already authorized)
# base query.
#
# Passing `base_query: query` lets the authorizer narrow the existing query
# rather than building a new one. The authorizer may return the original
# pair, an altered query, or an altered changeset *and* query — all three
# success shapes normalize to `{:ok, changeset, query}`. Skipped entirely
# when the `:authorize?` option is not set.
defp authorize_atomic_changeset(query, changeset, opts) do
  if opts[:authorize?] do
    case query.api.can(query, opts[:actor],
           return_forbidden_error?: true,
           maybe_is: false,
           modify_source?: true,
           base_query: query
         ) do
      {:ok, true} ->
        {:ok, changeset, query}

      {:ok, true, %Ash.Query{} = query} ->
        {:ok, changeset, query}

      {:ok, true, %Ash.Changeset{} = changeset, %Ash.Query{} = query} ->
        {:ok, changeset, query}

      {:ok, false, error} ->
        {:error, error}

      {:error, error} ->
        {:error, error}
    end
  else
    {:ok, changeset, query}
  end
end
defp base_changeset(resource, api, opts, action, input) do
arguments =
Enum.reduce(input, %{}, fn {key, value}, acc ->

View file

@ -955,7 +955,7 @@ defmodule Ash.Api do
authorizers ->
authorizers
|> Enum.reduce_while(
{false, nil},
{false, opts[:base_query]},
fn {authorizer, authorizer_state, context}, {_authorized?, query} ->
case authorizer.strict_check(authorizer_state, context) do
{:error, %{class: :forbidden} = e} when is_exception(e) ->
@ -1441,6 +1441,9 @@ defmodule Ash.Api do
- `alter_source?` - If true, the query or changeset will be returned with authorization modifications made. For a query,
this means adding field visibility calculations and altering the filter or the sort. For a changeset, this means only adding
field visibility calculations. The default value is `false`.
- `base_query` - If authorizing an update, some cases can return both a new changeset and a query filtered for only things
that will be authorized to update. Providing the `base_query` will cause that query to be altered instead of a new one to be
generated.
"""
@callback can(
@ -1653,6 +1656,8 @@ defmodule Ash.Api do
or `after_action` hooks that can operate on the entire list at once. See the documentation for that callback for more on
how to accomplish that.
## Options
#{Spark.OptionsHelpers.docs(@bulk_create_opts_schema)}
"""
@callback bulk_create(
@ -1682,8 +1687,14 @@ defmodule Ash.Api do
@doc """
Updates all items in the provided enumerable or query with the provided input.
Currently, this streams each record and updates it. Soon, this will use special data layer
callbacks to run these update statements in a single query.
If the data layer supports updating from a query, and the update action can be done fully atomically,
it will be updated in a single pass using the data layer.
Otherwise, this will stream each record and update it.
## Options
#{Spark.OptionsHelpers.docs(@bulk_update_opts_schema)}
"""
@callback bulk_update(
Enumerable.t(Ash.Resource.record()) | Ash.Query.t(),
@ -1707,8 +1718,14 @@ defmodule Ash.Api do
@doc """
Destroys all items in the provided enumerable or query with the provided input.
Currently, this streams each record and destroys it. Soon, this will use special data layer
callbacks to run these update statements in a single query.
If the data layer supports destroying from a query, and the destroy action can be done fully atomically,
the records will be destroyed in a single pass using the data layer.
Otherwise, this will stream each record and destroy it.
## Options
#{Spark.OptionsHelpers.docs(@bulk_destroy_opts_schema)}
"""
@callback bulk_destroy(
Enumerable.t(Ash.Resource.record()) | Ash.Query.t(),

View file

@ -507,8 +507,14 @@ defmodule Ash.Changeset do
opts
)
with %Ash.Changeset{} = changeset <- atomic_params(changeset, action, params) do
atomic_changes(changeset, action)
with %Ash.Changeset{} = changeset <-
atomic_update(changeset, opts[:atomic_update] || []),
%Ash.Changeset{} = changeset <- atomic_params(changeset, action, params),
%Ash.Changeset{} = changeset <- atomic_changes(changeset, action) do
hydrate_atomic_refs(changeset, opts[:actor])
else
_ ->
:not_atomic
end
end
@ -625,7 +631,10 @@ defmodule Ash.Changeset do
"""
def atomic_ref(changeset, field) do
if base_value = changeset.atomics[field] do
base_value
%{type: type, constraints: constraints} =
Ash.Resource.Info.attribute(changeset.resource, field)
Ash.Expr.expr(type(^base_value, ^type, ^constraints))
else
Ash.Expr.expr(ref(^field))
end
@ -1565,6 +1574,28 @@ defmodule Ash.Changeset do
{key, expr}
end)
}
|> add_known_atomic_errors()
end
# Surfaces atomic errors that are knowable at changeset-build time.
#
# Scans the changeset's atomics for `Ash.Query.Function.Error` expressions.
# If an error's input is still an expression (depends on runtime data), it is
# left for the data layer to evaluate; otherwise the input is concrete, so
# the error can be materialized onto the changeset immediately.
defp add_known_atomic_errors(changeset) do
  Enum.reduce(changeset.atomics, changeset, fn
    {_,
     %Ash.Query.Function.Error{
       arguments: [exception, input]
     }},
    changeset ->
      if Ash.Filter.TemplateHelpers.expr?(input) do
        # Input depends on runtime values — cannot resolve the error here.
        changeset
      else
        # Round-trip the input through JSON to normalize it the same way it
        # would be when rehydrated later, then rebuild the exception.
        add_error(
          changeset,
          Ash.Error.from_json(exception, Jason.decode!(Jason.encode!(input)))
        )
      end

    _other, changeset ->
      changeset
  end)
end
@doc false

View file

@ -87,6 +87,9 @@ defmodule Ash.DataLayer do
| :aggregate_sort
| :boolean_filter
| :async_engine
| :bulk_create
| :update_query
| :destroy_query
| :create
| :read
| :update
@ -166,7 +169,7 @@ defmodule Ash.DataLayer do
@callback return_query(data_layer_query(), Ash.Resource.t()) ::
{:ok, data_layer_query()} | {:error, term}
@type bulk_options :: %{
@type bulk_create_options :: %{
batch_size: pos_integer,
return_records?: boolean,
upsert?: boolean,
@ -180,20 +183,49 @@ defmodule Ash.DataLayer do
tenant: String.t() | nil
}
@type bulk_update_options :: %{
return_records?: boolean,
tenant: String.t() | nil
}
@callback bulk_create(
Ash.Resource.t(),
Enumerable.t(Ash.Changeset.t()),
options :: bulk_options
options :: bulk_create_options
) ::
{:ok, Enumerable.t(:ok | {:ok, Ash.Resource.record()} | {:error, Ash.Error.t()})}
:ok
| {:ok, Enumerable.t(Ash.Resource.record())}
| {:error, Ash.Error.t()}
| {:error, :no_rollback, term}
| {:error, :no_rollback, Ash.Error.t()}
@callback create(Ash.Resource.t(), Ash.Changeset.t()) ::
{:ok, Ash.Resource.record()} | {:error, term} | {:error, :no_rollback, term}
@callback upsert(Ash.Resource.t(), Ash.Changeset.t(), list(atom)) ::
{:ok, Ash.Resource.record()} | {:error, term} | {:error, :no_rollback, term}
@callback update(Ash.Resource.t(), Ash.Changeset.t()) ::
{:ok, Ash.Resource.record()} | {:error, term} | {:error, :no_rollback, term}
@callback update_query(
data_layer_query(),
Ash.Changeset.t(),
Ash.Resource.t(),
opts :: bulk_update_options()
) ::
:ok
| {:ok, Enumerable.t(Ash.Resource.record())}
| {:error, Ash.Error.t()}
| {:error, :no_rollback, Ash.Error.t()}
@callback destroy_query(
data_layer_query(),
Ash.Changeset.t(),
Ash.Resource.t(),
opts :: bulk_update_options()
) ::
:ok
| {:ok, Enumerable.t(Ash.Resource.record())}
| {:error, Ash.Error.t()}
| {:error, :no_rollback, Ash.Error.t()}
@callback add_aggregate(
data_layer_query(),
Ash.Query.Aggregate.t(),
@ -240,6 +272,8 @@ defmodule Ash.DataLayer do
@optional_callbacks source: 1,
run_query: 2,
bulk_create: 3,
update_query: 4,
destroy_query: 4,
distinct: 3,
return_query: 2,
lock: 3,
@ -383,6 +417,34 @@ defmodule Ash.DataLayer do
Ash.DataLayer.data_layer(resource).update(resource, changeset)
end
@spec update_query(data_layer_query(), Ash.Changeset.t(), opts :: bulk_update_options()) ::
        :ok
        | {:ok, Enumerable.t(Ash.Resource.record())}
        | {:error, Ash.Error.t()}
        | {:error, :no_rollback, Ash.Error.t()}
@doc """
Dispatches a bulk update-by-query to the resource's data layer.

Looks up the data layer from the changeset's resource and invokes its
`c:update_query/4` callback, passing the resource explicitly as the
callback's third argument.
"""
def update_query(query, changeset, opts) do
  Ash.DataLayer.data_layer(changeset.resource).update_query(
    query,
    changeset,
    changeset.resource,
    opts
  )
end
@spec destroy_query(data_layer_query(), Ash.Changeset.t(), opts :: bulk_update_options()) ::
        :ok
        | {:ok, Enumerable.t(Ash.Resource.record())}
        | {:error, Ash.Error.t()}
        | {:error, :no_rollback, Ash.Error.t()}
@doc """
Dispatches a bulk destroy-by-query to the resource's data layer.

Looks up the data layer from the changeset's resource and invokes its
`c:destroy_query/4` callback, passing the resource explicitly as the
callback's third argument.
"""
def destroy_query(query, changeset, opts) do
  Ash.DataLayer.data_layer(changeset.resource).destroy_query(
    query,
    changeset,
    changeset.resource,
    opts
  )
end
@spec create(Ash.Resource.t(), Ash.Changeset.t()) ::
{:ok, Ash.Resource.record()} | {:error, term} | {:error, :no_rollback, term}
def create(resource, changeset) do
@ -404,7 +466,7 @@ defmodule Ash.DataLayer do
@spec bulk_create(
Ash.Resource.t(),
Enumerable.t(Ash.Changeset.t()),
options :: bulk_options
options :: bulk_create_options
) ::
:ok
| {:ok, Enumerable.t(Ash.Resource.record())}

View file

@ -51,7 +51,7 @@ defmodule Ash.Error.Exception do
%{
__struct__: Ash.Error.Stacktrace,
stacktrace: Enum.drop(stacktrace, 2)
stacktrace: Enum.drop(stacktrace, 4)
}
end)

View file

@ -10,6 +10,6 @@ defmodule Ash.Query.Function.Error do
def args, do: [[:atom, :any]]
def evaluate(%{arguments: [exception, input]}) do
{:error, exception.exception(input)}
{:error, Ash.Error.from_json(exception, Jason.decode!(Jason.encode!(Map.new(input))))}
end
end

View file

@ -4,6 +4,7 @@ defmodule Ash.Resource.Validation.AttributeDoesNotEqual do
use Ash.Resource.Validation
alias Ash.Error.Changes.InvalidAttribute
require Ash.Expr
@opt_schema [
attribute: [
@ -43,6 +44,21 @@ defmodule Ash.Resource.Validation.AttributeDoesNotEqual do
end
end
@impl true
# Atomic form of this validation, for use in bulk/atomic actions.
#
# Returns the `{:atomic, fields, condition_expr, error_expr}` tuple expected
# by the validation framework: when the condition (attribute equals the
# forbidden value) holds, the error expression builds an `InvalidAttribute`
# error at the data layer. `atomic_ref/2` resolves the attribute against any
# pending atomic update so the check sees the new value, not the stored one.
def atomic(changeset, opts) do
  field_value = Ash.Changeset.atomic_ref(changeset, opts[:attribute])

  {:atomic, [opts[:attribute]], Ash.Expr.expr(^field_value == ^opts[:value]),
   Ash.Expr.expr(
     error(^InvalidAttribute, %{
       field: ^opts[:attribute],
       value: ^field_value,
       message: "must not equal %{value}",
       vars: %{field: ^opts[:attribute], value: ^opts[:value]}
     })
   )}
end
@impl true
def describe(opts) do
[

View file

@ -44,8 +44,6 @@ defmodule Ash.MixProject do
|> Path.basename(".md")
|> Path.basename(".livemd")
|> Path.basename(".cheatmd")
# We want to keep permalinks, so we remove the sorting number
|> String.replace(~r/^\d+\-/, "")
title =
html_filename