mirror of
https://github.com/ash-project/ash_postgres.git
synced 2024-09-20 21:43:12 +12:00
1685 lines
46 KiB
Elixir
1685 lines
46 KiB
Elixir
defmodule AshPostgres.DataLayer do
|
|
# DSL section describing how a resource creates and renames the tenant
# (schema) that it owns. Nested under the `postgres` section.
@manage_tenant %Ash.Dsl.Section{
  name: :manage_tenant,
  describe: """
  Configuration for the behavior of a resource that manages a tenant
  """,
  examples: [
    """
    manage_tenant do
      template ["organization_", :id]
      create? true
      update? false
    end
    """
  ],
  schema: [
    template: [
      # Validated by `tenant_template/1` in this module.
      type: {:custom, __MODULE__, :tenant_template, []},
      required: true,
      doc: """
      A template that will cause the resource to create/manage the specified schema.

      Use this if you have a resource that, when created, it should create a new tenant
      for you. For example, if you have a `customer` resource, and you want to create
      a schema for each customer based on their id, e.g `customer_10` set this option
      to `["customer_", :id]`. Then, when this is created, it will create a schema called
      `["customer_", :id]`, and run your tenant migrations on it. Then, if you were to change
      that customer's id to `20`, it would rename the schema to `customer_20`. Generally speaking
      you should avoid changing the tenant id.
      """
    ],
    create?: [
      type: :boolean,
      default: true,
      doc: "Whether or not to automatically create a tenant when a record is created"
    ],
    update?: [
      type: :boolean,
      default: true,
      doc: "Whether or not to automatically update the tenant name if the record is updated"
    ]
  ]
}
|
|
|
|
# Top-level `postgres` DSL section: repo/table configuration, migration
# options and the nested `manage_tenant` section.
@postgres %Ash.Dsl.Section{
  name: :postgres,
  describe: """
  Postgres data layer configuration
  """,
  sections: [
    @manage_tenant
  ],
  modules: [
    :repo
  ],
  examples: [
    """
    postgres do
      repo MyApp.Repo
      table "organizations"
    end
    """
  ],
  schema: [
    repo: [
      type: :atom,
      required: true,
      doc:
        "The repo that will be used to fetch your data. See the `AshPostgres.Repo` documentation for more"
    ],
    migrate?: [
      type: :boolean,
      default: true,
      doc:
        "Whether or not to include this resource in the generated migrations with `mix ash_postgres.generate_migrations`"
    ],
    base_filter_sql: [
      type: :string,
      doc:
        "A raw sql version of the base_filter, e.g `representative = true`. Required if trying to create a unique constraint on a resource with a base_filter"
    ],
    skip_unique_indexes: [
      # Validated by `validate_skip_unique_indexes/1` in this module.
      type: {:custom, __MODULE__, :validate_skip_unique_indexes, []},
      default: false,
      doc: "Skip generating unique indexes when generating migrations"
    ],
    unique_index_names: [
      type: :any,
      default: [],
      doc: """
      A list of unique index names that could raise errors, or an mfa to a function that takes a changeset
      and returns a list of names in the format `{[:affected, :keys], "name_of_constraint"}`
      """
    ],
    table: [
      type: :string,
      doc:
        "The table to store and read the resource from. Required unless `polymorphic?` is true."
    ],
    polymorphic?: [
      type: :boolean,
      default: false,
      doc: """
      Declares this resource as polymorphic.

      Polymorphic resources cannot be read or updated unless the table is provided in the query/changeset context.

      For example:

          PolymorphicResource
          |> Ash.Query.set_context(%{data_layer: %{table: "table"}})
          |> MyApi.read!()

      When relating to polymorphic resources, you'll need to use the `context` option on relationships,
      e.g

          belongs_to :polymorphic_association, PolymorphicResource,
            context: %{data_layer: %{table: "table"}}
      """
    ]
  ]
}
|
|
|
|
alias Ash.Filter
|
|
alias Ash.Query.{BooleanExpression, Not, Ref}
|
|
|
|
alias Ash.Query.Function.{Ago, Contains}
|
|
alias Ash.Query.Operator.IsNil
|
|
|
|
alias AshPostgres.Functions.{Fragment, TrigramSimilarity, Type}
|
|
|
|
import AshPostgres, only: [repo: 1]
|
|
|
|
@behaviour Ash.DataLayer
|
|
|
|
@sections [@postgres]
|
|
|
|
@moduledoc """
|
|
A postgres data layer that leverages Ecto's postgres capabilities.
|
|
|
|
# Table of Contents
|
|
#{Ash.Dsl.Extension.doc_index(@sections)}
|
|
|
|
#{Ash.Dsl.Extension.doc(@sections)}
|
|
"""
|
|
|
|
use Ash.Dsl.Extension,
|
|
sections: @sections,
|
|
transformers: [
|
|
AshPostgres.Transformers.VerifyRepo,
|
|
AshPostgres.Transformers.EnsureTableOrPolymorphic
|
|
]
|
|
|
|
@doc false
# Custom option validator for the `template` option of the `manage_tenant`
# section. Wraps a single value in a list and accepts it only when every
# element is a string or an atom.
#
# Returns `{:ok, list}` on success, or `{:error, message}` otherwise.
def tenant_template(value) do
  value = List.wrap(value)

  if Enum.all?(value, &(is_binary(&1) || is_atom(&1))) do
    {:ok, value}
  else
    # The DSL section is named `manage_tenant`; the message names it accordingly.
    {:error, "Expected all values for `manage_tenant` to be strings or atoms"}
  end
end
|
|
|
|
@doc false
# Custom option validator for `skip_unique_indexes`. A single value is wrapped
# in a list; the list is accepted only when it contains nothing but atoms.
def validate_skip_unique_indexes(indexes) do
  wrapped = List.wrap(indexes)

  if Enum.any?(wrapped, &(not is_atom(&1))) do
    {:error, "All indexes to skip must be atoms"}
  else
    {:ok, wrapped}
  end
end
|
|
|
|
import Ecto.Query, only: [from: 2, subquery: 1]
|
|
|
|
# Reports whether this data layer supports a given feature for a resource.
# Every feature listed below answers `true`; anything unlisted answers `false`.
#
# Joins and lateral joins are only supported between resources that share the
# same data layer AND the same repo. The repo is looked up from the resources
# themselves: `repo/1` takes a resource, so calling it with the data layer
# module would make the comparison meaningless.
@impl true
def can?(_, :async_engine), do: true
def can?(_, :transact), do: true
def can?(_, :composite_primary_key), do: true
def can?(_, :upsert), do: true

def can?(resource, {:join, other_resource}) do
  # `and` short-circuits, so `repo/1` is only consulted when both resources
  # use the same data layer.
  Ash.Resource.data_layer(resource) == Ash.Resource.data_layer(other_resource) and
    repo(resource) == repo(other_resource)
end

def can?(resource, {:lateral_join, other_resource}) do
  Ash.Resource.data_layer(resource) == Ash.Resource.data_layer(other_resource) and
    repo(resource) == repo(other_resource)
end

def can?(_, :boolean_filter), do: true
def can?(_, {:aggregate, :count}), do: true
def can?(_, :aggregate_filter), do: true
def can?(_, :aggregate_sort), do: true
def can?(_, :create), do: true
def can?(_, :read), do: true
def can?(_, :update), do: true
def can?(_, :destroy), do: true
def can?(_, :filter), do: true
def can?(_, :limit), do: true
def can?(_, :offset), do: true
def can?(_, :multitenancy), do: true
def can?(_, {:filter_expr, _}), do: true
def can?(_, :nested_expressions), do: true
def can?(_, {:query_aggregate, :count}), do: true
def can?(_, :sort), do: true
def can?(_, {:sort, _}), do: true
def can?(_, _), do: false
|
|
|
|
# Delegates to the `in_transaction?/0` function of the resource's repo.
@impl true
def in_transaction?(resource) do
  resource_repo = repo(resource)
  resource_repo.in_transaction?()
end
|
|
|
|
# Applies a `LIMIT` to the query. A `nil` limit leaves the query untouched.
# Always returns `{:ok, query}`.
@impl true
def limit(query, limit, _resource) do
  if is_nil(limit) do
    {:ok, query}
  else
    limited_query = from(row in query, limit: ^limit)
    {:ok, limited_query}
  end
end
|
|
|
|
# Returns the table configured for the resource, or an empty string when
# `AshPostgres.table/1` yields nothing.
@impl true
def source(resource) do
  configured_table = AshPostgres.table(resource)

  if configured_table, do: configured_table, else: ""
end
|
|
|
|
# When the context carries `data_layer: %{table: ...}`, points the query's
# `from` source at that table (paired with the resource). Otherwise the query
# is returned unchanged. Always returns `{:ok, query}`.
@impl true
def set_context(resource, data_layer_query, context) do
  case context[:data_layer][:table] do
    absent when absent in [nil, false] ->
      {:ok, data_layer_query}

    table ->
      updated_from = %{data_layer_query.from | source: {table, resource}}
      {:ok, %{data_layer_query | from: updated_from}}
  end
end
|
|
|
|
# Applies an `OFFSET` to the query. Every clause returns `{:ok, query}`.
#
#   * `nil` offset: the query is returned unchanged.
#   * `0` offset on a query that has no offset (or an offset of `0`): the
#     query is returned unchanged.
#   * anything else: the offset is set on the query.
@impl true
def offset(query, nil, _), do: {:ok, query}

def offset(%{offset: old_offset} = query, 0, _resource) when old_offset in [0, nil] do
  {:ok, query}
end

def offset(query, offset, _resource) do
  {:ok, from(row in query, offset: ^offset)}
end
|
|
|
|
# Runs the query through the resource's repo with `all/2` and wraps the
# results in `{:ok, results}`.
@impl true
def run_query(query, resource) do
  opts = repo_opts(query)
  results = repo(resource).all(query, opts)

  {:ok, results}
end
|
|
|
|
# Builds the options passed to repo calls.
#
# Yields `[prefix: tenant]` only when a non-nil tenant is present and the
# resource's multitenancy strategy is `:context`; otherwise `[]`.
defp repo_opts(%Ash.Changeset{} = changeset) do
  repo_opts(%{tenant: changeset.tenant, resource: changeset.resource})
end

defp repo_opts(%{tenant: tenant, resource: resource}) when tenant != nil do
  case Ash.Resource.multitenancy_strategy(resource) do
    :context -> [prefix: tenant]
    _other -> []
  end
end

defp repo_opts(_), do: []
|
|
|
|
# Lists the custom function modules available for the resource. The trigram
# similarity function is appended only when the repo config lists "pg_trgm"
# under `:installed_extensions`.
@impl true
def functions(resource) do
  installed_extensions = repo(resource).config()[:installed_extensions] || []

  trigram_functions =
    if "pg_trgm" in installed_extensions do
      [AshPostgres.Functions.TrigramSimilarity]
    else
      []
    end

  [AshPostgres.Functions.Type, AshPostgres.Functions.Fragment | trigram_functions]
end
|
|
|
|
# Wraps the query in a subquery selecting an empty map, merges one select
# entry per aggregate into it, and fetches the single resulting row with
# the repo's `one/2`. Returns `{:ok, row}`.
@impl true
def run_aggregate_query(query, aggregates, resource) do
  base = from(row in subquery(query), select: %{})

  aggregate_query =
    Enum.reduce(aggregates, base, fn aggregate, acc ->
      add_subquery_aggregate_select(acc, aggregate, resource)
    end)

  {:ok, repo(resource).one(aggregate_query, repo_opts(aggregate_query))}
end
|
|
|
|
# Sets the stringified tenant as the query prefix. Returns `{:ok, query}`.
@impl true
def set_tenant(_resource, query, tenant) do
  prefix = to_string(tenant)

  {:ok, Ecto.Query.put_query_prefix(query, prefix)}
end
|
|
|
|
# Runs aggregates over the destination rows reached through a lateral join
# from `root_data`.
#
# Builds the lateral join query, wraps it in a subquery selecting an empty
# map, merges one select entry per aggregate, and fetches the single row via
# the source resource's repo. Returns `{:ok, row}`.
@impl true
def run_aggregate_query_with_lateral_join(
      query,
      aggregates,
      root_data,
      source_resource,
      destination_resource,
      source_field,
      destination_field
    ) do
  lateral_join_query =
    lateral_join_query(
      query,
      root_data,
      source_resource,
      source_field,
      destination_field
    )

  subquery = from(row in subquery(lateral_join_query), select: %{})

  query =
    Enum.reduce(
      aggregates,
      subquery,
      &add_subquery_aggregate_select(&2, &1, destination_resource)
    )

  # Pass the built query (not the atom `:query`) so the options are derived
  # the same way as in `run_aggregate_query/3`.
  {:ok, repo(source_resource).one(query, repo_opts(query))}
end
|
|
|
|
# Builds the lateral join query for `root_data` and runs it with the source
# resource's repo via `all/2`. Returns `{:ok, results}`.
@impl true
def run_query_with_lateral_join(
      query,
      root_data,
      source_resource,
      _destination_resource,
      source_field,
      destination_field
    ) do
  joined =
    lateral_join_query(query, root_data, source_resource, source_field, destination_field)

  results = repo(source_resource).all(joined, repo_opts(joined))

  {:ok, results}
end
|
|
|
|
# Builds a query that selects destination rows through an inner lateral join
# from the source records found in `root_data`.
#
# The destination query is turned into a subquery correlated with the parent
# binding named `:source_record`; the outer query is limited to source rows
# whose `source_field` is among the values taken from `root_data`.
#
# NOTE(review): when the data layer query cannot be built this returns
# `{:error, error}`, which the visible callers pass on as if it were a query.
defp lateral_join_query(query, root_data, source_resource, source_field, destination_field) do
  source_values = for record <- root_data, do: Map.get(record, source_field)

  destination_subquery =
    subquery(
      from(destination in query,
        where:
          field(destination, ^destination_field) ==
            field(parent_as(:source_record), ^source_field)
      )
    )

  case Ash.Query.data_layer_query(Ash.Query.new(source_resource)) do
    {:ok, data_layer_query} ->
      from(source in data_layer_query,
        as: :source_record,
        where: field(source, ^source_field) in ^source_values,
        inner_lateral_join: destination in ^destination_subquery,
        on: field(source, ^source_field) == field(destination, ^destination_field),
        select: destination
      )

    {:error, error} ->
      {:error, error}
  end
end
|
|
|
|
# Converts the resource into an Ecto query over its configured table, using
# an empty string as the source when no table is configured.
@impl true
def resource_to_query(resource, _) do
  table = AshPostgres.table(resource) || ""

  Ecto.Queryable.to_query({table, resource})
end
|
|
|
|
# Inserts the changeset's record through the resource's repo.
#
# On success, creates the tenant when the resource is configured to do so and
# returns `{:ok, record}`. Insert failures come back as `{:error, errors}`.
@impl true
def create(resource, changeset) do
  insert_result =
    changeset.data
    |> Map.update!(:__meta__, fn meta -> Map.put(meta, :source, table(resource, changeset)) end)
    |> ecto_changeset(changeset)
    |> repo(resource).insert(repo_opts(changeset))
    |> handle_errors()

  # An `{:error, _}` result falls through `with` unchanged.
  with {:ok, result} <- insert_result do
    maybe_create_tenant!(resource, result)

    {:ok, result}
  end
end
|
|
|
|
# Creates the tenant named after the record when `manage_tenant` creation is
# enabled for the resource; otherwise returns `:ok`.
defp maybe_create_tenant!(resource, result) do
  if AshPostgres.manage_tenant_create?(resource) do
    resource
    |> tenant_name(result)
    |> AshPostgres.MultiTenancy.create_tenant!(repo(resource))
  else
    :ok
  end
end
|
|
|
|
# Renames the tenant when tenant updates are enabled for the resource and the
# changeset is changing any attribute referenced (as an atom) by the tenant
# template. Always returns `:ok`.
defp maybe_update_tenant(resource, changeset, result) do
  renaming? =
    AshPostgres.manage_tenant_update?(resource) &&
      resource
      |> AshPostgres.manage_tenant_template()
      |> Enum.any?(fn part ->
        is_atom(part) and Ash.Changeset.changing_attribute?(changeset, part)
      end)

  if renaming? do
    previous_name = tenant_name(resource, changeset.data)
    updated_name = tenant_name(resource, result)

    AshPostgres.MultiTenancy.rename_tenant(repo(resource), previous_name, updated_name)
  end

  :ok
end
|
|
|
|
# Renders the tenant name from the resource's tenant template: string parts
# are used literally, any other part is read as a key from `result` and
# stringified. The parts are joined without a separator.
defp tenant_name(resource, result) do
  template = AshPostgres.manage_tenant_template(resource)

  Enum.map_join(template, fn
    literal when is_binary(literal) -> literal
    attribute -> to_string(Map.get(result, attribute))
  end)
end
|
|
|
|
# Passes `{:ok, value}` through untouched and converts the errors of a failed
# Ecto changeset into Ash errors.
defp handle_errors({:ok, val}), do: {:ok, val}

defp handle_errors({:error, %Ecto.Changeset{errors: errors}}) do
  ash_errors = for error <- errors, do: to_ash_error(error)

  {:error, ash_errors}
end
|
|
|
|
# Converts one `{field, {message, vars}}` Ecto changeset error into an
# `Ash.Error.Changes.InvalidAttribute` exception.
defp to_ash_error({field, {message, vars}}) do
  error_opts = [field: field, message: message, vars: vars]

  Ash.Error.Changes.InvalidAttribute.exception(error_opts)
end
|
|
|
|
# Builds the Ecto changeset for a record: sets its table, applies the Ash
# changeset's attributes as changes, and registers the unique constraints.
defp ecto_changeset(record, changeset) do
  base_changeset =
    record
    |> set_table(changeset)
    |> Ecto.Changeset.change(changeset.attributes)

  add_unique_indexes(base_changeset, record.__struct__, changeset.tenant, changeset)
end
|
|
|
|
# For polymorphic resources, sets the record's source table from the
# changeset context (`data_layer: %{table: ...}`), falling back to the table
# configured on the resource. Raises when neither is available.
#
# Records of non-polymorphic resources are returned unchanged.
defp set_table(record, changeset) do
  # The resource module is the record's struct module (`__struct__`).
  resource = record.__struct__

  if AshPostgres.polymorphic?(resource) do
    table = changeset.context[:data_layer][:table] || AshPostgres.table(resource)

    if table do
      Ecto.put_meta(record, source: table)
    else
      raise """
      Attempted to change a polymorphic resource without setting the `table` context,
      and without a default table configured on the resource.
      """
    end
  else
    record
  end
end
|
|
|
|
# Registers unique constraints on the Ecto changeset so that constraint
# violations surface as changeset errors:
#
#   1. one per resource identity, named
#      "[<tenant>_]<table>_<identity>_unique_index";
#   2. the primary key constraint, named "<table>_pkey";
#   3. any names from the `unique_index_names` option (a list, or an MFA that
#      is called with the changeset prepended to its arguments).
defp add_unique_indexes(changeset, resource, tenant, ash_changeset) do
  with_identities =
    resource
    |> Ash.Resource.identities()
    |> Enum.reduce(changeset, fn identity, acc ->
      base_name = "#{table(resource, ash_changeset)}_#{identity.name}_unique_index"
      name = if tenant, do: "#{tenant}_#{base_name}", else: base_name

      opts =
        case Map.get(identity, :message) do
          absent when absent in [nil, false] -> [name: name]
          message -> [name: name, message: message]
        end

      Ecto.Changeset.unique_constraint(acc, identity.keys, opts)
    end)

  configured_names =
    case AshPostgres.unique_index_names(resource) do
      {m, f, a} -> List.wrap(apply(m, f, [with_identities | a]))
      value -> List.wrap(value)
    end

  pkey_entry = {Ash.Resource.primary_key(resource), table(resource, ash_changeset) <> "_pkey"}

  Enum.reduce([pkey_entry | configured_names], with_identities, fn {keys, name}, acc ->
    Ecto.Changeset.unique_constraint(acc, List.wrap(keys), name: name)
  end)
end
|
|
|
|
# Inserts the record, replacing the changed attributes when a row with the
# same primary key already exists.
#
# Resources with tenant updates enabled are rejected with an error tuple.
@impl true
def upsert(resource, changeset) do
  base_opts = repo_opts(changeset)

  with_on_conflict =
    Keyword.put(base_opts, :on_conflict, {:replace, Map.keys(changeset.attributes)})

  insert_opts =
    Keyword.put(with_on_conflict, :conflict_target, Ash.Resource.primary_key(resource))

  if AshPostgres.manage_tenant_update?(resource) do
    {:error, "Cannot currently upsert a resource that owns a tenant"}
  else
    record =
      Map.update!(changeset.data, :__meta__, fn meta ->
        Map.put(meta, :source, table(resource, changeset))
      end)

    record
    |> ecto_changeset(changeset)
    |> repo(resource).insert(insert_opts)
    |> handle_errors()
  end
end
|
|
|
|
# Updates the changeset's record through the resource's repo.
#
# On success, renames the tenant if needed and returns `{:ok, record}`.
# Update failures come back as `{:error, errors}`.
@impl true
def update(resource, changeset) do
  update_result =
    changeset.data
    |> Map.update!(:__meta__, fn meta -> Map.put(meta, :source, table(resource, changeset)) end)
    |> ecto_changeset(changeset)
    |> repo(resource).update(repo_opts(changeset))
    |> handle_errors()

  # An `{:error, _}` result falls through `with` unchanged.
  with {:ok, result} <- update_result do
    maybe_update_tenant(resource, changeset, result)

    {:ok, result}
  end
end
|
|
|
|
# Deletes the changeset's record through the resource's repo. Returns `:ok`
# on success; failures are converted by `handle_errors/1`.
@impl true
def destroy(resource, %{data: record} = changeset) do
  delete_result = repo(resource).delete(record, repo_opts(changeset))

  case delete_result do
    {:ok, _deleted} -> :ok
    {:error, _reason} = failure -> handle_errors(failure)
  end
end
|
|
|
|
# Appends one `ORDER BY` expression per sort entry to the query.
#
# A sort on a known aggregate uses that aggregate's binding; every other sort
# uses binding 0 (the root). Returns `{:ok, query}`.
@impl true
def sort(query, sort, resource) do
  initial_query = default_bindings(query, resource)

  sort
  |> sanitize_sort()
  |> Enum.reduce({:ok, initial_query}, fn {order, field}, {:ok, acc} ->
    binding = Map.get(acc.__ash_bindings__.aggregates, field, 0)

    order_expr = %Ecto.Query.QueryExpr{
      expr: [
        {order, {{:., [], [{:&, [], [binding]}, field]}, [], []}}
      ]
    }

    # `order_bys` may be nil, in which case the list starts empty.
    {:ok, Map.update!(acc, :order_bys, fn existing -> (existing || []) ++ [order_expr] end)}
  end)
end
|
|
|
|
# Normalises sort entries from `{field, order}` to `{order, field}` and
# renames the `*_nils_*` orders to their `*_nulls_*` equivalents. Entries
# that are not two-element tuples are passed through as-is.
defp sanitize_sort(sort) do
  nil_orders = %{
    asc_nils_last: :asc_nulls_last,
    asc_nils_first: :asc_nulls_first,
    desc_nils_last: :desc_nulls_last,
    desc_nils_first: :desc_nulls_first
  }

  for entry <- List.wrap(sort) do
    case entry do
      {field, order} -> {Map.get(nil_orders, order, order), field}
      other -> other
    end
  end
end
|
|
|
|
# Applies an Ash filter to the query.
#
# A filter whose expression is literally `false` yields a query with
# `where: false`, flagged with `__impossible__: true`. Any other filter has
# its relationship paths joined (inner when `can_inner_join?/2` allows it,
# left otherwise) before the filter expression is added.
# Returns `{:ok, query}`.
@impl true
def filter(query, %{expression: false}, _resource) do
  flagged_query =
    from(row in query, where: false)
    |> Map.put(:__impossible__, true)

  {:ok, flagged_query}
end

def filter(query, filter, _resource) do
  relationship_paths =
    for path <- Filter.relationship_paths(filter) do
      join_type = if can_inner_join?(path, filter), do: :inner, else: :left

      {join_type, relationship_path_to_relationships(filter.resource, path)}
    end

  filtered_query =
    query
    |> join_all_relationships(relationship_paths)
    |> add_filter_expression(filter)

  {:ok, filtered_query}
end
|
|
|
|
# Ensures the query carries an `__ash_bindings__` entry. An existing entry is
# kept; otherwise it is initialised with the root binding (index 0) for
# `resource`, no aggregates, and `current` set to one past the join count.
defp default_bindings(query, resource) do
  root_binding = %{path: [], type: :root, source: resource}

  initial_bindings = %{
    current: Enum.count(query.joins) + 1,
    aggregates: %{},
    bindings: %{0 => root_binding}
  }

  Map.put_new(query, :__ash_bindings__, initial_bindings)
end
|
|
|
|
# Decides whether the relationship at `path` may be joined with an inner join
# for the given filter expression.
#
# `seen_an_or?` records whether the walk has passed through an `or`: a
# reference to `path` found below an `or` answers false. `and` needs only one
# side to allow the inner join, `or` needs both, and `not (a or b)` is
# rewritten to `(not a) and (not b)` before recursing.
defp can_inner_join?(path, expr, seen_an_or? \\ false)

defp can_inner_join?(path, %{expression: inner}, seen_an_or?) do
  can_inner_join?(path, inner, seen_an_or?)
end

defp can_inner_join?(_path, expr, _seen_an_or?) when expr in [nil, true, false], do: true

defp can_inner_join?(path, %BooleanExpression{op: :and, left: lhs, right: rhs}, seen_an_or?) do
  can_inner_join?(path, lhs, seen_an_or?) || can_inner_join?(path, rhs, seen_an_or?)
end

defp can_inner_join?(path, %BooleanExpression{op: :or, left: lhs, right: rhs}, _) do
  can_inner_join?(path, lhs, true) && can_inner_join?(path, rhs, true)
end

defp can_inner_join?(
       path,
       %Not{expression: %BooleanExpression{op: :or, left: lhs, right: rhs}},
       seen_an_or?
     ) do
  negated_conjunction = %BooleanExpression{
    op: :and,
    left: %Not{expression: lhs},
    right: %Not{expression: rhs}
  }

  can_inner_join?(path, negated_conjunction, seen_an_or?)
end

defp can_inner_join?(path, %Not{expression: negated}, seen_an_or?) do
  can_inner_join?(path, negated, seen_an_or?)
end

defp can_inner_join?(
       search_path,
       %{__operator__?: true, left: %Ref{relationship_path: relationship_path}},
       seen_an_or?
     )
     when search_path == relationship_path do
  not seen_an_or?
end

defp can_inner_join?(search_path, %{__function__?: true, arguments: arguments}, seen_an_or?) do
  references_path? = Enum.any?(arguments, &match?(%Ref{relationship_path: ^search_path}, &1))

  # Only a function that references the path below an `or` answers false.
  not (references_path? and seen_an_or?)
end

defp can_inner_join?(_, _, _), do: true
|
|
|
|
# Adds an aggregate to the query.
#
# Reuses the aggregate binding for the relationship path when one exists;
# otherwise joins an aggregate subquery for it. The binding is recorded under
# the aggregate's name, the aggregate is added to the joined subquery's
# select, and finally selected on the outer query. Returns `{:ok, query}`.
@impl true
def add_aggregate(query, aggregate, _resource) do
  resource = aggregate.resource
  path = aggregate.relationship_path
  bound_query = default_bindings(query, resource)

  {joined_query, binding} =
    case get_binding(resource, path, bound_query, :aggregate) do
      nil ->
        relationship = Ash.Resource.relationship(resource, path)
        subquery = aggregate_subquery(relationship, aggregate)

        join_spec =
          {{:aggregate, aggregate.name, subquery},
           relationship_path_to_relationships(resource, path)}

        with_join = join_all_relationships(bound_query, [join_spec])

        {with_join, get_binding(resource, path, with_join, :aggregate)}

      existing_binding ->
        {bound_query, existing_binding}
    end

  result =
    joined_query.__ash_bindings__.aggregates
    |> update_in(&Map.put(&1, aggregate.name, binding))
    |> add_aggregate_to_subquery(resource, aggregate, binding)
    |> select_aggregate(resource, aggregate)

  {:ok, result}
end
|
|
|
|
# Adds the aggregate to the query's select. When the query has no select yet,
# one is created that selects the row merged with an empty `aggregates` map.
defp select_aggregate(query, resource, aggregate) do
  binding = get_binding(resource, aggregate.relationship_path, query, :aggregate)

  selecting_query =
    case query.select do
      absent when absent in [nil, false] ->
        from(row in query,
          select: row,
          select_merge: %{aggregates: %{}}
        )

      _existing_select ->
        query
    end

  %{selecting_query | select: add_to_select(selecting_query.select, binding, aggregate)}
end
|
|
|
|
# Adds the aggregate value to the select expression.
#
# The value is read from the aggregate's binding, cast to the aggregate's
# Ecto type, and wrapped in `coalesce` when a default value is set. For
# `:first` aggregates the value is the result of applying "[1]" to the
# selected field.
#
# First clause (`load: nil`): the value is placed under the aggregate's name
# inside the `aggregates` map of the select. Second clause: the value is
# merged into the selected row under the `load` key.
defp add_to_select(
       %{expr: {:merge, _, [first, {:%{}, _, [{:aggregates, {:%{}, [], fields}}]}]}} = select,
       binding,
       %{load: nil} = aggregate
     ) do
  aggregate_ref = {{:., [], [{:&, [], [binding]}, aggregate.name]}, [], []}

  accessed =
    case aggregate.kind do
      :first -> {:fragment, [], [expr: aggregate_ref, raw: "[1]"]}
      _other -> aggregate_ref
    end

  typed = {:type, [], [accessed, Ash.Type.ecto_type(aggregate.type)]}

  value =
    case aggregate.default_value do
      nil -> typed
      default -> {:coalesce, [], [typed, default]}
    end

  aggregates_map = {:%{}, [], [{aggregate.name, value} | fields]}

  %{select | expr: {:merge, [], [first, {:%{}, [], [{:aggregates, aggregates_map}]}]}}
end

defp add_to_select(
       %{expr: expr} = select,
       binding,
       %{load: load_as} = aggregate
     ) do
  aggregate_ref = {{:., [], [{:&, [], [binding]}, aggregate.name]}, [], []}

  accessed =
    case aggregate.kind do
      :first -> {:fragment, [], [raw: "", expr: aggregate_ref, raw: "[1]"]}
      _other -> aggregate_ref
    end

  typed = {:type, [], [accessed, Ash.Type.ecto_type(aggregate.type)]}

  value =
    case aggregate.default_value do
      nil -> typed
      default -> {:coalesce, [], [typed, default]}
    end

  %{select | expr: {:merge, [], [expr, {:%{}, [], [{load_as, value}]}]}}
end
|
|
|
|
# Rewrites the join at `binding - 1` so that its subquery also selects the
# aggregate. When the aggregate carries an authorization filter, that filter
# is applied to the subquery first.
defp add_aggregate_to_subquery(query, resource, aggregate, binding) do
  updated_joins =
    List.update_at(query.joins, binding - 1, fn join ->
      join_subquery = join.source.from.source.query

      aggregate_query =
        case aggregate.authorization_filter do
          absent when absent in [nil, false] ->
            join_subquery

          authorization_filter ->
            related = Ash.Resource.related(resource, aggregate.relationship_path)
            {:ok, filtered} = filter(join_subquery, authorization_filter, related)

            filtered
        end

      with_aggregate = add_subquery_aggregate_select(aggregate_query, aggregate, resource)

      put_in(join.source.from.source.query, with_aggregate)
    end)

  %{query | joins: updated_joins}
end
|
|
|
|
# Builds the base aggregate subquery over the relationship's destination,
# grouped by (and selecting) the destination field. The aggregate query's
# tenant, when present, is set as the query prefix.
defp aggregate_subquery(relationship, aggregate) do
  destination_field = relationship.destination_field

  grouped =
    from(row in relationship.destination,
      group_by: ^destination_field,
      select: field(row, ^destination_field)
    )

  tenant = aggregate.query && aggregate.query.tenant

  if tenant do
    Ecto.Query.put_query_prefix(grouped, tenant)
  else
    grouped
  end
end
|
|
|
|
# Maps an Ash sort direction to the SQL suffix appended after the sorted
# expression. Plain `:asc` needs no suffix and yields `nil`.
defp order_to_postgres_order(direction) do
  case direction do
    :asc ->
      nil

    :desc ->
      " DESC"

    :asc_nils_first ->
      " ASC NULLS FIRST"

    :asc_nils_last ->
      " ASC NULLS LAST"

    :desc_nils_first ->
      " DESC NULLS FIRST"

    :desc_nils_last ->
      " DESC NULLS LAST"
  end
end
|
|
|
|
# Merges an aggregate into the select of an aggregate subquery, under the
# aggregate's name.
#
#   * `:first` - `array_agg` over the aggregate's field (ordered by the
#     aggregate query's sort when it has one), cast to an array of the
#     aggregate's type.
#   * `:count` - `count` over the aggregate's field, or over the first primary
#     key field of `resource` when no field is given, cast to the aggregate's
#     type.
#
# When the aggregate query has a non-empty filter, the expression is wrapped
# in a `filter` clause built by `filter_to_expr/3`.
#
# NOTE(review): when there is no filter the select params are replaced with
# `[]` rather than kept - confirm this cannot drop params added earlier.
defp add_subquery_aggregate_select(query, %{kind: :first} = aggregate, _resource) do
  query = default_bindings(query, aggregate.resource)
  field_ref = {{:., [], [{:&, [], [0]}, aggregate.field]}, [], []}
  element_type = Ash.Type.ecto_type(aggregate.type)
  sorts = aggregate.query && aggregate.query.sort

  array_agg =
    if sorts && sorts != [] do
      order_parts =
        sorts
        |> Enum.map(fn {sort_field, direction} ->
          sort_ref = {{:., [], [{:&, [], [0]}, sort_field]}, [], []}

          case order_to_postgres_order(direction) do
            nil -> [expr: sort_ref]
            suffix -> [expr: sort_ref, raw: suffix]
          end
        end)
        |> Enum.intersperse(raw: ", ")
        |> List.flatten()

      leading_parts = [raw: "array_agg(", expr: field_ref, raw: "ORDER BY "]

      {:fragment, [], leading_parts ++ order_parts ++ [raw: ")"]}
    else
      {:fragment, [], [raw: "array_agg(", expr: field_ref, raw: ")"]}
    end

  aggregate_filter = aggregate.query && aggregate.query.filter

  {params, filtered} =
    if aggregate_filter && not match?(%Ash.Filter{expression: nil}, aggregate_filter) do
      {filter_params, filter_expr} =
        filter_to_expr(
          aggregate_filter,
          query.__ash_bindings__.bindings,
          query.select.params
        )

      {filter_params, {:filter, [], [array_agg, filter_expr]}}
    else
      {[], array_agg}
    end

  cast = {:type, [], [filtered, {:array, element_type}]}
  merged_expr = {:merge, [], [query.select.expr, {:%{}, [], [{aggregate.name, cast}]}]}

  %{query | select: %{query.select | expr: merged_expr, params: params}}
end

defp add_subquery_aggregate_select(query, %{kind: :count} = aggregate, resource) do
  query = default_bindings(query, aggregate.resource)
  counted_field = aggregate.field || List.first(Ash.Resource.primary_key(resource))
  result_type = Ash.Type.ecto_type(aggregate.type)

  count = {:count, [], [{{:., [], [{:&, [], [0]}, counted_field]}, [], []}]}

  aggregate_filter = aggregate.query && aggregate.query.filter

  {params, filtered} =
    if aggregate_filter && not match?(%Ash.Filter{expression: nil}, aggregate_filter) do
      {filter_params, filter_expr} =
        filter_to_expr(
          aggregate_filter,
          query.__ash_bindings__.bindings,
          query.select.params
        )

      {filter_params, {:filter, [], [count, filter_expr]}}
    else
      {[], count}
    end

  cast = {:type, [], [filtered, result_type]}
  merged_expr = {:merge, [], [query.select.expr, {:%{}, [], [{aggregate.name, cast}]}]}

  %{query | select: %{query.select | expr: merged_expr, params: params}}
end
|
|
|
|
# Resolves a path of relationship names, starting at `resource`, into the list of
# relationship structs in path order. Each step is looked up on the destination of
# the previous relationship. Entries already in `acc` (stored newest-first) end up
# in front of the resolved ones.
defp relationship_path_to_relationships(resource, path, acc \\ [])

defp relationship_path_to_relationships(resource, path, acc) do
  {_final_resource, collected} =
    Enum.reduce(path, {resource, acc}, fn name, {current_resource, found} ->
      resolved = Ash.Resource.relationship(current_resource, name)
      {resolved.destination, [resolved | found]}
    end)

  Enum.reverse(collected)
end
|
|
|
|
# Joins every relationship along each `{join_type, relationships}` path onto
# `query`, recursing one relationship at a time. `path` is the list of
# relationships already traversed and `source` is the resource the path started
# from (taken from the first relationship when not given).
defp join_all_relationships(query, relationship_paths, path \\ [], source \\ nil) do
  initial_query = default_bindings(query, source)

  Enum.reduce(relationship_paths, initial_query, fn
    {_join_type, []}, acc ->
      acc

    {join_type, [relationship | rest_rels]}, acc ->
      source = source || relationship.source
      current_path = path ++ [relationship]

      # Intermediate hops of an aggregate path are joined as plain left joins;
      # only the last hop keeps the aggregate join type.
      effective_join_type =
        if match?({:aggregate, _name, _agg}, join_type) and rest_rels != [] do
          :left
        else
          join_type
        end

      if has_binding?(source, Enum.reverse(current_path), acc, effective_join_type) do
        acc
      else
        path_names = Enum.map(path, & &1.name)

        joined =
          join_relationship(acc, relationship, path_names, effective_join_type, source)

        with_distinct = add_distinct(relationship, join_type, joined)

        join_all_relationships(with_distinct, [{join_type, rest_rels}], current_path, source)
      end
  end)
end
|
|
|
|
# Returns whether `query` already has a recorded binding of the given type whose
# path is synonymous with the candidate path. Aggregate join types given as
# `{:aggregate, _, _}` are compared as `:aggregate`. Queries without
# `__ash_bindings__` never have a binding.
defp has_binding?(resource, path, query, {:aggregate, _, _}) do
  has_binding?(resource, path, query, :aggregate)
end

defp has_binding?(resource, candidate_path, %{__ash_bindings__: _} = query, type) do
  recorded = query.__ash_bindings__.bindings

  Enum.any?(recorded, fn
    {_index, %{type: ^type, path: bound_path, source: bound_source}} ->
      Ash.SatSolver.synonymous_relationship_paths?(
        resource,
        bound_path,
        candidate_path,
        bound_source
      )

    _other ->
      false
  end)
end

defp has_binding?(_resource, _path, _query, _type), do: false
|
|
|
|
# Finds the first recorded binding of the given type whose path is synonymous with
# `path`, returning its key in the bindings collection, or `nil` when there is
# none (or the query has no `__ash_bindings__`).
defp get_binding(resource, path, %{__ash_bindings__: _} = query, type) do
  Enum.find_value(query.__ash_bindings__.bindings, fn
    {binding, %{path: candidate_path, type: ^type}} ->
      Ash.SatSolver.synonymous_relationship_paths?(resource, candidate_path, path) && binding

    _other ->
      nil
  end)
end

defp get_binding(_resource, _path, _query, _type), do: nil
|
|
|
|
# Makes the joined query distinct on the destination's primary key when a `:many`
# relationship was left-joined and the query has no `distinct` yet; otherwise the
# query is returned untouched.
defp add_distinct(relationship, join_type, joined_query) do
  cond do
    relationship.cardinality != :many ->
      joined_query

    join_type != :left ->
      joined_query

    joined_query.distinct ->
      joined_query

    true ->
      from(row in joined_query,
        distinct: ^Ash.Resource.primary_key(relationship.destination)
      )
  end
end
|
|
|
|
# Joins `relationship` onto `query` unless the bindings map already has an entry
# stored under `path`. An existing entry with a different join type raises.
# NOTE(review): `add_binding/2` stores entries under integer keys, so a lookup by
# `path` looks like it would always miss — confirm against the callers.
defp join_relationship(query, relationship, path, join_type, source) do
  existing = Map.get(query.__ash_bindings__.bindings, path)

  cond do
    match?(%{type: existing_join_type} when join_type != existing_join_type, existing) ->
      raise "unreachable?"

    is_nil(existing) ->
      do_join_relationship(query, relationship, path, join_type, source)

    true ->
      query
  end
end
|
|
|
|
# Joins a `:many_to_many` relationship onto `query` via its join resource
# (`relationship.through`) and records two bindings: one for the join table and
# one for the destination.
#
# `kind` picks the join shape: `{:aggregate, name, subquery}` left-joins the join
# table and lateral-joins the aggregate subquery, `:inner` uses inner joins, and
# anything else uses left joins.
defp do_join_relationship(query, %{type: :many_to_many} = relationship, path, kind, source) do
  relationship_through = maybe_get_resource_query(relationship.through)

  relationship_destination =
    relationship.destination
    |> maybe_get_resource_query()
    |> Ecto.Queryable.to_query()

  reversed_path = Enum.reverse(path)

  # Binding to join from; defaults to binding 0 when no recorded binding matches.
  current_binding =
    Enum.find_value(query.__ash_bindings__.bindings, 0, fn {binding, data} ->
      data.type == kind && data.path == reversed_path && binding
    end)

  new_query =
    case kind do
      {:aggregate, _, aggregate_query} ->
        # Correlate the aggregate subquery with the join-table row of the outer query.
        lateral =
          subquery(
            from(destination in aggregate_query,
              where:
                field(destination, ^relationship.destination_field) ==
                  field(
                    parent_as(:rel_through),
                    ^relationship.destination_field_on_join_table
                  )
            )
          )

        from([{row, current_binding}] in query,
          left_join: through in ^relationship_through,
          as: :rel_through,
          on:
            field(row, ^relationship.source_field) ==
              field(through, ^relationship.source_field_on_join_table),
          left_lateral_join: destination in ^lateral,
          on:
            field(destination, ^relationship.destination_field) ==
              field(through, ^relationship.destination_field_on_join_table)
        )

      :inner ->
        from([{row, current_binding}] in query,
          join: through in ^relationship_through,
          on:
            field(row, ^relationship.source_field) ==
              field(through, ^relationship.source_field_on_join_table),
          join: destination in ^relationship_destination,
          on:
            field(destination, ^relationship.destination_field) ==
              field(through, ^relationship.destination_field_on_join_table)
        )

      _ ->
        from([{row, current_binding}] in query,
          left_join: through in ^relationship_through,
          on:
            field(row, ^relationship.source_field) ==
              field(through, ^relationship.source_field_on_join_table),
          left_join: destination in ^relationship_destination,
          on:
            field(destination, ^relationship.destination_field) ==
              field(through, ^relationship.destination_field_on_join_table)
        )
    end

  join_assoc_name = String.to_existing_atom(to_string(relationship.name) <> "_join_assoc")
  join_path = Enum.reverse([join_assoc_name | path])
  full_path = Enum.reverse([relationship.name | path])

  binding_data =
    case kind do
      {:aggregate, name, _agg} ->
        %{type: :aggregate, name: name, path: full_path, source: source}

      _ ->
        %{type: kind, path: full_path, source: source}
    end

  # The join table binding is recorded first, then the destination binding.
  with_join_binding = add_binding(new_query, %{path: join_path, type: :left, source: source})
  add_binding(with_join_binding, binding_data)
end
|
|
|
|
# Joins a relationship directly from source to destination (no join table) and
# records a binding for the destination.
#
# `kind` picks the join shape: `{:aggregate, name, subquery}` lateral-joins the
# aggregate subquery, `:inner` uses an inner join, and anything else a left join.
defp do_join_relationship(query, relationship, path, kind, source) do
  relationship_destination =
    relationship.destination
    |> maybe_get_resource_query()
    |> Ecto.Queryable.to_query()

  reversed_path = Enum.reverse(path)

  # Binding to join from; defaults to binding 0 when no recorded binding matches.
  current_binding =
    Enum.find_value(query.__ash_bindings__.bindings, 0, fn {binding, data} ->
      data.type == kind && data.path == reversed_path && binding
    end)

  new_query =
    case kind do
      {:aggregate, _, aggregate_query} ->
        # Correlate the aggregate subquery with the source row of the outer query
        # and select the destination field from it.
        lateral =
          from(
            sub in subquery(
              from(destination in aggregate_query,
                where:
                  field(destination, ^relationship.destination_field) ==
                    field(parent_as(:rel_source), ^relationship.source_field)
              )
            ),
            select: field(sub, ^relationship.destination_field)
          )

        from([{row, current_binding}] in query,
          as: :rel_source,
          left_lateral_join: destination in ^lateral,
          on:
            field(row, ^relationship.source_field) ==
              field(destination, ^relationship.destination_field)
        )

      :inner ->
        from([{row, current_binding}] in query,
          join: destination in ^relationship_destination,
          on:
            field(row, ^relationship.source_field) ==
              field(destination, ^relationship.destination_field)
        )

      _ ->
        from([{row, current_binding}] in query,
          left_join: destination in ^relationship_destination,
          on:
            field(row, ^relationship.source_field) ==
              field(destination, ^relationship.destination_field)
        )
    end

  full_path = Enum.reverse([relationship.name | path])

  binding_data =
    case kind do
      {:aggregate, name, _agg} ->
        %{type: :aggregate, name: name, path: full_path, source: source}

      _ ->
        %{type: kind, path: full_path, source: source}
    end

  add_binding(new_query, binding_data)
end
|
|
|
|
# Splits `filter` into its top-level AND-ed statements and appends each one to the
# query's `wheres` as its own `:and` boolean expression with its own params.
defp add_filter_expression(query, filter) do
  new_wheres =
    for statement <- split_and_statements(filter) do
      {params, expr} = filter_to_expr(statement, query.__ash_bindings__.bindings, [])

      %Ecto.Query.BooleanExpr{op: :and, expr: expr, params: params}
    end

  %{query | wheres: query.wheres ++ new_wheres}
end
|
|
|
|
# Flattens an expression into the list of statements that are AND-ed together at
# the top level. Filters are unwrapped, `and` is split, a double negation is
# dropped, and `not (a or b)` is split as `not a` and `not b`. Anything else is a
# single statement.
defp split_and_statements(expression) do
  case expression do
    %Filter{expression: inner} ->
      split_and_statements(inner)

    %BooleanExpression{op: :and, left: left, right: right} ->
      split_and_statements(left) ++ split_and_statements(right)

    %Not{expression: %Not{expression: inner}} ->
      split_and_statements(inner)

    %Not{expression: %BooleanExpression{op: :or, left: left, right: right}} ->
      split_and_statements(%Not{expression: left}) ++
        split_and_statements(%Not{expression: right})

    other ->
      [other]
  end
end
|
|
|
|
# Converts a filter (or filter expression) into `{params, ecto_expr}`.
#
# `params` is the accumulator of query params gathered so far; the returned list
# is that accumulator plus any params this expression added. A `%Filter{}` is
# unwrapped to its expression, the literals `nil`/`true`/`false` become constant
# expressions, and everything else is handed to `do_filter_to_expr/5`.
defp filter_to_expr(expr, bindings, params, embedded? \\ false, type \\ nil)

defp filter_to_expr(%Filter{expression: expression}, bindings, params, embedded?, type) do
  filter_to_expr(expression, bindings, params, embedded?, type)
end

# The literal clauses add no params of their own, so they return the accumulator
# unchanged rather than `[]`, which would drop params collected by the caller.

# A nil filter means "everything"
defp filter_to_expr(nil, _bindings, params, _embedded?, _type), do: {params, true}
# A true filter means "everything"
defp filter_to_expr(true, _bindings, params, _embedded?, _type), do: {params, true}
# A false filter means "nothing"
defp filter_to_expr(false, _bindings, params, _embedded?, _type), do: {params, false}

defp filter_to_expr(expression, bindings, params, embedded?, type) do
  do_filter_to_expr(expression, bindings, params, embedded?, type)
end
|
|
|
|
# Recursively converts a filter expression node into `{params, ecto_expr}`,
# threading the params accumulator through every sub-expression.
defp do_filter_to_expr(expr, bindings, params, embedded?, type \\ nil)

# Boolean combination: both sides are converted in order and joined with `op`.
defp do_filter_to_expr(
       %BooleanExpression{op: op, left: left, right: right},
       bindings,
       params,
       embedded?,
       _type
     ) do
  {params_after_left, lhs} = do_filter_to_expr(left, bindings, params, embedded?)
  {params_after_right, rhs} = do_filter_to_expr(right, bindings, params_after_left, embedded?)

  {params_after_right, {op, [], [lhs, rhs]}}
end

# Negation: the inner expression is converted and wrapped in `not`.
defp do_filter_to_expr(%Not{expression: expression}, bindings, params, embedded?, _type) do
  {inner_params, inner} = do_filter_to_expr(expression, bindings, params, embedded?)

  {inner_params, {:not, [], [inner]}}
end
|
|
|
|
# Trigram similarity: rendered as a `similarity(?, ?)` fragment over both arguments.
defp do_filter_to_expr(
       %TrigramSimilarity{arguments: [arg1, arg2], embedded?: pred_embedded?},
       bindings,
       params,
       embedded?,
       _type
     ) do
  nested_embedded? = pred_embedded? || embedded?

  {params, first} = do_filter_to_expr(arg1, bindings, params, nested_embedded?)
  {params, second} = do_filter_to_expr(arg2, bindings, params, nested_embedded?)

  fragment_parts = [raw: "similarity(", expr: first, raw: ", ", expr: second, raw: ")"]

  {params, {:fragment, [], fragment_parts}}
end

# Type cast: both arguments are converted and combined into a `:type` expression.
defp do_filter_to_expr(
       %Type{arguments: [arg1, arg2], embedded?: pred_embedded?},
       bindings,
       params,
       embedded?,
       _type
     ) do
  nested_embedded? = pred_embedded? || embedded?

  {params, value_expr} = do_filter_to_expr(arg1, bindings, params, nested_embedded?)
  {params, type_expr} = do_filter_to_expr(arg2, bindings, params, nested_embedded?)

  {params, {:type, [], [value_expr, type_expr]}}
end
|
|
|
|
# Fragment: `{:raw, string}` parts are kept as-is and `{:expr, expression}` parts
# are converted recursively, preserving argument order.
defp do_filter_to_expr(
       %Fragment{arguments: arguments, embedded?: pred_embedded?},
       bindings,
       params,
       embedded?,
       _type
     ) do
  nested_embedded? = pred_embedded? || embedded?

  {fragment_data, params} =
    Enum.map_reduce(arguments, params, fn
      {:raw, str}, acc ->
        {{:raw, str}, acc}

      {:expr, expr}, acc ->
        {acc, converted} = do_filter_to_expr(expr, bindings, acc, nested_embedded?)
        {{:expr, converted}, acc}
    end)

  {params, {:fragment, [], fragment_data}}
end
|
|
|
|
# IsNil: rendered as `is_nil(left) == right`, where `right` is the converted
# right-hand side of the predicate.
defp do_filter_to_expr(
       %IsNil{left: left, right: right, embedded?: pred_embedded?},
       bindings,
       params,
       embedded?,
       _type
     ) do
  nested_embedded? = pred_embedded? || embedded?

  {params, subject} = do_filter_to_expr(left, bindings, params, nested_embedded?)
  {params, expected} = do_filter_to_expr(right, bindings, params, nested_embedded?)

  nil_check = {:is_nil, [], [subject]}

  {params, {:==, [], [nil_check, expected]}}
end
|
|
|
|
# Ago: appends the current UTC time as a new param and subtracts `left` units of
# `right` from it via `datetime_add`. Only applies when the amount is an integer
# and the unit is a string or atom.
defp do_filter_to_expr(
       %Ago{arguments: [left, right], embedded?: _pred_embedded?},
       _bindings,
       params,
       _embedded?,
       _type
     )
     when is_integer(left) and (is_binary(right) or is_atom(right)) do
  # The new param lands at the end of the list, so its index is the current length.
  now_index = length(params)
  now_param = {DateTime.utc_now(), {:param, :any_datetime}}

  {params ++ [now_param],
   {:datetime_add, [], [{:^, [], [now_index]}, -left, to_string(right)]}}
end
|
|
|
|
# Contains with a case-insensitive right-hand side: rewritten as a
# `strpos(left::citext, right) > 0` fragment and converted as such.
defp do_filter_to_expr(
       %Contains{arguments: [left, %Ash.CiString{} = right], embedded?: pred_embedded?},
       bindings,
       params,
       embedded?,
       type
     ) do
  ci_contains = %Fragment{
    embedded?: pred_embedded?,
    arguments: [
      raw: "strpos(",
      expr: left,
      raw: "::citext, ",
      expr: right,
      raw: ") > 0"
    ]
  }

  do_filter_to_expr(ci_contains, bindings, params, embedded?, type)
end

# Contains: rewritten as a `strpos(left, right) > 0` fragment and converted as such.
defp do_filter_to_expr(
       %Contains{arguments: [left, right], embedded?: pred_embedded?},
       bindings,
       params,
       embedded?,
       type
     ) do
  contains = %Fragment{
    embedded?: pred_embedded?,
    arguments: [
      raw: "strpos(",
      expr: left,
      raw: ", ",
      expr: right,
      raw: ") > 0"
    ]
  }

  do_filter_to_expr(contains, bindings, params, embedded?, type)
end
|
|
|
|
# Generic predicate/operator: works out a type hint for each side via
# `determine_type/3`, converts both sides with those hints and joins them with
# the predicate's operator.
defp do_filter_to_expr(
       %mod{
         __predicate__?: _,
         left: left,
         right: right,
         embedded?: pred_embedded?,
         operator: op
       },
       bindings,
       params,
       embedded?,
       _type
     ) do
  {left_type, right_type} =
    case determine_type(mod, left) do
      nil ->
        # Nothing known from the left operand: fall back to what the right one says.
        {determine_type(mod, right, true), nil}

      type_from_left ->
        # A vague hint is only used if the right operand cannot supply a type.
        type_from_right =
          if vague?(type_from_left), do: determine_type(mod, right, true)

        if type_from_right do
          {type_from_right, nil}
        else
          {nil, type_from_left}
        end
    end

  nested_embedded? = pred_embedded? || embedded?

  {params, left_expr} = do_filter_to_expr(left, bindings, params, nested_embedded?, left_type)

  {params, right_expr} =
    do_filter_to_expr(right, bindings, params, nested_embedded?, right_type)

  {params, {op, [], [left_expr, right_expr]}}
end
|
|
|
|
# Attribute reference: rendered as a field access on the binding that
# `ref_binding/2` resolves for the reference.
defp do_filter_to_expr(
       %Ref{attribute: %{name: name}} = ref,
       bindings,
       params,
       _embedded?,
       _type
     ) do
  binding = ref_binding(ref, bindings)
  field_access = {{:., [], [{:&, [], [binding]}, name]}, [], []}

  {params, field_access}
end

# Explicitly embedded value: placed in the expression as-is, without a param.
defp do_filter_to_expr({:embed, other}, _bindings, params, _true, _type),
  do: {params, other}
|
|
|
|
# Case-insensitive string: the underlying string is converted and cast with
# `::citext` through a fragment.
defp do_filter_to_expr(%Ash.CiString{string: string}, bindings, params, embedded?, type) do
  citext_cast = %Fragment{
    embedded?: embedded?,
    arguments: [
      raw: "",
      expr: string,
      raw: "::citext"
    ]
  }

  do_filter_to_expr(citext_cast, bindings, params, embedded?, type)
end

# MapSet: converted as the equivalent list.
defp do_filter_to_expr(%MapSet{} = mapset, bindings, params, embedded?, type) do
  mapset
  |> Enum.to_list()
  |> do_filter_to_expr(bindings, params, embedded?, type)
end
|
|
|
|
# Embedded mode: any remaining value is placed in the expression as-is.
defp do_filter_to_expr(other, _bindings, params, true, _type), do: {params, other}

# Non-embedded mode: any remaining value becomes a new query param (typed with the
# hint, or `:any` without one) referenced by its index in the params list.
defp do_filter_to_expr(value, _bindings, params, false, type) do
  param_type = type || :any
  cast_value = last_ditch_cast(value, param_type)

  {params ++ [{cast_value, param_type}], {:^, [], [length(params)]}}
end
|
|
|
|
# Converts atoms to strings when the target type is `:string`; every other
# value/type combination is returned unchanged.
defp last_ditch_cast(value, type) do
  if type == :string and is_atom(value) do
    to_string(value)
  else
    value
  end
end
|
|
|
|
# Picks a type hint for one operand of `mod` based on an attribute reference.
#
# Each entry of `mod.types()` is expanded into candidate types (`:same` resolves
# to the attribute's type, `:any` to no candidates, `{:array, t}` entries to
# `{:in, t}`), optionally reversed when `flip?` is set. The least vague candidate
# of the first entry that yields one is returned, with Ash types replaced by their
# storage type. Anything that is not an attribute reference yields `nil`.
defp determine_type(mod, ref, flip? \\ false)

defp determine_type(mod, %Ref{attribute: %{type: attribute_type}}, flip?) do
  expand = fn
    :same ->
      [attribute_type]

    :any ->
      []

    signature when is_list(signature) ->
      ordered = if flip?, do: Enum.reverse(signature), else: signature

      Enum.map(ordered, fn
        {:array, :any} -> {:in, :any}
        {:array, :same} -> {:in, attribute_type}
        {:array, inner} -> {:in, inner}
        plain -> plain
      end)

    single ->
      [single]
  end

  to_storage = fn candidate ->
    if Ash.Type.ash_type?(candidate) do
      Ash.Type.storage_type(candidate)
    else
      candidate
    end
  end

  Enum.find_value(mod.types(), fn signature ->
    # Sorting by vagueness puts specific candidates ahead of vague ones.
    candidates = signature |> expand.() |> Enum.sort_by(&vague?/1)

    case List.first(candidates) do
      nil -> nil
      {:in, :any} -> {:in, :any}
      {:in, inner} -> {:in, to_storage.(inner)}
      plain -> to_storage.(plain)
    end
  end)
end

defp determine_type(_mod, _ref, _flip?), do: nil
|
|
|
|
# A type hint is vague when it is `:any` or `{:in, :any}`.
defp vague?(type), do: type in [{:in, :any}, :any]
|
|
|
|
# Resolves the binding for a reference: attribute references match a binding with
# the reference's relationship path and an `:inner`, `:left` or `:root` type,
# aggregate references match an `:aggregate` binding with the aggregate's
# relationship path. Returns the first match, or `nil` when none is found.
defp ref_binding(ref, bindings) do
  {wanted_path, wanted_types} =
    case ref.attribute do
      %Ash.Resource.Attribute{} ->
        {ref.relationship_path, [:inner, :left, :root]}

      %Ash.Query.Aggregate{} = aggregate ->
        {aggregate.relationship_path, [:aggregate]}
    end

  Enum.find_value(bindings, fn {binding, data} ->
    data.path == wanted_path && data.type in wanted_types && binding
  end)
end
|
|
|
|
# Records `data` under the next free binding index and advances the counter.
defp add_binding(query, data) do
  %{current: next_index, bindings: recorded} = ash_bindings = query.__ash_bindings__

  updated = %{
    ash_bindings
    | bindings: Map.put(recorded, next_index, data),
      current: next_index + 1
  }

  %{query | __ash_bindings__: updated}
end
|
|
|
|
# Runs `func` in a transaction on the repo configured for `resource`.
@impl true
def transaction(resource, func) do
  resource_repo = repo(resource)
  resource_repo.transaction(func)
end
|
|
|
|
# Rolls back on the repo configured for `resource`, passing `term` through.
@impl true
def rollback(resource, term) do
  resource_repo = repo(resource)
  resource_repo.rollback(term)
end
|
|
|
|
# Builds the data layer query for `resource`. Returns the bare query on success
# and the `{:error, error}` tuple unchanged on failure.
defp maybe_get_resource_query(resource) do
  resource
  |> Ash.Query.new()
  |> Ash.Query.data_layer_query(only_validate_filter?: false)
  |> case do
    {:ok, query} -> query
    {:error, error} -> {:error, error}
  end
end
|
|
|
|
# Table for `resource`: a table set under the changeset's `:data_layer` context
# wins over the table configured for the resource.
defp table(resource, changeset) do
  context_table = changeset.context[:data_layer][:table]

  if context_table do
    context_table
  else
    AshPostgres.table(resource)
  end
end
|
|
end
|