fix: properly support aggregate references in atomic updates

(yes, you read that right)
This commit is contained in:
Zach Daniel 2024-05-29 12:29:34 -04:00
parent d3b2e96b7b
commit ff47ff0e06
6 changed files with 174 additions and 96 deletions

View file

@ -1433,98 +1433,7 @@ defmodule AshPostgres.DataLayer do
end end
defp bulk_updatable_query(query, resource, atomics, calculations, context, type \\ :update) do defp bulk_updatable_query(query, resource, atomics, calculations, context, type \\ :update) do
requires_adding_inner_join? = Enum.reduce_while(atomics, {:ok, query}, fn {_, expr}, {:ok, query} ->
case type do
:update ->
# could potentially optimize this to avoid the subquery by shuffling free
# inner joins to the top of the query
has_inner_join_to_start? =
case Enum.at(query.joins, 0) do
nil ->
false
%{qual: :inner} ->
true
_ ->
false
end
cond do
has_inner_join_to_start? ->
false
Enum.any?(query.joins, &(&1.qual != :inner)) ->
true
Enum.any?(atomics ++ calculations, fn {_, expr} ->
Ash.Filter.list_refs(expr) |> Enum.any?(&(&1.relationship_path != []))
end) ->
true
true ->
false
end
:destroy ->
Enum.any?(query.joins, &(&1.qual != :inner)) ||
Enum.any?(atomics ++ calculations, fn {_, expr} ->
expr |> Ash.Filter.list_refs() |> Enum.any?(&(&1.relationship_path != []))
end)
end
needs_to_join? =
requires_adding_inner_join? ||
query.limit || query.offset
query =
if needs_to_join? do
root_query = Ecto.Query.exclude(query, :select)
root_query =
cond do
query.limit || query.offset ->
from(row in Ecto.Query.subquery(root_query), [])
!Enum.empty?(query.joins) ->
from(row in Ecto.Query.subquery(Ecto.Query.exclude(root_query, :order_by)), [])
true ->
Ecto.Query.exclude(root_query, :order_by)
end
dynamic =
Enum.reduce(Ash.Resource.Info.primary_key(resource), nil, fn pkey, dynamic ->
if dynamic do
Ecto.Query.dynamic(
[row, joining],
field(row, ^pkey) == field(joining, ^pkey) and ^dynamic
)
else
Ecto.Query.dynamic([row, joining], field(row, ^pkey) == field(joining, ^pkey))
end
end)
faked_query =
from(row in query.from.source,
inner_join: limiter in ^root_query,
as: ^0,
on: ^dynamic
)
|> AshSql.Bindings.default_bindings(
query.__ash_bindings__.resource,
AshPostgres.SqlImplementation,
context
)
faked_query
else
query
|> Ecto.Query.exclude(:select)
|> Ecto.Query.exclude(:order_by)
end
Enum.reduce_while(atomics ++ calculations, {:ok, query}, fn {_, expr}, {:ok, query} ->
used_aggregates = used_aggregates =
Ash.Filter.used_aggregates(expr, []) Ash.Filter.used_aggregates(expr, [])
@ -1545,6 +1454,154 @@ defmodule AshPostgres.DataLayer do
{:halt, {:error, error}} {:halt, {:error, error}}
end end
end) end)
|> case do
{:ok, query} ->
requires_adding_inner_join? =
case type do
:update ->
# could potentially optimize this to avoid the subquery by shuffling free
# inner joins to the top of the query
has_inner_join_to_start? =
case Enum.at(query.joins, 0) do
nil ->
false
%{qual: :inner} ->
true
_ ->
false
end
cond do
has_inner_join_to_start? ->
false
Enum.any?(query.joins, &(&1.qual != :inner)) ->
true
Enum.any?(atomics ++ calculations, fn {_, expr} ->
Ash.Filter.list_refs(expr) |> Enum.any?(&(&1.relationship_path != []))
end) ->
true
true ->
false
end
:destroy ->
Enum.any?(query.joins, &(&1.qual != :inner)) ||
Enum.any?(atomics ++ calculations, fn {_, expr} ->
expr |> Ash.Filter.list_refs() |> Enum.any?(&(&1.relationship_path != []))
end)
end
needs_to_join? =
requires_adding_inner_join? ||
query.limit || query.offset
query =
if needs_to_join? do
root_query = Ecto.Query.exclude(query, :select)
root_query_result =
cond do
query.limit || query.offset ->
with {:ok, root_query} <-
AshSql.Atomics.select_atomics(resource, root_query, atomics) do
{:ok, from(row in Ecto.Query.subquery(root_query), []), atomics != []}
end
!Enum.empty?(query.joins) ->
with root_query <- Ecto.Query.exclude(root_query, :order_by),
{:ok, root_query} <-
AshSql.Atomics.select_atomics(resource, root_query, atomics) do
{:ok, from(row in Ecto.Query.subquery(root_query), []), atomics != []}
end
true ->
{:ok, Ecto.Query.exclude(root_query, :order_by), false}
end
case root_query_result do
{:ok, root_query, selected_atomics?} ->
dynamic =
Enum.reduce(Ash.Resource.Info.primary_key(resource), nil, fn pkey, dynamic ->
if dynamic do
Ecto.Query.dynamic(
[row, joining],
field(row, ^pkey) == field(joining, ^pkey) and ^dynamic
)
else
Ecto.Query.dynamic(
[row, joining],
field(row, ^pkey) == field(joining, ^pkey)
)
end
end)
faked_query =
from(row in query.from.source,
inner_join: limiter in ^root_query,
as: ^0,
on: ^dynamic
)
|> AshSql.Bindings.default_bindings(
query.__ash_bindings__.resource,
AshPostgres.SqlImplementation,
context
)
|> then(fn query ->
if selected_atomics? do
Map.update!(query, :__ash_bindings__, &Map.put(&1, :atomics_in_binding, 0))
else
query
end
end)
{:ok, faked_query}
{:error, error} ->
{:error, error}
end
else
{:ok,
query
|> Ecto.Query.exclude(:select)
|> Ecto.Query.exclude(:order_by)}
end
case query do
{:ok, query} ->
Enum.reduce_while(calculations, {:ok, query}, fn {_, expr}, {:ok, query} ->
used_aggregates =
Ash.Filter.used_aggregates(expr, [])
with {:ok, query} <-
AshSql.Join.join_all_relationships(
query,
%Ash.Filter{
resource: resource,
expression: expr
},
left_only?: true
),
{:ok, query} <-
AshSql.Aggregate.add_aggregates(query, used_aggregates, resource, false, 0) do
{:cont, {:ok, query}}
else
{:error, error} ->
{:halt, {:error, error}}
end
end)
{:error, error} ->
{:error, error}
end
{:error, error} ->
{:error, error}
end
end end
@impl true @impl true
@ -1563,7 +1620,7 @@ defmodule AshPostgres.DataLayer do
case bulk_updatable_query( case bulk_updatable_query(
query, query,
resource, resource,
changeset.atomics, [],
options[:calculations] || [], options[:calculations] || [],
changeset.context, changeset.context,
:destroy :destroy

View file

@ -163,7 +163,7 @@ defmodule AshPostgres.MixProject do
defp deps do defp deps do
[ [
{:ash, ash_version("~> 3.0 and >= 3.0.7")}, {:ash, ash_version("~> 3.0 and >= 3.0.7")},
{:ash_sql, ash_sql_version("~> 0.1 and >= 0.1.3")}, {:ash_sql, ash_sql_version("~> 0.2")},
{:ecto_sql, "~> 3.9"}, {:ecto_sql, "~> 3.9"},
{:ecto, "~> 3.9"}, {:ecto, "~> 3.9"},
{:jason, "~> 1.0"}, {:jason, "~> 1.0"},

View file

@ -1,6 +1,6 @@
%{ %{
"ash": {:hex, :ash, "3.0.8", "e84a0707205e2a1ed16e9c1acaf32e08658bf4a36cba460eefaf79fedf92abd7", [:mix], [{:comparable, "~> 1.0", [hex: :comparable, repo: "hexpm", optional: false]}, {:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.7", [hex: :ecto, repo: "hexpm", optional: false]}, {:ets, "~> 0.8", [hex: :ets, repo: "hexpm", optional: false]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: false]}, {:picosat_elixir, "~> 0.2", [hex: :picosat_elixir, repo: "hexpm", optional: true]}, {:plug, ">= 0.0.0", [hex: :plug, repo: "hexpm", optional: true]}, {:reactor, ">= 0.8.1 and < 1.0.0-0", [hex: :reactor, repo: "hexpm", optional: false]}, {:simple_sat, ">= 0.1.1 and < 1.0.0-0", [hex: :simple_sat, repo: "hexpm", optional: true]}, {:spark, ">= 2.1.18 and < 3.0.0-0", [hex: :spark, repo: "hexpm", optional: false]}, {:splode, "~> 0.2", [hex: :splode, repo: "hexpm", optional: false]}, {:stream_data, "~> 1.0", [hex: :stream_data, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "92436ab7c465d8a8706383cb9cfd9fbf074d4bd8632b86895a6e6bf3b9eee2cd"}, "ash": {:hex, :ash, "3.0.8", "e84a0707205e2a1ed16e9c1acaf32e08658bf4a36cba460eefaf79fedf92abd7", [:mix], [{:comparable, "~> 1.0", [hex: :comparable, repo: "hexpm", optional: false]}, {:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.7", [hex: :ecto, repo: "hexpm", optional: false]}, {:ets, "~> 0.8", [hex: :ets, repo: "hexpm", optional: false]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: false]}, {:picosat_elixir, "~> 0.2", [hex: :picosat_elixir, repo: "hexpm", optional: true]}, {:plug, ">= 0.0.0", [hex: :plug, repo: "hexpm", optional: true]}, {:reactor, ">= 0.8.1 and < 1.0.0-0", [hex: :reactor, repo: "hexpm", optional: false]}, {:simple_sat, ">= 0.1.1 and < 1.0.0-0", [hex: :simple_sat, repo: "hexpm", optional: true]}, {:spark, ">= 2.1.18 and < 3.0.0-0", [hex: :spark, repo: "hexpm", optional: false]}, {:splode, "~> 0.2", [hex: :splode, repo: "hexpm", optional: false]}, {:stream_data, "~> 1.0", [hex: :stream_data, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "92436ab7c465d8a8706383cb9cfd9fbf074d4bd8632b86895a6e6bf3b9eee2cd"},
"ash_sql": {:hex, :ash_sql, "0.1.3", "c9acc4809b7f253aad31764024aee0cd632077a32cff6bea3b105c7b8d9015b7", [:mix], [{:ash, "~> 3.0", [hex: :ash, repo: "hexpm", optional: false]}, {:ecto, "~> 3.9", [hex: :ecto, repo: "hexpm", optional: false]}, {:ecto_sql, "~> 3.9", [hex: :ecto_sql, repo: "hexpm", optional: false]}], "hexpm", "d2d3d1044f0fa48454d0cdaeb22d55a2de3210d48a2208fd2eecf6f3007a5216"}, "ash_sql": {:hex, :ash_sql, "0.2.0", "9a80af47d31e0e0f0c8596fadb4daeb3ea322d00de710b12006137f9c7bee859", [:mix], [{:ash, "~> 3.0", [hex: :ash, repo: "hexpm", optional: false]}, {:ecto, "~> 3.9", [hex: :ecto, repo: "hexpm", optional: false]}, {:ecto_sql, "~> 3.9", [hex: :ecto_sql, repo: "hexpm", optional: false]}], "hexpm", "bc8997b6fdf52a0144c17969aef88bd2dc22958c8d1b1c18fbcfb4bec3b849f1"},
"benchee": {:hex, :benchee, "1.3.0", "f64e3b64ad3563fa9838146ddefb2d2f94cf5b473bdfd63f5ca4d0657bf96694", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}, {:statistex, "~> 1.0", [hex: :statistex, repo: "hexpm", optional: false]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "34f4294068c11b2bd2ebf2c59aac9c7da26ffa0068afdf3419f1b176e16c5f81"}, "benchee": {:hex, :benchee, "1.3.0", "f64e3b64ad3563fa9838146ddefb2d2f94cf5b473bdfd63f5ca4d0657bf96694", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}, {:statistex, "~> 1.0", [hex: :statistex, repo: "hexpm", optional: false]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "34f4294068c11b2bd2ebf2c59aac9c7da26ffa0068afdf3419f1b176e16c5f81"},
"bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"},
"comparable": {:hex, :comparable, "1.0.0", "bb669e91cedd14ae9937053e5bcbc3c52bb2f22422611f43b6e38367d94a495f", [:mix], [{:typable, "~> 0.1", [hex: :typable, repo: "hexpm", optional: false]}], "hexpm", "277c11eeb1cd726e7cd41c6c199e7e52fa16ee6830b45ad4cdc62e51f62eb60c"}, "comparable": {:hex, :comparable, "1.0.0", "bb669e91cedd14ae9937053e5bcbc3c52bb2f22422611f43b6e38367d94a495f", [:mix], [{:typable, "~> 0.1", [hex: :typable, repo: "hexpm", optional: false]}], "hexpm", "277c11eeb1cd726e7cd41c6c199e7e52fa16ee6830b45ad4cdc62e51f62eb60c"},

View file

@ -78,6 +78,23 @@ defmodule AshPostgres.AtomicsTest do
|> Ash.update!() |> Ash.update!()
end end
test "an atomic update can be set to the value of an aggregate" do
author =
Author
|> Ash.Changeset.for_create(:create, %{first_name: "John", last_name: "Doe"})
|> Ash.create!()
post =
Post
|> Ash.Changeset.for_create(:create, %{title: "bar", author_id: author.id})
|> Ash.create!()
# just asserting that there is no exception here
post
|> Ash.Changeset.for_update(:set_title_to_sum_of_author_count_of_posts)
|> Ash.update!()
end
test "an atomic validation is based on where it appears in the action" do test "an atomic validation is based on where it appears in the action" do
post = post =
Post Post

View file

@ -241,7 +241,7 @@ defmodule AshPostgres.MigrationGeneratorTest do
assert file_contents =~ ~S{create index(:posts, ["id"]} assert file_contents =~ ~S{create index(:posts, ["id"]}
assert file_contents =~ assert file_contents =~
~S{create unique_index(:posts, ["second_title"], name: "posts_second_title_index", prefix: "example", nulls_distinct: false, where: "(second_title like '%foo%')")} ~S{create unique_index(:posts, ["second_title"], name: "posts_second_title_index", prefix: "example", nulls_distinct: false, where: "((second_title like '%foo%'))")}
# the migration adds the id, with its default # the migration adds the id, with its default
assert file_contents =~ assert file_contents =~

View file

@ -96,6 +96,10 @@ defmodule AshPostgres.Test.Post do
change(filter(expr(title == "fred"))) change(filter(expr(title == "fred")))
end end
update :set_title_to_sum_of_author_count_of_posts do
change(atomic_update(:title, expr("#{sum_of_author_count_of_posts}")))
end
destroy :destroy_with_confirm do destroy :destroy_with_confirm do
require_atomic?(false) require_atomic?(false)
argument(:confirm, :string, allow_nil?: false) argument(:confirm, :string, allow_nil?: false)