2021-12-21 16:19:24 +13:00
|
|
|
defmodule AshPostgres.Aggregate do
|
|
|
|
@moduledoc false
|
|
|
|
|
|
|
|
import Ecto.Query, only: [from: 2, subquery: 1]
|
|
|
|
require Ecto.Query
|
|
|
|
|
2022-02-10 05:49:19 +13:00
|
|
|
# Joins the subquery for every aggregate in `aggregates` onto `query` and,
# when `select?` is true, merges the aggregate values into the query's select.
#
# Arguments:
#   * `query`      - the query being built; default bindings are ensured first.
#   * `aggregates` - the aggregates to add. An empty list returns the query untouched.
#   * `resource`   - the resource the aggregates' relationship paths start from.
#   * `select?`    - when false, the aggregates are joined but not selected.
#
# Returns `{:ok, query}`. Returns `{:error, errors}` as soon as an aggregate
# carries an invalid query (its `errors` are returned) or joining fails (the
# error from `AshPostgres.Join.join_all_relationships/3` is returned).
def add_aggregates(query, aggregates, resource, select? \\ true)

def add_aggregates(query, [], _, _), do: {:ok, query}

def add_aggregates(query, aggregates, resource, select?) do
  query = AshPostgres.DataLayer.default_bindings(query, resource)

  result =
    Enum.reduce_while(aggregates, {:ok, query, []}, fn aggregate, {:ok, query, dynamics} ->
      if aggregate.query && !aggregate.query.valid? do
        {:halt, {:error, aggregate.query.errors}}
      else
        # Only the truthiness of this value is used below.
        has_exists? =
          Ash.Filter.find(aggregate.query && aggregate.query.filter, fn
            %Ash.Query.Exists{} -> true
            _ -> false
          end)

        # Bindings are looked up by aggregate name only when the filter
        # contains an `Ash.Query.Exists`; otherwise `nil` is passed.
        name_match = if has_exists?, do: aggregate.name

        case ensure_aggregate_binding(query, resource, aggregate, has_exists?, name_match) do
          {:ok, {query, binding}} ->
            query_with_aggregate_binding =
              put_in(
                query.__ash_bindings__.aggregates,
                Map.put(query.__ash_bindings__.aggregates, aggregate.name, binding)
              )

            query_with_aggregate_defs =
              put_in(
                query_with_aggregate_binding.__ash_bindings__.aggregate_defs,
                Map.put(
                  query_with_aggregate_binding.__ash_bindings__.aggregate_defs,
                  aggregate.name,
                  aggregate
                )
              )

            new_query =
              add_aggregate_to_subquery(
                query_with_aggregate_defs,
                resource,
                aggregate,
                binding,
                has_exists?
              )

            if select? do
              dynamic = select_dynamic(resource, query, aggregate, name_match)

              {:cont, {:ok, new_query, [{aggregate.load, aggregate.name, dynamic} | dynamics]}}
            else
              {:cont, {:ok, new_query, dynamics}}
            end

          {:error, error} ->
            {:halt, {:error, error}}
        end
      end
    end)

  case result do
    {:ok, query, dynamics} ->
      {:ok, add_aggregate_selects(query, dynamics)}

    {:error, error} ->
      {:error, error}
  end
end

# Returns `{:ok, {query, binding}}` for the aggregate's join, reusing an
# existing binding when `AshPostgres.DataLayer.get_binding/5` finds one and
# otherwise joining the aggregate subquery first. `aggregate_subquery/4`
# selects the many-to-many variant itself by pattern matching, so no branch on
# the relationship type is needed here.
defp ensure_aggregate_binding(query, resource, aggregate, has_exists?, name_match) do
  case AshPostgres.DataLayer.get_binding(
         resource,
         aggregate.relationship_path,
         query,
         :aggregate,
         name_match
       ) do
    nil ->
      relationship = Ash.Resource.Info.relationship(resource, aggregate.relationship_path)
      subquery = aggregate_subquery(relationship, aggregate, query, has_exists?)

      case AshPostgres.Join.join_all_relationships(
             query,
             nil,
             [
               {{:aggregate, aggregate.name, subquery, has_exists?},
                AshPostgres.Join.relationship_path_to_relationships(
                  resource,
                  aggregate.relationship_path
                )}
             ]
           ) do
        {:ok, new_query} ->
          {:ok,
           {new_query,
            AshPostgres.DataLayer.get_binding(
              resource,
              aggregate.relationship_path,
              new_query,
              :aggregate,
              name_match
            )}}

        {:error, error} ->
          {:error, error}
      end

    binding ->
      {:ok, {query, binding}}
  end
end
|
|
|
|
|
2022-01-14 08:11:30 +13:00
|
|
|
# Merges the collected aggregate dynamics into the select of `query`.
#
# `dynamics` is a list of `{load, name, dynamic}` tuples. Entries with a
# non-nil `load` are selected under that key; entries with a nil `load` are
# grouped by `name` into a nested map under the `:aggregates` key. The
# `:aggregates` key is only added when at least one such entry exists.
defp add_aggregate_selects(query, dynamics) do
  {unloaded, loaded} = Enum.split_with(dynamics, fn {load, _, _} -> is_nil(load) end)

  top_level = for {load, _name, dynamic} <- loaded, into: %{}, do: {load, dynamic}

  selects =
    case unloaded do
      [] ->
        top_level

      _ ->
        nested = for {_load, name, dynamic} <- unloaded, into: %{}, do: {name, dynamic}
        Map.put(top_level, :aggregates, nested)
    end

  Ecto.Query.select_merge(query, ^selects)
end
|
|
|
|
|
2022-09-14 08:27:39 +12:00
|
|
|
# Builds the subquery used to laterally join an aggregate.
#
# Manual relationships (`manual: {module, opts}`) delegate to
# `module.ash_postgres_subquery/4`; any result other than `{:ok, inner}` is
# returned unchanged. If that function is undefined, a runtime error explaining
# the missing `AshPostgres.ManualRelationship` implementation is raised with
# the original stacktrace.
def agg_subquery_for_lateral_join(
      current_binding,
      query,
      subquery,
      %{manual: {module, opts}} = relationship
    ) do
  with {:ok, inner_sub} <- module.ash_postgres_subquery(opts, current_binding, 0, subquery) do
    {:ok, lateral_join_subquery(inner_sub, query, relationship)}
  end
rescue
  e in UndefinedFunctionError ->
    case e.function do
      :ash_postgres_subquery ->
        message = """
        Cannot join to a manual relationship #{inspect(module)} that does not implement the `AshPostgres.ManualRelationship` behaviour.
        """

        reraise message, __STACKTRACE__

      _ ->
        reraise e, __STACKTRACE__
    end
end

# Non-manual relationships: unless the relationship is flagged
# `no_attributes?`, the subquery is restricted to rows matching the parent
# row's source attribute. Many-to-many relationships compare against the join
# resource (binding 1); every other type compares against the destination
# (binding 0).
def agg_subquery_for_lateral_join(current_binding, query, subquery, relationship) do
  {dest_binding, dest_field} =
    if relationship.type == :many_to_many do
      {1, relationship.source_attribute_on_join_resource}
    else
      {0, relationship.destination_attribute}
    end

  inner_sub =
    case Map.get(relationship, :no_attributes?) do
      flag when flag in [nil, false] ->
        from(destination in subquery,
          where:
            field(as(^dest_binding), ^dest_field) ==
              field(parent_as(^current_binding), ^relationship.source_attribute)
        )

      _ ->
        subquery
    end

  {:ok, lateral_join_subquery(inner_sub, query, relationship)}
end

# Wraps `inner_sub` in an outer subquery and applies the join prefix for the
# relationship's destination.
defp lateral_join_subquery(inner_sub, query, relationship) do
  wrapped = from(sub in subquery(inner_sub), [])
  AshPostgres.Join.set_join_prefix(wrapped, query, relationship.destination)
end
|
|
|
|
|
2022-09-29 11:01:20 +13:00
|
|
|
# Builds the dynamic expression that reads an aggregate's value from its join
# binding in the outer query.
#
# The column named after the aggregate is read from the binding returned by
# `AshPostgres.DataLayer.get_binding/5`. When a parameterized type is available
# the column, the default and the final expression are each cast to it. A
# non-nil `default_value` is applied with `coalesce`.
defp select_dynamic(resource, query, aggregate, name_match) do
  agg_binding =
    AshPostgres.DataLayer.get_binding(
      resource,
      aggregate.relationship_path,
      query,
      :aggregate,
      name_match
    )

  cast_type = AshPostgres.Types.parameterized_type(aggregate.type, [])
  agg_name = aggregate.name
  default = aggregate.default_value

  column =
    if cast_type do
      Ecto.Query.dynamic(type(field(as(^agg_binding), ^agg_name), ^cast_type))
    else
      Ecto.Query.dynamic(field(as(^agg_binding), ^agg_name))
    end

  with_default =
    cond do
      is_nil(default) ->
        column

      cast_type ->
        Ecto.Query.dynamic(coalesce(^column, type(^default, ^cast_type)))

      true ->
        Ecto.Query.dynamic(coalesce(^column, ^default))
    end

  if cast_type do
    Ecto.Query.dynamic(type(^with_default, ^cast_type))
  else
    with_default
  end
end
|
|
|
|
|
2022-09-29 11:01:20 +13:00
|
|
|
# Rewrites the subquery of the join at `binding` so that it computes
# `aggregate`.
#
# For the join at index `binding - 1` of `query.joins`, the inner subquery is:
#   1. filtered by `aggregate.authorization_filter`, when one is set;
#   2. extended with the aggregates nested in the aggregate's own query
#      (joined only, not selected);
#   3. joined to whatever relationships the aggregate's filter references;
#   4. given the aggregate's select via `add_subquery_aggregate_select/4`.
#
# Returns `query` with the updated joins. Raises a `MatchError` if any of the
# intermediate steps does not return `{:ok, _}`.
defp add_aggregate_to_subquery(query, resource, aggregate, binding, has_exists?) do
  new_joins =
    List.update_at(query.joins, binding - 1, fn join ->
      related = Ash.Resource.Info.related(resource, aggregate.relationship_path)
      inner_query = join.source.from.source.query

      aggregate_query =
        if aggregate.authorization_filter do
          {:ok, filtered} =
            AshPostgres.DataLayer.filter(
              inner_query,
              aggregate.authorization_filter,
              related
            )

          filtered
        else
          inner_query
        end

      # `aggregate.query` is nil-guarded everywhere else in this module, so
      # guard it here as well instead of crashing on `nil.aggregates`.
      nested_aggregates = (aggregate.query && aggregate.query.aggregates) || %{}

      {:ok, aggregate_query} =
        AshPostgres.Aggregate.add_aggregates(
          aggregate_query,
          Map.values(nested_aggregates),
          related,
          false
        )

      {:ok, aggregate_query} =
        if aggregate.query && aggregate.query.filter do
          AshPostgres.Join.join_all_relationships(
            aggregate_query,
            aggregate.query.filter
          )
        else
          {:ok, aggregate_query}
        end

      new_aggregate_query =
        add_subquery_aggregate_select(aggregate_query, aggregate, resource, has_exists?)

      put_in(join.source.from.source.query, new_aggregate_query)
    end)

  %{query | joins: new_joins}
end
|
|
|
|
|
|
|
|
# Lists the aggregates referenced by `filter` at `path`, followed by the
# aggregates referenced by the expressions of `used_calculations`.
#
# Each calculation's expression is hydrated against
# `relationship.destination`; calculations whose expression fails to hydrate
# contribute nothing.
def used_aggregates(filter, relationship, used_calculations, path) do
  from_filter = Ash.Filter.used_aggregates(filter, path)

  from_calculations =
    Enum.flat_map(used_calculations, fn calculation ->
      calculation_aggregates(calculation, relationship)
    end)

  from_filter ++ from_calculations
end

# Aggregates used by a single calculation's expression, or `[]` when the
# expression cannot be hydrated.
defp calculation_aggregates(calculation, relationship) do
  expression = calculation.module.expression(calculation.opts, calculation.context)

  hydration_context = %{
    resource: relationship.destination,
    aggregates: %{},
    calculations: %{},
    public?: false
  }

  with {:ok, hydrated} <- Ash.Filter.hydrate_refs(expression, hydration_context) do
    Ash.Filter.used_aggregates(hydrated)
  else
    _ -> []
  end
end
|
|
|
|
|
2022-09-29 11:01:20 +13:00
|
|
|
# `:first` aggregates: selects the first element of `array_agg` over
# `aggregate.field` under the aggregate's name.
#
#   * When the aggregate query has a non-empty sort, the field is cast to the
#     aggregate type and aggregated with an `ORDER BY` built from the sort.
#   * Unless `has_exists?` is truthy, a non-empty aggregate filter is applied
#     through a `FILTER` clause on the aggregate.
#   * Any non-nil `default_value` is applied with `coalesce`. This uses
#     `is_nil/1`, as `select_dynamic/4` does, so that a `false` default is not
#     silently dropped by a truthiness check.
#   * When a parameterized type is available, the result is cast to it.
def add_subquery_aggregate_select(query, %{kind: :first} = aggregate, _resource, has_exists?) do
  query = AshPostgres.DataLayer.default_bindings(query, aggregate.resource)
  key = aggregate.field
  type = AshPostgres.Types.parameterized_type(aggregate.type, [])

  aggregated =
    if aggregate.query && aggregate.query.sort && aggregate.query.sort != [] do
      sort_expr = AshPostgres.Sort.order_to_ecto(aggregate.query.sort)

      # NOTE(review): one " ? " placeholder per sort entry, concatenated with no
      # separator — confirm this matches what `order_to_ecto/1` produces for
      # multi-key sorts.
      question_marks = Enum.map(sort_expr, fn _ -> " ? " end)

      typed_field = Ecto.Query.dynamic([{^0, row}], type(field(row, ^key), ^type))

      {:ok, expr} =
        AshPostgres.Functions.Fragment.casted_new(
          ["array_agg(? ORDER BY #{question_marks})", typed_field] ++ sort_expr
        )

      AshPostgres.Expr.dynamic_expr(
        query,
        expr,
        query.__ash_bindings__,
        false
      )
    else
      Ecto.Query.dynamic(
        [row],
        fragment("array_agg(?)", field(row, ^key))
      )
    end

  filtered =
    if !has_exists? && aggregate.query && aggregate.query.filter &&
         not match?(%Ash.Filter{expression: nil}, aggregate.query.filter) do
      expr =
        AshPostgres.Expr.dynamic_expr(
          query,
          aggregate.query.filter,
          query.__ash_bindings__
        )

      Ecto.Query.dynamic(filter(^aggregated, ^expr))
    else
      aggregated
    end

  # Take the first element of the aggregated array.
  value = Ecto.Query.dynamic(fragment("(?)[1]", ^filtered))

  with_default =
    cond do
      is_nil(aggregate.default_value) ->
        value

      type ->
        Ecto.Query.dynamic(coalesce(^value, type(^aggregate.default_value, ^type)))

      true ->
        Ecto.Query.dynamic(coalesce(^value, ^aggregate.default_value))
    end

  casted =
    if type do
      Ecto.Query.dynamic(type(^with_default, ^type))
    else
      with_default
    end

  select_or_merge(query, aggregate.name, casted)
end
|
|
|
|
|
2022-09-29 11:01:20 +13:00
|
|
|
# `:list` aggregates: selects `array_agg` over `aggregate.field` under the
# aggregate's name.
#
# With a non-empty sort, each item is cast to the item type taken from the
# aggregate's `{:array, item_type}` type and aggregated with an `ORDER BY`.
# Unless `has_exists?` is truthy, a non-empty aggregate filter becomes a
# `FILTER` clause. A truthy `default_value` is applied with `coalesce`, and the
# result is cast when a parameterized type is available.
def add_subquery_aggregate_select(query, %{kind: :list} = aggregate, _resource, has_exists?) do
  query = AshPostgres.DataLayer.default_bindings(query, aggregate.resource)
  key = aggregate.field
  type = AshPostgres.Types.parameterized_type(aggregate.type, [])

  sorted? = aggregate.query && aggregate.query.sort && aggregate.query.sort != []

  collected =
    if sorted? do
      sort_expr = AshPostgres.Sort.order_to_ecto(aggregate.query.sort)
      question_marks = Enum.map(sort_expr, fn _ -> " ? " end)

      {:array, raw_item_type} = aggregate.type
      item_type = AshPostgres.Types.parameterized_type(raw_item_type, [])
      typed_item = Ecto.Query.dynamic([row], type(field(row, ^key), ^item_type))

      fragment_args = ["array_agg(? ORDER BY #{question_marks})", typed_item] ++ sort_expr
      {:ok, expr} = AshPostgres.Functions.Fragment.casted_new(fragment_args)

      AshPostgres.Expr.dynamic_expr(query, expr, query.__ash_bindings__)
    else
      Ecto.Query.dynamic([row], fragment("array_agg(?)", field(row, ^key)))
    end

  # The filter is skipped when an exists is involved, when there is no
  # aggregate query or filter, or when the filter has no expression.
  filtered =
    if has_exists? || !aggregate.query || !aggregate.query.filter ||
         match?(%Ash.Filter{expression: nil}, aggregate.query.filter) do
      collected
    else
      filter_expr =
        AshPostgres.Expr.dynamic_expr(
          query,
          aggregate.query.filter,
          query.__ash_bindings__,
          false,
          AshPostgres.Types.parameterized_type(aggregate.type, [])
        )

      Ecto.Query.dynamic(filter(^collected, ^filter_expr))
    end

  defaulted =
    cond do
      !aggregate.default_value ->
        filtered

      type ->
        Ecto.Query.dynamic(coalesce(^filtered, type(^aggregate.default_value, ^type)))

      true ->
        Ecto.Query.dynamic(coalesce(^filtered, ^aggregate.default_value))
    end

  selected =
    if type do
      Ecto.Query.dynamic(type(^defaulted, ^{:array, type}))
    else
      defaulted
    end

  select_or_merge(query, aggregate.name, selected)
end
|
|
|
|
|
2022-09-29 11:01:20 +13:00
|
|
|
# `:count` and `:sum` aggregates: selects `count(field)` or `sum(field)` under
# the aggregate's name.
#
# The aggregated field is `aggregate.field`, falling back to the first primary
# key attribute of `resource`. Unless `has_exists?` is truthy, a non-empty
# aggregate filter becomes a `FILTER` clause. A truthy `default_value` is
# applied with `coalesce`, and the result is cast when a parameterized type is
# available.
#
# NOTE(review): the primary-key fallback uses the `resource` passed by the
# caller — confirm that attribute exists on the rows of `query`.
def add_subquery_aggregate_select(query, %{kind: kind} = aggregate, resource, has_exists?)
    when kind in [:count, :sum] do
  query = AshPostgres.DataLayer.default_bindings(query, aggregate.resource)
  key = aggregate.field || List.first(Ash.Resource.Info.primary_key(resource))
  type = AshPostgres.Types.parameterized_type(aggregate.type, [])

  # The guard restricts `kind` to :count or :sum.
  computed =
    if kind == :count do
      Ecto.Query.dynamic([row], count(field(row, ^key)))
    else
      Ecto.Query.dynamic([row], sum(field(row, ^key)))
    end

  filtered =
    if has_exists? || !aggregate.query || !aggregate.query.filter ||
         match?(%Ash.Filter{expression: nil}, aggregate.query.filter) do
      computed
    else
      filter_expr =
        AshPostgres.Expr.dynamic_expr(
          query,
          aggregate.query.filter,
          query.__ash_bindings__
        )

      Ecto.Query.dynamic(filter(^computed, ^filter_expr))
    end

  defaulted =
    cond do
      !aggregate.default_value ->
        filtered

      type ->
        Ecto.Query.dynamic(coalesce(^filtered, type(^aggregate.default_value, ^type)))

      true ->
        Ecto.Query.dynamic(coalesce(^filtered, ^aggregate.default_value))
    end

  selected =
    if type do
      Ecto.Query.dynamic(type(^defaulted, ^type))
    else
      defaulted
    end

  select_or_merge(query, aggregate.name, selected)
end
|
|
|
|
|
2022-01-14 08:11:30 +13:00
|
|
|
# Merges `%{aggregate_name => casted}` into the select of `query`, first giving
# the query an empty map select when it has none.
defp select_or_merge(query, aggregate_name, casted) do
  with_select =
    case query.select do
      missing when missing in [nil, false] -> Ecto.Query.select(query, %{})
      _existing -> query
    end

  selection = %{aggregate_name => casted}

  Ecto.Query.select_merge(with_select, ^selection)
end
|
|
|
|
|
2022-09-29 11:01:20 +13:00
|
|
|
# Builds the base subquery that an aggregate is computed over.
#
# Many-to-many relationships: the destination is joined to the join resource
# (`relationship.through`, bound as binding 1) and grouped by the join
# resource's source attribute.
defp aggregate_subquery(
       %{type: :many_to_many} = relationship,
       aggregate,
       root_query,
       has_exists?
     ) do
  destination = resource_query_or_default(relationship.destination, relationship, root_query)

  join_relationship =
    Ash.Resource.Info.relationship(relationship.source, relationship.join_relationship)

  through = resource_query_or_default(relationship.through, join_relationship, root_query)

  destination = filter_destination_for_exists(destination, aggregate, has_exists?)

  query =
    from(destination in destination,
      join: through in ^through,
      as: ^1,
      on:
        field(through, ^relationship.destination_attribute_on_join_resource) ==
          field(destination, ^relationship.destination_attribute),
      group_by: field(through, ^relationship.source_attribute_on_join_resource)
    )

  put_aggregate_subquery_prefix(query, relationship, aggregate, root_query)
end

# All other relationships: the destination is grouped by its destination
# attribute and starts with an empty map select.
defp aggregate_subquery(relationship, aggregate, root_query, has_exists?) do
  destination =
    relationship.destination
    |> resource_query_or_default(relationship, root_query)
    |> filter_destination_for_exists(aggregate, has_exists?)

  query =
    from(row in destination,
      group_by: ^relationship.destination_attribute,
      select: %{}
    )

  put_aggregate_subquery_prefix(query, relationship, aggregate, root_query)
end

# Uses the query from `AshPostgres.Join.maybe_get_resource_query/3` when it
# returns `{:ok, query}`; otherwise falls back to `resource` itself.
defp resource_query_or_default(resource, relationship, root_query) do
  case AshPostgres.Join.maybe_get_resource_query(resource, relationship, root_query) do
    {:ok, query} -> query
    _ -> resource
  end
end

# When the aggregate filter contains an exists (`has_exists?` truthy) and has
# an expression, the filter is applied as a `where` on the destination query.
# Otherwise the destination is returned unchanged.
defp filter_destination_for_exists(destination, aggregate, has_exists?) do
  if has_exists? && aggregate.query && aggregate.query.filter &&
       not match?(%Ash.Filter{expression: nil}, aggregate.query.filter) do
    expr =
      AshPostgres.Expr.dynamic_expr(
        destination,
        aggregate.query.filter,
        destination.__ash_bindings__,
        false,
        AshPostgres.Types.parameterized_type(aggregate.type, [])
      )

    Ecto.Query.where(destination, ^expr)
  else
    destination
  end
end

# Sets the prefix of the aggregate subquery.
#
# When the destination has a multitenancy strategy and a tenant is known, the
# aggregate query's tenant wins over the root query's prefix. Otherwise the
# prefix is the destination's configured schema, then the repo's
# `:default_prefix`, then "public".
defp put_aggregate_subquery_prefix(query, relationship, aggregate, root_query) do
  query_tenant = aggregate.query && aggregate.query.tenant
  root_tenant = root_query.prefix

  if Ash.Resource.Info.multitenancy_strategy(relationship.destination) &&
       (root_tenant ||
          query_tenant) do
    Ecto.Query.put_query_prefix(
      query,
      query_tenant || root_tenant || AshPostgres.DataLayer.Info.schema(relationship.destination)
    )
  else
    %{
      query
      | prefix:
          AshPostgres.DataLayer.Info.schema(relationship.destination) ||
            AshPostgres.DataLayer.Info.repo(relationship.destination).config()[:default_prefix] ||
            "public"
    }
  end
end
|
|
|
|
end
|