feat: support inner joins when possible (#15)

fix: better support for aggregates/calculations when delegating
This commit is contained in:
Zach Daniel 2020-09-02 00:01:34 -04:00 committed by GitHub
parent 32afbf121d
commit 52a39d0597
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 314 additions and 105 deletions

View file

@ -17,7 +17,7 @@ jobs:
matrix:
otp: ["23", "22"]
elixir: ["1.10.0"]
ash: ["master", "1.10"]
ash: ["master", "1.12", "1.13"]
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
ASH_VERSION: ${{matrix.ash}}

View file

@ -233,8 +233,8 @@ defmodule AshPostgres.DataLayer do
end
@impl true
def sort(query, sort, _resource) do
query = default_bindings(query)
def sort(query, sort, resource) do
query = default_bindings(query, resource)
sort
|> sanitize_sort()
@ -270,33 +270,83 @@ defmodule AshPostgres.DataLayer do
{:ok, Map.put(impossible_query, :__impossible__, true)}
end
def filter(query, filter, resource) do
def filter(query, filter, _resource) do
relationship_paths =
filter
|> Filter.relationship_paths()
|> Enum.map(fn path ->
relationship_path_to_relationships(resource, path)
if can_inner_join?(path, filter) do
{:inner, relationship_path_to_relationships(filter.resource, path)}
else
{:left, relationship_path_to_relationships(filter.resource, path)}
end
end)
new_query =
query
|> join_all_relationships(resource, relationship_paths)
|> join_all_relationships(relationship_paths)
|> add_filter_expression(filter)
{:ok, new_query}
end
defp default_bindings(query) do
defp default_bindings(query, resource) do
Map.put_new(query, :__ash_bindings__, %{
current: Enum.count(query.joins) + 1,
aggregates: %{},
bindings: %{0 => %{path: [], type: :root}}
bindings: %{0 => %{path: [], type: :root, source: resource}}
})
end
defp can_inner_join?(path, expr, seen_an_or? \\ false)
defp can_inner_join?(path, %{expression: expr}, seen_an_or?),
do: can_inner_join?(path, expr, seen_an_or?)
defp can_inner_join?(_path, expr, _seen_an_or?) when expr in [nil, true, false], do: true
defp can_inner_join?(path, %Expression{op: :and, left: left, right: right}, seen_an_or?) do
can_inner_join?(path, left, seen_an_or?) || can_inner_join?(path, right, seen_an_or?)
end
defp can_inner_join?(path, %Expression{op: :or, left: left, right: right}, _) do
can_inner_join?(path, left, true) && can_inner_join?(path, right, true)
end
defp can_inner_join?(
path,
%Not{expression: %Expression{op: :or, left: left, right: right}},
seen_an_or?
) do
can_inner_join?(
path,
%Expression{
op: :and,
left: %Not{expression: left},
right: %Not{expression: right}
},
seen_an_or?
)
end
defp can_inner_join?(path, %Not{expression: expression}, seen_an_or?) do
can_inner_join?(path, expression, seen_an_or?)
end
defp can_inner_join?(_path, %Predicate{predicate: %IsNil{nil?: true}}, seen_an_or?),
do: not seen_an_or?
defp can_inner_join?(search_path, %Predicate{relationship_path: relationship_path}, seen_an_or?)
when search_path == relationship_path do
not seen_an_or?
end
defp can_inner_join?(_, _, _), do: true
@impl true
def add_aggregate(query, aggregate, resource) do
query = default_bindings(query)
def add_aggregate(query, aggregate, _resource) do
resource = aggregate.resource
query = default_bindings(query, resource)
{query, binding} =
case get_binding(resource, aggregate.relationship_path, query, :aggregate) do
@ -305,10 +355,12 @@ defmodule AshPostgres.DataLayer do
subquery = aggregate_subquery(relationship, aggregate)
new_query =
join_relationship(
join_all_relationships(
query,
relationship_path_to_relationships(resource, aggregate.relationship_path),
{:aggregate, aggregate.name, subquery}
[
{{:aggregate, aggregate.name, subquery},
relationship_path_to_relationships(resource, aggregate.relationship_path)}
]
)
{new_query, get_binding(resource, aggregate.relationship_path, new_query, :aggregate)}
@ -436,7 +488,7 @@ defmodule AshPostgres.DataLayer do
end
defp add_subquery_aggregate_select(query, %{kind: :count} = aggregate, resource) do
query = default_bindings(query)
query = default_bindings(query, aggregate.resource)
key_to_count = List.first(Ash.Resource.primary_key(resource))
type = Ash.Type.ecto_type(aggregate.type)
@ -472,43 +524,62 @@ defmodule AshPostgres.DataLayer do
relationship_path_to_relationships(relationship.destination, rest, [relationship | acc])
end
defp join_all_relationships(query, resource, relationship_paths, path \\ []) do
query = default_bindings(query)
defp join_all_relationships(query, relationship_paths, path \\ [], source \\ nil) do
query = default_bindings(query, source)
Enum.reduce(relationship_paths, query, fn [relationship | rest_rels], query ->
# Eventually this will not be a constant
join_type = :left
current_path = [relationship | path]
if has_binding?(resource, Enum.reverse(current_path), query, :aggregate) do
Enum.reduce(relationship_paths, query, fn
{_join_type, []}, query ->
query
else
joined_query = join_relationship(query, current_path, join_type)
joined_query_with_distinct = add_distinct(relationship, join_type, joined_query)
{join_type, [relationship | rest_rels]}, query ->
source = source || relationship.source
join_all_relationships(
joined_query_with_distinct,
relationship.destination,
rest_rels,
current_path
)
end
current_path = path ++ [relationship]
current_join_type =
case join_type do
{:aggregate, _name, _agg} when rest_rels != [] ->
:left
other ->
other
end
if has_binding?(source, Enum.reverse(current_path), query, current_join_type) do
query
else
joined_query =
join_relationship(
query,
relationship,
Enum.map(path, & &1.name),
current_join_type,
source
)
joined_query_with_distinct = add_distinct(relationship, join_type, joined_query)
join_all_relationships(
joined_query_with_distinct,
[{join_type, rest_rels}],
current_path,
source
)
end
end)
end
defp has_binding?(resource, path, %{__ash_bindings__: _} = query, type) do
paths =
Enum.flat_map(query.__ash_bindings__.bindings, fn
{_, %{path: path, type: ^type}} ->
[path]
defp has_binding?(resource, path, query, {:aggregate, _, _}),
do: has_binding?(resource, path, query, :aggregate)
_ ->
[]
end)
defp has_binding?(resource, candidate_path, %{__ash_bindings__: _} = query, type) do
Enum.any?(query.__ash_bindings__.bindings, fn
{_, %{path: path, source: source, type: ^type}} ->
Ash.SatSolver.synonymous_relationship_paths?(resource, path, candidate_path, source)
Enum.any?(paths, &Ash.SatSolver.synonymous_relationship_paths?(resource, &1, path))
_ ->
false
end)
end
defp has_binding?(_, _, _, _), do: false
@ -540,39 +611,35 @@ defmodule AshPostgres.DataLayer do
end
end
defp join_relationship(query, path, join_type) do
path_names = Enum.map(path, & &1.name)
case Map.get(query.__ash_bindings__.bindings, path_names) do
defp join_relationship(query, relationship, path, join_type, source) do
case Map.get(query.__ash_bindings__.bindings, path) do
%{type: existing_join_type} when join_type != existing_join_type ->
raise "unreachable?"
nil ->
do_join_relationship(query, path, join_type)
do_join_relationship(query, relationship, path, join_type, source)
_ ->
query
end
end
defp do_join_relationship(query, relationships, kind, path \\ [])
defp do_join_relationship(_query, [], {:aggregate, _, subquery}, _path) do
{:left_lateral, subquery}
end
defp do_join_relationship(_, [], _, _), do: nil
defp do_join_relationship(query, [%{type: :many_to_many} = relationship | rest], kind, path) do
defp do_join_relationship(query, %{type: :many_to_many} = relationship, path, kind, source) do
relationship_through = maybe_get_resource_query(relationship.through)
relationship_destination =
do_join_relationship(query, rest, kind, [relationship.name | path]) ||
Ecto.Queryable.to_query(maybe_get_resource_query(relationship.destination))
Ecto.Queryable.to_query(maybe_get_resource_query(relationship.destination))
current_binding =
Enum.find_value(query.__ash_bindings__.bindings, 0, fn {binding, data} ->
if data.type == kind && data.path == Enum.reverse(path) do
binding
end
end)
new_query =
case relationship_destination do
{:left_lateral, subquery} ->
case kind do
{:aggregate, _, subquery} ->
subquery =
subquery(
from(destination in subquery,
@ -585,7 +652,7 @@ defmodule AshPostgres.DataLayer do
)
)
from(row in query,
from([{row, current_binding}] in query,
left_join: through in ^relationship_through,
as: :rel_through,
on:
@ -597,8 +664,20 @@ defmodule AshPostgres.DataLayer do
field(through, ^relationship.destination_field_on_join_table)
)
relationship_destination ->
from(row in query,
:inner ->
from([{row, current_binding}] in query,
join: through in ^relationship_through,
on:
field(row, ^relationship.source_field) ==
field(through, ^relationship.source_field_on_join_table),
join: destination in ^relationship_destination,
on:
field(destination, ^relationship.destination_field) ==
field(through, ^relationship.destination_field_on_join_table)
)
_ ->
from([{row, current_binding}] in query,
left_join: through in ^relationship_through,
on:
field(row, ^relationship.source_field) ==
@ -616,25 +695,33 @@ defmodule AshPostgres.DataLayer do
full_path = Enum.reverse([relationship.name | path])
binding_data =
case {kind, rest} do
{{:aggregate, name, _agg}, []} -> %{type: :aggregate, name: name, path: full_path}
_ -> %{type: :left, path: full_path}
case kind do
{:aggregate, name, _agg} ->
%{type: :aggregate, name: name, path: full_path, source: source}
_ ->
%{type: kind, path: full_path, source: source}
end
new_query
|> add_binding(%{path: join_path, type: :left})
|> add_binding(%{path: join_path, type: :left, source: source})
|> add_binding(binding_data)
|> merge_bindings(relationship_destination)
end
defp do_join_relationship(query, [relationship | rest], kind, path) do
defp do_join_relationship(query, relationship, path, kind, source) do
relationship_destination =
do_join_relationship(query, rest, kind, [relationship.name | path]) ||
Ecto.Queryable.to_query(maybe_get_resource_query(relationship.destination))
Ecto.Queryable.to_query(maybe_get_resource_query(relationship.destination))
current_binding =
Enum.find_value(query.__ash_bindings__.bindings, 0, fn {binding, data} ->
if data.type == kind && data.path == Enum.reverse(path) do
binding
end
end)
new_query =
case relationship_destination do
{:left_lateral, subquery} ->
case kind do
{:aggregate, _, subquery} ->
subquery =
from(
sub in subquery(
@ -647,7 +734,7 @@ defmodule AshPostgres.DataLayer do
select: field(sub, ^relationship.destination_field)
)
from(row in query,
from([{row, current_binding}] in query,
as: :rel_source,
left_lateral_join: destination in ^subquery,
on:
@ -655,8 +742,16 @@ defmodule AshPostgres.DataLayer do
field(destination, ^relationship.destination_field)
)
relationship_destination ->
from(row in query,
:inner ->
from([{row, current_binding}] in query,
join: destination in ^relationship_destination,
on:
field(row, ^relationship.source_field) ==
field(destination, ^relationship.destination_field)
)
_ ->
from([{row, current_binding}] in query,
left_join: destination in ^relationship_destination,
on:
field(row, ^relationship.source_field) ==
@ -667,32 +762,159 @@ defmodule AshPostgres.DataLayer do
full_path = Enum.reverse([relationship.name | path])
binding_data =
case {kind, rest} do
{{:aggregate, name, _agg}, []} -> %{type: :aggregate, name: name, path: full_path}
_ -> %{type: :left, path: full_path}
case kind do
{:aggregate, name, _agg} ->
%{type: :aggregate, name: name, path: full_path, source: source}
_ ->
%{type: kind, path: full_path, source: source}
end
new_query
|> add_binding(binding_data)
|> merge_bindings(relationship_destination)
end
defp add_filter_expression(query, filter) do
{params, expr} = filter_to_expr(filter, query.__ash_bindings__.bindings, [])
filter
|> split_and_statements()
|> Enum.reduce(query, fn filter, query ->
clause = filter_to_dynamic_expr(filter, query.__ash_bindings__.bindings)
if expr do
boolean_expr = %Ecto.Query.BooleanExpr{
expr: expr,
op: :and,
params: params
}
from(row in query,
where: ^clause
)
end)
end
%{query | wheres: [boolean_expr | query.wheres]}
else
query
defp split_and_statements(%Filter{expression: expression}) do
split_and_statements(expression)
end
defp split_and_statements(%Expression{op: :and, left: left, right: right}) do
split_and_statements(left) ++ split_and_statements(right)
end
defp split_and_statements(%Not{expression: %Not{expression: expression}}) do
split_and_statements(expression)
end
defp split_and_statements(%Not{expression: %Expression{op: :or, left: left, right: right}}) do
split_and_statements(%Expression{
op: :and,
left: %Not{expression: left},
right: %Not{expression: right}
})
end
defp split_and_statements(other), do: [other]
defp filter_to_dynamic_expr(%Filter{expression: expression}, bindings) do
filter_to_dynamic_expr(expression, bindings)
end
defp filter_to_dynamic_expr(nil, _), do: true
defp filter_to_dynamic_expr(true, _), do: true
defp filter_to_dynamic_expr(false, _), do: false
defp filter_to_dynamic_expr(%Expression{op: :and, left: left, right: right}, bindings) do
left = filter_to_dynamic_expr(left, bindings)
right = filter_to_dynamic_expr(right, bindings)
Ecto.Query.dynamic([row], ^left and ^right)
end
defp filter_to_dynamic_expr(%Expression{op: :or, left: left, right: right}, bindings) do
left = filter_to_dynamic_expr(left, bindings)
right = filter_to_dynamic_expr(right, bindings)
Ecto.Query.dynamic([row], ^left or ^right)
end
defp filter_to_dynamic_expr(%Not{expression: expression}, bindings) do
expression = filter_to_dynamic_expr(expression, bindings)
Ecto.Query.dynamic([row], not (^expression))
end
defp filter_to_dynamic_expr(%Predicate{} = pred, bindings) do
%{predicate: predicate, relationship_path: relationship_path, attribute: attribute} = pred
current_binding =
case attribute do
%Ash.Resource.Attribute{} ->
Enum.find_value(bindings, fn {binding, data} ->
data.path == relationship_path && data.type in [:left, :inner, :root] && binding
end)
%Ash.Query.Aggregate{} = aggregate ->
Enum.find_value(bindings, fn {binding, data} ->
data.path == aggregate.relationship_path && data.type == :aggregate && binding
end)
end
type = Ash.Type.ecto_type(attribute.type)
filter_value_to_dynamic_expr(attribute, predicate, type, current_binding)
end
defp filter_value_to_dynamic_expr(attribute, %Eq{value: value}, _type, current_binding) do
Ecto.Query.dynamic([{row, current_binding}], field(row, ^attribute.name) == ^value)
end
defp filter_value_to_dynamic_expr(attribute, %LessThan{value: value}, _type, current_binding) do
Ecto.Query.dynamic([{row, current_binding}], field(row, ^attribute.name) < ^value)
end
defp filter_value_to_dynamic_expr(attribute, %GreaterThan{value: value}, _type, current_binding) do
Ecto.Query.dynamic([{row, current_binding}], field(row, ^attribute.name) > ^value)
end
defp filter_value_to_dynamic_expr(attribute, %In{values: values}, _type, current_binding) do
Ecto.Query.dynamic([{row, current_binding}], field(row, ^attribute.name) in ^values)
end
defp filter_value_to_dynamic_expr(attribute, %IsNil{nil?: true}, _type, current_binding) do
Ecto.Query.dynamic([{row, current_binding}], is_nil(field(row, ^attribute.name)))
end
defp filter_value_to_dynamic_expr(attribute, %IsNil{nil?: false}, _type, current_binding) do
Ecto.Query.dynamic([{row, current_binding}], not is_nil(field(row, ^attribute.name)))
end
defp filter_value_to_dynamic_expr(attribute, %Trigram{} = trigram, _type, current_binding) do
case trigram do
%{equals: nil, greater_than: greater_than, less_than: nil, text: text} ->
Ecto.Query.dynamic(
[{row, current_binding}],
fragment("similarity(?, ?) > ?", field(row, ^attribute.name), ^text, ^greater_than)
)
%{equals: nil, greater_than: nil, less_than: less_than, text: text} ->
Ecto.Query.dynamic(
[{row, current_binding}],
fragment("similarity(?, ?) < ?", field(row, ^attribute.name), ^text, ^less_than)
)
%{equals: nil, greater_than: greater_than, less_than: less_than, text: text} ->
Ecto.Query.dynamic(
[{row, current_binding}],
fragment(
"similarity(?, ?) BETWEEN ? AND ?",
field(row, ^attribute.name),
^text,
^less_than,
^greater_than
)
)
%{equals: equals, text: text} ->
Ecto.Query.dynamic(
[{row, current_binding}],
fragment("similarity(?, ?) = ?", field(row, ^attribute.name), ^text, ^equals)
)
end
end
# IMPORTANT: We need to rework this so we don't need this hacky logic.
# Specifically, we can't use dynamic expers in selects, so we need this for aggregates :(
defp filter_to_expr(%Filter{expression: expression}, bindings, params) do
filter_to_expr(expression, bindings, params)
end
@ -865,19 +1087,6 @@ defmodule AshPostgres.DataLayer do
]}}
end
defp merge_bindings(query, %{__ash_bindings__: ash_bindings}) do
ash_bindings
|> Map.get(:bindings)
|> Enum.sort_by(&elem(&1, 0))
|> Enum.reduce(query, fn {_binding, data}, query ->
add_binding(query, data)
end)
end
defp merge_bindings(query, _) do
query
end
defp add_binding(query, data) do
current = query.__ash_bindings__.current
bindings = query.__ash_bindings__.bindings

View file

@ -61,7 +61,7 @@ defmodule AshPostgres.MixProject do
[
{:ecto_sql, "~> 3.4"},
{:postgrex, ">= 0.0.0"},
{:ash, ash_version("~> 1.11")},
{:ash, ash_version("~> 1.13")},
{:git_ops, "~> 2.0.1", only: :dev},
{:ex_doc, "~> 0.22", only: :dev, runtime: false},
{:ex_check, "~> 0.11.0", only: :dev},

View file

@ -1,5 +1,5 @@
%{
"ash": {:hex, :ash, "1.11.0", "cadbe38089f0545905532e393751f3a69e294d5489441102c13b5c8b0b245e6f", [:mix], [{:ecto, "~> 3.4", [hex: :ecto, repo: "hexpm", optional: false]}, {:ets, "~> 0.8.0", [hex: :ets, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.3.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:picosat_elixir, "~> 0.1.4", [hex: :picosat_elixir, repo: "hexpm", optional: false]}], "hexpm", "e8893bc02f2d687778639a58c0c08ec472d3226217e7e5a3bb1c23ba7fc7cbfd"},
"ash": {:hex, :ash, "1.13.0", "7821561e4529628610ceac2b0ea0886b8bf0800ef4443cc50c806de7552ec7c3", [:mix], [{:ecto, "~> 3.4", [hex: :ecto, repo: "hexpm", optional: false]}, {:ets, "~> 0.8.0", [hex: :ets, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.3.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:picosat_elixir, "~> 0.1.4", [hex: :picosat_elixir, repo: "hexpm", optional: false]}], "hexpm", "be9302effcebcb2fbc1f79a133e8ee789678c8b6938dbd6355630d98cc88b687"},
"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"},
"certifi": {:hex, :certifi, "2.5.2", "b7cfeae9d2ed395695dd8201c57a2d019c0c43ecaf8b8bcb9320b40d6662f340", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm", "3b3b5f36493004ac3455966991eaf6e768ce9884693d9968055aeeeb1e575040"},
"connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], [], "hexpm", "4a0850c9be22a43af9920a71ab17c051f5f7d45c209e40269a1938832510e4d9"},