ash_postgres/lib/aggregate.ex

1339 lines
37 KiB
Elixir
Raw Normal View History

2021-12-21 16:19:24 +13:00
defmodule AshPostgres.Aggregate do
@moduledoc false
import Ecto.Query, only: [from: 2, subquery: 1]
2021-12-21 16:19:24 +13:00
require Ecto.Query
@next_aggregate_names Enum.reduce(0..999, %{}, fn i, acc ->
Map.put(acc, :"aggregate_#{i}", :"aggregate_#{i + 1}")
end)
def add_aggregates(
query,
aggregates,
resource,
select?,
source_binding,
root_data \\ nil
)
def add_aggregates(query, [], _, _, _, _), do: {:ok, query}
def add_aggregates(query, aggregates, resource, select?, source_binding, root_data) do
case resource_aggregates_to_aggregates(resource, aggregates) do
{:ok, aggregates} ->
query = AshPostgres.DataLayer.default_bindings(query, resource)
{query, aggregates} =
Enum.reduce(
aggregates,
{query, []},
fn aggregate, {query, aggregates} ->
if is_atom(aggregate.name) do
{query, [aggregate | aggregates]}
else
{query, name} = use_aggregate_name(query, aggregate.name)
{query, [%{aggregate | name: name} | aggregates]}
end
end
)
aggregates =
Enum.reject(aggregates, fn aggregate ->
Map.has_key?(query.__ash_bindings__.aggregate_defs, aggregate.name)
end)
query =
query
|> Map.update!(:__ash_bindings__, fn bindings ->
bindings
|> Map.update!(:aggregate_defs, fn aggregate_defs ->
Map.merge(aggregate_defs, Map.new(aggregates, &{&1.name, &1}))
end)
end)
result =
aggregates
|> Enum.reject(&already_added?(&1, query.__ash_bindings__))
|> Enum.group_by(&{&1.relationship_path, &1.join_filters})
|> Enum.flat_map(fn {{path, join_filters}, aggregates} ->
{can_group, cant_group} = Enum.split_with(aggregates, &can_group?(resource, &1))
[{{path, join_filters}, can_group}] ++
Enum.map(cant_group, &{{path, join_filters}, [&1]})
end)
|> Enum.filter(fn
{_, []} ->
false
_ ->
true
end)
|> Enum.reduce_while(
{:ok, query, []},
fn {{[first_relationship | relationship_path], join_filters}, aggregates},
{:ok, query, dynamics} ->
first_relationship =
case Ash.Resource.Info.relationship(resource, first_relationship) do
nil ->
raise "No such relationship for #{inspect(first_relationship)} aggregates #{inspect(aggregates)}"
first_relationship ->
first_relationship
end
is_single? = match?([_], aggregates)
cond do
is_single? &&
optimizable_first_aggregate?(resource, Enum.at(aggregates, 0)) ->
case add_first_join_aggregate(
query,
resource,
hd(aggregates),
root_data,
first_relationship
) do
{:ok, query, dynamic} ->
query =
if select? do
select_or_merge(query, hd(aggregates).name, dynamic)
else
query
end
{:cont, {:ok, query, dynamics}}
{:error, error} ->
{:halt, {:error, error}}
end
is_single? && Enum.at(aggregates, 0).kind == :exists ->
[aggregate] = aggregates
expr =
if is_nil(Map.get(aggregate.query, :filter)) do
true
else
Map.get(aggregate.query, :filter)
end
{exists, acc} =
AshPostgres.Expr.dynamic_expr(
query,
%Ash.Query.Exists{path: aggregate.relationship_path, expr: expr},
query.__ash_bindings__
)
{:cont,
{:ok, AshPostgres.DataLayer.merge_expr_accumulator(query, acc),
[{aggregate.load, aggregate.name, exists} | dynamics]}}
true ->
root_data_path =
case root_data do
{_, path} ->
path
_ ->
[]
end
with {:ok, agg_root_query, acc} <-
AshPostgres.Join.maybe_get_resource_query(
first_relationship.destination,
first_relationship,
query,
false,
[first_relationship.name],
nil,
nil,
true,
true
),
{:ok, agg_root_query, acc} <-
apply_first_relationship_join_filters(
agg_root_query,
query,
acc,
first_relationship,
join_filters
),
agg_root_query <-
set_in_group(
agg_root_query,
resource
),
{:ok, joined} <-
join_all_relationships(
agg_root_query,
aggregates,
relationship_path,
first_relationship,
is_single?,
join_filters
),
{:ok, filtered} <-
maybe_filter_subquery(
joined,
first_relationship,
relationship_path,
aggregates,
is_single?,
source_binding
),
with_subquery_select <-
select_all_aggregates(
aggregates,
filtered,
relationship_path,
query,
is_single?,
Ash.Resource.Info.related(
first_relationship.destination,
relationship_path
),
first_relationship
),
query <-
join_subquery(
query,
with_subquery_select,
first_relationship,
relationship_path,
aggregates,
source_binding,
root_data_path
) do
query = AshPostgres.DataLayer.merge_expr_accumulator(query, acc)
if select? do
new_dynamics =
Enum.map(
aggregates,
&{&1.load, &1.name,
select_dynamic(resource, query, &1, query.__ash_bindings__.current - 1)}
)
{:cont, {:ok, query, new_dynamics ++ dynamics}}
else
{:cont, {:ok, query, dynamics}}
end
end
end
end
)
case result do
{:ok, query, dynamics} ->
{:ok, add_aggregate_selects(query, dynamics)}
{:error, error} ->
{:error, error}
2022-01-14 08:11:30 +13:00
end
2021-12-21 16:19:24 +13:00
{:error, error} ->
{:error, error}
end
end
defp set_in_group(%{__ash_bindings__: _} = query, _resource) do
Map.update!(
query,
:__ash_bindings__,
&Map.put(&1, :in_group?, true)
)
end
defp set_in_group(%Ecto.SubQuery{} = subquery, resource) do
subquery = from(row in subquery, [])
subquery
|> AshPostgres.DataLayer.default_bindings(resource)
|> Map.update!(
:__ash_bindings__,
&Map.put(&1, :in_group?, true)
)
end
defp apply_first_relationship_join_filters(
agg_root_query,
query,
acc,
first_relationship,
join_filters
) do
case join_filters[[first_relationship]] do
nil ->
{:ok, agg_root_query, acc}
filter ->
with {:ok, agg_root_query} <-
AshPostgres.Join.join_all_relationships(agg_root_query, filter) do
agg_root_query =
AshPostgres.Expr.set_parent_path(
agg_root_query,
query
)
{query, acc} =
AshPostgres.Join.maybe_apply_filter(
agg_root_query,
agg_root_query,
agg_root_query.__ash_bindings__,
filter,
acc
)
{:ok, query, acc}
end
end
end
defp use_aggregate_name(query, aggregate_name) do
{%{
query
| __ash_bindings__: %{
query.__ash_bindings__
| current_aggregate_name:
next_aggregate_name(query.__ash_bindings__.current_aggregate_name),
aggregate_names:
Map.put(
query.__ash_bindings__.aggregate_names,
aggregate_name,
query.__ash_bindings__.current_aggregate_name
)
}
}, query.__ash_bindings__.current_aggregate_name}
end
defp resource_aggregates_to_aggregates(resource, aggregates) do
aggregates
|> Enum.reduce_while({:ok, []}, fn
%Ash.Query.Aggregate{} = aggregate, {:ok, aggregates} ->
{:cont, {:ok, [aggregate | aggregates]}}
aggregate, {:ok, aggregates} ->
related = Ash.Resource.Info.related(resource, aggregate.relationship_path)
read_action =
aggregate.read_action || Ash.Resource.Info.primary_action!(related, :read).name
with %{valid?: true} = aggregate_query <- Ash.Query.for_read(related, read_action),
%{valid?: true} = aggregate_query <-
Ash.Query.build(aggregate_query, filter: aggregate.filter, sort: aggregate.sort) do
Ash.Query.Aggregate.new(
resource,
aggregate.name,
aggregate.kind,
path: aggregate.relationship_path,
query: aggregate_query,
field: aggregate.field,
default: aggregate.default,
filterable?: aggregate.filterable?,
type: aggregate.type,
constraints: aggregate.constraints,
implementation: aggregate.implementation,
uniq?: aggregate.uniq?,
read_action:
aggregate.read_action ||
Ash.Resource.Info.primary_action!(
Ash.Resource.Info.related(resource, aggregate.relationship_path),
:read
).name,
authorize?: aggregate.authorize?
)
else
%{valid?: false, errors: errors} ->
{:error, errors}
{:error, error} ->
{:error, error}
end
|> case do
{:ok, aggregate} ->
aggregate = Map.put(aggregate, :load, aggregate.name)
{:cont, {:ok, [aggregate | aggregates]}}
{:error, error} ->
{:halt, {:error, error}}
end
end)
end
defp add_first_join_aggregate(query, resource, aggregate, root_data, first_relationship) do
{resource, path} =
case root_data do
{resource, path} ->
{resource, path}
_ ->
{resource, []}
end
case AshPostgres.Join.join_all_relationships(
query,
nil,
[],
[
{:left,
AshPostgres.Join.relationship_path_to_relationships(
resource,
path ++ aggregate.relationship_path
)}
],
[],
nil,
false
) do
{:ok, query} ->
ref =
aggregate_field_ref(
aggregate,
Ash.Resource.Info.related(resource, path ++ aggregate.relationship_path),
path ++ aggregate.relationship_path,
query,
first_relationship
)
{value, acc} = AshPostgres.Expr.dynamic_expr(query, ref, query.__ash_bindings__, false)
type = AshPostgres.Types.parameterized_type(aggregate.type, aggregate.constraints)
with_default =
if aggregate.default_value do
if type do
Ecto.Query.dynamic(coalesce(^value, type(^aggregate.default_value, ^type)))
else
Ecto.Query.dynamic(coalesce(^value, ^aggregate.default_value))
end
else
value
end
casted =
if type do
Ecto.Query.dynamic(type(^with_default, ^type))
else
with_default
end
{:ok, AshPostgres.DataLayer.merge_expr_accumulator(query, acc), casted}
{:error, error} ->
{:error, error}
end
end
defp already_added?(aggregate, bindings) do
Enum.any?(bindings.bindings, fn
{_, %{type: :aggregate, aggregates: aggregates}} ->
aggregate in aggregates
_ ->
false
end)
end
2023-01-01 03:22:12 +13:00
defp maybe_filter_subquery(
agg_query,
first_relationship,
relationship_path,
aggregates,
is_single?,
_source_binding
2023-01-01 03:22:12 +13:00
) do
Enum.reduce_while(aggregates, {:ok, agg_query}, fn aggregate, {:ok, agg_query} ->
filter =
if aggregate.query.filter do
Ash.Filter.move_to_relationship_path(
aggregate.query.filter,
relationship_path
)
|> Map.put(:resource, first_relationship.destination)
else
aggregate.query.filter
end
related = Ash.Resource.Info.related(first_relationship.destination, relationship_path)
agg_query =
case Ash.Resource.Info.field(related, aggregate.field) do
%Ash.Resource.Aggregate{} = aggregate ->
{:ok, agg_query} =
add_aggregates(agg_query, [aggregate], related, false, 0, {
first_relationship.destination,
[first_relationship.name]
})
agg_query
%Ash.Resource.Calculation{
name: name,
calculation: {module, opts},
type: type,
constraints: constraints
} ->
2023-01-01 03:22:12 +13:00
{:ok, new_calc} = Ash.Query.Calculation.new(name, module, opts, {type, constraints})
expression = module.expression(opts, aggregate.context)
expression =
Ash.Filter.build_filter_from_template(
expression,
aggregate.context[:actor],
aggregate.context,
aggregate.context
)
2023-01-01 03:22:12 +13:00
{:ok, expression} =
Ash.Filter.hydrate_refs(expression, %{
resource: related,
public?: false
})
{:ok, agg_query} =
AshPostgres.DataLayer.add_calculations(
agg_query,
[{new_calc, expression}],
agg_query.__ash_bindings__.resource,
false
)
agg_query
2023-01-01 03:22:12 +13:00
_ ->
agg_query
end
if has_filter?(aggregate.query) && is_single? do
{:cont,
AshPostgres.DataLayer.filter(agg_query, filter, agg_query.__ash_bindings__.resource)}
else
{:cont, {:ok, agg_query}}
end
end)
end
defp join_subquery(
query,
subquery,
%{manual: {module, opts}} = first_relationship,
_relationship_path,
aggregates,
source_binding,
root_data_path
) do
field = first_relationship.destination_attribute
new_subquery =
from(row in subquery, distinct: true)
new_subquery =
if Map.get(first_relationship, :no_attributes?) do
new_subquery
else
from(row in new_subquery,
group_by: field(row, ^field),
select_merge: map(row, ^[field])
)
end
{:ok, subquery} =
module.ash_postgres_subquery(
opts,
source_binding,
subquery.__ash_bindings__.current - 1,
new_subquery
)
subquery = AshPostgres.Join.set_join_prefix(subquery, query, first_relationship.destination)
query =
from(row in query,
left_lateral_join: sub in subquery(subquery_if_distinct(subquery)),
as: ^query.__ash_bindings__.current,
on: true
)
AshPostgres.DataLayer.add_binding(
query,
%{
path: root_data_path,
type: :aggregate,
aggregates: aggregates
}
)
end
defp join_subquery(
query,
subquery,
%{type: :many_to_many, join_relationship: join_relationship, source: source} =
first_relationship,
_relationship_path,
aggregates,
source_binding,
root_data_path
) do
join_relationship_struct = Ash.Resource.Info.relationship(source, join_relationship)
{:ok, through, acc} =
AshPostgres.Join.maybe_get_resource_query(
join_relationship_struct.destination,
join_relationship_struct,
query,
false,
[join_relationship],
nil,
subquery.__ash_bindings__.current,
true,
true
)
field = first_relationship.source_attribute_on_join_resource
subquery =
from(sub in subquery,
join: through in ^through,
as: ^subquery.__ash_bindings__.current,
on:
field(through, ^first_relationship.destination_attribute_on_join_resource) ==
field(sub, ^first_relationship.destination_attribute),
select_merge: map(through, ^[field]),
group_by: field(through, ^first_relationship.source_attribute_on_join_resource),
distinct: field(through, ^first_relationship.source_attribute_on_join_resource),
where:
field(
parent_as(^source_binding),
^first_relationship.source_attribute
) ==
field(
through,
^first_relationship.source_attribute_on_join_resource
)
)
subquery = AshPostgres.Join.set_join_prefix(subquery, query, first_relationship.destination)
query =
from(row in query,
left_lateral_join: agg in subquery(subquery_if_distinct(subquery)),
as: ^query.__ash_bindings__.current,
on: true
)
query
|> AshPostgres.DataLayer.add_binding(%{
path: root_data_path,
type: :aggregate,
aggregates: aggregates
})
|> AshPostgres.DataLayer.merge_expr_accumulator(acc)
end
defp join_subquery(
query,
subquery,
first_relationship,
_relationship_path,
aggregates,
source_binding,
root_data_path
) do
field = first_relationship.destination_attribute
subquery =
if Map.get(first_relationship, :no_attributes?) do
subquery
else
from(row in subquery,
group_by: field(row, ^field),
select_merge: map(row, ^[field]),
where:
field(parent_as(^source_binding), ^first_relationship.source_attribute) ==
field(as(^0), ^first_relationship.destination_attribute)
)
end
subquery = AshPostgres.Join.set_join_prefix(subquery, query, first_relationship.destination)
query =
from(row in query,
left_lateral_join: agg in subquery(subquery_if_distinct(subquery)),
as: ^query.__ash_bindings__.current,
on: true
)
AshPostgres.DataLayer.add_binding(
query,
%{
path: root_data_path,
type: :aggregate,
aggregates: aggregates
}
)
end
def next_aggregate_name(i) do
@next_aggregate_names[i] ||
raise Ash.Error.Framework.AssumptionFailed,
message: """
All 1000 static names for aggregates have been used in a single query.
Congratulations, this means that you have gone so wildly beyond our imagination
of how much can fit into a single quer. Please file an issue and we will raise the limit.
"""
end
defp subquery_if_distinct(%{distinct: nil} = query), do: query
defp subquery_if_distinct(subquery) do
from(row in subquery(subquery),
select: row
)
end
defp select_all_aggregates(
aggregates,
joined,
relationship_path,
_query,
is_single?,
resource,
first_relationship
) do
Enum.reduce(aggregates, joined, fn aggregate, joined ->
add_subquery_aggregate_select(
joined,
relationship_path,
aggregate,
resource,
is_single?,
first_relationship
)
end)
end
defp join_all_relationships(
agg_root_query,
_aggregates,
relationship_path,
first_relationship,
_is_single?,
join_filters
) do
if Enum.empty?(relationship_path) do
{:ok, agg_root_query}
else
join_filters =
Enum.reduce(join_filters, %{}, fn {key, value}, acc ->
if List.starts_with?(key, [first_relationship.name]) do
Map.put(acc, Enum.drop(key, 1), value)
else
acc
end
end)
AshPostgres.Join.join_all_relationships(
agg_root_query,
Map.values(join_filters),
2023-01-05 06:36:01 +13:00
[],
[
{:inner,
AshPostgres.Join.relationship_path_to_relationships(
first_relationship.destination,
relationship_path
)}
],
[],
nil,
false,
join_filters,
agg_root_query
)
end
end
defp can_group?(_, %{kind: :exists}), do: false
defp can_group?(_, %{kind: :list}), do: false
defp can_group?(resource, aggregate) do
can_group_kind?(aggregate, resource) && !has_exists?(aggregate) &&
!references_relationships?(aggregate)
end
# We can potentially optimize this. We don't have to prevent aggregates that reference
# relationships from joining, we can
# 1. group up the ones that do join relationships by the relationships they join
# 2. potentially group them all up that join to relationships and just join to all the relationships
# but this method is predictable and easy so we're starting by just not grouping them
defp references_relationships?(aggregate) do
!!Ash.Filter.find(aggregate.query && aggregate.query.filter, fn
%Ash.Query.Ref{relationship_path: relationship_path} when relationship_path != [] ->
true
_ ->
false
end)
end
defp can_group_kind?(aggregate, resource) do
if aggregate.kind == :first do
if array_type?(resource, aggregate) || optimizable_first_aggregate?(resource, aggregate) do
false
else
true
end
else
true
end
end
@doc false
def optimizable_first_aggregate?(resource, %{
name: name,
kind: :first,
relationship_path: relationship_path,
join_filters: join_filters
}) do
name in AshPostgres.DataLayer.Info.simple_join_first_aggregates(resource) ||
(join_filters in [nil, %{}, []] &&
single_path?(resource, relationship_path))
end
def optimizable_first_aggregate?(_, _), do: false
defp array_type?(resource, aggregate) do
related = Ash.Resource.Info.related(resource, aggregate.relationship_path)
case Ash.Resource.Info.field(related, aggregate.field).type do
{:array, _} ->
false
_ ->
true
end
end
defp has_exists?(aggregate) do
!!Ash.Filter.find(aggregate.query && aggregate.query.filter, fn
%Ash.Query.Exists{} -> true
_ -> false
end)
end
defp add_aggregate_selects(query, dynamics) do
2022-01-14 08:11:30 +13:00
{in_aggregates, in_body} =
Enum.split_with(dynamics, fn {load, _name, _dynamic} -> is_nil(load) end)
aggs =
in_body
|> Map.new(fn {load, _, dynamic} ->
{load, dynamic}
end)
aggs =
if Enum.empty?(in_aggregates) do
aggs
2022-01-14 08:11:30 +13:00
else
Map.put(
aggs,
:aggregates,
Map.new(in_aggregates, fn {_, name, dynamic} ->
{name, dynamic}
end)
)
2022-01-14 08:11:30 +13:00
end
Ecto.Query.select_merge(query, ^aggs)
2022-01-14 08:11:30 +13:00
end
defp select_dynamic(_resource, _query, aggregate, binding) do
type = AshPostgres.Types.parameterized_type(aggregate.type, aggregate.constraints)
2022-01-25 11:59:31 +13:00
2021-12-21 16:19:24 +13:00
field =
2022-02-17 16:04:54 +13:00
if type do
Ecto.Query.dynamic(
type(
field(as(^binding), ^aggregate.name),
^type
)
2021-12-21 16:19:24 +13:00
)
2022-02-17 16:04:54 +13:00
else
Ecto.Query.dynamic(field(as(^binding), ^aggregate.name))
end
2021-12-21 16:19:24 +13:00
2022-01-25 11:59:31 +13:00
coalesced =
if is_nil(aggregate.default_value) do
field
else
2022-02-17 16:04:54 +13:00
if type do
Ecto.Query.dynamic(
coalesce(
^field,
type(
^aggregate.default_value,
^type
)
2022-01-25 11:59:31 +13:00
)
2021-12-21 16:19:24 +13:00
)
2022-02-17 16:04:54 +13:00
else
Ecto.Query.dynamic(
coalesce(
^field,
^aggregate.default_value
)
)
end
2022-01-25 11:59:31 +13:00
end
2022-02-17 16:04:54 +13:00
if type do
Ecto.Query.dynamic(type(^coalesced, ^type))
else
coalesced
end
2021-12-21 16:19:24 +13:00
end
defp has_filter?(nil), do: false
defp has_filter?(%{filter: nil}), do: false
defp has_filter?(%{filter: %Ash.Filter{expression: nil}}), do: false
defp has_filter?(%{filter: %Ash.Filter{}}), do: true
defp has_filter?(_), do: false
2021-12-21 16:19:24 +13:00
defp has_sort?(nil), do: false
defp has_sort?(%{sort: nil}), do: false
defp has_sort?(%{sort: []}), do: false
defp has_sort?(%{sort: _}), do: true
defp has_sort?(_), do: false
2021-12-21 16:19:24 +13:00
def add_subquery_aggregate_select(
query,
relationship_path,
%{kind: :first} = aggregate,
resource,
is_single?,
first_relationship
) do
2021-12-21 16:19:24 +13:00
query = AshPostgres.DataLayer.default_bindings(query, aggregate.resource)
ref =
aggregate_field_ref(
aggregate,
resource,
relationship_path,
query,
first_relationship
)
2022-01-25 11:59:31 +13:00
type = AshPostgres.Types.parameterized_type(aggregate.type, aggregate.constraints)
binding =
AshPostgres.DataLayer.get_binding(
query.__ash_bindings__.resource,
relationship_path,
query,
[:left, :inner, :root]
)
2021-12-21 16:19:24 +13:00
{field, acc} = AshPostgres.Expr.dynamic_expr(query, ref, query.__ash_bindings__, false)
has_sort? = has_sort?(aggregate.query)
{sorted, query} =
if has_sort? || first_relationship.sort not in [nil, []] do
{sort, binding} =
if has_sort? do
{aggregate.query.sort, binding}
else
{List.wrap(first_relationship.sort), 0}
end
{:ok, sort_expr, query} =
AshPostgres.Sort.sort(
query,
sort,
Ash.Resource.Info.related(
query.__ash_bindings__.resource,
relationship_path
),
relationship_path,
binding,
:return
)
2022-01-25 11:59:31 +13:00
question_marks = Enum.map(sort_expr, fn _ -> " ? " end)
2021-12-21 16:19:24 +13:00
2022-01-25 11:59:31 +13:00
{:ok, expr} =
AshPostgres.Functions.Fragment.casted_new(
["array_agg(? ORDER BY #{question_marks})", field] ++ sort_expr
)
2022-01-14 08:11:30 +13:00
{sort_expr, acc} =
AshPostgres.Expr.dynamic_expr(query, expr, query.__ash_bindings__, false)
query =
AshPostgres.DataLayer.merge_expr_accumulator(query, acc)
{sort_expr, query}
2021-12-21 16:19:24 +13:00
else
{Ecto.Query.dynamic(
[row],
fragment("array_agg(?)", ^field)
), query}
2021-12-21 16:19:24 +13:00
end
{query, filtered} = filter_field(sorted, query, aggregate, relationship_path, is_single?)
2021-12-21 16:19:24 +13:00
value = Ecto.Query.dynamic(fragment("(?)[1]", ^filtered))
2021-12-21 16:19:24 +13:00
with_default =
if aggregate.default_value do
2022-02-17 16:04:54 +13:00
if type do
Ecto.Query.dynamic(coalesce(^value, type(^aggregate.default_value, ^type)))
else
Ecto.Query.dynamic(coalesce(^value, ^aggregate.default_value))
end
2021-12-21 16:19:24 +13:00
else
value
end
2022-02-17 16:04:54 +13:00
casted =
if type do
Ecto.Query.dynamic(type(^with_default, ^type))
else
with_default
end
2021-12-21 16:19:24 +13:00
select_or_merge(
AshPostgres.DataLayer.merge_expr_accumulator(query, acc),
aggregate.name,
casted
)
2021-12-21 16:19:24 +13:00
end
def add_subquery_aggregate_select(
query,
relationship_path,
%{kind: :list} = aggregate,
resource,
is_single?,
first_relationship
) do
2021-12-21 16:19:24 +13:00
query = AshPostgres.DataLayer.default_bindings(query, aggregate.resource)
type = AshPostgres.Types.parameterized_type(aggregate.type, aggregate.constraints)
binding =
AshPostgres.DataLayer.get_binding(
query.__ash_bindings__.resource,
relationship_path,
query,
[:left, :inner, :root]
)
2021-12-21 16:19:24 +13:00
ref =
aggregate_field_ref(
aggregate,
resource,
relationship_path,
query,
first_relationship
)
{field, acc} = AshPostgres.Expr.dynamic_expr(query, ref, query.__ash_bindings__, false)
has_sort? = has_sort?(aggregate.query)
{sorted, query} =
if has_sort? || first_relationship.sort not in [nil, []] do
{sort, binding} =
if has_sort? do
{aggregate.query.sort, binding}
else
{List.wrap(first_relationship.sort), 0}
end
{:ok, sort_expr, query} =
AshPostgres.Sort.sort(
query,
sort,
Ash.Resource.Info.related(
query.__ash_bindings__.resource,
relationship_path
),
relationship_path,
binding,
:return
)
2022-01-25 11:59:31 +13:00
question_marks = Enum.map(sort_expr, fn _ -> " ? " end)
2021-12-21 16:19:24 +13:00
distinct =
if Map.get(aggregate, :uniq?) do
"DISTINCT "
else
""
end
2022-01-25 11:59:31 +13:00
{:ok, expr} =
AshPostgres.Functions.Fragment.casted_new(
["array_agg(#{distinct}? ORDER BY #{question_marks})", field] ++ sort_expr
2022-01-25 11:59:31 +13:00
)
2022-01-14 08:11:30 +13:00
{expr, acc} =
AshPostgres.Expr.dynamic_expr(query, expr, query.__ash_bindings__, false)
query =
AshPostgres.DataLayer.merge_expr_accumulator(query, acc)
{expr, query}
2021-12-21 16:19:24 +13:00
else
if Map.get(aggregate, :uniq?) do
{Ecto.Query.dynamic(
[row],
fragment("array_agg(DISTINCT ?)", ^field)
), query}
else
{Ecto.Query.dynamic(
[row],
fragment("array_agg(?)", ^field)
), query}
end
2021-12-21 16:19:24 +13:00
end
{query, filtered} = filter_field(sorted, query, aggregate, relationship_path, is_single?)
2021-12-21 16:19:24 +13:00
with_default =
if aggregate.default_value do
2022-02-17 16:04:54 +13:00
if type do
Ecto.Query.dynamic(coalesce(^filtered, type(^aggregate.default_value, ^type)))
else
Ecto.Query.dynamic(coalesce(^filtered, ^aggregate.default_value))
end
2021-12-21 16:19:24 +13:00
else
filtered
end
2022-02-17 16:04:54 +13:00
cast =
if type do
Ecto.Query.dynamic(type(^with_default, ^type))
2022-02-17 16:04:54 +13:00
else
with_default
end
2021-12-21 16:19:24 +13:00
select_or_merge(
AshPostgres.DataLayer.merge_expr_accumulator(query, acc),
aggregate.name,
cast
)
2021-12-21 16:19:24 +13:00
end
def add_subquery_aggregate_select(
query,
relationship_path,
%{kind: kind} = aggregate,
resource,
is_single?,
first_relationship
)
when kind in [:count, :sum, :avg, :max, :min, :custom] do
2021-12-21 16:19:24 +13:00
query = AshPostgres.DataLayer.default_bindings(query, aggregate.resource)
ref =
aggregate_field_ref(
aggregate,
resource,
relationship_path,
query,
first_relationship
)
{field, query} =
if kind == :custom do
# we won't use this if its custom so don't try to make one
{nil, query}
else
{expr, acc} = AshPostgres.Expr.dynamic_expr(query, ref, query.__ash_bindings__, false)
{expr, AshPostgres.DataLayer.merge_expr_accumulator(query, acc)}
end
type = AshPostgres.Types.parameterized_type(aggregate.type, aggregate.constraints)
binding =
AshPostgres.DataLayer.get_binding(
query.__ash_bindings__.resource,
relationship_path,
query,
[:left, :inner, :root]
)
2021-12-21 16:19:24 +13:00
field =
case kind do
:count ->
if Map.get(aggregate, :uniq?) do
Ecto.Query.dynamic([row], count(^field, :distinct))
else
Ecto.Query.dynamic([row], count(^field))
end
2021-12-21 16:19:24 +13:00
:sum ->
Ecto.Query.dynamic([row], sum(^field))
2021-12-21 16:19:24 +13:00
:avg ->
Ecto.Query.dynamic([row], avg(^field))
2021-12-21 16:19:24 +13:00
:max ->
Ecto.Query.dynamic([row], max(^field))
:min ->
Ecto.Query.dynamic([row], min(^field))
:custom ->
{module, opts} = aggregate.implementation
module.dynamic(opts, binding)
2021-12-21 16:19:24 +13:00
end
{query, filtered} = filter_field(field, query, aggregate, relationship_path, is_single?)
with_default =
if aggregate.default_value do
2022-02-17 16:04:54 +13:00
if type do
Ecto.Query.dynamic(coalesce(^filtered, type(^aggregate.default_value, ^type)))
else
Ecto.Query.dynamic(coalesce(^filtered, ^aggregate.default_value))
end
else
filtered
end
2022-02-17 16:04:54 +13:00
cast =
if type do
Ecto.Query.dynamic(type(^with_default, ^type))
else
with_default
end
2021-12-21 16:19:24 +13:00
2022-01-14 08:11:30 +13:00
select_or_merge(query, aggregate.name, cast)
2021-12-21 16:19:24 +13:00
end
defp filter_field(field, query, _aggregate, _relationship_path, true) do
{query, field}
2021-12-21 16:19:24 +13:00
end
defp filter_field(field, query, aggregate, relationship_path, _is_single?) do
if has_filter?(aggregate.query) do
filter =
Ash.Filter.move_to_relationship_path(
aggregate.query.filter,
relationship_path
)
2021-12-21 16:19:24 +13:00
used_aggregates = Ash.Filter.used_aggregates(filter, [])
{:ok, query} =
AshPostgres.Join.join_all_relationships(query, filter)
{:ok, query} =
AshPostgres.Aggregate.add_aggregates(
query,
used_aggregates,
query.__ash_bindings__.resource,
false,
0
)
{expr, acc} =
AshPostgres.Expr.dynamic_expr(
query,
filter,
query.__ash_bindings__,
false,
AshPostgres.Types.parameterized_type(aggregate.type, aggregate.constraints)
)
2021-12-21 16:19:24 +13:00
{AshPostgres.DataLayer.merge_expr_accumulator(query, acc),
Ecto.Query.dynamic(filter(^field, ^expr))}
2021-12-21 16:19:24 +13:00
else
{query, field}
2021-12-21 16:19:24 +13:00
end
end
defp select_or_merge(query, aggregate_name, casted) do
query =
if query.select do
query
else
Ecto.Query.select(query, %{})
end
Ecto.Query.select_merge(query, ^%{aggregate_name => casted})
2021-12-21 16:19:24 +13:00
end
def aggregate_field_ref(aggregate, resource, relationship_path, query, first_relationship) do
%Ash.Query.Ref{
attribute: aggregate_field(aggregate, resource, relationship_path, query),
relationship_path: relationship_path,
resource: query.__ash_bindings__.resource
}
|> case do
%{attribute: %Ash.Resource.Aggregate{}} = ref ->
%{ref | relationship_path: [first_relationship.name | ref.relationship_path]}
other ->
other
end
end
defp single_path?(_, []), do: true
defp single_path?(resource, [relationship | rest]) do
relationship = Ash.Resource.Info.relationship(resource, relationship)
(relationship.type == :belongs_to ||
has_one_with_identity?(relationship)) &&
single_path?(relationship.destination, rest)
end
defp has_one_with_identity?(%{type: :has_one} = relationship) do
relationship.destination
|> Ash.Resource.Info.identities()
|> Enum.any?(fn %{keys: keys} ->
2023-11-22 03:44:10 +13:00
keys == [relationship.destination_attribute]
end)
end
defp has_one_with_identity?(_), do: false
@doc false
def aggregate_field(aggregate, resource, _relationship_path, query) do
case Ash.Resource.Info.field(
resource,
aggregate.field || List.first(Ash.Resource.Info.primary_key(resource))
) do
%Ash.Resource.Calculation{calculation: {module, opts}} = calculation ->
calc_type =
AshPostgres.Types.parameterized_type(
calculation.type,
Map.get(calculation, :constraints, [])
)
AshPostgres.Expr.validate_type!(query, calc_type, "#{inspect(calculation.name)}")
{:ok, query_calc} =
Ash.Query.Calculation.new(
calculation.name,
module,
opts,
calculation.type,
Map.get(aggregate, :context, %{})
)
query_calc
other ->
other
end
end
2021-12-21 16:19:24 +13:00
end