2021-12-21 16:19:24 +13:00
defmodule AshPostgres.Aggregate do
@moduledoc false
2023-01-18 03:57:05 +13:00
import Ecto.Query , only : [ from : 2 , subquery : 1 ]
2021-12-21 16:19:24 +13:00
require Ecto.Query
2023-08-23 06:51:31 +12:00
# Compile-time lookup table: for every index 0..999 it maps the generated aggregate
# name at that index to the generated name at index + 1. `next_aggregate_name/1`
# reads this table to advance from one generated name to the next.
@next_aggregate_names Enum . reduce ( 0 . . 999 , %{ } , fn i , acc ->
Map . put ( acc , :" aggregate_ #{ i } " , :" aggregate_ #{ i + 1 } " )
end )
2023-03-18 10:06:06 +13:00
# Adds `aggregates` to `query` for `resource`.
#
# Returns `{:ok, query}` on success, or `{:error, error}` when converting the
# aggregates or building one of them fails. An empty aggregate list returns the
# query unchanged.
#
# * `select?` - when true, the aggregate values are merged into the query's select.
# * `source_binding` - forwarded to `maybe_filter_subquery/6` and `join_subquery/6`.
# * `root_data` - defaults to `nil`; only forwarded to `add_first_join_aggregate/4`.
def add_aggregates (
query ,
aggregates ,
resource ,
2023-06-06 10:32:50 +12:00
select? ,
source_binding ,
2023-03-18 10:06:06 +13:00
root_data \\ nil
)
def add_aggregates ( query , [ ] , _ , _ , _ , _ ) , do : { :ok , query }
def add_aggregates ( query , aggregates , resource , select? , source_binding , root_data ) do
2023-07-14 15:27:08 +12:00
case resource_aggregates_to_aggregates ( resource , aggregates ) do
{ :ok , aggregates } ->
query = AshPostgres.DataLayer . default_bindings ( query , resource )
2023-08-23 06:51:31 +12:00
# Aggregates whose name is not an atom are given a generated name through
# `use_aggregate_name/2`; the generated -> original mapping is kept so the
# final select can be keyed by the original name.
{ query , aggregates , aggregate_name_mapping } =
Enum . reduce ( aggregates , { query , [ ] , %{ } } , fn aggregate ,
{ query , aggregates , aggregate_name_mapping } ->
if is_atom ( aggregate . name ) do
{ query , [ aggregate | aggregates ] , aggregate_name_mapping }
else
{ query , name } = use_aggregate_name ( query , aggregate . name )
{ query , [ %{ aggregate | name : name } | aggregates ] ,
Map . put ( aggregate_name_mapping , name , aggregate . name ) }
end
end )
2023-07-14 15:27:08 +12:00
# Skip aggregates already present in the bindings' `:aggregate_defs`, then
# record the remaining ones there.
aggregates =
Enum . reject ( aggregates , fn aggregate ->
Map . has_key? ( query . __ash_bindings__ . aggregate_defs , aggregate . name )
end )
query =
query
|> Map . update! ( :__ash_bindings__ , fn bindings ->
bindings
|> Map . update! ( :aggregate_defs , fn aggregate_defs ->
Map . merge ( aggregate_defs , Map . new ( aggregates , & { &1 . name , &1 } ) )
end )
end )
result =
aggregates
|> Enum . reject ( & already_added? ( &1 , query . __ash_bindings__ ) )
|> Enum . group_by ( & &1 . relationship_path )
# Groupable aggregates on the same path share one entry; every aggregate
# that cannot be grouped gets its own single-element entry.
|> Enum . flat_map ( fn { path , aggregates } ->
{ can_group , cant_group } = Enum . split_with ( aggregates , & can_group? ( resource , &1 ) )
[ { path , can_group } ] ++ Enum . map ( cant_group , & { path , [ &1 ] } )
end )
# Drop entries left empty by the split above.
|> Enum . filter ( fn
{ _ , [ ] } ->
false
_ ->
true
end )
2023-08-17 15:02:56 +12:00
|> Enum . reduce_while (
{ :ok , query , [ ] } ,
fn { [ first_relationship | relationship_path ] , aggregates } , { :ok , query , dynamics } ->
first_relationship = Ash.Resource.Info . relationship ( resource , first_relationship )
is_single? = match? ( [ _ ] , aggregates )
cond do
# A lone optimizable aggregate is added through
# `add_first_join_aggregate/4` instead of a subquery.
is_single? &&
optimizable_first_aggregate? ( resource , Enum . at ( aggregates , 0 ) ) ->
case add_first_join_aggregate ( query , resource , hd ( aggregates ) , root_data ) do
{ :ok , query , dynamic } ->
query =
if select? do
select_or_merge ( query , hd ( aggregates ) . name , dynamic )
else
query
end
{ :cont , { :ok , query , dynamics } }
{ :error , error } ->
{ :halt , { :error , error } }
end
# A lone `:exists` aggregate becomes an `Ash.Query.Exists` expression;
# a missing filter is replaced by `true`.
is_single? && Enum . at ( aggregates , 0 ) . kind == :exists ->
[ aggregate ] = aggregates
2023-11-18 01:46:43 +13:00
expr =
if is_nil ( Map . get ( aggregate . query , :filter ) ) do
true
else
Map . get ( aggregate . query , :filter )
end
2023-08-17 15:02:56 +12:00
exists =
AshPostgres.Expr . dynamic_expr (
query ,
2023-11-18 01:46:43 +13:00
% Ash.Query.Exists { path : aggregate . relationship_path , expr : expr } ,
2023-08-17 15:02:56 +12:00
query . __ash_bindings__
)
{ :cont , { :ok , query , [ { aggregate . load , aggregate . name , exists } | dynamics ] } }
# General case: build a subquery for the group, select its aggregates
# inside it, and join it onto the outer query.
true ->
with { :ok , agg_root_query } <-
AshPostgres.Join . maybe_get_resource_query (
2023-07-14 15:27:08 +12:00
first_relationship . destination ,
2023-08-17 15:02:56 +12:00
first_relationship ,
query ,
2023-11-17 11:48:39 +13:00
false ,
2023-10-26 05:59:03 +13:00
[ first_relationship . name ] ,
nil ,
nil ,
true ,
true
2023-08-17 15:02:56 +12:00
) ,
agg_root_query <-
Map . update! (
agg_root_query ,
:__ash_bindings__ ,
& Map . put ( &1 , :in_group? , true )
) ,
{ :ok , joined } <-
join_all_relationships (
agg_root_query ,
aggregates ,
relationship_path ,
first_relationship ,
is_single?
) ,
{ :ok , filtered } <-
maybe_filter_subquery (
joined ,
first_relationship ,
relationship_path ,
aggregates ,
is_single? ,
source_binding
) ,
with_subquery_select <-
select_all_aggregates (
aggregates ,
filtered ,
relationship_path ,
query ,
is_single? ,
Ash.Resource.Info . related (
first_relationship . destination ,
relationship_path
2023-11-17 11:48:39 +13:00
) ,
first_relationship
2023-08-17 15:02:56 +12:00
) ,
query <-
join_subquery (
query ,
with_subquery_select ,
first_relationship ,
relationship_path ,
aggregates ,
source_binding
) do
if select? do
# NOTE(review): presumably `add_binding/2` advanced `current`, so
# `current - 1` is the binding of the subquery just joined - confirm.
new_dynamics =
Enum . map (
aggregates ,
& { &1 . load , &1 . name ,
select_dynamic ( resource , query , &1 , query . __ash_bindings__ . current - 1 ) }
)
{ :cont , { :ok , query , new_dynamics ++ dynamics } }
else
{ :cont , { :ok , query , dynamics } }
end
2023-07-13 16:13:50 +12:00
end
2023-08-17 15:02:56 +12:00
end
2023-07-13 16:13:50 +12:00
end
2023-08-17 15:02:56 +12:00
)
2023-07-13 16:13:50 +12:00
2023-07-14 15:27:08 +12:00
case result do
{ :ok , query , dynamics } ->
2023-08-23 06:51:31 +12:00
{ :ok , add_aggregate_selects ( query , dynamics , aggregate_name_mapping ) }
2023-07-13 16:13:50 +12:00
2023-07-14 15:27:08 +12:00
{ :error , error } ->
{ :error , error }
2022-01-14 08:11:30 +13:00
end
2021-12-21 16:19:24 +13:00
{ :error , error } ->
{ :error , error }
end
end
2023-08-23 06:51:31 +12:00
# Reserves the query's current generated aggregate name for `aggregate_name`.
#
# Returns `{query, reserved_name}` where the returned query's bindings have
# advanced `current_aggregate_name` to the next generated name and recorded
# `aggregate_name => reserved_name` in `aggregate_names`.
defp use_aggregate_name(query, aggregate_name) do
  bindings = query.__ash_bindings__
  reserved = bindings.current_aggregate_name

  updated_bindings = %{
    bindings
    | current_aggregate_name: next_aggregate_name(reserved),
      aggregate_names: Map.put(bindings.aggregate_names, aggregate_name, reserved)
  }

  {%{query | __ash_bindings__: updated_bindings}, reserved}
end
2023-07-14 15:27:08 +12:00
# Converts every entry of `aggregates` into an `Ash.Query.Aggregate`.
#
# Entries that already are `Ash.Query.Aggregate` structs are kept as they are.
# Any other entry is treated as a resource aggregate definition: a read query on
# the related resource is built with the aggregate's filter and sort, and a new
# `Ash.Query.Aggregate` is created from it.
#
# Returns `{:ok, aggregates}` (accumulated by prepending, so in reverse input
# order) or `{:error, error}` for the first aggregate that could not be built.
defp resource_aggregates_to_aggregates(resource, aggregates) do
  Enum.reduce_while(aggregates, {:ok, []}, fn
    %Ash.Query.Aggregate{} = aggregate, {:ok, acc} ->
      {:cont, {:ok, [aggregate | acc]}}

    aggregate, {:ok, acc} ->
      related = Ash.Resource.Info.related(resource, aggregate.relationship_path)

      # Resolved once and reused both for the read query and for the aggregate itself.
      read_action =
        aggregate.read_action || Ash.Resource.Info.primary_action!(related, :read).name

      built =
        with %{valid?: true} = aggregate_query <- Ash.Query.for_read(related, read_action),
             %{valid?: true} = aggregate_query <-
               Ash.Query.build(aggregate_query,
                 filter: aggregate.filter,
                 sort: aggregate.sort
               ) do
          Ash.Query.Aggregate.new(
            resource,
            aggregate.name,
            aggregate.kind,
            path: aggregate.relationship_path,
            query: aggregate_query,
            field: aggregate.field,
            default: aggregate.default,
            filterable?: aggregate.filterable?,
            type: aggregate.type,
            constraints: aggregate.constraints,
            implementation: aggregate.implementation,
            uniq?: aggregate.uniq?,
            read_action: read_action,
            authorize?: aggregate.authorize?
          )
        else
          %{valid?: false, errors: errors} ->
            {:error, errors}

          {:error, error} ->
            {:error, error}
        end

      case built do
        {:ok, aggregate} ->
          {:cont, {:ok, [aggregate | acc]}}

        {:error, error} ->
          {:halt, {:error, error}}
      end
  end)
end
2023-03-18 10:06:06 +13:00
# Adds an aggregate by left-joining its relationship path directly onto `query`
# and referencing the aggregated field, instead of building a subquery.
#
# `root_data` may be a `{resource, path}` tuple, in which case the join starts at
# that resource and `path` is prepended to the aggregate's relationship path;
# any other value means "start at `resource` with an empty prefix".
#
# Returns `{:ok, query, dynamic}` where `dynamic` is the (optionally coalesced and
# type-cast) value expression, or `{:error, error}` if the join fails.
defp add_first_join_aggregate(query, resource, aggregate, root_data) do
  {resource, path} =
    case root_data do
      {resource, path} ->
        {resource, path}

      _ ->
        {resource, []}
    end

  full_path = path ++ aggregate.relationship_path

  case AshPostgres.Join.join_all_relationships(
         query,
         nil,
         [],
         [
           {:left, AshPostgres.Join.relationship_path_to_relationships(resource, full_path)}
         ],
         [],
         nil,
         false
       ) do
    {:ok, query} ->
      ref = %Ash.Query.Ref{
        attribute:
          aggregate_field(
            aggregate,
            Ash.Resource.Info.related(resource, full_path),
            full_path,
            query
          ),
        relationship_path: full_path,
        resource: resource
      }

      value = AshPostgres.Expr.dynamic_expr(query, ref, query.__ash_bindings__, false)
      type = AshPostgres.Types.parameterized_type(aggregate.type, aggregate.constraints)

      # Test for nil explicitly (as `select_dynamic/4` does) so that a default of
      # `false` is still coalesced instead of being silently dropped.
      with_default =
        cond do
          is_nil(aggregate.default_value) ->
            value

          type ->
            Ecto.Query.dynamic(coalesce(^value, type(^aggregate.default_value, ^type)))

          true ->
            Ecto.Query.dynamic(coalesce(^value, ^aggregate.default_value))
        end

      casted =
        if type do
          Ecto.Query.dynamic(type(^with_default, ^type))
        else
          with_default
        end

      {:ok, query, casted}

    {:error, error} ->
      {:error, error}
  end
end
2022-12-18 20:23:39 +13:00
# Returns true when `aggregate` is listed under any `:aggregate`-typed binding
# in `bindings.bindings`.
defp already_added?(aggregate, bindings) do
  Enum.any?(bindings.bindings, fn entry ->
    case entry do
      {_binding, %{type: :aggregate, aggregates: known}} -> aggregate in known
      _other -> false
    end
  end)
end
2023-01-01 03:22:12 +13:00
# Prepares the aggregate subquery `agg_query` for each aggregate in `aggregates`.
#
# For every aggregate it:
#   1. moves the aggregate's filter (if any) onto `relationship_path`,
#   2. collects the calculations used by that filter, plus the aggregate's field
#      when that field is a calculation on the related resource,
#   3. joins whatever those calculation expressions reference,
#   4. adds the aggregates used by the filter/calculations via `add_aggregates/5`
#      (with `select?` false),
#   5. applies the filter to the subquery only when there is a single aggregate
#      and it has a filter.
#
# Returns `{:ok, agg_query}`, the result of `AshPostgres.DataLayer.filter/3`, or
# whatever non-ok value `add_aggregates/5` returned.
defp maybe_filter_subquery (
agg_query ,
first_relationship ,
relationship_path ,
2023-08-17 15:02:56 +12:00
aggregates ,
is_single? ,
2023-01-01 03:22:12 +13:00
source_binding
) do
2023-08-17 15:02:56 +12:00
Enum . reduce_while ( aggregates , { :ok , agg_query } , fn aggregate , { :ok , agg_query } ->
2022-12-08 14:32:38 +13:00
filter =
2023-08-17 15:02:56 +12:00
if aggregate . query . filter do
Ash.Filter . move_to_relationship_path (
aggregate . query . filter ,
relationship_path
)
|> Map . put ( :resource , first_relationship . destination )
else
aggregate . query . filter
end
2022-12-08 14:32:38 +13:00
used_calculations =
Ash.Filter . used_calculations (
filter ,
first_relationship . destination ,
relationship_path
)
2022-12-29 16:08:07 +13:00
related = Ash.Resource.Info . related ( first_relationship . destination , relationship_path )
# If the aggregated field is itself a calculation, make sure it is part of
# the calculations considered below.
used_calculations =
2023-01-01 03:22:12 +13:00
case Ash.Resource.Info . calculation ( related , aggregate . field ) do
%{ name : name , calculation : { module , opts } , type : type , constraints : constraints } ->
{ :ok , new_calc } = Ash.Query.Calculation . new ( name , module , opts , { type , constraints } )
if new_calc in used_calculations do
used_calculations
else
[
new_calc
| used_calculations
]
end
nil ->
2022-12-29 16:08:07 +13:00
used_calculations
end
2023-11-21 01:05:47 +13:00
# Hydrate each calculation's expression against the aggregate query's
# resource; a hydration failure raises.
exprs =
Enum . map ( used_calculations , fn calculation ->
case Ash.Filter . hydrate_refs (
calculation . module . expression ( calculation . opts , calculation . context ) ,
%{
resource : aggregate . query . resource ,
aggregates : %{ } ,
calculations : %{ } ,
public? : false
}
) do
{ :ok , hydrated } ->
hydrated
{ :error , _error } ->
raise """
Failed to hydrate references for resource #{inspect(aggregate.query.resource)} in #{inspect(calculation.module.expression(calculation.opts, calculation.context))}
"""
end
end )
{ :ok , agg_query } =
AshPostgres.Join . join_all_relationships (
agg_query ,
exprs ,
[ ]
)
2022-12-08 14:32:38 +13:00
used_aggregates =
2022-12-18 20:23:39 +13:00
used_aggregates (
filter ,
first_relationship . destination ,
used_calculations ,
relationship_path
)
2022-12-08 14:32:38 +13:00
2023-01-01 03:22:12 +13:00
case add_aggregates (
agg_query ,
used_aggregates ,
first_relationship . destination ,
false ,
source_binding
) do
2022-12-08 14:32:38 +13:00
{ :ok , agg_query } ->
2023-08-17 15:02:56 +12:00
# With several grouped aggregates the filter is not applied here;
# `filter_field/5` applies it per aggregate instead.
if has_filter? ( aggregate . query ) && is_single? do
{ :cont ,
AshPostgres.DataLayer . filter ( agg_query , filter , agg_query . __ash_bindings__ . resource ) }
else
{ :cont , { :ok , agg_query } }
end
2022-12-08 14:32:38 +13:00
other ->
2023-08-17 15:02:56 +12:00
{ :halt , other }
2022-12-08 14:32:38 +13:00
end
2023-08-17 15:02:56 +12:00
end )
2022-12-08 14:32:38 +13:00
end
# Clause for manual relationships (`manual: {module, opts}`).
#
# Makes the aggregate subquery distinct and, unless the relationship has
# `no_attributes?`, groups it by (and selects) the destination attribute. The
# relationship's module then builds the final subquery via
# `ash_postgres_subquery/4`, which is left-lateral-joined onto `query`. The new
# binding is registered as an `:aggregate` binding carrying `aggregates`.
defp join_subquery (
query ,
subquery ,
%{ manual : { module , opts } } = first_relationship ,
_relationship_path ,
aggregates ,
source_binding
) do
field = first_relationship . destination_attribute
2022-12-11 09:59:50 +13:00
new_subquery =
2023-11-17 11:48:39 +13:00
from ( row in subquery , distinct : true )
new_subquery =
if Map . get ( first_relationship , :no_attributes? ) do
new_subquery
else
from ( row in new_subquery ,
group_by : field ( row , ^ field ) ,
select_merge : map ( row , ^ [ field ] )
)
end
2022-12-08 14:32:38 +13:00
# A failure from the manual relationship module is not handled: anything other
# than `{:ok, subquery}` raises a match error here.
{ :ok , subquery } =
module . ash_postgres_subquery (
opts ,
source_binding ,
subquery . __ash_bindings__ . current - 1 ,
2022-12-11 09:59:50 +13:00
new_subquery
2022-12-08 14:32:38 +13:00
)
subquery = AshPostgres.Join . set_join_prefix ( subquery , query , first_relationship . destination )
query =
from ( row in query ,
2023-01-18 03:57:05 +13:00
left_lateral_join : sub in subquery ( subquery_if_distinct ( subquery ) ) ,
2023-04-13 02:44:43 +12:00
as : ^ query . __ash_bindings__ . current ,
on : true
2022-12-08 14:32:38 +13:00
)
AshPostgres.DataLayer . add_binding (
query ,
%{
path : [ ] ,
type : :aggregate ,
aggregates : aggregates
}
)
end
# Clause for `:many_to_many` relationships.
#
# Joins the aggregate subquery to the join resource's query (`through`), matching
# the join resource's destination attribute to the destination's attribute. The
# subquery is grouped by and made distinct on the join resource's source
# attribute, and correlated with the parent row (`parent_as(source_binding)`) on
# the relationship's source attribute. The result is left-lateral-joined onto
# `query` and registered as an `:aggregate` binding carrying `aggregates`.
defp join_subquery (
query ,
subquery ,
%{ type : :many_to_many , join_relationship : join_relationship , source : source } =
first_relationship ,
_relationship_path ,
aggregates ,
source_binding
) do
join_relationship_struct = Ash.Resource.Info . relationship ( source , join_relationship )
{ :ok , through } =
AshPostgres.Join . maybe_get_resource_query (
join_relationship_struct . destination ,
join_relationship_struct ,
query ,
2023-11-17 11:48:39 +13:00
false ,
2023-07-27 09:32:09 +12:00
[ join_relationship ] ,
2022-12-08 14:32:38 +13:00
nil ,
2023-10-26 05:59:03 +13:00
subquery . __ash_bindings__ . current ,
true ,
true
2022-12-08 14:32:38 +13:00
)
field = first_relationship . source_attribute_on_join_resource
subquery =
from ( sub in subquery ,
join : through in ^ through ,
as : ^ subquery . __ash_bindings__ . current ,
on :
field ( through , ^ first_relationship . destination_attribute_on_join_resource ) ==
field ( sub , ^ first_relationship . destination_attribute ) ,
select_merge : map ( through , ^ [ field ] ) ,
group_by : field ( through , ^ first_relationship . source_attribute_on_join_resource ) ,
distinct : field ( through , ^ first_relationship . source_attribute_on_join_resource ) ,
where :
field (
parent_as ( ^ source_binding ) ,
^ first_relationship . source_attribute
) ==
field (
through ,
^ first_relationship . source_attribute_on_join_resource
)
)
subquery = AshPostgres.Join . set_join_prefix ( subquery , query , first_relationship . destination )
query =
from ( row in query ,
2023-01-18 03:57:05 +13:00
left_lateral_join : agg in subquery ( subquery_if_distinct ( subquery ) ) ,
2023-04-13 02:44:43 +12:00
as : ^ query . __ash_bindings__ . current ,
on : true
2022-12-08 14:32:38 +13:00
)
AshPostgres.DataLayer . add_binding (
query ,
%{
path : [ ] ,
type : :aggregate ,
aggregates : aggregates
}
)
end
# Fallback clause for all remaining relationships.
#
# Unless the relationship has `no_attributes?`, the aggregate subquery is grouped
# by (and selects) the destination attribute and is correlated with the parent
# row: the parent's source attribute must equal the destination attribute of the
# subquery's binding 0. The subquery is then left-lateral-joined onto `query` and
# registered as an `:aggregate` binding carrying `aggregates`.
defp join_subquery (
query ,
subquery ,
first_relationship ,
_relationship_path ,
aggregates ,
source_binding
) do
field = first_relationship . destination_attribute
2023-11-17 11:48:39 +13:00
subquery =
if Map . get ( first_relationship , :no_attributes? ) do
subquery
else
from ( row in subquery ,
group_by : field ( row , ^ field ) ,
2023-11-17 12:08:12 +13:00
select_merge : map ( row , ^ [ field ] ) ,
where :
field ( parent_as ( ^ source_binding ) , ^ first_relationship . source_attribute ) ==
field ( as ( ^ 0 ) , ^ first_relationship . destination_attribute )
2023-11-17 11:48:39 +13:00
)
end
2022-12-08 14:32:38 +13:00
subquery = AshPostgres.Join . set_join_prefix ( subquery , query , first_relationship . destination )
query =
from ( row in query ,
2023-01-18 03:57:05 +13:00
left_lateral_join : agg in subquery ( subquery_if_distinct ( subquery ) ) ,
2023-04-13 02:44:43 +12:00
as : ^ query . __ash_bindings__ . current ,
on : true
2022-12-08 14:32:38 +13:00
)
AshPostgres.DataLayer . add_binding (
query ,
%{
path : [ ] ,
type : :aggregate ,
aggregates : aggregates
}
)
end
2023-08-23 06:51:31 +12:00
# Returns the generated aggregate name that follows `i` in the precomputed
# `@next_aggregate_names` table.
#
# Raises `Ash.Error.Framework.AssumptionFailed` when `i` has no successor in the
# table, i.e. when the pool of generated names is exhausted.
def next_aggregate_name(i) do
  @next_aggregate_names[i] ||
    raise Ash.Error.Framework.AssumptionFailed,
      message: """
      All 1000 static names for aggregates have been used in a single query .
      Congratulations , this means that you have gone so wildly beyond our imagination
      of how much can fit into a single query . Please file an issue and we will raise the limit .
      """
end
2023-01-18 03:57:05 +13:00
# Returns the query untouched when its `:distinct` is nil; otherwise wraps it in
# a subquery that selects every row, so the wrapped query is what gets joined.
defp subquery_if_distinct(query) do
  case query do
    %{distinct: nil} ->
      query

    _has_distinct ->
      from(row in subquery(query), select: row)
  end
end
2023-11-17 11:48:39 +13:00
# Threads the joined subquery through `add_subquery_aggregate_select/6` once per
# aggregate, so each aggregate ends up selected in the subquery. The outer
# `_query` argument is accepted but unused.
defp select_all_aggregates(
       aggregates,
       joined,
       relationship_path,
       _query,
       is_single?,
       resource,
       first_relationship
     ) do
  for aggregate <- aggregates, reduce: joined do
    subquery ->
      add_subquery_aggregate_select(
        subquery,
        relationship_path,
        aggregate,
        resource,
        is_single?,
        first_relationship
      )
  end
end
# Inner-joins the remainder of the relationship path (everything after the first
# relationship) onto the aggregate root query.
#
# With an empty remaining path there is nothing to join and `{:ok, agg_root_query}`
# is returned directly; otherwise the result of
# `AshPostgres.Join.join_all_relationships/7` is returned.
defp join_all_relationships(
       agg_root_query,
       _aggregates,
       relationship_path,
       first_relationship,
       _is_single?
     ) do
  if Enum.empty?(relationship_path) do
    {:ok, agg_root_query}
  else
    remaining_relationships =
      AshPostgres.Join.relationship_path_to_relationships(
        first_relationship.destination,
        relationship_path
      )

    AshPostgres.Join.join_all_relationships(
      agg_root_query,
      nil,
      [],
      [{:inner, remaining_relationships}],
      [],
      nil,
      false
    )
  end
end
2023-07-13 16:13:50 +12:00
# Whether an aggregate may share a subquery with other aggregates on the same
# relationship path. `:exists` and `:list` aggregates never can; any other kind
# must pass `can_group_kind?/2` and its filter must contain neither an `exists`
# nor a reference through a relationship.
defp can_group?(_, %{kind: kind}) when kind in [:exists, :list], do: false

defp can_group?(resource, aggregate) do
  can_group_kind?(aggregate, resource) &&
    !(has_exists?(aggregate) || references_relationships?(aggregate))
end
2022-12-08 14:32:38 +13:00
2022-12-22 10:58:12 +13:00
# True when the aggregate's filter contains a reference with a non-empty
# relationship path.
#
# Such aggregates are currently never grouped. This could be optimized later:
# 1. group the aggregates that join relationships by the relationships they join, or
# 2. group all of them and simply join to every referenced relationship.
# Not grouping them at all is predictable and easy, so that is the starting point.
defp references_relationships?(aggregate) do
  filter = aggregate.query && aggregate.query.filter

  found =
    Ash.Filter.find(filter, fn
      %Ash.Query.Ref{relationship_path: path} when path != [] -> true
      _ -> false
    end)

  if found, do: true, else: false
end
2022-12-08 14:32:38 +13:00
2022-12-22 10:58:12 +13:00
# Kind-based part of the grouping check: every kind other than `:first` can be
# grouped; a `:first` aggregate can be grouped only when neither `array_type?/2`
# nor `optimizable_first_aggregate?/2` holds for it.
defp can_group_kind?(aggregate, resource) do
  aggregate.kind != :first ||
    !(array_type?(resource, aggregate) || optimizable_first_aggregate?(resource, aggregate))
end
2022-12-08 14:32:38 +13:00
2023-06-12 12:33:20 +12:00
# Whether a `:first` aggregate can be added with a plain join: either its name is
# listed in the resource's `simple_join_first_aggregates`, or every relationship
# on its path satisfies `single_path?/2`. Anything that is not a `:first`
# aggregate with a name and relationship path is never optimizable.
@doc false
def optimizable_first_aggregate?(resource, aggregate) do
  case aggregate do
    %{name: name, kind: :first, relationship_path: relationship_path} ->
      simple_join_names = AshPostgres.DataLayer.Info.simple_join_first_aggregates(resource)

      name in simple_join_names || single_path?(resource, relationship_path)

    _ ->
      false
  end
end
2023-02-09 08:46:29 +13:00
# Looks up the aggregated field on the related resource and returns `false` when
# its type is `{:array, _}` and `true` for every other type.
#
# NOTE(review): the result reads as inverted relative to the function's name
# (array types yield `false`). Behaviour is preserved as-is because
# `can_group_kind?/2` depends on it - confirm the intent before changing.
defp array_type?(resource, aggregate) do
  field =
    resource
    |> Ash.Resource.Info.related(aggregate.relationship_path)
    |> Ash.Resource.Info.field(aggregate.field)

  not match?({:array, _}, field.type)
end
2022-12-22 10:58:12 +13:00
# True when the aggregate's filter contains an `Ash.Query.Exists` node.
defp has_exists?(aggregate) do
  filter = aggregate.query && aggregate.query.filter

  case Ash.Filter.find(filter, &match?(%Ash.Query.Exists{}, &1)) do
    nil -> false
    false -> false
    _found -> true
  end
end
2023-08-23 06:51:31 +12:00
# Merges the collected aggregate dynamics into the query's select.
#
# Each entry of `dynamics` is `{load, name, dynamic}`. Entries with a non-nil
# `load` are selected under that key; entries with a nil `load` are collected
# under a nested `:aggregates` map, keyed by the original aggregate name when
# `name_mapping` has one for the generated name, otherwise by `name` itself.
defp add_aggregate_selects(query, dynamics, name_mapping) do
  {without_load, with_load} =
    Enum.split_with(dynamics, fn {load, _name, _dynamic} -> is_nil(load) end)

  top_level = Map.new(with_load, fn {load, _name, dynamic} -> {load, dynamic} end)

  selects =
    case without_load do
      [] ->
        top_level

      _ ->
        nested =
          Map.new(without_load, fn {_load, name, dynamic} ->
            {name_mapping[name] || name, dynamic}
          end)

        Map.put(top_level, :aggregates, nested)
    end

  Ecto.Query.select_merge(query, ^selects)
end
2022-12-08 14:32:38 +13:00
# Builds the dynamic that reads an aggregate's value from the joined subquery
# bound at `binding`.
#
# The field is cast to the aggregate's parameterized type when there is one,
# coalesced with the aggregate's default value when that default is not nil, and
# the final expression is cast to the type once more when a type is available.
defp select_dynamic(_resource, _query, aggregate, binding) do
  type = AshPostgres.Types.parameterized_type(aggregate.type, aggregate.constraints)

  field =
    if type do
      Ecto.Query.dynamic(type(field(as(^binding), ^aggregate.name), ^type))
    else
      Ecto.Query.dynamic(field(as(^binding), ^aggregate.name))
    end

  coalesced =
    cond do
      is_nil(aggregate.default_value) ->
        field

      type ->
        Ecto.Query.dynamic(coalesce(^field, type(^aggregate.default_value, ^type)))

      true ->
        Ecto.Query.dynamic(coalesce(^field, ^aggregate.default_value))
    end

  if type do
    Ecto.Query.dynamic(type(^coalesced, ^type))
  else
    coalesced
  end
end
2022-12-08 14:32:38 +13:00
# True only for a query whose `:filter` is an `Ash.Filter` with a non-nil
# expression; nil queries, nil filters and anything else yield false.
defp has_filter?(query) do
  case query do
    %{filter: %Ash.Filter{expression: expression}} -> not is_nil(expression)
    _ -> false
  end
end
2021-12-21 16:19:24 +13:00
2022-12-08 14:32:38 +13:00
# True when the query carries a `:sort` that is neither nil nor an empty list;
# nil queries and values without a `:sort` key yield false.
defp has_sort?(%{sort: sort}), do: sort not in [nil, []]
defp has_sort?(_), do: false
2021-12-21 16:19:24 +13:00
2022-12-18 20:23:39 +13:00
# Collects the aggregates referenced by `filter` (at `path`) together with the
# aggregates referenced by the expressions of `used_calculations`.
#
# Each calculation's expression is hydrated against `resource`; calculations
# whose expression cannot be hydrated contribute no aggregates.
def used_aggregates(filter, resource, used_calculations, path) do
  from_filter = Ash.Filter.used_aggregates(filter, path)

  hydration_context = %{
    resource: resource,
    aggregates: %{},
    calculations: %{},
    public?: false
  }

  from_calculations =
    Enum.flat_map(used_calculations, fn calculation ->
      expression = calculation.module.expression(calculation.opts, calculation.context)

      case Ash.Filter.hydrate_refs(expression, hydration_context) do
        {:ok, hydrated} -> Ash.Filter.used_aggregates(hydrated)
        _ -> []
      end
    end)

  from_filter ++ from_calculations
end
2022-12-08 14:32:38 +13:00
# Clause for `:first` aggregates: selects the aggregate into the subquery under
# `aggregate.name`.
#
# The field is collected with `array_agg`, ordered by the aggregate query's sort
# when it has one, otherwise by the first relationship's sort when that is set
# (using binding 0), otherwise unordered. `filter_field/5` may add the aggregate's
# filter, the first element of the array is taken, and the value is coalesced
# with the aggregate's default (when truthy) and cast to its type (when present).
def add_subquery_aggregate_select (
query ,
relationship_path ,
%{ kind : :first } = aggregate ,
resource ,
2023-11-17 11:48:39 +13:00
is_single? ,
first_relationship
2022-12-08 14:32:38 +13:00
) do
2021-12-21 16:19:24 +13:00
query = AshPostgres.DataLayer . default_bindings ( query , aggregate . resource )
2022-12-29 16:08:07 +13:00
ref = % Ash.Query.Ref {
2023-07-21 13:57:44 +12:00
attribute : aggregate_field ( aggregate , resource , relationship_path , query ) ,
2022-12-29 16:08:07 +13:00
relationship_path : relationship_path ,
2023-02-10 11:49:37 +13:00
resource : query . __ash_bindings__ . resource
2022-12-29 16:08:07 +13:00
}
2022-01-25 11:59:31 +13:00
2022-12-08 14:32:38 +13:00
type = AshPostgres.Types . parameterized_type ( aggregate . type , aggregate . constraints )
binding =
AshPostgres.DataLayer . get_binding (
query . __ash_bindings__ . resource ,
relationship_path ,
query ,
[ :left , :inner , :root ]
)
2021-12-21 16:19:24 +13:00
2022-12-29 16:08:07 +13:00
field = AshPostgres.Expr . dynamic_expr ( query , ref , query . __ash_bindings__ , false )
2022-12-08 14:32:38 +13:00
2023-11-17 11:48:39 +13:00
has_sort? = has_sort? ( aggregate . query )
2022-12-08 14:32:38 +13:00
sorted =
2023-11-17 11:48:39 +13:00
if has_sort? || first_relationship . sort not in [ nil , [ ] ] do
# The aggregate's own sort wins; the relationship's sort is the fallback
# and is resolved against binding 0.
{ sort , binding } =
if has_sort? do
{ aggregate . query . sort , binding }
else
{ List . wrap ( first_relationship . sort ) , 0 }
end
{ :ok , sort_expr , query } =
2022-12-08 14:32:38 +13:00
AshPostgres.Sort . sort (
query ,
2023-11-17 11:48:39 +13:00
sort ,
2023-02-10 11:49:37 +13:00
Ash.Resource.Info . related (
query . __ash_bindings__ . resource ,
relationship_path
) ,
2022-12-08 14:32:38 +13:00
relationship_path ,
binding ,
2023-11-17 11:48:39 +13:00
:return
2022-12-08 14:32:38 +13:00
)
2022-01-25 11:59:31 +13:00
# One placeholder per sort expression for the ORDER BY part of the fragment.
question_marks = Enum . map ( sort_expr , fn _ -> " ? " end )
2021-12-21 16:19:24 +13:00
2022-01-25 11:59:31 +13:00
{ :ok , expr } =
AshPostgres.Functions.Fragment . casted_new (
[ " array_agg(? ORDER BY #{ question_marks } ) " , field ] ++ sort_expr
)
2022-01-14 08:11:30 +13:00
2022-12-08 14:32:38 +13:00
AshPostgres.Expr . dynamic_expr ( query , expr , query . __ash_bindings__ , false )
2021-12-21 16:19:24 +13:00
else
2022-01-14 08:11:30 +13:00
Ecto.Query . dynamic (
[ row ] ,
2022-12-08 14:32:38 +13:00
fragment ( " array_agg(?) " , ^ field )
2022-01-14 08:11:30 +13:00
)
2021-12-21 16:19:24 +13:00
end
2022-12-08 14:32:38 +13:00
filtered = filter_field ( sorted , query , aggregate , relationship_path , is_single? )
2021-12-21 16:19:24 +13:00
2022-12-29 16:08:07 +13:00
# Take the first element of the aggregated array.
value = Ecto.Query . dynamic ( fragment ( " (?)[1] " , ^ filtered ) )
2021-12-21 16:19:24 +13:00
# NOTE(review): this truthiness check skips the coalesce for a default of
# `false`, unlike the `is_nil` check in `select_dynamic/4` - confirm intent.
with_default =
if aggregate . default_value do
2022-02-17 16:04:54 +13:00
if type do
Ecto.Query . dynamic ( coalesce ( ^ value , type ( ^ aggregate . default_value , ^ type ) ) )
else
Ecto.Query . dynamic ( coalesce ( ^ value , ^ aggregate . default_value ) )
end
2021-12-21 16:19:24 +13:00
else
value
end
2022-02-17 16:04:54 +13:00
casted =
if type do
Ecto.Query . dynamic ( type ( ^ with_default , ^ type ) )
else
with_default
end
2021-12-21 16:19:24 +13:00
2022-01-14 08:11:30 +13:00
select_or_merge ( query , aggregate . name , casted )
2021-12-21 16:19:24 +13:00
end
2022-12-08 14:32:38 +13:00
# Clause for `:list` aggregates: selects the aggregate into the subquery under
# `aggregate.name`.
#
# The field is collected with `array_agg`, made DISTINCT when the aggregate has
# `uniq?` set, and ordered by the aggregate query's sort when it has one,
# otherwise by the first relationship's sort when that is set (using binding 0).
# `filter_field/5` may add the aggregate's filter; the value is then coalesced
# with the aggregate's default (when truthy) and cast to its type (when present).
def add_subquery_aggregate_select (
query ,
relationship_path ,
%{ kind : :list } = aggregate ,
resource ,
2023-11-17 11:48:39 +13:00
is_single? ,
first_relationship
2022-12-08 14:32:38 +13:00
) do
2021-12-21 16:19:24 +13:00
query = AshPostgres.DataLayer . default_bindings ( query , aggregate . resource )
2022-12-08 14:32:38 +13:00
type = AshPostgres.Types . parameterized_type ( aggregate . type , aggregate . constraints )
binding =
AshPostgres.DataLayer . get_binding (
query . __ash_bindings__ . resource ,
relationship_path ,
query ,
[ :left , :inner , :root ]
)
2021-12-21 16:19:24 +13:00
2022-12-29 16:08:07 +13:00
ref = % Ash.Query.Ref {
2023-07-21 13:57:44 +12:00
attribute : aggregate_field ( aggregate , resource , relationship_path , query ) ,
2022-12-29 16:08:07 +13:00
relationship_path : relationship_path ,
2023-02-10 11:49:37 +13:00
resource : query . __ash_bindings__ . resource
2022-12-29 16:08:07 +13:00
}
field = AshPostgres.Expr . dynamic_expr ( query , ref , query . __ash_bindings__ , false )
2022-12-08 14:32:38 +13:00
2023-11-17 11:48:39 +13:00
has_sort? = has_sort? ( aggregate . query )
2022-12-08 14:32:38 +13:00
sorted =
2023-11-17 11:48:39 +13:00
if has_sort? || first_relationship . sort not in [ nil , [ ] ] do
# The aggregate's own sort wins; the relationship's sort is the fallback
# and is resolved against binding 0.
{ sort , binding } =
if has_sort? do
{ aggregate . query . sort , binding }
else
{ List . wrap ( first_relationship . sort ) , 0 }
end
{ :ok , sort_expr , query } =
2022-12-08 14:32:38 +13:00
AshPostgres.Sort . sort (
query ,
2023-11-17 11:48:39 +13:00
sort ,
2023-02-10 11:49:37 +13:00
Ash.Resource.Info . related (
query . __ash_bindings__ . resource ,
relationship_path
) ,
2022-12-08 14:32:38 +13:00
relationship_path ,
binding ,
2023-11-17 11:48:39 +13:00
:return
2022-12-08 14:32:38 +13:00
)
2022-01-25 11:59:31 +13:00
# One placeholder per sort expression for the ORDER BY part of the fragment.
question_marks = Enum . map ( sort_expr , fn _ -> " ? " end )
2021-12-21 16:19:24 +13:00
2023-02-10 10:09:44 +13:00
distinct =
if Map . get ( aggregate , :uniq? ) do
" DISTINCT "
else
" "
end
2022-01-25 11:59:31 +13:00
{ :ok , expr } =
AshPostgres.Functions.Fragment . casted_new (
2023-02-10 10:09:44 +13:00
[ " array_agg( #{ distinct } ? ORDER BY #{ question_marks } ) " , field ] ++ sort_expr
2022-01-25 11:59:31 +13:00
)
2022-01-14 08:11:30 +13:00
2022-12-08 14:32:38 +13:00
AshPostgres.Expr . dynamic_expr ( query , expr , query . __ash_bindings__ , false )
2021-12-21 16:19:24 +13:00
else
2023-02-10 10:09:44 +13:00
if Map . get ( aggregate , :uniq? ) do
Ecto.Query . dynamic (
[ row ] ,
fragment ( " array_agg(DISTINCT ?) " , ^ field )
)
else
Ecto.Query . dynamic (
[ row ] ,
fragment ( " array_agg(?) " , ^ field )
)
end
2021-12-21 16:19:24 +13:00
end
2022-12-08 14:32:38 +13:00
filtered = filter_field ( sorted , query , aggregate , relationship_path , is_single? )
2021-12-21 16:19:24 +13:00
# NOTE(review): this truthiness check skips the coalesce for a default of
# `false`, unlike the `is_nil` check in `select_dynamic/4` - confirm intent.
with_default =
if aggregate . default_value do
2022-02-17 16:04:54 +13:00
if type do
Ecto.Query . dynamic ( coalesce ( ^ filtered , type ( ^ aggregate . default_value , ^ type ) ) )
else
Ecto.Query . dynamic ( coalesce ( ^ filtered , ^ aggregate . default_value ) )
end
2021-12-21 16:19:24 +13:00
else
filtered
end
2022-02-17 16:04:54 +13:00
cast =
if type do
2023-02-14 07:30:47 +13:00
Ecto.Query . dynamic ( type ( ^ with_default , ^ type ) )
2022-02-17 16:04:54 +13:00
else
with_default
end
2021-12-21 16:19:24 +13:00
2022-01-14 08:11:30 +13:00
select_or_merge ( query , aggregate . name , cast )
2021-12-21 16:19:24 +13:00
end
2022-12-08 14:32:38 +13:00
# Clause for `:count`, `:sum`, `:avg`, `:max`, `:min` and `:custom` aggregates:
# selects the aggregate into the subquery under `aggregate.name`.
#
# The built-in kinds wrap the referenced field in the matching SQL aggregate
# (`count` is DISTINCT when the aggregate has `uniq?` set). `:custom` delegates to
# the aggregate's `implementation` module via `module.dynamic(opts, binding)`.
# `filter_field/5` may add the aggregate's filter; the value is then coalesced
# with the aggregate's default (when truthy) and cast to its type (when present).
def add_subquery_aggregate_select (
query ,
relationship_path ,
%{ kind : kind } = aggregate ,
resource ,
2023-11-17 11:48:39 +13:00
is_single? ,
_first_relationship
2022-12-08 14:32:38 +13:00
)
when kind in [ :count , :sum , :avg , :max , :min , :custom ] do
2021-12-21 16:19:24 +13:00
query = AshPostgres.DataLayer . default_bindings ( query , aggregate . resource )
2022-12-29 16:08:07 +13:00
ref = % Ash.Query.Ref {
2023-07-21 13:57:44 +12:00
attribute : aggregate_field ( aggregate , resource , relationship_path , query ) ,
2022-12-29 16:08:07 +13:00
relationship_path : relationship_path ,
resource : resource
}
field =
if kind == :custom do
# we won't use this if its custom so don't try to make one
nil
else
AshPostgres.Expr . dynamic_expr ( query , ref , query . __ash_bindings__ , false )
end
2022-12-08 14:32:38 +13:00
type = AshPostgres.Types . parameterized_type ( aggregate . type , aggregate . constraints )
binding =
AshPostgres.DataLayer . get_binding (
query . __ash_bindings__ . resource ,
relationship_path ,
query ,
[ :left , :inner , :root ]
)
2021-12-21 16:19:24 +13:00
field =
case kind do
:count ->
2023-02-10 10:09:44 +13:00
if Map . get ( aggregate , :uniq? ) do
Ecto.Query . dynamic ( [ row ] , count ( ^ field , :distinct ) )
else
2023-02-18 09:21:50 +13:00
Ecto.Query . dynamic ( [ row ] , count ( ^ field ) )
2023-02-10 10:09:44 +13:00
end
2021-12-21 16:19:24 +13:00
:sum ->
2022-12-29 16:08:07 +13:00
Ecto.Query . dynamic ( [ row ] , sum ( ^ field ) )
2021-12-21 16:19:24 +13:00
2022-12-08 14:32:38 +13:00
:avg ->
2022-12-29 16:08:07 +13:00
Ecto.Query . dynamic ( [ row ] , avg ( ^ field ) )
2021-12-21 16:19:24 +13:00
2022-12-08 14:32:38 +13:00
:max ->
2022-12-29 16:08:07 +13:00
Ecto.Query . dynamic ( [ row ] , max ( ^ field ) )
2022-12-08 14:32:38 +13:00
:min ->
2022-12-29 16:08:07 +13:00
Ecto.Query . dynamic ( [ row ] , min ( ^ field ) )
2022-12-08 14:32:38 +13:00
:custom ->
{ module , opts } = aggregate . implementation
module . dynamic ( opts , binding )
2021-12-21 16:19:24 +13:00
end
2022-12-08 14:32:38 +13:00
filtered = filter_field ( field , query , aggregate , relationship_path , is_single? )
2022-02-10 05:49:19 +13:00
# NOTE(review): this truthiness check skips the coalesce for a default of
# `false`, unlike the `is_nil` check in `select_dynamic/4` - confirm intent.
with_default =
if aggregate . default_value do
2022-02-17 16:04:54 +13:00
if type do
Ecto.Query . dynamic ( coalesce ( ^ filtered , type ( ^ aggregate . default_value , ^ type ) ) )
else
Ecto.Query . dynamic ( coalesce ( ^ filtered , ^ aggregate . default_value ) )
end
2022-02-10 05:49:19 +13:00
else
filtered
end
2022-02-17 16:04:54 +13:00
cast =
if type do
Ecto.Query . dynamic ( type ( ^ with_default , ^ type ) )
else
with_default
end
2021-12-21 16:19:24 +13:00
2022-01-14 08:11:30 +13:00
select_or_merge ( query , aggregate . name , cast )
2021-12-21 16:19:24 +13:00
end
2022-12-08 14:32:38 +13:00
# Attaches the aggregate's own filter to its aggregate expression.
#
# When the aggregate is the only one in its subquery (`is_single?` is `true`),
# or when the aggregate query has no filter, the expression is returned
# untouched. Otherwise the filter is moved onto `relationship_path`, turned into
# a dynamic, and applied with `filter(field, expr)`.
defp filter_field(field, query, aggregate, relationship_path, is_single?) do
  cond do
    is_single? === true ->
      field

    not has_filter?(aggregate.query) ->
      field

    true ->
      moved_filter =
        Ash.Filter.move_to_relationship_path(aggregate.query.filter, relationship_path)

      aggregate_type =
        AshPostgres.Types.parameterized_type(aggregate.type, aggregate.constraints)

      expr =
        AshPostgres.Expr.dynamic_expr(
          query,
          moved_filter,
          query.__ash_bindings__,
          false,
          aggregate_type
        )

      Ecto.Query.dynamic(filter(^field, ^expr))
  end
end
2022-12-08 14:32:38 +13:00
# Adds `aggregate_name => casted` to the query's select, first giving the query
# an empty map select when it has no select yet so there is something to merge into.
defp select_or_merge(query, aggregate_name, casted) do
  with_select = if query.select, do: query, else: Ecto.Query.select(query, %{})

  Ecto.Query.select_merge(with_select, ^%{aggregate_name => casted})
end
2023-02-09 08:46:29 +13:00
2023-06-12 12:33:20 +12:00
# True when every relationship along the path is either a `:belongs_to` or
# satisfies `has_one_with_identity?/1`. An empty path is trivially true.
defp single_path?(_, []), do: true

defp single_path?(resource, [name | remaining]) do
  relationship = Ash.Resource.Info.relationship(resource, name)

  if relationship.type == :belongs_to || has_one_with_identity?(relationship) do
    single_path?(relationship.destination, remaining)
  else
    false
  end
end
# True for a `:has_one` relationship whose destination resource declares an
# identity consisting of exactly the relationship's destination attribute.
# Every other relationship yields false.
#
# The attribute is read from `destination_attribute`, the key used for
# relationships everywhere else in this module.
defp has_one_with_identity?(%{type: :has_one} = relationship) do
  relationship.destination
  |> Ash.Resource.Info.identities()
  |> Enum.any?(fn %{keys: keys} ->
    keys == [relationship.destination_attribute]
  end)
end

defp has_one_with_identity?(_), do: false
2023-11-17 22:01:35 +13:00
# Resolves the field an aggregate operates on.
#
# Looks up `aggregate.field` on `resource`, falling back to the first primary key
# attribute when the aggregate has no field. When the field is a resource
# calculation, its type is validated against `query` and an
# `Ash.Query.Calculation` is built (with the aggregate's `:context`, defaulting to
# an empty map) and returned; any other lookup result is returned as-is.
# `_relationship_path` is accepted but unused.
@doc false
def aggregate_field ( aggregate , resource , _relationship_path , query ) do
2023-07-21 13:57:44 +12:00
case Ash.Resource.Info . field (
resource ,
aggregate . field || List . first ( Ash.Resource.Info . primary_key ( resource ) )
) do
% Ash.Resource.Calculation { calculation : { module , opts } } = calculation ->
calc_type =
AshPostgres.Types . parameterized_type (
calculation . type ,
Map . get ( calculation , :constraints , [ ] )
)
AshPostgres.Expr . validate_type! ( query , calc_type , " #{ inspect ( calculation . name ) } " )
# A failure to build the calculation raises a match error here.
{ :ok , query_calc } =
Ash.Query.Calculation . new (
calculation . name ,
module ,
opts ,
calculation . type ,
Map . get ( aggregate , :context , %{ } )
)
query_calc
other ->
other
end
end
2021-12-21 16:19:24 +13:00
end