diff --git a/.formatter.exs b/.formatter.exs index 969c3ef8..e84acf32 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -189,6 +189,8 @@ spark_locals_without_parens = [ run_flow: 3, select: 1, sensitive?: 1, + set: 2, + set: 3, short_name: 1, simple_notifiers: 1, skip_global_validations?: 1, diff --git a/lib/ash/actions/update.ex b/lib/ash/actions/update.ex index a6dfb28f..850664de 100644 --- a/lib/ash/actions/update.ex +++ b/lib/ash/actions/update.ex @@ -13,38 +13,47 @@ defmodule Ash.Actions.Update do | {:error, Ash.Changeset.t()} | {:error, term} def run(api, changeset, action, opts) do - {changeset, opts} = Ash.Actions.Helpers.add_process_context(api, changeset, opts) + if changeset.atomics != [] && + !Ash.DataLayer.data_layer_can?(changeset.resource, {:atomic, :update}) do + {:error, + Ash.Error.Invalid.AtomicsNotSupported.exception( + resource: changeset.resource, + action_type: :update + )} + else + {changeset, opts} = Ash.Actions.Helpers.add_process_context(api, changeset, opts) - Ash.Tracer.span :action, - Ash.Api.Info.span_name( - api, - changeset.resource, - action.name - ), - opts[:tracer] do - metadata = %{ - api: api, - resource: changeset.resource, - resource_short_name: Ash.Resource.Info.short_name(changeset.resource), - actor: opts[:actor], - tenant: opts[:tenant], - action: action.name, - authorize?: opts[:authorize?] - } + Ash.Tracer.span :action, + Ash.Api.Info.span_name( + api, + changeset.resource, + action.name + ), + opts[:tracer] do + metadata = %{ + api: api, + resource: changeset.resource, + resource_short_name: Ash.Resource.Info.short_name(changeset.resource), + actor: opts[:actor], + tenant: opts[:tenant], + action: action.name, + authorize?: opts[:authorize?] + } - Ash.Tracer.set_metadata(opts[:tracer], :action, metadata) + Ash.Tracer.set_metadata(opts[:tracer], :action, metadata) - Ash.Tracer.telemetry_span [:ash, Ash.Api.Info.short_name(api), :update], metadata do - case do_run(api, changeset, action, opts) do - {:error, error} -> - if opts[:tracer] do - opts[:tracer].set_error(Ash.Error.to_error_class(error)) - end + Ash.Tracer.telemetry_span [:ash, Ash.Api.Info.short_name(api), :update], metadata do + case do_run(api, changeset, action, opts) do + {:error, error} -> + if opts[:tracer] do + opts[:tracer].set_error(Ash.Error.to_error_class(error)) + end - {:error, error} + {:error, error} - other -> - other + other -> + other + end end end end @@ -352,6 +361,8 @@ defmodule Ash.Actions.Update do ) |> Ash.Changeset.with_hooks( fn changeset -> + changeset = Ash.Changeset.hydrate_atomic_refs(changeset, actor) + case Ash.Actions.ManagedRelationships.setup_managed_belongs_to_relationships( changeset, actor, @@ -409,7 +420,8 @@ defmodule Ash.Actions.Update do authorize?: authorize? ) - Ash.Changeset.changing_attributes?(changeset) -> + Ash.Changeset.changing_attributes?(changeset) || + !Enum.empty?(changeset.atomics) -> changeset = changeset |> Ash.Changeset.set_defaults(:update, true) diff --git a/lib/ash/api/api.ex b/lib/ash/api/api.ex index be9fe57a..1e5c4813 100644 --- a/lib/ash/api/api.ex +++ b/lib/ash/api/api.ex @@ -975,7 +975,7 @@ defmodule Ash.Api do end with {:ok, expr} <- expr do - case Ash.Expr.eval(expr, record: record) do + case Ash.Expr.eval(expr, record: record, resource: resource) do {:ok, result} -> {:ok, result} diff --git a/lib/ash/changeset/changeset.ex b/lib/ash/changeset/changeset.ex index 503deb8b..297a904e 100644 --- a/lib/ash/changeset/changeset.ex +++ b/lib/ash/changeset/changeset.ex @@ -23,6 +23,7 @@ defmodule Ash.Changeset do invalid_keys: MapSet.new(), filters: %{}, action_failed?: false, + atomics: [], after_action: [], after_transaction: [], arguments: %{}, @@ -743,6 +744,26 @@ defmodule Ash.Changeset do end) end + @doc """ + Adds atomic changes to the changeset + + i.e `Ash.Changeset.atomic(changeset, score: [Ash.Expr.expr(score + 1)])` + """ + def atomic(changeset, atomics) when is_list(atomics) do + Enum.reduce(atomics, changeset, fn {key, value}, changeset -> + atomic(changeset, key, value) + end) + end + + @doc """ + Adds an atomic change to the changeset + + i.e `Ash.Changeset.atomic(changeset, :score, [Ash.Expr.expr(score + 1)])` + """ + def atomic(changeset, key, value) do + %{changeset | atomics: Keyword.put(changeset.atomics, key, value)} + end + @doc """ Set the result of the action. This will prevent running the underlying datalayer behavior """ @@ -1228,6 +1249,28 @@ defmodule Ash.Changeset do end) end + @doc false + def hydrate_atomic_refs(changeset, actor) do + %{ + changeset + | atomics: + Enum.map(changeset.atomics, fn {key, expr} -> + expr = + Ash.Filter.build_filter_from_template( + expr, + actor, + changeset.arguments, + changeset.context + ) + + {:ok, expr} = + Ash.Filter.hydrate_refs(expr, %{resource: changeset.resource, public?: false}) + + {key, expr} + end) + } + end + @doc false def set_defaults(changeset, action_type, lazy? \\ false) diff --git a/lib/ash/data_layer/data_layer.ex b/lib/ash/data_layer/data_layer.ex index bbaf78eb..e2118bae 100644 --- a/lib/ash/data_layer/data_layer.ex +++ b/lib/ash/data_layer/data_layer.ex @@ -47,6 +47,7 @@ defmodule Ash.DataLayer do @type feature() :: :transact | :multitenancy + | {:atomic, :update} | {:lateral_join, list(Ash.Resource.t())} | {:join, Ash.Resource.t()} | {:aggregate, Ash.Query.Aggregate.kind()} diff --git a/lib/ash/data_layer/ets/ets.ex b/lib/ash/data_layer/ets/ets.ex index 5d1158a4..3cf148e2 100644 --- a/lib/ash/data_layer/ets/ets.ex +++ b/lib/ash/data_layer/ets/ets.ex @@ -239,6 +239,7 @@ defmodule Ash.DataLayer.Ets do def can?(_, {:query_aggregate, :avg}), do: true def can?(_, {:query_aggregate, :exists}), do: true def can?(_, {:sort, _}), do: true + def can?(_, {:atomic, :update}), do: true def can?(_, _), do: false @doc false @@ -1033,11 +1034,11 @@ defmodule Ash.DataLayer.Ets do @doc false def dump_to_native(record, attributes) do Enum.reduce_while(attributes, {:ok, %{}}, fn attribute, {:ok, attrs} -> - case Map.get(record, attribute.name) do - nil -> - {:cont, {:ok, Map.put(attrs, attribute.name, nil)}} + case Map.fetch(record, attribute.name) do + :error -> + {:cont, {:ok, attrs}} - value -> + {:ok, value} -> case Ash.Type.dump_to_native( attribute.type, value, @@ -1080,10 +1081,21 @@ defmodule Ash.DataLayer.Ets do def update(resource, changeset, pkey \\ nil) do pkey = pkey || pkey_map(resource, changeset.data) - with {:ok, table} <- wrap_or_create_table(resource, changeset.tenant), - {:ok, record} <- Ash.Changeset.apply_attributes(changeset), + atomic_changes = + Enum.reduce_while(changeset.atomics, {:ok, %{}}, fn {key, value}, {:ok, acc} -> + case Ash.Expr.eval(value, resource: resource, record: changeset.data) do + {:ok, value} -> + {:cont, {:ok, Map.put(acc, key, value)}} + + {:error, error} -> + {:halt, {:error, error}} + end + end) + + with {:ok, atomics} <- atomic_changes, + {:ok, table} <- wrap_or_create_table(resource, changeset.tenant), {:ok, record} <- - do_update(table, {pkey, record}, resource), + do_update(table, {pkey, Map.merge(changeset.attributes, atomics)}, resource), {:ok, record} <- cast_record(record, resource) do new_pkey = pkey_map(resource, record) @@ -1120,7 +1132,10 @@ defmodule Ash.DataLayer.Ets do {:ok, casted} -> case ETS.Set.get(table, pkey) do {:ok, {_key, record}} when is_map(record) -> - case ETS.Set.put(table, {pkey, Map.merge(record, casted)}) do + case ETS.Set.put( + table, + {pkey, Map.merge(record, casted)} + ) do {:ok, set} -> {_key, record} = ETS.Set.get!(set, pkey) {:ok, record} diff --git a/lib/ash/error/changes/stale_record.ex b/lib/ash/error/changes/stale_record.ex index 984a98b1..9b3690f7 100644 --- a/lib/ash/error/changes/stale_record.ex +++ b/lib/ash/error/changes/stale_record.ex @@ -15,7 +15,7 @@ defmodule Ash.Error.Changes.StaleRecord do "#{key}: #{inspect(value)}" end) - "record of #{inspect(error.resource)} with filter `#{filter}`" + "Attempted to update stale record of #{inspect(error.resource)} with filter `#{filter}`" end end end diff --git a/lib/ash/error/invalid/atomics_not_supported.ex b/lib/ash/error/invalid/atomics_not_supported.ex new file mode 100644 index 00000000..12ee1800 --- /dev/null +++ b/lib/ash/error/invalid/atomics_not_supported.ex @@ -0,0 +1,16 @@ +defmodule Ash.Error.Invalid.AtomicsNotSupported do + @moduledoc "Used when atomics for the given action type are not not supported by the data layer, but one is used." + use Ash.Error.Exception + + def_ash_error([:resource, :action_type], class: :invalid) + + defimpl Ash.ErrorKind do + def id(_), do: Ash.UUID.generate() + + def code(_), do: "atomics_not_supported" + + def message(%{resource: resource, action_type: action_type}) do + "The data layer for #{inspect(resource)} does not support atomics on #{action_type} actions" + end + end +end diff --git a/lib/ash/filter/filter.ex b/lib/ash/filter/filter.ex index 8e9cdd3d..51fe8eca 100644 --- a/lib/ash/filter/filter.ex +++ b/lib/ash/filter/filter.ex @@ -226,8 +226,8 @@ defmodule Ash.Filter do def parse_input( resource, statement, - aggregates \\ %{}, - calculations \\ %{}, + _aggregates \\ %{}, + _calculations \\ %{}, context \\ %{} ) do context = @@ -236,8 +236,6 @@ defmodule Ash.Filter do resource: resource, root_resource: resource, relationship_path: [], - aggregates: aggregates, - calculations: calculations, public?: true, input?: true, data_layer: Ash.DataLayer.data_layer(resource) @@ -273,8 +271,8 @@ defmodule Ash.Filter do See `parse/2` for more """ - def parse!(resource, statement, aggregates \\ %{}, calculations \\ %{}, context \\ %{}) do - case parse(resource, statement, aggregates, calculations, context) do + def parse!(resource, statement, _aggregates \\ %{}, _calculations \\ %{}, context \\ %{}) do + case parse(resource, statement, %{}, %{}, context) do {:ok, filter} -> filter @@ -296,22 +294,6 @@ defmodule Ash.Filter do If you are trying to validate a filter supplied from an external/untrusted source, be sure to use `parse_input/2` instead! The only difference is that it only accepts filters over public attributes/relationships. - - ### Aggregates and calculations - - Since custom aggregates/calculations can be added to a query, and they must be explicitly loaded into - a query, the filter parser does not parse them by default. If you wish to support parsing filters - over aggregates/calculations, provide them as the third argument. The best way to do this is to build a query - with them added/loaded, and then use the `aggregates` and `calculations` keys on the query. - - ### NOTE - - A change was made recently that will automatically load any aggregates/calculations that are used in a filter, but - if you are using this function you still need to pass them in. - - ```elixir - Ash.Filter.parse(MyResource, [id: 1], query.aggregates, query.calculations) - ``` """ def parse(resource, statement, aggregates \\ %{}, calculations \\ %{}, context \\ %{}) @@ -319,14 +301,12 @@ defmodule Ash.Filter do {:ok, nil} end - def parse(resource, statement, aggregates, calculations, original_context) do + def parse(resource, statement, _aggregates, _calculations, original_context) do context = Map.merge( %{ resource: resource, relationship_path: [], - aggregates: aggregates, - calculations: calculations, public?: false, input?: false, root_resource: resource, @@ -912,8 +892,8 @@ defmodule Ash.Filter do filter, resource, relationship_path \\ [], - calculations \\ %{}, - aggregates \\ %{} + _calculations \\ %{}, + _aggregates \\ %{} ) do filter |> list_refs() @@ -928,18 +908,14 @@ defmodule Ash.Filter do |> Enum.map(& &1.attribute) |> calculations_used_by_calculations( resource, - relationship_path, - calculations, - aggregates + relationship_path ) end defp calculations_used_by_calculations( used_calculations, resource, - relationship_path, - calculations, - aggregates + relationship_path ) do used_calculations |> Enum.flat_map(fn calculation -> @@ -947,8 +923,6 @@ defmodule Ash.Filter do case hydrate_refs(expression, %{ resource: resource, - aggregates: aggregates, - calculations: calculations, relationship_path: [], public?: false }) do @@ -958,14 +932,10 @@ defmodule Ash.Filter do used_calculations( expression, resource, - relationship_path, - calculations, - aggregates + relationship_path ), resource, - relationship_path, - calculations, - aggregates + relationship_path ) [calculation | with_recursive_used] @@ -1047,7 +1017,7 @@ defmodule Ash.Filter do end def add_to_filter(%__MODULE__{} = base, statement, op, aggregates, calculations, context) do - case parse(base.resource, statement, aggregates, calculations, context) do + case parse(base.resource, statement, %{}, %{}, context) do {:ok, filter} -> add_to_filter(base, filter, op, aggregates, calculations) {:error, error} -> {:error, error} end @@ -2253,8 +2223,6 @@ defmodule Ash.Filter do relationship_path: ref.relationship_path, resource: related, root_resource: context.root_resource, - aggregates: context.aggregates, - calculations: context.calculations, public?: context.public? } @@ -2929,9 +2897,26 @@ defmodule Ash.Filter do end end + def do_hydrate_refs( + %Ref{relationship_path: relationship_path, resource: nil} = ref, + %{resource: resource} = context + ) + when not is_nil(resource) do + case Ash.Resource.Info.related(resource, relationship_path || []) do + nil -> + {:error, "Invalid reference #{inspect(ref)}"} + + related -> + do_hydrate_refs( + %{ref | resource: related}, + context + ) + end + end + def do_hydrate_refs( %Ref{attribute: attribute} = ref, - %{aggregates: aggregates, calculations: calculations} = context + context ) when is_atom(attribute) or is_binary(attribute) do ref = %{ref | input?: ref.input? || context[:input?] || false} @@ -2945,12 +2930,6 @@ defmodule Ash.Filter do context = %{context | resource: related} cond do - Map.has_key?(aggregates, attribute) -> - {:ok, %{ref | attribute: Map.get(aggregates, attribute), resource: related}} - - Map.has_key?(calculations, attribute) -> - {:ok, %{ref | attribute: Map.get(calculations, attribute), resource: related}} - attribute = attribute(context, attribute) -> {:ok, %{ref | attribute: attribute, resource: related}} @@ -3116,8 +3095,6 @@ defmodule Ash.Filter do root_resource: new_resource, parent_stack: [context[:root_resource] | context[:parent_stack] || []], relationship_path: [], - aggregates: %{}, - calculations: %{}, public?: context[:public?], input?: context[:input?], data_layer: Ash.DataLayer.data_layer(new_resource) diff --git a/lib/ash/filter/runtime.ex b/lib/ash/filter/runtime.ex index 58efa8df..268ebf38 100644 --- a/lib/ash/filter/runtime.ex +++ b/lib/ash/filter/runtime.ex @@ -687,15 +687,19 @@ defmodule Ash.Filter.Runtime do # once per expanded result. I'm not sure what that will # look like though. - case module.calculate([record], opts, context) do - [result] -> - {:ok, result} + if record do + case module.calculate([record], opts, context) do + [result] -> + {:ok, result} - {:ok, [result]} -> - {:ok, result} + {:ok, [result]} -> + {:ok, result} - _ -> - {:ok, nil} + _ -> + {:ok, nil} + end + else + :unknown end end end diff --git a/lib/ash/resource/actions/update.ex b/lib/ash/resource/actions/update.ex index 5358b0ec..e6ceea33 100644 --- a/lib/ash/resource/actions/update.ex +++ b/lib/ash/resource/actions/update.ex @@ -9,6 +9,7 @@ defmodule Ash.Resource.Actions.Update do accept: nil, manual: nil, manual?: false, + atomics: [], require_attributes: [], delay_global_validations?: false, skip_global_validations?: false, diff --git a/lib/ash/resource/change/atomic.ex b/lib/ash/resource/change/atomic.ex new file mode 100644 index 00000000..e92aa0c0 --- /dev/null +++ b/lib/ash/resource/change/atomic.ex @@ -0,0 +1,8 @@ +defmodule Ash.Resource.Change.Atomic do + @moduledoc false + use Ash.Resource.Change + + def change(changeset, opts, _) do + Ash.Changeset.atomic(changeset, opts[:attribute], opts[:expr]) + end +end diff --git a/lib/ash/resource/change/change.ex b/lib/ash/resource/change/change.ex index 32337b24..59a75d1d 100644 --- a/lib/ash/resource/change/change.ex +++ b/lib/ash/resource/change/change.ex @@ -63,6 +63,19 @@ defmodule Ash.Resource.Change do ] end + def atomic_schema do + schema() + |> Keyword.take([:description, :where]) + |> Keyword.put(:attribute, type: :atom, required: true, doc: "The attribute to update") + |> Keyword.put(:expr, + type: :any, + required: true, + doc: """ + The expression to use to set the attribute + """ + ) + end + @doc false def action_schema do Keyword.delete(schema(), :on) @@ -104,12 +117,22 @@ defmodule Ash.Resource.Change do @callback init(Keyword.t()) :: {:ok, Keyword.t()} | {:error, term} @callback change(Ash.Changeset.t(), Keyword.t(), context) :: Ash.Changeset.t() + + @doc """ + Replaces `change/3` for batch actions, allowing to optimize changes for bulk actions. + """ @callback batch_change([Ash.Changeset.t()], Keyword.t(), context) :: Enumerable.t(Ash.Changeset.t() | Ash.Notifier.Notification.t()) + @doc """ + Runs on each batch before it is dispatched to the data layer. + """ @callback before_batch([Ash.Changeset.t()], Keyword.t(), context) :: Enumerable.t(Ash.Changeset.t() | Ash.Notifier.Notification.t()) + @doc """ + Runs on each batch result after it is dispatched to the data layer. + """ @callback after_batch( [{Ash.Changeset.t(), Ash.Resource.record()}], Keyword.t(), @@ -126,10 +149,12 @@ defmodule Ash.Resource.Change do defmacro __using__(_) do quote do @behaviour Ash.Resource.Change + require Ash.Expr def init(opts), do: {:ok, opts} + def atomic(_opts, _context), do: :not_atomic - defoverridable init: 1 + defoverridable init: 1, atomic: 2 end end end diff --git a/lib/ash/resource/dsl.ex b/lib/ash/resource/dsl.ex index dd434d7e..625a782d 100644 --- a/lib/ash/resource/dsl.ex +++ b/lib/ash/resource/dsl.ex @@ -399,6 +399,37 @@ defmodule Ash.Resource.Dsl do args: [:change] } + defmodule Set do + @moduledoc false + defstruct [:description, :where, :attribute, :expr] + + def transform(set) do + {:ok, + %Ash.Resource.Change{ + change: {Ash.Resource.Change.Atomic, attribute: set.attribute, expr: set.expr} + } + |> Map.merge(Map.take(set, [:description, :where]))} + end + end + + @set %Spark.Dsl.Entity{ + name: :set, + describe: """ + Set an attribute to the result of an expression. + + This as a thin wrapper over create an `Ash.Resource.Change` that defines + `atomic/2`. + """, + examples: [ + "set :score, expr(score + 1)", + "set :title, expr(some_calc(some_arg: :foo))" + ], + target: Set, + schema: Ash.Resource.Change.atomic_schema(), + transform: {Set, :transform, []}, + args: [:attribute, :expr] + } + @validate %Spark.Dsl.Entity{ name: :validate, describe: """ @@ -583,7 +614,8 @@ defmodule Ash.Resource.Dsl do entities: [ changes: [ @action_change, - @action_validate + @action_validate, + @set ], metadata: [ @metadata @@ -626,7 +658,8 @@ defmodule Ash.Resource.Dsl do entities: [ changes: [ @action_change, - @action_validate + @action_validate, + @set ], metadata: [ @metadata diff --git a/lib/ash/resource/validation.ex b/lib/ash/resource/validation.ex index e71d54c5..7995f01a 100644 --- a/lib/ash/resource/validation.ex +++ b/lib/ash/resource/validation.ex @@ -43,6 +43,7 @@ defmodule Ash.Resource.Validation do @callback init(Keyword.t()) :: {:ok, Keyword.t()} | {:error, String.t()} @callback validate(Ash.Changeset.t(), Keyword.t()) :: :ok | {:error, term} @callback describe(Keyword.t()) :: [message: String.t(), vars: Keyword.t()] + @callback atomic?(Keyword.t()) :: boolean @optional_callbacks describe: 1 @@ -103,8 +104,7 @@ defmodule Ash.Resource.Validation do @behaviour Ash.Resource.Validation def init(opts), do: {:ok, opts} - - defoverridable init: 1 + def atomic?(_), do: false defp with_description(keyword, opts) do if Kernel.function_exported?(__MODULE__, :describe, 1) do @@ -113,6 +113,8 @@ defmodule Ash.Resource.Validation do keyword end end + + defoverridable init: 1, atomic?: 1 end end diff --git a/lib/ash/type/type.ex b/lib/ash/type/type.ex index a15fba5b..66058d71 100644 --- a/lib/ash/type/type.ex +++ b/lib/ash/type/type.ex @@ -728,19 +728,23 @@ defmodule Ash.Type do if is_atom(type) && :erlang.function_exported(type, :dump_to_native_array, 2) do type.dump_to_native_array(term, constraints) else - single_constraints = constraints[:items] || [] + if is_nil(term) do + {:ok, nil} + else + single_constraints = constraints[:items] || [] - term - |> Enum.reverse() - |> Enum.reduce_while({:ok, []}, fn item, {:ok, dumped} -> - case dump_to_native(type, item, single_constraints) do - :error -> - {:halt, :error} + term + |> Enum.reverse() + |> Enum.reduce_while({:ok, []}, fn item, {:ok, dumped} -> + case dump_to_native(type, item, single_constraints) do + :error -> + {:halt, :error} - {:ok, value} -> - {:cont, {:ok, [value | dumped]}} - end - end) + {:ok, value} -> + {:cont, {:ok, [value | dumped]}} + end + end) + end end end diff --git a/test/actions/update_test.exs b/test/actions/update_test.exs index 08a5f841..ac421306 100644 --- a/test/actions/update_test.exs +++ b/test/actions/update_test.exs @@ -5,6 +5,7 @@ defmodule Ash.Test.Actions.UpdateTest do import Ash.Changeset import Ash.Test require Ash.Query + require Ash.Expr defmodule Authorized do @moduledoc false @@ -349,6 +350,23 @@ defmodule Ash.Test.Actions.UpdateTest do end end + describe "atomics" do + test "atomics can be added to a changeset" do + author = + Author + |> new(%{name: "fred"}) + |> Api.create!() + + author = + author + |> Ash.Changeset.for_update(:only_allow_name) + |> Ash.Changeset.atomic(:name, Ash.Expr.expr(name <> " weasley")) + |> Api.update!() + + assert author.name == "fred weasley" + end + end + describe "changeset" do test "changes are run properly" do author =