improvement: implement atomics, expression-based changes (#682)

There is still a lot of potential work that needs to be done on this front:
1. supporting atomics on create actions.
2. supporting atomics in upserts (this one may actually be much easier than the first, for Postgres specifically, due to Ecto implementation details).
3. discovering places where atomics can be more nicely integrated into existing changes, validations, and policies.
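For context, a rough usage sketch of what this commit enables, based on the test and DSL examples further down in the diff (the `author` record, `:only_allow_name` action, and `Api` module are illustrative):

```elixir
require Ash.Expr

# Ad-hoc atomic change on an update changeset; the expression is applied by the
# data layer (for ETS, evaluated via Ash.Expr.eval against the stored record).
author
|> Ash.Changeset.for_update(:only_allow_name)
|> Ash.Changeset.atomic(:name, Ash.Expr.expr(name <> " weasley"))
|> Api.update!()

# Declaratively, via the new `set` builder on update/destroy actions:
#
#   update :increment_score do
#     set :score, expr(score + 1)
#   end
```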
Zach Daniel 2023-08-28 16:05:26 -04:00 committed by GitHub
parent 452ec7df9d
commit 18cb24e7f7
17 changed files with 275 additions and 114 deletions

View file

@@ -189,6 +189,8 @@ spark_locals_without_parens = [
run_flow: 3,
select: 1,
sensitive?: 1,
set: 2,
set: 3,
short_name: 1,
simple_notifiers: 1,
skip_global_validations?: 1,

View file

@@ -13,6 +13,14 @@ defmodule Ash.Actions.Update do
| {:error, Ash.Changeset.t()}
| {:error, term}
def run(api, changeset, action, opts) do
if changeset.atomics != [] &&
!Ash.DataLayer.data_layer_can?(changeset.resource, {:atomic, :update}) do
{:error,
Ash.Error.Invalid.AtomicsNotSupported.exception(
resource: changeset.resource,
action_type: :update
)}
else
{changeset, opts} = Ash.Actions.Helpers.add_process_context(api, changeset, opts)
Ash.Tracer.span :action,
@@ -49,6 +57,7 @@ defmodule Ash.Actions.Update do
end
end
end
end
@doc false
def do_run(api, changeset, action, opts) do
@@ -352,6 +361,8 @@ defmodule Ash.Actions.Update do
)
|> Ash.Changeset.with_hooks(
fn changeset ->
changeset = Ash.Changeset.hydrate_atomic_refs(changeset, actor)
case Ash.Actions.ManagedRelationships.setup_managed_belongs_to_relationships(
changeset,
actor,
@@ -409,7 +420,8 @@ defmodule Ash.Actions.Update do
authorize?: authorize?
)
Ash.Changeset.changing_attributes?(changeset) ->
Ash.Changeset.changing_attributes?(changeset) ||
!Enum.empty?(changeset.atomics) ->
changeset =
changeset
|> Ash.Changeset.set_defaults(:update, true)

View file

@@ -975,7 +975,7 @@ defmodule Ash.Api do
end
with {:ok, expr} <- expr do
case Ash.Expr.eval(expr, record: record) do
case Ash.Expr.eval(expr, record: record, resource: resource) do
{:ok, result} ->
{:ok, result}

View file

@@ -23,6 +23,7 @@ defmodule Ash.Changeset do
invalid_keys: MapSet.new(),
filters: %{},
action_failed?: false,
atomics: [],
after_action: [],
after_transaction: [],
arguments: %{},
@@ -743,6 +744,26 @@ defmodule Ash.Changeset do
end)
end
@doc """
Adds atomic changes to the changeset,
e.g. `Ash.Changeset.atomic(changeset, score: Ash.Expr.expr(score + 1))`
"""
def atomic(changeset, atomics) when is_list(atomics) do
Enum.reduce(atomics, changeset, fn {key, value}, changeset ->
atomic(changeset, key, value)
end)
end
@doc """
Adds an atomic change to the changeset,
e.g. `Ash.Changeset.atomic(changeset, :score, Ash.Expr.expr(score + 1))`
"""
def atomic(changeset, key, value) do
%{changeset | atomics: Keyword.put(changeset.atomics, key, value)}
end
@doc """
Set the result of the action. This will prevent running the underlying data layer behavior.
"""
@@ -1228,6 +1249,28 @@ defmodule Ash.Changeset do
end)
end
@doc false
def hydrate_atomic_refs(changeset, actor) do
%{
changeset
| atomics:
Enum.map(changeset.atomics, fn {key, expr} ->
expr =
Ash.Filter.build_filter_from_template(
expr,
actor,
changeset.arguments,
changeset.context
)
{:ok, expr} =
Ash.Filter.hydrate_refs(expr, %{resource: changeset.resource, public?: false})
{key, expr}
end)
}
end
@doc false
def set_defaults(changeset, action_type, lazy? \\ false)

View file

@@ -47,6 +47,7 @@ defmodule Ash.DataLayer do
@type feature() ::
:transact
| :multitenancy
| {:atomic, :update}
| {:lateral_join, list(Ash.Resource.t())}
| {:join, Ash.Resource.t()}
| {:aggregate, Ash.Query.Aggregate.kind()}

View file

@@ -239,6 +239,7 @@ defmodule Ash.DataLayer.Ets do
def can?(_, {:query_aggregate, :avg}), do: true
def can?(_, {:query_aggregate, :exists}), do: true
def can?(_, {:sort, _}), do: true
def can?(_, {:atomic, :update}), do: true
def can?(_, _), do: false
@doc false
@@ -1033,11 +1034,11 @@ defmodule Ash.DataLayer.Ets do
@doc false
def dump_to_native(record, attributes) do
Enum.reduce_while(attributes, {:ok, %{}}, fn attribute, {:ok, attrs} ->
case Map.get(record, attribute.name) do
nil ->
{:cont, {:ok, Map.put(attrs, attribute.name, nil)}}
case Map.fetch(record, attribute.name) do
:error ->
{:cont, {:ok, attrs}}
value ->
{:ok, value} ->
case Ash.Type.dump_to_native(
attribute.type,
value,
@@ -1080,10 +1081,21 @@ defmodule Ash.DataLayer.Ets do
def update(resource, changeset, pkey \\ nil) do
pkey = pkey || pkey_map(resource, changeset.data)
with {:ok, table} <- wrap_or_create_table(resource, changeset.tenant),
{:ok, record} <- Ash.Changeset.apply_attributes(changeset),
atomic_changes =
Enum.reduce_while(changeset.atomics, {:ok, %{}}, fn {key, value}, {:ok, acc} ->
case Ash.Expr.eval(value, resource: resource, record: changeset.data) do
{:ok, value} ->
{:cont, {:ok, Map.put(acc, key, value)}}
{:error, error} ->
{:halt, {:error, error}}
end
end)
with {:ok, atomics} <- atomic_changes,
{:ok, table} <- wrap_or_create_table(resource, changeset.tenant),
{:ok, record} <-
do_update(table, {pkey, record}, resource),
do_update(table, {pkey, Map.merge(changeset.attributes, atomics)}, resource),
{:ok, record} <- cast_record(record, resource) do
new_pkey = pkey_map(resource, record)
@@ -1120,7 +1132,10 @@ defmodule Ash.DataLayer.Ets do
{:ok, casted} ->
case ETS.Set.get(table, pkey) do
{:ok, {_key, record}} when is_map(record) ->
case ETS.Set.put(table, {pkey, Map.merge(record, casted)}) do
case ETS.Set.put(
table,
{pkey, Map.merge(record, casted)}
) do
{:ok, set} ->
{_key, record} = ETS.Set.get!(set, pkey)
{:ok, record}

View file

@@ -15,7 +15,7 @@ defmodule Ash.Error.Changes.StaleRecord do
"#{key}: #{inspect(value)}"
end)
"record of #{inspect(error.resource)} with filter `#{filter}`"
"Attempted to update stale record of #{inspect(error.resource)} with filter `#{filter}`"
end
end
end

View file

@@ -0,0 +1,16 @@
defmodule Ash.Error.Invalid.AtomicsNotSupported do
@moduledoc "Used when atomics for the given action type are not not supported by the data layer, but one is used."
use Ash.Error.Exception
def_ash_error([:resource, :action_type], class: :invalid)
defimpl Ash.ErrorKind do
def id(_), do: Ash.UUID.generate()
def code(_), do: "atomics_not_supported"
def message(%{resource: resource, action_type: action_type}) do
"The data layer for #{inspect(resource)} does not support atomics on #{action_type} actions"
end
end
end
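As a rough illustration of when this error comes into play (the `post` record and `MyApp.Api` module are illustrative, not from this commit), the update action performs the capability check shown earlier in `Ash.Actions.Update`:

```elixir
require Ash.Expr

changeset =
  post
  |> Ash.Changeset.for_update(:update)
  |> Ash.Changeset.atomic(:score, Ash.Expr.expr(score + 1))

# Mirrors the guard added in Ash.Actions.Update: a changeset carrying atomics is
# rejected up front when the data layer does not advertise {:atomic, :update}.
if Ash.DataLayer.data_layer_can?(changeset.resource, {:atomic, :update}) do
  MyApp.Api.update!(changeset)
else
  # Calling update/2 anyway would return an error containing
  # Ash.Error.Invalid.AtomicsNotSupported.
  {:error, :atomics_not_supported}
end
```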

View file

@@ -226,8 +226,8 @@ defmodule Ash.Filter do
def parse_input(
resource,
statement,
aggregates \\ %{},
calculations \\ %{},
_aggregates \\ %{},
_calculations \\ %{},
context \\ %{}
) do
context =
@@ -236,8 +236,6 @@ defmodule Ash.Filter do
resource: resource,
root_resource: resource,
relationship_path: [],
aggregates: aggregates,
calculations: calculations,
public?: true,
input?: true,
data_layer: Ash.DataLayer.data_layer(resource)
@@ -273,8 +271,8 @@ defmodule Ash.Filter do
See `parse/2` for more
"""
def parse!(resource, statement, aggregates \\ %{}, calculations \\ %{}, context \\ %{}) do
case parse(resource, statement, aggregates, calculations, context) do
def parse!(resource, statement, _aggregates \\ %{}, _calculations \\ %{}, context \\ %{}) do
case parse(resource, statement, %{}, %{}, context) do
{:ok, filter} ->
filter
@@ -296,22 +294,6 @@ defmodule Ash.Filter do
If you are trying to validate a filter supplied from an external/untrusted source,
be sure to use `parse_input/2` instead! The only difference is that it only accepts
filters over public attributes/relationships.
### Aggregates and calculations
Since custom aggregates/calculations can be added to a query, and they must be explicitly loaded into
a query, the filter parser does not parse them by default. If you wish to support parsing filters
over aggregates/calculations, provide them as the third argument. The best way to do this is to build a query
with them added/loaded, and then use the `aggregates` and `calculations` keys on the query.
### NOTE
A change was made recently that will automatically load any aggregates/calculations that are used in a filter, but
if you are using this function you still need to pass them in.
```elixir
Ash.Filter.parse(MyResource, [id: 1], query.aggregates, query.calculations)
```
"""
def parse(resource, statement, aggregates \\ %{}, calculations \\ %{}, context \\ %{})
@@ -319,14 +301,12 @@ defmodule Ash.Filter do
{:ok, nil}
end
def parse(resource, statement, aggregates, calculations, original_context) do
def parse(resource, statement, _aggregates, _calculations, original_context) do
context =
Map.merge(
%{
resource: resource,
relationship_path: [],
aggregates: aggregates,
calculations: calculations,
public?: false,
input?: false,
root_resource: resource,
@@ -912,8 +892,8 @@ defmodule Ash.Filter do
filter,
resource,
relationship_path \\ [],
calculations \\ %{},
aggregates \\ %{}
_calculations \\ %{},
_aggregates \\ %{}
) do
filter
|> list_refs()
@@ -928,18 +908,14 @@ defmodule Ash.Filter do
|> Enum.map(& &1.attribute)
|> calculations_used_by_calculations(
resource,
relationship_path,
calculations,
aggregates
relationship_path
)
end
defp calculations_used_by_calculations(
used_calculations,
resource,
relationship_path,
calculations,
aggregates
relationship_path
) do
used_calculations
|> Enum.flat_map(fn calculation ->
@@ -947,8 +923,6 @@ defmodule Ash.Filter do
case hydrate_refs(expression, %{
resource: resource,
aggregates: aggregates,
calculations: calculations,
relationship_path: [],
public?: false
}) do
@@ -958,14 +932,10 @@ defmodule Ash.Filter do
used_calculations(
expression,
resource,
relationship_path,
calculations,
aggregates
relationship_path
),
resource,
relationship_path,
calculations,
aggregates
relationship_path
)
[calculation | with_recursive_used]
@@ -1047,7 +1017,7 @@ defmodule Ash.Filter do
end
def add_to_filter(%__MODULE__{} = base, statement, op, aggregates, calculations, context) do
case parse(base.resource, statement, aggregates, calculations, context) do
case parse(base.resource, statement, %{}, %{}, context) do
{:ok, filter} -> add_to_filter(base, filter, op, aggregates, calculations)
{:error, error} -> {:error, error}
end
@@ -2253,8 +2223,6 @@ defmodule Ash.Filter do
relationship_path: ref.relationship_path,
resource: related,
root_resource: context.root_resource,
aggregates: context.aggregates,
calculations: context.calculations,
public?: context.public?
}
@@ -2929,9 +2897,26 @@ defmodule Ash.Filter do
end
end
def do_hydrate_refs(
%Ref{relationship_path: relationship_path, resource: nil} = ref,
%{resource: resource} = context
)
when not is_nil(resource) do
case Ash.Resource.Info.related(resource, relationship_path || []) do
nil ->
{:error, "Invalid reference #{inspect(ref)}"}
related ->
do_hydrate_refs(
%{ref | resource: related},
context
)
end
end
def do_hydrate_refs(
%Ref{attribute: attribute} = ref,
%{aggregates: aggregates, calculations: calculations} = context
context
)
when is_atom(attribute) or is_binary(attribute) do
ref = %{ref | input?: ref.input? || context[:input?] || false}
@@ -2945,12 +2930,6 @@ defmodule Ash.Filter do
context = %{context | resource: related}
cond do
Map.has_key?(aggregates, attribute) ->
{:ok, %{ref | attribute: Map.get(aggregates, attribute), resource: related}}
Map.has_key?(calculations, attribute) ->
{:ok, %{ref | attribute: Map.get(calculations, attribute), resource: related}}
attribute = attribute(context, attribute) ->
{:ok, %{ref | attribute: attribute, resource: related}}
@@ -3116,8 +3095,6 @@ defmodule Ash.Filter do
root_resource: new_resource,
parent_stack: [context[:root_resource] | context[:parent_stack] || []],
relationship_path: [],
aggregates: %{},
calculations: %{},
public?: context[:public?],
input?: context[:input?],
data_layer: Ash.DataLayer.data_layer(new_resource)

View file

@@ -687,6 +687,7 @@ defmodule Ash.Filter.Runtime do
# once per expanded result. I'm not sure what that will
# look like though.
if record do
case module.calculate([record], opts, context) do
[result] ->
{:ok, result}
@@ -697,6 +698,9 @@ defmodule Ash.Filter.Runtime do
_ ->
{:ok, nil}
end
else
:unknown
end
end
end

View file

@@ -9,6 +9,7 @@ defmodule Ash.Resource.Actions.Update do
accept: nil,
manual: nil,
manual?: false,
atomics: [],
require_attributes: [],
delay_global_validations?: false,
skip_global_validations?: false,

View file

@@ -0,0 +1,8 @@
defmodule Ash.Resource.Change.Atomic do
@moduledoc false
use Ash.Resource.Change
def change(changeset, opts, _) do
Ash.Changeset.atomic(changeset, opts[:attribute], opts[:expr])
end
end

View file

@@ -63,6 +63,19 @@ defmodule Ash.Resource.Change do
]
end
def atomic_schema do
schema()
|> Keyword.take([:description, :where])
|> Keyword.put(:attribute, type: :atom, required: true, doc: "The attribute to update")
|> Keyword.put(:expr,
type: :any,
required: true,
doc: """
The expression to use to set the attribute
"""
)
end
@doc false
def action_schema do
Keyword.delete(schema(), :on)
@@ -104,12 +117,22 @@ defmodule Ash.Resource.Change do
@callback init(Keyword.t()) :: {:ok, Keyword.t()} | {:error, term}
@callback change(Ash.Changeset.t(), Keyword.t(), context) :: Ash.Changeset.t()
@doc """
Replaces `change/3` for batch actions, allowing you to optimize changes for bulk operations.
"""
@callback batch_change([Ash.Changeset.t()], Keyword.t(), context) ::
Enumerable.t(Ash.Changeset.t() | Ash.Notifier.Notification.t())
@doc """
Runs on each batch before it is dispatched to the data layer.
"""
@callback before_batch([Ash.Changeset.t()], Keyword.t(), context) ::
Enumerable.t(Ash.Changeset.t() | Ash.Notifier.Notification.t())
@doc """
Runs on each batch result after it is dispatched to the data layer.
"""
@callback after_batch(
[{Ash.Changeset.t(), Ash.Resource.record()}],
Keyword.t(),
@@ -126,10 +149,12 @@ defmodule Ash.Resource.Change do
defmacro __using__(_) do
quote do
@behaviour Ash.Resource.Change
require Ash.Expr
def init(opts), do: {:ok, opts}
def atomic(_opts, _context), do: :not_atomic
defoverridable init: 1
defoverridable init: 1, atomic: 2
end
end
end
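Since the batch callbacks above only receive their documentation here, a minimal sketch of a change module implementing them may help (module name, attribute, and logic are illustrative, not part of this commit):

```elixir
defmodule MyApp.Changes.TouchBatch do
  @moduledoc false
  use Ash.Resource.Change

  @impl true
  def change(changeset, _opts, _context) do
    # Per-changeset fallback when the action is not run in bulk.
    Ash.Changeset.force_change_attribute(changeset, :touched_at, DateTime.utc_now())
  end

  @impl true
  def batch_change(changesets, _opts, _context) do
    # Runs once for the whole batch instead of once per changeset.
    now = DateTime.utc_now()
    Enum.map(changesets, &Ash.Changeset.force_change_attribute(&1, :touched_at, now))
  end
end
```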

View file

@@ -399,6 +399,37 @@ defmodule Ash.Resource.Dsl do
args: [:change]
}
defmodule Set do
@moduledoc false
defstruct [:description, :where, :attribute, :expr]
def transform(set) do
{:ok,
%Ash.Resource.Change{
change: {Ash.Resource.Change.Atomic, attribute: set.attribute, expr: set.expr}
}
|> Map.merge(Map.take(set, [:description, :where]))}
end
end
@set %Spark.Dsl.Entity{
name: :set,
describe: """
Set an attribute to the result of an expression.
This is a thin wrapper over creating an `Ash.Resource.Change` that defines
`atomic/2`.
""",
examples: [
"set :score, expr(score + 1)",
"set :title, expr(some_calc(some_arg: :foo))"
],
target: Set,
schema: Ash.Resource.Change.atomic_schema(),
transform: {Set, :transform, []},
args: [:attribute, :expr]
}
@validate %Spark.Dsl.Entity{
name: :validate,
describe: """
@@ -583,7 +614,8 @@ defmodule Ash.Resource.Dsl do
entities: [
changes: [
@action_change,
@action_validate
@action_validate,
@set
],
metadata: [
@metadata
@@ -626,7 +658,8 @@ defmodule Ash.Resource.Dsl do
entities: [
changes: [
@action_change,
@action_validate
@action_validate,
@set
],
metadata: [
@metadata

View file

@@ -43,6 +43,7 @@ defmodule Ash.Resource.Validation do
@callback init(Keyword.t()) :: {:ok, Keyword.t()} | {:error, String.t()}
@callback validate(Ash.Changeset.t(), Keyword.t()) :: :ok | {:error, term}
@callback describe(Keyword.t()) :: [message: String.t(), vars: Keyword.t()]
@callback atomic?(Keyword.t()) :: boolean
@optional_callbacks describe: 1
@@ -103,8 +104,7 @@ defmodule Ash.Resource.Validation do
@behaviour Ash.Resource.Validation
def init(opts), do: {:ok, opts}
defoverridable init: 1
def atomic?(_), do: false
defp with_description(keyword, opts) do
if Kernel.function_exported?(__MODULE__, :describe, 1) do
@@ -113,6 +113,8 @@ defmodule Ash.Resource.Validation do
keyword
end
end
defoverridable init: 1, atomic?: 1
end
end

View file

@@ -727,6 +727,9 @@ defmodule Ash.Type do
def dump_to_native({:array, type}, term, constraints) do
if is_atom(type) && :erlang.function_exported(type, :dump_to_native_array, 2) do
type.dump_to_native_array(term, constraints)
else
if is_nil(term) do
{:ok, nil}
else
single_constraints = constraints[:items] || []
@@ -743,6 +746,7 @@ defmodule Ash.Type do
end)
end
end
end
def dump_to_native(type, term, constraints) do
type = get_type(type)

View file

@@ -5,6 +5,7 @@ defmodule Ash.Test.Actions.UpdateTest do
import Ash.Changeset
import Ash.Test
require Ash.Query
require Ash.Expr
defmodule Authorized do
@moduledoc false
@@ -349,6 +350,23 @@ defmodule Ash.Test.Actions.UpdateTest do
end
end
describe "atomics" do
test "atomics can be added to a changeset" do
author =
Author
|> new(%{name: "fred"})
|> Api.create!()
author =
author
|> Ash.Changeset.for_update(:only_allow_name)
|> Ash.Changeset.atomic(:name, Ash.Expr.expr(name <> " weasley"))
|> Api.update!()
assert author.name == "fred weasley"
end
end
describe "changeset" do
test "changes are run properly" do
author =