ash_postgres/lib/aggregate.ex

704 lines
20 KiB
Elixir
Raw Normal View History

2021-12-21 16:19:24 +13:00
defmodule AshPostgres.Aggregate do
@moduledoc false
import Ecto.Query, only: [from: 2, subquery: 1]
require Ecto.Query
def add_aggregates(query, aggregates, resource, select? \\ true)
def add_aggregates(query, [], _, _), do: {:ok, query}
2022-01-14 08:11:30 +13:00
def add_aggregates(query, aggregates, resource, select?) do
2022-01-14 08:11:30 +13:00
resource = resource
2021-12-21 16:19:24 +13:00
query = AshPostgres.DataLayer.default_bindings(query, resource)
2022-01-14 08:11:30 +13:00
result =
Enum.reduce_while(aggregates, {:ok, query, []}, fn aggregate, {:ok, query, dynamics} ->
if aggregate.query && !aggregate.query.valid? do
{:halt, {:error, aggregate.query.errors}}
else
has_exists? =
Ash.Filter.find(aggregate.query && aggregate.query.filter, fn
%Ash.Query.Exists{} -> true
_ -> false
end)
name_match =
if has_exists? do
aggregate.name
end
query_and_binding =
case AshPostgres.DataLayer.get_binding(
resource,
aggregate.relationship_path,
query,
:aggregate,
name_match
) do
nil ->
relationship =
Ash.Resource.Info.relationship(resource, aggregate.relationship_path)
if relationship.type == :many_to_many do
subquery = aggregate_subquery(relationship, aggregate, query, has_exists?)
case AshPostgres.Join.join_all_relationships(
query,
nil,
[
{{:aggregate, aggregate.name, subquery, has_exists?},
AshPostgres.Join.relationship_path_to_relationships(
resource,
aggregate.relationship_path
)}
2022-06-30 07:08:49 +12:00
]
) do
{:ok, new_query} ->
{:ok,
{new_query,
AshPostgres.DataLayer.get_binding(
resource,
aggregate.relationship_path,
new_query,
:aggregate,
name_match
)}}
{:error, error} ->
{:error, error}
end
else
subquery = aggregate_subquery(relationship, aggregate, query, has_exists?)
case AshPostgres.Join.join_all_relationships(
query,
nil,
[
{{:aggregate, aggregate.name, subquery, has_exists?},
AshPostgres.Join.relationship_path_to_relationships(
resource,
aggregate.relationship_path
)}
2022-06-30 07:08:49 +12:00
]
) do
{:ok, new_query} ->
{:ok,
{new_query,
AshPostgres.DataLayer.get_binding(
resource,
aggregate.relationship_path,
new_query,
:aggregate,
name_match
)}}
{:error, error} ->
{:error, error}
end
2022-01-14 08:11:30 +13:00
end
binding ->
{:ok, {query, binding}}
end
2021-12-21 16:19:24 +13:00
case query_and_binding do
{:ok, {query, binding}} ->
query_with_aggregate_binding =
put_in(
query.__ash_bindings__.aggregates,
Map.put(query.__ash_bindings__.aggregates, aggregate.name, binding)
)
2021-12-21 16:19:24 +13:00
query_with_aggregate_defs =
put_in(
2022-01-14 08:11:30 +13:00
query_with_aggregate_binding.__ash_bindings__.aggregate_defs,
Map.put(
query_with_aggregate_binding.__ash_bindings__.aggregate_defs,
aggregate.name,
aggregate
)
2022-01-14 08:11:30 +13:00
)
2021-12-21 16:19:24 +13:00
new_query =
query_with_aggregate_defs
|> add_aggregate_to_subquery(resource, aggregate, binding, has_exists?)
2022-01-14 08:11:30 +13:00
if select? do
dynamic = select_dynamic(resource, query, aggregate, name_match)
{:cont, {:ok, new_query, [{aggregate.load, aggregate.name, dynamic} | dynamics]}}
else
{:cont, {:ok, new_query, dynamics}}
end
2022-01-14 08:11:30 +13:00
{:error, error} ->
{:halt, {:error, error}}
end
2022-01-14 08:11:30 +13:00
end
end)
2021-12-21 16:19:24 +13:00
2022-01-14 08:11:30 +13:00
case result do
{:ok, query, dynamics} ->
{:ok, add_aggregate_selects(query, dynamics)}
2021-12-21 16:19:24 +13:00
{:error, error} ->
{:error, error}
end
end
2022-01-14 08:11:30 +13:00
defp add_aggregate_selects(query, dynamics) do
{in_aggregates, in_body} =
Enum.split_with(dynamics, fn {load, _name, _dynamic} -> is_nil(load) end)
aggs =
in_body
|> Map.new(fn {load, _, dynamic} ->
{load, dynamic}
end)
aggs =
if Enum.empty?(in_aggregates) do
aggs
2022-01-14 08:11:30 +13:00
else
Map.put(
aggs,
:aggregates,
Map.new(in_aggregates, fn {_, name, dynamic} ->
{name, dynamic}
end)
)
2022-01-14 08:11:30 +13:00
end
Ecto.Query.select_merge(query, ^aggs)
2022-01-14 08:11:30 +13:00
end
def agg_subquery_for_lateral_join(
current_binding,
query,
subquery,
%{
manual: {module, opts}
} = relationship
) do
case module.ash_postgres_subquery(
opts,
current_binding,
0,
subquery
) do
{:ok, inner_sub} ->
{:ok,
from(sub in subquery(inner_sub), [])
|> AshPostgres.Join.set_join_prefix(query, relationship.destination)}
other ->
other
end
rescue
e in UndefinedFunctionError ->
if e.function == :ash_postgres_subquery do
reraise """
Cannot join to a manual relationship #{inspect(module)} that does not implement the `AshPostgres.ManualRelationship` behaviour.
""",
__STACKTRACE__
else
reraise e, __STACKTRACE__
end
end
2021-12-21 16:19:24 +13:00
def agg_subquery_for_lateral_join(current_binding, query, subquery, relationship) do
{dest_binding, dest_field} =
case relationship.type do
:many_to_many ->
2022-08-19 06:56:36 +12:00
{1, relationship.source_attribute_on_join_resource}
2021-12-21 16:19:24 +13:00
_ ->
2022-08-19 06:56:36 +12:00
{0, relationship.destination_attribute}
2021-12-21 16:19:24 +13:00
end
inner_sub =
2022-08-19 06:56:36 +12:00
if Map.get(relationship, :no_attributes?) do
subquery
else
from(destination in subquery,
where:
field(as(^dest_binding), ^dest_field) ==
field(parent_as(^current_binding), ^relationship.source_attribute)
)
end
2021-12-21 16:19:24 +13:00
{:ok,
from(sub in subquery(inner_sub), [])
|> AshPostgres.Join.set_join_prefix(query, relationship.destination)}
2021-12-21 16:19:24 +13:00
end
defp select_dynamic(resource, query, aggregate, name_match) do
2021-12-21 16:19:24 +13:00
binding =
AshPostgres.DataLayer.get_binding(
resource,
aggregate.relationship_path,
query,
:aggregate,
name_match
)
2021-12-21 16:19:24 +13:00
2022-01-25 11:59:31 +13:00
type = AshPostgres.Types.parameterized_type(aggregate.type, [])
2021-12-21 16:19:24 +13:00
field =
2022-02-17 16:04:54 +13:00
if type do
Ecto.Query.dynamic(
type(
field(as(^binding), ^aggregate.name),
^type
)
2021-12-21 16:19:24 +13:00
)
2022-02-17 16:04:54 +13:00
else
Ecto.Query.dynamic(field(as(^binding), ^aggregate.name))
end
2021-12-21 16:19:24 +13:00
2022-01-25 11:59:31 +13:00
coalesced =
if is_nil(aggregate.default_value) do
field
else
2022-02-17 16:04:54 +13:00
if type do
Ecto.Query.dynamic(
coalesce(
^field,
type(
^aggregate.default_value,
^type
)
2022-01-25 11:59:31 +13:00
)
2021-12-21 16:19:24 +13:00
)
2022-02-17 16:04:54 +13:00
else
Ecto.Query.dynamic(
coalesce(
^field,
^aggregate.default_value
)
)
end
2022-01-25 11:59:31 +13:00
end
2022-02-17 16:04:54 +13:00
if type do
Ecto.Query.dynamic(type(^coalesced, ^type))
else
coalesced
end
2021-12-21 16:19:24 +13:00
end
defp add_aggregate_to_subquery(query, resource, aggregate, binding, has_exists?) do
2021-12-21 16:19:24 +13:00
new_joins =
List.update_at(query.joins, binding - 1, fn join ->
aggregate_query =
if aggregate.authorization_filter do
{:ok, filtered} =
2021-12-21 16:19:24 +13:00
AshPostgres.DataLayer.filter(
join.source.from.source.query,
aggregate.authorization_filter,
Ash.Resource.Info.related(resource, aggregate.relationship_path)
)
filtered
2021-12-21 16:19:24 +13:00
else
join.source.from.source.query
end
{:ok, aggregate_query} =
AshPostgres.Aggregate.add_aggregates(
aggregate_query,
Map.values(aggregate.query.aggregates || %{}),
Ash.Resource.Info.related(resource, aggregate.relationship_path),
false
)
2022-02-12 10:06:51 +13:00
{:ok, aggregate_query} =
if aggregate.query && aggregate.query.filter do
AshPostgres.Join.join_all_relationships(
aggregate_query,
2022-06-30 07:08:49 +12:00
aggregate.query.filter
2022-02-12 10:06:51 +13:00
)
else
{:ok, aggregate_query}
end
new_aggregate_query =
add_subquery_aggregate_select(aggregate_query, aggregate, resource, has_exists?)
2021-12-21 16:19:24 +13:00
put_in(join.source.from.source.query, new_aggregate_query)
end)
%{
query
| joins: new_joins
}
end
def used_aggregates(filter, relationship, used_calculations, path) do
2022-02-12 10:06:51 +13:00
Ash.Filter.used_aggregates(filter, path) ++
2021-12-21 16:19:24 +13:00
Enum.flat_map(
used_calculations,
fn calculation ->
case Ash.Filter.hydrate_refs(
calculation.module.expression(calculation.opts, calculation.context),
%{
resource: relationship.destination,
aggregates: %{},
calculations: %{},
public?: false
}
) do
{:ok, hydrated} ->
Ash.Filter.used_aggregates(hydrated)
_ ->
[]
end
end
)
end
def add_subquery_aggregate_select(query, %{kind: :first} = aggregate, _resource, has_exists?) do
2021-12-21 16:19:24 +13:00
query = AshPostgres.DataLayer.default_bindings(query, aggregate.resource)
key = aggregate.field
2022-01-25 11:59:31 +13:00
2021-12-21 16:19:24 +13:00
type = AshPostgres.Types.parameterized_type(aggregate.type, [])
field =
if aggregate.query && aggregate.query.sort && aggregate.query.sort != [] do
2022-01-25 11:59:31 +13:00
sort_expr = AshPostgres.Sort.order_to_ecto(aggregate.query.sort)
question_marks = Enum.map(sort_expr, fn _ -> " ? " end)
field = Ecto.Query.dynamic([{^0, row}], field(row, ^key))
2021-12-21 16:19:24 +13:00
2022-01-25 11:59:31 +13:00
{:ok, expr} =
AshPostgres.Functions.Fragment.casted_new(
["array_agg(? ORDER BY #{question_marks})", field] ++ sort_expr
)
2022-01-14 08:11:30 +13:00
AshPostgres.Expr.dynamic_expr(
query,
expr,
query.__ash_bindings__,
false,
AshPostgres.Types.parameterized_type({:array, aggregate.type}, [])
)
2021-12-21 16:19:24 +13:00
else
2022-01-14 08:11:30 +13:00
Ecto.Query.dynamic(
[row],
fragment("array_agg(?)", field(row, ^key))
)
2021-12-21 16:19:24 +13:00
end
filtered =
if !has_exists? && aggregate.query && aggregate.query.filter &&
2021-12-21 16:19:24 +13:00
not match?(%Ash.Filter{expression: nil}, aggregate.query.filter) do
expr =
AshPostgres.Expr.dynamic_expr(
2022-01-25 11:59:31 +13:00
query,
2021-12-21 16:19:24 +13:00
aggregate.query.filter,
query.__ash_bindings__,
false,
AshPostgres.Types.parameterized_type(aggregate.type, [])
2021-12-21 16:19:24 +13:00
)
Ecto.Query.dynamic(filter(^field, ^expr))
else
field
end
value = Ecto.Query.dynamic(fragment("(?)[1]", ^filtered))
with_default =
if aggregate.default_value do
2022-02-17 16:04:54 +13:00
if type do
Ecto.Query.dynamic(coalesce(^value, type(^aggregate.default_value, ^type)))
else
Ecto.Query.dynamic(coalesce(^value, ^aggregate.default_value))
end
2021-12-21 16:19:24 +13:00
else
value
end
2022-02-17 16:04:54 +13:00
casted =
if type do
Ecto.Query.dynamic(type(^with_default, ^type))
else
with_default
end
2021-12-21 16:19:24 +13:00
2022-01-14 08:11:30 +13:00
select_or_merge(query, aggregate.name, casted)
2021-12-21 16:19:24 +13:00
end
def add_subquery_aggregate_select(query, %{kind: :list} = aggregate, _resource, has_exists?) do
2021-12-21 16:19:24 +13:00
query = AshPostgres.DataLayer.default_bindings(query, aggregate.resource)
key = aggregate.field
type = AshPostgres.Types.parameterized_type(aggregate.type, [])
field =
if aggregate.query && aggregate.query.sort && aggregate.query.sort != [] do
2022-01-25 11:59:31 +13:00
sort_expr = AshPostgres.Sort.order_to_ecto(aggregate.query.sort)
question_marks = Enum.map(sort_expr, fn _ -> " ? " end)
field = Ecto.Query.dynamic([row], field(row, ^key))
2021-12-21 16:19:24 +13:00
2022-01-25 11:59:31 +13:00
{:ok, expr} =
AshPostgres.Functions.Fragment.casted_new(
["array_agg(? ORDER BY #{question_marks})", field] ++ sort_expr
)
2022-01-14 08:11:30 +13:00
AshPostgres.Expr.dynamic_expr(
query,
expr,
query.__ash_bindings__,
false,
AshPostgres.Types.parameterized_type(aggregate.type, [])
)
2021-12-21 16:19:24 +13:00
else
2022-01-14 08:11:30 +13:00
Ecto.Query.dynamic(
[row],
fragment("array_agg(?)", field(row, ^key))
)
2021-12-21 16:19:24 +13:00
end
filtered =
if !has_exists? && aggregate.query && aggregate.query.filter &&
2021-12-21 16:19:24 +13:00
not match?(%Ash.Filter{expression: nil}, aggregate.query.filter) do
expr =
AshPostgres.Expr.dynamic_expr(
2022-01-25 11:59:31 +13:00
query,
2021-12-21 16:19:24 +13:00
aggregate.query.filter,
query.__ash_bindings__,
false,
AshPostgres.Types.parameterized_type(aggregate.type, [])
2021-12-21 16:19:24 +13:00
)
Ecto.Query.dynamic(filter(^field, ^expr))
else
field
end
with_default =
if aggregate.default_value do
2022-02-17 16:04:54 +13:00
if type do
Ecto.Query.dynamic(coalesce(^filtered, type(^aggregate.default_value, ^type)))
else
Ecto.Query.dynamic(coalesce(^filtered, ^aggregate.default_value))
end
2021-12-21 16:19:24 +13:00
else
filtered
end
2022-02-17 16:04:54 +13:00
cast =
if type do
Ecto.Query.dynamic(type(^with_default, ^{:array, type}))
else
with_default
end
2021-12-21 16:19:24 +13:00
2022-01-14 08:11:30 +13:00
select_or_merge(query, aggregate.name, cast)
2021-12-21 16:19:24 +13:00
end
def add_subquery_aggregate_select(query, %{kind: kind} = aggregate, resource, has_exists?)
2021-12-21 16:19:24 +13:00
when kind in [:count, :sum] do
query = AshPostgres.DataLayer.default_bindings(query, aggregate.resource)
key = aggregate.field || List.first(Ash.Resource.Info.primary_key(resource))
type = AshPostgres.Types.parameterized_type(aggregate.type, [])
field =
case kind do
:count ->
Ecto.Query.dynamic([row], count(field(row, ^key)))
:sum ->
Ecto.Query.dynamic([row], sum(field(row, ^key)))
end
filtered =
if !has_exists? && aggregate.query && aggregate.query.filter &&
2021-12-21 16:19:24 +13:00
not match?(%Ash.Filter{expression: nil}, aggregate.query.filter) do
expr =
AshPostgres.Expr.dynamic_expr(
2022-01-25 11:59:31 +13:00
query,
2021-12-21 16:19:24 +13:00
aggregate.query.filter,
query.__ash_bindings__,
false,
AshPostgres.Types.parameterized_type(aggregate.type, [])
2021-12-21 16:19:24 +13:00
)
Ecto.Query.dynamic(filter(^field, ^expr))
else
field
end
with_default =
if aggregate.default_value do
2022-02-17 16:04:54 +13:00
if type do
Ecto.Query.dynamic(coalesce(^filtered, type(^aggregate.default_value, ^type)))
else
Ecto.Query.dynamic(coalesce(^filtered, ^aggregate.default_value))
end
else
filtered
end
2022-02-17 16:04:54 +13:00
cast =
if type do
Ecto.Query.dynamic(type(^with_default, ^type))
else
with_default
end
2021-12-21 16:19:24 +13:00
2022-01-14 08:11:30 +13:00
select_or_merge(query, aggregate.name, cast)
2021-12-21 16:19:24 +13:00
end
2022-01-14 08:11:30 +13:00
defp select_or_merge(query, aggregate_name, casted) do
2022-01-25 11:59:31 +13:00
query =
if query.select do
query
else
Ecto.Query.select(query, %{})
end
Ecto.Query.select_merge(query, ^%{aggregate_name => casted})
2021-12-21 16:19:24 +13:00
end
defp aggregate_subquery(
%{type: :many_to_many} = relationship,
aggregate,
root_query,
has_exists?
) do
2021-12-21 16:19:24 +13:00
destination =
case AshPostgres.Join.maybe_get_resource_query(
relationship.destination,
relationship,
root_query
) do
{:ok, query} ->
query
_ ->
relationship.destination
end
join_relationship =
Ash.Resource.Info.relationship(relationship.source, relationship.join_relationship)
through =
case AshPostgres.Join.maybe_get_resource_query(
relationship.through,
join_relationship,
root_query
) do
{:ok, query} ->
query
_ ->
relationship.through
end
destination =
if has_exists? && aggregate.query && aggregate.query.filter &&
not match?(%Ash.Filter{expression: nil}, aggregate.query.filter) do
expr =
AshPostgres.Expr.dynamic_expr(
destination,
aggregate.query.filter,
destination.__ash_bindings__,
false,
AshPostgres.Types.parameterized_type(aggregate.type, [])
)
Ecto.Query.where(destination, ^expr)
else
destination
end
2021-12-21 16:19:24 +13:00
query =
from(destination in destination,
join: through in ^through,
2022-01-25 11:59:31 +13:00
as: ^1,
2021-12-21 16:19:24 +13:00
on:
2022-08-19 06:56:36 +12:00
field(through, ^relationship.destination_attribute_on_join_resource) ==
field(destination, ^relationship.destination_attribute),
group_by: field(through, ^relationship.source_attribute_on_join_resource)
2021-12-21 16:19:24 +13:00
)
query_tenant = aggregate.query && aggregate.query.tenant
root_tenant = root_query.prefix
if Ash.Resource.Info.multitenancy_strategy(relationship.destination) &&
(root_tenant ||
query_tenant) do
Ecto.Query.put_query_prefix(
query,
2022-08-24 11:56:46 +12:00
query_tenant || root_tenant || AshPostgres.DataLayer.Info.schema(relationship.destination)
)
2021-12-21 16:19:24 +13:00
else
%{
query
| prefix:
2022-08-24 11:56:46 +12:00
AshPostgres.DataLayer.Info.schema(relationship.destination) ||
AshPostgres.DataLayer.Info.repo(relationship.destination).config()[:default_prefix] ||
"public"
2021-12-21 16:19:24 +13:00
}
end
end
defp aggregate_subquery(relationship, aggregate, root_query, has_exists?) do
2021-12-21 16:19:24 +13:00
destination =
case AshPostgres.Join.maybe_get_resource_query(
relationship.destination,
relationship,
root_query
) do
{:ok, query} ->
query
_ ->
relationship.destination
end
destination =
if has_exists? && aggregate.query && aggregate.query.filter &&
not match?(%Ash.Filter{expression: nil}, aggregate.query.filter) do
expr =
AshPostgres.Expr.dynamic_expr(
destination,
aggregate.query.filter,
destination.__ash_bindings__,
false,
AshPostgres.Types.parameterized_type(aggregate.type, [])
)
Ecto.Query.where(destination, ^expr)
else
destination
end
2021-12-21 16:19:24 +13:00
query =
from(row in destination,
2022-08-19 06:56:36 +12:00
group_by: ^relationship.destination_attribute,
select: %{}
2021-12-21 16:19:24 +13:00
)
query_tenant = aggregate.query && aggregate.query.tenant
root_tenant = root_query.prefix
if Ash.Resource.Info.multitenancy_strategy(relationship.destination) &&
(root_tenant ||
query_tenant) do
Ecto.Query.put_query_prefix(
query,
2022-08-24 11:56:46 +12:00
query_tenant || root_tenant || AshPostgres.DataLayer.Info.schema(relationship.destination)
)
2021-12-21 16:19:24 +13:00
else
%{
query
| prefix:
2022-08-24 11:56:46 +12:00
AshPostgres.DataLayer.Info.schema(relationship.destination) ||
AshPostgres.DataLayer.Info.repo(relationship.destination).config()[:default_prefix] ||
"public"
2021-12-21 16:19:24 +13:00
}
end
end
end