ash_postgres/lib/data_layer.ex

820 lines
23 KiB
Elixir
Raw Normal View History

defmodule AshPostgres.DataLayer do
@moduledoc """
A postgres data layer that levereges Ecto's postgres capabilities.
2020-07-08 12:01:01 +12:00
To use this data layer, you need to define an `Ecto.Repo`. Ash adds some
functionality on top of ecto repos, so you'll want to use `AshPostgres.Repo`
Then, configure your resource like so:
```
postgres do
repo MyApp.Repo
table "table_name"
end
```
"""
@postgres %Ash.Dsl.Section{
name: :postgres,
describe: """
Postgres data layer configuration
""",
schema: [
repo: [
type: {:custom, AshPostgres.DataLayer, :validate_repo, []},
required: true,
doc:
"The repo that will be used to fetch your data. See the `Ecto.Repo` documentation for more"
],
table: [
type: :string,
required: true,
doc: "The table to store and read the resource from"
]
]
}
2020-06-19 15:04:41 +12:00
alias Ash.Filter
alias Ash.Filter.{Expression, Not, Predicate}
2020-06-29 15:47:07 +12:00
alias Ash.Filter.Predicate.{Eq, GreaterThan, In, LessThan}
alias AshPostgres.Predicates.Trigram
import AshPostgres, only: [table: 1, repo: 1]
@behaviour Ash.DataLayer
use Ash.Dsl.Extension, sections: [@postgres]
@doc false
def validate_repo(repo) do
if repo.__adapter__() == Ecto.Adapters.Postgres do
{:ok, repo}
else
{:error, "Expected a repo using the postgres adapter `Ecto.Adapters.Postgres`"}
end
end
@impl true
def custom_filters(resource) do
config = repo(resource).config()
add_pg_trgm_search(%{}, config)
end
defp add_pg_trgm_search(filters, config) do
if "pg_trgm" in config[:installed_extensions] do
Map.update(filters, :string, [{:trigram, AshPostgres.Predicates.Trigram}], fn filters ->
[{:trigram, AshPostgres.Predicates.Trigram} | filters]
end)
else
filters
end
end
2020-07-23 17:13:47 +12:00
import Ecto.Query, only: [from: 2, subquery: 1]
@impl true
2020-06-19 15:04:41 +12:00
def can?(_, :async_engine), do: true
def can?(_, :transact), do: true
def can?(_, :composite_primary_key), do: true
def can?(_, :upsert), do: true
2020-06-29 14:29:38 +12:00
def can?(_, :join), do: true
def can?(_, :boolean_filter), do: true
2020-07-23 17:13:47 +12:00
def can?(_, :aggregate_filter), do: true
def can?(_, :aggregate_sort), do: true
2020-06-29 15:47:07 +12:00
def can?(_, {:filter_predicate, _, %In{}}), do: true
def can?(_, {:filter_predicate, _, %Eq{}}), do: true
def can?(_, {:filter_predicate, _, %LessThan{}}), do: true
def can?(_, {:filter_predicate, _, %GreaterThan{}}), do: true
def can?(_, {:filter_predicate, :string, %Trigram{}}), do: true
2020-06-29 14:29:38 +12:00
def can?(_, {:filter_predicate, _}), do: false
2020-07-23 17:13:47 +12:00
def can?(_, {:sort, _}), do: true
@impl true
def in_transaction?(resource) do
repo(resource).in_transaction?()
end
@impl true
def limit(query, nil, _), do: {:ok, query}
def limit(query, limit, _resource) do
{:ok, from(row in query, limit: ^limit)}
end
2020-07-08 12:01:01 +12:00
@impl true
def source(resource) do
table(resource)
end
@impl true
def offset(query, nil, _), do: query
def offset(query, offset, _resource) do
{:ok, from(row in query, offset: ^offset)}
end
@impl true
def run_query(query, resource) do
{:ok, repo(resource).all(query)}
end
@impl true
def resource_to_query(resource),
do: Ecto.Queryable.to_query({table(resource), resource})
@impl true
def create(resource, changeset) do
2020-07-13 16:41:38 +12:00
changeset.data
|> Map.update!(:__meta__, &Map.put(&1, :source, table(resource)))
|> ecto_changeset(changeset)
|> repo(resource).insert()
rescue
e ->
{:error, e}
end
2020-07-13 16:41:38 +12:00
defp ecto_changeset(record, changeset) do
Ecto.Changeset.change(record, changeset.attributes)
end
@impl true
def upsert(resource, changeset) do
2020-07-13 16:41:38 +12:00
changeset.data
|> Map.update!(:__meta__, &Map.put(&1, :source, table(resource)))
|> ecto_changeset(changeset)
|> repo(resource).insert(
on_conflict: :replace_all,
2020-07-15 17:42:12 +12:00
conflict_target: Ash.Resource.primary_key(resource)
)
rescue
e ->
{:error, e}
end
@impl true
def update(resource, changeset) do
2020-07-13 16:41:38 +12:00
changeset.data
|> Map.update!(:__meta__, &Map.put(&1, :source, table(resource)))
|> ecto_changeset(changeset)
|> repo(resource).update()
rescue
e ->
{:error, e}
end
@impl true
def destroy(%resource{} = record) do
case repo(resource).delete(record) do
{:ok, _record} -> :ok
{:error, error} -> {:error, error}
end
rescue
e ->
{:error, e}
end
@impl true
def sort(query, sort, _resource) do
2020-07-23 17:13:47 +12:00
query = default_bindings(query)
sort_expr =
sort
|> sanitize_sort()
|> Enum.map(fn {order, sort} ->
binding =
case Map.fetch(query.__ash_bindings__.aggregates, sort) do
{:ok, binding} ->
binding
:error ->
0
end
{order, {{:., [], [{:&, [], [binding]}, sort]}, [], []}}
end)
{:ok, %{query | order_bys: [%Ecto.Query.QueryExpr{expr: sort_expr, params: []}]}}
end
defp sanitize_sort(sort) do
sort
|> List.wrap()
|> Enum.map(fn
{sort, order} -> {order, sort}
sort -> sort
end)
end
@impl true
2020-06-19 15:04:41 +12:00
def filter(query, %{expression: false}, _resource) do
impossible_query = from(row in query, where: false)
{:ok, Map.put(impossible_query, :__impossible__, true)}
end
def filter(query, filter, resource) do
relationship_paths =
filter
|> Filter.relationship_paths()
|> Enum.map(fn path ->
relationship_path_to_relationships(resource, path)
end)
new_query =
query
2020-06-19 15:04:41 +12:00
|> join_all_relationships(resource, relationship_paths)
|> add_filter_expression(filter)
2020-06-19 15:04:41 +12:00
{:ok, new_query}
end
2020-07-23 17:13:47 +12:00
defp default_bindings(query) do
Map.put_new(query, :__ash_bindings__, %{
current: Enum.count(query.joins) + 1,
aggregates: %{},
bindings: %{0 => %{path: [], type: :root}}
})
end
@impl true
def add_aggregate(query, aggregate, resource) do
query = default_bindings(query)
{query, binding} =
case get_binding(resource, aggregate.relationship_path, query, :aggregate) do
nil ->
relationship = Ash.Resource.relationship(resource, aggregate.relationship_path)
subquery = aggregate_subquery(relationship, aggregate)
new_query =
join_relationship(
query,
relationship_path_to_relationships(resource, aggregate.relationship_path),
{:aggregate, aggregate.name, subquery}
)
{new_query, get_binding(resource, aggregate.relationship_path, new_query, :aggregate)}
binding ->
{query, binding}
end
query_with_aggregate_binding =
put_in(
query.__ash_bindings__.aggregates,
Map.put(query.__ash_bindings__.aggregates, aggregate.name, binding)
)
new_query =
query_with_aggregate_binding
|> add_aggregate_to_subquery(resource, aggregate, binding)
|> select_aggregate(resource, aggregate)
{:ok, new_query}
end
defp select_aggregate(query, resource, aggregate) do
binding = get_binding(resource, aggregate.relationship_path, query, :aggregate)
query =
if query.select do
query
else
from(row in query,
select: row,
select_merge: %{aggregates: %{}}
)
end
%{query | select: add_to_select(query.select, binding, aggregate)}
end
defp add_to_select(
%{expr: {:merge, _, [first, {:%{}, _, [{:aggregates, {:%{}, [], fields}}]}]}} = select,
binding,
aggregate
) do
field =
{:type, [],
[
{{:., [], [{:&, [], [binding]}, aggregate.name]}, [], []},
Ash.Type.ecto_type(aggregate.type)
]}
field_with_default =
if aggregate.default_value do
{:coalesce, [],
[
field,
aggregate.default_value
]}
end
new_fields = [
{aggregate.name, field_with_default}
| fields
]
%{select | expr: {:merge, [], [first, {:%{}, [], [{:aggregates, {:%{}, [], new_fields}}]}]}}
end
defp add_aggregate_to_subquery(query, resource, aggregate, binding) do
new_joins =
List.update_at(query.joins, binding - 1, fn join ->
aggregate_query =
if aggregate.authorization_filter do
# TODO: I bailed on this, need to thread this failure state through
IO.inspect(aggregate.authorization_filter)
{:ok, filter} =
filter(
join.source.from.source.query,
aggregate.authorization_filter,
Ash.Resource.related(resource, aggregate.relationship_path)
)
filter
else
join.source.from.source.query
end
new_aggregate_query = add_subquery_aggregate_select(aggregate_query, aggregate, resource)
put_in(join.source.from.source.query, new_aggregate_query)
end)
%{
query
| joins: new_joins
}
end
defp aggregate_subquery(relationship, _aggregate) do
from(row in relationship.destination,
group_by: ^relationship.destination_field,
select: field(row, ^relationship.destination_field)
)
end
defp add_subquery_aggregate_select(query, %{kind: :count} = aggregate, resource) do
query = default_bindings(query)
key_to_count = List.first(Ash.Resource.primary_key(resource))
type = Ash.Type.ecto_type(aggregate.type)
field = {:count, [], [{{:., [], [{:&, [], [0]}, key_to_count]}, [], []}]}
{params, filtered} =
if aggregate.query do
{params, expr} =
filter_to_expr(
aggregate.query.filter,
query.__ash_bindings__.bindings,
query.select.params
)
{params, {:filter, [], [field, expr]}}
else
{[], field}
end
cast = {:type, [], [filtered, type]}
new_expr = {:merge, [], [query.select.expr, {:%{}, [], [{aggregate.name, cast}]}]}
%{query | select: %{query.select | expr: new_expr, params: params}}
end
2020-06-19 15:04:41 +12:00
defp relationship_path_to_relationships(resource, path, acc \\ [])
defp relationship_path_to_relationships(_resource, [], acc), do: Enum.reverse(acc)
2020-06-19 15:04:41 +12:00
defp relationship_path_to_relationships(resource, [relationship | rest], acc) do
2020-07-15 17:42:12 +12:00
relationship = Ash.Resource.relationship(resource, relationship)
2020-06-19 15:04:41 +12:00
relationship_path_to_relationships(relationship.destination, rest, [relationship | acc])
end
2020-07-23 17:13:47 +12:00
defp join_all_relationships(query, resource, relationship_paths, path \\ []) do
query = default_bindings(query)
2020-06-19 15:04:41 +12:00
Enum.reduce(relationship_paths, query, fn [relationship | rest_rels], query ->
# Eventually this will not be a constant
join_type = :left
2020-06-19 15:04:41 +12:00
current_path = [relationship | path]
2020-07-23 17:13:47 +12:00
if has_binding?(resource, query, Enum.reverse(current_path), :aggregate) do
query
else
joined_query = join_relationship(query, current_path, join_type)
joined_query_with_distinct = add_distinct(relationship, join_type, joined_query)
join_all_relationships(
joined_query_with_distinct,
relationship.destination,
rest_rels,
current_path
)
end
end)
end
2020-07-23 17:13:47 +12:00
defp has_binding?(resource, path, %{__ash_bindings__: _} = query, type) do
paths =
Enum.flat_map(query.__ash_bindings__.bindings, fn
{_, %{path: path, type: ^type}} ->
[path]
2020-07-23 17:13:47 +12:00
_ ->
[]
end)
Enum.any?(paths, &Ash.SatSolver.synonymous_relationship_paths?(resource, &1, path))
end
defp has_binding?(_, _, _, _), do: false
defp get_binding(resource, path, %{__ash_bindings__: _} = query, type) do
paths =
Enum.flat_map(query.__ash_bindings__.bindings, fn
{binding, %{path: path, type: ^type}} ->
[{binding, path}]
_ ->
[]
end)
Enum.find_value(paths, fn {binding, candidate_path} ->
Ash.SatSolver.synonymous_relationship_paths?(resource, candidate_path, path) && binding
end)
end
2020-07-23 17:13:47 +12:00
defp get_binding(_, _, _, _), do: nil
defp add_distinct(relationship, join_type, joined_query) do
if relationship.cardinality == :many and join_type == :left && !joined_query.distinct do
from(row in joined_query,
2020-07-15 17:42:12 +12:00
distinct: ^Ash.Resource.primary_key(relationship.destination)
)
else
joined_query
end
end
defp join_relationship(query, path, join_type) do
path_names = Enum.map(path, & &1.name)
case Map.get(query.__ash_bindings__.bindings, path_names) do
%{type: existing_join_type} when join_type != existing_join_type ->
raise "unreachable?"
nil ->
do_join_relationship(query, path, join_type)
_ ->
query
end
end
defp do_join_relationship(query, relationships, kind, path \\ [])
2020-07-23 17:13:47 +12:00
defp do_join_relationship(_query, [], {:aggregate, _, subquery}, _path) do
{:left_lateral, subquery}
end
defp do_join_relationship(_, [], _, _), do: nil
2020-07-23 17:13:47 +12:00
defp do_join_relationship(query, [%{type: :many_to_many} = relationship | rest], kind, path) do
relationship_through = maybe_get_resource_query(relationship.through)
relationship_destination =
2020-07-23 17:13:47 +12:00
do_join_relationship(query, rest, kind, [relationship.name | path]) ||
Ecto.Queryable.to_query(maybe_get_resource_query(relationship.destination))
new_query =
2020-07-23 17:13:47 +12:00
case relationship_destination do
{:left_lateral, subquery} ->
subquery =
subquery(
from(destination in subquery,
where:
field(destination, ^relationship.destination_field) ==
field(
parent_as(:rel_through),
^relationship.destination_field_on_join_table
)
)
)
from(row in query,
left_join: through in ^relationship_through,
as: :rel_through,
on:
field(row, ^relationship.source_field) ==
field(through, ^relationship.source_field_on_join_table),
left_lateral_join: destination in ^subquery,
on:
field(destination, ^relationship.destination_field) ==
field(through, ^relationship.destination_field_on_join_table)
)
relationship_destination ->
from(row in query,
left_join: through in ^relationship_through,
on:
field(row, ^relationship.source_field) ==
field(through, ^relationship.source_field_on_join_table),
left_join: destination in ^relationship_destination,
on:
field(destination, ^relationship.destination_field) ==
field(through, ^relationship.destination_field_on_join_table)
)
end
join_path =
Enum.reverse([String.to_existing_atom(to_string(relationship.name) <> "_join_assoc") | path])
full_path = Enum.reverse([relationship.name | path])
2020-07-23 17:13:47 +12:00
binding_data =
case {kind, rest} do
{{:aggregate, name, _agg}, []} -> %{type: :aggregate, name: name, path: full_path}
_ -> %{type: :left, path: full_path}
end
new_query
2020-07-23 17:13:47 +12:00
|> add_binding(%{path: join_path, type: :left})
|> add_binding(binding_data)
|> merge_bindings(relationship_destination)
end
2020-07-23 17:13:47 +12:00
defp do_join_relationship(query, [relationship | rest], kind, path) do
relationship_destination =
2020-07-23 17:13:47 +12:00
do_join_relationship(query, rest, kind, [relationship.name | path]) ||
Ecto.Queryable.to_query(maybe_get_resource_query(relationship.destination))
new_query =
2020-07-23 17:13:47 +12:00
case relationship_destination do
{:left_lateral, subquery} ->
subquery =
from(
sub in subquery(
from(destination in subquery,
where:
field(destination, ^relationship.destination_field) ==
field(parent_as(:rel_source), ^relationship.source_field)
)
),
select: field(sub, ^relationship.destination_field)
)
from(row in query,
as: :rel_source,
left_lateral_join: destination in ^subquery,
on:
field(row, ^relationship.source_field) ==
field(destination, ^relationship.destination_field)
)
relationship_destination ->
from(row in query,
left_join: destination in ^relationship_destination,
on:
field(row, ^relationship.source_field) ==
field(destination, ^relationship.destination_field)
)
end
full_path = Enum.reverse([relationship.name | path])
2020-07-23 17:13:47 +12:00
binding_data =
case {kind, rest} do
{{:aggregate, name, _agg}, []} -> %{type: :aggregate, name: name, path: full_path}
_ -> %{type: :left, path: full_path}
end
new_query
2020-07-23 17:13:47 +12:00
|> add_binding(binding_data)
|> merge_bindings(relationship_destination)
end
defp add_filter_expression(query, filter) do
{params, expr} = filter_to_expr(filter, query.__ash_bindings__.bindings, [])
if expr do
boolean_expr = %Ecto.Query.BooleanExpr{
expr: expr,
op: :and,
params: params
}
%{query | wheres: [boolean_expr | query.wheres]}
else
query
end
end
2020-06-19 15:04:41 +12:00
defp filter_to_expr(%Filter{expression: expression}, bindings, params) do
filter_to_expr(expression, bindings, params)
end
2020-06-19 15:04:41 +12:00
# A nil filter means "everything"
defp filter_to_expr(nil, _, _), do: {[], true}
# A true filter means "everything"
defp filter_to_expr(true, _, _), do: true
# A false filter means "nothing"
defp filter_to_expr(false, _, _), do: {[], false}
defp filter_to_expr(%Expression{op: op, left: left, right: right}, bindings, params) do
{params, left_expr} = filter_to_expr(left, bindings, params)
{params, right_expr} = filter_to_expr(right, bindings, params)
{params, {op, [], [left_expr, right_expr]}}
end
2020-06-19 15:04:41 +12:00
defp filter_to_expr(%Not{expression: expression}, bindings, params) do
{params, new_expression} = filter_to_expr(expression, bindings, params)
{params, {:not, [], [new_expression]}}
end
2020-06-19 15:04:41 +12:00
defp filter_to_expr(%Predicate{} = pred, bindings, params) do
%{predicate: predicate, relationship_path: relationship_path, attribute: attribute} = pred
2020-07-23 17:13:47 +12:00
current_binding =
case attribute do
%Ash.Resource.Attribute{} ->
Enum.find_value(bindings, fn {binding, data} ->
data.path == relationship_path && data.type in [:left, :root] && binding
end)
%Ash.Query.Aggregate{} = aggregate ->
Enum.find_value(bindings, fn {binding, data} ->
data.path == aggregate.relationship_path && data.type == :aggregate && binding
end)
end
type = Ash.Type.ecto_type(attribute.type)
2020-07-23 17:13:47 +12:00
filter_value_to_expr(attribute.name, predicate, type, current_binding, params)
end
2020-07-23 17:13:47 +12:00
defp filter_value_to_expr(attribute, %Eq{value: value}, type, current_binding, params) do
2020-06-29 15:47:07 +12:00
simple_operator_expr(
:==,
params,
value,
2020-07-23 17:13:47 +12:00
type,
2020-06-29 15:47:07 +12:00
current_binding,
attribute
)
end
2020-07-23 17:13:47 +12:00
defp filter_value_to_expr(attribute, %LessThan{value: value}, type, current_binding, params) do
2020-06-29 15:47:07 +12:00
simple_operator_expr(
:<,
params,
value,
2020-07-23 17:13:47 +12:00
type,
2020-06-29 15:47:07 +12:00
current_binding,
attribute
)
end
2020-07-23 17:13:47 +12:00
defp filter_value_to_expr(attribute, %GreaterThan{value: value}, type, current_binding, params) do
2020-06-29 15:47:07 +12:00
simple_operator_expr(
:>,
params,
value,
2020-07-23 17:13:47 +12:00
type,
2020-06-29 15:47:07 +12:00
current_binding,
attribute
)
end
2020-07-23 17:13:47 +12:00
defp filter_value_to_expr(attribute, %In{values: values}, type, current_binding, params) do
2020-06-29 15:47:07 +12:00
simple_operator_expr(
:in,
params,
values,
2020-07-23 17:13:47 +12:00
{:in, type},
2020-06-29 15:47:07 +12:00
current_binding,
attribute
)
end
defp filter_value_to_expr(
attribute,
%Trigram{} = trigram,
2020-07-23 17:13:47 +12:00
_type,
current_binding,
params
) do
param_count = Enum.count(params)
case trigram do
%{equals: equals, greater_than: nil, less_than: nil, text: text} ->
{params ++ [{text, {current_binding, attribute}}, {equals, :float}],
{:fragment, [],
[
raw: "similarity(",
expr: {{:., [], [{:&, [], [current_binding]}, attribute]}, [], []},
raw: ", ",
expr: {:^, [], [param_count]},
raw: ") = ",
expr: {:^, [], [param_count + 1]},
raw: ""
]}}
%{equals: nil, greater_than: greater_than, less_than: nil, text: text} ->
{params ++ [{text, {current_binding, attribute}}, {greater_than, :float}],
{:fragment, [],
[
raw: "similarity(",
expr: {{:., [], [{:&, [], [current_binding]}, attribute]}, [], []},
raw: ", ",
expr: {:^, [], [param_count]},
raw: ") > ",
expr: {:^, [], [param_count + 1]},
raw: ""
]}}
%{equals: nil, greater_than: nil, less_than: less_than, text: text} ->
{params ++ [{text, {current_binding, attribute}}, {less_than, :float}],
{:fragment, [],
[
raw: "similarity(",
expr: {{:., [], [{:&, [], [current_binding]}, attribute]}, [], []},
raw: ", ",
expr: {:^, [], [param_count]},
raw: ") < ",
expr: {:^, [], [param_count + 1]},
raw: ""
]}}
%{equals: nil, greater_than: greater_than, less_than: less_than, text: text} ->
{params ++
[{text, {current_binding, attribute}}, {less_than, :float}, {greater_than, :float}],
{:fragment, [],
[
raw: "similarity(",
expr: {{:., [], [{:&, [], [current_binding]}, attribute]}, [], []},
raw: ", ",
expr: {:^, [], [param_count]},
raw: ") BETWEEN ",
expr: {:^, [], [param_count + 1]},
raw: " AND ",
expr: {:^, [], [param_count + 2]},
raw: ""
]}}
end
end
2020-06-29 15:47:07 +12:00
defp simple_operator_expr(op, params, value, type, current_binding, attribute) do
{params ++ [{value, type}],
{op, [],
[
{{:., [], [{:&, [], [current_binding]}, attribute]}, [], []},
{:^, [], [Enum.count(params)]}
]}}
end
defp merge_bindings(query, %{__ash_bindings__: ash_bindings}) do
ash_bindings
|> Map.get(:bindings)
2020-07-23 17:13:47 +12:00
|> Enum.sort_by(&elem(&1, 0))
|> Enum.reduce(query, fn {_binding, data}, query ->
add_binding(query, data)
end)
end
defp merge_bindings(query, _) do
query
end
2020-07-23 17:13:47 +12:00
defp add_binding(query, data) do
current = query.__ash_bindings__.current
bindings = query.__ash_bindings__.bindings
new_ash_bindings = %{
query.__ash_bindings__
2020-07-23 17:13:47 +12:00
| bindings: Map.put(bindings, current, data),
current: current + 1
}
%{query | __ash_bindings__: new_ash_bindings}
end
@impl true
2020-07-08 12:01:01 +12:00
def transaction(resource, func) do
repo(resource).transaction(func)
end
@impl true
2020-07-08 12:01:01 +12:00
def rollback(resource, term) do
repo(resource).rollback(term)
end
defp maybe_get_resource_query(resource) do
2020-06-28 07:11:28 +12:00
{table(resource), resource}
end
end