improvement: support atomics (#165)

This commit is contained in:
Zach Daniel 2023-08-28 16:18:56 -04:00 committed by GitHub
parent cbde3958f3
commit 5a4a52854b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 279 additions and 39 deletions

View file

@ -482,6 +482,7 @@ defmodule AshPostgres.DataLayer do
def can?(_, :transact), do: true
def can?(_, :composite_primary_key), do: true
def can?(_, {:atomic, :update}), do: true
def can?(_, :upsert), do: true
def can?(_, :changeset_filter), do: true
@ -1327,16 +1328,32 @@ defmodule AshPostgres.DataLayer do
Map.get(changeset, :filters, %{})
end
filters =
if changeset.action_type == :create do
filters
else
changeset.resource
|> Ash.Resource.Info.primary_key()
|> Enum.reduce(filters, fn key, filters ->
Map.put(filters, key, Map.get(record, key))
end)
end
attributes =
changeset.resource
|> Ash.Resource.Info.attributes()
|> Enum.map(& &1.name)
attributes_to_change =
Enum.reject(attributes, fn attribute ->
Keyword.has_key?(changeset.atomics, attribute)
end)
ecto_changeset =
record
|> to_ecto()
|> set_table(changeset, type)
|> Ecto.Changeset.change(Map.take(changeset.attributes, attributes))
|> Ecto.Changeset.change(Map.take(changeset.attributes, attributes_to_change))
|> Map.update!(:filters, &Map.merge(&1, filters))
|> add_configured_foreign_key_constraints(record.__struct__)
|> add_unique_indexes(record.__struct__, changeset)
@ -1423,7 +1440,42 @@ defmodule AshPostgres.DataLayer do
)
end
defp handle_raised_error(error, stacktrace, _context, _resource) do
defp handle_raised_error(
%Postgrex.Error{} = error,
stacktrace,
%{constraints: user_constraints},
_resource
) do
case Ecto.Adapters.Postgres.Connection.to_constraints(error, []) do
[{type, constraint}] ->
user_constraint =
Enum.find(user_constraints, fn c ->
case {c.type, c.constraint, c.match} do
{^type, ^constraint, :exact} -> true
{^type, cc, :suffix} -> String.ends_with?(constraint, cc)
{^type, cc, :prefix} -> String.starts_with?(constraint, cc)
{^type, %Regex{} = r, _match} -> Regex.match?(r, constraint)
_ -> false
end
end)
case user_constraint do
%{field: field, error_message: error_message, error_type: error_type} ->
{:error,
to_ash_error(
{field, {error_message, [constraint: error_type, constraint_name: constraint]}}
)}
nil ->
reraise error, stacktrace
end
_ ->
reraise error, stacktrace
end
end
defp handle_raised_error(error, stacktrace, _ecto_changeset, _resource) do
{:error, Ash.Error.to_ash_error(error, stacktrace)}
end
@ -1812,32 +1864,155 @@ defmodule AshPostgres.DataLayer do
@impl true
def update(resource, changeset) do
ecto_changeset =
changeset.data
|> Map.update!(:__meta__, &Map.put(&1, :source, table(resource, changeset)))
|> ecto_changeset(changeset, :update)
|> dynamic_repo(resource, changeset).update(
repo_opts(changeset.timeout, changeset.tenant, changeset.resource)
)
|> from_ecto()
|> handle_errors()
|> case do
{:ok, result} ->
maybe_update_tenant(resource, changeset, result)
{:ok, result}
try do
attr_names =
resource
|> Ash.Resource.Info.attributes()
|> Enum.map(& &1.name)
query = from(row in resource, as: ^0)
query =
query
|> default_bindings(resource, changeset.context)
|> Ecto.Query.select(^attr_names)
query =
Enum.reduce(ecto_changeset.filters, query, fn {key, value}, query ->
from(row in query,
where: field(row, ^key) == ^value
)
end)
set =
ecto_changeset.changes
|> Map.take(attr_names)
|> Map.to_list()
atomics_result =
Enum.reduce_while(changeset.atomics, {:ok, query, set}, fn {field, expr},
{:ok, query, set} ->
used_calculations =
Ash.Filter.used_calculations(
expr,
resource
)
used_aggregates =
expr
|> AshPostgres.Aggregate.used_aggregates(
resource,
used_calculations,
[]
)
|> Enum.map(fn aggregate ->
%{aggregate | load: aggregate.name}
end)
with {:ok, query} <-
AshPostgres.Join.join_all_relationships(
query,
%Ash.Filter{
resource: resource,
expression: expr
},
left_only?: true
),
{:ok, query} <-
AshPostgres.Aggregate.add_aggregates(query, used_aggregates, resource, false, 0),
dynamic <-
AshPostgres.Expr.dynamic_expr(query, expr, query.__ash_bindings__) do
{:cont, {:ok, query, Keyword.put(set, field, dynamic)}}
else
other ->
{:halt, other}
end
end)
case atomics_result do
{:ok, query, set} ->
{params, set, _} =
Enum.reduce(set, {[], [], 0}, fn {key, value}, {params, set, count} ->
case AshPostgres.Expr.dynamic_expr(query, value, query.__ash_bindings__) do
%Ecto.Query.DynamicExpr{} = dynamic ->
result =
Ecto.Query.Builder.Dynamic.partially_expand(
:select,
query,
dynamic,
params,
count
)
expr = elem(result, 0)
new_params = elem(result, 1)
new_count =
result |> Tuple.to_list() |> List.last()
{new_params, [{key, expr} | set], new_count}
other ->
{[{other, {0, key}} | params], [{key, {:^, [], [count]}} | set], count + 1}
end
end)
query =
Map.put(query, :updates, [
%Ecto.Query.QueryExpr{
# why do I have to reverse the `set`???
# it breaks if I don't
expr: [set: Enum.reverse(set)],
params: Enum.reverse(params)
}
])
repo_opts = repo_opts(changeset.timeout, changeset.tenant, changeset.resource)
repo_opts =
Keyword.put(repo_opts, :returning, Keyword.keys(changeset.atomics))
result =
dynamic_repo(resource, changeset).update_all(
query,
[],
repo_opts
)
case result do
{0, []} ->
{:error,
Ash.Error.Changes.StaleRecord.exception(
resource: resource,
filters: ecto_changeset.filters
)}
{1, [record]} ->
maybe_update_tenant(resource, changeset, record)
{:ok, record}
end
{:error, error} ->
{:error, error}
end
rescue
e ->
handle_raised_error(e, __STACKTRACE__, changeset, resource)
handle_raised_error(e, __STACKTRACE__, ecto_changeset, resource)
end
end
@impl true
def destroy(resource, %{data: record} = changeset) do
record
|> ecto_changeset(changeset, :delete)
ecto_changeset = ecto_changeset(record, changeset, :delete)
try do
ecto_changeset
|> dynamic_repo(resource, changeset).delete(
repo_opts(changeset.timeout, changeset.tenant, changeset.resource)
)
@ -1851,7 +2026,8 @@ defmodule AshPostgres.DataLayer do
end
rescue
e ->
handle_raised_error(e, __STACKTRACE__, changeset, resource)
handle_raised_error(e, __STACKTRACE__, ecto_changeset, resource)
end
end
@impl true

View file

@ -1376,11 +1376,9 @@ defmodule AshPostgres.Expr do
end
end)
exprs = Enum.reverse(exprs)
%Ecto.Query.DynamicExpr{
fun: fn _query ->
{exprs, Enum.reverse(params), [], []}
{Enum.reverse(exprs), Enum.reverse(params), [], []}
end,
binding: [],
file: __ENV__.file,

View file

@ -161,7 +161,7 @@ defmodule AshPostgres.MixProject do
{:ecto, "~> 3.9"},
{:jason, "~> 1.0"},
{:postgrex, ">= 0.0.0"},
{:ash, ash_version("~> 2.14 and >= 2.14.3")},
{:ash, ash_version(github: "ash-project/ash")},
{:git_ops, "~> 2.5", only: [:dev, :test]},
{:ex_doc, "~> 0.22", only: [:dev, :test], runtime: false},
{:ex_check, "~> 0.14", only: [:dev, :test]},

View file

@ -1,5 +1,5 @@
%{
"ash": {:hex, :ash, "2.14.3", "ebe59b9d390a9c488a30ce7a06c6aeb154845517186d94535c5da2a2753ed20b", [:mix], [{:comparable, "~> 1.0", [hex: :comparable, repo: "hexpm", optional: false]}, {:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:earmark, "~> 1.4", [hex: :earmark, repo: "hexpm", optional: true]}, {:ecto, "~> 3.7", [hex: :ecto, repo: "hexpm", optional: false]}, {:ets, "~> 0.8.0", [hex: :ets, repo: "hexpm", optional: false]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: false]}, {:picosat_elixir, "~> 0.2", [hex: :picosat_elixir, repo: "hexpm", optional: false]}, {:plug, ">= 0.0.0", [hex: :plug, repo: "hexpm", optional: true]}, {:spark, ">= 1.1.20 and < 2.0.0-0", [hex: :spark, repo: "hexpm", optional: false]}, {:stream_data, "~> 0.5.0", [hex: :stream_data, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6366bf0950fd2e6409bc820a9f8046e21086fa73741ed8bf94191d40e6aa1c91"},
"ash": {:git, "https://github.com/ash-project/ash.git", "18cb24e7f720282d46ad6aed7ba065d9cd71604c", []},
"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"},
"certifi": {:hex, :certifi, "2.9.0", "6f2a475689dd47f19fb74334859d460a2dc4e3252a3324bd2111b8f0429e7e21", [:rebar3], [], "hexpm", "266da46bdb06d6c6d35fde799bcb28d36d985d424ad7c08b5bb48f5b5cdd4641"},
"comparable": {:hex, :comparable, "1.0.0", "bb669e91cedd14ae9937053e5bcbc3c52bb2f22422611f43b6e38367d94a495f", [:mix], [{:typable, "~> 0.1", [hex: :typable, repo: "hexpm", optional: false]}], "hexpm", "277c11eeb1cd726e7cd41c6c199e7e52fa16ee6830b45ad4cdc62e51f62eb60c"},
@ -33,7 +33,7 @@
"picosat_elixir": {:hex, :picosat_elixir, "0.2.3", "bf326d0f179fbb3b706bb2c15fbc367dacfa2517157d090fdfc32edae004c597", [:make, :mix], [{:elixir_make, "~> 0.6", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "f76c9db2dec9d2561ffaa9be35f65403d53e984e8cd99c832383b7ab78c16c66"},
"postgrex": {:hex, :postgrex, "0.17.2", "a3ec9e3239d9b33f1e5841565c4eb200055c52cc0757a22b63ca2d529bbe764c", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "80a918a9e9531d39f7bd70621422f3ebc93c01618c645f2d91306f50041ed90c"},
"sobelow": {:hex, :sobelow, "0.11.1", "23438964486f8112b41e743bbfd402da3e5b296fdc9eacab29914b79c48916dd", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "9897363a7eff96f4809304a90aad819e2ad5e5d24db547af502885146746a53c"},
"sourceror": {:hex, :sourceror, "0.12.3", "a2ad3a1a4554b486d8a113ae7adad5646f938cad99bf8bfcef26dc0c88e8fade", [:mix], [], "hexpm", "4d4e78010ca046524e8194ffc4683422f34a96f6b82901abbb45acc79ace0316"},
"sourceror": {:hex, :sourceror, "0.13.0", "c6ecc96ee3ae0e042e9082a9550a1989ea40182492dc29024a8d9d2b136e5014", [:mix], [], "hexpm", "d0a819491061cd26bfa4450d1c84301a410c19c1782a6577ce15853fc0e7e4e1"},
"spark": {:hex, :spark, "1.1.22", "68ba00f9acb4c8bc2c93ef82249493687ddf0f0a4f7e79c3c0e22b06719add56", [:mix], [{:nimble_options, "~> 0.5 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:sourceror, "~> 0.1", [hex: :sourceror, repo: "hexpm", optional: false]}], "hexpm", "b798b95990eed8f2409df47b818b5dbcd00e9b5c30d0355465d0b04bbf9b5c4c"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"},
"stream_data": {:hex, :stream_data, "0.5.0", "b27641e58941685c75b353577dc602c9d2c12292dd84babf506c2033cd97893e", [:mix], [], "hexpm", "012bd2eec069ada4db3411f9115ccafa38540a3c78c4c0349f151fc761b9e271"},

59
test/atomics_test.exs Normal file
View file

@ -0,0 +1,59 @@
defmodule AshPostgres.AtomicsTest do
use AshPostgres.RepoCase, async: false
alias AshPostgres.Test.{Api, Post}
import Ash.Expr
test "a basic atomic works" do
post =
Post
|> Ash.Changeset.for_create(:create, %{title: "foo", price: 1})
|> Api.create!()
assert %{price: 2} =
post
|> Ash.Changeset.for_update(:update, %{})
|> Ash.Changeset.atomic(:price, expr(price + 1))
|> Api.update!()
end
test "an atomic that violates a constraint will return the proper error" do
post =
Post
|> Ash.Changeset.for_create(:create, %{title: "foo", price: 1})
|> Api.create!()
assert_raise Ash.Error.Invalid, ~r/does not exist/, fn ->
post
|> Ash.Changeset.for_update(:update, %{})
|> Ash.Changeset.atomic(:organization_id, Ash.UUID.generate())
|> Api.update!()
end
end
test "an atomic can refer to a calculation" do
post =
Post
|> Ash.Changeset.for_create(:create, %{title: "foo", price: 1})
|> Api.create!()
post =
post
|> Ash.Changeset.for_update(:update, %{})
|> Ash.Changeset.atomic(:score, expr(score_after_winning))
|> Api.update!()
assert post.score == 1
end
test "an atomic can be attached to an action" do
post =
Post
|> Ash.Changeset.for_create(:create, %{title: "foo", price: 1})
|> Api.create!()
assert Post.increment_score!(post, 2).score == 2
assert Post.increment_score!(post, 2).score == 4
end
end

View file

@ -61,6 +61,11 @@ defmodule AshPostgres.Test.Post do
)
)
end
update :increment_score do
argument(:amount, :integer, default: 1)
set(:score, expr((score || 0) + ^arg(:amount)))
end
end
identities do
@ -92,6 +97,7 @@ defmodule AshPostgres.Test.Post do
code_interface do
define_for(AshPostgres.Test.Api)
define(:get_by_id, action: :read, get_by: [:id])
define(:increment_score, args: [{:optional, :amount}])
end
relationships do
@ -144,6 +150,7 @@ defmodule AshPostgres.Test.Post do
end
calculations do
calculate(:score_after_winning, :integer, expr((score || 0) + 1))
calculate(:negative_score, :integer, expr(-score))
calculate(:category_label, :ci_string, expr("(" <> category <> ")"))
calculate(:score_with_score, :string, expr(score <> score))