defmodule AshSqlite.DataLayer do
  # DSL entity for a single `index` declaration inside the `custom_indexes` section.
  @index %Spark.Dsl.Entity{
    name: :index,
    describe: """
    Add an index to be managed by the migration generator.
    """,
    examples: [
      "index [\"column\", \"column2\"], unique: true, where: \"thing = TRUE\""
    ],
    target: AshSqlite.CustomIndex,
    schema: AshSqlite.CustomIndex.schema(),
    transform: {AshSqlite.CustomIndex, :transform, []},
    args: [:fields]
  }

  # DSL section grouping the `index` entities.
  @custom_indexes %Spark.Dsl.Section{
    name: :custom_indexes,
    describe: """
    A section for configuring indexes to be created by the migration generator.

    In general, prefer to use `identities` for simple unique constraints. This is a tool to allow
    for declaring more complex indexes.
    """,
    examples: [
      """
      custom_indexes do
        index [:column1, :column2], unique: true, where: "thing = TRUE"
      end
      """
    ],
    entities: [
      @index
    ]
  }

  # DSL entity for a single named `statement` inside the `custom_statements` section.
  @statement %Spark.Dsl.Entity{
    name: :statement,
    describe: """
    Add a custom statement for migrations.
    """,
    examples: [
      """
      statement :pgweb_idx do
        up "CREATE INDEX pgweb_idx ON pgweb USING GIN (to_tsvector('english', title || ' ' || body));"
        down "DROP INDEX pgweb_idx;"
      end
      """
    ],
    target: AshSqlite.Statement,
    schema: AshSqlite.Statement.schema(),
    args: [:name]
  }

  # DSL section grouping the `statement` entities.
  @custom_statements %Spark.Dsl.Section{
    name: :custom_statements,
    describe: """
    A section for configuring custom statements to be added to migrations.

    Changing custom statements may require manual intervention, because Ash can't determine what order they should run
    in (i.e if they depend on table structure that you've added, or vice versa). As such, any `down` statements we run
    for custom statements happen first, and any `up` statements happen last.

    Additionally, when changing a custom statement, we must make some assumptions, i.e that we should migrate
    the old structure down using the previously configured `down` and recreate it.

    This may not be desired, and so what you may end up doing is simply modifying the old migration and deleting whatever was
    generated by the migration generator. As always: read your migrations after generating them!
    """,
    examples: [
      """
      custom_statements do
        # the name is used to detect if you remove or modify the statement
        statement :pgweb_idx do
          up "CREATE INDEX pgweb_idx ON pgweb USING GIN (to_tsvector('english', title || ' ' || body));"
          down "DROP INDEX pgweb_idx;"
        end
      end
      """
    ],
    entities: [
      @statement
    ]
  }

  # DSL entity for a single `reference` (foreign key) declaration.
  @reference %Spark.Dsl.Entity{
    name: :reference,
    describe: """
    Configures the reference for a relationship in resource migrations.

    Keep in mind that multiple relationships can theoretically involve the same destination and foreign keys.
    In those cases, you only need to configure the `reference` behavior for one of them. Any conflicts will result
    in an error, across this resource and any other resources that share a table with this one. For this reason,
    instead of adding a reference configuration for `:nothing`, its best to just leave the configuration out, as that
    is the default behavior if *no* relationship anywhere has configured the behavior of that reference.
    """,
    examples: [
      "reference :post, on_delete: :delete, on_update: :update, name: \"comments_to_posts_fkey\""
    ],
    args: [:relationship],
    target: AshSqlite.Reference,
    schema: AshSqlite.Reference.schema()
  }

  # DSL section grouping the `reference` entities plus the polymorphic foreign key options.
  # This attribute used to be assigned twice with identical content; it is defined once here.
  @references %Spark.Dsl.Section{
    name: :references,
    describe: """
    A section for configuring the references (foreign keys) in resource migrations.

    This section is only relevant if you are using the migration generator with this resource.
    Otherwise, it has no effect.
    """,
    examples: [
      """
      references do
        reference :post, on_delete: :delete, on_update: :update, name: "comments_to_posts_fkey"
      end
      """
    ],
    entities: [@reference],
    schema: [
      polymorphic_on_delete: [
        type: {:one_of, [:delete, :nilify, :nothing, :restrict]},
        doc:
          "For polymorphic resources, configures the on_delete behavior of the automatically generated foreign keys to source tables."
      ],
      polymorphic_on_update: [
        type: {:one_of, [:update, :nilify, :nothing, :restrict]},
        doc:
          "For polymorphic resources, configures the on_update behavior of the automatically generated foreign keys to source tables."
      ],
      # A foreign key *name* is free-form text, not one of the on_update behaviours.
      polymorphic_name: [
        type: :string,
        doc:
          "For polymorphic resources, configures the name of the automatically generated foreign keys to source tables."
      ]
    ]
  }

  # Top-level `sqlite` DSL section: nests the sections above and declares the
  # resource-level options (repo, table, migration settings, constraint names).
  @sqlite %Spark.Dsl.Section{
    name: :sqlite,
    describe: """
    Sqlite data layer configuration
    """,
    sections: [
      @custom_indexes,
      @custom_statements,
      @references
    ],
    modules: [
      :repo
    ],
    examples: [
      """
      sqlite do
        repo MyApp.Repo
        table "organizations"
      end
      """
    ],
    schema: [
      repo: [
        type: :atom,
        required: true,
        doc:
          "The repo that will be used to fetch your data. See the `AshSqlite.Repo` documentation for more"
      ],
      migrate?: [
        type: :boolean,
        default: true,
        doc:
          "Whether or not to include this resource in the generated migrations with `mix ash.generate_migrations`"
      ],
      migration_types: [
        type: :keyword_list,
        default: [],
        doc:
          "A keyword list of attribute names to the ecto migration type that should be used for that attribute. Only necessary if you need to override the defaults."
      ],
      migration_defaults: [
        type: :keyword_list,
        default: [],
        doc: """
        A keyword list of attribute names to the ecto migration default that should be used for that attribute. The string you use will be placed verbatim in the migration. Use fragments like `fragment(\\\\"now()\\\\")`, or for `nil`, use `\\\\"nil\\\\"`.
        """
      ],
      base_filter_sql: [
        type: :string,
        doc:
          "A raw sql version of the base_filter, e.g `representative = true`. Required if trying to create a unique constraint on a resource with a base_filter"
      ],
      skip_unique_indexes: [
        type: {:wrap_list, :atom},
        default: false,
        doc: "Skip generating unique indexes when generating migrations"
      ],
      unique_index_names: [
        type:
          {:list,
           {:or,
            [{:tuple, [{:list, :atom}, :string]}, {:tuple, [{:list, :atom}, :string, :string]}]}},
        default: [],
        doc: """
        A list of unique index names that could raise errors that are not configured in identities, or an mfa to a function that takes a changeset and returns the list. In the format `{[:affected, :keys], "name_of_constraint"}` or `{[:affected, :keys], "name_of_constraint", "custom error message"}`
        """
      ],
      exclusion_constraint_names: [
        type: :any,
        default: [],
        doc: """
        A list of exclusion constraint names that could raise errors. Must be in the format `{:affected_key, "name_of_constraint"}` or `{:affected_key, "name_of_constraint", "custom error message"}`
        """
      ],
      identity_index_names: [
        type: :any,
        default: [],
        doc: """
        A keyword list of identity names to the unique index name that they should use when being managed by the migration generator.
        """
      ],
      foreign_key_names: [
        type: {:list, {:or, [{:tuple, [:atom, :string]}, {:tuple, [:string, :string]}]}},
        default: [],
        doc: """
        A list of foreign keys that could raise errors, or an mfa to a function that takes a changeset and returns a list. In the format: `{:key, "name_of_constraint"}` or `{:key, "name_of_constraint", "custom error message"}`
        """
      ],
      migration_ignore_attributes: [
        type: {:list, :atom},
        default: [],
        doc: """
        A list of attributes that will be ignored when generating migrations.
        """
      ],
      table: [
        type: :string,
        doc: """
        The table to store and read the resource from. If this is changed, the migration generator will not remove the old table.
        """
      ],
      polymorphic?: [
        type: :boolean,
        default: false,
        doc: """
        Declares this resource as polymorphic. See the [polymorphic resources guide](/documentation/topics/resources/polymorphic-resources.md) for more.
        """
      ]
    ]
  }

  @behaviour Ash.DataLayer

  @sections [@sqlite]

  @moduledoc """
  A sqlite data layer that leverages Ecto's sqlite capabilities.
  """

  use Spark.Dsl.Extension,
    sections: @sections,
    transformers: [
      AshSqlite.Transformers.ValidateReferences,
      AshSqlite.Transformers.VerifyRepo,
      AshSqlite.Transformers.EnsureTableOrPolymorphic
    ]

  @doc """
  Runs the `ash_sqlite.migrate` mix task, forwarding `args` unchanged.
  """
  def migrate(args) do
    # TODO: take args that we care about
    Mix.Task.run("ash_sqlite.migrate", args)
  end

  @doc """
  Interactively rolls back migrations for every repo resolved from `args`.

  For each repo the 20 most recent migration files are listed and the user is
  prompted for how many migrations to roll back (an empty answer means `0`).
  The answer is passed to the `ash_sqlite.rollback` mix task via `-n`.

  Raises a `RuntimeError` if the answer is not an integer.
  """
  def rollback(args) do
    repos = AshSqlite.Mix.Helpers.repos!([], args)

    # Only mention the repo in the prompt when there is more than one to choose between.
    show_for_repo? = Enum.count_until(repos, 2) == 2

    for repo <- repos do
      for_repo =
        if show_for_repo? do
          " for repo #{inspect(repo)}"
        else
          ""
        end

      migrations_path = AshSqlite.Mix.Helpers.migrations_path([], repo)

      # Newest first, numbered from 1 so the number doubles as the value to type.
      files =
        migrations_path
        |> Path.join("**/*.exs")
        |> Path.wildcard()
        |> Enum.sort()
        |> Enum.reverse()
        |> Enum.take(20)
        |> Enum.map(&String.trim_leading(&1, migrations_path))
        |> Enum.with_index()
        |> Enum.map(fn {file, index} -> "#{index + 1}: #{file}" end)

      n =
        Mix.shell().prompt(
          """
          How many migrations should be rolled back#{for_repo}? (default: 0)

          Last 20 migration names, with the input you must provide to
          rollback up to *and including* that migration:

          #{Enum.join(files, "\n")}
          Rollback to:
          """
          |> String.trim_trailing()
        )
        |> String.trim()
        |> case do
          "" ->
            0

          n ->
            try do
              String.to_integer(n)
            rescue
              _ ->
                # credo:disable-for-next-line
                raise "Required an integer value, got: #{n}"
            end
        end

      # This is the SQLite data layer, so it must drive the `ash_sqlite` task
      # (it previously invoked `ash_postgres.rollback`). The task is re-enabled
      # so it can run again for the next repo in the loop.
      Mix.Task.run("ash_sqlite.rollback", args ++ ["-r", inspect(repo), "-n", to_string(n)])
      Mix.Task.reenable("ash_sqlite.rollback")
    end
  end

  @doc """
  Runs the `ash_sqlite.generate_migrations` mix task, forwarding `args` unchanged.
  """
  def codegen(args) do
    # TODO: take args that we care about
    Mix.Task.run("ash_sqlite.generate_migrations", args)
  end

  @doc """
  Runs `ash_sqlite.create` followed by `ash_sqlite.migrate`, forwarding `args` to both.
  """
  def setup(args) do
    # TODO: take args that we care about
    Mix.Task.run("ash_sqlite.create", args)
    Mix.Task.run("ash_sqlite.migrate", args)
  end

  @doc """
  Runs the `ash_sqlite.drop` mix task, forwarding `args` unchanged.
  """
  def tear_down(args) do
    # TODO: take args that we care about
    Mix.Task.run("ash_sqlite.drop", args)
  end

  import Ecto.Query, only: [from: 2, subquery: 1]

  # Capability table for this data layer. Clause order matters: specific
  # patterns must stay above the catch-alls that follow them.
  @impl true
  def can?(_, :async_engine), do: false
  def can?(_, :bulk_create), do: true
  def can?(_, {:lock, _}), do: false

  def can?(_, :transact), do: false
  def can?(_, :composite_primary_key), do: true
  def can?(_, {:atomic, :update}), do: true
  def can?(_, {:atomic, :upsert}), do: true
  def can?(_, :upsert), do: true
  def can?(_, :changeset_filter), do: true

  # Joins are only possible when both resources use the same data layer and the same repo.
  def can?(resource, {:join, other_resource}) do
    data_layer = Ash.DataLayer.data_layer(resource)
    other_data_layer = Ash.DataLayer.data_layer(other_resource)

    data_layer == other_data_layer and
      AshSqlite.DataLayer.Info.repo(resource) == AshSqlite.DataLayer.Info.repo(other_resource)
  end

  def can?(_resource, {:lateral_join, _}) do
    false
  end

  def can?(_, :boolean_filter), do: true

  def can?(_, {:aggregate, _type}), do: false

  def can?(_, :aggregate_filter), do: false
  def can?(_, :aggregate_sort), do: false
  def can?(_, :expression_calculation), do: true
  def can?(_, :expression_calculation_sort), do: true
  def can?(_, :create), do: true
  def can?(_, :select), do: true
  def can?(_, :read), do: true

  # Updates and destroys are only supported for resources that have a primary key.
  def can?(resource, action) when action in ~w[update destroy]a do
    resource
    |> Ash.Resource.Info.primary_key()
    |> Enum.any?()
  end

  def can?(_, :filter), do: true
  def can?(_, :limit), do: true
  def can?(_, :offset), do: true
  def can?(_, :multitenancy), do: false

  def can?(_, {:filter_relationship, %{manual: {module, _}}}) do
    Spark.implements_behaviour?(module, AshSqlite.ManualRelationship)
  end

  def can?(_, {:filter_relationship, _}), do: true

  def can?(_, {:aggregate_relationship, _}), do: false

  def can?(_, :timeout), do: true
  def can?(_, {:filter_expr, %Ash.Query.Function.StringJoin{}}), do: false
  def can?(_, {:filter_expr, _}), do: true
  def can?(_, :nested_expressions), do: true
  def can?(_, {:query_aggregate, _}), do: true
  def can?(_, :sort), do: true
  def can?(_, :distinct_sort), do: false
  def can?(_, :distinct), do: false
  def can?(_, {:sort, _}), do: true
  def can?(_, _), do: false

  # Applies a LIMIT to the query; a `nil` limit leaves the query untouched.
  @impl true
  def limit(query, nil, _), do: {:ok, query}

  def limit(query, limit, _resource) do
    {:ok, from(row in query, limit: ^limit)}
  end

  # The configured table name, or "" when the resource has none configured.
  @impl true
  def source(resource) do
    AshSqlite.DataLayer.Info.table(resource) || ""
  end

  # Names the root binding, applies a `data_layer[:table]` override from the
  # context when present, and initialises the AshSql bindings on the query.
  @impl true
  def set_context(resource, data_layer_query, context) do
    start_bindings = context[:data_layer][:start_bindings_at] || 0
    data_layer_query = from(row in data_layer_query, as: ^start_bindings)

    data_layer_query =
      if context[:data_layer][:table] do
        %{
          data_layer_query
          | from: %{data_layer_query.from | source: {context[:data_layer][:table], resource}}
        }
      else
        data_layer_query
      end

    {:ok,
     AshSql.Bindings.default_bindings(
       data_layer_query,
       resource,
       AshSqlite.SqlImplementation,
       context
     )}
  end

  # Applies an OFFSET to the query. Every clause returns `{:ok, query}` so the
  # result shape matches `limit/3` (the `nil` clause used to return the bare query).
  @impl true
  def offset(query, nil, _), do: {:ok, query}

  # An offset of 0 on a query with no existing offset is a no-op.
  def offset(%{offset: old_offset} = query, 0, _resource) when old_offset in [0, nil] do
    {:ok, query}
  end

  def offset(query, offset, _resource) do
    {:ok, from(row in query, offset: ^offset)}
  end

  # Runs query-level aggregates. `:exists` aggregates are answered separately
  # via `repo.exists?/2`; all others are added as subquery selects and fetched
  # with a single `repo.one/2`.
  @impl true
  def run_aggregate_query(query, aggregates, resource) do
    {exists, aggregates} = Enum.split_with(aggregates, &(&1.kind == :exists))
    query = AshSql.Bindings.default_bindings(query, resource, AshSqlite.SqlImplementation)

    query =
      if query.limit do
        # A limited query is wrapped in a subquery so the limit is applied
        # before the aggregates are computed over it.
        query =
          query
          |> Ecto.Query.exclude(:select)
          |> Ecto.Query.exclude(:order_by)
          |> Map.put(:windows, [])

        from(row in subquery(query), as: ^0, select: %{})
      else
        query
        |> Ecto.Query.exclude(:select)
        |> Ecto.Query.exclude(:order_by)
        |> Map.put(:windows, [])
        |> Ecto.Query.select(%{})
      end

    query_before_select = query

    query =
      Enum.reduce(
        aggregates,
        query,
        fn agg, query ->
          # The first hop of the path is index 0; the remaining hops (everything
          # after it) are passed separately as the dropped path.
          first_relationship =
            Ash.Resource.Info.relationship(resource, agg.relationship_path |> Enum.at(0))

          AshSql.Aggregate.add_subquery_aggregate_select(
            query,
            agg.relationship_path |> Enum.drop(1),
            agg,
            resource,
            true,
            first_relationship
          )
        end
      )

    result =
      case aggregates do
        [] ->
          %{}

        _ ->
          dynamic_repo(resource, query).one(query, repo_opts(nil, nil, resource))
      end

    {:ok, add_exists_aggs(result, resource, query_before_select, exists)}
  end

  # Evaluates each `:exists` aggregate with `repo.exists?/2` (applying the
  # aggregate's own filter when it has one) and stores the boolean under the
  # aggregate's name in `result`.
  defp add_exists_aggs(result, resource, query, exists) do
    repo = dynamic_repo(resource, query)
    repo_opts = repo_opts(nil, nil, resource)

    Enum.reduce(exists, result, fn agg, result ->
      {:ok, filtered} =
        case agg do
          %{query: %{filter: filter}} when not is_nil(filter) ->
            filter(query, filter, resource)

          _ ->
            {:ok, query}
        end

      Map.put(
        result || %{},
        agg.name,
        repo.exists?(filtered, repo_opts)
      )
    end)
  end

  # Executes a read query. Applies any pending sort first, then runs
  # `repo.all/2` and de-duplicates the rows by primary key. Raised errors are
  # converted by `handle_raised_error/4`.
  @impl true
  def run_query(query, resource) do
    with_sort_applied =
      if query.__ash_bindings__[:sort_applied?] do
        {:ok, query}
      else
        AshSql.Sort.apply_sort(query, query.__ash_bindings__[:sort], resource)
      end

    case with_sort_applied do
      {:error, error} ->
        {:error, error}

      {:ok, query} ->
        # The `:order` window is always removed; when the bindings flag
        # `:__order__?` is set, its `order_by` expression becomes the query's
        # ordering instead.
        query =
          if query.__ash_bindings__[:__order__?] && query.windows[:order] do
            order_by = %{
              query.windows[:order]
              | expr: query.windows[:order].expr[:order_by]
            }

            %{
              query
              | windows: Keyword.delete(query.windows, :order),
                order_bys: [order_by]
            }
          else
            %{query | windows: Keyword.delete(query.windows, :order)}
          end

        if AshSqlite.DataLayer.Info.polymorphic?(resource) && no_table?(query) do
          raise_table_error!(resource, :read)
        else
          primary_key = Ash.Resource.Info.primary_key(resource)

          {:ok,
           dynamic_repo(resource, query).all(query, repo_opts(nil, nil, resource))
           |> Enum.uniq_by(&Map.take(&1, primary_key))}
        end
    end
  rescue
    e ->
      handle_raised_error(e, __STACKTRACE__, query, resource)
  end

  # True when the query's source table is the empty-string placeholder.
  defp no_table?(%{from: %{source: {"", _}}}), do: true
  defp no_table?(_), do: false

  # Builds the repo options. The tenant is accepted but ignored: this data
  # layer reports `can?(_, :multitenancy)` as false, and only matching `nil`
  # here made a changeset carrying a tenant fail with a FunctionClauseError.
  defp repo_opts(timeout, _tenant, _resource) do
    []
    |> add_timeout(timeout)
  end

  defp repo_opts(timeout, _resource) do
    add_timeout([], timeout)
  end

  # Adds `:timeout` to the options only when one was given.
  defp add_timeout(opts, timeout) when not is_nil(timeout) do
    Keyword.put(opts, :timeout, timeout)
  end

  defp add_timeout(opts, _), do: opts

  # Extra expression functions made available by this data layer.
  @impl true
  def functions(_resource) do
    [
      AshSqlite.Functions.Like,
      AshSqlite.Functions.ILike
    ]
  end

  # Base Ecto query for the resource, sourced from its configured table (or "").
  @impl true
  def resource_to_query(resource, _) do
    from(row in {AshSqlite.DataLayer.Info.table(resource) || "", resource}, [])
  end

  # Inserts many changesets with `repo.insert_all/3`, optionally as an upsert.
  #
  # Returns `{:ok, records}` when `options[:return_records?]` is truthy and
  # `:ok` otherwise. Raised errors are converted by `handle_raised_error/4`.
  @impl true
  def bulk_create(resource, stream, options) do
    # Cell-wise default values are not supported on INSERT statements by SQLite
    # This requires that we group changesets by what attributes are changing
    # And *omit* any defaults instead of using something like `(1, 2, DEFAULT)`
    # like we could with postgres
    stream
    |> Enum.group_by(&Map.keys(&1.attributes))
    |> Enum.reduce_while({:ok, []}, fn {_, changesets}, {:ok, acc} ->
      opts = repo_opts(nil, options[:tenant], resource)

      # Access syntax, like every other option read in this function, so a
      # missing `:return_records?` key means "no" instead of raising a KeyError.
      opts =
        if options[:return_records?] do
          Keyword.put(opts, :returning, true)
        else
          opts
        end

      opts =
        if options[:upsert?] do
          # Ash groups changesets by atomics before dispatching them to the data layer
          # this means that all changesets have the same atomics
          %{atomics: atomics, filter: filter} = Enum.at(changesets, 0)

          query = from(row in resource, as: ^0)

          query =
            query
            |> AshSql.Bindings.default_bindings(resource, AshSqlite.SqlImplementation)

          upsert_set =
            upsert_set(resource, changesets, options)

          on_conflict =
            case AshSql.Atomics.query_with_atomics(
                   resource,
                   query,
                   filter,
                   atomics,
                   %{},
                   upsert_set
                 ) do
              :empty ->
                :nothing

              {:ok, query} ->
                query

              {:error, error} ->
                raise Ash.Error.to_ash_error(error)
            end

          opts
          |> Keyword.put(:on_conflict, on_conflict)
          |> Keyword.put(
            :conflict_target,
            conflict_target(
              resource,
              options[:upsert_keys] || Ash.Resource.Info.primary_key(resource)
            )
          )
        else
          opts
        end

      ecto_changesets = changesets |> Enum.map(& &1.attributes)

      # A `data_layer[:table]` context on the changeset overrides the resource's table.
      source =
        if table = Enum.at(changesets, 0).context[:data_layer][:table] do
          {table, resource}
        else
          resource
        end

      repo = dynamic_repo(resource, Enum.at(changesets, 0))

      source
      |> repo.insert_all(ecto_changesets, opts)
      |> case do
        {_, nil} ->
          {:cont, {:ok, acc}}

        {_, results} ->
          if options[:single?] do
            {:cont, {:ok, acc ++ results}}
          else
            # Tag each returned record with the index of the changeset it came from.
            {:cont,
             {:ok,
              acc ++
                Enum.zip_with(results, changesets, fn result, changeset ->
                  Ash.Resource.put_metadata(
                    result,
                    :bulk_create_index,
                    changeset.context.bulk_create.index
                  )
                end)}}
          end
      end
    end)
    |> case do
      {:ok, records} ->
        if options[:return_records?] do
          {:ok, records}
        else
          :ok
        end

      {:error, error} ->
        {:error, error}
    end
  rescue
    e ->
      changeset = Ash.Changeset.new(resource)

      handle_raised_error(
        e,
        __STACKTRACE__,
        {:bulk_create, ecto_changeset(changeset.data, changeset, :create, false)},
        resource
      )
  end

  # Builds the `field => value` list applied on conflict during an upsert:
  # requested upsert fields take the incoming (EXCLUDED) value, and attributes
  # with an `update_default` fall back to that default.
  defp upsert_set(resource, changesets, options) do
    attributes_changing_anywhere =
      changesets |> Enum.flat_map(&Map.keys(&1.attributes)) |> Enum.uniq()

    update_defaults = update_defaults(resource)
    # We can't reference EXCLUDED if at least one of the changesets in the stream is not
    # changing the value (and we wouldn't want to even if we could as it would be unnecessary)

    upsert_fields =
      (options[:upsert_fields] || []) |> Enum.filter(&(&1 in attributes_changing_anywhere))

    # Fields set atomically are handled by the atomics themselves.
    fields_to_upsert =
      (upsert_fields ++ Keyword.keys(update_defaults)) --
        Keyword.keys(Enum.at(changesets, 0).atomics)

    Enum.map(fields_to_upsert, fn upsert_field ->
      # for safety, we check once more at the end that all values in
      # upsert_fields are names of attributes. This is because
      # below we use `literal/1` to bring them into the query
      if is_nil(resource.__schema__(:type, upsert_field)) do
        raise "Only attribute names can be used in upsert_fields"
      end

      case Keyword.fetch(update_defaults, upsert_field) do
        {:ok, default} ->
          if upsert_field in upsert_fields do
            {upsert_field,
             Ecto.Query.dynamic(
               [],
               fragment(
                 "COALESCE(EXCLUDED.?, ?)",
                 literal(^to_string(upsert_field)),
                 ^default
               )
             )}
          else
            {upsert_field, default}
          end

        :error ->
          {upsert_field,
           Ecto.Query.dynamic(
             [],
             fragment("EXCLUDED.?", literal(^to_string(upsert_field)))
           )}
      end
    end)
  end

  # Creates a single record by delegating to `bulk_create/3` with `single?: true`.
  @impl true
  def create(resource, changeset) do
    changeset = %{
      changeset
      | data:
          Map.update!(
            changeset.data,
            :__meta__,
            &Map.put(&1, :source, table(resource, changeset))
          )
    }

    case bulk_create(resource, [changeset], %{
           single?: true,
           tenant: changeset.tenant,
           return_records?: true
         }) do
      {:ok, [result]} ->
        {:ok, result}

      {:error, error} ->
        {:error, error}
    end
  end

  # Converts the errors of a failed Ecto changeset into Ash attribute errors.
  defp handle_errors({:error, %Ecto.Changeset{errors: errors}}) do
    {:error, Enum.map(errors, &to_ash_error/1)}
  end

  defp to_ash_error({field, {message, vars}}) do
    Ash.Error.Changes.InvalidAttribute.exception(
      field: field,
      message: message,
      private_vars: vars
    )
  end

  # Builds the Ecto changeset for `record` from the Ash `changeset`: sets the
  # table, applies the non-atomic attribute changes, adds primary-key filters
  # for non-create actions, and registers the known constraints for `type`.
  defp ecto_changeset(record, changeset, type, table_error? \\ true) do
    filters =
      if changeset.action_type == :create do
        %{}
      else
        Map.get(changeset, :filters, %{})
      end

    # Non-create actions are additionally filtered by the record's primary key.
    filters =
      if changeset.action_type == :create do
        filters
      else
        changeset.resource
        |> Ash.Resource.Info.primary_key()
        |> Enum.reduce(filters, fn key, filters ->
          Map.put(filters, key, Map.get(record, key))
        end)
      end

    attributes =
      changeset.resource
      |> Ash.Resource.Info.attributes()
      |> Enum.map(& &1.name)

    # Attributes that are set atomically are excluded from the plain changes.
    attributes_to_change =
      Enum.reject(attributes, fn attribute ->
        Keyword.has_key?(changeset.atomics, attribute)
      end)

    ecto_changeset =
      record
      |> to_ecto()
      |> set_table(changeset, type, table_error?)
      |> Ecto.Changeset.change(Map.take(changeset.attributes, attributes_to_change))
      |> Map.update!(:filters, &Map.merge(&1, filters))
      |> add_configured_foreign_key_constraints(record.__struct__)
      |> add_unique_indexes(record.__struct__, changeset)
      |> add_exclusion_constraints(record.__struct__)

    case type do
      :create ->
        ecto_changeset
        |> add_my_foreign_key_constraints(record.__struct__)

      type when type in [:upsert, :update] ->
        ecto_changeset
        |> add_my_foreign_key_constraints(record.__struct__)
        |> add_related_foreign_key_constraints(record.__struct__)

      :delete ->
        ecto_changeset
        |> add_related_foreign_key_constraints(record.__struct__)
    end
  end

  # Translates raised exceptions into `{:error, ...}` results. Known Ecto and
  # Exqlite errors are first rewritten into the matching Ash error and then
  # passed back through to the generic clause at the bottom.
  defp handle_raised_error(
         %Ecto.StaleEntryError{changeset: %{data: %resource{}, filters: filters}},
         stacktrace,
         context,
         resource
       ) do
    handle_raised_error(
      Ash.Error.Changes.StaleRecord.exception(resource: resource, filters: filters),
      stacktrace,
      context,
      resource
    )
  end

  defp handle_raised_error(%Ecto.Query.CastError{} = e, stacktrace, context, resource) do
    handle_raised_error(
      Ash.Error.Query.InvalidFilterValue.exception(value: e.value, context: context),
      stacktrace,
      context,
      resource
    )
  end

  defp handle_raised_error(
         %Exqlite.Error{
           message: "FOREIGN KEY constraint failed"
         },
         stacktrace,
         context,
         resource
       ) do
    handle_raised_error(
      Ash.Error.Changes.InvalidChanges.exception(
        fields: Ash.Resource.Info.primary_key(resource),
        message: "referenced something that does not exist"
      ),
      stacktrace,
      context,
      resource
    )
  end

  # The message lists the violating columns as comma-separated `table.column`
  # pairs; the column part is mapped back onto the resource's attributes, and
  # one error is produced per attribute found.
  defp handle_raised_error(
         %Exqlite.Error{
           message: "UNIQUE constraint failed: " <> fields
         },
         _stacktrace,
         _context,
         resource
       ) do
    names =
      fields
      |> String.split(", ")
      |> Enum.map(fn field ->
        field |> String.split(".", trim: true) |> Enum.drop(1) |> Enum.at(0)
      end)
      |> Enum.map(fn field ->
        Ash.Resource.Info.attribute(resource, field)
      end)
      |> Enum.reject(&is_nil/1)
      |> Enum.map(fn %{name: name} ->
        name
      end)

    message = find_constraint_message(resource, names)

    {:error,
     names
     |> Enum.map(fn name ->
       Ash.Error.Changes.InvalidAttribute.exception(
         field: name,
         message: message
       )
     end)}
  end

  defp handle_raised_error(error, stacktrace, _ecto_changeset, _resource) do
    {:error, Ash.Error.to_ash_error(error, stacktrace)}
  end

  # Message for a unique violation on `names`: a matching custom index message
  # first, then a matching identity message, then a generic default.
  defp find_constraint_message(resource, names) do
    find_custom_index_message(resource, names) || find_identity_message(resource, names) ||
      "has already been taken"
  end

  # Message of the custom index whose fields equal `names` (order-insensitive), if any.
  defp find_custom_index_message(resource, names) do
    resource
    |> AshSqlite.DataLayer.Info.custom_indexes()
    |> Enum.find(fn %{fields: fields} ->
      fields |> Enum.map(&to_string/1) |> Enum.sort() ==
        names |> Enum.map(&to_string/1) |> Enum.sort()
    end)
    |> case do
      %{message: message} when is_binary(message) -> message
      _ -> nil
    end
  end

  # Message of the identity whose keys equal `names` (order-insensitive), if any.
  defp find_identity_message(resource, names) do
    resource
    |> Ash.Resource.Info.identities()
    |> Enum.find(fn %{keys: fields} ->
      fields |> Enum.map(&to_string/1) |> Enum.sort() ==
        names |> Enum.map(&to_string/1) |> Enum.sort()
    end)
    |> case do
      %{message: message} when is_binary(message) -> message
      _ -> nil
    end
  end

  # For polymorphic resources, points the record at the table taken from the
  # changeset context (or the configured table). When neither exists it raises
  # if `table_error?` is set and otherwise returns the record unchanged.
  defp set_table(record, changeset, operation, table_error?) do
    if AshSqlite.DataLayer.Info.polymorphic?(record.__struct__) do
      table =
        changeset.context[:data_layer][:table] ||
          AshSqlite.DataLayer.Info.table(record.__struct__)

      if table do
        Ecto.put_meta(record, source: table)
      else
        if table_error? do
          raise_table_error!(changeset.resource, operation)
        else
          record
        end
      end
    else
      record
    end
  end

  @doc """
  Converts values coming back from Ecto into their Ash form.

  Walks `{:ok, _}` tuples, lists and Ash resource records. On each resource
  record, a relationship holding `%Ecto.Association.NotLoaded{}` is replaced by
  that field's value in `struct(resource)`; any other relationship value is
  converted recursively. Everything else, including `{:error, _}` tuples, is
  returned unchanged.
  """
  def from_ecto({:ok, result}), do: {:ok, from_ecto(result)}
  def from_ecto({:error, _} = other), do: other

  def from_ecto(nil), do: nil

  def from_ecto(value) when is_list(value) do
    Enum.map(value, &from_ecto/1)
  end

  def from_ecto(%resource{} = record) do
    if Spark.Dsl.is?(resource, Ash.Resource) do
      empty = struct(resource)

      resource
      |> Ash.Resource.Info.relationships()
      |> Enum.reduce(record, fn relationship, record ->
        case Map.get(record, relationship.name) do
          %Ecto.Association.NotLoaded{} ->
            Map.put(record, relationship.name, Map.get(empty, relationship.name))

          value ->
            Map.put(record, relationship.name, from_ecto(value))
        end
      end)
    else
      record
    end
  end

  def from_ecto(other), do: other

  @doc """
  Converts Ash values into the form Ecto expects.

  Walks lists and Ash resource records. On each resource record, a relationship
  holding `%Ash.NotLoaded{}` is replaced by an `%Ecto.Association.NotLoaded{}`
  carrying the relationship's name and cardinality; any other relationship
  value is converted recursively. Everything else is returned unchanged.
  """
  def to_ecto(nil), do: nil

  def to_ecto(value) when is_list(value) do
    Enum.map(value, &to_ecto/1)
  end

  def to_ecto(%resource{} = record) do
    if Spark.Dsl.is?(resource, Ash.Resource) do
      resource
      |> Ash.Resource.Info.relationships()
      |> Enum.reduce(record, fn relationship, record ->
        value =
          case Map.get(record, relationship.name) do
            %Ash.NotLoaded{} ->
              %Ecto.Association.NotLoaded{
                __field__: relationship.name,
                __cardinality__: relationship.cardinality
              }

            value ->
              to_ecto(value)
          end

        Map.put(record, relationship.name, value)
      end)
    else
      record
    end
  end

  def to_ecto(other), do: other

  # Registers every configured exclusion constraint name on the changeset.
  defp add_exclusion_constraints(changeset, resource) do
    resource
    |> AshSqlite.DataLayer.Info.exclusion_constraint_names()
    |> Enum.reduce(changeset, fn constraint, changeset ->
      case constraint do
        {key, name} ->
          Ecto.Changeset.exclusion_constraint(changeset, key, name: name)

        {key, name, message} ->
          Ecto.Changeset.exclusion_constraint(changeset, key, name: name, message: message)
      end
    end)
  end

  # Registers foreign key constraints for relationships on related resources
  # that point back at `resource`.
  defp add_related_foreign_key_constraints(changeset, resource) do
    # TODO: this doesn't guarantee us to get all of them, because if something is related to this
    # schema and there is no back-relation, then this won't catch it's foreign key constraints
    resource
    |> Ash.Resource.Info.relationships()
    |> Enum.map(& &1.destination)
    |> Enum.uniq()
    |> Enum.flat_map(fn related ->
      related
      |> Ash.Resource.Info.relationships()
      |> Enum.filter(&(&1.destination == resource))
      |> Enum.map(&Map.take(&1, [:source, :source_attribute, :destination_attribute, :name]))
    end)
    |> Enum.reduce(changeset, fn %{
                                   source: source,
                                   source_attribute: source_attribute,
                                   destination_attribute: destination_attribute,
                                   name: relationship_name
                                 },
                                 changeset ->
      case AshSqlite.DataLayer.Info.reference(resource, relationship_name) do
        %{name: name} when not is_nil(name) ->
          Ecto.Changeset.foreign_key_constraint(changeset, destination_attribute,
            name: name,
            message: "would leave records behind"
          )

        _ ->
          # No explicitly named reference: fall back to `<table>_<attribute>_fkey`.
          Ecto.Changeset.foreign_key_constraint(changeset, destination_attribute,
            name: "#{AshSqlite.DataLayer.Info.table(source)}_#{source_attribute}_fkey",
            message: "would leave records behind"
          )
      end
    end)
  end

  # Registers a foreign key constraint for the source attribute of each of the
  # resource's own relationships.
  defp add_my_foreign_key_constraints(changeset, resource) do
    resource
    |> Ash.Resource.Info.relationships()
    |> Enum.reduce(changeset, &Ecto.Changeset.foreign_key_constraint(&2, &1.source_attribute))
  end

  # Registers the `foreign_key_names` configured in the DSL, given either as a
  # list or as an `{m, f, a}` that is called with the changeset prepended to `a`.
  defp add_configured_foreign_key_constraints(changeset, resource) do
    resource
    |> AshSqlite.DataLayer.Info.foreign_key_names()
    |> case do
      {m, f, a} -> List.wrap(apply(m, f, [changeset | a]))
      value -> List.wrap(value)
    end
    |> Enum.reduce(changeset, fn
      {key, name}, changeset ->
        Ecto.Changeset.foreign_key_constraint(changeset, key, name: name)

      {key, name, message}, changeset ->
        Ecto.Changeset.foreign_key_constraint(changeset, key, name: name, message: message)
    end)
  end

  # Registers unique constraints for identities, custom indexes, the configured
  # `unique_index_names`, and the primary key (`<table>_pkey`).
  defp add_unique_indexes(changeset, resource, ash_changeset) do
    changeset =
      resource
      |> Ash.Resource.Info.identities()
      |> Enum.reduce(changeset, fn identity, changeset ->
        name =
          AshSqlite.DataLayer.Info.identity_index_names(resource)[identity.name] ||
            "#{table(resource, ash_changeset)}_#{identity.name}_index"

        opts =
          if Map.get(identity, :message) do
            [name: name, message: identity.message]
          else
            [name: name]
          end

        Ecto.Changeset.unique_constraint(changeset, identity.keys, opts)
      end)

    changeset =
      resource
      |> AshSqlite.DataLayer.Info.custom_indexes()
      |> Enum.reduce(changeset, fn index, changeset ->
        opts =
          if index.message do
            [name: index.name, message: index.message]
          else
            [name: index.name]
          end

        Ecto.Changeset.unique_constraint(changeset, index.fields, opts)
      end)

    names =
      resource
      |> AshSqlite.DataLayer.Info.unique_index_names()
      |> case do
        {m, f, a} -> List.wrap(apply(m, f, [changeset | a]))
        value -> List.wrap(value)
      end

    names =
      case Ash.Resource.Info.primary_key(resource) do
        [] ->
          names

        fields ->
          if table = table(resource, ash_changeset) do
            [{fields, table <> "_pkey"} | names]
          else
            # Without a table only the pkey constraint name is unknown; the
            # configured names are still valid and must not be discarded.
            names
          end
      end

    Enum.reduce(names, changeset, fn
      {keys, name}, changeset ->
        Ecto.Changeset.unique_constraint(changeset, List.wrap(keys), name: name)

      {keys, name, message}, changeset ->
        Ecto.Changeset.unique_constraint(changeset, List.wrap(keys), name: name, message: message)
    end)
  end

  # Upserts a single record by delegating to `bulk_create/3` with `upsert?: true`.
  # `keys` defaults to the resource's primary key.
  @impl true
  def upsert(resource, changeset, keys \\ nil) do
    # The fallback must be looked up on the resource; it previously passed the
    # (nil) `keys` to `primary_key/1`.
    keys = keys || Ash.Resource.Info.primary_key(resource)

    # Defaults and the conflict keys themselves are not treated as explicit changes.
    explicitly_changing_attributes =
      Map.keys(changeset.attributes) -- Map.get(changeset, :defaults, []) -- keys

    upsert_fields =
      changeset.context[:private][:upsert_fields] || explicitly_changing_attributes

    case bulk_create(resource, [changeset], %{
           single?: true,
           upsert?: true,
           tenant: changeset.tenant,
           upsert_keys: keys,
           upsert_fields: upsert_fields,
           return_records?: true
         }) do
      {:ok, [result]} ->
        {:ok, result}

      {:error, error} ->
        {:error, error}
    end
  end

  # Conflict target for an upsert. Resources with a base filter get an
  # `{:unsafe_fragment, ...}` that appends `WHERE (<base_filter_sql>)` to the
  # quoted key columns; all others use the keys as-is.
  defp conflict_target(resource, keys) do
    if Ash.Resource.Info.base_filter(resource) do
      base_filter_sql =
        AshSqlite.DataLayer.Info.base_filter_sql(resource) ||
          raise """
          Cannot use upserts with resources that have a base_filter without also adding `base_filter_sql` in the sqlite section.
          """

      sources =
        Enum.map(keys, fn key ->
          ~s("#{Ash.Resource.Info.attribute(resource, key).source || key}")
        end)

      {:unsafe_fragment, "(" <> Enum.join(sources, ", ") <> ") WHERE (#{base_filter_sql})"}
    else
      keys
    end
  end

  # Keyword list of `attribute_name => value` for every attribute that has an
  # `update_default`, with function and MFA defaults already evaluated.
  defp update_defaults(resource) do
    attributes =
      resource
      |> Ash.Resource.Info.attributes()
      |> Enum.reject(&is_nil(&1.update_default))

    attributes
    |> static_defaults()
    |> Enum.concat(lazy_matching_defaults(attributes))
    |> Enum.concat(lazy_non_matching_defaults(attributes))
  end

  # Defaults that are plain values (neither a function nor a 3-tuple).
  defp static_defaults(attributes) do
    attributes
    |> Enum.reject(&get_default_fun(&1))
    |> Enum.map(&{&1.name, &1.update_default})
  end

  # Lazy defaults evaluated once per attribute.
  defp lazy_non_matching_defaults(attributes) do
    attributes
    |> Enum.filter(&(!&1.match_other_defaults? && get_default_fun(&1)))
    |> Enum.map(fn attribute ->
      default_value =
        case attribute.update_default do
          function when is_function(function) ->
            function.()

          {m, f, a} when is_atom(m) and is_atom(f) and is_list(a) ->
            apply(m, f, a)
        end

      {attribute.name, default_value}
    end)
  end

  # Lazy defaults evaluated once per distinct default, so attributes sharing
  # the same default with `match_other_defaults?` set receive the same value.
  defp lazy_matching_defaults(attributes) do
    attributes
    |> Enum.filter(&(&1.match_other_defaults? && get_default_fun(&1)))
    |> Enum.group_by(& &1.update_default)
    |> Enum.flat_map(fn {default_fun, attributes} ->
      default_value =
        case default_fun do
          function when is_function(function) ->
            function.()

          {m, f, a} when is_atom(m) and is_atom(f) and is_list(a) ->
            apply(m, f, a)
        end

      Enum.map(attributes, &{&1.name, default_value})
    end)
  end

  # Returns the attribute's `update_default` when it is a function or a
  # 3-tuple, and `nil` otherwise.
  defp get_default_fun(attribute) do
    if is_function(attribute.update_default) or match?({_, _, _}, attribute.update_default) do
      attribute.update_default
    end
  end

  # Updates one record with `repo.update_all/3`, filtered by primary key, so
  # atomic expressions and plain changes go out in one statement. Returns a
  # stale-record error when no row was updated.
  @impl true
  def update(resource, changeset) do
    ecto_changeset =
      changeset.data
      |> Map.update!(:__meta__, &Map.put(&1, :source, table(resource, changeset)))
      |> ecto_changeset(changeset, :update)

    try do
      query = from(row in resource, as: ^0)

      select = Keyword.keys(changeset.atomics) ++ Ash.Resource.Info.primary_key(resource)

      query =
        query
        |> AshSql.Bindings.default_bindings(
          resource,
          AshSqlite.SqlImplementation,
          changeset.context
        )
        |> Ecto.Query.select(^select)
        |> pkey_filter(changeset.data)

      case AshSql.Atomics.query_with_atomics(
             resource,
             query,
             changeset.filter,
             changeset.atomics,
             ecto_changeset.changes,
             []
           ) do
        :empty ->
          {:ok, changeset.data}

        {:ok, query} ->
          repo_opts = repo_opts(changeset.timeout, changeset.tenant, changeset.resource)

          # Only the atomically-set fields need to be read back from the database.
          repo_opts =
            Keyword.put(repo_opts, :returning, Keyword.keys(changeset.atomics))

          result =
            dynamic_repo(resource, changeset).update_all(
              query,
              [],
              repo_opts
            )

          case result do
            {0, []} ->
              {:error,
               Ash.Error.Changes.StaleRecord.exception(
                 resource: resource,
                 filters: changeset.filter
               )}

            {1, [result]} ->
              record =
                changeset.data
                |> Map.merge(changeset.attributes)
                |> Map.merge(Map.take(result, Keyword.keys(changeset.atomics)))

              {:ok, record}
          end

        {:error, error} ->
          {:error, error}
      end
    rescue
      e ->
        handle_raised_error(e, __STACKTRACE__, ecto_changeset, resource)
    end
  end

  # Restricts the query to the row whose primary key matches `record`.
  defp pkey_filter(query, %resource{} = record) do
    pkey =
      record
      |> Map.take(Ash.Resource.Info.primary_key(resource))
      |> Map.to_list()

    Ecto.Query.where(query, ^pkey)
  end

  # Deletes the record with `repo.delete/2`, returning `:ok` on success.
  @impl true
  def destroy(resource, %{data: record} = changeset) do
    ecto_changeset = ecto_changeset(record, changeset, :delete)

    try do
      ecto_changeset
      |> dynamic_repo(resource, changeset).delete(
        repo_opts(changeset.timeout, changeset.resource)
      )
      |> from_ecto()
      |> case do
        {:ok, _record} ->
          :ok

        {:error, error} ->
          handle_errors({:error, error})
      end
    rescue
      e ->
        handle_raised_error(e, __STACKTRACE__, ecto_changeset, resource)
    end
  end

  # Records the sort in the query's Ash bindings; `run_query/2` applies it
  # later unless the bindings mark it as already applied.
  @impl true
  def sort(query, sort, _resource) do
    {:ok, Map.update!(query, :__ash_bindings__, &Map.put(&1, :sort, sort))}
  end

  # Restricts the selected fields to the de-duplicated `select` list.
  @impl true
  def select(query, select, resource) do
    query = AshSql.Bindings.default_bindings(query, resource, AshSqlite.SqlImplementation)

    {:ok,
     from(row in query,
       select: struct(row, ^Enum.uniq(select))
     )}
  end

  @doc false
  def unwrap_one([thing]), do: thing
  def unwrap_one([]), do: nil
  def unwrap_one(other), do: other

  # Joins the relationships referenced by the filter, then adds the filter
  # expression to the query.
  @impl true
  def filter(query, filter, _resource, opts \\ []) do
    query
    |> AshSql.Join.join_all_relationships(filter, opts)
    |> case do
      {:ok, query} ->
        {:ok, AshSql.Filter.add_filter_expression(query, filter)}

      {:error, error} ->
        {:error, error}
    end
  end

  @impl true
  def add_calculations(query, calculations, resource) do
    AshSql.Calculation.add_calculations(query, calculations, resource, 0, true)
  end

  @doc false
  def get_binding(resource, path, query, type, name_match \\ nil)

  # Finds the first binding of one of the given type(s) whose path is
  # synonymous with `path` and, when `name_match` is given, whose name matches.
  def get_binding(resource, path, %{__ash_bindings__: _} = query, type, name_match) do
    types = List.wrap(type)

    Enum.find_value(query.__ash_bindings__.bindings, fn
      {binding, %{path: candidate_path, type: binding_type} = data} ->
        if binding_type in types do
          if name_match do
            if data[:name] == name_match do
              if Ash.SatSolver.synonymous_relationship_paths?(resource, candidate_path, path) do
                binding
              end
            end
          else
            if Ash.SatSolver.synonymous_relationship_paths?(resource, candidate_path, path) do
              binding
            else
              false
            end
          end
        end

      _ ->
        nil
    end)
  end

  def get_binding(_, _, _, _, _), do: nil

  @doc false
  def add_binding(query, data, additional_bindings \\ 0) do
    current = query.__ash_bindings__.current
    bindings = query.__ash_bindings__.bindings

    # Store `data` under the current index, then advance the counter past it
    # and past any additional bindings reserved by the caller.
    new_ash_bindings = %{
      query.__ash_bindings__
      | bindings: Map.put(bindings, current, data),
        current: current + 1 + additional_bindings
    }

    %{query | __ash_bindings__: new_ash_bindings}
  end

  # Stores `data` under an explicitly chosen binding index, leaving the
  # `current` counter untouched.
  def add_known_binding(query, data, known_binding) do
    bindings = query.__ash_bindings__.bindings

    new_ash_bindings = %{
      query.__ash_bindings__
      | bindings: Map.put(bindings, known_binding, data)
    }

    %{query | __ash_bindings__: new_ash_bindings}
  end

  # Delegates to the `rollback/1` function of the resource's configured repo.
  @impl true
  def rollback(resource, term) do
    AshSqlite.DataLayer.Info.repo(resource).rollback(term)
  end

  # Table for the changeset: the `data_layer[:table]` context value wins over
  # the table configured on the resource.
  defp table(resource, changeset) do
    changeset.context[:data_layer][:table] || AshSqlite.DataLayer.Info.table(resource)
  end

  # Raises a descriptive error when no table could be determined.
  defp raise_table_error!(resource, operation) do
    if AshSqlite.DataLayer.Info.polymorphic?(resource) do
      raise """
      Could not determine table for #{operation} on #{inspect(resource)}.

      Polymorphic resources require that the `data_layer[:table]` context is provided.
      See the guide on polymorphic resources for more information.
      """
    else
      raise """
      Could not determine table for #{operation} on #{inspect(resource)}.
      """
    end
  end

  # Resolves the repo to use: a repo found in the query's or changeset's
  # `data_layer` context takes precedence over the one configured on the resource.
  defp dynamic_repo(resource, %{__ash_bindings__: %{context: %{data_layer: %{repo: repo}}}}) do
    repo || AshSqlite.DataLayer.Info.repo(resource)
  end

  defp dynamic_repo(resource, %{context: %{data_layer: %{repo: repo}}}) do
    repo || AshSqlite.DataLayer.Info.repo(resource)
  end

  defp dynamic_repo(resource, _) do
    AshSqlite.DataLayer.Info.repo(resource)
  end
end