From fe2156a9ac9e49cae44132552ce6e52aee14104c Mon Sep 17 00:00:00 2001 From: Zach Daniel Date: Sun, 31 Dec 2023 00:49:22 -0500 Subject: [PATCH] imorovement: support bulk action callbacks in the data layer --- documentation/topics/glossary.md | 2 +- ...ng-resources.md => extending-resources.md} | 0 .../{1-get-started.md => get-started.md} | 6 +- .../{2-philosophy.md => philosophy.md} | 0 .../{5-using-hexdocs.md => using-hexdocs.md} | 0 .../tutorials/{3-why-ash.md => why-ash.md} | 0 lib/ash/actions/destroy/bulk.ex | 145 +++++++++++++++--- lib/ash/actions/update/bulk.ex | 145 +++++++++++++++--- lib/ash/api/api.ex | 27 +++- lib/ash/changeset/changeset.ex | 37 ++++- lib/ash/data_layer/data_layer.ex | 72 ++++++++- lib/ash/error/exception.ex | 2 +- lib/ash/query/function/error.ex | 2 +- .../validation/attribute_does_not_equal.ex | 16 ++ mix.exs | 2 - 15 files changed, 399 insertions(+), 57 deletions(-) rename documentation/tutorials/{4-extending-resources.md => extending-resources.md} (100%) rename documentation/tutorials/{1-get-started.md => get-started.md} (99%) rename documentation/tutorials/{2-philosophy.md => philosophy.md} (100%) rename documentation/tutorials/{5-using-hexdocs.md => using-hexdocs.md} (100%) rename documentation/tutorials/{3-why-ash.md => why-ash.md} (100%) diff --git a/documentation/topics/glossary.md b/documentation/topics/glossary.md index 90ed02d4..249409be 100644 --- a/documentation/topics/glossary.md +++ b/documentation/topics/glossary.md @@ -57,7 +57,7 @@ See `Ash.Changeset` for more information. A packaged bundle of code that can be included in a resource to provide additional functionality. Built-in functionality such as the resource DSL itself is provided by an extension, and libraries like AshPostgres and AshAdmin also provide extensions that you can add to your resources with just one line of code. -See [Extending Resources](/documentation/tutorials/4-extending-resources.md) for more information. +See [Extending Resources](/documentation/tutorials/extending-resources.md) for more information. ## Filter diff --git a/documentation/tutorials/4-extending-resources.md b/documentation/tutorials/extending-resources.md similarity index 100% rename from documentation/tutorials/4-extending-resources.md rename to documentation/tutorials/extending-resources.md diff --git a/documentation/tutorials/1-get-started.md b/documentation/tutorials/get-started.md similarity index 99% rename from documentation/tutorials/1-get-started.md rename to documentation/tutorials/get-started.md index 8b41eb28..2541022d 100644 --- a/documentation/tutorials/1-get-started.md +++ b/documentation/tutorials/get-started.md @@ -4,7 +4,7 @@ > #### HexDocs {: .tip} > -> Hexdocs does not support multi-package search. To assist with this, we provide a mirror of this documentation at [ash-hq.org](https://ash-hq.org). Use Ctrl+K or Cmd+K to search all packages on that site. For the best way to use the hex documentation, see the [hexdocs guide](/documentation/tutorials/5-using-hexdocs.md). +> Hexdocs does not support multi-package search. To assist with this, we provide a mirror of this documentation at [ash-hq.org](https://ash-hq.org). Use Ctrl+K or Cmd+K to search all packages on that site. For the best way to use the hex documentation, see the [hexdocs guide](/documentation/tutorials/using-hexdocs.md). @@ -31,8 +31,8 @@ In this guide we will: ## Things you may want to read first - [Install Elixir](https://elixir-lang.org/install.html) -- [Philosophy Guide](/documentation/tutorials/2-philosophy.md) -- [Using Hexdocs](/documentation/tutorials/5-using-hexdocs.md) +- [Philosophy Guide](/documentation/tutorials/philosophy.md) +- [Using Hexdocs](/documentation/tutorials/using-hexdocs.md) ## Requirements diff --git a/documentation/tutorials/2-philosophy.md b/documentation/tutorials/philosophy.md similarity index 100% rename from documentation/tutorials/2-philosophy.md rename to documentation/tutorials/philosophy.md diff --git a/documentation/tutorials/5-using-hexdocs.md b/documentation/tutorials/using-hexdocs.md similarity index 100% rename from documentation/tutorials/5-using-hexdocs.md rename to documentation/tutorials/using-hexdocs.md diff --git a/documentation/tutorials/3-why-ash.md b/documentation/tutorials/why-ash.md similarity index 100% rename from documentation/tutorials/3-why-ash.md rename to documentation/tutorials/why-ash.md diff --git a/lib/ash/actions/destroy/bulk.ex b/lib/ash/actions/destroy/bulk.ex index d6550f28..2afa01ec 100644 --- a/lib/ash/actions/destroy/bulk.ex +++ b/lib/ash/actions/destroy/bulk.ex @@ -5,6 +5,11 @@ defmodule Ash.Actions.Destroy.Bulk do | {:ok, [Ash.Resource.record()]} | {:ok, [Ash.Resource.record()], [Ash.Notifier.Notification.t()]} | {:error, term} + + def run(api, resource, action, input, opts) when is_atom(resource) do + run(api, Ash.Query.new(resource), action, input, opts) + end + def run(api, %Ash.Query{} = query, action, input, opts) do query = if query.action do @@ -21,25 +26,77 @@ defmodule Ash.Actions.Destroy.Bulk do query end - if !query.action.pagination || !query.action.pagination.keyset? do - raise Ash.Error.Invalid.NonStreamableAction, - resource: query.resource, - action: query.action, - for_bulk_destroy: action.name + query = %{query | api: api} + + fully_atomic_changeset = + if Ash.DataLayer.data_layer_can?(query.resource, :destroy_query) do + Ash.Changeset.fully_atomic_changeset(query.resource, action, input, opts) + else + :not_atomic + end + + case fully_atomic_changeset do + :not_atomic -> + read_opts = + opts + |> Keyword.drop([ + :resource, + :stream_batch_size, + :batch_size, + :stream_with, + :allow_stream_with + ]) + |> Keyword.put(:batch_size, opts[:stream_batch_size]) + + run( + api, + api.stream!(query, read_opts), + action, + input, + Keyword.put(opts, :resource, query.resource) + ) + + %Ash.Changeset{valid?: false, errors: errors} -> + %Ash.BulkResult{ + status: :error, + errors: [Ash.Error.to_error_class(errors)] + } + + atomic_changeset -> + with {:ok, query} <- authorize_bulk_query(query, opts), + {:ok, atomic_changeset, query} <- + authorize_atomic_changeset(query, atomic_changeset, opts), + {:ok, data_layer_query} <- Ash.Query.data_layer_query(query) do + case Ash.DataLayer.destroy_query( + data_layer_query, + atomic_changeset, + Map.new(Keyword.take(opts, [:return_records?, :tenant])) + ) do + :ok -> + %Ash.BulkResult{ + status: :success + } + + {:ok, results} -> + %Ash.BulkResult{ + status: :success, + records: results + } + + {:error, error} -> + %Ash.BulkResult{ + status: :error, + errors: [Ash.Error.to_error_class(error)] + } + end + else + {:error, error} -> + %Ash.BulkResult{ + status: :error, + errors: [Ash.Error.to_error_class(error)] + } + end end - - read_opts = - opts - |> Keyword.drop([:resource, :stream_batch_size, :batch_size]) - |> Keyword.put(:batch_size, opts[:stream_batch_size]) - - run( - api, - api.stream!(query, read_opts), - action, - input, - Keyword.put(opts, :resource, query.resource) - ) end def run(api, stream, action, input, opts) do @@ -285,6 +342,58 @@ defmodule Ash.Actions.Destroy.Bulk do |> Ash.Changeset.set_arguments(arguments) end + defp authorize_bulk_query(query, opts) do + if opts[:authorize?] do + case query.api.can(query, opts[:actor], + return_forbidden_error?: true, + maybe_is: false, + modify_source?: true + ) do + {:ok, true} -> + {:ok, query} + + {:ok, true, query} -> + {:ok, query} + + {:ok, false, error} -> + {:error, error} + + {:error, error} -> + {:error, error} + end + else + {:ok, query} + end + end + + defp authorize_atomic_changeset(query, changeset, opts) do + if opts[:authorize?] do + case query.api.can(query, opts[:actor], + return_forbidden_error?: true, + maybe_is: false, + modify_source?: true, + base_query: query + ) do + {:ok, true} -> + {:ok, changeset, query} + + {:ok, true, %Ash.Query{} = query} -> + {:ok, changeset, query} + + {:ok, true, %Ash.Changeset{} = changeset, %Ash.Query{} = query} -> + {:ok, changeset, query} + + {:ok, false, error} -> + {:error, error} + + {:error, error} -> + {:error, error} + end + else + {:ok, changeset, query} + end + end + defp pre_template_all_changes(action, resource, :destroy, base, actor) do action.changes |> Enum.concat(Ash.Resource.Info.validations(resource, action.type)) diff --git a/lib/ash/actions/update/bulk.ex b/lib/ash/actions/update/bulk.ex index 82ea6935..6f58610d 100644 --- a/lib/ash/actions/update/bulk.ex +++ b/lib/ash/actions/update/bulk.ex @@ -5,6 +5,10 @@ defmodule Ash.Actions.Update.Bulk do | {:ok, [Ash.Resource.record()]} | {:ok, [Ash.Resource.record()], [Ash.Notifier.Notification.t()]} | {:error, term} + def run(api, resource, action, input, opts) when is_atom(resource) do + run(api, Ash.Query.new(resource), action, input, opts) + end + def run(api, %Ash.Query{} = query, action, input, opts) do query = if query.action do @@ -21,25 +25,78 @@ defmodule Ash.Actions.Update.Bulk do query end - if !query.action.pagination || !query.action.pagination.keyset? do - raise Ash.Error.Invalid.NonStreamableAction, - resource: query.resource, - action: query.action, - for_bulk_update: action.name + query = %{query | api: api} + + fully_atomic_changeset = + if Ash.DataLayer.data_layer_can?(query.resource, :update_query) do + Ash.Changeset.fully_atomic_changeset(query.resource, action, input, opts) + else + :not_atomic + end + + case fully_atomic_changeset do + :not_atomic -> + read_opts = + opts + |> Keyword.drop([ + :resource, + :atomic_update, + :stream_batch_size, + :batch_size, + :stream_with, + :allow_stream_with + ]) + |> Keyword.put(:batch_size, opts[:stream_batch_size]) + + run( + api, + api.stream!(query, read_opts), + action, + input, + Keyword.put(opts, :resource, query.resource) + ) + + %Ash.Changeset{valid?: false, errors: errors} -> + %Ash.BulkResult{ + status: :error, + errors: [Ash.Error.to_error_class(errors)] + } + + atomic_changeset -> + with {:ok, query} <- authorize_bulk_query(query, opts), + {:ok, atomic_changeset, query} <- + authorize_atomic_changeset(query, atomic_changeset, opts), + {:ok, data_layer_query} <- Ash.Query.data_layer_query(query) do + case Ash.DataLayer.update_query( + data_layer_query, + atomic_changeset, + Map.new(Keyword.take(opts, [:return_records?, :tenant])) + ) do + :ok -> + %Ash.BulkResult{ + status: :success + } + + {:ok, results} -> + %Ash.BulkResult{ + status: :success, + records: results + } + + {:error, error} -> + %Ash.BulkResult{ + status: :error, + errors: [Ash.Error.to_error_class(error)] + } + end + else + {:error, error} -> + %Ash.BulkResult{ + status: :error, + errors: [Ash.Error.to_error_class(error)] + } + end end - - read_opts = - opts - |> Keyword.drop([:resource, :atomic_update, :stream_batch_size, :batch_size]) - |> Keyword.put(:batch_size, opts[:stream_batch_size]) - - run( - api, - api.stream!(query, read_opts), - action, - input, - Keyword.put(opts, :resource, query.resource) - ) end def run(api, stream, action, input, opts) do @@ -270,6 +327,58 @@ defmodule Ash.Actions.Update.Bulk do end end + defp authorize_bulk_query(query, opts) do + if opts[:authorize?] do + case query.api.can(query, opts[:actor], + return_forbidden_error?: true, + maybe_is: false, + modify_source?: true + ) do + {:ok, true} -> + {:ok, query} + + {:ok, true, query} -> + {:ok, query} + + {:ok, false, error} -> + {:error, error} + + {:error, error} -> + {:error, error} + end + else + {:ok, query} + end + end + + defp authorize_atomic_changeset(query, changeset, opts) do + if opts[:authorize?] do + case query.api.can(query, opts[:actor], + return_forbidden_error?: true, + maybe_is: false, + modify_source?: true, + base_query: query + ) do + {:ok, true} -> + {:ok, changeset, query} + + {:ok, true, %Ash.Query{} = query} -> + {:ok, changeset, query} + + {:ok, true, %Ash.Changeset{} = changeset, %Ash.Query{} = query} -> + {:ok, changeset, query} + + {:ok, false, error} -> + {:error, error} + + {:error, error} -> + {:error, error} + end + else + {:ok, changeset, query} + end + end + defp base_changeset(resource, api, opts, action, input) do arguments = Enum.reduce(input, %{}, fn {key, value}, acc -> diff --git a/lib/ash/api/api.ex b/lib/ash/api/api.ex index b355c3a1..1e665f63 100644 --- a/lib/ash/api/api.ex +++ b/lib/ash/api/api.ex @@ -955,7 +955,7 @@ defmodule Ash.Api do authorizers -> authorizers |> Enum.reduce_while( - {false, nil}, + {false, opts[:base_query]}, fn {authorizer, authorizer_state, context}, {_authorized?, query} -> case authorizer.strict_check(authorizer_state, context) do {:error, %{class: :forbidden} = e} when is_exception(e) -> @@ -1441,6 +1441,9 @@ defmodule Ash.Api do - `alter_source?` - If true, the query or changeset will be returned with authorization modifications made. For a query, this mans adding field visibility calculations and altering the filter or the sort. For a changeset, this means only adding field visibility calculations. The default value is `false`. + - `base_query` - If authorizing an update, some cases can return both a new changeset and a query filtered for only things + that will be authorized to update. Providing the `base_query` will cause that query to be altered instead of a new one to be + generated. """ @callback can( @@ -1653,6 +1656,8 @@ defmodule Ash.Api do or `after_action` hooks that can operate on the entire list at once. See the documentation for that callback for more on how to do accomplish that. + ## Options + #{Spark.OptionsHelpers.docs(@bulk_create_opts_schema)} """ @callback bulk_create( @@ -1682,8 +1687,14 @@ defmodule Ash.Api do @doc """ Updates all items in the provided enumerable or query with the provided input. - Currently, this streams each record and updates it. Soon, this will use special data layer - callbacks to run these update statements in a single query. + If the data layer supports updating from a query, and the update action can be done fully atomically, + it will be updated in a single pass using the data layer. + + Otherwise, this will stream each record and update it. + + ## Options + + #{Spark.OptionsHelpers.docs(@bulk_update_opts_schema)} """ @callback bulk_update( Enumerable.t(Ash.Resource.record()) | Ash.Query.t(), @@ -1707,8 +1718,14 @@ defmodule Ash.Api do @doc """ Destroys all items in the provided enumerable or query with the provided input. - Currently, this streams each record and destroys it. Soon, this will use special data layer - callbacks to run these update statements in a single query. + If the data layer supports destroying from a query, and the destroy action can be done fully atomically, + it will be updated in a single pass using the data layer. + + Otherwise, this will stream each record and update it. + + ## Options + + #{Spark.OptionsHelpers.docs(@bulk_destroy_opts_schema)} """ @callback bulk_destroy( Enumerable.t(Ash.Resource.record()) | Ash.Query.t(), diff --git a/lib/ash/changeset/changeset.ex b/lib/ash/changeset/changeset.ex index c7f66e0c..ae552e8f 100644 --- a/lib/ash/changeset/changeset.ex +++ b/lib/ash/changeset/changeset.ex @@ -507,8 +507,14 @@ defmodule Ash.Changeset do opts ) - with %Ash.Changeset{} = changeset <- atomic_params(changeset, action, params) do - atomic_changes(changeset, action) + with %Ash.Changeset{} = changeset <- + atomic_update(changeset, opts[:atomic_update] || []), + %Ash.Changeset{} = changeset <- atomic_params(changeset, action, params), + %Ash.Changeset{} = changeset <- atomic_changes(changeset, action) do + hydrate_atomic_refs(changeset, opts[:actor]) + else + _ -> + :not_atomic end end @@ -625,7 +631,10 @@ defmodule Ash.Changeset do """ def atomic_ref(changeset, field) do if base_value = changeset.atomics[field] do - base_value + %{type: type, constraints: constraints} = + Ash.Resource.Info.attribute(changeset.resource, field) + + Ash.Expr.expr(type(^base_value, ^type, ^constraints)) else Ash.Expr.expr(ref(^field)) end @@ -1565,6 +1574,28 @@ defmodule Ash.Changeset do {key, expr} end) } + |> add_known_atomic_errors() + end + + defp add_known_atomic_errors(changeset) do + Enum.reduce(changeset.atomics, changeset, fn + {_, + %Ash.Query.Function.Error{ + arguments: [exception, input] + }}, + changeset -> + if Ash.Filter.TemplateHelpers.expr?(input) do + changeset + else + add_error( + changeset, + Ash.Error.from_json(exception, Jason.decode!(Jason.encode!(input))) + ) + end + + _other, changeset -> + changeset + end) end @doc false diff --git a/lib/ash/data_layer/data_layer.ex b/lib/ash/data_layer/data_layer.ex index df04ed8d..9255286d 100644 --- a/lib/ash/data_layer/data_layer.ex +++ b/lib/ash/data_layer/data_layer.ex @@ -87,6 +87,9 @@ defmodule Ash.DataLayer do | :aggregate_sort | :boolean_filter | :async_engine + | :bulk_create + | :update_query + | :destroy_query | :create | :read | :update @@ -166,7 +169,7 @@ defmodule Ash.DataLayer do @callback return_query(data_layer_query(), Ash.Resource.t()) :: {:ok, data_layer_query()} | {:error, term} - @type bulk_options :: %{ + @type bulk_create_options :: %{ batch_size: pos_integer, return_records?: boolean, upsert?: boolean, @@ -180,20 +183,49 @@ defmodule Ash.DataLayer do tenant: String.t() | nil } + @type bulk_update_options :: %{ + return_records?: boolean, + tenant: String.t() | nil + } + @callback bulk_create( Ash.Resource.t(), Enumerable.t(Ash.Changeset.t()), - options :: bulk_options + options :: bulk_create_options ) :: - {:ok, Enumerable.t(:ok | {:ok, Ash.Resource.record()} | {:error, Ash.Error.t()})} + :ok + | {:ok, Enumerable.t(Ash.Resource.record())} | {:error, Ash.Error.t()} - | {:error, :no_rollback, term} + | {:error, :no_rollback, Ash.Error.t()} @callback create(Ash.Resource.t(), Ash.Changeset.t()) :: {:ok, Ash.Resource.record()} | {:error, term} | {:error, :no_rollback, term} @callback upsert(Ash.Resource.t(), Ash.Changeset.t(), list(atom)) :: {:ok, Ash.Resource.record()} | {:error, term} | {:error, :no_rollback, term} @callback update(Ash.Resource.t(), Ash.Changeset.t()) :: {:ok, Ash.Resource.record()} | {:error, term} | {:error, :no_rollback, term} + + @callback update_query( + data_layer_query(), + Ash.Changeset.t(), + Ash.Resource.t(), + opts :: bulk_update_options() + ) :: + :ok + | {:ok, Enumerable.t(Ash.Resource.record())} + | {:error, Ash.Error.t()} + | {:error, :no_rollback, Ash.Error.t()} + + @callback destroy_query( + data_layer_query(), + Ash.Changeset.t(), + Ash.Resource.t(), + opts :: bulk_update_options() + ) :: + :ok + | {:ok, Enumerable.t(Ash.Resource.record())} + | {:error, Ash.Error.t()} + | {:error, :no_rollback, Ash.Error.t()} + @callback add_aggregate( data_layer_query(), Ash.Query.Aggregate.t(), @@ -240,6 +272,8 @@ defmodule Ash.DataLayer do @optional_callbacks source: 1, run_query: 2, bulk_create: 3, + update_query: 4, + destroy_query: 4, distinct: 3, return_query: 2, lock: 3, @@ -383,6 +417,34 @@ defmodule Ash.DataLayer do Ash.DataLayer.data_layer(resource).update(resource, changeset) end + @spec update_query(data_layer_query(), Ash.Changeset.t(), opts :: bulk_update_options()) :: + :ok + | {:ok, Enumerable.t(Ash.Resource.record())} + | {:error, Ash.Error.t()} + | {:error, :no_rollback, Ash.Error.t()} + def update_query(query, changeset, opts) do + Ash.DataLayer.data_layer(changeset.resource).update_query( + query, + changeset, + changeset.resource, + opts + ) + end + + @spec destroy_query(data_layer_query(), Ash.Changeset.t(), opts :: bulk_update_options()) :: + :ok + | {:ok, Enumerable.t(Ash.Resource.record())} + | {:error, Ash.Error.t()} + | {:error, :no_rollback, Ash.Error.t()} + def destroy_query(query, changeset, opts) do + Ash.DataLayer.data_layer(changeset.resource).destroy_query( + query, + changeset, + changeset.resource, + opts + ) + end + @spec create(Ash.Resource.t(), Ash.Changeset.t()) :: {:ok, Ash.Resource.record()} | {:error, term} | {:error, :no_rollback, term} def create(resource, changeset) do @@ -404,7 +466,7 @@ defmodule Ash.DataLayer do @spec bulk_create( Ash.Resource.t(), Enumerable.t(Ash.Changeset.t()), - options :: bulk_options + options :: bulk_create_options ) :: :ok | {:ok, Enumerable.t(Ash.Resource.record())} diff --git a/lib/ash/error/exception.ex b/lib/ash/error/exception.ex index 65f3b605..1aca65b2 100644 --- a/lib/ash/error/exception.ex +++ b/lib/ash/error/exception.ex @@ -51,7 +51,7 @@ defmodule Ash.Error.Exception do %{ __struct__: Ash.Error.Stacktrace, - stacktrace: Enum.drop(stacktrace, 2) + stacktrace: Enum.drop(stacktrace, 4) } end) diff --git a/lib/ash/query/function/error.ex b/lib/ash/query/function/error.ex index 9867504f..00dea7e1 100644 --- a/lib/ash/query/function/error.ex +++ b/lib/ash/query/function/error.ex @@ -10,6 +10,6 @@ defmodule Ash.Query.Function.Error do def args, do: [[:atom, :any]] def evaluate(%{arguments: [exception, input]}) do - {:error, exception.exception(input)} + {:error, Ash.Error.from_json(exception, Jason.decode!(Jason.encode!(Map.new(input))))} end end diff --git a/lib/ash/resource/validation/attribute_does_not_equal.ex b/lib/ash/resource/validation/attribute_does_not_equal.ex index 6c6e3d05..952d0628 100644 --- a/lib/ash/resource/validation/attribute_does_not_equal.ex +++ b/lib/ash/resource/validation/attribute_does_not_equal.ex @@ -4,6 +4,7 @@ defmodule Ash.Resource.Validation.AttributeDoesNotEqual do use Ash.Resource.Validation alias Ash.Error.Changes.InvalidAttribute + require Ash.Expr @opt_schema [ attribute: [ @@ -43,6 +44,21 @@ defmodule Ash.Resource.Validation.AttributeDoesNotEqual do end end + @impl true + def atomic(changeset, opts) do + field_value = Ash.Changeset.atomic_ref(changeset, opts[:attribute]) + + {:atomic, [opts[:attribute]], Ash.Expr.expr(^field_value == ^opts[:value]), + Ash.Expr.expr( + error(^InvalidAttribute, %{ + field: ^opts[:attribute], + value: ^field_value, + message: "must not equal %{value}", + vars: %{field: ^opts[:attribute], value: ^opts[:value]} + }) + )} + end + @impl true def describe(opts) do [ diff --git a/mix.exs b/mix.exs index 612b5cae..9c2dc090 100644 --- a/mix.exs +++ b/mix.exs @@ -44,8 +44,6 @@ defmodule Ash.MixProject do |> Path.basename(".md") |> Path.basename(".livemd") |> Path.basename(".cheatmd") - # We want to keep permalinks, so we remove the sorting number - |> String.replace(~r/^\d+\-/, "") title = html_filename