ash_postgres/lib/data_layer.ex


defmodule AshPostgres.DataLayer do
@manage_tenant %Spark.Dsl.Section{
name: :manage_tenant,
describe: """
Configuration for the behavior of a resource that manages a tenant
""",
examples: [
"""
manage_tenant do
template ["organization_", :id]
create? true
update? false
end
"""
],
schema: [
template: [
type: {:wrap_list, {:or, [:string, :atom]}},
required: true,
doc: """
A template that will cause the resource to create/manage the specified schema.
"""
],
create?: [
type: :boolean,
default: true,
doc: "Whether or not to automatically create a tenant when a record is created"
],
update?: [
type: :boolean,
default: true,
doc: "Whether or not to automatically update the tenant name if the record is udpated"
]
]
}
@index %Spark.Dsl.Entity{
name: :index,
describe: """
Add an index to be managed by the migration generator.
""",
examples: [
"index [\"column\", \"column2\"], unique: true, where: \"thing = TRUE\""
],
target: AshPostgres.CustomIndex,
schema: AshPostgres.CustomIndex.schema(),
transform: {AshPostgres.CustomIndex, :transform, []},
args: [:fields]
}
@custom_indexes %Spark.Dsl.Section{
name: :custom_indexes,
describe: """
A section for configuring indexes to be created by the migration generator.
In general, prefer to use `identities` for simple unique constraints. This is a tool to allow
for declaring more complex indexes.
""",
examples: [
"""
custom_indexes do
index [:column1, :column2], unique: true, where: "thing = TRUE"
end
"""
],
entities: [
@index
]
}
@statement %Spark.Dsl.Entity{
name: :statement,
describe: """
Add a custom statement for migrations.
""",
examples: [
"""
statement :pgweb_idx do
up "CREATE INDEX pgweb_idx ON pgweb USING GIN (to_tsvector('english', title || ' ' || body));"
down "DROP INDEX pgweb_idx;"
end
"""
],
target: AshPostgres.Statement,
schema: AshPostgres.Statement.schema(),
args: [:name]
}
@custom_statements %Spark.Dsl.Section{
name: :custom_statements,
describe: """
A section for configuring custom statements to be added to migrations.
Changing custom statements may require manual intervention, because Ash can't determine what order they should run
in (i.e. if they depend on table structure that you've added, or vice versa). As such, any `down` statements we run
for custom statements happen first, and any `up` statements happen last.
Additionally, when changing a custom statement, we must make some assumptions, i.e. that we should migrate
the old structure down using the previously configured `down` and recreate it.
This may not be desired, and so what you may end up doing is simply modifying the old migration and deleting whatever was
generated by the migration generator. As always: read your migrations after generating them!
""",
examples: [
"""
custom_statements do
# the name is used to detect if you remove or modify the statement
statement :pgweb_idx do
up "CREATE INDEX pgweb_idx ON pgweb USING GIN (to_tsvector('english', title || ' ' || body));"
down "DROP INDEX pgweb_idx;"
end
end
"""
],
entities: [
@statement
]
}
@reference %Spark.Dsl.Entity{
name: :reference,
describe: """
Configures the reference for a relationship in resource migrations.
Keep in mind that multiple relationships can theoretically involve the same destination and foreign keys.
In those cases, you only need to configure the `reference` behavior for one of them. Any conflicts will result
in an error, across this resource and any other resources that share a table with this one. For this reason,
instead of adding a reference configuration for `:nothing`, it's best to just leave the configuration out, as that
is the default behavior if *no* relationship anywhere has configured the behavior of that reference.
""",
examples: [
"reference :post, on_delete: :delete, on_update: :update, name: \"comments_to_posts_fkey\""
],
args: [:relationship],
target: AshPostgres.Reference,
schema: AshPostgres.Reference.schema()
}
@references %Spark.Dsl.Section{
name: :references,
describe: """
A section for configuring the references (foreign keys) in resource migrations.
This section is only relevant if you are using the migration generator with this resource.
Otherwise, it has no effect.
""",
examples: [
"""
references do
reference :post, on_delete: :delete, on_update: :update, name: "comments_to_posts_fkey"
end
"""
],
entities: [@reference],
schema: [
polymorphic_on_delete: [
type: {:one_of, [:delete, :nilify, :nothing, :restrict]},
doc:
"For polymorphic resources, configures the on_delete behavior of the automatically generated foreign keys to source tables."
],
polymorphic_on_update: [
type: {:one_of, [:update, :nilify, :nothing, :restrict]},
doc:
"For polymorphic resources, configures the on_update behavior of the automatically generated foreign keys to source tables."
],
polymorphic_name: [
type: :string,
doc:
"For polymorphic resources, then index name to use for the foreign key to the source table."
]
]
}
@check_constraint %Spark.Dsl.Entity{
name: :check_constraint,
describe: """
Add a check constraint to be validated.
If a check constraint exists on the table but not in this section, and it produces an error, a runtime error will be raised.
Provide a list of attributes instead of a single attribute to add the message to multiple attributes.
By adding the `check` option, the migration generator will include it when generating migrations.
""",
examples: [
"""
check_constraint :price, "price_must_be_positive", check: "price > 0", message: "price must be positive"
"""
],
args: [:attribute, :name],
target: AshPostgres.CheckConstraint,
schema: AshPostgres.CheckConstraint.schema()
}
@check_constraints %Spark.Dsl.Section{
name: :check_constraints,
describe: """
A section for configuring the check constraints for a given table.
This can be used to automatically create those check constraints, or just to provide messages when they are raised.
""",
examples: [
"""
check_constraints do
check_constraint :price, "price_must_be_positive", check: "price > 0", message: "price must be positive"
end
"""
],
entities: [@check_constraint]
}
@postgres %Spark.Dsl.Section{
name: :postgres,
describe: """
Postgres data layer configuration
""",
sections: [
@custom_indexes,
@custom_statements,
@manage_tenant,
@references,
@check_constraints
],
modules: [
:repo
],
examples: [
"""
postgres do
repo MyApp.Repo
table "organizations"
end
"""
],
schema: [
repo: [
type: {:or, [{:behaviour, Ecto.Repo}, {:fun, 2}]},
required: true,
doc:
"The repo that will be used to fetch your data. See the `AshPostgres.Repo` documentation for more. Can also be a function that takes a resource and a type `:read | :mutate` and returns the repo"
],
migrate?: [
type: :boolean,
default: true,
doc:
"Whether or not to include this resource in the generated migrations with `mix ash.generate_migrations`"
],
migration_types: [
type: :keyword_list,
default: [],
doc:
"A keyword list of attribute names to the ecto migration type that should be used for that attribute. Only necessary if you need to override the defaults."
],
migration_defaults: [
type: :keyword_list,
default: [],
doc: """
A keyword list of attribute names to the ecto migration default that should be used for that attribute. The string you use will be placed verbatim in the migration. Use fragments like `fragment(\\\\"now()\\\\")`, or for `nil`, use `\\\\"nil\\\\"`.
"""
],
base_filter_sql: [
type: :string,
doc:
"A raw sql version of the base_filter, e.g `representative = true`. Required if trying to create a unique constraint on a resource with a base_filter"
],
simple_join_first_aggregates: [
type: {:list, :atom},
default: [],
doc: """
A list of `:first` type aggregate names that can be joined to using a simple join. Use when you have a `:first` aggregate that uses a to-many relationship, but your `filter` statement ensures that there is only one result. Optimizes the generated query.
"""
],
skip_unique_indexes: [
type: {:wrap_list, :atom},
default: false,
doc: "Skip generating unique indexes when generating migrations"
],
unique_index_names: [
type:
{:list,
{:or,
[{:tuple, [{:list, :atom}, :string]}, {:tuple, [{:list, :atom}, :string, :string]}]}},
default: [],
doc: """
A list of unique index names that could raise errors that are not configured in identities, or an mfa to a function that takes a changeset and returns the list. In the format `{[:affected, :keys], "name_of_constraint"}` or `{[:affected, :keys], "name_of_constraint", "custom error message"}`
"""
],
exclusion_constraint_names: [
type: :any,
default: [],
doc: """
A list of exclusion constraint names that could raise errors. Must be in the format `{:affected_key, "name_of_constraint"}` or `{:affected_key, "name_of_constraint", "custom error message"}`
"""
],
identity_index_names: [
type: :any,
default: [],
doc: """
A keyword list of identity names to the unique index name that they should use when being managed by the migration generator.
"""
],
foreign_key_names: [
type:
{:list,
{:or,
[
{:tuple, [{:or, [:atom, :string]}, :string]},
{:tuple, [{:or, [:atom, :string]}, :string, :string]}
]}},
default: [],
doc: """
A list of foreign keys that could raise errors, or an mfa to a function that takes a changeset and returns a list. In the format: `{:key, "name_of_constraint"}` or `{:key, "name_of_constraint", "custom error message"}`
"""
],
migration_ignore_attributes: [
type: {:list, :atom},
default: [],
doc: """
A list of attributes that will be ignored when generating migrations.
"""
],
table: [
type: :string,
doc: """
The table to store and read the resource from. If this is changed, the migration generator will not remove the old table.
"""
],
schema: [
type: :string,
doc: """
The schema that the table is located in. Schema-based multitenancy will supersede this option. If this is changed, the migration generator will not remove the old schema.
"""
],
polymorphic?: [
type: :boolean,
default: false,
doc: """
Declares this resource as polymorphic. See the [polymorphic resources guide](/documentation/topics/resources/polymorphic-resources.md) for more.
"""
]
]
}
@behaviour Ash.DataLayer
@sections [@postgres]
@moduledoc """
A postgres data layer that leverages Ecto's postgres capabilities.
"""
use Spark.Dsl.Extension,
sections: @sections,
verifiers: [
AshPostgres.Verifiers.PreventMultidimensionalArrayAggregates,
AshPostgres.Verifiers.ValidateReferences,
AshPostgres.Verifiers.PreventAttributeMultitenancyAndNonFullMatchType,
AshPostgres.Verifiers.EnsureTableOrPolymorphic
]
def migrate(args) do
Mix.Task.run("ash_postgres.migrate", args)
end
def rollback(args) do
repos = AshPostgres.Mix.Helpers.repos!([], args)
show_for_repo? = Enum.count_until(repos, 2) == 2
for repo <- repos do
for_repo =
if show_for_repo? do
" for repo #{inspect(repo)}"
else
""
end
migrations_path = AshPostgres.Mix.Helpers.migrations_path([], repo)
tenant_migrations_path = AshPostgres.Mix.Helpers.tenant_migrations_path([], repo)
files =
migrations_path
|> Path.join("**/*.exs")
|> Path.wildcard()
|> Enum.sort()
|> Enum.reverse()
|> Enum.take(20)
|> Enum.map(&String.trim_leading(&1, migrations_path))
|> Enum.with_index()
|> Enum.map(fn {file, index} -> "#{index + 1}: #{file}" end)
n =
Mix.shell().prompt(
"""
How many migrations should be rolled back#{for_repo}? (default: 0)
Last 20 migration names, with the input you must provide to
rollback up to *and including* that migration:
#{Enum.join(files, "\n")}
Rollback to:
"""
|> String.trim_trailing()
)
|> String.trim()
|> case do
"" ->
0
n ->
try do
String.to_integer(n)
rescue
_ ->
reraise "Required an integer value, got: #{n}", __STACKTRACE__
end
end
Mix.Task.run("ash_postgres.rollback", args ++ ["-r", inspect(repo), "-n", to_string(n)])
Mix.Task.reenable("ash_postgres.rollback")
tenant_files =
tenant_migrations_path
|> Path.join("**/*.exs")
|> Path.wildcard()
|> Enum.sort()
|> Enum.reverse()
|> Enum.take(20)
|> Enum.map(&String.trim_leading(&1, tenant_migrations_path))
|> Enum.with_index()
|> Enum.map(fn {file, index} -> "#{index + 1}: #{file}" end)
if !Enum.empty?(tenant_files) do
n =
Mix.shell().prompt(
"""
How many _tenant_ migrations should be rolled back#{for_repo}? (default: 0)
Last 20 migration names, with the input you must provide to
rollback up to *and including* that migration:
#{Enum.join(tenant_files, "\n")}
Rollback to:
"""
|> String.trim_trailing()
)
|> String.trim()
|> case do
"" ->
0
n ->
try do
String.to_integer(n)
rescue
_ ->
reraise "Required an integer value, got: #{n}", __STACKTRACE__
end
end
Mix.Task.run(
"ash_postgres.rollback",
args ++ ["--tenants", "-r", inspect(repo), "-n", to_string(n)]
)
Mix.Task.reenable("ash_postgres.rollback")
end
end
end
def codegen(args) do
{args, _, _} = OptionParser.parse(args, strict: [name: :string])
args =
if args[:name] do
["--name", to_string(args[:name])]
else
[]
end
Mix.Task.run("ash_postgres.generate_migrations", args)
end
def setup(args) do
# TODO: take args that we care about
Mix.Task.run("ash_postgres.create", args)
Mix.Task.run("ash_postgres.migrate", args)
[]
|> AshPostgres.Mix.Helpers.repos!(args)
|> Enum.all?(fn repo ->
[]
|> AshPostgres.Mix.Helpers.tenant_migrations_path(repo)
|> Path.join("**/*.exs")
|> Path.wildcard()
|> Enum.empty?()
end)
|> case do
true ->
:ok
_ ->
Mix.Task.run("ash_postgres.migrate", ["--tenant" | args])
end
end
def tear_down(args) do
# TODO: take args that we care about
Mix.Task.run("ash_postgres.drop", args)
end
import Ecto.Query, only: [from: 2, subquery: 1]
@impl true
def can?(_, :async_engine), do: true
def can?(_, :bulk_create), do: true
def can?(resource, :update_query) do
# We can't currently support updating a record from a query
# if that record manages a tenant on update
!AshPostgres.DataLayer.Info.manage_tenant_update?(resource)
end
def can?(_, :destroy_query), do: true
def can?(_, {:lock, :for_update}), do: true
def can?(_, :composite_types), do: true
def can?(_, {:lock, string}) do
string = String.trim_trailing(string, " NOWAIT")
String.upcase(string) in [
"FOR UPDATE",
"FOR NO KEY UPDATE",
"FOR SHARE",
"FOR KEY SHARE"
]
end
def can?(_, :transact), do: true
def can?(_, :composite_primary_key), do: true
def can?(_resource, {:atomic, :update}), do: true
def can?(_resource, {:atomic, :upsert}), do: true
def can?(_, :upsert), do: true
def can?(_, :changeset_filter), do: true
def can?(resource, {:join, other_resource}) do
data_layer = Ash.DataLayer.data_layer(resource)
other_data_layer = Ash.DataLayer.data_layer(other_resource)
data_layer == other_data_layer and
AshPostgres.DataLayer.Info.repo(resource, :read) ==
AshPostgres.DataLayer.Info.repo(other_resource, :read)
end
def can?(resource, {:lateral_join, resources}) do
repo = AshPostgres.DataLayer.Info.repo(resource, :read)
data_layer = Ash.DataLayer.data_layer(resource)
data_layer == __MODULE__ &&
Enum.all?(resources, fn resource ->
Ash.DataLayer.data_layer(resource) == data_layer &&
AshPostgres.DataLayer.Info.repo(resource, :read) == repo
end)
end
def can?(_, :boolean_filter), do: true
def can?(_, {:aggregate, type})
when type in [:count, :sum, :first, :list, :avg, :max, :min, :exists, :custom],
do: true
def can?(_, :aggregate_filter), do: true
def can?(_, :aggregate_sort), do: true
def can?(_, :expression_calculation), do: true
def can?(_, :expression_calculation_sort), do: true
def can?(_, :create), do: true
def can?(_, :select), do: true
def can?(_, :read), do: true
def can?(resource, action) when action in ~w[update destroy]a do
resource
|> Ash.Resource.Info.primary_key()
|> Enum.any?()
end
def can?(_, :filter), do: true
def can?(_, :limit), do: true
def can?(_, :offset), do: true
def can?(_, :multitenancy), do: true
def can?(_, {:filter_relationship, %{manual: {module, _}}}) do
Spark.implements_behaviour?(module, AshPostgres.ManualRelationship)
end
def can?(_, {:filter_relationship, _}), do: true
def can?(_, {:aggregate_relationship, %{manual: {module, _}}}) do
Spark.implements_behaviour?(module, AshPostgres.ManualRelationship)
end
def can?(_, {:aggregate_relationship, _}), do: true
def can?(_, :timeout), do: true
def can?(_, :expr_error), do: true
def can?(resource, {:filter_expr, %Ash.Query.Function.Error{}}) do
"ash-functions" in AshPostgres.DataLayer.Info.repo(resource, :read).installed_extensions() &&
"ash-functions" in AshPostgres.DataLayer.Info.repo(resource, :mutate).installed_extensions()
end
def can?(_, {:filter_expr, _}), do: true
def can?(_, :nested_expressions), do: true
def can?(_, {:query_aggregate, _}), do: true
def can?(_, :sort), do: true
def can?(_, :distinct_sort), do: true
def can?(_, :distinct), do: true
def can?(_, {:sort, _}), do: true
def can?(_, _), do: false
@impl true
def in_transaction?(resource) do
AshPostgres.DataLayer.Info.repo(resource, :mutate).in_transaction?()
end
@impl true
def limit(query, nil, _), do: {:ok, query}
def limit(query, limit, _resource) do
{:ok, from(row in query, limit: ^limit)}
end
@impl true
def source(resource) do
AshPostgres.DataLayer.Info.table(resource) || ""
end
@impl true
def set_context(resource, data_layer_query, context) do
AshSql.Query.set_context(resource, data_layer_query, AshPostgres.SqlImplementation, context)
end
@impl true
def offset(query, nil, _), do: query
def offset(%{offset: old_offset} = query, 0, _resource) when old_offset in [0, nil] do
{:ok, query}
end
def offset(query, offset, _resource) do
{:ok, from(row in query, offset: ^offset)}
end
@impl true
def return_query(query, resource) do
AshSql.Query.return_query(query, resource)
end
@impl true
def run_query(query, resource) do
query = AshSql.Bindings.default_bindings(query, resource, AshPostgres.SqlImplementation)
if AshPostgres.DataLayer.Info.polymorphic?(resource) && no_table?(query) do
raise_table_error!(resource, :read)
else
repo = AshSql.dynamic_repo(resource, AshPostgres.SqlImplementation, query)
with_savepoint(repo, query, fn ->
{:ok,
repo.all(
query,
AshSql.repo_opts(repo, AshPostgres.SqlImplementation, nil, nil, resource)
)
|> remap_mapped_fields(query)}
end)
end
rescue
e ->
handle_raised_error(e, __STACKTRACE__, query, resource)
end
defp no_table?(%{from: %{source: {"", _}}}), do: true
defp no_table?(_), do: false
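# Note: extra expression functions are only exposed when the corresponding
# Postgres extension is listed in the repo's `installed_extensions` (pg_trgm, vector).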
@impl true
def functions(resource) do
config = AshPostgres.DataLayer.Info.repo(resource, :mutate).config()
functions = [
AshPostgres.Functions.Like,
AshPostgres.Functions.ILike
]
functions =
if "pg_trgm" in (config[:installed_extensions] || []) do
functions ++
[
AshPostgres.Functions.TrigramSimilarity
]
else
functions
end
if "vector" in (config[:installed_extensions] || []) do
functions ++
[
AshPostgres.Functions.VectorCosineDistance
]
else
functions
end
end
@impl true
def run_aggregate_query(original_query, aggregates, resource) do
AshSql.AggregateQuery.run_aggregate_query(
original_query,
aggregates,
resource,
AshPostgres.SqlImplementation
)
end
@impl true
def set_tenant(_resource, query, tenant) do
{:ok, Map.put(Ecto.Query.put_query_prefix(query, to_string(tenant)), :__tenant__, tenant)}
end
@impl true
def run_aggregate_query_with_lateral_join(
query,
aggregates,
root_data,
destination_resource,
path
) do
{can_group, cant_group} =
aggregates
|> Enum.split_with(&AshSql.Aggregate.can_group?(destination_resource, &1, query))
|> case do
{[one], cant_group} -> {[], [one | cant_group]}
{can_group, cant_group} -> {can_group, cant_group}
end
case lateral_join_query(
query,
root_data,
path
) do
{:ok, lateral_join_query} ->
source_resource =
path
|> Enum.at(0)
|> elem(0)
|> Map.get(:resource)
subquery = from(row in subquery(lateral_join_query), as: ^0, select: %{})
subquery =
AshSql.Bindings.default_bindings(
subquery,
source_resource,
AshPostgres.SqlImplementation
)
{global_filter, can_group} =
AshSql.Aggregate.extract_shared_filters(can_group)
original_subquery = subquery
subquery =
case global_filter do
{:ok, global_filter} ->
filter(subquery, global_filter, destination_resource)
:error ->
{:ok, subquery}
end
case subquery do
{:error, error} ->
{:error, error}
{:ok, subquery} ->
query =
Enum.reduce(
can_group,
subquery,
fn agg, subquery ->
has_exists? =
Ash.Filter.find(agg.query && agg.query.filter, fn
%Ash.Query.Exists{} -> true
_ -> false
end)
first_relationship =
Ash.Resource.Info.relationship(
source_resource,
agg.relationship_path |> Enum.at(0)
)
AshSql.Aggregate.add_subquery_aggregate_select(
subquery,
agg.relationship_path |> Enum.drop(1),
agg,
destination_resource,
has_exists?,
first_relationship
)
end
)
result =
case can_group do
[] ->
%{}
_ ->
repo =
AshSql.dynamic_repo(source_resource, AshPostgres.SqlImplementation, query)
repo.one(
query,
AshSql.repo_opts(
repo,
AshPostgres.SqlImplementation,
nil,
nil,
source_resource
)
)
end
{:ok,
AshSql.AggregateQuery.add_single_aggs(
result,
source_resource,
original_subquery,
cant_group,
AshPostgres.SqlImplementation
)}
end
{:error, error} ->
{:error, error}
end
end
@impl true
def run_query_with_lateral_join(
query,
root_data,
_destination_resource,
path
) do
case lateral_join_query(
query,
root_data,
path
) do
{:ok, lateral_join_query} ->
source_resource =
path
|> Enum.at(0)
|> elem(0)
|> Map.get(:resource)
repo =
AshSql.dynamic_repo(source_resource, AshPostgres.SqlImplementation, lateral_join_query)
results =
repo.all(
lateral_join_query,
AshSql.repo_opts(repo, AshPostgres.SqlImplementation, nil, nil, source_resource)
)
|> remap_mapped_fields(query)
{:ok, results}
{:error, error} ->
{:error, error}
end
end
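# Calculations and aggregates are selected under generated aliases; remap the
# values in each result back to the names the caller asked for.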
defp remap_mapped_fields(results, query) do
calculation_names = query.__ash_bindings__.calculation_names
aggregate_names = query.__ash_bindings__.aggregate_names
if Enum.empty?(calculation_names) and Enum.empty?(aggregate_names) do
results
else
Enum.map(results, fn result ->
result
|> remap(:calculations, calculation_names)
|> remap(:aggregates, aggregate_names)
end)
end
end
defp remap(record, _subfield, mapping) when mapping == %{} do
record
end
defp remap(record, subfield, mapping) do
Map.update!(record, subfield, fn subfield_values ->
Enum.reduce(mapping, subfield_values, fn {dest, source}, subfield_values ->
subfield_values
|> Map.put(dest, Map.get(subfield_values, source))
|> Map.delete(source)
end)
end)
end
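# Builds the lateral-join query used to load related records for a list of source
# records: the destination query is correlated to each source row via `parent_as(^0)`,
# the source primary key is selected as `__lateral_join_source__` so results can be
# grouped per source record, and `__order__` (a row_number window) preserves sort order.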
defp lateral_join_query(
query,
root_data,
[{source_query, source_attribute, destination_attribute, relationship}]
) do
source_query = Ash.Query.new(source_query)
base_query =
if query.__ash_bindings__[:__order__?] do
from(row in query,
select_merge: %{__order__: over(row_number(), :order)}
)
else
query
end
base_query =
cond do
Map.get(relationship, :manual) ->
{module, opts} = relationship.manual
module.ash_postgres_subquery(
opts,
0,
0,
base_query
)
Map.get(relationship, :no_attributes?) ->
base_query
true ->
from(destination in base_query,
where:
field(destination, ^destination_attribute) ==
field(parent_as(^0), ^source_attribute)
)
end
subquery =
base_query
|> set_subquery_prefix(source_query, relationship.destination)
|> subquery()
source_pkey = Ash.Resource.Info.primary_key(source_query.resource)
source_query.resource
|> Ash.Query.set_context(%{:data_layer => source_query.context[:data_layer]})
|> Ash.Query.set_tenant(source_query.tenant)
|> set_lateral_join_prefix(query)
|> case do
%{valid?: true} = query ->
Ash.Query.data_layer_query(query)
query ->
{:error, query}
end
|> case do
{:ok, data_layer_query} ->
source_values = Enum.map(root_data, &Map.get(&1, source_attribute))
data_layer_query =
from(source in data_layer_query,
where: field(source, ^source_attribute) in ^source_values
)
if query.__ash_bindings__[:__order__?] do
{:ok,
from(source in data_layer_query,
inner_lateral_join: destination in ^subquery,
on: true,
order_by: destination.__order__,
select: destination,
select_merge: %{__lateral_join_source__: map(source, ^source_pkey)},
distinct: true
)}
else
{:ok,
from(source in data_layer_query,
inner_lateral_join: destination in ^subquery,
on: true,
select: destination,
select_merge: %{__lateral_join_source__: map(source, ^source_pkey)},
distinct: true
)}
end
{:error, error} ->
{:error, error}
end
end
defp lateral_join_query(
query,
root_data,
[
{source_query, source_attribute, source_attribute_on_join_resource, relationship},
{through_resource, destination_attribute_on_join_resource, destination_attribute,
through_relationship}
]
) do
source_query = Ash.Query.new(source_query)
source_values = Enum.map(root_data, &Map.get(&1, source_attribute))
source_pkey = Ash.Resource.Info.primary_key(source_query.resource)
through_resource
|> Ash.Query.new()
|> Ash.Query.set_context(through_relationship.context)
|> Ash.Query.do_filter(through_relationship.filter)
|> Ash.Query.set_tenant(source_query.tenant)
|> Ash.Query.put_context(:data_layer, %{
start_bindings_at: query.__ash_bindings__.current
})
|> set_lateral_join_prefix(query)
|> case do
%{valid?: true} = through_query ->
through_query
|> Ash.Query.data_layer_query()
query ->
{:error, query}
end
|> case do
{:ok, through_query} ->
source_query.resource
|> Ash.Query.new()
|> Ash.Query.set_context(relationship.context)
|> Ash.Query.set_context(%{:data_layer => source_query.context[:data_layer]})
|> Ash.Query.put_context(:data_layer, %{
start_bindings_at: through_query.__ash_bindings__.current
})
|> set_lateral_join_prefix(query)
|> Ash.Query.do_filter(relationship.filter)
|> case do
%{valid?: true} = query ->
query
|> Ash.Query.data_layer_query()
query ->
{:error, query}
end
|> case do
{:ok, data_layer_query} ->
if query.__ash_bindings__[:__order__?] do
subquery =
subquery(
from(
destination in query,
select_merge: %{__order__: over(row_number(), :order)},
join:
through in ^set_subquery_prefix(
through_query,
source_query,
relationship.through
),
as: ^query.__ash_bindings__.current,
on:
field(through, ^destination_attribute_on_join_resource) ==
field(destination, ^destination_attribute),
where:
field(through, ^source_attribute_on_join_resource) ==
field(
parent_as(^through_query.__ash_bindings__.current),
^source_attribute
)
)
|> set_subquery_prefix(
source_query,
relationship.destination
)
)
{:ok,
from(source in data_layer_query,
where: field(source, ^source_attribute) in ^source_values,
inner_lateral_join: destination in ^subquery,
on: true,
select: destination,
select_merge: %{__lateral_join_source__: map(source, ^source_pkey)},
order_by: destination.__order__,
distinct: true
)}
else
subquery =
subquery(
from(
destination in query,
join:
through in ^set_subquery_prefix(
through_query,
source_query,
relationship.through
),
as: ^query.__ash_bindings__.current,
on:
field(through, ^destination_attribute_on_join_resource) ==
field(destination, ^destination_attribute),
where:
field(through, ^source_attribute_on_join_resource) ==
field(
parent_as(^through_query.__ash_bindings__.current),
^source_attribute
)
)
|> set_subquery_prefix(
source_query,
relationship.destination
)
)
{:ok,
from(source in data_layer_query,
where: field(source, ^source_attribute) in ^source_values,
inner_lateral_join: destination in ^subquery,
on: true,
select: destination,
select_merge: %{__lateral_join_source__: map(source, ^source_pkey)},
distinct: true
)}
end
{:error, error} ->
{:error, error}
end
{:error, error} ->
{:error, error}
end
end
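# Resolves the schema prefix for a subquery: an explicit schema in the data layer
# context wins, then (for context multitenancy) the query's tenant, then the
# resource's configured schema, then the repo's :default_prefix, falling back to "public".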
@doc false
def set_subquery_prefix(data_layer_query, source_query, resource) do
repo = AshPostgres.DataLayer.Info.repo(resource, :mutate)
config = repo.config()
case data_layer_query do
%{__ash_bindings__: %{context: %{data_layer: %{schema: schema}}}} when not is_nil(schema) ->
data_layer_query
_ ->
query_tenant =
case source_query do
%{__tenant__: tenant} -> tenant
%{tenant: tenant} -> tenant
_ -> nil
end
if Ash.Resource.Info.multitenancy_strategy(resource) == :context do
%{
data_layer_query
| prefix:
query_tenant || AshPostgres.DataLayer.Info.schema(resource) ||
config[:default_prefix] ||
"public"
}
else
%{
data_layer_query
| prefix:
AshPostgres.DataLayer.Info.schema(resource) || config[:default_prefix] ||
"public"
}
end
end
end
defp set_lateral_join_prefix(ash_query, query) do
if Ash.Resource.Info.multitenancy_strategy(ash_query.resource) == :context do
Ash.Query.set_tenant(ash_query, query.prefix)
else
ash_query
end
end
@impl true
def resource_to_query(resource, _) do
AshSql.Query.resource_to_query(resource, AshPostgres.SqlImplementation)
end
@impl true
def update_query(query, changeset, resource, options) do
ecto_changeset =
case changeset.data do
%Ash.Changeset.OriginalDataNotAvailable{} ->
changeset.resource.__struct__
data ->
data
end
|> Map.update!(:__meta__, &Map.put(&1, :source, table(resource, changeset)))
|> ecto_changeset(changeset, :update, true)
case bulk_updatable_query(
query,
resource,
changeset.atomics,
options[:calculations] || [],
changeset.context
) do
{:error, error} ->
{:error, error}
{:ok, query} ->
try do
repo = AshSql.dynamic_repo(resource, AshPostgres.SqlImplementation, changeset)
repo_opts =
AshSql.repo_opts(
repo,
AshPostgres.SqlImplementation,
changeset.timeout,
changeset.tenant,
changeset.resource
)
case AshSql.Atomics.query_with_atomics(
resource,
query,
changeset.filter,
changeset.atomics,
ecto_changeset.changes,
[]
) do
:empty ->
if options[:return_records?] do
if changeset.context[:data_layer][:use_atomic_update_data?] do
{:ok, [changeset.data]}
else
{:ok, repo.all(query)}
end
else
:ok
end
{:ok, query} ->
query =
if options[:return_records?] do
{:ok, query} =
query
|> Ecto.Query.exclude(:select)
|> Ecto.Query.select([row], row)
|> add_calculations(options[:calculations] || [], resource)
query
else
Ecto.Query.exclude(query, :select)
end
{_, results} =
with_savepoint(repo, query, fn ->
repo.update_all(
query,
[],
repo_opts
)
end)
if options[:return_records?] do
{:ok, remap_mapped_fields(results, query)}
else
:ok
end
{:error, error} ->
{:error, error}
end
rescue
e ->
handle_raised_error(e, __STACKTRACE__, ecto_changeset, resource)
end
end
end
defp bulk_updatable_query(query, resource, atomics, calculations, context) do
Enum.reduce_while(atomics ++ calculations, {:ok, query}, fn {_, expr}, {:ok, query} ->
used_aggregates =
Ash.Filter.used_aggregates(expr, [])
with {:ok, query} <-
AshSql.Join.join_all_relationships(
query,
%Ash.Filter{
resource: resource,
expression: expr
},
left_only?: true
),
{:ok, query} <-
AshSql.Aggregate.add_aggregates(query, used_aggregates, resource, false, 0) do
{:cont, {:ok, query}}
else
{:error, error} ->
{:halt, {:error, error}}
end
end)
|> case do
{:ok, query} ->
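# `update_all` can't respect limit/offset and can't reliably target rows through
# non-inner joins, so in those cases the query is rebuilt as the bare table
# inner-joined (on the primary key) to the original query as a subquery.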
needs_to_join? =
case Enum.at(query.joins, 0) do
nil ->
query.limit || query.offset
%{qual: :inner} ->
query.limit || query.offset
_other_type_of_join ->
true
end
if needs_to_join? do
root_query =
from(row in query.from.source, [])
|> Map.put(:__ash_bindings__, query.__ash_bindings__)
|> Ecto.Query.exclude(:select)
|> Map.put(:limit, query.limit)
|> Map.put(:offset, query.offset)
root_query =
if query.limit || query.offset do
root_query
else
Ecto.Query.exclude(root_query, :order_by)
end
dynamic =
Enum.reduce(Ash.Resource.Info.primary_key(resource), nil, fn pkey, dynamic ->
if dynamic do
Ecto.Query.dynamic(
[row, joining],
field(row, ^pkey) == field(joining, ^pkey) and ^dynamic
)
else
Ecto.Query.dynamic([row, joining], field(row, ^pkey) == field(joining, ^pkey))
end
end)
faked_query =
from(row in root_query,
inner_join: limiter in ^subquery(root_query),
as: ^0,
on: ^dynamic
)
joins_to_add =
for {%{on: on} = join, ix} <- Enum.with_index(query.joins) do
%{join | on: Ecto.Query.Planner.rewrite_sources(on, &(&1 + 1)), ix: ix + 1}
end
{:ok,
%{
faked_query
| joins: faked_query.joins ++ joins_to_add,
aliases: Map.new(query.aliases, fn {key, val} -> {key, val + 1} end),
limit: nil,
offset: nil,
wheres:
faked_query.wheres ++
Enum.map(query.wheres, fn where ->
Ecto.Query.Planner.rewrite_sources(where, &(&1 + 1))
end)
}}
else
{:ok,
query
|> AshSql.Bindings.default_bindings(resource, AshPostgres.SqlImplementation, context)
|> Ecto.Query.exclude(:select)
|> Ecto.Query.exclude(:order_by)}
end
{:error, error} ->
{:error, error}
end
end
@impl true
def destroy_query(query, changeset, resource, options) do
ecto_changeset =
case changeset.data do
%Ash.Changeset.OriginalDataNotAvailable{} ->
changeset.resource.__struct__
data ->
data
end
|> Map.update!(:__meta__, &Map.put(&1, :source, table(resource, changeset)))
|> ecto_changeset(changeset, :update, true)
try do
query =
query
|> AshSql.Bindings.default_bindings(
resource,
AshPostgres.SqlImplementation,
changeset.context
)
query =
if options[:return_records?] do
attrs = resource |> Ash.Resource.Info.attributes() |> Enum.map(& &1.name)
Ecto.Query.select(query, ^attrs)
else
query
end
|> Ecto.Query.exclude(:order_by)
repo = AshSql.dynamic_repo(resource, AshPostgres.SqlImplementation, changeset)
repo_opts =
AshSql.repo_opts(
repo,
AshPostgres.SqlImplementation,
changeset.timeout,
changeset.tenant,
changeset.resource
)
query =
if Enum.any?(query.joins, &(&1.qual != :inner)) do
{:ok, root_query} =
from(row in query.from.source, [])
|> AshSql.Bindings.default_bindings(
resource,
AshPostgres.SqlImplementation,
changeset.context
)
|> Ecto.Query.exclude(:select)
|> Ecto.Query.exclude(:order_by)
|> add_calculations(options[:calculations] || [], resource)
on =
Enum.reduce(Ash.Resource.Info.primary_key(resource), nil, fn key, dynamic ->
if dynamic do
Ecto.Query.dynamic(
[row, distinct],
^dynamic and field(row, ^key) == field(distinct, ^key)
)
else
Ecto.Query.dynamic([row, distinct], field(row, ^key) == field(distinct, ^key))
end
end)
from(row in root_query,
select: row,
join: subquery(query),
as: :sub,
on: ^on
)
else
Ecto.Query.exclude(query, :order_by)
end
query =
if options[:return_records?] do
if Enum.any?(query.joins, &(&1.qual != :inner)) do
query
|> Ecto.Query.exclude(:select)
|> Ecto.Query.select([sub: sub], sub)
else
query
|> Ecto.Query.exclude(:select)
|> Ecto.Query.select([row], row)
end
else
query
|> Ecto.Query.exclude(:select)
end
{_, results} =
with_savepoint(repo, query, fn ->
repo.delete_all(
query,
repo_opts
)
end)
if options[:return_records?] do
{:ok, remap_mapped_fields(results, query)}
else
:ok
end
rescue
e ->
handle_raised_error(e, __STACKTRACE__, ecto_changeset, resource)
end
end
@impl true
def bulk_create(resource, stream, options) do
changesets = Enum.to_list(stream)
repo = AshSql.dynamic_repo(resource, AshPostgres.SqlImplementation, Enum.at(changesets, 0))
opts = AshSql.repo_opts(repo, AshPostgres.SqlImplementation, nil, options[:tenant], resource)
opts =
if options.return_records? do
Keyword.put(opts, :returning, true)
else
opts
end
source = resolve_source(resource, Enum.at(changesets, 0))
try do
opts =
if options[:upsert?] do
# Ash groups changesets by atomics before dispatching them to the data layer
# this means that all changesets have the same atomics
%{atomics: atomics, filter: filter} = Enum.at(changesets, 0)
query = from(row in source, as: ^0)
query =
query
|> AshSql.Bindings.default_bindings(resource, AshPostgres.SqlImplementation)
upsert_set =
upsert_set(resource, changesets, options)
on_conflict =
case AshSql.Atomics.query_with_atomics(
resource,
query,
filter,
atomics,
%{},
upsert_set
) do
:empty ->
{:replace, options[:upsert_keys] || Ash.Resource.Info.primary_key(resource)}
{:ok, query} ->
query
{:error, error} ->
raise Ash.Error.to_ash_error(error)
end
opts
|> Keyword.put(:on_conflict, on_conflict)
|> Keyword.put(
:conflict_target,
conflict_target(
resource,
options[:upsert_keys] || Ash.Resource.Info.primary_key(resource)
)
)
else
opts
end
ecto_changesets = Enum.map(changesets, & &1.attributes)
opts =
if schema = Enum.at(changesets, 0).context[:data_layer][:schema] do
Keyword.put(opts, :prefix, schema)
else
opts
end
result =
with_savepoint(repo, opts[:on_conflict], fn ->
repo.insert_all(source, ecto_changesets, opts)
end)
case result do
{_, nil} ->
:ok
{_, results} ->
if options[:single?] do
Enum.each(results, &maybe_create_tenant!(resource, &1))
{:ok, results}
else
{:ok,
Stream.zip_with(results, changesets, fn result, changeset ->
if !opts[:upsert?] do
maybe_create_tenant!(resource, result)
end
Ash.Resource.put_metadata(
result,
:bulk_create_index,
changeset.context.bulk_create.index
)
end)}
end
end
rescue
e ->
changeset = Ash.Changeset.new(resource)
handle_raised_error(
e,
__STACKTRACE__,
{:bulk_create, ecto_changeset(changeset.data, changeset, :create, false)},
resource
)
end
end
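# When the accumulated expression info indicates the query may raise an Ash error
# from inside the database, wrap the repo call in a savepoint so the expected error
# can be caught and converted without aborting the surrounding transaction.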
defp with_savepoint(
repo,
%{
__ash_bindings__: %{
expression_accumulator: %AshSql.Expr.ExprInfo{has_error?: true}
}
},
fun
) do
if repo.in_transaction?() do
savepoint_id = "a" <> (Ash.UUID.generate() |> String.replace("-", "_"))
repo.query!("SAVEPOINT #{savepoint_id}")
result =
try do
{:ok, fun.()}
rescue
e in Postgrex.Error ->
case e do
%Postgrex.Error{
postgres: %{
code: :raise_exception,
message: "ash_error:" <> _,
severity: "ERROR"
}
} ->
repo.query!("ROLLBACK TO #{savepoint_id}")
# This kind of exception won't trigger
# a rollback
{:exception, e, __STACKTRACE__}
_ ->
{:exception, e, __STACKTRACE__}
end
end
case result do
{:exception, e, stacktrace} ->
reraise e, stacktrace
{:ok, result} ->
repo.query!("RELEASE #{savepoint_id}")
result
end
else
fun.()
end
end
defp with_savepoint(_repo, _acc, fun) do
fun.()
end
defp upsert_set(resource, changesets, options) do
attributes_changing_anywhere =
changesets |> Enum.flat_map(&Map.keys(&1.attributes)) |> Enum.uniq()
update_defaults = update_defaults(resource)
# We can't reference EXCLUDED if at least one of the changesets in the stream is not
# changing the value (and we wouldn't want to even if we could as it would be unnecessary)
upsert_fields =
(options[:upsert_fields] || []) |> Enum.filter(&(&1 in attributes_changing_anywhere))
fields_to_upsert =
upsert_fields --
Keyword.keys(Enum.at(changesets, 0).atomics)
Enum.map(fields_to_upsert, fn upsert_field ->
# for safety, we check once more at the end that all values in
# upsert_fields are names of attributes. This is because
# below we use `literal/1` to bring them into the query
if is_nil(resource.__schema__(:type, upsert_field)) do
raise "Only attribute names can be used in upsert_fields"
end
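# Fields with an update default prefer the incoming (EXCLUDED) value and fall back
# to the computed default when it is NULL; fields without one simply take EXCLUDED.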
case Keyword.fetch(update_defaults, upsert_field) do
{:ok, default} ->
if upsert_field in upsert_fields do
{upsert_field,
Ecto.Query.dynamic(
[],
fragment(
"COALESCE(EXCLUDED.?, ?)",
literal(^to_string(get_source_for_upsert_field(upsert_field, resource))),
^default
)
)}
else
{upsert_field, default}
end
:error ->
{upsert_field,
Ecto.Query.dynamic(
[],
fragment(
"EXCLUDED.?",
literal(^to_string(get_source_for_upsert_field(upsert_field, resource)))
)
)}
end
end)
end
defp get_source_for_upsert_field(field, resource) do
case Ash.Resource.Info.attribute(resource, field) do
%{source: source} when not is_nil(source) ->
source
_ ->
field
end
end
@impl true
def create(resource, changeset) do
changeset = %{
changeset
| data:
Map.update!(
changeset.data,
:__meta__,
&Map.put(&1, :source, table(resource, changeset))
)
}
case bulk_create(resource, [changeset], %{
single?: true,
tenant: Map.get(changeset, :to_tenant, changeset.tenant),
return_records?: true
}) do
{:ok, [result]} ->
{:ok, result}
{:error, error} ->
{:error, error}
end
end
defp maybe_create_tenant!(resource, result) do
if AshPostgres.DataLayer.Info.manage_tenant_create?(resource) do
tenant_name = tenant_name(resource, result)
AshPostgres.MultiTenancy.create_tenant!(
tenant_name,
AshPostgres.DataLayer.Info.repo(resource, :read)
)
else
:ok
end
end
defp maybe_update_tenant(resource, changeset, result) do
if AshPostgres.DataLayer.Info.manage_tenant_update?(resource) do
changing_tenant_name? =
resource
|> AshPostgres.DataLayer.Info.manage_tenant_template()
|> Enum.filter(&is_atom/1)
|> Enum.any?(&Ash.Changeset.changing_attribute?(changeset, &1))
if changing_tenant_name? do
old_tenant_name = tenant_name(resource, changeset.data)
new_tenant_name = tenant_name(resource, result)
AshPostgres.MultiTenancy.rename_tenant(
AshPostgres.DataLayer.Info.repo(resource, :read),
old_tenant_name,
new_tenant_name
)
end
end
:ok
end
defp tenant_name(resource, result) do
resource
|> AshPostgres.DataLayer.Info.manage_tenant_template()
|> Enum.map_join(fn item ->
if is_binary(item) do
item
else
result
|> Map.get(item)
|> to_string()
end
end)
end
defp to_ash_error({field, {message, vars}}) do
Ash.Error.Changes.InvalidAttribute.exception(
field: field,
message: message,
private_vars: vars
)
end
defp ecto_changeset(record, changeset, type, table_error? \\ true) do
attributes =
changeset.resource
|> Ash.Resource.Info.attributes()
|> Enum.map(& &1.name)
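# Attributes that are being set via atomics are applied in SQL, so they are
# excluded from the Ecto changeset here.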
attributes_to_change =
Enum.reject(attributes, fn attribute ->
Keyword.has_key?(changeset.atomics, attribute)
end)
ecto_changeset =
record
|> to_ecto()
|> set_table(changeset, type, table_error?)
|> Ecto.Changeset.change(Map.take(changeset.attributes, attributes_to_change))
|> add_configured_foreign_key_constraints(record.__struct__)
|> add_unique_indexes(record.__struct__, changeset)
|> add_check_constraints(record.__struct__)
|> add_exclusion_constraints(record.__struct__)
case type do
:create ->
ecto_changeset
|> add_my_foreign_key_constraints(record.__struct__)
type when type in [:upsert, :update] ->
ecto_changeset
|> add_my_foreign_key_constraints(record.__struct__)
|> add_related_foreign_key_constraints(record.__struct__)
:delete ->
ecto_changeset
|> add_related_foreign_key_constraints(record.__struct__)
end
end
defp handle_raised_error(
%Ecto.StaleEntryError{changeset: %{data: %resource{}, filters: filters}},
stacktrace,
context,
resource
) do
handle_raised_error(
Ash.Error.Changes.StaleRecord.exception(resource: resource, filters: filters),
stacktrace,
context,
resource
)
end
defp handle_raised_error(
%Postgrex.Error{
postgres: %{
code: :lock_not_available,
message: message
}
},
stacktrace,
context,
resource
) do
handle_raised_error(
Ash.Error.Invalid.Unavailable.exception(
resource: resource,
source: inspect(context, pretty: true),
reason: message
),
stacktrace,
context,
resource
)
end
defp handle_raised_error(
%Postgrex.Error{} = error,
stacktrace,
{:bulk_create, fake_changeset},
_resource
) do
case Ecto.Adapters.Postgres.Connection.to_constraints(error, []) do
[] ->
{:error, Ash.Error.to_ash_error(error, stacktrace)}
constraints ->
{:error,
fake_changeset
|> constraints_to_errors(:insert, constraints)
|> Ash.Error.to_ash_error()}
end
end
defp handle_raised_error(%Ecto.Query.CastError{} = e, stacktrace, context, resource) do
handle_raised_error(
Ash.Error.Query.InvalidFilterValue.exception(value: e.value, context: context),
stacktrace,
context,
resource
)
end
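# Errors raised inside the database by the ash-functions extension arrive as a
# `raise_exception` whose message carries a JSON payload; decode it to rebuild
# the original Ash exception without rolling back.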
defp handle_raised_error(
%Postgrex.Error{
postgres: %{
code: :raise_exception,
message: "ash_error: \"" <> json,
severity: "ERROR"
}
},
_,
_,
_
) do
%{"exception" => exception, "input" => input} =
json
|> String.trim_trailing("\"")
|> String.replace("\\\"", "\"")
|> Jason.decode!()
exception = Module.concat([exception])
{:error, :no_rollback, Ash.Error.from_json(exception, input)}
end
defp handle_raised_error(
%Postgrex.Error{
postgres: %{
code: :raise_exception,
message: "ash_error: " <> json,
severity: "ERROR"
}
},
_,
_,
_
) do
%{"exception" => exception, "input" => input} =
Jason.decode!(json)
exception = Module.concat([exception])
{:error, :no_rollback, Ash.Error.from_json(exception, input)}
end
defp handle_raised_error(
%Postgrex.Error{} = error,
stacktrace,
%{constraints: user_constraints},
_resource
) do
case Ecto.Adapters.Postgres.Connection.to_constraints(error, []) do
[{type, constraint}] ->
user_constraint =
Enum.find(user_constraints, fn c ->
case {c.type, c.constraint, c.match} do
{^type, ^constraint, :exact} -> true
{^type, cc, :suffix} -> String.ends_with?(constraint, cc)
{^type, cc, :prefix} -> String.starts_with?(constraint, cc)
{^type, %Regex{} = r, _match} -> Regex.match?(r, constraint)
_ -> false
end
end)
case user_constraint do
%{field: field, error_message: error_message, error_type: error_type} ->
{:error,
to_ash_error(
{field, {error_message, [constraint: error_type, constraint_name: constraint]}}
)}
nil ->
reraise error, stacktrace
end
_ ->
reraise error, stacktrace
end
end
defp handle_raised_error(error, stacktrace, _ecto_changeset, _resource) do
{:error, Ash.Error.to_ash_error(error, stacktrace)}
end
defp constraints_to_errors(%{constraints: user_constraints} = changeset, action, constraints) do
Enum.map(constraints, fn {type, constraint} ->
user_constraint =
Enum.find(user_constraints, fn c ->
case {c.type, c.constraint, c.match} do
{^type, ^constraint, :exact} -> true
{^type, cc, :suffix} -> String.ends_with?(constraint, cc)
{^type, cc, :prefix} -> String.starts_with?(constraint, cc)
{^type, %Regex{} = r, _match} -> Regex.match?(r, constraint)
_ -> false
end
end)
case user_constraint do
%{field: field, error_message: error_message, type: type, constraint: constraint} ->
Ash.Error.Changes.InvalidAttribute.exception(
field: field,
message: error_message,
private_vars: [
constraint: constraint,
constraint_type: type
]
)
nil ->
Ecto.ConstraintError.exception(
action: action,
type: type,
constraint: constraint,
changeset: changeset
)
end
end)
end
defp set_table(record, changeset, operation, table_error?) do
if AshPostgres.DataLayer.Info.polymorphic?(record.__struct__) do
table =
changeset.context[:data_layer][:table] ||
AshPostgres.DataLayer.Info.table(record.__struct__)
record =
if table do
Ecto.put_meta(record, source: table)
else
if table_error? do
raise_table_error!(changeset.resource, operation)
else
record
end
end
prefix =
changeset.context[:data_layer][:schema] ||
AshPostgres.DataLayer.Info.schema(record.__struct__)
if prefix do
Ecto.put_meta(record, prefix: table)
else
record
end
else
record
end
end
def from_ecto({:ok, result}), do: {:ok, from_ecto(result)}
def from_ecto({:error, _} = other), do: other
def from_ecto(nil), do: nil
def from_ecto(value) when is_list(value) do
Enum.map(value, &from_ecto/1)
end
def from_ecto(%resource{} = record) do
if Spark.Dsl.is?(resource, Ash.Resource) do
empty = struct(resource)
resource
|> Ash.Resource.Info.relationships()
|> Enum.reduce(record, fn relationship, record ->
case Map.get(record, relationship.name) do
%Ecto.Association.NotLoaded{} ->
Map.put(record, relationship.name, Map.get(empty, relationship.name))
value ->
Map.put(record, relationship.name, from_ecto(value))
end
end)
else
record
end
end
def from_ecto(other), do: other
def to_ecto(nil), do: nil
def to_ecto(value) when is_list(value) do
Enum.map(value, &to_ecto/1)
end
def to_ecto(%resource{} = record) do
if Spark.Dsl.is?(resource, Ash.Resource) do
resource
|> Ash.Resource.Info.relationships()
|> Enum.reduce(record, fn relationship, record ->
value =
case Map.get(record, relationship.name) do
%Ash.NotLoaded{} ->
%Ecto.Association.NotLoaded{
__field__: relationship.name,
__cardinality__: relationship.cardinality
}
value ->
to_ecto(value)
end
Map.put(record, relationship.name, value)
end)
else
record
end
end
def to_ecto(other), do: other
defp add_check_constraints(changeset, resource) do
resource
|> AshPostgres.DataLayer.Info.check_constraints()
|> Enum.reduce(changeset, fn constraint, changeset ->
constraint.attribute
|> List.wrap()
|> Enum.reduce(changeset, fn attribute, changeset ->
Ecto.Changeset.check_constraint(changeset, attribute,
name: constraint.name,
message: constraint.message || "is invalid"
)
end)
end)
end
defp add_exclusion_constraints(changeset, resource) do
resource
|> AshPostgres.DataLayer.Info.exclusion_constraint_names()
|> Enum.reduce(changeset, fn constraint, changeset ->
case constraint do
{key, name} ->
Ecto.Changeset.exclusion_constraint(changeset, key, name: name)
{key, name, message} ->
Ecto.Changeset.exclusion_constraint(changeset, key, name: name, message: message)
end
end)
end
defp add_related_foreign_key_constraints(changeset, resource) do
# TODO: this doesn't guarantee us to get all of them, because if something is related to this
# schema and there is no back-relation, then this won't catch its foreign key constraints
resource
|> Ash.Resource.Info.relationships()
|> Enum.map(& &1.destination)
|> Enum.uniq()
|> Enum.flat_map(fn related ->
related
|> Ash.Resource.Info.relationships()
|> Enum.filter(&(&1.destination == resource))
|> Enum.map(&Map.take(&1, [:source, :source_attribute, :destination_attribute, :name]))
end)
|> Enum.reduce(changeset, fn %{
source: source,
source_attribute: source_attribute,
destination_attribute: destination_attribute,
name: relationship_name
},
changeset ->
case AshPostgres.DataLayer.Info.reference(resource, relationship_name) do
%{name: name} when not is_nil(name) ->
Ecto.Changeset.foreign_key_constraint(changeset, destination_attribute,
name: name,
message: "would leave records behind"
)
_ ->
Ecto.Changeset.foreign_key_constraint(changeset, destination_attribute,
name: "#{AshPostgres.DataLayer.Info.table(source)}_#{source_attribute}_fkey",
message: "would leave records behind"
)
end
end)
end
defp add_my_foreign_key_constraints(changeset, resource) do
resource
|> Ash.Resource.Info.relationships()
|> Enum.reduce(changeset, &Ecto.Changeset.foreign_key_constraint(&2, &1.source_attribute))
end
defp add_configured_foreign_key_constraints(changeset, resource) do
resource
|> AshPostgres.DataLayer.Info.foreign_key_names()
|> case do
{m, f, a} -> List.wrap(apply(m, f, [changeset | a]))
value -> List.wrap(value)
end
|> Enum.reduce(changeset, fn
{key, name}, changeset ->
Ecto.Changeset.foreign_key_constraint(changeset, key, name: name)
{key, name, message}, changeset ->
Ecto.Changeset.foreign_key_constraint(changeset, key, name: name, message: message)
end)
end
defp add_unique_indexes(changeset, resource, ash_changeset) do
table = table(resource, ash_changeset)
pkey = Ash.Resource.Info.primary_key(resource)
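# Register each identity, custom index, and configured index name as an Ecto
# unique_constraint so violations become field errors rather than raised exceptions.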
changeset =
resource
|> Ash.Resource.Info.identities()
|> Enum.reduce(changeset, fn identity, changeset ->
name =
AshPostgres.DataLayer.Info.identity_index_names(resource)[identity.name] ||
"#{table}_#{identity.name}_index"
opts =
if Map.get(identity, :message) do
[name: name, message: identity.message]
else
[name: name]
end
fields =
case identity.keys do
[] ->
pkey
keys ->
keys
end
Ecto.Changeset.unique_constraint(changeset, fields, opts)
end)
changeset =
resource
|> AshPostgres.DataLayer.Info.custom_indexes()
|> Enum.reduce(changeset, fn index, changeset ->
name = index.name || AshPostgres.CustomIndex.name(table, index)
opts =
if index.message do
[name: name, message: index.message]
else
[name: name]
end
fields =
if index.error_fields do
case index.error_fields do
[] -> pkey
fields -> fields
end
else
case Enum.filter(index.fields, &is_atom/1) do
[] -> pkey
fields -> fields
end
end
Ecto.Changeset.unique_constraint(changeset, fields, opts)
end)
names =
resource
|> AshPostgres.DataLayer.Info.unique_index_names()
|> case do
{m, f, a} -> List.wrap(apply(m, f, [changeset | a]))
value -> List.wrap(value)
end
names =
case Ash.Resource.Info.primary_key(resource) do
[] ->
names
fields ->
if table = table(resource, ash_changeset) do
[{fields, table <> "_pkey"} | names]
else
[]
end
end
Enum.reduce(names, changeset, fn
{keys, name}, changeset ->
Ecto.Changeset.unique_constraint(changeset, List.wrap(keys), name: name)
{keys, name, message}, changeset ->
Ecto.Changeset.unique_constraint(changeset, List.wrap(keys), name: name, message: message)
end)
end
@impl true
def upsert(resource, changeset, keys \\ nil) do
if AshPostgres.DataLayer.Info.manage_tenant_update?(resource) do
{:error, "Cannot currently upsert a resource that owns a tenant"}
else
keys = keys || Ash.Resource.Info.primary_key(resource)
update_defaults = update_defaults(resource)
explicitly_changing_attributes =
changeset.attributes
|> Map.keys()
|> Enum.concat(Keyword.keys(update_defaults))
|> Kernel.--(Map.get(changeset, :defaults, []))
|> Kernel.--(keys)
upsert_fields =
changeset.context[:private][:upsert_fields] || explicitly_changing_attributes
case bulk_create(resource, [changeset], %{
single?: true,
upsert?: true,
tenant: changeset.tenant,
upsert_keys: keys,
upsert_fields: upsert_fields,
return_records?: true
}) do
{:ok, [result]} ->
{:ok, result}
{:error, error} ->
{:error, error}
end
end
end
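  # With a base_filter, the unique indexes backing upserts are partial indexes, so the
  # ON CONFLICT target must include the index predicate as well as the columns; this is
  # only possible when `base_filter_sql` is configured. As an illustration (hypothetical
  # resource): keys `[:email]` with `base_filter_sql "active = true"` produce the target
  # `("email") WHERE (active = true)`.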
defp conflict_target(resource, keys) do
if Ash.Resource.Info.base_filter(resource) do
base_filter_sql =
AshPostgres.DataLayer.Info.base_filter_sql(resource) ||
raise """
Cannot use upserts with resources that have a base_filter without also adding `base_filter_sql` in the postgres section.
"""
sources =
Enum.map(keys, fn key ->
~s("#{Ash.Resource.Info.attribute(resource, key).source || key}")
end)
{:unsafe_fragment, "(" <> Enum.join(sources, ", ") <> ") WHERE (#{base_filter_sql})"}
else
keys
end
end
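  # Collects `update_default` values for every attribute that has one, split into static
  # values, lazily computed defaults evaluated per attribute, and lazily computed defaults
  # shared across attributes that opt into `match_other_defaults?`.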
defp update_defaults(resource) do
attributes =
resource
|> Ash.Resource.Info.attributes()
|> Enum.reject(&is_nil(&1.update_default))
attributes
|> static_defaults()
|> Enum.concat(lazy_matching_defaults(attributes))
|> Enum.concat(lazy_non_matching_defaults(attributes))
end
defp static_defaults(attributes) do
attributes
|> Enum.reject(&get_default_fun(&1))
|> Enum.map(&{&1.name, &1.update_default})
end
defp lazy_non_matching_defaults(attributes) do
attributes
|> Enum.filter(&(!&1.match_other_defaults? && get_default_fun(&1)))
|> Enum.map(fn attribute ->
default_value =
case attribute.update_default do
function when is_function(function) ->
function.()
{m, f, a} when is_atom(m) and is_atom(f) and is_list(a) ->
apply(m, f, a)
end
{:ok, default_value} =
Ash.Type.cast_input(attribute.type, default_value, attribute.constraints)
{attribute.name, default_value}
end)
end
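  # Attributes with `match_other_defaults?: true` are grouped by default function, type,
  # and constraints so that each group receives a single generated value (e.g. several
  # timestamp attributes updated to the same instant).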
defp lazy_matching_defaults(attributes) do
attributes
|> Enum.filter(&(&1.match_other_defaults? && get_default_fun(&1)))
|> Enum.group_by(&{&1.update_default, &1.type, &1.constraints})
|> Enum.flat_map(fn {{default_fun, type, constraints}, attributes} ->
default_value =
case default_fun do
function when is_function(function) ->
function.()
{m, f, a} when is_atom(m) and is_atom(f) and is_list(a) ->
apply(m, f, a)
end
{:ok, default_value} =
Ash.Type.cast_input(type, default_value, constraints)
Enum.map(attributes, &{&1.name, default_value})
end)
end
defp get_default_fun(attribute) do
if is_function(attribute.update_default) or match?({_, _, _}, attribute.update_default) do
attribute.update_default
end
end
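  # Updates run as a single `update_all` scoped to the record's primary key, combining
  # attribute changes with atomic expressions; zero affected rows is reported as a
  # stale record error.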
@impl true
def update(resource, changeset) do
ecto_changeset =
changeset.data
|> Map.update!(:__meta__, &Map.put(&1, :source, table(resource, changeset)))
|> ecto_changeset(changeset, :update)
source = resolve_source(resource, changeset)
query =
from(row in source, as: ^0)
|> AshSql.Bindings.default_bindings(
resource,
AshPostgres.SqlImplementation,
changeset.context
)
|> pkey_filter(changeset.data)
select = Keyword.keys(changeset.atomics) ++ Ash.Resource.Info.primary_key(resource)
case bulk_updatable_query(query, resource, changeset.atomics, [], changeset.context) do
{:error, error} ->
{:error, error}
{:ok, query} ->
query = Ecto.Query.select(query, ^select)
try do
case AshSql.Atomics.query_with_atomics(
resource,
query,
changeset.filter,
changeset.atomics,
ecto_changeset.changes,
[]
) do
:empty ->
{:ok, changeset.data}
{:ok, query} ->
repo = AshSql.dynamic_repo(resource, AshPostgres.SqlImplementation, changeset)
repo_opts =
AshSql.repo_opts(
repo,
AshPostgres.SqlImplementation,
changeset.timeout,
changeset.tenant,
changeset.resource
)
repo_opts =
Keyword.put(repo_opts, :returning, Keyword.keys(changeset.atomics))
result =
with_savepoint(repo, query, fn ->
repo.update_all(
query,
[],
repo_opts
)
end)
case result do
{0, []} ->
{:error,
Ash.Error.Changes.StaleRecord.exception(
resource: resource,
filters: changeset.filter
)}
{1, [result]} ->
record =
changeset.data
|> Map.merge(changeset.attributes)
|> Map.merge(Map.take(result, Keyword.keys(changeset.atomics)))
maybe_update_tenant(resource, changeset, record)
{:ok, record}
end
{:error, error} ->
{:error, error}
end
rescue
e ->
handle_raised_error(e, __STACKTRACE__, ecto_changeset, resource)
end
end
end
defp pkey_filter(query, %resource{} = record) do
pkey =
record
|> Map.take(Ash.Resource.Info.primary_key(resource))
|> Map.to_list()
Ecto.Query.where(query, ^pkey)
end
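  # Destroys issue a `delete_all` scoped to the record's primary key and the changeset
  # filter; errors raised by the database are converted to Ash errors via
  # `handle_raised_error/4`.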
@impl true
def destroy(resource, %{data: record} = changeset) do
ecto_changeset = ecto_changeset(record, changeset, :delete)
try do
repo = AshSql.dynamic_repo(resource, AshPostgres.SqlImplementation, changeset)
source = resolve_source(resource, changeset)
from(row in source, as: ^0)
|> AshSql.Bindings.default_bindings(
resource,
AshPostgres.SqlImplementation,
changeset.context
)
|> filter(changeset.filter, resource)
|> case do
{:ok, query} ->
query
|> pkey_filter(record)
|> repo.delete_all(
AshSql.repo_opts(
repo,
AshPostgres.SqlImplementation,
changeset.timeout,
changeset.tenant,
changeset.resource
)
)
:ok
{:error, error} ->
{:error, error}
end
rescue
e ->
handle_raised_error(e, __STACKTRACE__, ecto_changeset, resource)
end
end
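  # PostgreSQL rejects `FOR UPDATE` on a `SELECT DISTINCT`, so for distinct queries the
  # lock is applied to a non-distinct inner query and the distinct clause is re-applied
  # on the wrapping subquery.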
@impl true
def lock(query, :for_update, _) do
if query.distinct do
new_query =
Ecto.Query.lock(%{query | distinct: nil}, [{^0, a}], fragment("FOR UPDATE OF ?", a))
q = from(row in subquery(new_query), [])
{:ok, %{q | distinct: query.distinct}}
else
{:ok, Ecto.Query.lock(query, [{^0, a}], fragment("FOR UPDATE OF ?", a))}
end
end
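  # The remaining lock modes are generated at compile time: one `lock/3` clause per entry
  # in `@locks`, plus a `NOWAIT` variant of each.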
@locks [
"FOR UPDATE",
"FOR NO KEY UPDATE",
"FOR SHARE",
"FOR KEY SHARE"
]
for lock <- @locks do
frag = "#{lock} OF ?"
def lock(query, unquote(lock), _) do
{:ok, Ecto.Query.lock(query, [{^0, a}], fragment(unquote(frag), a))}
end
frag = "#{lock} OF ? NOWAIT"
lock = "#{lock} NOWAIT"
def lock(query, unquote(lock), _) do
{:ok, Ecto.Query.lock(query, [{^0, a}], fragment(unquote(frag), a))}
end
end
@impl true
def sort(query, sort, _resource) do
{:ok,
Map.update!(
query,
:__ash_bindings__,
&Map.put(&1, :sort, sort)
)}
end
@impl true
def select(query, select, _resource) do
{:ok, from(row in query, select: struct(row, ^Enum.uniq(select)))}
end
@impl true
def distinct_sort(query, sort, _) when sort in [nil, []] do
{:ok, query}
end
def distinct_sort(query, sort, _) do
{:ok, Map.update!(query, :__ash_bindings__, &Map.put(&1, :distinct_sort, sort))}
end
# If the order by does not match the initial sort clause, then we use a subquery
# to limit to only distinct rows. This may not perform that well, so we may need
# to come up with alternatives here.
@impl true
def distinct(query, distinct, resource) do
AshSql.Distinct.distinct(query, distinct, resource)
end
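  # Filtering joins every relationship the filter references, adds any aggregates it
  # uses, and then attaches the filter expression to the query.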
@impl true
def filter(query, filter, resource, opts \\ []) do
used_aggregates = Ash.Filter.used_aggregates(filter, [])
query
|> AshSql.Join.join_all_relationships(filter, opts)
|> case do
{:ok, query} ->
query
|> AshSql.Aggregate.add_aggregates(used_aggregates, resource, false, 0)
|> case do
{:ok, query} ->
{:ok, AshSql.Filter.add_filter_expression(query, filter)}
{:error, error} ->
{:error, error}
end
{:error, error} ->
{:error, error}
end
end
@impl true
def add_aggregates(query, aggregates, resource) do
AshSql.Aggregate.add_aggregates(query, aggregates, resource, true, 0)
end
@impl true
def add_calculations(query, calculations, resource, select? \\ true) do
AshSql.Calculation.add_calculations(query, calculations, resource, 0, select?)
end
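  # Registers externally provided data under a fixed binding index in the query's Ash
  # bindings so later expression building can refer to that binding.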
def add_known_binding(query, data, known_binding) do
bindings = query.__ash_bindings__.bindings
new_ash_bindings = %{
query.__ash_bindings__
| bindings: Map.put(bindings, known_binding, data)
}
%{query | __ash_bindings__: new_ash_bindings}
end
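  # Transactions use the repo from the reason's `data_layer_context` when one is present,
  # falling back to the resource's read repo, and call `on_transaction_begin/1` inside
  # the transaction before running the wrapped function.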
@impl true
def transaction(resource, func, timeout \\ nil, reason \\ %{type: :custom, metadata: %{}}) do
repo =
case reason[:data_layer_context] do
%{repo: repo} when not is_nil(repo) ->
repo
_ ->
AshPostgres.DataLayer.Info.repo(resource, :read)
end
func = fn ->
repo.on_transaction_begin(reason)
func.()
end
if timeout do
repo.transaction(func, timeout: timeout)
else
repo.transaction(func)
end
end
@impl true
def rollback(resource, term) do
AshPostgres.DataLayer.Info.repo(resource, :mutate).rollback(term)
end
defp table(resource, changeset) do
changeset.context[:data_layer][:table] || AshPostgres.DataLayer.Info.table(resource)
end
defp raise_table_error!(resource, operation) do
if AshPostgres.DataLayer.Info.polymorphic?(resource) do
raise """
Could not determine table for #{operation} on #{inspect(resource)}.
Polymorphic resources require that the `data_layer[:table]` context is provided.
See the guide on polymorphic resources for more information.
"""
else
raise """
Could not determine table for #{operation} on #{inspect(resource)}.
"""
end
end
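  # Polymorphic resources provide their table through the `data_layer[:table]` context;
  # when present, the Ecto source becomes `{table, resource}`.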
defp resolve_source(resource, changeset) do
if table = changeset.context[:data_layer][:table] do
{table, resource}
else
resource
end
end
end