defmodule AshPostgres.DataLayer do
  require Ecto.Query

  # DSL section: options for a resource whose records manage a tenant schema.
  @manage_tenant %Spark.Dsl.Section{
    name: :manage_tenant,
    describe: """
    Configuration for the behavior of a resource that manages a tenant
    """,
    examples: [
      """
      manage_tenant do
        template ["organization_", :id]
        create? true
        update? false
      end
      """
    ],
    schema: [
      template: [
        type: {:wrap_list, {:or, [:string, :atom]}},
        required: true,
        doc: """
        A template that will cause the resource to create/manage the specified schema.
        """
      ],
      create?: [
        type: :boolean,
        default: true,
        doc: "Whether or not to automatically create a tenant when a record is created"
      ],
      update?: [
        type: :boolean,
        default: true,
        doc: "Whether or not to automatically update the tenant name if the record is updated"
      ]
    ]
  }

  # DSL entity: a single custom index, built into an `AshPostgres.CustomIndex`.
  @index %Spark.Dsl.Entity{
    name: :index,
    describe: """
    Add an index to be managed by the migration generator.
    """,
    examples: [
      "index [\"column\", \"column2\"], unique: true, where: \"thing = TRUE\""
    ],
    target: AshPostgres.CustomIndex,
    schema: AshPostgres.CustomIndex.schema(),
    transform: {AshPostgres.CustomIndex, :transform, []},
    args: [:fields]
  }

  # DSL section grouping `index` entities.
  @custom_indexes %Spark.Dsl.Section{
    name: :custom_indexes,
    describe: """
    A section for configuring indexes to be created by the migration generator.

    In general, prefer to use `identities` for simple unique constraints. This is a tool to allow
    for declaring more complex indexes.
    """,
    examples: [
      """
      custom_indexes do
        index [:column1, :column2], unique: true, where: "thing = TRUE"
      end
      """
    ],
    entities: [
      @index
    ]
  }

  # DSL entity: a named custom migration statement, built into an `AshPostgres.Statement`.
  @statement %Spark.Dsl.Entity{
    name: :statement,
    describe: """
    Add a custom statement for migrations.
    """,
    examples: [
      """
      statement :pgweb_idx do
        up "CREATE INDEX pgweb_idx ON pgweb USING GIN (to_tsvector('english', title || ' ' || body));"
        down "DROP INDEX pgweb_idx;"
      end
      """
    ],
    target: AshPostgres.Statement,
    schema: AshPostgres.Statement.schema(),
    args: [:name]
  }

  # DSL section grouping `statement` entities.
  @custom_statements %Spark.Dsl.Section{
    name: :custom_statements,
    describe: """
    A section for configuring custom statements to be added to migrations.

    Changing custom statements may require manual intervention, because Ash can't determine what order they should run
    in (i.e if they depend on table structure that you've added, or vice versa). As such, any `down` statements we run
    for custom statements happen first, and any `up` statements happen last.

    Additionally, when changing a custom statement, we must make some assumptions, i.e that we should migrate
    the old structure down using the previously configured `down` and recreate it.

    This may not be desired, and so what you may end up doing is simply modifying the old migration and deleting whatever was
    generated by the migration generator. As always: read your migrations after generating them!
    """,
    examples: [
      """
      custom_statements do
        # the name is used to detect if you remove or modify the statement
        statement :pgweb_idx do
          up "CREATE INDEX pgweb_idx ON pgweb USING GIN (to_tsvector('english', title || ' ' || body));"
          down "DROP INDEX pgweb_idx;"
        end
      end
      """
    ],
    entities: [
      @statement
    ]
  }

  # DSL entity: foreign key behaviour for one relationship, built into an `AshPostgres.Reference`.
  @reference %Spark.Dsl.Entity{
    name: :reference,
    describe: """
    Configures the reference for a relationship in resource migrations.

    Keep in mind that multiple relationships can theoretically involve the same destination and foreign keys.
    In those cases, you only need to configure the `reference` behavior for one of them. Any conflicts will result
    in an error, across this resource and any other resources that share a table with this one. For this reason,
    instead of adding a reference configuration for `:nothing`, its best to just leave the configuration out, as that
    is the default behavior if *no* relationship anywhere has configured the behavior of that reference.
    """,
    examples: [
      "reference :post, on_delete: :delete, on_update: :update, name: \"comments_to_posts_fkey\""
    ],
    args: [:relationship],
    target: AshPostgres.Reference,
    schema: AshPostgres.Reference.schema()
  }

  # DSL section grouping `reference` entities plus the polymorphic foreign key options.
  # NOTE: this attribute used to be declared a second time further down without
  # `polymorphic_name`; the later declaration silently replaced this one. It is now
  # declared exactly once so all three polymorphic options are available.
  @references %Spark.Dsl.Section{
    name: :references,
    describe: """
    A section for configuring the references (foreign keys) in resource migrations.

    This section is only relevant if you are using the migration generator with this resource.
    Otherwise, it has no effect.
    """,
    examples: [
      """
      references do
        reference :post, on_delete: :delete, on_update: :update, name: "comments_to_posts_fkey"
      end
      """
    ],
    entities: [@reference],
    schema: [
      polymorphic_on_delete: [
        type:
          {:or,
           [
             {:one_of, [:delete, :nilify, :nothing, :restrict]},
             {:tagged_tuple, :nilify, {:wrap_list, :atom}}
           ]},
        doc:
          "For polymorphic resources, configures the on_delete behavior of the automatically generated foreign keys to source tables."
      ],
      polymorphic_on_update: [
        type: {:one_of, [:update, :nilify, :nothing, :restrict]},
        doc:
          "For polymorphic resources, configures the on_update behavior of the automatically generated foreign keys to source tables."
      ],
      polymorphic_name: [
        type: :string,
        doc:
          "For polymorphic resources, the index name to use for the foreign key to the source table."
      ]
    ]
  }

  # DSL entity: a named check constraint, built into an `AshPostgres.CheckConstraint`.
  @check_constraint %Spark.Dsl.Entity{
    name: :check_constraint,
    describe: """
    Add a check constraint to be validated.

    If a check constraint exists on the table but not in this section, and it produces an error, a runtime error will be raised.

    Provide a list of attributes instead of a single attribute to add the message to multiple attributes.

    By adding the `check` option, the migration generator will include it when generating migrations.
    """,
    examples: [
      """
      check_constraint :price, "price_must_be_positive", check: "price > 0", message: "price must be positive"
      """
    ],
    args: [:attribute, :name],
    target: AshPostgres.CheckConstraint,
    schema: AshPostgres.CheckConstraint.schema()
  }

  # DSL section grouping `check_constraint` entities.
  @check_constraints %Spark.Dsl.Section{
    name: :check_constraints,
    describe: """
    A section for configuring the check constraints for a given table.

    This can be used to automatically create those check constraints, or just to provide message when they are raised
    """,
    examples: [
      """
      check_constraints do
        check_constraint :price, "price_must_be_positive", check: "price > 0", message: "price must be positive"
      end
      """
    ],
    entities: [@check_constraint]
  }

  # Top-level `postgres` DSL section; nests all of the sections declared above.
  @postgres %Spark.Dsl.Section{
    name: :postgres,
    describe: """
    Postgres data layer configuration
    """,
    sections: [
      @custom_indexes,
      @custom_statements,
      @manage_tenant,
      @references,
      @check_constraints
    ],
    modules: [
      :repo
    ],
    examples: [
      """
      postgres do
        repo MyApp.Repo
        table "organizations"
      end
      """
    ],
    schema: [
      repo: [
        type: {:or, [{:behaviour, Ecto.Repo}, {:fun, 2}]},
        required: true,
        doc:
          "The repo that will be used to fetch your data. See the `AshPostgres.Repo` documentation for more. Can also be a function that takes a resource and a type `:read | :mutate` and returns the repo"
      ],
      migrate?: [
        type: :boolean,
        default: true,
        doc:
          "Whether or not to include this resource in the generated migrations with `mix ash.generate_migrations`"
      ],
      migration_types: [
        type: :keyword_list,
        default: [],
        doc:
          "A keyword list of attribute names to the ecto migration type that should be used for that attribute. Only necessary if you need to override the defaults."
      ],
      migration_defaults: [
        type: :keyword_list,
        default: [],
        doc: """
        A keyword list of attribute names to the ecto migration default that should be used for that attribute. The string you use will be placed verbatim in the migration. Use fragments like `fragment(\\\\"now()\\\\")`, or for `nil`, use `\\\\"nil\\\\"`.
        """
      ],
      calculations_to_sql: [
        type: :keyword_list,
        doc:
          "A keyword list of calculations and their SQL representation. Used when creating unique indexes for identities over calculations"
      ],
      identity_wheres_to_sql: [
        type: :keyword_list,
        doc:
          "A keyword list of identity names and the SQL representation of their `where` clause. Used when creating unique indexes for identities over calculations"
      ],
      base_filter_sql: [
        type: :string,
        doc:
          "A raw sql version of the base_filter, e.g `representative = true`. Required if trying to create a unique constraint on a resource with a base_filter"
      ],
      simple_join_first_aggregates: [
        type: {:list, :atom},
        default: [],
        doc: """
        A list of `:first` type aggregate names that can be joined to using a simple join. Use when you have a `:first` aggregate that uses a to-many relationship , but your `filter` statement ensures that there is only one result. Optimizes the generated query.
        """
      ],
      skip_unique_indexes: [
        type: {:wrap_list, :atom},
        default: false,
        doc: "Skip generating unique indexes when generating migrations"
      ],
      unique_index_names: [
        type:
          {:list,
           {:or,
            [{:tuple, [{:list, :atom}, :string]}, {:tuple, [{:list, :atom}, :string, :string]}]}},
        default: [],
        doc: """
        A list of unique index names that could raise errors that are not configured in identities, or an mfa to a function that takes a changeset and returns the list. In the format `{[:affected, :keys], "name_of_constraint"}` or `{[:affected, :keys], "name_of_constraint", "custom error message"}`
        """
      ],
      exclusion_constraint_names: [
        type: :any,
        default: [],
        doc: """
        A list of exclusion constraint names that could raise errors. Must be in the format `{:affected_key, "name_of_constraint"}` or `{:affected_key, "name_of_constraint", "custom error message"}`
        """
      ],
      identity_index_names: [
        type: :any,
        default: [],
        doc: """
        A keyword list of identity names to the unique index name that they should use when being managed by the migration generator.
        """
      ],
      foreign_key_names: [
        type:
          {:list,
           {:or,
            [
              {:tuple, [{:or, [:atom, :string]}, :string]},
              {:tuple, [{:or, [:atom, :string]}, :string, :string]}
            ]}},
        default: [],
        doc: """
        A list of foreign keys that could raise errors, or an mfa to a function that takes a changeset and returns a list.
In the format: `{:key, "name_of_constraint"}` or `{:key, "name_of_constraint", "custom error message"}`
        """
      ],
      migration_ignore_attributes: [
        type: {:list, :atom},
        default: [],
        doc: """
        A list of attributes that will be ignored when generating migrations.
        """
      ],
      table: [
        type: :string,
        doc: """
        The table to store and read the resource from. If this is changed, the migration generator will not remove the old table.
        """
      ],
      schema: [
        type: :string,
        doc: """
        The schema that the table is located in. Schema-based multitenancy will supersede this option. If this is changed, the migration generator will not remove the old schema.
        """
      ],
      polymorphic?: [
        type: :boolean,
        default: false,
        doc: """
        Declares this resource as polymorphic. See the [polymorphic resources guide](/documentation/topics/resources/polymorphic-resources.md) for more.
        """
      ]
    ]
  }

  @behaviour Ash.DataLayer

  @sections [@postgres]

  @moduledoc """
  A postgres data layer that leverages Ecto's postgres capabilities.
  """

  require Igniter.Code.Common

  use Spark.Dsl.Extension,
    sections: @sections,
    verifiers: [
      AshPostgres.Verifiers.PreventMultidimensionalArrayAggregates,
      AshPostgres.Verifiers.ValidateReferences,
      AshPostgres.Verifiers.PreventAttributeMultitenancyAndNonFullMatchType,
      AshPostgres.Verifiers.EnsureTableOrPolymorphic,
      AshPostgres.Verifiers.ValidateIdentityIndexNames
    ]

  # Runs the `ash_postgres.migrate` mix task with the given CLI args.
  def migrate(args) do
    Mix.Task.run("ash_postgres.migrate", args)
  end

  # Interactive rollback. For every repo resolved from `args` it lists up to 20 of the
  # most recent migration files whose version is recorded in `schema_migrations`, prompts
  # for how many to roll back, and runs `ash_postgres.rollback`. When tenant migration
  # files exist and the repo reports at least one tenant, the same is repeated for tenant
  # migrations, using the first tenant's `schema_migrations` as the reference.
  #
  # Raises if the prompt answer is neither blank (treated as 0) nor an integer.
  def rollback(args) do
    repos = AshPostgres.Mix.Helpers.repos!([], args)

    # Only mention the repo in the prompt when there is more than one.
    show_for_repo? = Enum.count_until(repos, 2) == 2

    for repo <- repos do
      {:ok, _, _} =
        Ecto.Migrator.with_repo(repo, fn repo ->
          for_repo =
            if show_for_repo? do
              " for repo #{inspect(repo)}"
            else
              ""
            end

          migrations_path = AshPostgres.Mix.Helpers.migrations_path([], repo)
          tenant_migrations_path = AshPostgres.Mix.Helpers.tenant_migrations_path([], repo)

          current_migrations =
            Ecto.Query.from(row in "schema_migrations",
              select: row.version
            )
            |> repo.all()
            |> Enum.map(&to_string/1)

          # Newest first; keep only files whose name starts with an applied version.
          files =
            migrations_path
            |> Path.join("**/*.exs")
            |> Path.wildcard()
            |> Enum.sort()
            |> Enum.reverse()
            |> Enum.filter(fn file ->
              Enum.any?(current_migrations, &String.starts_with?(Path.basename(file), &1))
            end)
            |> Enum.take(20)
            |> Enum.map(&String.trim_leading(&1, migrations_path))
            |> Enum.with_index()
            |> Enum.map(fn {file, index} -> "#{index + 1}: #{file}" end)

          n =
            Mix.shell().prompt(
              """
              How many migrations should be rolled back#{for_repo}? (default: 0)

              Last 20 migration names, with the input you must provide to
              rollback up to *and including* that migration:

              #{Enum.join(files, "\n")}

              Rollback to:
              """
              |> String.trim_trailing()
            )
            |> String.trim()
            |> case do
              "" ->
                0

              n ->
                try do
                  String.to_integer(n)
                rescue
                  _ ->
                    reraise "Required an integer value, got: #{n}", __STACKTRACE__
                end
            end

          Mix.Task.run("ash_postgres.rollback", args ++ ["-r", inspect(repo), "-n", to_string(n)])
          Mix.Task.reenable("ash_postgres.rollback")

          tenant_files =
            tenant_migrations_path
            |> Path.join("**/*.exs")
            |> Path.wildcard()
            |> Enum.sort()
            |> Enum.reverse()

          if !Enum.empty?(tenant_files) do
            first_tenant = repo.all_tenants() |> Enum.at(0)

            if first_tenant do
              current_tenant_migrations =
                Ecto.Query.from(row in "schema_migrations",
                  select: row.version
                )
                |> repo.all(prefix: first_tenant)
                |> Enum.map(&to_string/1)

              tenant_files =
                tenant_files
                |> Enum.filter(fn file ->
                  Enum.any?(
                    current_tenant_migrations,
                    &String.starts_with?(Path.basename(file), &1)
                  )
                end)
                |> Enum.take(20)
                |> Enum.map(&String.trim_leading(&1, tenant_migrations_path))
                |> Enum.with_index()
                |> Enum.map(fn {file, index} -> "#{index + 1}: #{file}" end)

              n =
                Mix.shell().prompt(
                  """

                  How many _tenant_ migrations should be rolled back#{for_repo}? (default: 0)

                  IMPORTANT: we are assuming that all of your tenants have all had the same migrations run.
                  If each tenant may be in a different state: *abort this command and roll them back individually*.
                  To do so, use the `--only-tenants` option to `mix ash_postgres.rollback`.

                  Last 20 migration names, with the input you must provide to
                  rollback up to *and including* that migration:

                  #{Enum.join(tenant_files, "\n")}

                  Rollback to:
                  """
                  |> String.trim_trailing()
                )
                |> String.trim()
                |> case do
                  "" ->
                    0

                  n ->
                    try do
                      String.to_integer(n)
                    rescue
                      _ ->
                        reraise "Required an integer value, got: #{n}", __STACKTRACE__
                    end
                end

              Mix.Task.run(
                "ash_postgres.rollback",
                args ++ ["--tenants", "-r", inspect(repo), "-n", to_string(n)]
              )

              Mix.Task.reenable("ash_postgres.rollback")
            end
          end
        end)
    end
  end

  # Runs the `ash_postgres.generate_migrations` mix task with the given CLI args.
  def codegen(args) do
    Mix.Task.run("ash_postgres.generate_migrations", args)
  end

  # Creates and migrates the databases, then runs tenant migrations when at least one
  # repo has tenant migration files. Returns `:ok` when no tenant migrations exist,
  # otherwise the result of the tenant migrate task.
  def setup(args) do
    # TODO: take args that we care about
    Mix.Task.run("ash_postgres.create", args)
    Mix.Task.run("ash_postgres.migrate", args)

    []
    |> AshPostgres.Mix.Helpers.repos!(args)
    |> Enum.all?(&(not has_tenant_migrations?(&1)))
    |> case do
      true ->
        :ok

      _ ->
        # `--tenants` (plural) matches the flag `rollback/1` passes to the rollback task.
        Mix.Task.run("ash_postgres.migrate", ["--tenants" | args])
    end
  end

  # Runs the `ash_postgres.drop` mix task with the given CLI args.
  def tear_down(args) do
    # TODO: take args that we care about
    Mix.Task.run("ash_postgres.drop", args)
  end

  # True when the repo's tenant migrations directory contains at least one `.exs` file.
  # (Previously this returned the result of `Enum.empty?/1` directly, i.e. the opposite
  # of what the name says, which made `setup/1` skip tenant migrations when they existed.)
  defp has_tenant_migrations?(repo) do
    tenant_migration_files =
      []
      |> AshPostgres.Mix.Helpers.tenant_migrations_path(repo)
      |> Path.join("**/*.exs")
      |> Path.wildcard()

    not Enum.empty?(tenant_migration_files)
  end

  import Ecto.Query, only: [from: 2, subquery: 1]

  # Feature detection for the data layer: each clause answers whether the given
  # capability is supported for `resource`. Unknown capabilities fall through to the
  # final catch-all clause, which returns false.
  @impl true
  def can?(_, :async_engine), do: true
  def can?(_, :bulk_create), do: true

  def can?(resource, :update_query) do
    # We can't currently support updating a record from a query
    # if that record manages a tenant on update
    !AshPostgres.DataLayer.Info.manage_tenant_update?(resource)
  end

  def can?(_, :destroy_query), do: true
  def can?(_, {:lock, :for_update}), do: true
  def can?(_, :composite_types), do: true

  def can?(_, {:lock, string}) do
    # A trailing " NOWAIT" modifier is allowed on any of the supported lock strings.
    string = String.trim_trailing(string, " NOWAIT")

    String.upcase(string) in [
      "FOR UPDATE",
      "FOR NO KEY UPDATE",
      "FOR SHARE",
      "FOR KEY SHARE"
    ]
  end

  def can?(_, :transact), do: true
  def can?(_, :composite_primary_key), do: true
  def can?(_resource, {:atomic, :update}), do: true
  def can?(_resource, {:atomic, :upsert}), do: true
  def can?(_, :upsert), do: true
  def can?(_, :changeset_filter), do: true

  def can?(resource, {:join, other_resource}) do
    # Joins require the same data layer and the same read repo on both sides.
    data_layer = Ash.DataLayer.data_layer(resource)
    other_data_layer = Ash.DataLayer.data_layer(other_resource)

    data_layer == other_data_layer and
      AshPostgres.DataLayer.Info.repo(resource, :read) ==
        AshPostgres.DataLayer.Info.repo(other_resource, :read)
  end

  def can?(resource, {:lateral_join, resources}) do
    # Every resource on the path must use this data layer and the same read repo.
    repo = AshPostgres.DataLayer.Info.repo(resource, :read)
    data_layer = Ash.DataLayer.data_layer(resource)

    data_layer == __MODULE__ &&
      Enum.all?(resources, fn resource ->
        Ash.DataLayer.data_layer(resource) == data_layer &&
          AshPostgres.DataLayer.Info.repo(resource, :read) == repo
      end)
  end

  def can?(_, :boolean_filter), do: true

  def can?(_, {:aggregate, type})
      when type in [:count, :sum, :first, :list, :avg, :max, :min, :exists, :custom],
      do: true

  def can?(_, :aggregate_filter), do: true
  def can?(_, :aggregate_sort), do: true
  def can?(_, :calculate), do: true
  def can?(_, :expression_calculation), do: true
  def can?(_, :expression_calculation_sort), do: true
  def can?(_, :create), do: true
  def can?(_, :select), do: true
  def can?(_, :read), do: true

  def can?(resource, action) when action in ~w[update destroy]a do
    # Updates and destroys are only supported when the resource has a primary key.
    resource
    |> Ash.Resource.Info.primary_key()
    |> Enum.any?()
  end

  def can?(_, :filter), do: true
  def can?(_, :limit), do: true
  def can?(_, :offset), do: true
  def can?(_, :multitenancy), do: true

  def can?(_, {:filter_relationship, %{manual: {module, _}}}) do
    Spark.implements_behaviour?(module, AshPostgres.ManualRelationship)
  end

  def can?(_, {:filter_relationship, _}), do: true

  def can?(_, {:aggregate_relationship, %{manual: {module, _}}}) do
Spark.implements_behaviour?(module, AshPostgres.ManualRelationship)
  end

  def can?(_, {:aggregate_relationship, _}), do: true

  def can?(_, :timeout), do: true
  def can?(_, :expr_error), do: true

  def can?(resource, {:filter_expr, %Ash.Query.Function.Error{}}) do
    # Error expressions need the "ash-functions" extension on both the read and mutate repos.
    "ash-functions" in AshPostgres.DataLayer.Info.repo(resource, :read).installed_extensions() &&
      "ash-functions" in AshPostgres.DataLayer.Info.repo(resource, :mutate).installed_extensions()
  end

  def can?(_, {:filter_expr, _}), do: true
  def can?(_, :nested_expressions), do: true
  def can?(_, {:query_aggregate, _}), do: true
  def can?(_, :sort), do: true
  def can?(_, :distinct_sort), do: true
  def can?(_, :distinct), do: true
  def can?(_, {:sort, _}), do: true
  def can?(_, _), do: false

  # Whether the resource's mutate repo is currently inside a transaction.
  @impl true
  def in_transaction?(resource) do
    AshPostgres.DataLayer.Info.repo(resource, :mutate).in_transaction?()
  end

  # Applies a limit to the query; a nil limit leaves the query untouched.
  @impl true
  def limit(query, nil, _), do: {:ok, query}

  def limit(query, limit, _resource) do
    {:ok, from(row in query, limit: ^limit)}
  end

  # The configured table name, or "" when the resource has none.
  @impl true
  def source(resource) do
    AshPostgres.DataLayer.Info.table(resource) || ""
  end

  @impl true
  def set_context(resource, data_layer_query, context) do
    AshSql.Query.set_context(resource, data_layer_query, AshPostgres.SqlImplementation, context)
  end

  # Applies an offset to the query. A nil offset, or a zero offset on a query that has
  # no offset yet, leaves the query untouched. Always returns `{:ok, query}`; the nil
  # clause previously returned the bare query, unlike every other clause here and in
  # `limit/3`.
  @impl true
  def offset(query, nil, _), do: {:ok, query}

  def offset(%{offset: old_offset} = query, 0, _resource) when old_offset in [0, nil] do
    {:ok, query}
  end

  def offset(query, offset, _resource) do
    {:ok, from(row in query, offset: ^offset)}
  end

  @impl true
  def return_query(query, resource) do
    AshSql.Query.return_query(query, resource)
  end

  # Executes a read query. Raises a table error for polymorphic resources whose query
  # has no table set; any raised error is passed to `handle_raised_error/4`.
  @impl true
  def run_query(query, resource) do
    query = AshSql.Bindings.default_bindings(query, resource, AshPostgres.SqlImplementation)

    if AshPostgres.DataLayer.Info.polymorphic?(resource) && no_table?(query) do
      raise_table_error!(resource, :read)
    else
      repo = AshSql.dynamic_repo(resource, AshPostgres.SqlImplementation, query)

      with_savepoint(repo, query, fn ->
        {:ok,
         repo.all(
           query,
           AshSql.repo_opts(repo, AshPostgres.SqlImplementation, nil, nil, resource)
         )
         |> AshSql.Query.remap_mapped_fields(query)}
      end)
    end
  rescue
    e ->
      handle_raised_error(e, __STACKTRACE__, query, resource)
  end

  # A query whose source table is the empty string has no table configured
  # (see `source/1`, which falls back to "").
  defp no_table?(%{from: %{source: {"", _}}}), do: true
  defp no_table?(_), do: false

  # Custom expression functions; extension-backed ones are only listed when the
  # extension appears under `:installed_extensions` in the mutate repo's config.
  @impl true
  def functions(resource) do
    config = AshPostgres.DataLayer.Info.repo(resource, :mutate).config()

    functions = [
      AshPostgres.Functions.Like,
      AshPostgres.Functions.ILike
    ]

    functions =
      if "pg_trgm" in (config[:installed_extensions] || []) do
        functions ++
          [
            AshPostgres.Functions.TrigramSimilarity
          ]
      else
        functions
      end

    if "vector" in (config[:installed_extensions] || []) do
      functions ++
        [
          AshPostgres.Functions.VectorCosineDistance
        ]
    else
      functions
    end
  end

  @impl true
  def run_aggregate_query(original_query, aggregates, resource) do
    AshSql.AggregateQuery.run_aggregate_query(
      original_query,
      aggregates,
      resource,
      AshPostgres.SqlImplementation
    )
  end

  # Sets the tenant as the query prefix and also records it under `:__tenant__`.
  @impl true
  def set_tenant(_resource, query, tenant) do
    {:ok, Map.put(Ecto.Query.put_query_prefix(query, to_string(tenant)), :__tenant__, tenant)}
  end

  @impl true
  def run_aggregate_query_with_lateral_join(
        query,
        aggregates,
        root_data,
        destination_resource,
        path
      ) do
    # A single groupable aggregate is moved to the non-grouped list.
    {can_group, cant_group} =
      aggregates
      |> Enum.split_with(&AshSql.Aggregate.can_group?(destination_resource, &1, query))
      |> case do
        {[one], cant_group} -> {[], [one | cant_group]}
        {can_group, cant_group} -> {can_group, cant_group}
      end

    case lateral_join_query(
           query,
           root_data,
           path
         ) do
      {:ok, lateral_join_query} ->
        source_resource =
          path
          |> Enum.at(0)
          |> elem(0)
          |> Map.get(:resource)

        subquery = from(row in subquery(lateral_join_query), as: ^0, select: %{})

        subquery =
          AshSql.Bindings.default_bindings(
            subquery,
            source_resource,
            AshPostgres.SqlImplementation
          )

        {global_filter, can_group} =
          AshSql.Aggregate.extract_shared_filters(can_group)

        original_subquery = subquery

        subquery =
          case global_filter do
            {:ok, global_filter} ->
              filter(subquery, global_filter, destination_resource)

            :error ->
              {:ok, subquery}
          end
case subquery do
          {:error, error} ->
            {:error, error}

          {:ok, subquery} ->
            # Add a select for every groupable aggregate onto the shared subquery.
            query =
              Enum.reduce(
                can_group,
                subquery,
                fn agg, subquery ->
                  has_exists? =
                    Ash.Filter.find(agg.query && agg.query.filter, fn
                      %Ash.Query.Exists{} -> true
                      _ -> false
                    end)

                  first_relationship =
                    Ash.Resource.Info.relationship(
                      source_resource,
                      agg.relationship_path |> Enum.at(0)
                    )

                  AshSql.Aggregate.add_subquery_aggregate_select(
                    subquery,
                    agg.relationship_path |> Enum.drop(1),
                    agg,
                    destination_resource,
                    has_exists?,
                    first_relationship
                  )
                end
              )

            # Nothing to run when no aggregate could be grouped.
            result =
              case can_group do
                [] ->
                  %{}

                _ ->
                  repo =
                    AshSql.dynamic_repo(source_resource, AshPostgres.SqlImplementation, query)

                  repo.one(
                    query,
                    AshSql.repo_opts(
                      repo,
                      AshPostgres.SqlImplementation,
                      nil,
                      nil,
                      source_resource
                    )
                  )
              end

            {:ok,
             AshSql.AggregateQuery.add_single_aggs(
               result,
               source_resource,
               original_subquery,
               cant_group,
               AshPostgres.SqlImplementation
             )}
        end

      {:error, error} ->
        {:error, error}
    end
  end

  # Runs `query` once per source record through a lateral join built from `path`,
  # then remaps the selected fields on the results.
  @impl true
  def run_query_with_lateral_join(
        query,
        root_data,
        _destination_resource,
        path
      ) do
    {calculations_require_rewrite, aggregates_require_rewrite, query} =
      AshSql.Query.rewrite_nested_selects(query)

    case lateral_join_query(
           query,
           root_data,
           path
         ) do
      {:ok, lateral_join_query} ->
        source_resource =
          path
          |> Enum.at(0)
          |> elem(0)
          |> Map.get(:resource)

        repo =
          AshSql.dynamic_repo(source_resource, AshPostgres.SqlImplementation, lateral_join_query)

        results =
          repo.all(
            lateral_join_query,
            AshSql.repo_opts(repo, AshPostgres.SqlImplementation, nil, nil, source_resource)
          )
          |> AshSql.Query.remap_mapped_fields(
            query,
            calculations_require_rewrite,
            aggregates_require_rewrite
          )

        {:ok, results}

      {:error, error} ->
        {:error, error}
    end
  end

  # Single-hop path: the destination query becomes a lateral subquery correlated with
  # the source row (via a manual relationship's own subquery, no correlation at all for
  # `no_attributes?` relationships, or an attribute equality otherwise). Each result row
  # carries the source primary key under `__lateral_join_source__`.
  defp lateral_join_query(
         query,
         root_data,
         [{source_query, source_attribute, destination_attribute, relationship}]
       ) do
    source_query = Ash.Query.new(source_query)

    # When ordering must be preserved, number the rows so the outer query can sort on it.
    base_query =
      if query.__ash_bindings__[:__order__?] do
        from(row in query,
          select_merge: %{__order__: over(row_number(), :order)}
        )
      else
        query
      end

    base_query =
      cond do
        Map.get(relationship, :manual) ->
          {module, opts} = relationship.manual

          module.ash_postgres_subquery(
            opts,
            0,
            0,
            base_query
          )

        Map.get(relationship, :no_attributes?) ->
          base_query

        true ->
          from(destination in base_query,
            where:
              field(destination, ^destination_attribute) ==
                field(parent_as(^0), ^source_attribute)
          )
      end

    subquery =
      base_query
      |> set_subquery_prefix(source_query, relationship.destination)
      |> subquery()

    source_pkey = Ash.Resource.Info.primary_key(source_query.resource)

    case lateral_join_source_query(query, source_query) do
      {:ok, data_layer_query} ->
        source_values = Enum.map(root_data, &Map.get(&1, source_attribute))

        data_layer_query =
          from(source in data_layer_query,
            where: field(source, ^source_attribute) in ^source_values
          )

        if query.__ash_bindings__[:__order__?] do
          {:ok,
           from(source in data_layer_query,
             inner_lateral_join: destination in ^subquery,
             on: true,
             order_by: destination.__order__,
             select: destination,
             select_merge: %{__lateral_join_source__: map(source, ^source_pkey)},
             distinct: true
           )}
        else
          {:ok,
           from(source in data_layer_query,
             inner_lateral_join: destination in ^subquery,
             on: true,
             select: destination,
             select_merge: %{__lateral_join_source__: map(source, ^source_pkey)},
             distinct: true
           )}
        end

      {:error, error} ->
        {:error, error}
    end
  end

  # Two-hop path (through a join resource): the destination query is joined to the
  # through query inside the lateral subquery, and the through row is correlated with
  # the source row.
  defp lateral_join_query(
         query,
         root_data,
         [
           {source_query, source_attribute, source_attribute_on_join_resource, relationship},
           {through_resource, destination_attribute_on_join_resource, destination_attribute,
            through_relationship}
         ]
       ) do
    source_query = Ash.Query.new(source_query)
    source_values = Enum.map(root_data, &Map.get(&1, source_attribute))
    source_pkey = Ash.Resource.Info.primary_key(source_query.resource)

    case lateral_join_source_query(query, source_query) do
      {:ok, data_layer_query} ->
        through_resource
        |> Ash.Query.new()
        |> Ash.Query.put_context(:data_layer, %{
          start_bindings_at: data_layer_query.__ash_bindings__.current
        })
        |> Ash.Query.set_context(through_relationship.context)
        |> Ash.Query.do_filter(through_relationship.filter)
        |> Ash.Query.set_tenant(source_query.tenant)
        |> set_lateral_join_prefix(query)
        |> case do
          %{valid?: true} = through_query ->
            through_query
            |> Ash.Query.data_layer_query()

          query ->
            {:error, query}
        end
        |> case do
          {:ok, through_query} ->
            if query.__ash_bindings__[:__order__?] do
              subquery =
                subquery(
                  from(
                    destination in query,
                    select_merge: %{__order__: over(row_number(), :order)},
                    join:
                      through in ^set_subquery_prefix(
                        through_query,
                        source_query,
                        relationship.through
                      ),
                    as: ^data_layer_query.__ash_bindings__.current,
                    on:
                      field(through, ^destination_attribute_on_join_resource) ==
                        field(destination, ^destination_attribute),
                    where:
                      field(through, ^source_attribute_on_join_resource) ==
                        field(
                          parent_as(^0),
                          ^source_attribute
                        )
                  )
                  |> set_subquery_prefix(
                    source_query,
                    relationship.destination
                  )
                )

              {:ok,
               from(source in data_layer_query,
                 where: field(source, ^source_attribute) in ^source_values,
                 inner_lateral_join: destination in ^subquery,
                 on: true,
                 select: destination,
                 select_merge: %{__lateral_join_source__: map(source, ^source_pkey)},
                 order_by: destination.__order__,
                 distinct: true
               )}
            else
              subquery =
                subquery(
                  from(
                    destination in query,
                    join:
                      through in ^set_subquery_prefix(
                        through_query,
                        source_query,
                        relationship.through
                      ),
                    as: ^data_layer_query.__ash_bindings__.current,
                    on:
                      field(through, ^destination_attribute_on_join_resource) ==
                        field(destination, ^destination_attribute),
                    where:
                      field(through, ^source_attribute_on_join_resource) ==
                        field(
                          parent_as(^0),
                          ^source_attribute
                        )
                  )
                  |> set_subquery_prefix(
                    source_query,
                    relationship.destination
                  )
                )

              {:ok,
               from(source in data_layer_query,
                 where: field(source, ^source_attribute) in ^source_values,
                 inner_lateral_join: destination in ^subquery,
                 on: true,
                 select: destination,
                 select_merge: %{__lateral_join_source__: map(source, ^source_pkey)},
                 distinct: true
               )}
            end

          {:error, error} ->
            {:error, error}
        end

      {:error, error} ->
        {:error, error}
    end
  end

  # Source side of the lateral join: reuse the query stored in the bindings under
  # `:lateral_join_source_query` when present, otherwise build one from the source
  # resource with the source query's data layer context and tenant.
  defp lateral_join_source_query(
         %{
           __ash_bindings__: %{
             lateral_join_source_query: lateral_join_source_query
           }
         },
         source_query
       )
       when not is_nil(lateral_join_source_query) do
    {:ok,
     lateral_join_source_query
     |> set_subquery_prefix(source_query, lateral_join_source_query.__ash_bindings__.resource)}
  end

  defp lateral_join_source_query(query, source_query) do
    source_query.resource
    |> Ash.Query.set_context(%{:data_layer => source_query.context[:data_layer]})
    |> Ash.Query.set_tenant(source_query.tenant)
    |> set_lateral_join_prefix(query)
    |> case do
      %{valid?: true} = query ->
        Ash.Query.data_layer_query(query)

      query ->
        {:error, query}
    end
  end

  @doc false
  # Sets the schema prefix on a (sub)query unless the query's data layer context already
  # names a schema. Context-multitenant resources prefer the tenant found on
  # `source_query`; otherwise the resource's schema, the repo's `:default_prefix`, and
  # finally "public" are used, in that order.
  def set_subquery_prefix(data_layer_query, source_query, resource) do
    repo = AshPostgres.DataLayer.Info.repo(resource, :mutate)
    config = repo.config()

    case data_layer_query do
      %{__ash_bindings__: %{context: %{data_layer: %{schema: schema}}}} when not is_nil(schema) ->
        data_layer_query

      _ ->
        query_tenant =
          case source_query do
            %{__tenant__: tenant} -> tenant
            %{tenant: tenant} -> tenant
            _ -> nil
          end

        if Ash.Resource.Info.multitenancy_strategy(resource) == :context do
          %{
            data_layer_query
            | prefix:
                query_tenant || AshPostgres.DataLayer.Info.schema(resource) ||
                  config[:default_prefix] || "public"
          }
        else
          %{
            data_layer_query
            | prefix:
                AshPostgres.DataLayer.Info.schema(resource) || config[:default_prefix] ||
                  "public"
          }
        end
    end
  end

  # For context-multitenant resources, carries the data layer query's prefix over as the
  # Ash query's tenant.
  defp set_lateral_join_prefix(ash_query, query) do
    if Ash.Resource.Info.multitenancy_strategy(ash_query.resource) == :context do
      Ash.Query.set_tenant(ash_query, query.prefix)
    else
      ash_query
    end
  end

  @impl true
  def resource_to_query(resource, _) do
    AshSql.Query.resource_to_query(resource, AshPostgres.SqlImplementation)
  end

  @impl true
  def update_query(query, changeset, resource, options) do
    ecto_changeset =
      case changeset.data do
        %Ash.Changeset.OriginalDataNotAvailable{} ->
          # Explicit parentheses: calling a zero-arity function on a module held in a
          # variable without them is deprecated, and `destroy_query/4` already uses them.
          changeset.resource.__struct__()

        data ->
          data
      end
      |>
Map.update!(:__meta__, &Map.put(&1, :source, table(resource, changeset)))
      |> ecto_changeset(changeset, :update, true)

    # NOTE: this is the tail of a definition whose head is outside this view.
    # The visible part prepares the query with `bulk_updatable_query/6`, applies
    # the atomics, and runs a single `update_all`.
    case bulk_updatable_query(
           query,
           resource,
           changeset.atomics,
           options[:calculations] || [],
           changeset.context
         ) do
      {:error, error} ->
        {:error, error}

      {:ok, query} ->
        try do
          repo = AshSql.dynamic_repo(resource, AshPostgres.SqlImplementation, changeset)

          repo_opts =
            AshSql.repo_opts(
              repo,
              AshPostgres.SqlImplementation,
              changeset.timeout,
              changeset.tenant,
              changeset.resource
            )

          case AshSql.Atomics.query_with_atomics(
                 resource,
                 query,
                 changeset.filter,
                 changeset.atomics,
                 ecto_changeset.changes,
                 []
               ) do
            # `:empty` is handled without issuing an UPDATE: records are either
            # taken from the changeset or read back with `repo.all/1`.
            :empty ->
              if options[:return_records?] do
                if changeset.context[:data_layer][:use_atomic_update_data?] do
                  {:ok, [changeset.data]}
                else
                  {:ok, repo.all(query)}
                end
              else
                :ok
              end

            {:ok, query} ->
              query =
                if options[:return_records?] do
                  {:ok, query} =
                    query
                    |> Ecto.Query.exclude(:select)
                    |> Ecto.Query.select([row], row)
                    |> add_calculations(options[:calculations] || [], resource)

                  query
                else
                  Ecto.Query.exclude(query, :select)
                end

              {_, results} =
                with_savepoint(repo, query, fn ->
                  repo.update_all(
                    query,
                    [],
                    repo_opts
                  )
                end)

              if options[:return_records?] do
                {:ok, AshSql.Query.remap_mapped_fields(results, query)}
              else
                :ok
              end

            {:error, error} ->
              {:error, error}
          end
        rescue
          e ->
            handle_raised_error(e, __STACKTRACE__, ecto_changeset, resource)
        end
    end
  end

  # Prepares `query` so it can be used with `update_all`/`delete_all`.
  #
  # Steps, in order:
  #   1. join the relationships and add the aggregates referenced by each atomic;
  #   2. decide whether the query must be wrapped: when it has a limit/offset, or
  #      when the join/reference checks below (which differ for `:update` and
  #      `:destroy`) say so;
  #   3. if so, turn the original query into a subquery and inner-join it to the
  #      source table on the primary key;
  #   4. join the relationships and add the aggregates referenced by each calculation.
  #
  # Returns `{:ok, query}` or the first `{:error, error}` encountered.
  defp bulk_updatable_query(query, resource, atomics, calculations, context, type \\ :update) do
    Enum.reduce_while(atomics, {:ok, query}, fn {_, expr}, {:ok, query} ->
      used_aggregates =
        Ash.Filter.used_aggregates(expr, [])

      with {:ok, query} <-
             AshSql.Join.join_all_relationships(
               query,
               %Ash.Filter{
                 resource: resource,
                 expression: expr
               },
               left_only?: true
             ),
           {:ok, query} <-
             AshSql.Aggregate.add_aggregates(query, used_aggregates, resource, false, 0) do
        {:cont, {:ok, query}}
      else
        {:error, error} ->
          {:halt, {:error, error}}
      end
    end)
    |> case do
      {:ok, query} ->
        requires_adding_inner_join? =
          case type do
            :update ->
              # could potentially optimize this to avoid the subquery by shuffling free
              # inner joins to the top of the query
              has_inner_join_to_start? =
                case Enum.at(query.joins, 0) do
                  nil ->
                    false

                  %{qual: :inner} ->
                    true

                  _ ->
                    false
                end

              cond do
                has_inner_join_to_start? ->
                  false

                Enum.any?(query.joins, &(&1.qual != :inner)) ->
                  true

                Enum.any?(atomics ++ calculations, fn {_, expr} ->
                  Ash.Filter.list_refs(expr) |> Enum.any?(&(&1.relationship_path != []))
                end) ->
                  true

                true ->
                  false
              end

            :destroy ->
              Enum.any?(query.joins, &(&1.qual != :inner)) ||
                Enum.any?(atomics ++ calculations, fn {_, expr} ->
                  expr |> Ash.Filter.list_refs() |> Enum.any?(&(&1.relationship_path != []))
                end)
          end

        needs_to_join? =
          requires_adding_inner_join? ||
            query.limit || query.offset

        query =
          if needs_to_join? do
            root_query = Ecto.Query.exclude(query, :select)

            # Third tuple element records whether atomics were selected in the subquery.
            root_query_result =
              cond do
                query.limit || query.offset ->
                  with {:ok, root_query} <-
                         AshSql.Atomics.select_atomics(resource, root_query, atomics) do
                    select = Enum.map(Ash.Resource.Info.attributes(resource), & &1.name)

                    root_query =
                      Ecto.Query.select_merge(
                        root_query,
                        [record],
                        map(record, ^select)
                      )

                    {:ok, from(row in Ecto.Query.subquery(root_query), []), atomics != []}
                  end

                !Enum.empty?(query.joins) ->
                  # Same as the branch above, except the ordering is dropped first.
                  with root_query <- Ecto.Query.exclude(root_query, :order_by),
                       {:ok, root_query} <-
                         AshSql.Atomics.select_atomics(resource, root_query, atomics) do
                    select = Enum.map(Ash.Resource.Info.attributes(resource), & &1.name)

                    root_query =
                      Ecto.Query.select_merge(
                        root_query,
                        [record],
                        map(record, ^select)
                      )

                    {:ok, from(row in Ecto.Query.subquery(root_query), []), atomics != []}
                  end

                true ->
                  {:ok, Ecto.Query.exclude(root_query, :order_by), false}
              end

            case root_query_result do
              {:ok, root_query, selected_atomics?} ->
                # Builds `row.pk1 == joining.pk1 and row.pk2 == joining.pk2 ...`
                # over every primary key field.
                dynamic =
                  Enum.reduce(Ash.Resource.Info.primary_key(resource), nil, fn pkey, dynamic ->
                    if dynamic do
                      Ecto.Query.dynamic(
                        [row, joining],
                        field(row, ^pkey) == field(joining, ^pkey) and ^dynamic
                      )
                    else
                      Ecto.Query.dynamic(
                        [row, joining],
                        field(row, ^pkey) == field(joining, ^pkey)
                      )
                    end
                  end)

                faked_query =
                  from(row in query.from.source,
                    inner_join: limiter in ^root_query,
                    as: ^0,
                    on: ^dynamic
                  )
                  |> AshSql.Bindings.default_bindings(
                    query.__ash_bindings__.resource,
                    AshPostgres.SqlImplementation,
                    context
                  )
                  |> then(fn query ->
                    if selected_atomics? do
                      Map.update!(query, :__ash_bindings__, &Map.put(&1, :atomics_in_binding, 0))
                    else
                      query
                    end
                  end)

                {:ok, faked_query}

              {:error, error} ->
                {:error, error}
            end
          else
            {:ok,
             query
             |> Ecto.Query.exclude(:select)
             |> Ecto.Query.exclude(:order_by)}
          end

        case query do
          {:ok, query} ->
            Enum.reduce_while(calculations, {:ok, query}, fn {_, expr}, {:ok, query} ->
              used_aggregates =
                Ash.Filter.used_aggregates(expr, [])

              with {:ok, query} <-
                     AshSql.Join.join_all_relationships(
                       query,
                       %Ash.Filter{
                         resource: resource,
                         expression: expr
                       },
                       left_only?: true
                     ),
                   {:ok, query} <-
                     AshSql.Aggregate.add_aggregates(query, used_aggregates, resource, false, 0) do
                {:cont, {:ok, query}}
              else
                {:error, error} ->
                  {:halt, {:error, error}}
              end
            end)

          {:error, error} ->
            {:error, error}
        end

      {:error, error} ->
        {:error, error}
    end
  end

  # Deletes every row matched by `query` with a single `delete_all`.
  # When `changeset.data` is `%Ash.Changeset.OriginalDataNotAvailable{}`, an empty
  # struct of the resource stands in for it when building the Ecto changeset used
  # for error translation.
  @impl true
  def destroy_query(query, changeset, resource, options) do
    ecto_changeset =
      case changeset.data do
        %Ash.Changeset.OriginalDataNotAvailable{} ->
          changeset.resource.__struct__()

        data ->
          data
      end
      |> Map.update!(:__meta__, &Map.put(&1, :source, table(resource, changeset)))
      |> ecto_changeset(changeset, :delete, true)

    case bulk_updatable_query(
           query,
           resource,
           [],
           options[:calculations] || [],
           changeset.context,
           :destroy
         ) do
      {:error, error} ->
        {:error, error}

      {:ok, query} ->
        try do
          repo = AshSql.dynamic_repo(resource, AshPostgres.SqlImplementation, changeset)

          repo_opts =
            AshSql.repo_opts(
              repo,
              AshPostgres.SqlImplementation,
              changeset.timeout,
              changeset.tenant,
              changeset.resource
            )

          query =
            if options[:return_records?]
do
              {:ok, query} =
                query
                |> Ecto.Query.exclude(:select)
                |> Ecto.Query.select([row], row)
                |> add_calculations(options[:calculations] || [], resource)

              query
            else
              Ecto.Query.exclude(query, :select)
            end

          {_, results} =
            with_savepoint(repo, query, fn ->
              repo.delete_all(
                query,
                repo_opts
              )
            end)

          if options[:return_records?] do
            {:ok, AshSql.Query.remap_mapped_fields(results, query)}
          else
            :ok
          end
        rescue
          e ->
            handle_raised_error(e, __STACKTRACE__, ecto_changeset, resource)
        end
    end
  end

  # Evaluates `expressions` in one database round trip, without reading any table
  # row: each expression becomes a dynamic selected from `UNNEST(ARRAY[1])`.
  # Returns `{:ok, values}` with the values in the same order as `expressions`.
  @impl true
  def calculate(resource, expressions, context) do
    ash_query =
      resource
      |> Ash.Query.new()
      |> Map.put(:context, context)

    {:ok, query} = Ash.Query.data_layer_query(ash_query)

    query =
      AshSql.Bindings.default_bindings(query, resource, AshPostgres.SqlImplementation)

    try do
      {dynamics, query} =
        Enum.reduce(expressions, {[], query}, fn expression, {dynamics, query} ->
          {dynamic, acc} = AshSql.Expr.dynamic_expr(query, expression, query.__ash_bindings__)

          dynamic =
            case dynamic do
              %Ecto.Query.DynamicExpr{} ->
                dynamic

              other ->
                Ecto.Query.dynamic(^other)
            end

          {[dynamic | dynamics], AshSql.Bindings.merge_expr_accumulator(query, acc)}
        end)

      # `dynamics` was built by prepending, so index 0 is the LAST expression.
      dynamics =
        dynamics
        |> Enum.with_index()
        |> Map.new(fn {dynamic, index} -> {index, dynamic} end)

      query =
        Ecto.Query.from(row in fragment("UNNEST(ARRAY[1])"), select: ^dynamics)
        |> Map.put(:__ash_bindings__, query.__ash_bindings__)

      repo = AshSql.dynamic_repo(resource, AshPostgres.SqlImplementation, ash_query)

      with_savepoint(repo, query, fn ->
        # Sort by index, then reverse, to restore the caller's expression order.
        {:ok,
         repo.one(query)
         |> Enum.sort_by(&elem(&1, 0))
         |> Enum.map(&elem(&1, 1))
         |> Enum.reverse()}
      end)
    rescue
      e ->
        handle_raised_error(e, __STACKTRACE__, query, resource)
    end
  end

  # Inserts every changeset in `stream` with a single `insert_all`.
  #
  # `options` keys read here: `:tenant`, `:return_records?`, `:upsert?`,
  # `:upsert_keys`, `:upsert_fields` (via `upsert_set/3`), `:identity`, `:single?`.
  #
  # Returns `:ok` when the repo returns no records, `{:ok, records}` otherwise
  # (a list for `single?: true`, a lazy stream tagged with `:bulk_create_index`
  # metadata otherwise). Raised errors go through `handle_raised_error/4`.
  @impl true
  def bulk_create(resource, stream, options) do
    changesets = Enum.to_list(stream)

    repo = AshSql.dynamic_repo(resource, AshPostgres.SqlImplementation, Enum.at(changesets, 0))

    opts = AshSql.repo_opts(repo, AshPostgres.SqlImplementation, nil, options[:tenant], resource)

    opts =
      if options.return_records? do
        Keyword.put(opts, :returning, true)
      else
        opts
      end

    source = resolve_source(resource, Enum.at(changesets, 0))

    try do
      opts =
        if options[:upsert?] do
          # Ash groups changesets by atomics before dispatching them to the data layer
          # this means that all changesets have the same atomics
          %{atomics: atomics, filter: filter} = Enum.at(changesets, 0)

          query = from(row in source, as: ^0)

          query =
            query
            |> AshSql.Bindings.default_bindings(resource, AshPostgres.SqlImplementation)

          upsert_set =
            upsert_set(resource, changesets, options)

          on_conflict =
            case AshSql.Atomics.query_with_atomics(
                   resource,
                   query,
                   filter,
                   atomics,
                   %{},
                   upsert_set
                 ) do
              :empty ->
                {:replace, options[:upsert_keys] || Ash.Resource.Info.primary_key(resource)}

              {:ok, query} ->
                query

              {:error, error} ->
                raise Ash.Error.to_ash_error(error)
            end

          opts
          |> Keyword.put(:on_conflict, on_conflict)
          |> Keyword.put(
            :conflict_target,
            conflict_target(
              resource,
              options[:identity],
              options[:upsert_keys] || Ash.Resource.Info.primary_key(resource)
            )
          )
        else
          opts
        end

      ecto_changesets = Enum.map(changesets, & &1.attributes)

      opts =
        if schema = Enum.at(changesets, 0).context[:data_layer][:schema] do
          Keyword.put(opts, :prefix, schema)
        else
          opts
        end

      result =
        with_savepoint(repo, opts[:on_conflict], fn ->
          repo.insert_all(source, ecto_changesets, opts)
        end)

      case result do
        {_, nil} ->
          :ok

        {_, results} ->
          if options[:single?] do
            Enum.each(results, &maybe_create_tenant!(resource, &1))

            {:ok, results}
          else
            {:ok,
             Stream.zip_with(results, changesets, fn result, changeset ->
               # The upsert flag lives in the caller-supplied `options`. `opts` is
               # the repo keyword list and never contains `:upsert?`, so checking it
               # made this guard always true.
               if !options[:upsert?] do
                 maybe_create_tenant!(resource, result)
               end

               Ash.Resource.put_metadata(
                 result,
                 :bulk_create_index,
                 changeset.context.bulk_create.index
               )
             end)}
          end
      end
    rescue
      e ->
        changeset = Ash.Changeset.new(resource)

        handle_raised_error(
          e,
          __STACKTRACE__,
          {:bulk_create, ecto_changeset(changeset.data, changeset, :create, false)},
          resource
        )
    end
  end

  # Runs `fun`, wrapping it in a SAVEPOINT when the second argument carries an
  # expression accumulator flagged `has_error?: true` and the repo is already in
  # a transaction. On an "ash_error:" exception raised from Postgres the savepoint
  # is rolled back before the exception is re-raised; on success it is released.
  defp with_savepoint(
         repo,
         %{
           __ash_bindings__: %{
             expression_accumulator: %AshSql.Expr.ExprInfo{has_error?: true}
           }
         },
         fun
       ) do
    if repo.in_transaction?() do
      savepoint_id = "a" <> (Ash.UUID.generate() |> String.replace("-", "_"))

      repo.query!("SAVEPOINT #{savepoint_id}")

      result =
        try do
          {:ok, fun.()}
        rescue
          e in Postgrex.Error ->
            case e do
              %Postgrex.Error{
                postgres: %{
                  code: :raise_exception,
                  message: "ash_error:" <> _,
                  severity: "ERROR"
                }
              } ->
                repo.query!("ROLLBACK TO #{savepoint_id}")
                # This kind of exception won't trigger
                # a rollback
                {:exception, e, __STACKTRACE__}

              _ ->
                {:exception, e, __STACKTRACE__}
            end
        end

      case result do
        {:exception, e, stacktrace} ->
          reraise e, stacktrace

        {:ok, result} ->
          repo.query!("RELEASE #{savepoint_id}")
          result
      end
    else
      fun.()
    end
  end

  # Fallback: anything else just runs `fun` directly.
  defp with_savepoint(_repo, _acc, fun) do
    fun.()
  end

  # Builds the `{field, value}` list applied on conflict during an upsert.
  # Only fields listed in `options[:upsert_fields]` that at least one changeset
  # changes, and that are not set atomically, are included.
  # Raises if a field is not an attribute known to the resource's schema.
  defp upsert_set(resource, changesets, options) do
    attributes_changing_anywhere =
      changesets |> Enum.flat_map(&Map.keys(&1.attributes)) |> Enum.uniq()

    update_defaults = update_defaults(resource)
    # We can't reference EXCLUDED if at least one of the changesets in the stream is not
    # changing the value (and we wouldn't want to even if we could as it would be unnecessary)

    upsert_fields =
      (options[:upsert_fields] || []) |> Enum.filter(&(&1 in attributes_changing_anywhere))

    fields_to_upsert =
      upsert_fields --
        Keyword.keys(Enum.at(changesets, 0).atomics)

    Enum.map(fields_to_upsert, fn upsert_field ->
      # for safety, we check once more at the end that all values in
      # upsert_fields are names of attributes. This is because
      # below we use `literal/1` to bring them into the query
      if is_nil(resource.__schema__(:type, upsert_field)) do
        raise "Only attribute names can be used in upsert_fields"
      end

      case Keyword.fetch(update_defaults, upsert_field) do
        {:ok, default} ->
          if upsert_field in upsert_fields do
            {upsert_field,
             Ecto.Query.dynamic(
               [],
               fragment(
                 "COALESCE(EXCLUDED.?, ?)",
                 literal(^to_string(get_source_for_upsert_field(upsert_field, resource))),
                 ^default
               )
             )}
          else
            {upsert_field, default}
          end

        :error ->
          {upsert_field,
           Ecto.Query.dynamic(
             [],
             fragment(
               "EXCLUDED.?",
               literal(^to_string(get_source_for_upsert_field(upsert_field, resource)))
             )
           )}
      end
    end)
  end

  # Returns the attribute's configured `source` when it has one, else `field`.
  defp get_source_for_upsert_field(field, resource) do
    case Ash.Resource.Info.attribute(resource, field) do
      %{source: source} when not is_nil(source) ->
        source

      _ ->
        field
    end
  end

  # Creates a single record by delegating to `bulk_create/3` with `single?: true`.
  # The tenant is `changeset.to_tenant` when present, else `changeset.tenant`.
  @impl true
  def create(resource, changeset) do
    changeset = %{
      changeset
      | data:
          Map.update!(
            changeset.data,
            :__meta__,
            &Map.put(&1, :source, table(resource, changeset))
          )
    }

    case bulk_create(resource, [changeset], %{
           single?: true,
           tenant: Map.get(changeset, :to_tenant, changeset.tenant),
           return_records?: true
         }) do
      {:ok, [result]} ->
        {:ok, result}

      {:error, error} ->
        {:error, error}
    end
  end

  # Creates the tenant named after `result` when the resource is configured with
  # `manage_tenant` and `create?` enabled; otherwise returns `:ok`.
  # NOTE(review): this fetches the `:read` repo while `rollback/2` uses `:mutate`;
  # confirm `:read` is intended here.
  defp maybe_create_tenant!(resource, result) do
    if AshPostgres.DataLayer.Info.manage_tenant_create?(resource) do
      tenant_name = tenant_name(resource, result)

      AshPostgres.MultiTenancy.create_tenant!(
        tenant_name,
        AshPostgres.DataLayer.Info.repo(resource, :read)
      )
    else
      :ok
    end
  end

  # Renames the tenant when any attribute used in the tenant template is changing.
  defp maybe_update_tenant(resource, changeset, result) do
    if AshPostgres.DataLayer.Info.manage_tenant_update?(resource) do
      changing_tenant_name? =
        resource
        |> AshPostgres.DataLayer.Info.manage_tenant_template()
        |> Enum.filter(&is_atom/1)
        |> Enum.any?(&Ash.Changeset.changing_attribute?(changeset, &1))

      if changing_tenant_name?
do
        old_tenant_name = tenant_name(resource, changeset.data)

        new_tenant_name = tenant_name(resource, result)

        AshPostgres.MultiTenancy.rename_tenant(
          AshPostgres.DataLayer.Info.repo(resource, :read),
          old_tenant_name,
          new_tenant_name
        )
      end
    end

    :ok
  end

  # Renders the tenant template against `result`: string parts are used verbatim,
  # every other part is looked up as a key on `result` and stringified.
  defp tenant_name(resource, result) do
    resource
    |> AshPostgres.DataLayer.Info.manage_tenant_template()
    |> Enum.map_join(fn item ->
      if is_binary(item) do
        item
      else
        result
        |> Map.get(item)
        |> to_string()
      end
    end)
  end

  # Converts a `{field, {message, vars}}` tuple into an `InvalidAttribute` error.
  defp to_ash_error({field, {message, vars}}) do
    Ash.Error.Changes.InvalidAttribute.exception(
      field: field,
      message: message,
      private_vars: vars
    )
  end

  # Builds the Ecto changeset used to translate database constraint violations.
  # Attributes that are being set atomically are left out of the changes.
  # Which foreign key constraints are registered depends on `type`:
  #   :create           -> this resource's own
  #   :upsert / :update -> own and related
  #   :delete           -> related only
  defp ecto_changeset(record, changeset, type, table_error? \\ true) do
    attributes =
      changeset.resource
      |> Ash.Resource.Info.attributes()
      |> Enum.map(& &1.name)

    attributes_to_change =
      Enum.reject(attributes, fn attribute ->
        Keyword.has_key?(changeset.atomics, attribute)
      end)

    ecto_changeset =
      record
      |> to_ecto()
      |> set_table(changeset, type, table_error?)
      |> Ecto.Changeset.change(Map.take(changeset.attributes, attributes_to_change))
      |> add_configured_foreign_key_constraints(record.__struct__)
      |> add_unique_indexes(record.__struct__, changeset)
      |> add_check_constraints(record.__struct__)
      |> add_exclusion_constraints(record.__struct__)

    case type do
      :create ->
        ecto_changeset
        |> add_my_foreign_key_constraints(record.__struct__)

      type when type in [:upsert, :update] ->
        ecto_changeset
        |> add_my_foreign_key_constraints(record.__struct__)
        |> add_related_foreign_key_constraints(record.__struct__)

      :delete ->
        ecto_changeset
        |> add_related_foreign_key_constraints(record.__struct__)
    end
  end

  # handle_raised_error/4 turns an exception raised while talking to the database
  # into an error tuple. Clauses are tried in order; the first three below rewrap
  # the exception and recurse.

  # Stale entry -> `StaleRecord`. `resource` appears in both the struct pattern and
  # the last argument, so this clause only matches when the two are equal.
  defp handle_raised_error(
         %Ecto.StaleEntryError{changeset: %{data: %resource{}, filters: filters}},
         stacktrace,
         context,
         resource
       ) do
    handle_raised_error(
      Ash.Error.Changes.StaleRecord.exception(resource: resource, filter: filters),
      stacktrace,
      context,
      resource
    )
  end

  # Postgres `lock_not_available` -> `Unavailable`.
  defp handle_raised_error(
         %Postgrex.Error{
           postgres: %{
             code: :lock_not_available,
             message: message
           }
         },
         stacktrace,
         context,
         resource
       ) do
    handle_raised_error(
      Ash.Error.Invalid.Unavailable.exception(
        resource: resource,
        source: inspect(context, pretty: true),
        reason: message
      ),
      stacktrace,
      context,
      resource
    )
  end

  # Bulk create: map any violated constraints through the fake changeset.
  defp handle_raised_error(
         %Postgrex.Error{} = error,
         stacktrace,
         {:bulk_create, fake_changeset},
         _resource
       ) do
    case Ecto.Adapters.Postgres.Connection.to_constraints(error, []) do
      [] ->
        {:error, Ash.Error.to_ash_error(error, stacktrace)}

      constraints ->
        {:error,
         fake_changeset
         |> constraints_to_errors(:insert, constraints)
         |> Ash.Error.to_ash_error()}
    end
  end

  # Cast error -> `InvalidFilterValue`.
  defp handle_raised_error(%Ecto.Query.CastError{} = e, stacktrace, context, resource) do
    handle_raised_error(
      Ash.Error.Query.InvalidFilterValue.exception(value: e.value, context: context),
      stacktrace,
      context,
      resource
    )
  end

  # "ash_error:" exceptions raised from within Postgres carry a JSON payload with
  # the exception module name and its input. This clause handles the variant where
  # the payload is wrapped in (escaped) double quotes.
  # NOTE(review): `Module.concat/1` creates an atom from database-supplied text;
  # confirm the payload can only originate from trusted SQL.
  defp handle_raised_error(
         %Postgrex.Error{
           postgres: %{
             code: :raise_exception,
             message: "ash_error: \"" <> json,
             severity: "ERROR"
           }
         },
         _,
         _,
         _
       ) do
    %{"exception" => exception, "input" => input} =
      json
      |> String.trim_trailing("\"")
      |> String.replace("\\\"", "\"")
      |> Jason.decode!()

    exception = Module.concat([exception])

    {:error, :no_rollback, Ash.Error.from_json(exception, input)}
  end

  # Same as above for an unquoted JSON payload.
  defp handle_raised_error(
         %Postgrex.Error{
           postgres: %{
             code: :raise_exception,
             message: "ash_error: " <> json,
             severity: "ERROR"
           }
         },
         _,
         _,
         _
       ) do
    %{"exception" => exception, "input" => input} = Jason.decode!(json)
    exception = Module.concat([exception])

    {:error, :no_rollback, Ash.Error.from_json(exception, input)}
  end

  # Single constraint violation with an Ecto changeset as context: look for a
  # constraint registered on the changeset that matches by type and name.
  defp handle_raised_error(
         %Postgrex.Error{} = error,
         stacktrace,
         %{constraints: user_constraints},
         _resource
       ) do
    case Ecto.Adapters.Postgres.Connection.to_constraints(error, []) do
      [{type, constraint}] ->
        user_constraint =
          Enum.find(user_constraints, fn c ->
            case {c.type, c.constraint, c.match} do
              {^type, ^constraint, :exact} -> true
              {^type, cc, :suffix} -> String.ends_with?(constraint, cc)
              {^type, cc, :prefix} -> String.starts_with?(constraint, cc)
              {^type, %Regex{} = r, _match} -> Regex.match?(r,
constraint)
              _ -> false
            end
          end)

        case user_constraint do
          %{field: field, error_message: error_message, error_type: error_type} ->
            {:error,
             to_ash_error(
               {field, {error_message, [constraint: error_type, constraint_name: constraint]}}
             )}

          nil ->
            reraise error, stacktrace
        end

      _ ->
        reraise error, stacktrace
    end
  end

  # Catch-all: convert whatever was raised into an Ash error.
  defp handle_raised_error(error, stacktrace, _ecto_changeset, _resource) do
    {:error, Ash.Error.to_ash_error(error, stacktrace)}
  end

  # Maps each violated `{type, constraint}` pair to an error: an `InvalidAttribute`
  # when the changeset has a matching registered constraint, otherwise an
  # `Ecto.ConstraintError` exception struct.
  defp constraints_to_errors(%{constraints: user_constraints} = changeset, action, constraints) do
    Enum.map(constraints, fn {type, constraint} ->
      user_constraint =
        Enum.find(user_constraints, fn c ->
          case {c.type, c.constraint, c.match} do
            {^type, ^constraint, :exact} -> true
            {^type, cc, :suffix} -> String.ends_with?(constraint, cc)
            {^type, cc, :prefix} -> String.starts_with?(constraint, cc)
            {^type, %Regex{} = r, _match} -> Regex.match?(r, constraint)
            _ -> false
          end
        end)

      case user_constraint do
        %{field: field, error_message: error_message, type: type, constraint: constraint} ->
          Ash.Error.Changes.InvalidAttribute.exception(
            field: field,
            message: error_message,
            private_vars: [
              constraint: constraint,
              constraint_type: type
            ]
          )

        nil ->
          Ecto.ConstraintError.exception(
            action: action,
            type: type,
            constraint: constraint,
            changeset: changeset
          )
      end
    end)
  end

  # For polymorphic resources, stamps the record's Ecto metadata with the table
  # (`:source`) and schema (`:prefix`) taken from the changeset's `:data_layer`
  # context, falling back to the resource's configured values.
  #
  # When no table can be determined: raises if `table_error?` is true, otherwise
  # leaves the record's source untouched. Non-polymorphic records are returned as is.
  defp set_table(record, changeset, operation, table_error?) do
    if AshPostgres.DataLayer.Info.polymorphic?(record.__struct__) do
      table =
        changeset.context[:data_layer][:table] ||
          AshPostgres.DataLayer.Info.table(record.__struct__)

      record =
        if table do
          Ecto.put_meta(record, source: table)
        else
          if table_error? do
            raise_table_error!(changeset.resource, operation)
          else
            record
          end
        end

      prefix =
        changeset.context[:data_layer][:schema] ||
          AshPostgres.DataLayer.Info.schema(record.__struct__)

      if prefix do
        # The prefix must be the schema, not the table name (previously `table`
        # was written here, pointing the record at a schema named after its table).
        Ecto.put_meta(record, prefix: prefix)
      else
        record
      end
    else
      record
    end
  end

  # from_ecto/1 converts values coming back from Ecto into their Ash form:
  # for Ash resource structs, every `%Ecto.Association.NotLoaded{}` relationship
  # is replaced with the default value from an empty struct of the resource, and
  # loaded relationships are converted recursively. `{:ok, _}` tuples and lists
  # are mapped over; everything else passes through unchanged.
  def from_ecto({:ok, result}), do: {:ok, from_ecto(result)}
  def from_ecto({:error, _} = other), do: other

  def from_ecto(nil), do: nil

  def from_ecto(value) when is_list(value) do
    Enum.map(value, &from_ecto/1)
  end

  def from_ecto(%resource{} = record) do
    if Spark.Dsl.is?(resource, Ash.Resource) do
      empty = struct(resource)

      resource
      |> Ash.Resource.Info.relationships()
      |> Enum.reduce(record, fn relationship, record ->
        case Map.get(record, relationship.name) do
          %Ecto.Association.NotLoaded{} ->
            Map.put(record, relationship.name, Map.get(empty, relationship.name))

          value ->
            Map.put(record, relationship.name, from_ecto(value))
        end
      end)
    else
      record
    end
  end

  def from_ecto(other), do: other

  # to_ecto/1 is the inverse direction: `%Ash.NotLoaded{}` relationships on Ash
  # resource structs become `%Ecto.Association.NotLoaded{}`, loaded relationships
  # are converted recursively, lists are mapped over, and anything else passes
  # through unchanged.
  def to_ecto(nil), do: nil

  def to_ecto(value) when is_list(value) do
    Enum.map(value, &to_ecto/1)
  end

  def to_ecto(%resource{} = record) do
    if Spark.Dsl.is?(resource, Ash.Resource) do
      resource
      |> Ash.Resource.Info.relationships()
      |> Enum.reduce(record, fn relationship, record ->
        value =
          case Map.get(record, relationship.name) do
            %Ash.NotLoaded{} ->
              %Ecto.Association.NotLoaded{
                __field__: relationship.name,
                __cardinality__: relationship.cardinality
              }

            value ->
              to_ecto(value)
          end

        Map.put(record, relationship.name, value)
      end)
    else
      record
    end
  end

  def to_ecto(other), do: other

  # Registers every configured check constraint on the changeset, once per
  # attribute it is declared for. The message defaults to "is invalid".
  defp add_check_constraints(changeset, resource) do
    resource
    |> AshPostgres.DataLayer.Info.check_constraints()
    |> Enum.reduce(changeset, fn constraint, changeset ->
      constraint.attribute
      |> List.wrap()
      |> Enum.reduce(changeset, fn attribute, changeset ->
        Ecto.Changeset.check_constraint(changeset, attribute,
          name: constraint.name,
          message: constraint.message || "is invalid"
        )
      end)
    end)
  end

  # Registers every configured exclusion constraint (`{key, name}` or
  # `{key, name, message}`) on the changeset.
  defp add_exclusion_constraints(changeset, resource) do
    resource
    |>
AshPostgres.DataLayer.Info.exclusion_constraint_names()
    |> Enum.reduce(changeset, fn constraint, changeset ->
      case constraint do
        {key, name} ->
          Ecto.Changeset.exclusion_constraint(changeset, key, name: name)

        {key, name, message} ->
          Ecto.Changeset.exclusion_constraint(changeset, key, name: name, message: message)
      end
    end)
  end

  # Registers foreign key constraints for relationships that point AT `resource`
  # from the resources it is itself related to, so that violating them surfaces
  # as a "would leave records behind" error. The constraint name comes from the
  # configured reference when it has one, otherwise `<table>_<source_attribute>_fkey`.
  # NOTE(review): `relationship_name` names a relationship on the related `source`
  # resource, yet the reference is looked up on `resource` — confirm which resource
  # `AshPostgres.DataLayer.Info.reference/2` expects here.
  defp add_related_foreign_key_constraints(changeset, resource) do
    # TODO: this doesn't guarantee us to get all of them, because if something is related to this
    # schema and there is no back-relation, then this won't catch it's foreign key constraints
    resource
    |> Ash.Resource.Info.relationships()
    |> Enum.map(& &1.destination)
    |> Enum.uniq()
    |> Enum.flat_map(fn related ->
      related
      |> Ash.Resource.Info.relationships()
      |> Enum.filter(&(&1.destination == resource))
      |> Enum.map(&Map.take(&1, [:source, :source_attribute, :destination_attribute, :name]))
    end)
    |> Enum.reduce(changeset, fn %{
                                   source: source,
                                   source_attribute: source_attribute,
                                   destination_attribute: destination_attribute,
                                   name: relationship_name
                                 },
                                 changeset ->
      case AshPostgres.DataLayer.Info.reference(resource, relationship_name) do
        %{name: name} when not is_nil(name) ->
          Ecto.Changeset.foreign_key_constraint(changeset, destination_attribute,
            name: name,
            message: "would leave records behind"
          )

        _ ->
          Ecto.Changeset.foreign_key_constraint(changeset, destination_attribute,
            name: "#{AshPostgres.DataLayer.Info.table(source)}_#{source_attribute}_fkey",
            message: "would leave records behind"
          )
      end
    end)
  end

  # Registers a foreign key constraint (with Ecto's default name) on the source
  # attribute of each of the resource's own relationships.
  defp add_my_foreign_key_constraints(changeset, resource) do
    resource
    |> Ash.Resource.Info.relationships()
    |> Enum.reduce(changeset, &Ecto.Changeset.foreign_key_constraint(&2, &1.source_attribute))
  end

  # Registers the explicitly configured foreign key names. The configuration is
  # either a list of `{key, name}` / `{key, name, message}` tuples or an
  # `{m, f, a}` that is called with the changeset prepended to `a`.
  defp add_configured_foreign_key_constraints(changeset, resource) do
    resource
    |> AshPostgres.DataLayer.Info.foreign_key_names()
    |> case do
      {m, f, a} -> List.wrap(apply(m, f, [changeset | a]))
      value -> List.wrap(value)
    end
    |> Enum.reduce(changeset, fn
      {key, name}, changeset ->
Ecto.Changeset.foreign_key_constraint(changeset, key, name: name)

      {key, name, message}, changeset ->
        Ecto.Changeset.foreign_key_constraint(changeset, key, name: name, message: message)
    end)
  end

  # Registers unique constraints on the changeset for, in order:
  #   1. every identity (name from `identity_index_names` or `<table>_<identity>_index`);
  #   2. every custom index (error fields default to the index's atom fields, then
  #      to the primary key);
  #   3. every explicitly configured unique index name, plus `<table>_pkey` for the
  #      primary key when a table can be resolved.
  defp add_unique_indexes(changeset, resource, ash_changeset) do
    table = table(resource, ash_changeset)
    pkey = Ash.Resource.Info.primary_key(resource)

    changeset =
      resource
      |> Ash.Resource.Info.identities()
      |> Enum.reduce(changeset, fn identity, changeset ->
        name =
          AshPostgres.DataLayer.Info.identity_index_names(resource)[identity.name] ||
            "#{table}_#{identity.name}_index"

        opts =
          if Map.get(identity, :message) do
            [name: name, message: identity.message]
          else
            [name: name]
          end

        fields =
          case identity.keys do
            [] ->
              pkey

            keys ->
              keys
          end

        Ecto.Changeset.unique_constraint(changeset, fields, opts)
      end)

    changeset =
      resource
      |> AshPostgres.DataLayer.Info.custom_indexes()
      |> Enum.reduce(changeset, fn index, changeset ->
        name = index.name || AshPostgres.CustomIndex.name(table, index)

        opts =
          if index.message do
            [name: name, message: index.message]
          else
            [name: name]
          end

        fields =
          if index.error_fields do
            case index.error_fields do
              [] -> pkey
              fields -> fields
            end
          else
            case Enum.filter(index.fields, &is_atom/1) do
              [] -> pkey
              fields -> fields
            end
          end

        Ecto.Changeset.unique_constraint(changeset, fields, opts)
      end)

    names =
      resource
      |> AshPostgres.DataLayer.Info.unique_index_names()
      |> case do
        {m, f, a} -> List.wrap(apply(m, f, [changeset | a]))
        value -> List.wrap(value)
      end

    names =
      case Ash.Resource.Info.primary_key(resource) do
        [] ->
          names

        fields ->
          if table = table(resource, ash_changeset) do
            [{fields, table <> "_pkey"} | names]
          else
            # Without a table only the `<table>_pkey` entry is impossible to build;
            # the configured names must still be registered (this branch used to
            # return `[]`, silently dropping them).
            names
          end
      end

    Enum.reduce(names, changeset, fn
      {keys, name}, changeset ->
        Ecto.Changeset.unique_constraint(changeset, List.wrap(keys), name: name)

      {keys, name, message}, changeset ->
        Ecto.Changeset.unique_constraint(changeset, List.wrap(keys), name: name, message: message)
    end)
  end

  # Upserts a single record by delegating to `bulk_create/3` with `upsert?: true`.
  #
  # `keys` is the conflict target; when nil the resource's primary key is used.
  # The fields updated on conflict are `changeset.context[:private][:upsert_fields]`
  # when present, otherwise every attribute being changed plus every attribute with
  # an update default, minus the changeset's defaults and the conflict keys.
  #
  # Returns `{:ok, record}` or `{:error, error}`. Resources that manage a tenant
  # with `update?` enabled are rejected.
  @impl true
  def upsert(resource, changeset, keys, identity) do
    if AshPostgres.DataLayer.Info.manage_tenant_update?(resource) do
      {:error, "Cannot currently upsert a resource that owns a tenant"}
    else
      # Fall back to the RESOURCE's primary key (this previously passed `keys`,
      # which is nil on this branch, to `primary_key/1`).
      keys = keys || Ash.Resource.Info.primary_key(resource)

      update_defaults = update_defaults(resource)

      explicitly_changing_attributes =
        changeset.attributes
        |> Map.keys()
        |> Enum.concat(Keyword.keys(update_defaults))
        |> Kernel.--(Map.get(changeset, :defaults, []))
        |> Kernel.--(keys)

      upsert_fields =
        changeset.context[:private][:upsert_fields] || explicitly_changing_attributes

      case bulk_create(resource, [changeset], %{
             single?: true,
             upsert?: true,
             tenant: changeset.tenant,
             identity: identity,
             upsert_keys: keys,
             upsert_fields: upsert_fields,
             return_records?: true
           }) do
        {:ok, [result]} ->
          {:ok, result}

        {:error, error} ->
          {:error, error}
      end
    end
  end

  # Builds the `:conflict_target` for an upsert.
  # Returns `keys` unchanged when no WHERE clause is needed and every key is an
  # attribute; otherwise returns `{:unsafe_fragment, sql}` listing the key sources
  # followed by a WHERE built from the base filter SQL and/or the identity's
  # `where` SQL. Raises when either SQL snippet is required but not configured.
  defp conflict_target(resource, identity, keys) do
    identity_where =
      case identity do
        %{name: name, where: where} when not is_nil(where) ->
          AshPostgres.DataLayer.Info.identity_where_to_sql(resource, name) ||
            raise(
              "Must provide an entry for :#{identity.name} in `postgres.identity_wheres_to_sql` to use it as an upsert_identity"
            )

        _ ->
          nil
      end

    base_filter_sql =
      case Ash.Resource.Info.base_filter(resource) do
        nil ->
          nil

        _base_filter ->
          AshPostgres.DataLayer.Info.base_filter_sql(resource) ||
            raise """
            Cannot use upserts with resources that have a base_filter without also adding `base_filter_sql` in the postgres section.
            """
      end

    where =
      case {base_filter_sql, identity_where} do
        {nil, nil} ->
          nil

        {base_filter_sql, nil} ->
          " WHERE (#{base_filter_sql})"

        {nil, identity_where} ->
          " WHERE (#{identity_where})"

        {base_filter_sql, identity_where} ->
          " WHERE ((#{base_filter_sql}) AND (#{identity_where}))"
      end

    if is_nil(where) && Enum.all?(keys, &Ash.Resource.Info.attribute(resource, &1)) do
      keys
    else
      sources = sources_to_sql(resource, keys)

      {:unsafe_fragment, "(" <> Enum.join(sources, ", ") <> ")#{where}"}
    end
  end

  # Renders each conflict key as SQL: attributes as a double-quoted column name
  # (their `source` when set), calculations as their configured SQL wrapped in
  # parentheses. Raises `ArgumentError` for anything else.
  defp sources_to_sql(resource, keys) do
    Enum.map(keys, fn key ->
      case Ash.Resource.Info.field(resource, key) do
        %Ash.Resource.Attribute{source: source, name: name} ->
          ~s("#{source || name}")

        %Ash.Resource.Calculation{name: name} ->
          if sql = AshPostgres.DataLayer.Info.calculation_to_sql(resource, name) do
            "(" <> sql <> ")"
          else
            raise ArgumentError,
                  "Calculation #{inspect(key)} used in `AshPostgres.DataLayer` conflict target must have its sql defined in `calculations_to_sql`"
          end

        _other ->
          raise ArgumentError,
                "Unsupported field #{inspect(key)} used in `AshPostgres.DataLayer` conflict target"
      end
    end)
  end

  # Returns a keyword list of `{attribute_name, value}` for every attribute with a
  # non-nil `update_default`: static values first, then lazily computed ones.
  defp update_defaults(resource) do
    attributes =
      resource
      |> Ash.Resource.Info.attributes()
      |> Enum.reject(&is_nil(&1.update_default))

    attributes
    |> static_defaults()
    |> Enum.concat(lazy_matching_defaults(attributes))
    |> Enum.concat(lazy_non_matching_defaults(attributes))
  end

  # Defaults that are plain values (neither a function nor an `{m, f, a}` tuple).
  defp static_defaults(attributes) do
    attributes
    |> Enum.reject(&get_default_fun(&1))
    |> Enum.map(&{&1.name, &1.update_default})
  end

  # Lazy defaults for attributes that do not share their value with other
  # attributes: each default is evaluated and cast separately.
  defp lazy_non_matching_defaults(attributes) do
    attributes
    |> Enum.filter(&(!&1.match_other_defaults?
&& get_default_fun(&1)))
    |> Enum.map(fn attribute ->
      default_value =
        case attribute.update_default do
          function when is_function(function) ->
            function.()

          {m, f, a} when is_atom(m) and is_atom(f) and is_list(a) ->
            apply(m, f, a)
        end

      {:ok, default_value} =
        Ash.Type.cast_input(attribute.type, default_value, attribute.constraints)

      {attribute.name, default_value}
    end)
  end

  # Lazy defaults for attributes flagged `match_other_defaults?`: attributes that
  # share the same default, type and constraints are grouped so the default is
  # evaluated and cast once, and all of them receive the same value.
  defp lazy_matching_defaults(attributes) do
    attributes
    |> Enum.filter(&(&1.match_other_defaults? && get_default_fun(&1)))
    |> Enum.group_by(&{&1.update_default, &1.type, &1.constraints})
    |> Enum.flat_map(fn {{default_fun, type, constraints}, attributes} ->
      default_value =
        case default_fun do
          function when is_function(function) ->
            function.()

          {m, f, a} when is_atom(m) and is_atom(f) and is_list(a) ->
            apply(m, f, a)
        end

      {:ok, default_value} =
        Ash.Type.cast_input(type, default_value, constraints)

      Enum.map(attributes, &{&1.name, default_value})
    end)
  end

  # Returns the attribute's `update_default` when it is a function or a 3-tuple,
  # otherwise nil (the `if` has no else branch).
  defp get_default_fun(attribute) do
    if is_function(attribute.update_default) or match?({_, _, _}, attribute.update_default) do
      attribute.update_default
    end
  end

  # Updates a single record through an `update_all` filtered by its primary key.
  #
  # Returns `{:ok, record}` with the changed attributes, atomics and primary key
  # merged into `changeset.data`, or `{:error, StaleRecord}` when no row matched.
  # When there is nothing to update the original data is returned as is.
  # Note that `case result` only handles `{0, []}` and `{1, [result]}`.
  @impl true
  def update(resource, changeset) do
    ecto_changeset =
      changeset.data
      |> Map.update!(:__meta__, &Map.put(&1, :source, table(resource, changeset)))
      |> ecto_changeset(changeset, :update)

    source = resolve_source(resource, changeset)

    query =
      from(row in source, as: ^0)
      |> AshSql.Bindings.default_bindings(
        resource,
        AshPostgres.SqlImplementation,
        changeset.context
      )
      |> pkey_filter(changeset.data)

    case bulk_updatable_query(query, resource, changeset.atomics, [], changeset.context) do
      {:error, error} ->
        {:error, error}

      {:ok, query} ->
        # Fields selected from the updated row and merged back into the record.
        modifying =
          Map.keys(changeset.attributes) ++
            Keyword.keys(changeset.atomics) ++ Ash.Resource.Info.primary_key(resource)

        query = Ecto.Query.select(query, ^modifying)

        try do
          case AshSql.Atomics.query_with_atomics(
                 resource,
                 query,
                 changeset.filter,
                 changeset.atomics,
                 ecto_changeset.changes,
                 []
               ) do
            :empty ->
              {:ok, changeset.data}

            {:ok, query} ->
              repo = AshSql.dynamic_repo(resource, AshPostgres.SqlImplementation, changeset)

              repo_opts =
                AshSql.repo_opts(
                  repo,
                  AshPostgres.SqlImplementation,
                  changeset.timeout,
                  changeset.tenant,
                  changeset.resource
                )

              repo_opts =
                Keyword.put(repo_opts, :returning, Keyword.keys(changeset.atomics))

              result =
                with_savepoint(repo, query, fn ->
                  repo.update_all(
                    query,
                    [],
                    repo_opts
                  )
                end)

              case result do
                {0, []} ->
                  {:error,
                   Ash.Error.Changes.StaleRecord.exception(
                     resource: resource,
                     filter: changeset.filter
                   )}

                {1, [result]} ->
                  record =
                    changeset.data
                    |> Map.merge(Map.take(result, modifying))

                  maybe_update_tenant(resource, changeset, record)

                  {:ok, record}
              end

            {:error, error} ->
              {:error, error}
          end
        rescue
          e ->
            handle_raised_error(e, __STACKTRACE__, ecto_changeset, resource)
        end
    end
  end

  # Restricts `query` to the row whose primary key equals that of `record`.
  defp pkey_filter(query, %resource{} = record) do
    pkey =
      record
      |> Map.take(Ash.Resource.Info.primary_key(resource))
      |> Map.to_list()

    Ecto.Query.where(query, ^pkey)
  end

  # Deletes a single record with a `delete_all` filtered by the changeset filter
  # and the record's primary key. The `delete_all` result is discarded, so `:ok`
  # is returned whether or not a row was actually deleted.
  @impl true
  def destroy(resource, %{data: record} = changeset) do
    ecto_changeset = ecto_changeset(record, changeset, :delete)

    try do
      repo = AshSql.dynamic_repo(resource, AshPostgres.SqlImplementation, changeset)

      source = resolve_source(resource, changeset)

      from(row in source, as: ^0)
      |> AshSql.Bindings.default_bindings(
        resource,
        AshPostgres.SqlImplementation,
        changeset.context
      )
      |> filter(changeset.filter, resource)
      |> case do
        {:ok, query} ->
          query
          |> pkey_filter(record)
          |> repo.delete_all(
            AshSql.repo_opts(
              repo,
              AshPostgres.SqlImplementation,
              changeset.timeout,
              changeset.tenant,
              changeset.resource
            )
          )

          :ok

        {:error, error} ->
          {:error, error}
      end
    rescue
      e ->
        handle_raised_error(e, __STACKTRACE__, ecto_changeset, resource)
    end
  end

  # Adds a `FOR UPDATE OF <root binding>` lock. When the query has a DISTINCT, the
  # lock is applied to the query with the distinct removed, that query is wrapped
  # in a subquery, and the distinct is put back on the outer query.
  @impl true
  def lock(query, :for_update, _) do
    if query.distinct do
      new_query =
        Ecto.Query.lock(%{query | distinct: nil}, [{^0, a}], fragment("FOR UPDATE OF ?", a))

      q = from(row in subquery(new_query), [])
      {:ok, %{q | distinct: query.distinct}}
    else
      {:ok, Ecto.Query.lock(query, [{^0, a}], fragment("FOR UPDATE OF ?", a))}
    end
end

  # Lock strengths accepted as strings by `lock/3`.
  @locks [
    "FOR UPDATE",
    "FOR NO KEY UPDATE",
    "FOR SHARE",
    "FOR KEY SHARE"
  ]

  # Generates two `lock/3` clauses per entry at compile time: one matching the
  # plain string (e.g. "FOR SHARE") and one matching it with " NOWAIT" appended.
  # Both lock the root binding (`OF ?`).
  for lock <- @locks do
    frag = "#{lock} OF ?"

    def lock(query, unquote(lock), _) do
      {:ok, Ecto.Query.lock(query, [{^0, a}], fragment(unquote(frag), a))}
    end

    frag = "#{lock} OF ? NOWAIT"
    lock = "#{lock} NOWAIT"

    def lock(query, unquote(lock), _) do
      {:ok, Ecto.Query.lock(query, [{^0, a}], fragment(unquote(frag), a))}
    end
  end

  # Records the sort under `:sort` in the query's `__ash_bindings__`; no ORDER BY
  # is added to the query here.
  @impl true
  def sort(query, sort, _resource) do
    {:ok,
     Map.update!(
       query,
       :__ash_bindings__,
       &Map.put(&1, :sort, sort)
     )}
  end

  # Restricts the selected fields to `select`, de-duplicated.
  @impl true
  def select(query, select, _resource) do
    {:ok, from(row in query, select: struct(row, ^Enum.uniq(select)))}
  end

  # Records the distinct sort under `:distinct_sort` in the query's
  # `__ash_bindings__`; a nil or empty sort leaves the query untouched.
  @impl true
  def distinct_sort(query, sort, _) when sort in [nil, []] do
    {:ok, query}
  end

  def distinct_sort(query, sort, _) do
    {:ok, Map.update!(query, :__ash_bindings__, &Map.put(&1, :distinct_sort, sort))}
  end

  # If the order by does not match the initial sort clause, then we use a subquery
  # to limit to only distinct rows. This may not perform that well, so we may need
  # to come up with alternatives here.
  @impl true
  def distinct(query, distinct, resource) do
    AshSql.Distinct.distinct(query, distinct, resource)
  end

  # Applies `filter` to the query: joins the relationships it references, adds the
  # aggregates it uses, then adds the filter expression itself.
  # Returns `{:ok, query}` or the first `{:error, error}`.
  @impl true
  def filter(query, filter, resource, opts \\ []) do
    used_aggregates = Ash.Filter.used_aggregates(filter, [])

    query
    |> AshSql.Join.join_all_relationships(filter, opts)
    |> case do
      {:ok, query} ->
        query
        |> AshSql.Aggregate.add_aggregates(used_aggregates, resource, false, 0)
        |> case do
          {:ok, query} ->
            {:ok, AshSql.Filter.add_filter_expression(query, filter)}

          {:error, error} ->
            {:error, error}
        end

      {:error, error} ->
        {:error, error}
    end
  end

  # Delegates to `AshSql.Aggregate.add_aggregates/5`.
  @impl true
  def add_aggregates(query, aggregates, resource) do
    AshSql.Aggregate.add_aggregates(query, aggregates, resource, true, 0)
  end

  # Delegates to `AshSql.Calculation.add_calculations/5`; `select?` defaults to true.
  @impl true
  def add_calculations(query, calculations, resource, select? \\ true) do
    AshSql.Calculation.add_calculations(query, calculations, resource, 0, select?)
end

  # Stores `data` under `known_binding` in the query's Ash bindings map.
  def add_known_binding(query, data, known_binding) do
    bindings = query.__ash_bindings__.bindings
    new_ash_bindings = %{query.__ash_bindings__ | bindings: Map.put(bindings, known_binding, data)}
    %{query | __ash_bindings__: new_ash_bindings}
  end

  # Runs `func` inside a repo transaction, calling `repo.on_transaction_begin/1`
  # with `reason` first. The repo is `reason[:data_layer_context][:repo]` when it
  # is set, otherwise the resource's configured repo. `timeout` is only passed to
  # the repo when given.
  @impl true
  def transaction(resource, func, timeout \\ nil, reason \\ %{type: :custom, metadata: %{}}) do
    repo =
      case reason[:data_layer_context] do
        %{repo: repo} when not is_nil(repo) ->
          repo

        _ ->
          AshPostgres.DataLayer.Info.repo(resource, :read)
      end

    func = fn ->
      repo.on_transaction_begin(reason)

      func.()
    end

    if timeout do
      repo.transaction(func, timeout: timeout)
    else
      repo.transaction(func)
    end
  end

  if Code.ensure_loaded?(Igniter) do
    # Igniter installer hook: for an `Ash.Resource`, sets `postgres.table` to the
    # underscored last segment of the module name and `postgres.repo` to the
    # application's `Repo` module.
    def install(igniter, module, Ash.Resource, _path, _argv) do
      table_name =
        module
        |> Module.split()
        |> List.last()
        |> Macro.underscore()

      repo = Igniter.Code.Module.module_name("Repo")

      igniter
      |> Spark.Igniter.set_option(module, [:postgres, :table], table_name)
      |> Spark.Igniter.set_option(module, [:postgres, :repo], repo)
    end

    # Fallback for any other kind of module: leave the igniter untouched. Without
    # this 5-arity clause such a call raised `FunctionClauseError`, because the
    # original fallback was declared with only four arguments.
    def install(igniter, _, _, _, _), do: igniter

    # Kept for backward compatibility with any caller of the 4-arity form.
    def install(igniter, _, _, _), do: igniter
  end

  # Rolls back the current transaction on the resource's `:mutate` repo.
  @impl true
  def rollback(resource, term) do
    AshPostgres.DataLayer.Info.repo(resource, :mutate).rollback(term)
  end

  # Table for this operation: the changeset's `:data_layer` context table when
  # set, otherwise the resource's configured table.
  defp table(resource, changeset) do
    changeset.context[:data_layer][:table] || AshPostgres.DataLayer.Info.table(resource)
  end

  # Raises a descriptive error when no table could be determined; polymorphic
  # resources get an extra hint about the required context.
  defp raise_table_error!(resource, operation) do
    if AshPostgres.DataLayer.Info.polymorphic?(resource) do
      raise """
      Could not determine table for #{operation} on #{inspect(resource)}.

      Polymorphic resources require that the `data_layer[:table]` context is provided.
      See the guide on polymorphic resources for more information.
      """
    else
      raise """
      Could not determine table for #{operation} on #{inspect(resource)}.
      """
    end
  end

  # Source for `from/2`: `{table, resource}` when the changeset context overrides
  # the table, otherwise just the resource.
  defp resolve_source(resource, changeset) do
    if table = changeset.context[:data_layer][:table] do
      {table, resource}
    else
      resource
    end
  end
end