ash_sqlite/lib/data_layer.ex

1477 lines
42 KiB
Elixir

defmodule AshSqlite.DataLayer do
@index %Spark.Dsl.Entity{
name: :index,
describe: """
Add an index to be managed by the migration generator.
""",
examples: [
"index [\"column\", \"column2\"], unique: true, where: \"thing = TRUE\""
],
target: AshSqlite.CustomIndex,
schema: AshSqlite.CustomIndex.schema(),
transform: {AshSqlite.CustomIndex, :transform, []},
args: [:fields]
}
@custom_indexes %Spark.Dsl.Section{
name: :custom_indexes,
describe: """
A section for configuring indexes to be created by the migration generator.
In general, prefer to use `identities` for simple unique constraints. This is a tool to allow
for declaring more complex indexes.
""",
examples: [
"""
custom_indexes do
index [:column1, :column2], unique: true, where: "thing = TRUE"
end
"""
],
entities: [
@index
]
}
@statement %Spark.Dsl.Entity{
name: :statement,
describe: """
Add a custom statement for migrations.
""",
examples: [
"""
statement :pgweb_idx do
up "CREATE INDEX pgweb_idx ON pgweb USING GIN (to_tsvector('english', title || ' ' || body));"
down "DROP INDEX pgweb_idx;"
end
"""
],
target: AshSqlite.Statement,
schema: AshSqlite.Statement.schema(),
args: [:name]
}
@custom_statements %Spark.Dsl.Section{
name: :custom_statements,
describe: """
A section for configuring custom statements to be added to migrations.
Changing custom statements may require manual intervention, because Ash can't determine what order they should run
in (i.e if they depend on table structure that you've added, or vice versa). As such, any `down` statements we run
for custom statements happen first, and any `up` statements happen last.
Additionally, when changing a custom statement, we must make some assumptions, i.e that we should migrate
the old structure down using the previously configured `down` and recreate it.
This may not be desired, and so what you may end up doing is simply modifying the old migration and deleting whatever was
generated by the migration generator. As always: read your migrations after generating them!
""",
examples: [
"""
custom_statements do
# the name is used to detect if you remove or modify the statement
statement :pgweb_idx do
up "CREATE INDEX pgweb_idx ON pgweb USING GIN (to_tsvector('english', title || ' ' || body));"
down "DROP INDEX pgweb_idx;"
end
end
"""
],
entities: [
@statement
]
}
@reference %Spark.Dsl.Entity{
name: :reference,
describe: """
Configures the reference for a relationship in resource migrations.
Keep in mind that multiple relationships can theoretically involve the same destination and foreign keys.
In those cases, you only need to configure the `reference` behavior for one of them. Any conflicts will result
in an error, across this resource and any other resources that share a table with this one. For this reason,
instead of adding a reference configuration for `:nothing`, its best to just leave the configuration out, as that
is the default behavior if *no* relationship anywhere has configured the behavior of that reference.
""",
examples: [
"reference :post, on_delete: :delete, on_update: :update, name: \"comments_to_posts_fkey\""
],
args: [:relationship],
target: AshSqlite.Reference,
schema: AshSqlite.Reference.schema()
}
@references %Spark.Dsl.Section{
name: :references,
describe: """
A section for configuring the references (foreign keys) in resource migrations.
This section is only relevant if you are using the migration generator with this resource.
Otherwise, it has no effect.
""",
examples: [
"""
references do
reference :post, on_delete: :delete, on_update: :update, name: "comments_to_posts_fkey"
end
"""
],
entities: [@reference],
schema: [
polymorphic_on_delete: [
type: {:one_of, [:delete, :nilify, :nothing, :restrict]},
doc:
"For polymorphic resources, configures the on_delete behavior of the automatically generated foreign keys to source tables."
],
polymorphic_on_update: [
type: {:one_of, [:update, :nilify, :nothing, :restrict]},
doc:
"For polymorphic resources, configures the on_update behavior of the automatically generated foreign keys to source tables."
],
polymorphic_name: [
type: {:one_of, [:update, :nilify, :nothing, :restrict]},
doc:
"For polymorphic resources, configures the on_update behavior of the automatically generated foreign keys to source tables."
]
]
}
@references %Spark.Dsl.Section{
name: :references,
describe: """
A section for configuring the references (foreign keys) in resource migrations.
This section is only relevant if you are using the migration generator with this resource.
Otherwise, it has no effect.
""",
examples: [
"""
references do
reference :post, on_delete: :delete, on_update: :update, name: "comments_to_posts_fkey"
end
"""
],
entities: [@reference],
schema: [
polymorphic_on_delete: [
type: {:one_of, [:delete, :nilify, :nothing, :restrict]},
doc:
"For polymorphic resources, configures the on_delete behavior of the automatically generated foreign keys to source tables."
],
polymorphic_on_update: [
type: {:one_of, [:update, :nilify, :nothing, :restrict]},
doc:
"For polymorphic resources, configures the on_update behavior of the automatically generated foreign keys to source tables."
],
polymorphic_name: [
type: {:one_of, [:update, :nilify, :nothing, :restrict]},
doc:
"For polymorphic resources, configures the on_update behavior of the automatically generated foreign keys to source tables."
]
]
}
@sqlite %Spark.Dsl.Section{
name: :sqlite,
describe: """
Sqlite data layer configuration
""",
sections: [
@custom_indexes,
@custom_statements,
@references
],
modules: [
:repo
],
examples: [
"""
sqlite do
repo MyApp.Repo
table "organizations"
end
"""
],
schema: [
repo: [
type: :atom,
required: true,
doc:
"The repo that will be used to fetch your data. See the `AshSqlite.Repo` documentation for more"
],
migrate?: [
type: :boolean,
default: true,
doc:
"Whether or not to include this resource in the generated migrations with `mix ash.generate_migrations`"
],
migration_types: [
type: :keyword_list,
default: [],
doc:
"A keyword list of attribute names to the ecto migration type that should be used for that attribute. Only necessary if you need to override the defaults."
],
migration_defaults: [
type: :keyword_list,
default: [],
doc: """
A keyword list of attribute names to the ecto migration default that should be used for that attribute. The string you use will be placed verbatim in the migration. Use fragments like `fragment(\\\\"now()\\\\")`, or for `nil`, use `\\\\"nil\\\\"`.
"""
],
base_filter_sql: [
type: :string,
doc:
"A raw sql version of the base_filter, e.g `representative = true`. Required if trying to create a unique constraint on a resource with a base_filter"
],
skip_unique_indexes: [
type: {:wrap_list, :atom},
default: false,
doc: "Skip generating unique indexes when generating migrations"
],
unique_index_names: [
type:
{:list,
{:or,
[{:tuple, [{:list, :atom}, :string]}, {:tuple, [{:list, :atom}, :string, :string]}]}},
default: [],
doc: """
A list of unique index names that could raise errors that are not configured in identities, or an mfa to a function that takes a changeset and returns the list. In the format `{[:affected, :keys], "name_of_constraint"}` or `{[:affected, :keys], "name_of_constraint", "custom error message"}`
"""
],
exclusion_constraint_names: [
type: :any,
default: [],
doc: """
A list of exclusion constraint names that could raise errors. Must be in the format `{:affected_key, "name_of_constraint"}` or `{:affected_key, "name_of_constraint", "custom error message"}`
"""
],
identity_index_names: [
type: :any,
default: [],
doc: """
A keyword list of identity names to the unique index name that they should use when being managed by the migration generator.
"""
],
foreign_key_names: [
type: {:list, {:or, [{:tuple, [:atom, :string]}, {:tuple, [:string, :string]}]}},
default: [],
doc: """
A list of foreign keys that could raise errors, or an mfa to a function that takes a changeset and returns a list. In the format: `{:key, "name_of_constraint"}` or `{:key, "name_of_constraint", "custom error message"}`
"""
],
migration_ignore_attributes: [
type: {:list, :atom},
default: [],
doc: """
A list of attributes that will be ignored when generating migrations.
"""
],
table: [
type: :string,
doc: """
The table to store and read the resource from. If this is changed, the migration generator will not remove the old table.
"""
],
polymorphic?: [
type: :boolean,
default: false,
doc: """
Declares this resource as polymorphic. See the [polymorphic resources guide](/documentation/topics/polymorphic_resources.md) for more.
"""
]
]
}
alias Ash.Filter
alias Ash.Query.{BooleanExpression, Not}
@behaviour Ash.DataLayer
@sections [@sqlite]
@moduledoc """
A sqlite data layer that leverages Ecto's sqlite capabilities.
"""
use Spark.Dsl.Extension,
sections: @sections,
transformers: [
AshSqlite.Transformers.ValidateReferences,
AshSqlite.Transformers.VerifyRepo,
AshSqlite.Transformers.EnsureTableOrPolymorphic
]
def migrate(args) do
# TODO: take args that we care about
Mix.Task.run("ash_sqlite.migrate", args)
end
def codegen(args) do
# TODO: take args that we care about
Mix.Task.run("ash_sqlite.generate_migrations", args)
end
def setup(args) do
# TODO: take args that we care about
Mix.Task.run("ash_sqlite.create", args)
Mix.Task.run("ash_sqlite.migrate", args)
end
def tear_down(args) do
# TODO: take args that we care about
Mix.Task.run("ash_sqlite.drop", args)
end
import Ecto.Query, only: [from: 2, subquery: 1]
@impl true
def can?(_, :async_engine), do: false
def can?(_, :bulk_create), do: true
def can?(_, {:lock, _}), do: false
def can?(_, :transact), do: false
def can?(_, :composite_primary_key), do: true
def can?(_, {:atomic, :update}), do: true
def can?(_, :upsert), do: true
def can?(_, :changeset_filter), do: true
def can?(resource, {:join, other_resource}) do
data_layer = Ash.DataLayer.data_layer(resource)
other_data_layer = Ash.DataLayer.data_layer(other_resource)
data_layer == other_data_layer and
AshSqlite.DataLayer.Info.repo(resource) == AshSqlite.DataLayer.Info.repo(other_resource)
end
def can?(_resource, {:lateral_join, _}) do
false
end
def can?(_, :boolean_filter), do: true
def can?(_, {:aggregate, _type}), do: false
def can?(_, :aggregate_filter), do: false
def can?(_, :aggregate_sort), do: false
def can?(_, :expression_calculation), do: true
def can?(_, :expression_calculation_sort), do: true
def can?(_, :create), do: true
def can?(_, :select), do: true
def can?(_, :read), do: true
def can?(resource, action) when action in ~w[update destroy]a do
resource
|> Ash.Resource.Info.primary_key()
|> Enum.any?()
end
def can?(_, :filter), do: true
def can?(_, :limit), do: true
def can?(_, :offset), do: true
def can?(_, :multitenancy), do: false
def can?(_, {:filter_relationship, %{manual: {module, _}}}) do
Spark.implements_behaviour?(module, AshSqlite.ManualRelationship)
end
def can?(_, {:filter_relationship, _}), do: true
def can?(_, {:aggregate_relationship, _}), do: false
def can?(_, :timeout), do: true
def can?(_, {:filter_expr, _}), do: true
def can?(_, :nested_expressions), do: true
def can?(_, {:query_aggregate, :count}), do: false
def can?(_, :sort), do: true
def can?(_, :distinct_sort), do: false
def can?(_, :distinct), do: false
def can?(_, {:sort, _}), do: true
def can?(_, _), do: false
@impl true
def limit(query, nil, _), do: {:ok, query}
def limit(query, limit, _resource) do
{:ok, from(row in query, limit: ^limit)}
end
@impl true
def source(resource) do
AshSqlite.DataLayer.Info.table(resource) || ""
end
@impl true
def set_context(resource, data_layer_query, context) do
start_bindings = context[:data_layer][:start_bindings_at] || 0
data_layer_query = from(row in data_layer_query, as: ^start_bindings)
data_layer_query =
if context[:data_layer][:table] do
%{
data_layer_query
| from: %{data_layer_query.from | source: {context[:data_layer][:table], resource}}
}
else
data_layer_query
end
{:ok, default_bindings(data_layer_query, resource, context)}
end
@impl true
def offset(query, nil, _), do: query
def offset(%{offset: old_offset} = query, 0, _resource) when old_offset in [0, nil] do
{:ok, query}
end
def offset(query, offset, _resource) do
{:ok, from(row in query, offset: ^offset)}
end
@impl true
def run_query(query, resource) do
query = default_bindings(query, resource)
with_sort_applied =
if query.__ash_bindings__[:sort_applied?] do
{:ok, query}
else
apply_sort(query, query.__ash_bindings__[:sort], resource)
end
case with_sort_applied do
{:error, error} ->
{:error, error}
{:ok, query} ->
query =
if query.__ash_bindings__[:__order__?] && query.windows[:order] do
order_by = %{query.windows[:order] | expr: query.windows[:order].expr[:order_by]}
%{
query
| windows: Keyword.delete(query.windows, :order),
order_bys: [order_by]
}
else
%{query | windows: Keyword.delete(query.windows, :order)}
end
if AshSqlite.DataLayer.Info.polymorphic?(resource) && no_table?(query) do
raise_table_error!(resource, :read)
else
primary_key = Ash.Resource.Info.primary_key(resource)
{:ok,
dynamic_repo(resource, query).all(query, repo_opts(nil, nil, resource))
|> Enum.uniq_by(&Map.take(&1, primary_key))}
end
end
rescue
e ->
handle_raised_error(e, __STACKTRACE__, query, resource)
end
defp no_table?(%{from: %{source: {"", _}}}), do: true
defp no_table?(_), do: false
defp repo_opts(timeout, nil, _resource) do
[]
|> add_timeout(timeout)
end
defp repo_opts(timeout, _resource) do
add_timeout([], timeout)
end
defp add_timeout(opts, timeout) when not is_nil(timeout) do
Keyword.put(opts, :timeout, timeout)
end
defp add_timeout(opts, _), do: opts
@impl true
def functions(_resource) do
[
AshSqlite.Functions.Fragment,
AshSqlite.Functions.Like
]
end
@impl true
def resource_to_query(resource, _) do
from(row in {AshSqlite.DataLayer.Info.table(resource) || "", resource}, [])
end
@impl true
def bulk_create(resource, stream, options) do
opts = repo_opts(nil, resource)
opts =
if options.return_records? do
Keyword.put(opts, :returning, true)
else
opts
end
opts =
if options[:upsert?] do
opts
|> Keyword.put(:on_conflict, {:replace, options[:upsert_fields] || []})
|> Keyword.put(
:conflict_target,
conflict_target(
resource,
options[:upsert_keys] || Ash.Resource.Info.primary_key(resource)
)
)
else
opts
end
changesets = Enum.to_list(stream)
ecto_changesets = Enum.map(changesets, & &1.attributes)
source =
if table = Enum.at(changesets, 0).context[:data_layer][:table] do
{table, resource}
else
resource
end
repo = dynamic_repo(resource, Enum.at(changesets, 0))
source
|> repo.insert_all(ecto_changesets, opts)
|> case do
{_, nil} ->
:ok
{_, results} ->
if options[:single?] do
{:ok, results}
else
{:ok,
Stream.zip_with(results, changesets, fn result, changeset ->
Ash.Resource.put_metadata(
result,
:bulk_create_index,
changeset.context.bulk_create.index
)
end)}
end
end
rescue
e ->
changeset = Ash.Changeset.new(resource)
handle_raised_error(
e,
__STACKTRACE__,
{:bulk_create, ecto_changeset(changeset.data, changeset, :create, false)},
resource
)
end
@impl true
def create(resource, changeset) do
changeset = %{
changeset
| data:
Map.update!(
changeset.data,
:__meta__,
&Map.put(&1, :source, table(resource, changeset))
)
}
case bulk_create(resource, [changeset], %{
single?: true,
return_records?: true
}) do
{:ok, [result]} ->
{:ok, result}
{:error, error} ->
{:error, error}
end
end
defp handle_errors({:error, %Ecto.Changeset{errors: errors}}) do
{:error, Enum.map(errors, &to_ash_error/1)}
end
defp to_ash_error({field, {message, vars}}) do
Ash.Error.Changes.InvalidAttribute.exception(
field: field,
message: message,
private_vars: vars
)
end
defp ecto_changeset(record, changeset, type, table_error? \\ true) do
filters =
if changeset.action_type == :create do
%{}
else
Map.get(changeset, :filters, %{})
end
filters =
if changeset.action_type == :create do
filters
else
changeset.resource
|> Ash.Resource.Info.primary_key()
|> Enum.reduce(filters, fn key, filters ->
Map.put(filters, key, Map.get(record, key))
end)
end
attributes =
changeset.resource
|> Ash.Resource.Info.attributes()
|> Enum.map(& &1.name)
attributes_to_change =
Enum.reject(attributes, fn attribute ->
Keyword.has_key?(changeset.atomics, attribute)
end)
ecto_changeset =
record
|> to_ecto()
|> set_table(changeset, type, table_error?)
|> Ecto.Changeset.change(Map.take(changeset.attributes, attributes_to_change))
|> Map.update!(:filters, &Map.merge(&1, filters))
|> add_configured_foreign_key_constraints(record.__struct__)
|> add_unique_indexes(record.__struct__, changeset)
|> add_exclusion_constraints(record.__struct__)
case type do
:create ->
ecto_changeset
|> add_my_foreign_key_constraints(record.__struct__)
type when type in [:upsert, :update] ->
ecto_changeset
|> add_my_foreign_key_constraints(record.__struct__)
|> add_related_foreign_key_constraints(record.__struct__)
:delete ->
ecto_changeset
|> add_related_foreign_key_constraints(record.__struct__)
end
end
defp handle_raised_error(
%Ecto.StaleEntryError{changeset: %{data: %resource{}, filters: filters}},
stacktrace,
context,
resource
) do
handle_raised_error(
Ash.Error.Changes.StaleRecord.exception(resource: resource, filters: filters),
stacktrace,
context,
resource
)
end
defp handle_raised_error(%Ecto.Query.CastError{} = e, stacktrace, context, resource) do
handle_raised_error(
Ash.Error.Query.InvalidFilterValue.exception(value: e.value, context: context),
stacktrace,
context,
resource
)
end
defp handle_raised_error(
%Exqlite.Error{
message: "FOREIGN KEY constraint failed"
},
stacktrace,
context,
resource
) do
handle_raised_error(
Ash.Error.Changes.InvalidChanges.exception(
fields: Ash.Resource.Info.primary_key(resource),
message: "referenced something that does not exist"
),
stacktrace,
context,
resource
)
end
defp handle_raised_error(
%Exqlite.Error{
message: "UNIQUE constraint failed: " <> fields
},
stacktrace,
context,
resource
) do
names =
fields
|> String.split(", ")
|> Enum.map(fn field ->
field |> String.split(".", trim: true) |> Enum.drop(1) |> Enum.at(0)
end)
|> Enum.map(fn field ->
Ash.Resource.Info.attribute(resource, field)
end)
|> Enum.reject(&is_nil/1)
|> Enum.map(fn %{name: name} ->
name
end)
message = find_constraint_message(resource, names)
names
|> Enum.map(fn name ->
Ash.Error.Changes.InvalidAttribute.exception(
field: name,
message: message
)
end)
|> handle_raised_error(
stacktrace,
context,
resource
)
end
defp handle_raised_error(error, stacktrace, _ecto_changeset, _resource) do
{:error, Ash.Error.to_ash_error(error, stacktrace)}
end
defp find_constraint_message(resource, names) do
find_custom_index_message(resource, names) || find_identity_message(resource, names) ||
"has already been taken"
end
defp find_custom_index_message(resource, names) do
resource
|> AshSqlite.DataLayer.Info.custom_indexes()
|> Enum.find(fn %{fields: fields} ->
fields |> Enum.map(&to_string/1) |> Enum.sort() ==
names |> Enum.map(&to_string/1) |> Enum.sort()
end)
|> case do
%{message: message} when is_binary(message) -> message
_ -> nil
end
end
defp find_identity_message(resource, names) do
resource
|> Ash.Resource.Info.identities()
|> Enum.find(fn %{keys: fields} ->
fields |> Enum.map(&to_string/1) |> Enum.sort() ==
names |> Enum.map(&to_string/1) |> Enum.sort()
end)
|> case do
%{message: message} when is_binary(message) ->
message
_ ->
nil
end
end
defp set_table(record, changeset, operation, table_error?) do
if AshSqlite.DataLayer.Info.polymorphic?(record.__struct__) do
table =
changeset.context[:data_layer][:table] ||
AshSqlite.DataLayer.Info.table(record.__struct__)
if table do
Ecto.put_meta(record, source: table)
else
if table_error? do
raise_table_error!(changeset.resource, operation)
else
record
end
end
else
record
end
end
def from_ecto({:ok, result}), do: {:ok, from_ecto(result)}
def from_ecto({:error, _} = other), do: other
def from_ecto(nil), do: nil
def from_ecto(value) when is_list(value) do
Enum.map(value, &from_ecto/1)
end
def from_ecto(%resource{} = record) do
if Spark.Dsl.is?(resource, Ash.Resource) do
empty = struct(resource)
resource
|> Ash.Resource.Info.relationships()
|> Enum.reduce(record, fn relationship, record ->
case Map.get(record, relationship.name) do
%Ecto.Association.NotLoaded{} ->
Map.put(record, relationship.name, Map.get(empty, relationship.name))
value ->
Map.put(record, relationship.name, from_ecto(value))
end
end)
else
record
end
end
def from_ecto(other), do: other
def to_ecto(nil), do: nil
def to_ecto(value) when is_list(value) do
Enum.map(value, &to_ecto/1)
end
def to_ecto(%resource{} = record) do
if Spark.Dsl.is?(resource, Ash.Resource) do
resource
|> Ash.Resource.Info.relationships()
|> Enum.reduce(record, fn relationship, record ->
value =
case Map.get(record, relationship.name) do
%Ash.NotLoaded{} ->
%Ecto.Association.NotLoaded{
__field__: relationship.name,
__cardinality__: relationship.cardinality
}
value ->
to_ecto(value)
end
Map.put(record, relationship.name, value)
end)
else
record
end
end
def to_ecto(other), do: other
defp add_exclusion_constraints(changeset, resource) do
resource
|> AshSqlite.DataLayer.Info.exclusion_constraint_names()
|> Enum.reduce(changeset, fn constraint, changeset ->
case constraint do
{key, name} ->
Ecto.Changeset.exclusion_constraint(changeset, key, name: name)
{key, name, message} ->
Ecto.Changeset.exclusion_constraint(changeset, key, name: name, message: message)
end
end)
end
defp add_related_foreign_key_constraints(changeset, resource) do
# TODO: this doesn't guarantee us to get all of them, because if something is related to this
# schema and there is no back-relation, then this won't catch it's foreign key constraints
resource
|> Ash.Resource.Info.relationships()
|> Enum.map(& &1.destination)
|> Enum.uniq()
|> Enum.flat_map(fn related ->
related
|> Ash.Resource.Info.relationships()
|> Enum.filter(&(&1.destination == resource))
|> Enum.map(&Map.take(&1, [:source, :source_attribute, :destination_attribute, :name]))
end)
|> Enum.reduce(changeset, fn %{
source: source,
source_attribute: source_attribute,
destination_attribute: destination_attribute,
name: relationship_name
},
changeset ->
case AshSqlite.DataLayer.Info.reference(resource, relationship_name) do
%{name: name} when not is_nil(name) ->
Ecto.Changeset.foreign_key_constraint(changeset, destination_attribute,
name: name,
message: "would leave records behind"
)
_ ->
Ecto.Changeset.foreign_key_constraint(changeset, destination_attribute,
name: "#{AshSqlite.DataLayer.Info.table(source)}_#{source_attribute}_fkey",
message: "would leave records behind"
)
end
end)
end
defp add_my_foreign_key_constraints(changeset, resource) do
resource
|> Ash.Resource.Info.relationships()
|> Enum.reduce(changeset, &Ecto.Changeset.foreign_key_constraint(&2, &1.source_attribute))
end
defp add_configured_foreign_key_constraints(changeset, resource) do
resource
|> AshSqlite.DataLayer.Info.foreign_key_names()
|> case do
{m, f, a} -> List.wrap(apply(m, f, [changeset | a]))
value -> List.wrap(value)
end
|> Enum.reduce(changeset, fn
{key, name}, changeset ->
Ecto.Changeset.foreign_key_constraint(changeset, key, name: name)
{key, name, message}, changeset ->
Ecto.Changeset.foreign_key_constraint(changeset, key, name: name, message: message)
end)
end
defp add_unique_indexes(changeset, resource, ash_changeset) do
changeset =
resource
|> Ash.Resource.Info.identities()
|> Enum.reduce(changeset, fn identity, changeset ->
name =
AshSqlite.DataLayer.Info.identity_index_names(resource)[identity.name] ||
"#{table(resource, ash_changeset)}_#{identity.name}_index"
opts =
if Map.get(identity, :message) do
[name: name, message: identity.message]
else
[name: name]
end
Ecto.Changeset.unique_constraint(changeset, identity.keys, opts)
end)
changeset =
resource
|> AshSqlite.DataLayer.Info.custom_indexes()
|> Enum.reduce(changeset, fn index, changeset ->
opts =
if index.message do
[name: index.name, message: index.message]
else
[name: index.name]
end
Ecto.Changeset.unique_constraint(changeset, index.fields, opts)
end)
names =
resource
|> AshSqlite.DataLayer.Info.unique_index_names()
|> case do
{m, f, a} -> List.wrap(apply(m, f, [changeset | a]))
value -> List.wrap(value)
end
names =
case Ash.Resource.Info.primary_key(resource) do
[] ->
names
fields ->
if table = table(resource, ash_changeset) do
[{fields, table <> "_pkey"} | names]
else
[]
end
end
Enum.reduce(names, changeset, fn
{keys, name}, changeset ->
Ecto.Changeset.unique_constraint(changeset, List.wrap(keys), name: name)
{keys, name, message}, changeset ->
Ecto.Changeset.unique_constraint(changeset, List.wrap(keys), name: name, message: message)
end)
end
@impl true
def upsert(resource, changeset, keys \\ nil) do
keys = keys || Ash.Resource.Info.primary_key(resource)
explicitly_changing_attributes =
Enum.map(
Map.keys(changeset.attributes) -- Map.get(changeset, :defaults, []) -- keys,
fn key ->
{key, Ash.Changeset.get_attribute(changeset, key)}
end
)
on_conflict =
changeset
|> update_defaults()
|> Keyword.merge(explicitly_changing_attributes)
case bulk_create(resource, [changeset], %{
single?: true,
upsert?: true,
upsert_keys: keys,
upsert_fields: Keyword.keys(on_conflict),
return_records?: true
}) do
{:ok, [result]} ->
{:ok, result}
{:error, error} ->
{:error, error}
end
end
defp conflict_target(resource, keys) do
if Ash.Resource.Info.base_filter(resource) do
base_filter_sql =
AshSqlite.DataLayer.Info.base_filter_sql(resource) ||
raise """
Cannot use upserts with resources that have a base_filter without also adding `base_filter_sql` in the sqlite section.
"""
sources =
Enum.map(keys, fn key ->
~s("#{Ash.Resource.Info.attribute(resource, key).source || key}")
end)
{:unsafe_fragment, "(" <> Enum.join(sources, ", ") <> ") WHERE (#{base_filter_sql})"}
else
keys
end
end
defp update_defaults(changeset) do
attributes =
changeset.resource
|> Ash.Resource.Info.attributes()
|> Enum.reject(&is_nil(&1.update_default))
attributes
|> static_defaults()
|> Enum.concat(lazy_matching_defaults(attributes))
|> Enum.concat(lazy_non_matching_defaults(attributes))
end
defp static_defaults(attributes) do
attributes
|> Enum.reject(&get_default_fun(&1))
|> Enum.map(&{&1.name, &1.update_default})
end
defp lazy_non_matching_defaults(attributes) do
attributes
|> Enum.filter(&(!&1.match_other_defaults? && get_default_fun(&1)))
|> Enum.map(fn attribute ->
default_value =
case attribute.update_default do
function when is_function(function) ->
function.()
{m, f, a} when is_atom(m) and is_atom(f) and is_list(a) ->
apply(m, f, a)
end
{attribute.name, default_value}
end)
end
defp lazy_matching_defaults(attributes) do
attributes
|> Enum.filter(&(&1.match_other_defaults? && get_default_fun(&1)))
|> Enum.group_by(& &1.update_default)
|> Enum.flat_map(fn {default_fun, attributes} ->
default_value =
case default_fun do
function when is_function(function) ->
function.()
{m, f, a} when is_atom(m) and is_atom(f) and is_list(a) ->
apply(m, f, a)
end
Enum.map(attributes, &{&1.name, default_value})
end)
end
defp get_default_fun(attribute) do
if is_function(attribute.update_default) or match?({_, _, _}, attribute.update_default) do
attribute.update_default
end
end
@impl true
def update(resource, changeset) do
ecto_changeset =
changeset.data
|> Map.update!(:__meta__, &Map.put(&1, :source, table(resource, changeset)))
|> ecto_changeset(changeset, :update)
try do
query = from(row in resource, as: ^0)
select = Keyword.keys(changeset.atomics) ++ Ash.Resource.Info.primary_key(resource)
query =
query
|> default_bindings(resource, changeset.context)
|> Ecto.Query.select(^select)
query =
Enum.reduce(ecto_changeset.filters, query, fn {key, value}, query ->
from(row in query,
where: field(row, ^key) == ^value
)
end)
atomics_result =
Enum.reduce_while(changeset.atomics, {:ok, query, []}, fn {field, expr},
{:ok, query, set} ->
with {:ok, query} <-
AshSqlite.Join.join_all_relationships(
query,
%Ash.Filter{
resource: resource,
expression: expr
},
left_only?: true
),
dynamic <-
AshSqlite.Expr.dynamic_expr(query, expr, query.__ash_bindings__) do
{:cont, {:ok, query, Keyword.put(set, field, dynamic)}}
else
other ->
{:halt, other}
end
end)
case atomics_result do
{:ok, query, dynamics} ->
{params, set, count} =
ecto_changeset.changes
|> Map.to_list()
|> Enum.reduce({[], [], 0}, fn {key, value}, {params, set, count} ->
{[{value, {0, key}} | params], [{key, {:^, [], [count]}} | set], count + 1}
end)
{params, set, _} =
Enum.reduce(dynamics, {params, set, count}, fn {key, value}, {params, set, count} ->
case AshSqlite.Expr.dynamic_expr(query, value, query.__ash_bindings__) do
%Ecto.Query.DynamicExpr{} = dynamic ->
result =
Ecto.Query.Builder.Dynamic.partially_expand(
:select,
query,
dynamic,
params,
count
)
expr = elem(result, 0)
new_params = elem(result, 1)
new_count =
result |> Tuple.to_list() |> List.last()
{new_params, [{key, expr} | set], new_count}
other ->
{[{other, {0, key}} | params], [{key, {:^, [], [count]}} | set], count + 1}
end
end)
case set do
[] ->
{:ok, changeset.data}
set ->
query =
Map.put(query, :updates, [
%Ecto.Query.QueryExpr{
# why do I have to reverse the `set`???
# it breaks if I don't
expr: [set: Enum.reverse(set)],
params: Enum.reverse(params)
}
])
repo_opts = repo_opts(changeset.timeout, changeset.resource)
repo_opts =
Keyword.put(repo_opts, :returning, Keyword.keys(changeset.atomics))
result =
dynamic_repo(resource, changeset).update_all(
query,
[],
repo_opts
)
case result do
{0, []} ->
{:error,
Ash.Error.Changes.StaleRecord.exception(
resource: resource,
filters: ecto_changeset.filters
)}
{1, [result]} ->
record =
changeset.data
|> Map.merge(changeset.attributes)
|> Map.merge(Map.take(result, Keyword.keys(changeset.atomics)))
{:ok, record}
end
end
{:error, error} ->
{:error, error}
end
rescue
e ->
handle_raised_error(e, __STACKTRACE__, ecto_changeset, resource)
end
end
@impl true
def destroy(resource, %{data: record} = changeset) do
ecto_changeset = ecto_changeset(record, changeset, :delete)
try do
ecto_changeset
|> dynamic_repo(resource, changeset).delete(
repo_opts(changeset.timeout, changeset.resource)
)
|> from_ecto()
|> case do
{:ok, _record} ->
:ok
{:error, error} ->
handle_errors({:error, error})
end
rescue
e ->
handle_raised_error(e, __STACKTRACE__, ecto_changeset, resource)
end
end
@impl true
def sort(query, sort, _resource) do
{:ok, Map.update!(query, :__ash_bindings__, &Map.put(&1, :sort, sort))}
end
@impl true
def select(query, select, resource) do
query = default_bindings(query, resource)
{:ok,
from(row in query,
select: struct(row, ^Enum.uniq(select))
)}
end
defp apply_sort(query, sort, resource, directly? \\ false)
defp apply_sort(query, sort, _resource, _) when sort in [nil, []] do
{:ok, query |> set_sort_applied()}
end
defp apply_sort(query, sort, resource, directly?) do
query
|> AshSqlite.Sort.sort(sort, resource, [], 0, directly?)
|> case do
{:ok, sort} when directly? ->
{:ok, query |> Ecto.Query.order_by(^sort) |> set_sort_applied()}
{:ok, query} ->
{:ok, query |> set_sort_applied()}
{:error, error} ->
{:error, error}
end
end
@doc false
def unwrap_one([thing]), do: thing
def unwrap_one([]), do: nil
def unwrap_one(other), do: other
defp set_sort_applied(query) do
Map.update!(query, :__ash_bindings__, &Map.put(&1, :sort_applied?, true))
end
@impl true
def filter(query, filter, resource, opts \\ []) do
query = default_bindings(query, resource)
query
|> AshSqlite.Join.join_all_relationships(filter, opts)
|> case do
{:ok, query} ->
{:ok, add_filter_expression(query, filter)}
{:error, error} ->
{:error, error}
end
end
@doc false
def default_bindings(query, resource, context \\ %{}) do
start_bindings = context[:data_layer][:start_bindings_at] || 0
Map.put_new(query, :__ash_bindings__, %{
resource: resource,
current: Enum.count(query.joins) + 1 + start_bindings,
in_group?: false,
calculations: %{},
parent_resources: [],
context: context,
bindings: %{start_bindings => %{path: [], type: :root, source: resource}}
})
end
@impl true
def add_calculations(query, calculations, resource) do
AshSqlite.Calculation.add_calculations(query, calculations, resource, 0)
end
@doc false
def get_binding(resource, path, query, type, name_match \\ nil)
def get_binding(resource, path, %{__ash_bindings__: _} = query, type, name_match) do
types = List.wrap(type)
Enum.find_value(query.__ash_bindings__.bindings, fn
{binding, %{path: candidate_path, type: binding_type} = data} ->
if binding_type in types do
if name_match do
if data[:name] == name_match do
if Ash.SatSolver.synonymous_relationship_paths?(resource, candidate_path, path) do
binding
end
end
else
if Ash.SatSolver.synonymous_relationship_paths?(resource, candidate_path, path) do
binding
else
false
end
end
end
_ ->
nil
end)
end
def get_binding(_, _, _, _, _), do: nil
defp add_filter_expression(query, filter) do
filter
|> split_and_statements()
|> Enum.reduce(query, fn filter, query ->
dynamic = AshSqlite.Expr.dynamic_expr(query, filter, query.__ash_bindings__)
Ecto.Query.where(query, ^dynamic)
end)
end
defp split_and_statements(%Filter{expression: expression}) do
split_and_statements(expression)
end
defp split_and_statements(%BooleanExpression{op: :and, left: left, right: right}) do
split_and_statements(left) ++ split_and_statements(right)
end
defp split_and_statements(%Not{expression: %Not{expression: expression}}) do
split_and_statements(expression)
end
defp split_and_statements(%Not{
expression: %BooleanExpression{op: :or, left: left, right: right}
}) do
split_and_statements(%BooleanExpression{
op: :and,
left: %Not{expression: left},
right: %Not{expression: right}
})
end
defp split_and_statements(other), do: [other]
@doc false
def add_binding(query, data, additional_bindings \\ 0) do
current = query.__ash_bindings__.current
bindings = query.__ash_bindings__.bindings
new_ash_bindings = %{
query.__ash_bindings__
| bindings: Map.put(bindings, current, data),
current: current + 1 + additional_bindings
}
%{query | __ash_bindings__: new_ash_bindings}
end
def add_known_binding(query, data, known_binding) do
bindings = query.__ash_bindings__.bindings
new_ash_bindings = %{
query.__ash_bindings__
| bindings: Map.put(bindings, known_binding, data)
}
%{query | __ash_bindings__: new_ash_bindings}
end
@impl true
def rollback(resource, term) do
AshSqlite.DataLayer.Info.repo(resource).rollback(term)
end
defp table(resource, changeset) do
changeset.context[:data_layer][:table] || AshSqlite.DataLayer.Info.table(resource)
end
defp raise_table_error!(resource, operation) do
if AshSqlite.DataLayer.Info.polymorphic?(resource) do
raise """
Could not determine table for #{operation} on #{inspect(resource)}.
Polymorphic resources require that the `data_layer[:table]` context is provided.
See the guide on polymorphic resources for more information.
"""
else
raise """
Could not determine table for #{operation} on #{inspect(resource)}.
"""
end
end
defp dynamic_repo(resource, %{__ash_bindings__: %{context: %{data_layer: %{repo: repo}}}}) do
repo || AshSqlite.DataLayer.Info.repo(resource)
end
defp dynamic_repo(resource, %{context: %{data_layer: %{repo: repo}}}) do
repo || AshSqlite.DataLayer.Info.repo(resource)
end
defp dynamic_repo(resource, _) do
AshSqlite.DataLayer.Info.repo(resource)
end
end