ash_postgres/lib/data_layer.ex

1598 lines
42 KiB
Elixir
Raw Normal View History

defmodule AshPostgres.DataLayer do
2020-10-29 15:26:45 +13:00
@manage_tenant %Ash.Dsl.Section{
name: :manage_tenant,
describe: """
Configuration for the behavior of a resource that manages a tenant
""",
2020-12-27 19:20:12 +13:00
examples: [
"""
manage_tenant do
template ["organization_", :id]
create? true
update? false
end
"""
],
2020-10-29 15:26:45 +13:00
schema: [
template: [
type: {:custom, __MODULE__, :tenant_template, []},
required: true,
doc: """
A template that will cause the resource to create/manage the specified schema.
Use this if you have a resource that, when created, it should create a new tenant
for you. For example, if you have a `customer` resource, and you want to create
a schema for each customer based on their id, e.g `customer_10` set this option
to `["customer_", :id]`. Then, when this is created, it will create a schema called
`["customer_", :id]`, and run your tenant migrations on it. Then, if you were to change
that customer's id to `20`, it would rename the schema to `customer_20`. Generally speaking
you should avoid changing the tenant id.
"""
],
create?: [
type: :boolean,
default: true,
doc: "Whether or not to automatically create a tenant when a record is created"
],
update?: [
type: :boolean,
default: true,
doc: "Whether or not to automatically update the tenant name if the record is udpated"
]
]
}
2020-10-29 16:53:28 +13:00
@postgres %Ash.Dsl.Section{
name: :postgres,
describe: """
Postgres data layer configuration
""",
2020-10-29 15:26:45 +13:00
sections: [
@manage_tenant
],
modules: [
:repo
],
2020-12-27 19:20:12 +13:00
examples: [
"""
postgres do
repo MyApp.Repo
table "organizations"
end
"""
],
schema: [
repo: [
2020-10-29 16:53:28 +13:00
type: :atom,
required: true,
doc:
2020-09-03 20:18:11 +12:00
"The repo that will be used to fetch your data. See the `AshPostgres.Repo` documentation for more"
],
migrate?: [
type: :boolean,
default: true,
doc:
"Whether or not to include this resource in the generated migrations with `mix ash.generate_migrations`"
],
2020-09-20 10:08:09 +12:00
base_filter_sql: [
type: :string,
doc:
"A raw sql version of the base_filter, e.g `representative = true`. Required if trying to create a unique constraint on a resource with a base_filter"
],
skip_unique_indexes: [
type: {:custom, __MODULE__, :validate_skip_unique_indexes, []},
default: false,
doc: "Skip generating unique indexes when generating migrations"
],
unique_index_names: [
type: :any,
default: [],
doc: """
A list of unique index names that could raise errors, or an mfa to a function that takes a changeset
and returns a list of names.
"""
],
table: [
type: :string,
required: true,
doc: "The table to store and read the resource from"
]
]
}
2020-06-19 15:04:41 +12:00
alias Ash.Filter
alias Ash.Query.{BooleanExpression, Not, Ref}
2021-01-24 16:45:15 +13:00
alias Ash.Query.Function.{Ago, Contains}
alias Ash.Query.Operator.IsNil
alias AshPostgres.Functions.{Fragment, TrigramSimilarity, Type}
import AshPostgres, only: [table: 1, repo: 1]
@behaviour Ash.DataLayer
2020-12-27 19:20:12 +13:00
@sections [@postgres]
@moduledoc """
A postgres data layer that levereges Ecto's postgres capabilities.
# Table of Contents
#{Ash.Dsl.Extension.doc_index(@sections)}
#{Ash.Dsl.Extension.doc(@sections)}
"""
2020-10-29 16:53:28 +13:00
use Ash.Dsl.Extension,
2020-12-27 19:20:12 +13:00
sections: @sections,
2020-10-29 16:53:28 +13:00
transformers: [AshPostgres.Transformers.VerifyRepo]
2020-10-29 15:26:45 +13:00
@doc false
def tenant_template(value) do
value = List.wrap(value)
if Enum.all?(value, &(is_binary(&1) || is_atom(&1))) do
{:ok, value}
else
{:error, "Expected all values for `manages_tenant` to be strings or atoms"}
end
end
2020-09-20 10:08:09 +12:00
@doc false
def validate_skip_unique_indexes(indexes) do
indexes = List.wrap(indexes)
if Enum.all?(indexes, &is_atom/1) do
{:ok, indexes}
else
{:error, "All indexes to skip must be atoms"}
end
end
2020-07-23 17:13:47 +12:00
import Ecto.Query, only: [from: 2, subquery: 1]
@impl true
2020-06-19 15:04:41 +12:00
def can?(_, :async_engine), do: true
def can?(_, :transact), do: true
def can?(_, :composite_primary_key), do: true
def can?(_, :upsert), do: true
2020-08-26 16:28:55 +12:00
def can?(resource, {:join, other_resource}) do
data_layer = Ash.Resource.data_layer(resource)
other_data_layer = Ash.Resource.data_layer(other_resource)
data_layer == other_data_layer and repo(data_layer) == repo(other_data_layer)
end
def can?(resource, {:lateral_join, other_resource}) do
data_layer = Ash.Resource.data_layer(resource)
other_data_layer = Ash.Resource.data_layer(other_resource)
data_layer == other_data_layer and repo(data_layer) == repo(other_data_layer)
end
2020-06-29 14:29:38 +12:00
def can?(_, :boolean_filter), do: true
2020-07-25 11:27:34 +12:00
def can?(_, {:aggregate, :count}), do: true
2020-07-23 17:13:47 +12:00
def can?(_, :aggregate_filter), do: true
def can?(_, :aggregate_sort), do: true
2020-08-19 16:52:23 +12:00
def can?(_, :create), do: true
def can?(_, :read), do: true
def can?(_, :update), do: true
def can?(_, :destroy), do: true
def can?(_, :filter), do: true
def can?(_, :limit), do: true
def can?(_, :offset), do: true
2020-10-29 15:26:45 +13:00
def can?(_, :multitenancy), do: true
def can?(_, {:filter_expr, _}), do: true
def can?(_, :nested_expressions), do: true
2020-10-18 12:13:51 +13:00
def can?(_, {:query_aggregate, :count}), do: true
2020-08-19 17:18:52 +12:00
def can?(_, :sort), do: true
2020-07-23 17:13:47 +12:00
def can?(_, {:sort, _}), do: true
2020-08-17 18:46:59 +12:00
def can?(_, _), do: false
@impl true
def in_transaction?(resource) do
repo(resource).in_transaction?()
end
@impl true
def limit(query, nil, _), do: {:ok, query}
def limit(query, limit, _resource) do
{:ok, from(row in query, limit: ^limit)}
end
2020-07-08 12:01:01 +12:00
@impl true
def source(resource) do
table(resource)
end
@impl true
def offset(query, nil, _), do: query
2020-09-20 10:08:09 +12:00
def offset(%{offset: old_offset} = query, 0, _resource) when old_offset in [0, nil] do
{:ok, query}
end
def offset(query, offset, _resource) do
{:ok, from(row in query, offset: ^offset)}
end
@impl true
def run_query(query, resource) do
2020-10-29 15:26:45 +13:00
{:ok, repo(resource).all(query, repo_opts(query))}
end
defp repo_opts(%Ash.Changeset{tenant: tenant, resource: resource}) do
repo_opts(%{tenant: tenant, resource: resource})
end
2020-10-29 15:26:45 +13:00
defp repo_opts(%{tenant: tenant, resource: resource}) when not is_nil(tenant) do
if Ash.Resource.multitenancy_strategy(resource) == :context do
[prefix: tenant]
else
[]
end
end
defp repo_opts(_), do: []
@impl true
def functions(resource) do
config = repo(resource).config()
functions = [AshPostgres.Functions.Type, AshPostgres.Functions.Fragment]
if "pg_trgm" in (config[:installed_extensions] || []) do
functions ++
[
AshPostgres.Functions.TrigramSimilarity
]
else
functions
end
end
2020-10-18 12:13:51 +13:00
@impl true
def run_aggregate_query(query, aggregates, resource) do
subquery = from(row in subquery(query), select: %{})
query =
Enum.reduce(
aggregates,
subquery,
&add_subquery_aggregate_select(&2, &1, resource)
)
2020-10-29 15:26:45 +13:00
{:ok, repo(resource).one(query, repo_opts(query))}
end
@impl true
def set_tenant(_resource, query, tenant) do
{:ok, Ecto.Query.put_query_prefix(query, to_string(tenant))}
2020-10-18 12:13:51 +13:00
end
@impl true
def run_aggregate_query_with_lateral_join(
query,
aggregates,
root_data,
source_resource,
destination_resource,
source_field,
destination_field
) do
lateral_join_query =
lateral_join_query(
query,
root_data,
source_resource,
source_field,
destination_field
)
subquery = from(row in subquery(lateral_join_query), select: %{})
query =
Enum.reduce(
aggregates,
subquery,
&add_subquery_aggregate_select(&2, &1, destination_resource)
)
2020-10-29 15:26:45 +13:00
{:ok, repo(source_resource).one(query, repo_opts(:query))}
2020-10-18 12:13:51 +13:00
end
2020-08-26 16:28:55 +12:00
@impl true
def run_query_with_lateral_join(
query,
root_data,
source_resource,
_destination_resource,
source_field,
destination_field
) do
2020-10-18 12:13:51 +13:00
query =
lateral_join_query(
query,
root_data,
source_resource,
source_field,
destination_field
)
2020-10-29 15:26:45 +13:00
{:ok, repo(source_resource).all(query, repo_opts(query))}
2020-10-18 12:13:51 +13:00
end
defp lateral_join_query(
query,
root_data,
source_resource,
source_field,
destination_field
) do
2020-08-26 16:28:55 +12:00
source_values = Enum.map(root_data, &Map.get(&1, source_field))
subquery =
subquery(
from(destination in query,
where:
field(destination, ^destination_field) ==
field(parent_as(:source_record), ^source_field)
)
)
source_resource
|> Ash.Query.new()
|> Ash.Query.data_layer_query()
|> case do
{:ok, data_layer_query} ->
from(source in data_layer_query,
as: :source_record,
where: field(source, ^source_field) in ^source_values,
inner_lateral_join: destination in ^subquery,
on: field(source, ^source_field) == field(destination, ^destination_field),
select: destination
)
{:error, error} ->
{:error, error}
end
2020-08-26 16:28:55 +12:00
end
@impl true
def resource_to_query(resource, _),
do: Ecto.Queryable.to_query({table(resource), resource})
@impl true
def create(resource, changeset) do
2020-07-13 16:41:38 +12:00
changeset.data
|> Map.update!(:__meta__, &Map.put(&1, :source, table(resource)))
|> ecto_changeset(changeset)
2020-10-29 15:26:45 +13:00
|> repo(resource).insert(repo_opts(changeset))
|> handle_errors()
2020-10-29 15:26:45 +13:00
|> case do
{:ok, result} ->
case maybe_create_tenant(resource, result) do
:ok ->
{:ok, result}
{:error, error} ->
{:error, error}
end
{:error, error} ->
{:error, error}
end
rescue
e ->
{:error, e}
end
2020-10-29 15:26:45 +13:00
defp maybe_create_tenant(resource, result) do
if AshPostgres.manage_tenant_create?(resource) do
tenant_name = tenant_name(resource, result)
AshPostgres.MultiTenancy.create_tenant(tenant_name, repo(resource))
else
:ok
end
end
defp maybe_update_tenant(resource, changeset, result) do
if AshPostgres.manage_tenant_update?(resource) do
changing_tenant_name? =
resource
|> AshPostgres.manage_tenant_template()
|> Enum.filter(&is_atom/1)
|> Enum.any?(&Ash.Changeset.changing_attribute?(changeset, &1))
if changing_tenant_name? do
old_tenant_name = tenant_name(resource, changeset.data)
new_tenant_name = tenant_name(resource, result)
AshPostgres.MultiTenancy.rename_tenant(repo(resource), old_tenant_name, new_tenant_name)
end
end
:ok
end
defp tenant_name(resource, result) do
resource
|> AshPostgres.manage_tenant_template()
|> Enum.map_join(fn item ->
if is_binary(item) do
item
else
result
|> Map.get(item)
|> to_string()
end
end)
end
defp handle_errors({:error, %Ecto.Changeset{errors: errors}}) do
{:error, Enum.map(errors, &to_ash_error/1)}
end
defp handle_errors({:ok, val}), do: {:ok, val}
defp to_ash_error({field, {message, vars}}) do
Ash.Error.Changes.InvalidAttribute.exception(field: field, message: message, vars: vars)
end
2020-07-13 16:41:38 +12:00
defp ecto_changeset(record, changeset) do
record
|> Ecto.Changeset.change(changeset.attributes)
|> add_unique_indexes(record.__struct__)
end
defp add_unique_indexes(changeset, resource) do
changeset =
resource
|> Ash.Resource.identities()
|> Enum.reduce(changeset, fn identity, changeset ->
name = "#{table(resource)}_#{identity.name}_unique_index"
Ecto.Changeset.unique_constraint(changeset, identity.keys, name: name)
end)
names =
resource
|> AshPostgres.unique_index_names()
|> case do
{m, f, a} -> apply(m, f, [changeset | a])
value -> List.wrap(value)
end
Enum.reduce(names, changeset, fn {keys, name}, changeset ->
Ecto.Changeset.unique_constraint(changeset, List.wrap(keys), name: name)
end)
2020-07-13 16:41:38 +12:00
end
@impl true
def upsert(resource, changeset) do
2020-10-29 15:26:45 +13:00
repo_opts =
changeset
|> repo_opts()
|> Keyword.put(:on_conflict, {:replace, Map.keys(changeset.attributes)})
|> Keyword.put(:conflict_target, Ash.Resource.primary_key(resource))
if AshPostgres.manage_tenant_update?(resource) do
{:error, "Cannot currently upsert a resource that owns a tenant"}
else
changeset.data
|> Map.update!(:__meta__, &Map.put(&1, :source, table(resource)))
|> ecto_changeset(changeset)
|> repo(resource).insert(repo_opts)
|> handle_errors()
2020-10-29 15:26:45 +13:00
end
rescue
e ->
{:error, e}
end
@impl true
def update(resource, changeset) do
2020-07-13 16:41:38 +12:00
changeset.data
|> Map.update!(:__meta__, &Map.put(&1, :source, table(resource)))
|> ecto_changeset(changeset)
2020-10-29 15:26:45 +13:00
|> repo(resource).update(repo_opts(changeset))
|> handle_errors()
2020-10-29 15:26:45 +13:00
|> case do
{:ok, result} ->
maybe_update_tenant(resource, changeset, result)
{:ok, result}
{:error, error} ->
{:error, error}
end
rescue
e ->
{:error, e}
end
@impl true
2020-10-29 15:26:45 +13:00
def destroy(resource, %{data: record} = changeset) do
case repo(resource).delete(record, repo_opts(changeset)) do
{:ok, _record} ->
:ok
{:error, error} ->
handle_errors({:error, error})
end
rescue
e ->
{:error, e}
end
@impl true
def sort(query, sort, resource) do
query = default_bindings(query, resource)
2020-07-23 17:13:47 +12:00
2020-08-26 16:28:55 +12:00
sort
|> sanitize_sort()
2020-10-18 12:13:51 +13:00
|> Enum.reduce({:ok, query}, fn {order, sort}, {:ok, query} ->
2020-08-26 16:28:55 +12:00
binding =
case Map.fetch(query.__ash_bindings__.aggregates, sort) do
{:ok, binding} ->
binding
:error ->
0
end
2020-10-18 12:13:51 +13:00
new_query =
Map.update!(query, :order_bys, fn order_bys ->
order_bys = order_bys || []
sort_expr = %Ecto.Query.QueryExpr{
expr: [
{order, {{:., [], [{:&, [], [binding]}, sort]}, [], []}}
]
}
order_bys ++ [sort_expr]
end)
{:ok, new_query}
2020-08-26 16:28:55 +12:00
end)
end
defp sanitize_sort(sort) do
sort
|> List.wrap()
|> Enum.map(fn
2020-12-29 13:26:04 +13:00
{sort, :asc_nils_last} -> {:asc_nulls_last, sort}
{sort, :asc_nils_first} -> {:asc_nulls_first, sort}
{sort, :desc_nils_last} -> {:desc_nulls_last, sort}
{sort, :desc_nils_first} -> {:desc_nulls_first, sort}
{sort, order} -> {order, sort}
sort -> sort
end)
end
@impl true
2020-06-19 15:04:41 +12:00
def filter(query, %{expression: false}, _resource) do
impossible_query = from(row in query, where: false)
{:ok, Map.put(impossible_query, :__impossible__, true)}
end
def filter(query, filter, _resource) do
2020-06-19 15:04:41 +12:00
relationship_paths =
filter
|> Filter.relationship_paths()
|> Enum.map(fn path ->
if can_inner_join?(path, filter) do
{:inner, relationship_path_to_relationships(filter.resource, path)}
else
{:left, relationship_path_to_relationships(filter.resource, path)}
end
2020-06-19 15:04:41 +12:00
end)
new_query =
query
|> join_all_relationships(relationship_paths)
|> add_filter_expression(filter)
2020-06-19 15:04:41 +12:00
{:ok, new_query}
end
defp default_bindings(query, resource) do
2020-07-23 17:13:47 +12:00
Map.put_new(query, :__ash_bindings__, %{
current: Enum.count(query.joins) + 1,
aggregates: %{},
bindings: %{0 => %{path: [], type: :root, source: resource}}
2020-07-23 17:13:47 +12:00
})
end
defp can_inner_join?(path, expr, seen_an_or? \\ false)
defp can_inner_join?(path, %{expression: expr}, seen_an_or?),
do: can_inner_join?(path, expr, seen_an_or?)
defp can_inner_join?(_path, expr, _seen_an_or?) when expr in [nil, true, false], do: true
defp can_inner_join?(path, %BooleanExpression{op: :and, left: left, right: right}, seen_an_or?) do
can_inner_join?(path, left, seen_an_or?) || can_inner_join?(path, right, seen_an_or?)
end
defp can_inner_join?(path, %BooleanExpression{op: :or, left: left, right: right}, _) do
can_inner_join?(path, left, true) && can_inner_join?(path, right, true)
end
defp can_inner_join?(
path,
%Not{expression: %BooleanExpression{op: :or, left: left, right: right}},
seen_an_or?
) do
can_inner_join?(
path,
%BooleanExpression{
op: :and,
left: %Not{expression: left},
right: %Not{expression: right}
},
seen_an_or?
)
end
defp can_inner_join?(path, %Not{expression: expression}, seen_an_or?) do
can_inner_join?(path, expression, seen_an_or?)
end
defp can_inner_join?(
search_path,
%{__operator__?: true, left: %Ref{relationship_path: relationship_path}},
seen_an_or?
)
when search_path == relationship_path do
not seen_an_or?
end
defp can_inner_join?(
search_path,
%{__function__?: true, arguments: arguments},
seen_an_or?
) do
if Enum.any?(arguments, &match?(%Ref{relationship_path: ^search_path}, &1)) do
not seen_an_or?
else
true
end
end
defp can_inner_join?(_, _, _), do: true
2020-07-23 17:13:47 +12:00
@impl true
def add_aggregate(query, aggregate, _resource) do
resource = aggregate.resource
query = default_bindings(query, resource)
2020-07-23 17:13:47 +12:00
{query, binding} =
case get_binding(resource, aggregate.relationship_path, query, :aggregate) do
nil ->
relationship = Ash.Resource.relationship(resource, aggregate.relationship_path)
subquery = aggregate_subquery(relationship, aggregate)
new_query =
join_all_relationships(
2020-07-23 17:13:47 +12:00
query,
[
{{:aggregate, aggregate.name, subquery},
relationship_path_to_relationships(resource, aggregate.relationship_path)}
]
2020-07-23 17:13:47 +12:00
)
{new_query, get_binding(resource, aggregate.relationship_path, new_query, :aggregate)}
binding ->
{query, binding}
end
query_with_aggregate_binding =
put_in(
query.__ash_bindings__.aggregates,
Map.put(query.__ash_bindings__.aggregates, aggregate.name, binding)
)
new_query =
query_with_aggregate_binding
|> add_aggregate_to_subquery(resource, aggregate, binding)
|> select_aggregate(resource, aggregate)
{:ok, new_query}
end
defp select_aggregate(query, resource, aggregate) do
binding = get_binding(resource, aggregate.relationship_path, query, :aggregate)
query =
if query.select do
query
else
from(row in query,
select: row,
select_merge: %{aggregates: %{}}
)
end
%{query | select: add_to_select(query.select, binding, aggregate)}
end
defp add_to_select(
%{expr: {:merge, _, [first, {:%{}, _, [{:aggregates, {:%{}, [], fields}}]}]}} = select,
binding,
2020-08-09 08:19:18 +12:00
%{load: nil} = aggregate
2020-07-23 17:13:47 +12:00
) do
2020-12-29 13:26:04 +13:00
accessed =
if aggregate.kind == :first do
{:fragment, [],
[
expr: {{:., [], [{:&, [], [binding]}, aggregate.name]}, [], []},
raw: "[1]"
]}
else
{{:., [], [{:&, [], [binding]}, aggregate.name]}, [], []}
end
2020-07-23 17:13:47 +12:00
field =
{:type, [],
[
2020-12-29 13:26:04 +13:00
accessed,
2020-07-23 17:13:47 +12:00
Ash.Type.ecto_type(aggregate.type)
]}
field_with_default =
2020-12-29 13:26:04 +13:00
if is_nil(aggregate.default_value) do
field
else
2020-07-23 17:13:47 +12:00
{:coalesce, [],
[
field,
aggregate.default_value
]}
end
new_fields = [
{aggregate.name, field_with_default}
| fields
]
%{select | expr: {:merge, [], [first, {:%{}, [], [{:aggregates, {:%{}, [], new_fields}}]}]}}
end
2020-08-09 08:19:18 +12:00
defp add_to_select(
%{expr: expr} = select,
binding,
%{load: load_as} = aggregate
) do
2020-12-29 13:26:04 +13:00
accessed =
if aggregate.kind == :first do
{:fragment, [],
[
raw: "",
expr: {{:., [], [{:&, [], [binding]}, aggregate.name]}, [], []},
raw: "[1]"
]}
else
{{:., [], [{:&, [], [binding]}, aggregate.name]}, [], []}
end
2020-08-09 08:19:18 +12:00
field =
{:type, [],
[
2020-12-29 13:26:04 +13:00
accessed,
2020-08-09 08:19:18 +12:00
Ash.Type.ecto_type(aggregate.type)
]}
field_with_default =
2020-12-29 13:26:04 +13:00
if is_nil(aggregate.default_value) do
field
else
2020-08-09 08:19:18 +12:00
{:coalesce, [],
[
field,
aggregate.default_value
]}
end
%{select | expr: {:merge, [], [expr, {:%{}, [], [{load_as, field_with_default}]}]}}
end
2020-07-23 17:13:47 +12:00
defp add_aggregate_to_subquery(query, resource, aggregate, binding) do
new_joins =
List.update_at(query.joins, binding - 1, fn join ->
aggregate_query =
if aggregate.authorization_filter do
{:ok, filter} =
filter(
join.source.from.source.query,
aggregate.authorization_filter,
Ash.Resource.related(resource, aggregate.relationship_path)
)
filter
else
join.source.from.source.query
end
new_aggregate_query = add_subquery_aggregate_select(aggregate_query, aggregate, resource)
put_in(join.source.from.source.query, new_aggregate_query)
end)
%{
query
| joins: new_joins
}
end
2020-10-29 15:26:45 +13:00
defp aggregate_subquery(relationship, aggregate) do
query =
from(row in relationship.destination,
group_by: ^relationship.destination_field,
select: field(row, ^relationship.destination_field)
)
if aggregate.query && aggregate.query.tenant do
Ecto.Query.put_query_prefix(query, aggregate.query.tenant)
else
query
end
2020-07-23 17:13:47 +12:00
end
2020-12-29 13:26:04 +13:00
defp order_to_postgres_order(dir) do
case dir do
:asc -> nil
:asc_nils_last -> " ASC NULLS LAST"
:asc_nils_first -> " ASC NULLS FIRST"
:desc -> " DESC"
:desc_nils_last -> " DESC NULLS LAST"
:desc_nils_first -> " DESC NULLS FIRST"
end
end
defp add_subquery_aggregate_select(query, %{kind: :first} = aggregate, _resource) do
query = default_bindings(query, aggregate.resource)
key = aggregate.field
type = Ash.Type.ecto_type(aggregate.type)
field =
if aggregate.query && aggregate.query.sort && aggregate.query.sort != [] do
sort_expr =
aggregate.query.sort
|> Enum.map(fn {sort, order} ->
case order_to_postgres_order(order) do
nil ->
[expr: {{:., [], [{:&, [], [0]}, sort]}, [], []}]
order ->
[expr: {{:., [], [{:&, [], [0]}, sort]}, [], []}, raw: order]
end
end)
|> Enum.intersperse(raw: ", ")
|> List.flatten()
{:fragment, [],
[
raw: "array_agg(",
expr: {{:., [], [{:&, [], [0]}, key]}, [], []},
raw: "ORDER BY "
] ++
sort_expr ++ [raw: ")"]}
else
{:fragment, [],
[
raw: "array_agg(",
expr: {{:., [], [{:&, [], [0]}, key]}, [], []},
raw: ")"
]}
end
{params, filtered} =
if aggregate.query && aggregate.query.filter &&
not match?(%Ash.Filter{expression: nil}, aggregate.query.filter) do
{params, expr} =
filter_to_expr(
aggregate.query.filter,
query.__ash_bindings__.bindings,
query.select.params
)
{params, {:filter, [], [field, expr]}}
else
{[], field}
end
cast = {:type, [], [filtered, {:array, type}]}
new_expr = {:merge, [], [query.select.expr, {:%{}, [], [{aggregate.name, cast}]}]}
%{query | select: %{query.select | expr: new_expr, params: params}}
end
2020-07-23 17:13:47 +12:00
defp add_subquery_aggregate_select(query, %{kind: :count} = aggregate, resource) do
query = default_bindings(query, aggregate.resource)
2020-12-29 13:26:04 +13:00
key = aggregate.field || List.first(Ash.Resource.primary_key(resource))
2020-07-23 17:13:47 +12:00
type = Ash.Type.ecto_type(aggregate.type)
2020-12-29 13:26:04 +13:00
field = {:count, [], [{{:., [], [{:&, [], [0]}, key]}, [], []}]}
2020-07-23 17:13:47 +12:00
{params, filtered} =
2020-12-29 13:26:04 +13:00
if aggregate.query && aggregate.query.filter &&
not match?(%Ash.Filter{expression: nil}, aggregate.query.filter) do
2020-07-23 17:13:47 +12:00
{params, expr} =
filter_to_expr(
aggregate.query.filter,
query.__ash_bindings__.bindings,
query.select.params
)
{params, {:filter, [], [field, expr]}}
else
{[], field}
end
cast = {:type, [], [filtered, type]}
new_expr = {:merge, [], [query.select.expr, {:%{}, [], [{aggregate.name, cast}]}]}
%{query | select: %{query.select | expr: new_expr, params: params}}
end
2020-06-19 15:04:41 +12:00
defp relationship_path_to_relationships(resource, path, acc \\ [])
defp relationship_path_to_relationships(_resource, [], acc), do: Enum.reverse(acc)
2020-06-19 15:04:41 +12:00
defp relationship_path_to_relationships(resource, [relationship | rest], acc) do
2020-07-15 17:42:12 +12:00
relationship = Ash.Resource.relationship(resource, relationship)
2020-06-19 15:04:41 +12:00
relationship_path_to_relationships(relationship.destination, rest, [relationship | acc])
end
defp join_all_relationships(query, relationship_paths, path \\ [], source \\ nil) do
query = default_bindings(query, source)
Enum.reduce(relationship_paths, query, fn
{_join_type, []}, query ->
query
{join_type, [relationship | rest_rels]}, query ->
source = source || relationship.source
current_path = path ++ [relationship]
2020-07-23 17:13:47 +12:00
current_join_type =
case join_type do
{:aggregate, _name, _agg} when rest_rels != [] ->
:left
2020-07-23 17:13:47 +12:00
other ->
other
end
if has_binding?(source, Enum.reverse(current_path), query, current_join_type) do
query
else
joined_query =
join_relationship(
query,
relationship,
Enum.map(path, & &1.name),
current_join_type,
source
)
joined_query_with_distinct = add_distinct(relationship, join_type, joined_query)
join_all_relationships(
joined_query_with_distinct,
[{join_type, rest_rels}],
current_path,
source
)
end
2020-07-23 17:13:47 +12:00
end)
end
defp has_binding?(resource, path, query, {:aggregate, _, _}),
do: has_binding?(resource, path, query, :aggregate)
defp has_binding?(resource, candidate_path, %{__ash_bindings__: _} = query, type) do
Enum.any?(query.__ash_bindings__.bindings, fn
{_, %{path: path, source: source, type: ^type}} ->
Ash.SatSolver.synonymous_relationship_paths?(resource, path, candidate_path, source)
2020-07-23 17:13:47 +12:00
_ ->
false
end)
2020-07-23 17:13:47 +12:00
end
defp has_binding?(_, _, _, _), do: false
defp get_binding(resource, path, %{__ash_bindings__: _} = query, type) do
paths =
Enum.flat_map(query.__ash_bindings__.bindings, fn
{binding, %{path: path, type: ^type}} ->
[{binding, path}]
_ ->
[]
end)
Enum.find_value(paths, fn {binding, candidate_path} ->
Ash.SatSolver.synonymous_relationship_paths?(resource, candidate_path, path) && binding
end)
end
2020-07-23 17:13:47 +12:00
defp get_binding(_, _, _, _), do: nil
defp add_distinct(relationship, join_type, joined_query) do
if relationship.cardinality == :many and join_type == :left && !joined_query.distinct do
from(row in joined_query,
2020-07-15 17:42:12 +12:00
distinct: ^Ash.Resource.primary_key(relationship.destination)
)
else
joined_query
end
end
defp join_relationship(query, relationship, path, join_type, source) do
case Map.get(query.__ash_bindings__.bindings, path) do
%{type: existing_join_type} when join_type != existing_join_type ->
raise "unreachable?"
nil ->
do_join_relationship(query, relationship, path, join_type, source)
_ ->
query
end
end
defp do_join_relationship(query, %{type: :many_to_many} = relationship, path, kind, source) do
relationship_through = maybe_get_resource_query(relationship.through)
relationship_destination =
Ecto.Queryable.to_query(maybe_get_resource_query(relationship.destination))
current_binding =
Enum.find_value(query.__ash_bindings__.bindings, 0, fn {binding, data} ->
if data.type == kind && data.path == Enum.reverse(path) do
binding
end
end)
new_query =
case kind do
{:aggregate, _, subquery} ->
2020-07-23 17:13:47 +12:00
subquery =
subquery(
from(destination in subquery,
where:
field(destination, ^relationship.destination_field) ==
field(
parent_as(:rel_through),
^relationship.destination_field_on_join_table
)
)
)
from([{row, current_binding}] in query,
2020-07-23 17:13:47 +12:00
left_join: through in ^relationship_through,
as: :rel_through,
on:
field(row, ^relationship.source_field) ==
field(through, ^relationship.source_field_on_join_table),
left_lateral_join: destination in ^subquery,
on:
field(destination, ^relationship.destination_field) ==
field(through, ^relationship.destination_field_on_join_table)
)
:inner ->
from([{row, current_binding}] in query,
join: through in ^relationship_through,
on:
field(row, ^relationship.source_field) ==
field(through, ^relationship.source_field_on_join_table),
join: destination in ^relationship_destination,
on:
field(destination, ^relationship.destination_field) ==
field(through, ^relationship.destination_field_on_join_table)
)
_ ->
from([{row, current_binding}] in query,
2020-07-23 17:13:47 +12:00
left_join: through in ^relationship_through,
on:
field(row, ^relationship.source_field) ==
field(through, ^relationship.source_field_on_join_table),
left_join: destination in ^relationship_destination,
on:
field(destination, ^relationship.destination_field) ==
field(through, ^relationship.destination_field_on_join_table)
)
end
join_path =
Enum.reverse([String.to_existing_atom(to_string(relationship.name) <> "_join_assoc") | path])
full_path = Enum.reverse([relationship.name | path])
2020-07-23 17:13:47 +12:00
binding_data =
case kind do
{:aggregate, name, _agg} ->
%{type: :aggregate, name: name, path: full_path, source: source}
_ ->
%{type: kind, path: full_path, source: source}
2020-07-23 17:13:47 +12:00
end
new_query
|> add_binding(%{path: join_path, type: :left, source: source})
2020-07-23 17:13:47 +12:00
|> add_binding(binding_data)
end
defp do_join_relationship(query, relationship, path, kind, source) do
relationship_destination =
Ecto.Queryable.to_query(maybe_get_resource_query(relationship.destination))
current_binding =
Enum.find_value(query.__ash_bindings__.bindings, 0, fn {binding, data} ->
if data.type == kind && data.path == Enum.reverse(path) do
binding
end
end)
new_query =
case kind do
{:aggregate, _, subquery} ->
2020-07-23 17:13:47 +12:00
subquery =
from(
sub in subquery(
from(destination in subquery,
where:
field(destination, ^relationship.destination_field) ==
field(parent_as(:rel_source), ^relationship.source_field)
)
),
select: field(sub, ^relationship.destination_field)
)
from([{row, current_binding}] in query,
2020-07-23 17:13:47 +12:00
as: :rel_source,
left_lateral_join: destination in ^subquery,
on:
field(row, ^relationship.source_field) ==
field(destination, ^relationship.destination_field)
)
:inner ->
from([{row, current_binding}] in query,
join: destination in ^relationship_destination,
on:
field(row, ^relationship.source_field) ==
field(destination, ^relationship.destination_field)
)
_ ->
from([{row, current_binding}] in query,
2020-07-23 17:13:47 +12:00
left_join: destination in ^relationship_destination,
on:
field(row, ^relationship.source_field) ==
field(destination, ^relationship.destination_field)
)
end
full_path = Enum.reverse([relationship.name | path])
2020-07-23 17:13:47 +12:00
binding_data =
case kind do
{:aggregate, name, _agg} ->
%{type: :aggregate, name: name, path: full_path, source: source}
_ ->
%{type: kind, path: full_path, source: source}
2020-07-23 17:13:47 +12:00
end
new_query
2020-07-23 17:13:47 +12:00
|> add_binding(binding_data)
end
defp add_filter_expression(query, filter) do
2020-09-20 10:08:09 +12:00
wheres =
filter
|> split_and_statements()
|> Enum.map(fn filter ->
{params, expr} = filter_to_expr(filter, query.__ash_bindings__.bindings, [])
%Ecto.Query.BooleanExpr{
expr: expr,
op: :and,
params: params
}
end)
2020-09-20 10:08:09 +12:00
%{query | wheres: query.wheres ++ wheres}
end
defp split_and_statements(%Filter{expression: expression}) do
split_and_statements(expression)
end
defp split_and_statements(%BooleanExpression{op: :and, left: left, right: right}) do
split_and_statements(left) ++ split_and_statements(right)
end
defp split_and_statements(%Not{expression: %Not{expression: expression}}) do
split_and_statements(expression)
end
defp split_and_statements(%Not{
expression: %BooleanExpression{op: :or, left: left, right: right}
}) do
split_and_statements(%BooleanExpression{
op: :and,
left: %Not{expression: left},
right: %Not{expression: right}
})
end
defp split_and_statements(other), do: [other]
defp filter_to_expr(expr, bindings, params, embedded? \\ false, type \\ nil)
defp filter_to_expr(%Filter{expression: expression}, bindings, params, embedded?, type) do
filter_to_expr(expression, bindings, params, embedded?, type)
end
2020-06-19 15:04:41 +12:00
# A nil filter means "everything"
defp filter_to_expr(nil, _, _, _, _), do: {[], true}
2020-06-19 15:04:41 +12:00
# A true filter means "everything"
defp filter_to_expr(true, _, _, _, _), do: {[], true}
2020-06-19 15:04:41 +12:00
# A false filter means "nothing"
defp filter_to_expr(false, _, _, _, _), do: {[], false}
2020-06-19 15:04:41 +12:00
defp filter_to_expr(
%BooleanExpression{op: op, left: left, right: right},
bindings,
params,
embedded?,
_type
) do
{params, left_expr} = filter_to_expr(left, bindings, params, embedded?)
{params, right_expr} = filter_to_expr(right, bindings, params, embedded?)
2020-06-19 15:04:41 +12:00
{params, {op, [], [left_expr, right_expr]}}
end
defp filter_to_expr(%Not{expression: expression}, bindings, params, embedded?, _type) do
{params, new_expression} = filter_to_expr(expression, bindings, params, embedded?)
2020-06-19 15:04:41 +12:00
{params, {:not, [], [new_expression]}}
end
defp filter_to_expr(
%TrigramSimilarity{arguments: [arg1, arg2], embedded?: pred_embedded?},
bindings,
params,
embedded?,
_type
) do
{params, arg1} = filter_to_expr(arg1, bindings, params, pred_embedded? || embedded?)
{params, arg2} = filter_to_expr(arg2, bindings, params, pred_embedded? || embedded?)
{params, {:fragment, [], [raw: "similarity(", expr: arg1, raw: ", ", expr: arg2, raw: ")"]}}
end
2020-09-29 01:10:13 +13:00
defp filter_to_expr(
%Type{arguments: [arg1, arg2], embedded?: pred_embedded?},
2020-09-29 01:10:13 +13:00
bindings,
params,
embedded?,
_type
2020-09-29 01:10:13 +13:00
) do
{params, arg1} = filter_to_expr(arg1, bindings, params, pred_embedded? || embedded?)
{params, arg2} = filter_to_expr(arg2, bindings, params, pred_embedded? || embedded?)
{params, {:type, [], [arg1, arg2]}}
2020-09-29 01:10:13 +13:00
end
defp filter_to_expr(
%Fragment{arguments: arguments, embedded?: pred_embedded?},
bindings,
params,
embedded?,
_type
2020-09-20 10:08:09 +12:00
) do
{params, fragment_data} =
Enum.reduce(arguments, {params, []}, fn
{:raw, str}, {params, fragment_data} ->
{params, fragment_data ++ [{:raw, str}]}
{:expr, expr}, {params, fragment_data} ->
{params, expr} = filter_to_expr(expr, bindings, params, pred_embedded? || embedded?)
{params, fragment_data ++ [{:expr, expr}]}
end)
{params, {:fragment, [], fragment_data}}
2020-06-29 15:47:07 +12:00
end
defp filter_to_expr(
%IsNil{left: left, right: right, embedded?: pred_embedded?},
bindings,
params,
embedded?,
_type
2020-09-20 10:08:09 +12:00
) do
{params, left_expr} = filter_to_expr(left, bindings, params, pred_embedded? || embedded?)
{params, right_expr} = filter_to_expr(right, bindings, params, pred_embedded? || embedded?)
{params,
{:==, [],
[
{:is_nil, [], [left_expr]},
right_expr
]}}
end
defp filter_to_expr(
%Ago{arguments: [left, right], embedded?: _pred_embedded?},
_bindings,
params,
_embedded?,
_type
)
when is_integer(left) and (is_binary(right) or is_atom(right)) do
{params ++ [{DateTime.utc_now(), {:param, :any_datetime}}],
{:datetime_add, [], [{:^, [], [Enum.count(params)]}, left, to_string(right)]}}
end
2021-01-24 16:45:15 +13:00
defp filter_to_expr(
%Contains{arguments: [left, %Ash.CiString{} = right], embedded?: pred_embedded?},
bindings,
params,
embedded?,
type
) do
filter_to_expr(
%Fragment{
embedded?: pred_embedded?,
arguments: [
raw: "strpos(",
expr: left,
raw: "::citext, ",
expr: right,
raw: ") > 0"
]
},
bindings,
params,
embedded?,
type
)
end
defp filter_to_expr(
%Contains{arguments: [left, right], embedded?: pred_embedded?},
bindings,
params,
embedded?,
type
) do
filter_to_expr(
%Fragment{
embedded?: pred_embedded?,
arguments: [
raw: "strpos(",
expr: left,
raw: ", ",
expr: right,
raw: ") > 0"
]
},
bindings,
params,
embedded?,
type
)
end
defp filter_to_expr(
%mod{
__predicate__?: _,
left: left,
right: right,
embedded?: pred_embedded?,
operator: op
},
bindings,
params,
embedded?,
_type
2020-09-20 10:08:09 +12:00
) do
{left_type, right_type} =
case determine_type(mod, left) do
nil ->
case determine_type(mod, right) do
nil ->
{nil, nil}
2020-09-20 10:08:09 +12:00
left_type ->
{left_type, nil}
end
right_type ->
if vague?(right_type) do
case determine_type(mod, right) do
nil ->
{nil, right_type}
left_type ->
{left_type, nil}
end
else
{nil, right_type}
end
end
{params, left_expr} =
filter_to_expr(left, bindings, params, pred_embedded? || embedded?, left_type)
{params, right_expr} =
filter_to_expr(right, bindings, params, pred_embedded? || embedded?, right_type)
{params,
{op, [],
[
left_expr,
right_expr
]}}
end
defp filter_to_expr(
%Ref{attribute: %{name: name}} = ref,
bindings,
params,
_embedded?,
_type
) do
{params, {{:., [], [{:&, [], [ref_binding(ref, bindings)]}, name]}, [], []}}
2020-06-29 15:47:07 +12:00
end
defp filter_to_expr({:embed, other}, _bindings, params, _true, _type) do
{params, other}
2020-09-20 10:08:09 +12:00
end
2021-01-24 16:45:15 +13:00
defp filter_to_expr(%Ash.CiString{string: string}, bindings, params, embedded?, type) do
filter_to_expr(
%Fragment{
embedded?: embedded?,
arguments: [
raw: "",
expr: string,
raw: "::citext"
]
},
bindings,
params,
embedded?,
type
)
end
defp filter_to_expr(other, _bindings, params, true, _type) do
{params, other}
2020-09-24 03:20:26 +12:00
end
defp filter_to_expr(value, _bindings, params, false, type) do
type = type || :any
value = last_ditch_cast(value, type)
{params ++ [{value, type}], {:^, [], [Enum.count(params)]}}
2020-09-24 03:20:26 +12:00
end
defp last_ditch_cast(value, :string) when is_atom(value) do
to_string(value)
2020-09-20 10:08:09 +12:00
end
defp last_ditch_cast(value, _type) do
value
2020-09-20 10:08:09 +12:00
end
defp determine_type(mod, %Ref{attribute: %{type: type}}) do
mod.types
|> Enum.flat_map(fn
:any ->
[]
:same ->
[type]
{:array, :any} ->
[{:in, :any}]
{:array, :same} ->
[{:in, type}]
{:array, type} ->
[{:in, type}]
type ->
[type]
end)
|> Enum.sort_by(&vague?/1)
|> Enum.at(0)
|> case do
nil ->
nil
{:in, :any} ->
{:in, :any}
{:in, type} ->
if Ash.Type.ash_type?(type) do
Ash.Type.storage_type(type)
else
type
end
type ->
if Ash.Type.ash_type?(type) do
Ash.Type.storage_type(type)
else
type
end
end
end
defp determine_type(_mod, _), do: nil
defp vague?({:in, :any}), do: true
defp vague?(:any), do: true
defp vague?(_), do: false
defp ref_binding(ref, bindings) do
case ref.attribute do
%Ash.Resource.Attribute{} ->
Enum.find_value(bindings, fn {binding, data} ->
data.path == ref.relationship_path && data.type in [:inner, :left, :root] && binding
end)
%Ash.Query.Aggregate{} = aggregate ->
Enum.find_value(bindings, fn {binding, data} ->
data.path == aggregate.relationship_path && data.type == :aggregate && binding
end)
2020-09-20 10:08:09 +12:00
end
end
2020-07-23 17:13:47 +12:00
defp add_binding(query, data) do
current = query.__ash_bindings__.current
bindings = query.__ash_bindings__.bindings
new_ash_bindings = %{
query.__ash_bindings__
2020-07-23 17:13:47 +12:00
| bindings: Map.put(bindings, current, data),
current: current + 1
}
%{query | __ash_bindings__: new_ash_bindings}
end
@impl true
2020-07-08 12:01:01 +12:00
def transaction(resource, func) do
repo(resource).transaction(func)
end
@impl true
2020-07-08 12:01:01 +12:00
def rollback(resource, term) do
repo(resource).rollback(term)
end
defp maybe_get_resource_query(resource) do
2020-09-20 10:08:09 +12:00
case Ash.Query.data_layer_query(Ash.Query.new(resource), only_validate_filter?: false) do
{:ok, query} -> query
{:error, error} -> {:error, error}
end
end
end