ash_postgres/lib/aggregate.ex
2023-02-17 15:21:50 -05:00

956 lines
25 KiB
Elixir

defmodule AshPostgres.Aggregate do
@moduledoc false
import Ecto.Query, only: [from: 2, subquery: 1]
require Ecto.Query
# Adds `aggregates` (computed relative to `resource`) to `query`.
#
# Aggregates are grouped by relationship path. Aggregates that `can_group?/2` allows are
# computed together in one lateral-joined subquery; every other aggregate gets its own.
# A lone `:first` aggregate whose path only follows `belongs_to` relationships is read
# straight from a left join instead of a subquery.
#
# When `select?` is true, the aggregate values are merged into the query's select:
#   * subquery aggregates go under their `load` key, or into a nested `:aggregates` map
#     (keyed by name) when `load` is nil;
#   * join-based `:first` aggregates go under the aggregate's name.
#
# `source_binding` is the binding the aggregate subqueries correlate with. It defaults to
# the highest-numbered non-aggregate binding on the query.
#
# Returns `{:ok, query}`, or the first `{:error, error}` encountered.
def add_aggregates(query, aggregates, resource, select? \\ true, source_binding \\ nil)

def add_aggregates(query, [], _, _, _), do: {:ok, query}

def add_aggregates(query, aggregates, resource, select?, source_binding) do
  query = AshPostgres.DataLayer.default_bindings(query, resource)

  # Aggregates whose definitions are already registered on the query are skipped.
  aggregates =
    Enum.reject(aggregates, fn aggregate ->
      Map.has_key?(query.__ash_bindings__.aggregate_defs, aggregate.name)
    end)

  query =
    Map.update!(query, :__ash_bindings__, fn bindings ->
      Map.update!(bindings, :aggregate_defs, fn aggregate_defs ->
        Map.merge(aggregate_defs, Map.new(aggregates, &{&1.name, &1}))
      end)
    end)

  source_binding = source_binding || default_source_binding(query)

  result =
    aggregates
    |> Enum.reject(&already_added?(&1, query.__ash_bindings__))
    |> group_aggregates(resource)
    |> Enum.reduce_while({:ok, query, []}, fn
      {[first_relationship | relationship_path], aggregates}, {:ok, query, dynamics} ->
        first_relationship = Ash.Resource.Info.relationship(resource, first_relationship)
        is_single? = Enum.count_until(aggregates, 2) == 1

        if first_join_aggregate?(resource, aggregates) do
          [aggregate] = aggregates

          case add_first_join_aggregate(query, resource, aggregate) do
            {:ok, query, dynamic} ->
              query =
                if select? do
                  select_or_merge(query, aggregate.name, dynamic)
                else
                  query
                end

              {:cont, {:ok, query, dynamics}}

            {:error, error} ->
              {:halt, {:error, error}}
          end
        else
          with {:ok, agg_root_query} <-
                 AshPostgres.Join.maybe_get_resource_query(
                   first_relationship.destination,
                   first_relationship,
                   query
                 ),
               agg_root_query =
                 Map.update!(agg_root_query, :__ash_bindings__, &Map.put(&1, :in_group?, true)),
               {:ok, joined} <-
                 join_all_relationships(
                   agg_root_query,
                   aggregates,
                   relationship_path,
                   first_relationship,
                   is_single?
                 ),
               {:ok, filtered} <-
                 maybe_filter_subquery(
                   joined,
                   first_relationship,
                   relationship_path,
                   aggregates,
                   is_single?,
                   source_binding
                 ) do
            related = Ash.Resource.Info.related(first_relationship.destination, relationship_path)

            with_subquery_select =
              select_all_aggregates(
                aggregates,
                filtered,
                relationship_path,
                query,
                is_single?,
                related
              )

            query =
              join_subquery(
                query,
                with_subquery_select,
                first_relationship,
                relationship_path,
                aggregates,
                source_binding
              )

            if select? do
              # The aggregate subquery was just joined as the most recent binding.
              binding = query.__ash_bindings__.current - 1

              new_dynamics =
                Enum.map(
                  aggregates,
                  &{&1.load, &1.name, select_dynamic(resource, query, &1, binding)}
                )

              {:cont, {:ok, query, new_dynamics ++ dynamics}}
            else
              {:cont, {:ok, query, dynamics}}
            end
          else
            # Enum.reduce_while/3 only accepts {:cont, acc} | {:halt, acc}; a bare error
            # tuple falling out of `with` would make it raise instead of halting.
            {:error, error} -> {:halt, {:error, error}}
          end
        end
    end)

  case result do
    {:ok, query, dynamics} ->
      {:ok, add_aggregate_selects(query, dynamics)}

    {:error, error} ->
      {:error, error}
  end
end

# Default correlation binding for aggregate subqueries: the highest-numbered binding on the
# query that is not itself an aggregate join (i.e. has no `:aggregates` key).
defp default_source_binding(query) do
  query.__ash_bindings__.bindings
  |> Enum.reject(&match?({_, %{aggregates: _}}, &1))
  |> Enum.map(fn {binding, _} -> binding end)
  |> Enum.max()
end

# Groups aggregates by relationship path into `{path, aggregates}` pairs. On each path, the
# aggregates `can_group?/2` accepts share one group; every other aggregate gets a group of its
# own. Empty groups are dropped.
defp group_aggregates(aggregates, resource) do
  aggregates
  |> Enum.group_by(& &1.relationship_path)
  |> Enum.flat_map(fn {path, aggregates} ->
    {can_group, cant_group} = Enum.split_with(aggregates, &can_group?(resource, &1))
    [{path, can_group} | Enum.map(cant_group, &{path, [&1]})]
  end)
  |> Enum.reject(&match?({_, []}, &1))
end

# Only a lone `:first` aggregate whose path consists solely of `belongs_to` relationships may
# be read directly from a left join. Other kinds (count, sum, list, ...) must go through a
# subquery so the aggregate function, default value and type cast are applied.
defp first_join_aggregate?(resource, [%{kind: :first} = aggregate]) do
  single_path?(resource, aggregate.relationship_path)
end

defp first_join_aggregate?(_resource, _aggregates), do: false
# Left-joins every relationship in `aggregate.relationship_path` onto `query`.
#
# Returns `{:ok, query, dynamic}`, where `dynamic` reads `aggregate.field` from the last
# joined binding. Like the subquery-based aggregates, that value is coalesced with the
# aggregate's default value (when it is not nil) and cast to the aggregate's Ecto type (when
# there is one). Join errors are returned as `{:error, error}`.
#
# NOTE(review): this function never reads `aggregate.query`, so an aggregate filter is not
# applied on this path — confirm that is acceptable for callers.
defp add_first_join_aggregate(query, resource, aggregate) do
  # When the query carries exactly one parent path, the join is made relative to it.
  parent_path =
    case query.__ash_bindings__ do
      %{parent_paths: [{path, parent_resource}]} ->
        AshPostgres.Join.relationship_path_to_relationships(parent_resource, path)

      _ ->
        []
    end

  relationships =
    AshPostgres.Join.relationship_path_to_relationships(resource, aggregate.relationship_path)

  case AshPostgres.Join.join_all_relationships(
         query,
         nil,
         [],
         [{:left, relationships}],
         parent_path,
         nil
       ) do
    {:ok, query} ->
      binding =
        AshPostgres.DataLayer.get_binding(
          resource,
          aggregate.relationship_path,
          query,
          [:left, :inner]
        )

      type = AshPostgres.Types.parameterized_type(aggregate.type, aggregate.constraints)
      value = Ecto.Query.dynamic(field(as(^binding), ^aggregate.field))

      # A missing related row leaves the joined column NULL; fall back to the default.
      with_default =
        cond do
          is_nil(aggregate.default_value) ->
            value

          type ->
            Ecto.Query.dynamic(coalesce(^value, type(^aggregate.default_value, ^type)))

          true ->
            Ecto.Query.dynamic(coalesce(^value, ^aggregate.default_value))
        end

      casted =
        if type do
          Ecto.Query.dynamic(type(^with_default, ^type))
        else
          with_default
        end

      {:ok, query, casted}

    {:error, error} ->
      {:error, error}
  end
end
# Returns true when `aggregate` is listed in the `aggregates` of some binding in
# `bindings.bindings` whose `type` is `:aggregate`.
defp already_added?(aggregate, bindings) do
  bindings.bindings
  |> Enum.filter(&match?({_, %{type: :aggregate, aggregates: _}}, &1))
  |> Enum.any?(fn {_, %{aggregates: added}} -> aggregate in added end)
end
# Applies the aggregate's own filter to the aggregate subquery when it computes exactly one
# aggregate (`is_single?` true), via `apply_agg_query/5`. Grouped subqueries
# (`is_single?` false) are returned untouched as `{:ok, agg_query}`; their filters are
# applied per aggregate expression by `filter_field/5` instead.
defp maybe_filter_subquery(
       agg_query,
       first_relationship,
       relationship_path,
       [aggregate],
       true,
       source_binding
     ) do
  apply_agg_query(agg_query, aggregate, relationship_path, first_relationship, source_binding)
end

defp maybe_filter_subquery(
       agg_query,
       _first_relationship,
       _relationship_path,
       [_ | _],
       false,
       _source_binding
     ) do
  {:ok, agg_query}
end
# Filters the single-aggregate subquery `agg_query` by the aggregate's own filter.
#
# The filter is moved onto `relationship_path` and scoped to the first relationship's
# destination. Any aggregates it depends on — directly, through calculations it uses, or
# through a calculation named by `aggregate.field` — are first added to `agg_query` without
# being selected. Returns `{:ok, agg_query}` unchanged when the aggregate has no filter, and
# passes through any non-`{:ok, _}` result from `add_aggregates/5`.
defp apply_agg_query(
       agg_query,
       aggregate,
       relationship_path,
       first_relationship,
       source_binding
     ) do
  if has_filter?(aggregate.query) do
    destination = first_relationship.destination

    filter =
      aggregate.query.filter
      |> Ash.Filter.move_to_relationship_path(relationship_path)
      |> Map.put(:resource, destination)

    filter_calculations = Ash.Filter.used_calculations(filter, destination, relationship_path)
    related = Ash.Resource.Info.related(destination, relationship_path)

    # If the aggregated field is itself a calculation, its dependencies count too.
    calculations =
      case Ash.Resource.Info.calculation(related, aggregate.field) do
        nil ->
          filter_calculations

        %{name: name, calculation: {module, opts}, type: type, constraints: constraints} ->
          {:ok, field_calculation} =
            Ash.Query.Calculation.new(name, module, opts, {type, constraints})

          if field_calculation in filter_calculations do
            filter_calculations
          else
            [field_calculation | filter_calculations]
          end
      end

    needed_aggregates = used_aggregates(filter, destination, calculations, relationship_path)

    with {:ok, agg_query} <-
           add_aggregates(agg_query, needed_aggregates, destination, false, source_binding) do
      AshPostgres.DataLayer.filter(agg_query, filter, agg_query.__ash_bindings__.resource)
    end
  else
    {:ok, agg_query}
  end
end
# Correlates the aggregate subquery with the outer `query` and lateral-joins it.
#
# Every clause adds the linking attribute to the subquery's select and groups by it. The
# shared tail (join prefix, lateral join, binding bookkeeping) is done by
# `lateral_join_aggregates/4`.
defp join_subquery(
       query,
       subquery,
       %{manual: {module, opts}} = first_relationship,
       _relationship_path,
       aggregates,
       source_binding
     ) do
  # Manual relationships supply the correlation themselves via `ash_postgres_subquery/4`.
  destination_attribute = first_relationship.destination_attribute

  grouped =
    from(row in subquery,
      select_merge: map(row, ^[destination_attribute]),
      group_by: field(row, ^destination_attribute),
      distinct: true
    )

  {:ok, manual_subquery} =
    module.ash_postgres_subquery(
      opts,
      source_binding,
      subquery.__ash_bindings__.current - 1,
      grouped
    )

  lateral_join_aggregates(query, manual_subquery, first_relationship, aggregates)
end

defp join_subquery(
       query,
       subquery,
       %{type: :many_to_many, join_relationship: join_relationship, source: source} =
         first_relationship,
       _relationship_path,
       aggregates,
       source_binding
     ) do
  # many_to_many: join the join resource and correlate the source through it.
  join_relationship_struct = Ash.Resource.Info.relationship(source, join_relationship)

  {:ok, through} =
    AshPostgres.Join.maybe_get_resource_query(
      join_relationship_struct.destination,
      join_relationship_struct,
      query,
      [],
      nil,
      subquery.__ash_bindings__.current
    )

  source_on_join = first_relationship.source_attribute_on_join_resource

  subquery =
    from(sub in subquery,
      join: through in ^through,
      as: ^subquery.__ash_bindings__.current,
      on:
        field(through, ^first_relationship.destination_attribute_on_join_resource) ==
          field(sub, ^first_relationship.destination_attribute),
      select_merge: map(through, ^[source_on_join]),
      group_by: field(through, ^source_on_join),
      distinct: field(through, ^source_on_join),
      where:
        field(parent_as(^source_binding), ^first_relationship.source_attribute) ==
          field(through, ^source_on_join)
    )

  lateral_join_aggregates(query, subquery, first_relationship, aggregates)
end

defp join_subquery(
       query,
       subquery,
       first_relationship,
       _relationship_path,
       aggregates,
       source_binding
     ) do
  # Direct relationships: correlate source_attribute with the root destination_attribute.
  destination_attribute = first_relationship.destination_attribute

  subquery =
    from(row in subquery,
      group_by: field(row, ^destination_attribute),
      select_merge: map(row, ^[destination_attribute]),
      where:
        field(parent_as(^source_binding), ^first_relationship.source_attribute) ==
          field(as(^0), ^destination_attribute)
    )

  lateral_join_aggregates(query, subquery, first_relationship, aggregates)
end

# Sets the join prefix on `subquery`, left-lateral-joins it onto `query` under the next
# binding index, and records that binding as an `:aggregate` binding holding `aggregates`.
defp lateral_join_aggregates(query, subquery, first_relationship, aggregates) do
  prefixed = AshPostgres.Join.set_join_prefix(subquery, query, first_relationship.destination)

  joined =
    from(row in query,
      left_lateral_join: agg in subquery(subquery_if_distinct(prefixed)),
      as: ^query.__ash_bindings__.current
    )

  AshPostgres.DataLayer.add_binding(joined, %{
    path: [],
    type: :aggregate,
    aggregates: aggregates
  })
end
# Returns `query` unchanged when its `distinct` is nil. Otherwise wraps it as
# `from(row in subquery(query), select: row)`.
defp subquery_if_distinct(query) do
  case query do
    %{distinct: nil} ->
      query

    distinct_query ->
      from(row in subquery(distinct_query), select: row)
  end
end
# Adds one select entry per aggregate to the subquery `joined`, threading it through
# `add_subquery_aggregate_select/5`. The `_query` argument is unused.
defp select_all_aggregates(aggregates, joined, relationship_path, _query, is_single?, resource) do
  for aggregate <- aggregates, reduce: joined do
    acc_query ->
      add_subquery_aggregate_select(acc_query, relationship_path, aggregate, resource, is_single?)
  end
end
# Inner-joins the rest of the relationship path (after the first relationship) onto the
# aggregate root query. An empty remaining path needs no joins. `_aggregates` and
# `_is_single?` are unused.
defp join_all_relationships(agg_root_query, _aggregates, [], _first_relationship, _is_single?) do
  {:ok, agg_root_query}
end

defp join_all_relationships(
       agg_root_query,
       _aggregates,
       relationship_path,
       first_relationship,
       _is_single?
     ) do
  relationships =
    AshPostgres.Join.relationship_path_to_relationships(
      first_relationship.destination,
      relationship_path
    )

  AshPostgres.Join.join_all_relationships(
    agg_root_query,
    nil,
    [],
    [{:inner, relationships}],
    [],
    nil
  )
end
# Whether `aggregate` may share a subquery with other aggregates on the same relationship
# path. `:list` aggregates never may. Other aggregates may only when `can_group_kind?/2`
# allows them and their filter neither uses `exists` nor references related fields.
defp can_group?(_resource, %{kind: :list}), do: false

defp can_group?(resource, aggregate) do
  can_group_kind?(aggregate, resource) and not has_exists?(aggregate) and
    not references_relationships?(aggregate)
end
# Grouping is deliberately conservative: an aggregate whose filter references any related
# field is never grouped with others. This could be optimized, for example by:
#   1. grouping the aggregates that reference relationships by the relationships they join, or
#   2. grouping all such aggregates together and joining every relationship they need.
# Keeping them separate is predictable and simple, so that is the current approach.
#
# Returns true when the aggregate's filter contains a `Ref` with a non-empty
# relationship path.
defp references_relationships?(aggregate) do
  filter = aggregate.query && aggregate.query.filter

  found =
    Ash.Filter.find(filter, fn
      %Ash.Query.Ref{relationship_path: path} when path != [] -> true
      _ -> false
    end)

  found not in [nil, false]
end
# Non-`:first` aggregates are always groupable by kind. A `:first` aggregate is groupable
# only when neither `array_type?/2` nor `single_path?/2` is true for it. Note that
# `array_type?/2` returns true for NON-array fields.
defp can_group_kind?(%{kind: :first} = aggregate, resource) do
  not (array_type?(resource, aggregate) or
         single_path?(resource, aggregate.relationship_path))
end

defp can_group_kind?(_aggregate, _resource), do: true
# Looks up the aggregated field on the resource at the end of the aggregate's relationship
# path and inspects its type.
#
# NOTE(review): despite the name, this returns `false` when the field's type is
# `{:array, _}` and `true` otherwise. `can_group_kind?/2` depends on that exact result, so
# confirm the intent before renaming or flipping it.
defp array_type?(resource, aggregate) do
  related = Ash.Resource.Info.related(resource, aggregate.relationship_path)
  field_type = Ash.Resource.Info.field(related, aggregate.field).type

  not match?({:array, _}, field_type)
end
# Returns true when the aggregate's filter contains an `Ash.Query.Exists` expression.
defp has_exists?(aggregate) do
  filter = aggregate.query && aggregate.query.filter
  found = Ash.Filter.find(filter, &match?(%Ash.Query.Exists{}, &1))

  found not in [nil, false]
end
# Merges collected `{load, name, dynamic}` entries into the query's select.
# Entries with a non-nil `load` are merged at the top level under `load`. Entries whose
# `load` is nil go into a nested `:aggregates` map keyed by `name`; that key is omitted when
# there are no such entries.
defp add_aggregate_selects(query, dynamics) do
  {nested, top_level} =
    Enum.split_with(dynamics, fn {load, _name, _dynamic} -> is_nil(load) end)

  selects = for {load, _name, dynamic} <- top_level, into: %{}, do: {load, dynamic}

  selects =
    case nested do
      [] ->
        selects

      _ ->
        by_name = for {_load, name, dynamic} <- nested, into: %{}, do: {name, dynamic}
        Map.put(selects, :aggregates, by_name)
    end

  Ecto.Query.select_merge(query, ^selects)
end
# Builds the outer-select expression for `aggregate`. It reads the column named
# `aggregate.name` from the aggregate subquery joined as `binding`.
# With an Ecto type, the column is cast to it. A non-nil default is applied with `coalesce`,
# and the result is cast once more. `_resource` and `_query` are unused.
defp select_dynamic(_resource, _query, aggregate, binding) do
  type = AshPostgres.Types.parameterized_type(aggregate.type, aggregate.constraints)
  default = aggregate.default_value

  column =
    if type do
      Ecto.Query.dynamic(type(field(as(^binding), ^aggregate.name), ^type))
    else
      Ecto.Query.dynamic(field(as(^binding), ^aggregate.name))
    end

  with_default =
    cond do
      is_nil(default) -> column
      type -> Ecto.Query.dynamic(coalesce(^column, type(^default, ^type)))
      true -> Ecto.Query.dynamic(coalesce(^column, ^default))
    end

  if type, do: Ecto.Query.dynamic(type(^with_default, ^type)), else: with_default
end
# True only when the aggregate query holds an `%Ash.Filter{}` with a non-nil expression.
defp has_filter?(%{filter: %Ash.Filter{expression: expression}}), do: not is_nil(expression)
defp has_filter?(_query), do: false
# True only when the aggregate query has a sort that is neither nil nor an empty list.
defp has_sort?(%{sort: sort}) when sort not in [nil, []], do: true
defp has_sort?(_query), do: false
# Returns the aggregates referenced by `filter` (scoped to `path`), followed by those
# referenced in the expressions of `used_calculations`.
# Each calculation's expression is hydrated against `resource` (non-public, with no extra
# aggregates or calculations). A calculation whose hydration does not return `{:ok, _}`
# contributes nothing.
def used_aggregates(filter, resource, used_calculations, path) do
  direct = Ash.Filter.used_aggregates(filter, path)

  from_calculations =
    for calculation <- used_calculations,
        {:ok, hydrated} <- [
          Ash.Filter.hydrate_refs(
            calculation.module.expression(calculation.opts, calculation.context),
            %{
              resource: resource,
              aggregates: %{},
              calculations: %{},
              public?: false
            }
          )
        ],
        aggregate <- Ash.Filter.used_aggregates(hydrated) do
      aggregate
    end

  direct ++ from_calculations
end
# Adds a select entry named `aggregate.name` to the aggregate subquery `query`.
#
# `resource` is the resource whose field is aggregated; `add_aggregates/5` passes the
# resource at the end of `relationship_path`. When `is_single?` is false, the aggregate's
# filter is applied to the aggregate expression itself via `filter_field/5`; when it is true,
# `filter_field/5` leaves the expression unfiltered.
#
# Every kind is coalesced with the aggregate's default value whenever that default is not nil
# (so `false` defaults are honoured). The result is then cast to the aggregate's Ecto type,
# if there is one.
def add_subquery_aggregate_select(
      query,
      relationship_path,
      %{kind: :first} = aggregate,
      resource,
      is_single?
    ) do
  query = AshPostgres.DataLayer.default_bindings(query, aggregate.resource)

  ref = %Ash.Query.Ref{
    attribute: Ash.Resource.Info.field(resource, aggregate.field),
    relationship_path: relationship_path,
    resource: query.__ash_bindings__.resource
  }

  type = AshPostgres.Types.parameterized_type(aggregate.type, aggregate.constraints)
  binding = aggregate_select_binding(query, relationship_path)
  field = AshPostgres.Expr.dynamic_expr(query, ref, query.__ash_bindings__, false)

  filtered =
    query
    |> array_agg_dynamic(aggregate, field, relationship_path, binding, false)
    |> filter_field(query, aggregate, relationship_path, is_single?)

  # `:first` collects the (optionally sorted) values into an array and takes element 1.
  value = Ecto.Query.dynamic(fragment("(?)[1]", ^filtered))

  select_or_merge(query, aggregate.name, with_default_and_cast(value, aggregate, type))
end

def add_subquery_aggregate_select(
      query,
      relationship_path,
      %{kind: :list} = aggregate,
      resource,
      is_single?
    ) do
  query = AshPostgres.DataLayer.default_bindings(query, aggregate.resource)
  type = AshPostgres.Types.parameterized_type(aggregate.type, aggregate.constraints)
  binding = aggregate_select_binding(query, relationship_path)

  ref = %Ash.Query.Ref{
    attribute: Ash.Resource.Info.field(resource, aggregate.field),
    relationship_path: relationship_path,
    resource: query.__ash_bindings__.resource
  }

  field = AshPostgres.Expr.dynamic_expr(query, ref, query.__ash_bindings__, false)

  filtered =
    query
    |> array_agg_dynamic(
      aggregate,
      field,
      relationship_path,
      binding,
      Map.get(aggregate, :uniq?)
    )
    |> filter_field(query, aggregate, relationship_path, is_single?)

  select_or_merge(query, aggregate.name, with_default_and_cast(filtered, aggregate, type))
end

def add_subquery_aggregate_select(
      query,
      relationship_path,
      %{kind: kind} = aggregate,
      resource,
      is_single?
    )
    when kind in [:count, :sum, :avg, :max, :min, :custom] do
  query = AshPostgres.DataLayer.default_bindings(query, aggregate.resource)

  # Without an explicit field (e.g. a plain count), the first primary key field is used.
  ref = %Ash.Query.Ref{
    attribute:
      Ash.Resource.Info.field(
        resource,
        aggregate.field || List.first(Ash.Resource.Info.primary_key(resource))
      ),
    relationship_path: relationship_path,
    resource: resource
  }

  field =
    if kind == :custom do
      # Custom aggregates build their own expression, so no field reference is needed.
      nil
    else
      AshPostgres.Expr.dynamic_expr(query, ref, query.__ash_bindings__, false)
    end

  type = AshPostgres.Types.parameterized_type(aggregate.type, aggregate.constraints)
  binding = aggregate_select_binding(query, relationship_path)

  aggregated =
    case kind do
      :count ->
        if Map.get(aggregate, :uniq?) do
          Ecto.Query.dynamic([row], count(^field, :distinct))
        else
          Ecto.Query.dynamic([row], count(^field))
        end

      :sum ->
        Ecto.Query.dynamic([row], sum(^field))

      :avg ->
        Ecto.Query.dynamic([row], avg(^field))

      :max ->
        Ecto.Query.dynamic([row], max(^field))

      :min ->
        Ecto.Query.dynamic([row], min(^field))

      :custom ->
        {module, opts} = aggregate.implementation
        module.dynamic(opts, binding)
    end

  filtered = filter_field(aggregated, query, aggregate, relationship_path, is_single?)

  select_or_merge(query, aggregate.name, with_default_and_cast(filtered, aggregate, type))
end

# Binding of the aggregated relationship inside the aggregate subquery.
defp aggregate_select_binding(query, relationship_path) do
  AshPostgres.DataLayer.get_binding(
    query.__ash_bindings__.resource,
    relationship_path,
    query,
    [:left, :inner, :root]
  )
end

# Builds `array_agg(field)`. It is ordered by the aggregate query's sort when there is one,
# and de-duplicated with DISTINCT when `distinct?` is truthy.
defp array_agg_dynamic(query, aggregate, field, relationship_path, binding, distinct?) do
  if has_sort?(aggregate.query) do
    {:ok, sort_expr} =
      AshPostgres.Sort.sort(
        query,
        aggregate.query.sort,
        Ash.Resource.Info.related(query.__ash_bindings__.resource, relationship_path),
        relationship_path,
        binding,
        true
      )

    # One placeholder per sort expression. ORDER BY items must be comma separated, otherwise
    # multiple sort keys produce invalid SQL such as `ORDER BY a ASC b DESC`.
    placeholders = Enum.map_join(sort_expr, ", ", fn _ -> "?" end)
    distinct = if distinct?, do: "DISTINCT ", else: ""

    {:ok, expr} =
      AshPostgres.Functions.Fragment.casted_new(
        ["array_agg(#{distinct}? ORDER BY #{placeholders})", field] ++ sort_expr
      )

    AshPostgres.Expr.dynamic_expr(query, expr, query.__ash_bindings__, false)
  else
    if distinct? do
      Ecto.Query.dynamic([row], fragment("array_agg(DISTINCT ?)", ^field))
    else
      Ecto.Query.dynamic([row], fragment("array_agg(?)", ^field))
    end
  end
end

# Coalesces `value` with the aggregate's default when that default is not nil (including
# `false`), then casts the result to `type` when a type is present.
defp with_default_and_cast(value, aggregate, type) do
  with_default =
    cond do
      is_nil(aggregate.default_value) ->
        value

      type ->
        Ecto.Query.dynamic(coalesce(^value, type(^aggregate.default_value, ^type)))

      true ->
        Ecto.Query.dynamic(coalesce(^value, ^aggregate.default_value))
    end

  if type do
    Ecto.Query.dynamic(type(^with_default, ^type))
  else
    with_default
  end
end
# Restricts an aggregate expression to the rows matching the aggregate's own filter, using
# Ecto's `filter/2` (`expr FILTER (WHERE condition)`). The filter is moved onto
# `relationship_path` before translation. `field` is returned unchanged when `is_single?` is
# `true` or when the aggregate has no filter.
defp filter_field(field, _query, _aggregate, _relationship_path, true), do: field

defp filter_field(field, query, aggregate, relationship_path, _is_single?) do
  case has_filter?(aggregate.query) do
    false ->
      field

    true ->
      type = AshPostgres.Types.parameterized_type(aggregate.type, aggregate.constraints)
      moved = Ash.Filter.move_to_relationship_path(aggregate.query.filter, relationship_path)

      condition =
        AshPostgres.Expr.dynamic_expr(query, moved, query.__ash_bindings__, false, type)

      Ecto.Query.dynamic(filter(^field, ^condition))
  end
end
# Merges `%{aggregate_name => casted}` into the query's select. A query without a select is
# first given an empty-map select via `Ecto.Query.select(query, %{})`.
defp select_or_merge(query, aggregate_name, casted) do
  with_select =
    case query.select do
      missing when missing in [nil, false] -> Ecto.Query.select(query, %{})
      _ -> query
    end

  Ecto.Query.select_merge(with_select, ^%{aggregate_name => casted})
end
# True when `path` is empty, or when every relationship along it (starting at `resource`)
# has type `:belongs_to`.
@doc false
def single_path?(_resource, []), do: true

def single_path?(resource, [relationship_name | rest]) do
  relationship = Ash.Resource.Info.relationship(resource, relationship_name)

  if relationship.type == :belongs_to do
    single_path?(relationship.destination, rest)
  else
    false
  end
end
end