diff --git a/lib/data_layer.ex b/lib/data_layer.ex index ee602ba..7510610 100644 --- a/lib/data_layer.ex +++ b/lib/data_layer.ex @@ -482,6 +482,7 @@ defmodule AshPostgres.DataLayer do def can?(_, :transact), do: true def can?(_, :composite_primary_key), do: true + def can?(_, {:atomic, :update}), do: true def can?(_, :upsert), do: true def can?(_, :changeset_filter), do: true @@ -1327,16 +1328,32 @@ defmodule AshPostgres.DataLayer do Map.get(changeset, :filters, %{}) end + filters = + if changeset.action_type == :create do + filters + else + changeset.resource + |> Ash.Resource.Info.primary_key() + |> Enum.reduce(filters, fn key, filters -> + Map.put(filters, key, Map.get(record, key)) + end) + end + attributes = changeset.resource |> Ash.Resource.Info.attributes() |> Enum.map(& &1.name) + attributes_to_change = + Enum.reject(attributes, fn attribute -> + Keyword.has_key?(changeset.atomics, attribute) + end) + ecto_changeset = record |> to_ecto() |> set_table(changeset, type) - |> Ecto.Changeset.change(Map.take(changeset.attributes, attributes)) + |> Ecto.Changeset.change(Map.take(changeset.attributes, attributes_to_change)) |> Map.update!(:filters, &Map.merge(&1, filters)) |> add_configured_foreign_key_constraints(record.__struct__) |> add_unique_indexes(record.__struct__, changeset) @@ -1423,7 +1440,42 @@ defmodule AshPostgres.DataLayer do ) end - defp handle_raised_error(error, stacktrace, _context, _resource) do + defp handle_raised_error( + %Postgrex.Error{} = error, + stacktrace, + %{constraints: user_constraints}, + _resource + ) do + case Ecto.Adapters.Postgres.Connection.to_constraints(error, []) do + [{type, constraint}] -> + user_constraint = + Enum.find(user_constraints, fn c -> + case {c.type, c.constraint, c.match} do + {^type, ^constraint, :exact} -> true + {^type, cc, :suffix} -> String.ends_with?(constraint, cc) + {^type, cc, :prefix} -> String.starts_with?(constraint, cc) + {^type, %Regex{} = r, _match} -> Regex.match?(r, constraint) + _ -> false + end + end) + + case user_constraint do + %{field: field, error_message: error_message, error_type: error_type} -> + {:error, + to_ash_error( + {field, {error_message, [constraint: error_type, constraint_name: constraint]}} + )} + + nil -> + reraise error, stacktrace + end + + _ -> + reraise error, stacktrace + end + end + + defp handle_raised_error(error, stacktrace, _ecto_changeset, _resource) do {:error, Ash.Error.to_ash_error(error, stacktrace)} end @@ -1812,46 +1864,170 @@ defmodule AshPostgres.DataLayer do @impl true def update(resource, changeset) do - changeset.data - |> Map.update!(:__meta__, &Map.put(&1, :source, table(resource, changeset))) - |> ecto_changeset(changeset, :update) - |> dynamic_repo(resource, changeset).update( - repo_opts(changeset.timeout, changeset.tenant, changeset.resource) - ) - |> from_ecto() - |> handle_errors() - |> case do - {:ok, result} -> - maybe_update_tenant(resource, changeset, result) + ecto_changeset = + changeset.data + |> Map.update!(:__meta__, &Map.put(&1, :source, table(resource, changeset))) + |> ecto_changeset(changeset, :update) - {:ok, result} + try do + attr_names = + resource + |> Ash.Resource.Info.attributes() + |> Enum.map(& &1.name) - {:error, error} -> - {:error, error} + query = from(row in resource, as: ^0) + + query = + query + |> default_bindings(resource, changeset.context) + |> Ecto.Query.select(^attr_names) + + query = + Enum.reduce(ecto_changeset.filters, query, fn {key, value}, query -> + from(row in query, + where: field(row, ^key) == ^value + ) + end) + + set = + ecto_changeset.changes + |> Map.take(attr_names) + |> Map.to_list() + + atomics_result = + Enum.reduce_while(changeset.atomics, {:ok, query, set}, fn {field, expr}, + {:ok, query, set} -> + used_calculations = + Ash.Filter.used_calculations( + expr, + resource + ) + + used_aggregates = + expr + |> AshPostgres.Aggregate.used_aggregates( + resource, + used_calculations, + [] + ) + |> Enum.map(fn aggregate -> + %{aggregate | load: aggregate.name} + end) + + with {:ok, query} <- + AshPostgres.Join.join_all_relationships( + query, + %Ash.Filter{ + resource: resource, + expression: expr + }, + left_only?: true + ), + {:ok, query} <- + AshPostgres.Aggregate.add_aggregates(query, used_aggregates, resource, false, 0), + dynamic <- + AshPostgres.Expr.dynamic_expr(query, expr, query.__ash_bindings__) do + {:cont, {:ok, query, Keyword.put(set, field, dynamic)}} + else + other -> + {:halt, other} + end + end) + + case atomics_result do + {:ok, query, set} -> + {params, set, _} = + Enum.reduce(set, {[], [], 0}, fn {key, value}, {params, set, count} -> + case AshPostgres.Expr.dynamic_expr(query, value, query.__ash_bindings__) do + %Ecto.Query.DynamicExpr{} = dynamic -> + result = + Ecto.Query.Builder.Dynamic.partially_expand( + :select, + query, + dynamic, + params, + count + ) + + expr = elem(result, 0) + new_params = elem(result, 1) + + new_count = + result |> Tuple.to_list() |> List.last() + + {new_params, [{key, expr} | set], new_count} + + other -> + {[{other, {0, key}} | params], [{key, {:^, [], [count]}} | set], count + 1} + end + end) + + query = + Map.put(query, :updates, [ + %Ecto.Query.QueryExpr{ + # why do I have to reverse the `set`??? + # it breaks if I don't + expr: [set: Enum.reverse(set)], + params: Enum.reverse(params) + } + ]) + + repo_opts = repo_opts(changeset.timeout, changeset.tenant, changeset.resource) + + repo_opts = + Keyword.put(repo_opts, :returning, Keyword.keys(changeset.atomics)) + + result = + dynamic_repo(resource, changeset).update_all( + query, + [], + repo_opts + ) + + case result do + {0, []} -> + {:error, + Ash.Error.Changes.StaleRecord.exception( + resource: resource, + filters: ecto_changeset.filters + )} + + {1, [record]} -> + maybe_update_tenant(resource, changeset, record) + + {:ok, record} + end + + {:error, error} -> + {:error, error} + end + rescue + e -> + handle_raised_error(e, __STACKTRACE__, ecto_changeset, resource) end - rescue - e -> - handle_raised_error(e, __STACKTRACE__, changeset, resource) end @impl true def destroy(resource, %{data: record} = changeset) do - record - |> ecto_changeset(changeset, :delete) - |> dynamic_repo(resource, changeset).delete( - repo_opts(changeset.timeout, changeset.tenant, changeset.resource) - ) - |> from_ecto() - |> case do - {:ok, _record} -> - :ok + ecto_changeset = ecto_changeset(record, changeset, :delete) - {:error, error} -> - handle_errors({:error, error}) + try do + ecto_changeset + |> dynamic_repo(resource, changeset).delete( + repo_opts(changeset.timeout, changeset.tenant, changeset.resource) + ) + |> from_ecto() + |> case do + {:ok, _record} -> + :ok + + {:error, error} -> + handle_errors({:error, error}) + end + rescue + e -> + handle_raised_error(e, __STACKTRACE__, ecto_changeset, resource) end - rescue - e -> - handle_raised_error(e, __STACKTRACE__, changeset, resource) end @impl true diff --git a/lib/expr.ex b/lib/expr.ex index 765324b..d1c4952 100644 --- a/lib/expr.ex +++ b/lib/expr.ex @@ -1376,11 +1376,9 @@ defmodule AshPostgres.Expr do end end) - exprs = Enum.reverse(exprs) - %Ecto.Query.DynamicExpr{ fun: fn _query -> - {exprs, Enum.reverse(params), [], []} + {Enum.reverse(exprs), Enum.reverse(params), [], []} end, binding: [], file: __ENV__.file, diff --git a/mix.exs b/mix.exs index 4f77a4a..15fda55 100644 --- a/mix.exs +++ b/mix.exs @@ -161,7 +161,7 @@ defmodule AshPostgres.MixProject do {:ecto, "~> 3.9"}, {:jason, "~> 1.0"}, {:postgrex, ">= 0.0.0"}, - {:ash, ash_version("~> 2.14 and >= 2.14.3")}, + {:ash, ash_version(github: "ash-project/ash")}, {:git_ops, "~> 2.5", only: [:dev, :test]}, {:ex_doc, "~> 0.22", only: [:dev, :test], runtime: false}, {:ex_check, "~> 0.14", only: [:dev, :test]}, diff --git a/mix.lock b/mix.lock index 755d351..1abc040 100644 --- a/mix.lock +++ b/mix.lock @@ -1,5 +1,5 @@ %{ - "ash": {:hex, :ash, "2.14.3", "ebe59b9d390a9c488a30ce7a06c6aeb154845517186d94535c5da2a2753ed20b", [:mix], [{:comparable, "~> 1.0", [hex: :comparable, repo: "hexpm", optional: false]}, {:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:earmark, "~> 1.4", [hex: :earmark, repo: "hexpm", optional: true]}, {:ecto, "~> 3.7", [hex: :ecto, repo: "hexpm", optional: false]}, {:ets, "~> 0.8.0", [hex: :ets, repo: "hexpm", optional: false]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: false]}, {:picosat_elixir, "~> 0.2", [hex: :picosat_elixir, repo: "hexpm", optional: false]}, {:plug, ">= 0.0.0", [hex: :plug, repo: "hexpm", optional: true]}, {:spark, ">= 1.1.20 and < 2.0.0-0", [hex: :spark, repo: "hexpm", optional: false]}, {:stream_data, "~> 0.5.0", [hex: :stream_data, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6366bf0950fd2e6409bc820a9f8046e21086fa73741ed8bf94191d40e6aa1c91"}, + "ash": {:git, "https://github.com/ash-project/ash.git", "18cb24e7f720282d46ad6aed7ba065d9cd71604c", []}, "bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"}, "certifi": {:hex, :certifi, "2.9.0", "6f2a475689dd47f19fb74334859d460a2dc4e3252a3324bd2111b8f0429e7e21", [:rebar3], [], "hexpm", "266da46bdb06d6c6d35fde799bcb28d36d985d424ad7c08b5bb48f5b5cdd4641"}, "comparable": {:hex, :comparable, "1.0.0", "bb669e91cedd14ae9937053e5bcbc3c52bb2f22422611f43b6e38367d94a495f", [:mix], [{:typable, "~> 0.1", [hex: :typable, repo: "hexpm", optional: false]}], "hexpm", "277c11eeb1cd726e7cd41c6c199e7e52fa16ee6830b45ad4cdc62e51f62eb60c"}, @@ -33,7 +33,7 @@ "picosat_elixir": {:hex, :picosat_elixir, "0.2.3", "bf326d0f179fbb3b706bb2c15fbc367dacfa2517157d090fdfc32edae004c597", [:make, :mix], [{:elixir_make, "~> 0.6", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "f76c9db2dec9d2561ffaa9be35f65403d53e984e8cd99c832383b7ab78c16c66"}, "postgrex": {:hex, :postgrex, "0.17.2", "a3ec9e3239d9b33f1e5841565c4eb200055c52cc0757a22b63ca2d529bbe764c", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "80a918a9e9531d39f7bd70621422f3ebc93c01618c645f2d91306f50041ed90c"}, "sobelow": {:hex, :sobelow, "0.11.1", "23438964486f8112b41e743bbfd402da3e5b296fdc9eacab29914b79c48916dd", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "9897363a7eff96f4809304a90aad819e2ad5e5d24db547af502885146746a53c"}, - "sourceror": {:hex, :sourceror, "0.12.3", "a2ad3a1a4554b486d8a113ae7adad5646f938cad99bf8bfcef26dc0c88e8fade", [:mix], [], "hexpm", "4d4e78010ca046524e8194ffc4683422f34a96f6b82901abbb45acc79ace0316"}, + "sourceror": {:hex, :sourceror, "0.13.0", "c6ecc96ee3ae0e042e9082a9550a1989ea40182492dc29024a8d9d2b136e5014", [:mix], [], "hexpm", "d0a819491061cd26bfa4450d1c84301a410c19c1782a6577ce15853fc0e7e4e1"}, "spark": {:hex, :spark, "1.1.22", "68ba00f9acb4c8bc2c93ef82249493687ddf0f0a4f7e79c3c0e22b06719add56", [:mix], [{:nimble_options, "~> 0.5 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:sourceror, "~> 0.1", [hex: :sourceror, repo: "hexpm", optional: false]}], "hexpm", "b798b95990eed8f2409df47b818b5dbcd00e9b5c30d0355465d0b04bbf9b5c4c"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, "stream_data": {:hex, :stream_data, "0.5.0", "b27641e58941685c75b353577dc602c9d2c12292dd84babf506c2033cd97893e", [:mix], [], "hexpm", "012bd2eec069ada4db3411f9115ccafa38540a3c78c4c0349f151fc761b9e271"}, diff --git a/test/atomics_test.exs b/test/atomics_test.exs new file mode 100644 index 0000000..2f8b76d --- /dev/null +++ b/test/atomics_test.exs @@ -0,0 +1,59 @@ +defmodule AshPostgres.AtomicsTest do + use AshPostgres.RepoCase, async: false + alias AshPostgres.Test.{Api, Post} + + import Ash.Expr + + test "a basic atomic works" do + post = + Post + |> Ash.Changeset.for_create(:create, %{title: "foo", price: 1}) + |> Api.create!() + + assert %{price: 2} = + post + |> Ash.Changeset.for_update(:update, %{}) + |> Ash.Changeset.atomic(:price, expr(price + 1)) + |> Api.update!() + end + + test "an atomic that violates a constraint will return the proper error" do + post = + Post + |> Ash.Changeset.for_create(:create, %{title: "foo", price: 1}) + |> Api.create!() + + assert_raise Ash.Error.Invalid, ~r/does not exist/, fn -> + post + |> Ash.Changeset.for_update(:update, %{}) + |> Ash.Changeset.atomic(:organization_id, Ash.UUID.generate()) + |> Api.update!() + end + end + + test "an atomic can refer to a calculation" do + post = + Post + |> Ash.Changeset.for_create(:create, %{title: "foo", price: 1}) + |> Api.create!() + + post = + post + |> Ash.Changeset.for_update(:update, %{}) + |> Ash.Changeset.atomic(:score, expr(score_after_winning)) + |> Api.update!() + + assert post.score == 1 + end + + test "an atomic can be attached to an action" do + post = + Post + |> Ash.Changeset.for_create(:create, %{title: "foo", price: 1}) + |> Api.create!() + + assert Post.increment_score!(post, 2).score == 2 + + assert Post.increment_score!(post, 2).score == 4 + end +end diff --git a/test/support/resources/post.ex b/test/support/resources/post.ex index bc7c86b..6ab5a36 100644 --- a/test/support/resources/post.ex +++ b/test/support/resources/post.ex @@ -61,6 +61,11 @@ defmodule AshPostgres.Test.Post do ) ) end + + update :increment_score do + argument(:amount, :integer, default: 1) + set(:score, expr((score || 0) + ^arg(:amount))) + end end identities do @@ -92,6 +97,7 @@ defmodule AshPostgres.Test.Post do code_interface do define_for(AshPostgres.Test.Api) define(:get_by_id, action: :read, get_by: [:id]) + define(:increment_score, args: [{:optional, :amount}]) end relationships do @@ -144,6 +150,7 @@ defmodule AshPostgres.Test.Post do end calculations do + calculate(:score_after_winning, :integer, expr((score || 0) + 1)) calculate(:negative_score, :integer, expr(-score)) calculate(:category_label, :ci_string, expr("(" <> category <> ")")) calculate(:score_with_score, :string, expr(score <> score))