defmodule AshPostgres.DataLayer do
  @manage_tenant %Spark.Dsl.Section{
    name: :manage_tenant,
    describe: """
    Configuration for the behavior of a resource that manages a tenant
    """,
    examples: [
      """
      manage_tenant do
        template ["organization_", :id]
        create? true
        update? false
      end
      """
    ],
    schema: [
      template: [
        type: {:custom, __MODULE__, :tenant_template, []},
        required: true,
        doc: """
        A template that will cause the resource to create/manage the specified schema.

        Use this if you have a resource that, when created, should create a new tenant
        for you. For example, if you have a `customer` resource and you want to create
        a schema for each customer based on their id, e.g. `customer_10`, set this option
        to `["customer_", :id]`. Then, when a customer is created, a schema called e.g.
        `customer_10` will be created, and your tenant migrations will be run on it. If you
        were then to change that customer's id to `20`, the schema would be renamed to
        `customer_20`. Generally speaking, you should avoid changing the tenant id.
        """
      ],
      create?: [
        type: :boolean,
        default: true,
        doc: "Whether or not to automatically create a tenant when a record is created"
      ],
      update?: [
        type: :boolean,
        default: true,
        doc: "Whether or not to automatically update the tenant name if the record is updated"
      ]
    ]
  }
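
  # For example, a hypothetical `MyApp.Organization` resource (illustrative only)
  # could manage one schema per organization:
  #
  #     postgres do
  #       repo MyApp.Repo
  #       table "organizations"
  #
  #       manage_tenant do
  #         template ["organization_", :id]
  #       end
  #     end
  #
  # Creating an organization with id 10 would then create (and migrate) a schema
  # named `organization_10`.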

  @index %Spark.Dsl.Entity{
    name: :index,
    describe: """
    Add an index to be managed by the migration generator.
    """,
    examples: [
      "index [\"column\", \"column2\"], unique: true, where: \"thing = TRUE\""
    ],
    target: AshPostgres.CustomIndex,
    schema: AshPostgres.CustomIndex.schema(),
    transform: {AshPostgres.CustomIndex, :transform, []},
    args: [:fields]
  }

  @custom_indexes %Spark.Dsl.Section{
    name: :custom_indexes,
    describe: """
    A section for configuring indexes to be created by the migration generator.

    In general, prefer to use `identities` for simple unique constraints. This is a tool to allow
    for declaring more complex indexes.
    """,
    examples: [
      """
      custom_indexes do
        index [:column1, :column2], unique: true, where: "thing = TRUE"
      end
      """
    ],
    entities: [
      @index
    ]
  }

  @statement %Spark.Dsl.Entity{
    name: :statement,
    describe: """
    Add a custom statement for migrations.
    """,
    examples: [
      """
      statement :pgweb_idx do
        up "CREATE INDEX pgweb_idx ON pgweb USING GIN (to_tsvector('english', title || ' ' || body));"
        down "DROP INDEX pgweb_idx;"
      end
      """
    ],
    target: AshPostgres.Statement,
    schema: AshPostgres.Statement.schema(),
    args: [:name]
  }

  @custom_statements %Spark.Dsl.Section{
    name: :custom_statements,
    describe: """
    A section for configuring custom statements to be added to migrations.

    Changing custom statements may require manual intervention, because Ash can't determine what order they should run
    in (i.e. if they depend on table structure that you've added, or vice versa). As such, any `down` statements we run
    for custom statements happen first, and any `up` statements happen last.

    Additionally, when changing a custom statement, we must make some assumptions, i.e. that we should migrate
    the old structure down using the previously configured `down` and recreate it.

    This may not be desired, and so what you may end up doing is simply modifying the old migration and deleting whatever was
    generated by the migration generator. As always: read your migrations after generating them!
    """,
    examples: [
      """
      custom_statements do
        # the name is used to detect if you remove or modify the statement
        statement :pgweb_idx do
          up "CREATE INDEX pgweb_idx ON pgweb USING GIN (to_tsvector('english', title || ' ' || body));"
          down "DROP INDEX pgweb_idx;"
        end
      end
      """
    ],
    entities: [
      @statement
    ]
  }

  @reference %Spark.Dsl.Entity{
    name: :reference,
    describe: """
    Configures the reference for a relationship in resource migrations.

    Keep in mind that multiple relationships can theoretically involve the same destination and foreign keys.
    In those cases, you only need to configure the `reference` behavior for one of them. Any conflicts will result
    in an error, across this resource and any other resources that share a table with this one. For this reason,
    instead of adding a reference configuration for `:nothing`, it's best to just leave the configuration out, as that
    is the default behavior if *no* relationship anywhere has configured the behavior of that reference.
    """,
    examples: [
      "reference :post, on_delete: :delete, on_update: :update, name: \"comments_to_posts_fkey\""
    ],
    args: [:relationship],
    target: AshPostgres.Reference,
    schema: AshPostgres.Reference.schema()
  }

  @references %Spark.Dsl.Section{
    name: :references,
    describe: """
    A section for configuring the references (foreign keys) in resource migrations.

    This section is only relevant if you are using the migration generator with this resource.
    Otherwise, it has no effect.
    """,
    examples: [
      """
      references do
        reference :post, on_delete: :delete, on_update: :update, name: "comments_to_posts_fkey"
      end
      """
    ],
    entities: [@reference],
    schema: [
      polymorphic_on_delete: [
        type: {:one_of, [:delete, :nilify, :nothing, :restrict]},
        doc:
          "For polymorphic resources, configures the on_delete behavior of the automatically generated foreign keys to source tables."
      ],
      polymorphic_on_update: [
        type: {:one_of, [:update, :nilify, :nothing, :restrict]},
        doc:
          "For polymorphic resources, configures the on_update behavior of the automatically generated foreign keys to source tables."
      ],
      polymorphic_name: [
        type: :string,
        doc:
          "For polymorphic resources, configures the name of the automatically generated foreign keys to source tables."
      ]
    ]
  }
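
  # A sketch (hypothetical resource, for illustration only) combining a
  # per-relationship reference with the polymorphic options above:
  #
  #     references do
  #       reference :post, on_delete: :delete, on_update: :update, name: "comments_to_posts_fkey"
  #       polymorphic_on_delete :delete
  #     end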

  @check_constraint %Spark.Dsl.Entity{
    name: :check_constraint,
    describe: """
    Add a check constraint to be validated.

    If a check constraint exists on the table but not in this section, and it produces an error, a runtime error will be raised.

    Provide a list of attributes instead of a single attribute to add the message to multiple attributes.

    By adding the `check` option, the migration generator will include it when generating migrations.
    """,
    examples: [
      """
      check_constraint :price, "price_must_be_positive", check: "price > 0", message: "price must be positive"
      """
    ],
    args: [:attribute, :name],
    target: AshPostgres.CheckConstraint,
    schema: AshPostgres.CheckConstraint.schema()
  }

  @check_constraints %Spark.Dsl.Section{
    name: :check_constraints,
    describe: """
    A section for configuring the check constraints for a given table.

    This can be used to automatically create those check constraints, or just to provide messages when they are raised
    """,
    examples: [
      """
      check_constraints do
        check_constraint :price, "price_must_be_positive", check: "price > 0", message: "price must be positive"
      end
      """
    ],
    entities: [@check_constraint]
  }

  @postgres %Spark.Dsl.Section{
    name: :postgres,
    describe: """
    Postgres data layer configuration
    """,
    sections: [
      @custom_indexes,
      @custom_statements,
      @manage_tenant,
      @references,
      @check_constraints
    ],
    modules: [
      :repo
    ],
    examples: [
      """
      postgres do
        repo MyApp.Repo
        table "organizations"
      end
      """
    ],
    schema: [
      repo: [
        type: :atom,
        required: true,
        doc:
          "The repo that will be used to fetch your data. See the `AshPostgres.Repo` documentation for more"
      ],
      migrate?: [
        type: :boolean,
        default: true,
        doc:
          "Whether or not to include this resource in the generated migrations with `mix ash_postgres.generate_migrations`"
      ],
      migration_types: [
        type: :keyword_list,
        default: [],
        doc:
          "A keyword list of attribute names to the ecto migration type that should be used for that attribute. Only necessary if you need to override the defaults."
      ],
      migration_defaults: [
        type: :keyword_list,
        default: [],
        doc: """
        A keyword list of attribute names to the ecto migration default that should be used for that attribute. Only necessary if you need to override the defaults.

        The string you use will be placed verbatim in the migration. Use fragments like `fragment(\\\\"now()\\\\")`, or for `nil`, use `\\\\"nil\\\\"`.
        """
      ],
      base_filter_sql: [
        type: :string,
        doc:
          "A raw sql version of the base_filter, e.g. `representative = true`. Required if trying to create a unique constraint on a resource with a base_filter"
      ],
      simple_join_first_aggregates: [
        type: {:list, :atom},
        default: [],
        doc: """
        A list of `:first` type aggregate names that can be joined to using a simple join.

        This is used in the relatively rare case that you have a `:first` aggregate that uses
        a `has_many` or `many_to_many` relationship in its path, but your `filter` statement ensures
        that there is only one result. In these cases, we can use a more optimized version of
        computing the aggregate value.
        """
      ],
      skip_unique_indexes: [
        type: {:custom, __MODULE__, :validate_skip_unique_indexes, []},
        default: false,
        doc: "Skip generating unique indexes when generating migrations"
      ],
      unique_index_names: [
        type: :any,
        default: [],
        doc: """
        A list of unique index names that could raise errors, or an mfa to a function that takes a changeset
        and returns the list. Must be in the format `{[:affected, :keys], "name_of_constraint"}` or `{[:affected, :keys], "name_of_constraint", "custom error message"}`

        Note that this is *not* used to rename the unique indexes created from `identities`.
        Use `identity_index_names` for that. This is used to tell ash_postgres about unique indexes that
        exist in the database that it didn't create.
        """
      ],
      exclusion_constraint_names: [
        type: :any,
        default: [],
        doc: """
        A list of exclusion constraint names that could raise errors. Must be in the format `{:affected_key, "name_of_constraint"}` or `{:affected_key, "name_of_constraint", "custom error message"}`
        """
      ],
      identity_index_names: [
        type: :any,
        default: [],
        doc: """
        A keyword list of identity names to the unique index name that they should use when being managed by the migration
        generator.
        """
      ],
      foreign_key_names: [
        type: :any,
        default: [],
        doc: """
        A list of foreign keys that could raise errors, or an mfa to a function that takes a changeset and returns the list.
        Must be in the format `{:key, "name_of_constraint"}` or `{:key, "name_of_constraint", "custom error message"}`
        """
      ],
      migration_ignore_attributes: [
        type: {:list, :atom},
        default: [],
        doc: """
        A list of attributes that will be ignored when generating migrations.
        """
      ],
      table: [
        type: :string,
        doc: """
        The table to store and read the resource from. Required unless `polymorphic?` is true.

        If this is changed, the migration generator will not remove the old table.
        """
      ],
      schema: [
        type: :string,
        doc: """
        The schema that the table is located in.

        Multitenancy supersedes this, so this acts as the schema when `global?: true` is set.
        If this is changed, the migration generator will not remove the old table in the old schema.
        """
      ],
      polymorphic?: [
        type: :boolean,
        default: false,
        doc: """
        Declares this resource as polymorphic.

        Polymorphic resources cannot be read or updated unless the table is provided in the query/changeset context.

        For example:

            PolymorphicResource
            |> Ash.Query.set_context(%{data_layer: %{table: "table"}})
            |> MyApi.read!()

        When relating to polymorphic resources, you'll need to use the `context` option on relationships,
        e.g.

            belongs_to :polymorphic_association, PolymorphicResource,
              context: %{data_layer: %{table: "table"}}
        """
      ]
    ]
  }
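
  # Taken together, a fuller (hypothetical, for illustration only) configuration
  # might look like:
  #
  #     postgres do
  #       repo MyApp.Repo
  #       table "comments"
  #       base_filter_sql "deleted_at IS NULL"
  #
  #       references do
  #         reference :post, on_delete: :delete
  #       end
  #
  #       custom_indexes do
  #         index [:post_id, :inserted_at]
  #       end
  #     end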

  alias Ash.Filter
  alias Ash.Query.{BooleanExpression, Not, Ref}

  @behaviour Ash.DataLayer

  @sections [@postgres]

  @moduledoc """
  A postgres data layer that leverages Ecto's postgres capabilities.

  <!--- ash-hq-hide-start --> <!--- -->

  ## DSL Documentation

  ### Index

  #{Spark.Dsl.Extension.doc_index(@sections)}

  ### Docs

  #{Spark.Dsl.Extension.doc(@sections)}

  <!--- ash-hq-hide-stop --> <!--- -->
  """

  use Spark.Dsl.Extension,
    sections: @sections,
    transformers: [
      AshPostgres.Transformers.ValidateReferences,
      AshPostgres.Transformers.VerifyRepo,
      AshPostgres.Transformers.EnsureTableOrPolymorphic,
      AshPostgres.Transformers.PreventMultidimensionalArrayAggregates
    ]

  @doc false
  def tenant_template(value) do
    value = List.wrap(value)

    if Enum.all?(value, &(is_binary(&1) || is_atom(&1))) do
      {:ok, value}
    else
      {:error, "Expected all values for `manage_tenant` to be strings or atoms"}
    end
  end

  @doc false
  def validate_skip_unique_indexes(indexes) do
    indexes = List.wrap(indexes)

    if Enum.all?(indexes, &is_atom/1) do
      {:ok, indexes}
    else
      {:error, "All indexes to skip must be atoms"}
    end
  end

  import Ecto.Query, only: [from: 2, subquery: 1]

  @impl true
  def can?(_, :async_engine), do: true
  def can?(_, :bulk_create), do: true
  def can?(_, {:lock, :for_update}), do: true

  def can?(_, {:lock, string}) do
    string = String.trim_trailing(string, " NOWAIT")

    String.upcase(string) in [
      "FOR UPDATE",
      "FOR NO KEY UPDATE",
      "FOR SHARE",
      "FOR KEY SHARE"
    ]
  end
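
  # Each of the accepted lock strings above may also carry a ` NOWAIT` suffix,
  # e.g. (illustrative only):
  #
  #     Ash.Query.lock(query, "FOR UPDATE NOWAIT")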

  def can?(_, :transact), do: true
  def can?(_, :composite_primary_key), do: true
  def can?(_, :upsert), do: true
  def can?(_, :changeset_filter), do: true

  def can?(resource, {:join, other_resource}) do
    data_layer = Ash.DataLayer.data_layer(resource)
    other_data_layer = Ash.DataLayer.data_layer(other_resource)

    data_layer == other_data_layer and
      AshPostgres.DataLayer.Info.repo(resource) == AshPostgres.DataLayer.Info.repo(other_resource)
  end

  def can?(resource, {:lateral_join, resources}) do
    repo = AshPostgres.DataLayer.Info.repo(resource)
    data_layer = Ash.DataLayer.data_layer(resource)

    data_layer == __MODULE__ &&
      Enum.all?(resources, fn resource ->
        Ash.DataLayer.data_layer(resource) == data_layer &&
          AshPostgres.DataLayer.Info.repo(resource) == repo
      end)
  end

  def can?(_, :boolean_filter), do: true

  def can?(_, {:aggregate, type})
      when type in [:count, :sum, :first, :list, :avg, :max, :min, :exists, :custom],
      do: true

  def can?(_, :aggregate_filter), do: true
  def can?(_, :aggregate_sort), do: true
  def can?(_, :expression_calculation), do: true
  def can?(_, :expression_calculation_sort), do: true
  def can?(_, :create), do: true
  def can?(_, :select), do: true
  def can?(_, :read), do: true
  def can?(_, :update), do: true
  def can?(_, :destroy), do: true
  def can?(_, :filter), do: true
  def can?(_, :limit), do: true
  def can?(_, :offset), do: true
  def can?(_, :multitenancy), do: true

  def can?(_, {:filter_relationship, %{manual: {module, _}}}) do
    Spark.implements_behaviour?(module, AshPostgres.ManualRelationship)
  end

  def can?(_, {:filter_relationship, _}), do: true

  def can?(_, {:aggregate_relationship, %{manual: {module, _}}}) do
    Spark.implements_behaviour?(module, AshPostgres.ManualRelationship)
  end

  def can?(_, {:aggregate_relationship, _}), do: true

  def can?(_, :timeout), do: true
  def can?(_, {:filter_expr, _}), do: true
  def can?(_, :nested_expressions), do: true
  def can?(_, {:query_aggregate, :count}), do: true
  def can?(_, :sort), do: true
  def can?(_, :distinct_sort), do: true
  def can?(_, :distinct), do: true
  def can?(_, {:sort, _}), do: true
  def can?(_, _), do: false

  @impl true
  def in_transaction?(resource) do
    AshPostgres.DataLayer.Info.repo(resource).in_transaction?()
  end

  @impl true
  def limit(query, nil, _), do: {:ok, query}

  def limit(query, limit, _resource) do
    {:ok, from(row in query, limit: ^limit)}
  end

  @impl true
  def source(resource) do
    AshPostgres.DataLayer.Info.table(resource) || ""
  end

  @impl true
  def set_context(resource, data_layer_query, context) do
    start_bindings = context[:data_layer][:start_bindings_at] || 0
    data_layer_query = from(row in data_layer_query, as: ^start_bindings)

    data_layer_query =
      if context[:data_layer][:table] do
        %{
          data_layer_query
          | from: %{data_layer_query.from | source: {context[:data_layer][:table], resource}}
        }
      else
        data_layer_query
      end

    data_layer_query =
      if context[:data_layer][:schema] do
        Ecto.Query.put_query_prefix(data_layer_query, to_string(context[:data_layer][:schema]))
      else
        data_layer_query
      end

    data_layer_query =
      data_layer_query
      |> default_bindings(resource, context)

    {:ok, data_layer_query}
  end

  @impl true
  def offset(query, nil, _), do: {:ok, query}

  def offset(%{offset: old_offset} = query, 0, _resource) when old_offset in [0, nil] do
    {:ok, query}
  end

  def offset(query, offset, _resource) do
    {:ok, from(row in query, offset: ^offset)}
  end

  @impl true
  def run_query(query, resource) do
    query = default_bindings(query, resource)

    with_sort_applied =
      if query.__ash_bindings__[:sort_applied?] do
        {:ok, query}
      else
        apply_sort(query, query.__ash_bindings__[:sort], resource)
      end

    case with_sort_applied do
      {:error, error} ->
        {:error, error}

      {:ok, query} ->
        query =
          if query.__ash_bindings__[:__order__?] && query.windows[:order] do
            if query.distinct do
              query_with_order =
                from(row in query, select_merge: %{__order__: over(row_number(), :order)})

              query_without_limit_and_offset =
                query_with_order
                |> Ecto.Query.exclude(:limit)
                |> Ecto.Query.exclude(:offset)

              from(row in subquery(query_without_limit_and_offset),
                select: row,
                order_by: row.__order__
              )
              |> Map.put(:limit, query.limit)
              |> Map.put(:offset, query.offset)
            else
              order_by = %{query.windows[:order] | expr: query.windows[:order].expr[:order_by]}

              %{
                query
                | windows: Keyword.delete(query.windows, :order),
                  order_bys: [order_by]
              }
            end
          else
            %{query | windows: Keyword.delete(query.windows, :order)}
          end

        if AshPostgres.DataLayer.Info.polymorphic?(resource) && no_table?(query) do
          raise_table_error!(resource, :read)
        else
          {:ok, dynamic_repo(resource, query).all(query, repo_opts(nil, nil, resource))}
        end
    end
  rescue
    e ->
      handle_raised_error(e, __STACKTRACE__, query, resource)
  end

  defp no_table?(%{from: %{source: {"", _}}}), do: true
  defp no_table?(_), do: false

  defp repo_opts(timeout, nil, resource) do
    if schema = AshPostgres.DataLayer.Info.schema(resource) do
      [prefix: schema]
    else
      []
    end
    |> add_timeout(timeout)
  end

  defp repo_opts(timeout, tenant, resource) do
    if Ash.Resource.Info.multitenancy_strategy(resource) == :context do
      [prefix: tenant]
    else
      if schema = AshPostgres.DataLayer.Info.schema(resource) do
        [prefix: schema]
      else
        []
      end
    end
    |> add_timeout(timeout)
  end

  defp add_timeout(opts, timeout) when not is_nil(timeout) do
    Keyword.put(opts, :timeout, timeout)
  end

  defp add_timeout(opts, _), do: opts

  @impl true
  def functions(resource) do
    config = AshPostgres.DataLayer.Info.repo(resource).config()

    functions = [
      AshPostgres.Functions.Fragment,
      AshPostgres.Functions.Like,
      AshPostgres.Functions.ILike
    ]

    if "pg_trgm" in (config[:installed_extensions] || []) do
      functions ++
        [
          AshPostgres.Functions.TrigramSimilarity
        ]
    else
      functions
    end
  end

  @impl true
  def run_aggregate_query(query, aggregates, resource) do
    {exists, aggregates} = Enum.split_with(aggregates, &(&1.kind == :exists))
    query = default_bindings(query, resource)

    query =
      if query.distinct do
        query =
          query
          |> Ecto.Query.exclude(:select)
          |> Ecto.Query.exclude(:order_by)
          |> Map.put(:windows, [])

        from(row in subquery(query), as: ^0, select: %{})
      else
        query
        |> Ecto.Query.exclude(:select)
        |> Ecto.Query.exclude(:order_by)
        |> Map.put(:windows, [])
        |> Ecto.Query.select(%{})
      end

    query_before_select = query

    query =
      Enum.reduce(
        aggregates,
        query,
        fn agg, query ->
          AshPostgres.Aggregate.add_subquery_aggregate_select(
            query,
            agg.relationship_path |> Enum.drop(1),
            agg,
            resource,
            true
          )
        end
      )

    {:ok,
     dynamic_repo(resource, query).one(query, repo_opts(nil, nil, resource))
     |> add_exists_aggs(resource, query_before_select, exists)}
  end

  defp add_exists_aggs(result, resource, query, exists) do
    repo = dynamic_repo(resource, query)
    repo_opts = repo_opts(nil, nil, resource)

    Enum.reduce(exists, result, fn agg, result ->
      {:ok, filtered} =
        case agg do
          %{query: %{filter: filter}} when not is_nil(filter) ->
            filter(query, filter, resource)

          _ ->
            {:ok, query}
        end

      Map.put(
        result || %{},
        agg.name,
        repo.exists?(filtered, repo_opts)
      )
    end)
  end

  @impl true
  def set_tenant(_resource, query, tenant) do
    {:ok, Map.put(Ecto.Query.put_query_prefix(query, to_string(tenant)), :__tenant__, tenant)}
  end

  @impl true
  def run_aggregate_query_with_lateral_join(
        query,
        aggregates,
        root_data,
        destination_resource,
        path
      ) do
    {exists, aggregates} = Enum.split_with(aggregates, &(&1.kind == :exists))

    case lateral_join_query(
           query,
           root_data,
           path
         ) do
      {:ok, lateral_join_query} ->
        source_resource =
          path
          |> Enum.at(0)
          |> elem(0)
          |> Map.get(:resource)

        subquery = from(row in subquery(lateral_join_query), as: ^0, select: %{})
        subquery = default_bindings(subquery, source_resource)

        query =
          Enum.reduce(
            aggregates,
            subquery,
            fn agg, subquery ->
              has_exists? =
                Ash.Filter.find(agg.query && agg.query.filter, fn
                  %Ash.Query.Exists{} -> true
                  _ -> false
                end)

              AshPostgres.Aggregate.add_subquery_aggregate_select(
                subquery,
                agg.relationship_path |> Enum.drop(1),
                agg,
                destination_resource,
                has_exists?
              )
            end
          )

        {:ok,
         dynamic_repo(source_resource, query).one(
           query,
           repo_opts(nil, nil, source_resource)
         )
         |> add_exists_aggs(source_resource, subquery, exists)}

      {:error, error} ->
        {:error, error}
    end
  end

  @impl true
  def run_query_with_lateral_join(
        query,
        root_data,
        _destination_resource,
        path
      ) do
    with_sort_applied =
      if query.__ash_bindings__[:sort_applied?] do
        {:ok, query}
      else
        apply_sort(query, query.__ash_bindings__[:sort], query.__ash_bindings__.resource)
      end

    case with_sort_applied do
      {:error, error} ->
        {:error, error}

      {:ok, query} ->
        case lateral_join_query(
               query,
               root_data,
               path
             ) do
          {:ok, query} ->
            source_resource =
              path
              |> Enum.at(0)
              |> elem(0)
              |> Map.get(:resource)

            {:ok,
             dynamic_repo(source_resource, query).all(
               query,
               repo_opts(nil, nil, source_resource)
             )}

          {:error, error} ->
            {:error, error}
        end
    end
  end

  defp lateral_join_query(
         query,
         root_data,
         [{source_query, source_attribute, destination_attribute, relationship}]
       ) do
    source_values = Enum.map(root_data, &Map.get(&1, source_attribute))
    source_query = Ash.Query.new(source_query)

    subquery =
      if query.__ash_bindings__[:__order__?] do
        subquery(
          from(destination in query,
            select_merge: %{__order__: over(row_number(), :order)},
            where:
              field(destination, ^destination_attribute) ==
                field(parent_as(^0), ^source_attribute)
          )
          |> set_subquery_prefix(source_query, relationship.destination)
        )
      else
        subquery(
          from(destination in query,
            where:
              field(destination, ^destination_attribute) ==
                field(parent_as(^0), ^source_attribute)
          )
          |> set_subquery_prefix(source_query, relationship.destination)
        )
      end

    source_query.resource
    |> Ash.Query.set_context(%{:data_layer => source_query.context[:data_layer]})
    |> Ash.Query.set_tenant(source_query.tenant)
    |> set_lateral_join_prefix(query)
    |> case do
      %{valid?: true} = query ->
        Ash.Query.data_layer_query(query)

      query ->
        {:error, query}
    end
    |> case do
      {:ok, data_layer_query} ->
        if query.__ash_bindings__[:__order__?] do
          {:ok,
           from(source in data_layer_query,
             where: field(source, ^source_attribute) in ^source_values,
             inner_lateral_join: destination in ^subquery,
             on: field(source, ^source_attribute) == field(destination, ^destination_attribute),
             order_by: destination.__order__,
             select: destination,
             distinct: true
           )}
        else
          {:ok,
           from(source in data_layer_query,
             where: field(source, ^source_attribute) in ^source_values,
             inner_lateral_join: destination in ^subquery,
             on: field(source, ^source_attribute) == field(destination, ^destination_attribute),
             select: destination,
             distinct: true
           )}
        end

      {:error, error} ->
        {:error, error}
    end
  end

  defp lateral_join_query(
         query,
         root_data,
         [
           {source_query, source_attribute, source_attribute_on_join_resource, relationship},
           {through_resource, destination_attribute_on_join_resource, destination_attribute,
            through_relationship}
         ]
       ) do
    source_query = Ash.Query.new(source_query)
    source_values = Enum.map(root_data, &Map.get(&1, source_attribute))

    through_resource
    |> Ash.Query.new()
    |> Ash.Query.set_context(through_relationship.context)
    |> Ash.Query.do_filter(through_relationship.filter)
    |> Ash.Query.sort(through_relationship.sort, prepend?: true)
    |> Ash.Query.set_tenant(source_query.tenant)
    |> Ash.Query.put_context(:data_layer, %{
      start_bindings_at: query.__ash_bindings__.current
    })
    |> set_lateral_join_prefix(query)
    |> case do
      %{valid?: true} = through_query ->
        through_query
        |> Ash.Query.data_layer_query()

      query ->
        {:error, query}
    end
    |> case do
      {:ok, through_query} ->
        source_query.resource
        |> Ash.Query.new()
        |> Ash.Query.set_context(relationship.context)
        |> Ash.Query.set_context(%{:data_layer => source_query.context[:data_layer]})
        |> Ash.Query.put_context(:data_layer, %{
          start_bindings_at: through_query.__ash_bindings__.current
        })
        |> set_lateral_join_prefix(query)
        |> Ash.Query.do_filter(relationship.filter)
        |> case do
          %{valid?: true} = query ->
            query
            |> Ash.Query.data_layer_query()

          query ->
            {:error, query}
        end
        |> case do
          {:ok, data_layer_query} ->
            if query.__ash_bindings__[:__order__?] do
              subquery =
                subquery(
                  from(
                    destination in query,
                    select_merge: %{__order__: over(row_number(), :order)},
                    join:
                      through in ^set_subquery_prefix(
                        through_query,
                        source_query,
                        relationship.through
                      ),
                    as: ^query.__ash_bindings__.current,
                    on:
                      field(through, ^destination_attribute_on_join_resource) ==
                        field(destination, ^destination_attribute),
                    where:
                      field(through, ^source_attribute_on_join_resource) ==
                        field(
                          parent_as(^through_query.__ash_bindings__.current),
                          ^source_attribute
                        ),
                    select_merge: %{
                      __lateral_join_source__: field(through, ^source_attribute_on_join_resource)
                    }
                  )
                  |> set_subquery_prefix(
                    source_query,
                    relationship.destination
                  )
                )

              {:ok,
               from(source in data_layer_query,
                 where: field(source, ^source_attribute) in ^source_values,
                 inner_lateral_join: destination in ^subquery,
                 on: true,
                 select: destination,
                 order_by: destination.__order__,
                 distinct: true
               )}
            else
              subquery =
                subquery(
                  from(
                    destination in query,
                    join:
                      through in ^set_subquery_prefix(
                        through_query,
                        source_query,
                        relationship.through
                      ),
                    as: ^query.__ash_bindings__.current,
                    on:
                      field(through, ^destination_attribute_on_join_resource) ==
                        field(destination, ^destination_attribute),
                    where:
                      field(through, ^source_attribute_on_join_resource) ==
                        field(
                          parent_as(^through_query.__ash_bindings__.current),
                          ^source_attribute
                        ),
                    select_merge: %{
                      __lateral_join_source__: field(through, ^source_attribute_on_join_resource)
                    }
                  )
                  |> set_subquery_prefix(
                    source_query,
                    relationship.destination
                  )
                )

              {:ok,
               from(source in data_layer_query,
                 where: field(source, ^source_attribute) in ^source_values,
                 inner_lateral_join: destination in ^subquery,
                 on: true,
                 select: destination,
                 distinct: true
               )}
            end

          {:error, error} ->
            {:error, error}
        end

      {:error, error} ->
        {:error, error}
    end
  end

  @doc false
  def set_subquery_prefix(data_layer_query, source_query, resource) do
    config = AshPostgres.DataLayer.Info.repo(resource).config()

    if Ash.Resource.Info.multitenancy_strategy(resource) == :context do
      %{
        data_layer_query
        | prefix:
            to_string(
              source_query.tenant || AshPostgres.DataLayer.Info.schema(resource) ||
                config[:default_prefix] ||
                "public"
            )
      }
    else
      %{
        data_layer_query
        | prefix:
            to_string(
              AshPostgres.DataLayer.Info.schema(resource) || config[:default_prefix] ||
                "public"
            )
      }
    end
  end

  defp set_lateral_join_prefix(ash_query, query) do
    if Ash.Resource.Info.multitenancy_strategy(ash_query.resource) == :context do
      Ash.Query.set_tenant(ash_query, query.prefix)
    else
      ash_query
    end
  end

  @impl true
  def resource_to_query(resource, _) do
    from(row in {AshPostgres.DataLayer.Info.table(resource) || "", resource}, [])
  end

  @impl true
  def bulk_create(resource, stream, options) do
    opts = repo_opts(nil, options[:tenant], resource)

    opts =
      if options.return_records? do
        Keyword.put(opts, :returning, true)
      else
        opts
      end

    opts =
      if options[:upsert?] do
        opts
        |> Keyword.put(:on_conflict, {:replace, options[:upsert_fields] || []})
        |> Keyword.put(
          :conflict_target,
          conflict_target(
            resource,
            options[:upsert_keys] || Ash.Resource.Info.primary_key(resource)
          )
        )
      else
        opts
      end

    changesets = Enum.to_list(stream)

    ecto_changesets =
      changesets
      |> Stream.map(& &1.attributes)
      |> Enum.to_list()

    resource
    |> dynamic_repo(resource, Enum.at(changesets, 0)).insert_all(ecto_changesets, opts)
    |> case do
      {_, nil} ->
        :ok

      {_, results} ->
        {:ok,
         Stream.zip_with(results, changesets, fn result, changeset ->
           Ash.Resource.put_metadata(
             result,
             :bulk_create_index,
             changeset.context.bulk_create.index
           )
         end)}
    end
  rescue
    e ->
      changeset = Ash.Changeset.new(resource)

      handle_raised_error(
        e,
        __STACKTRACE__,
        {:bulk_create, ecto_changeset(changeset.data, changeset, :create)},
        resource
      )
  end

  @impl true
  def create(resource, changeset) do
    changeset.data
    |> Map.update!(:__meta__, &Map.put(&1, :source, table(resource, changeset)))
    |> ecto_changeset(changeset, :create)
    |> dynamic_repo(resource, changeset).insert(
      repo_opts(changeset.timeout, changeset.tenant, changeset.resource)
    )
    |> from_ecto()
    |> handle_errors()
    |> case do
      {:ok, result} ->
        maybe_create_tenant!(resource, result)

        {:ok, result}

      {:error, error} ->
        {:error, error}
    end
  end

  defp maybe_create_tenant!(resource, result) do
    if AshPostgres.DataLayer.Info.manage_tenant_create?(resource) do
      tenant_name = tenant_name(resource, result)

      AshPostgres.MultiTenancy.create_tenant!(
        tenant_name,
        AshPostgres.DataLayer.Info.repo(resource)
      )
    else
      :ok
    end
  end

  defp maybe_update_tenant(resource, changeset, result) do
    if AshPostgres.DataLayer.Info.manage_tenant_update?(resource) do
      changing_tenant_name? =
        resource
        |> AshPostgres.DataLayer.Info.manage_tenant_template()
        |> Enum.filter(&is_atom/1)
        |> Enum.any?(&Ash.Changeset.changing_attribute?(changeset, &1))

      if changing_tenant_name? do
        old_tenant_name = tenant_name(resource, changeset.data)

        new_tenant_name = tenant_name(resource, result)

        AshPostgres.MultiTenancy.rename_tenant(
          AshPostgres.DataLayer.Info.repo(resource),
          old_tenant_name,
          new_tenant_name
        )
      end
    end

    :ok
  end

  defp tenant_name(resource, result) do
    resource
    |> AshPostgres.DataLayer.Info.manage_tenant_template()
    |> Enum.map_join(fn item ->
      if is_binary(item) do
        item
      else
        result
        |> Map.get(item)
        |> to_string()
      end
    end)
  end

  defp handle_errors({:error, %Ecto.Changeset{errors: errors}}) do
    {:error, Enum.map(errors, &to_ash_error/1)}
  end

  defp handle_errors({:ok, val}), do: {:ok, val}

  defp to_ash_error({field, {message, vars}}) do
    Ash.Error.Changes.InvalidAttribute.exception(
      field: field,
      message: message,
      private_vars: vars
    )
  end

  defp ecto_changeset(record, changeset, type) do
    filters =
      if changeset.action_type == :create do
        %{}
      else
        Map.get(changeset, :filters, %{})
      end

    attributes =
      changeset.resource
      |> Ash.Resource.Info.attributes()
      |> Enum.map(& &1.name)

    ecto_changeset =
      record
      |> to_ecto()
      |> set_table(changeset, type)
      |> Ecto.Changeset.change(Map.take(changeset.attributes, attributes))
      |> Map.update!(:filters, &Map.merge(&1, filters))
      |> add_configured_foreign_key_constraints(record.__struct__)
      |> add_unique_indexes(record.__struct__, changeset)
      |> add_check_constraints(record.__struct__)
      |> add_exclusion_constraints(record.__struct__)

    case type do
      :create ->
        ecto_changeset
        |> add_my_foreign_key_constraints(record.__struct__)

      type when type in [:upsert, :update] ->
        ecto_changeset
        |> add_my_foreign_key_constraints(record.__struct__)
        |> add_related_foreign_key_constraints(record.__struct__)

      :delete ->
        ecto_changeset
        |> add_related_foreign_key_constraints(record.__struct__)
    end
  end

  defp handle_raised_error(
         %Ecto.StaleEntryError{changeset: %{data: %resource{}, filters: filters}},
         stacktrace,
         context,
         resource
       ) do
    handle_raised_error(
      Ash.Error.Changes.StaleRecord.exception(resource: resource, filters: filters),
      stacktrace,
      context,
      resource
    )
  end

  defp handle_raised_error(
         %Postgrex.Error{
           postgres: %{
             code: :lock_not_available,
             message: message
           }
         },
         stacktrace,
         context,
         resource
       ) do
    handle_raised_error(
      Ash.Error.Invalid.Unavailable.exception(
        resource: resource,
        source: inspect(context, pretty: true),
        reason: message
      ),
      stacktrace,
      context,
      resource
    )
  end

  defp handle_raised_error(
         %Postgrex.Error{} = error,
         stacktrace,
         {:bulk_create, fake_changeset},
         _resource
       ) do
    case Ecto.Adapters.Postgres.Connection.to_constraints(error, []) do
      [] ->
        {:error, Ash.Error.to_ash_error(error, stacktrace)}

      constraints ->
        {:error,
         fake_changeset
         |> constraints_to_errors(:insert, constraints)
         |> Ash.Error.to_ash_error()}
    end
  end

  defp handle_raised_error(%Ecto.Query.CastError{} = e, stacktrace, context, resource) do
    handle_raised_error(
      Ash.Error.Query.InvalidFilterValue.exception(value: e.value, context: context),
      stacktrace,
      context,
      resource
    )
  end

  defp handle_raised_error(error, stacktrace, _context, _resource) do
    {:error, Ash.Error.to_ash_error(error, stacktrace)}
  end

  defp constraints_to_errors(%{constraints: user_constraints} = changeset, action, constraints) do
    Enum.map(constraints, fn {type, constraint} ->
      user_constraint =
        Enum.find(user_constraints, fn c ->
          case {c.type, c.constraint, c.match} do
            {^type, ^constraint, :exact} -> true
            {^type, cc, :suffix} -> String.ends_with?(constraint, cc)
            {^type, cc, :prefix} -> String.starts_with?(constraint, cc)
            {^type, %Regex{} = r, _match} -> Regex.match?(r, constraint)
            _ -> false
          end
        end)

      case user_constraint do
        %{field: field, error_message: error_message, type: type, constraint: constraint} ->
          Ash.Error.Changes.InvalidAttribute.exception(
            field: field,
            message: error_message,
            private_vars: [
              constraint: constraint,
              constraint_type: type
            ]
          )

        nil ->
          Ecto.ConstraintError.exception(
            action: action,
            type: type,
            constraint: constraint,
            changeset: changeset
          )
      end
    end)
  end

  defp set_table(record, changeset, operation) do
    if AshPostgres.DataLayer.Info.polymorphic?(record.__struct__) do
      table =
        changeset.context[:data_layer][:table] ||
          AshPostgres.DataLayer.Info.table(record.__struct__)

      record =
        if table do
          Ecto.put_meta(record, source: table)
        else
          raise_table_error!(changeset.resource, operation)
        end

      prefix =
        changeset.context[:data_layer][:schema] ||
          AshPostgres.DataLayer.Info.schema(record.__struct__)

      if prefix do
        Ecto.put_meta(record, prefix: prefix)
      else
        record
      end
    else
      record
    end
  end

  def from_ecto({:ok, result}), do: {:ok, from_ecto(result)}
  def from_ecto({:error, _} = other), do: other

  def from_ecto(nil), do: nil

  def from_ecto(value) when is_list(value) do
    Enum.map(value, &from_ecto/1)
  end

  def from_ecto(%resource{} = record) do
    if Spark.Dsl.is?(resource, Ash.Resource) do
      empty = struct(resource)

      resource
      |> Ash.Resource.Info.relationships()
      |> Enum.reduce(record, fn relationship, record ->
        case Map.get(record, relationship.name) do
          %Ecto.Association.NotLoaded{} ->
            Map.put(record, relationship.name, Map.get(empty, relationship.name))

          value ->
            Map.put(record, relationship.name, from_ecto(value))
        end
      end)
    else
      record
    end
  end

  def from_ecto(other), do: other

  def to_ecto(nil), do: nil

  def to_ecto(value) when is_list(value) do
    Enum.map(value, &to_ecto/1)
  end

  def to_ecto(%resource{} = record) do
    if Spark.Dsl.is?(resource, Ash.Resource) do
      resource
      |> Ash.Resource.Info.relationships()
      |> Enum.reduce(record, fn relationship, record ->
        value =
          case Map.get(record, relationship.name) do
            %Ash.NotLoaded{} ->
              %Ecto.Association.NotLoaded{
                __field__: relationship.name,
                __cardinality__: relationship.cardinality
              }

            value ->
              to_ecto(value)
          end

        Map.put(record, relationship.name, value)
      end)
    else
      record
    end
  end

  def to_ecto(other), do: other

  defp add_check_constraints(changeset, resource) do
    resource
    |> AshPostgres.DataLayer.Info.check_constraints()
    |> Enum.reduce(changeset, fn constraint, changeset ->
      constraint.attribute
      |> List.wrap()
      |> Enum.reduce(changeset, fn attribute, changeset ->
        Ecto.Changeset.check_constraint(changeset, attribute,
          name: constraint.name,
          message: constraint.message || "is invalid"
        )
      end)
    end)
  end

  defp add_exclusion_constraints(changeset, resource) do
    resource
    |> AshPostgres.DataLayer.Info.exclusion_constraint_names()
    |> Enum.reduce(changeset, fn constraint, changeset ->
      case constraint do
        {key, name} ->
          Ecto.Changeset.exclusion_constraint(changeset, key, name: name)

        {key, name, message} ->
          Ecto.Changeset.exclusion_constraint(changeset, key, name: name, message: message)
      end
    end)
  end

  defp add_related_foreign_key_constraints(changeset, resource) do
    # TODO: this doesn't guarantee that we get all of them, because if something is related to this
    # schema and there is no back-relation, then this won't catch its foreign key constraints
    resource
    |> Ash.Resource.Info.relationships()
    |> Enum.map(& &1.destination)
    |> Enum.uniq()
    |> Enum.flat_map(fn related ->
      related
      |> Ash.Resource.Info.relationships()
      |> Enum.filter(&(&1.destination == resource))
      |> Enum.map(&Map.take(&1, [:source, :source_attribute, :destination_attribute]))
    end)
    |> Enum.uniq()
    |> Enum.reduce(changeset, fn %{
                                   source: source,
                                   source_attribute: source_attribute,
                                   destination_attribute: destination_attribute
                                 },
                                 changeset ->
      Ecto.Changeset.foreign_key_constraint(changeset, destination_attribute,
        name: "#{AshPostgres.DataLayer.Info.table(source)}_#{source_attribute}_fkey",
        message: "would leave records behind"
      )
    end)
  end

  defp add_my_foreign_key_constraints(changeset, resource) do
    resource
    |> Ash.Resource.Info.relationships()
    |> Enum.reduce(changeset, &Ecto.Changeset.foreign_key_constraint(&2, &1.source_attribute))
  end

  defp add_configured_foreign_key_constraints(changeset, resource) do
    resource
    |> AshPostgres.DataLayer.Info.foreign_key_names()
    |> case do
      {m, f, a} -> List.wrap(apply(m, f, [changeset | a]))
      value -> List.wrap(value)
    end
    |> Enum.reduce(changeset, fn
      {key, name}, changeset ->
        Ecto.Changeset.foreign_key_constraint(changeset, key, name: name)

      {key, name, message}, changeset ->
        Ecto.Changeset.foreign_key_constraint(changeset, key, name: name, message: message)
    end)
  end

  defp add_unique_indexes(changeset, resource, ash_changeset) do
    changeset =
      resource
      |> Ash.Resource.Info.identities()
      |> Enum.reduce(changeset, fn identity, changeset ->
        name =
          AshPostgres.DataLayer.Info.identity_index_names(resource)[identity.name] ||
            "#{table(resource, ash_changeset)}_#{identity.name}_index"

        opts =
          if Map.get(identity, :message) do
            [name: name, message: identity.message]
          else
            [name: name]
          end

        Ecto.Changeset.unique_constraint(changeset, identity.keys, opts)
      end)

    changeset =
      resource
      |> AshPostgres.DataLayer.Info.custom_indexes()
      |> Enum.reduce(changeset, fn index, changeset ->
        opts =
          if index.message do
            [name: index.name, message: index.message]
          else
            [name: index.name]
          end

        Ecto.Changeset.unique_constraint(changeset, index.fields, opts)
      end)

    names =
      resource
      |> AshPostgres.DataLayer.Info.unique_index_names()
      |> case do
        {m, f, a} -> List.wrap(apply(m, f, [changeset | a]))
        value -> List.wrap(value)
      end

    names = [
      {Ash.Resource.Info.primary_key(resource), table(resource, ash_changeset) <> "_pkey"} | names
    ]

    Enum.reduce(names, changeset, fn
      {keys, name}, changeset ->
        Ecto.Changeset.unique_constraint(changeset, List.wrap(keys), name: name)

      {keys, name, message}, changeset ->
        Ecto.Changeset.unique_constraint(changeset, List.wrap(keys), name: name, message: message)
    end)
  end

  @impl true
  def upsert(resource, changeset, keys \\ nil) do
    keys = keys || Ash.Resource.Info.primary_key(resource)

    explicitly_changing_attributes =
      Enum.map(
        (Map.keys(changeset.attributes) -- Map.get(changeset, :defaults, [])) -- keys,
        fn key ->
          {key, Ash.Changeset.get_attribute(changeset, key)}
        end
      )

    on_conflict =
      changeset
      |> update_defaults()
      |> Keyword.merge(explicitly_changing_attributes)

    conflict_target = conflict_target(resource, keys)

    repo_opts =
      changeset.timeout
      |> repo_opts(changeset.tenant, changeset.resource)
      |> Keyword.put(:on_conflict, set: on_conflict)
      |> Keyword.put(:conflict_target, conflict_target)

    if AshPostgres.DataLayer.Info.manage_tenant_update?(resource) do
      {:error, "Cannot currently upsert a resource that owns a tenant"}
    else
      changeset.data
      |> Map.update!(:__meta__, &Map.put(&1, :source, table(resource, changeset)))
      |> ecto_changeset(changeset, :upsert)
      |> AshPostgres.DataLayer.Info.repo(resource).insert(
        Keyword.put(repo_opts, :returning, true)
      )
      |> from_ecto()
      |> handle_errors()
    end
  end

  defp conflict_target(resource, keys) do
    if Ash.Resource.Info.base_filter(resource) do
      base_filter_sql =
        AshPostgres.DataLayer.Info.base_filter_sql(resource) ||
          raise """
          Cannot use upserts with resources that have a base_filter without also adding `base_filter_sql` in the postgres section.
          """

      sources =
        Enum.map(keys, fn key ->
          ~s("#{Ash.Resource.Info.attribute(resource, key).source || key}")
        end)

      {:unsafe_fragment, "(" <> Enum.join(sources, ", ") <> ") WHERE (#{base_filter_sql})"}
    else
      keys
    end
  end
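
  # For instance (illustrative values only), a resource with
  # `base_filter_sql "deleted_at IS NULL"` and keys `[:email]` yields:
  #
  #     {:unsafe_fragment, ~s[("email") WHERE (deleted_at IS NULL)]}
  #
  # which Ecto renders as `ON CONFLICT ("email") WHERE (deleted_at IS NULL)`,
  # matching a partial unique index.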

  defp update_defaults(changeset) do
    attributes =
      changeset.resource
      |> Ash.Resource.Info.attributes()
      |> Enum.reject(&is_nil(&1.update_default))

    attributes
    |> static_defaults()
    |> Enum.concat(lazy_matching_defaults(attributes))
    |> Enum.concat(lazy_non_matching_defaults(attributes))
  end

  defp static_defaults(attributes) do
    attributes
    |> Enum.reject(&get_default_fun(&1))
    |> Enum.map(&{&1.name, &1.update_default})
  end

  defp lazy_non_matching_defaults(attributes) do
    attributes
    |> Enum.filter(&(!&1.match_other_defaults? && get_default_fun(&1)))
    |> Enum.map(fn attribute ->
      default_value =
        case attribute.update_default do
          function when is_function(function) ->
            function.()

          {m, f, a} when is_atom(m) and is_atom(f) and is_list(a) ->
            apply(m, f, a)
        end

      {attribute.name, default_value}
    end)
  end

  defp lazy_matching_defaults(attributes) do
    attributes
    |> Enum.filter(&(&1.match_other_defaults? && get_default_fun(&1)))
    |> Enum.group_by(& &1.update_default)
    |> Enum.flat_map(fn {default_fun, attributes} ->
      default_value =
        case default_fun do
          function when is_function(function) ->
            function.()

          {m, f, a} when is_atom(m) and is_atom(f) and is_list(a) ->
            apply(m, f, a)
        end

      Enum.map(attributes, &{&1.name, default_value})
    end)
  end

  defp get_default_fun(attribute) do
    if is_function(attribute.update_default) or match?({_, _, _}, attribute.update_default) do
      attribute.update_default
    end
  end

  @impl true
  def update(resource, changeset) do
    changeset.data
    |> Map.update!(:__meta__, &Map.put(&1, :source, table(resource, changeset)))
    |> ecto_changeset(changeset, :update)
    |> dynamic_repo(resource, changeset).update(
      repo_opts(changeset.timeout, changeset.tenant, changeset.resource)
    )
    |> from_ecto()
    |> handle_errors()
    |> case do
      {:ok, result} ->
        maybe_update_tenant(resource, changeset, result)

        {:ok, result}

      {:error, error} ->
        {:error, error}
    end
  rescue
    e ->
      handle_raised_error(e, __STACKTRACE__, changeset, resource)
  end

  @impl true
  def destroy(resource, %{data: record} = changeset) do
    record
    |> ecto_changeset(changeset, :delete)
    |> dynamic_repo(resource, changeset).delete(
      repo_opts(changeset.timeout, changeset.tenant, changeset.resource)
    )
    |> from_ecto()
    |> case do
      {:ok, _record} ->
        :ok

      {:error, error} ->
        handle_errors({:error, error})
    end
  rescue
    e ->
      handle_raised_error(e, __STACKTRACE__, changeset, resource)
  end

  @impl true
  def lock(query, :for_update, _) do
    {:ok, Ecto.Query.lock(query, [{^0, a}], fragment("FOR UPDATE OF ?", a))}
  end

  @locks [
    "FOR UPDATE",
    "FOR NO KEY UPDATE",
    "FOR SHARE",
    "FOR KEY SHARE"
  ]

  for lock <- @locks do
    frag = "#{lock} OF ?"

    def lock(query, unquote(lock), _) do
      {:ok, Ecto.Query.lock(query, [{^0, a}], fragment(unquote(frag), a))}
    end

    frag = "#{lock} OF ? NOWAIT"
    lock = "#{lock} NOWAIT"

    def lock(query, unquote(lock), _) do
      {:ok, Ecto.Query.lock(query, [{^0, a}], fragment(unquote(frag), a))}
    end
  end

  @impl true
  def sort(query, sort, _resource) do
    {:ok, Map.update!(query, :__ash_bindings__, &Map.put(&1, :sort, sort))}
  end

  @impl true
  def select(query, select, resource) do
    query = default_bindings(query, resource)

    {:ok,
     from(row in query,
       select: struct(row, ^Enum.uniq(select))
     )}
  end

  @impl true
  def distinct_sort(query, sort, _) when sort in [nil, []] do
    {:ok, query}
  end

  def distinct_sort(query, sort, _) do
    {:ok, Map.update!(query, :__ash_bindings__, &Map.put(&1, :distinct_sort, sort))}
  end

  # If the order by does not match the initial sort clause, then we use a subquery
  # to limit to only distinct rows. This may not perform that well, so we may need
  # to come up with alternatives here.
  @impl true
  def distinct(query, empty, resource) when empty in [nil, []] do
    query |> apply_sort(query.__ash_bindings__[:sort], resource)
  end

  def distinct(query, distinct_on, resource) do
    case get_distinct_statement(query, distinct_on) do
      {:ok, distinct_statement} ->
        %{query | distinct: distinct_statement}
        |> apply_sort(query.__ash_bindings__[:sort], resource)

      {:error, distinct_statement} ->
        query
        |> Ecto.Query.exclude(:order_by)
        |> default_bindings(resource)
        |> Map.put(:distinct, distinct_statement)
        |> apply_sort(
          query.__ash_bindings__[:distinct_sort] || query.__ash_bindings__[:sort],
          resource,
          true
        )
        |> case do
          {:ok, distinct_query} ->
            on =
              Enum.reduce(Ash.Resource.Info.primary_key(resource), nil, fn key, dynamic ->
                if dynamic do
                  Ecto.Query.dynamic(
                    [row, distinct],
                    ^dynamic and field(row, ^key) == field(distinct, ^key)
                  )
                else
                  Ecto.Query.dynamic([row, distinct], field(row, ^key) == field(distinct, ^key))
                end
              end)

            joined_query_source =
              Enum.reduce(
                [
                  :join,
                  :order_by,
                  :group_by,
                  :having,
                  :distinct,
                  :select,
                  :combinations,
                  :with_ctes,
                  :limit,
                  :offset,
                  :lock,
                  :preload,
                  :update
                ],
                query,
                &Ecto.Query.exclude(&2, &1)
              )

            joined_query =
              from(row in joined_query_source,
                join: distinct in subquery(distinct_query),
                on: ^on
              )

            from([row, distinct] in joined_query,
              select: distinct
            )
            |> default_bindings(resource)
            |> apply_sort(query.__ash_bindings__[:sort], resource)
            |> case do
              {:ok, joined_query} ->
                {:ok,
                 Map.update!(
                   joined_query,
                   :__ash_bindings__,
                   &Map.put(&1, :__order__?, query.__ash_bindings__[:__order__?] || false)
                 )}

              {:error, error} ->
                {:error, error}
            end

          {:error, error} ->
            {:error, error}
        end
    end
  end

  defp apply_sort(query, sort, resource, directly? \\ false)

  defp apply_sort(query, sort, _resource, _) when sort in [nil, []] do
    {:ok, query |> set_sort_applied()}
  end

  defp apply_sort(query, sort, resource, directly?) do
    query
    |> AshPostgres.Sort.sort(sort, resource, [], 0, directly?)
    |> case do
      {:ok, sort} when directly? ->
        {:ok, query |> Ecto.Query.order_by(^sort) |> set_sort_applied()}

      {:ok, query} ->
        {:ok, query |> set_sort_applied()}

      {:error, error} ->
        {:error, error}
    end
  end

  defp set_sort_applied(query) do
    Map.update!(query, :__ash_bindings__, &Map.put(&1, :sort_applied?, true))
  end

  defp get_distinct_statement(query, distinct_on) do
    has_distinct_sort? = match?(%{__ash_bindings__: %{distinct_sort: _}}, query)

    if has_distinct_sort? do
      {:error, default_distinct_statement(query, distinct_on)}
    else
      sort = query.__ash_bindings__[:sort] || []

      distinct =
        query.distinct ||
          %Ecto.Query.QueryExpr{
            expr: [],
            params: []
          }

      if sort == [] do
        {:ok, default_distinct_statement(query, distinct_on)}
      else
        distinct_on
        |> Enum.reduce_while({sort, [], [], Enum.count(distinct.params)}, fn
          _, {[], _distinct_statement, _, _count} ->
            {:halt, :error}

          distinct_on, {[order_by | rest_order_by], distinct_statement, params, count} ->
            case order_by do
              {^distinct_on, order} ->
                {distinct_expr, params, count} =
                  distinct_on_expr(query, distinct_on, params, count)

                {:cont,
                 {rest_order_by, [{order, distinct_expr} | distinct_statement], params, count}}

              _ ->
                {:halt, :error}
            end
        end)
        |> case do
          :error ->
            {:error, default_distinct_statement(query, distinct_on)}

          {_, result, params, _} ->
            {:ok,
             %{
               distinct
               | expr: distinct.expr ++ Enum.reverse(result),
                 params: distinct.params ++ Enum.reverse(params)
             }}
        end
      end
    end
  end

  defp default_distinct_statement(query, distinct_on) do
    distinct =
      query.distinct ||
        %Ecto.Query.QueryExpr{
          expr: []
        }

    {expr, params, _} =
      Enum.reduce(distinct_on, {[], [], Enum.count(distinct.params)}, fn
        {distinct_on_field, order}, {expr, params, count} ->
          {distinct_expr, params, count} =
            distinct_on_expr(query, distinct_on_field, params, count)

          {[{order, distinct_expr} | expr], params, count}

        distinct_on_field, {expr, params, count} ->
          {distinct_expr, params, count} =
            distinct_on_expr(query, distinct_on_field, params, count)

          {[{:asc, distinct_expr} | expr], params, count}
      end)

    %{
      distinct
      | expr: distinct.expr ++ Enum.reverse(expr),
        params: distinct.params ++ Enum.reverse(params)
    }
  end

  defp distinct_on_expr(query, field, params, count) do
    resource = query.__ash_bindings__.resource

    ref =
      case field do
        %Ash.Query.Calculation{} = calc ->
          %Ref{attribute: calc, relationship_path: [], resource: resource}

        field ->
          %Ref{
            attribute: Ash.Resource.Info.field(resource, field),
            relationship_path: [],
            resource: resource
          }
      end

    dynamic = AshPostgres.Expr.dynamic_expr(query, ref, query.__ash_bindings__)

    result =
      Ecto.Query.Builder.Dynamic.partially_expand(
        :distinct,
        query,
        dynamic,
        params,
        count
      )

    expr = elem(result, 0)
    new_params = elem(result, 1)
    new_count = result |> Tuple.to_list() |> List.last()

    {expr, new_params, new_count}
  end

  @impl true
  def filter(query, filter, resource, opts \\ []) do
    query = default_bindings(query, resource)

    used_calculations =
      Ash.Filter.used_calculations(
        filter,
        resource
      )

    used_aggregates =
      filter
      |> AshPostgres.Aggregate.used_aggregates(
        resource,
        used_calculations,
        []
      )
      |> Enum.map(fn aggregate ->
        %{aggregate | load: aggregate.name}
      end)

    query
    |> AshPostgres.Join.join_all_relationships(filter, opts)
    |> case do
      {:ok, query} ->
        query
        |> AshPostgres.Aggregate.add_aggregates(used_aggregates, resource, false, 0)
        |> case do
          {:ok, query} ->
            {:ok, add_filter_expression(query, filter)}

          {:error, error} ->
            {:error, error}
        end

      {:error, error} ->
        {:error, error}
    end
  end

  @doc false
  def default_bindings(query, resource, context \\ %{}) do
    start_bindings = context[:data_layer][:start_bindings_at] || 0

    Map.put_new(query, :__ash_bindings__, %{
      resource: resource,
      current: Enum.count(query.joins) + 1 + start_bindings,
      in_group?: false,
      calculations: %{},
      parent_resources: [],
      aggregate_defs: %{},
      context: context,
      bindings: %{start_bindings => %{path: [], type: :root, source: resource}}
    })
  end
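
  # The bindings map tracks which Ecto named binding corresponds to which join.
  # For a fresh query on a hypothetical `MyApp.Post` resource (illustrative only),
  # the initial value looks roughly like:
  #
  #     %{
  #       resource: MyApp.Post,
  #       current: 1,
  #       bindings: %{0 => %{path: [], type: :root, source: MyApp.Post}},
  #       ...
  #     }
  #
  # Joins added later by `add_binding/3` claim the next `current` slot.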

  @impl true
  def add_aggregates(query, aggregates, resource) do
    AshPostgres.Aggregate.add_aggregates(query, aggregates, resource, true, 0)
  end

  @impl true
  def add_calculations(query, calculations, resource) do
    AshPostgres.Calculation.add_calculations(query, calculations, resource, 0)
  end

  @doc false
  def get_binding(resource, path, query, type, name_match \\ nil)

  def get_binding(resource, path, %{__ash_bindings__: _} = query, type, name_match) do
    types = List.wrap(type)

    Enum.find_value(query.__ash_bindings__.bindings, fn
      {binding, %{path: candidate_path, type: binding_type} = data} ->
        if binding_type in types do
          if name_match do
            if data[:name] == name_match do
              if Ash.SatSolver.synonymous_relationship_paths?(resource, candidate_path, path) do
                binding
              end
            end
          else
            if Ash.SatSolver.synonymous_relationship_paths?(resource, candidate_path, path) do
              binding
            else
              false
            end
          end
        end

      _ ->
        nil
    end)
  end

  def get_binding(_, _, _, _, _), do: nil

  defp add_filter_expression(query, filter) do
    filter
    |> split_and_statements()
    |> Enum.reduce(query, fn filter, query ->
      dynamic = AshPostgres.Expr.dynamic_expr(query, filter, query.__ash_bindings__)

      Ecto.Query.where(query, ^dynamic)
    end)
  end

  defp split_and_statements(%Filter{expression: expression}) do
    split_and_statements(expression)
  end

  defp split_and_statements(%BooleanExpression{op: :and, left: left, right: right}) do
    split_and_statements(left) ++ split_and_statements(right)
  end

  defp split_and_statements(%Not{expression: %Not{expression: expression}}) do
    split_and_statements(expression)
  end

  defp split_and_statements(%Not{
         expression: %BooleanExpression{op: :or, left: left, right: right}
       }) do
    split_and_statements(%BooleanExpression{
      op: :and,
      left: %Not{expression: left},
      right: %Not{expression: right}
    })
  end

  defp split_and_statements(other), do: [other]

  @doc false
  def add_binding(query, data, additional_bindings \\ 0) do
    current = query.__ash_bindings__.current
    bindings = query.__ash_bindings__.bindings

    new_ash_bindings = %{
      query.__ash_bindings__
      | bindings: Map.put(bindings, current, data),
        current: current + 1 + additional_bindings
    }

    %{query | __ash_bindings__: new_ash_bindings}
  end

  def add_known_binding(query, data, known_binding) do
    bindings = query.__ash_bindings__.bindings

    new_ash_bindings = %{
      query.__ash_bindings__
      | bindings: Map.put(bindings, known_binding, data)
    }

    %{query | __ash_bindings__: new_ash_bindings}
  end

  @impl true
  def transaction(resource, func, timeout \\ nil, reason \\ %{type: :custom, metadata: %{}}) do
    func = fn ->
      AshPostgres.DataLayer.Info.repo(resource).on_transaction_begin(reason)

      func.()
    end

    if timeout do
      AshPostgres.DataLayer.Info.repo(resource).transaction(func, timeout: timeout)
    else
      AshPostgres.DataLayer.Info.repo(resource).transaction(func)
    end
  end

  @impl true
  def rollback(resource, term) do
    AshPostgres.DataLayer.Info.repo(resource).rollback(term)
  end

  defp table(resource, changeset) do
    changeset.context[:data_layer][:table] || AshPostgres.DataLayer.Info.table(resource)
  end

  defp raise_table_error!(resource, operation) do
    if AshPostgres.DataLayer.Info.polymorphic?(resource) do
      raise """
      Could not determine table for #{operation} on #{inspect(resource)}.

      Polymorphic resources require that the `data_layer[:table]` context is provided.
      See the guide on polymorphic resources for more information.
      """
    else
      raise """
      Could not determine table for #{operation} on #{inspect(resource)}.
      """
    end
  end
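
  # The query (or changeset) context can override which repo is used, e.g.
  # a hypothetical read-replica setup (illustrative only):
  #
  #     MyApp.Post
  #     |> Ash.Query.set_context(%{data_layer: %{repo: MyApp.ReadReplicaRepo}})
  #     |> MyApi.read!()
  #
  # When no override is present, the resource's configured repo is used.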
  defp dynamic_repo(resource, %{__ash_bindings__: %{context: %{data_layer: %{repo: repo}}}}) do
    repo || AshPostgres.DataLayer.Info.repo(resource)
  end

  defp dynamic_repo(resource, %{context: %{data_layer: %{repo: repo}}}) do
    repo || AshPostgres.DataLayer.Info.repo(resource)
  end

  defp dynamic_repo(resource, _) do
    AshPostgres.DataLayer.Info.repo(resource)
  end
end