# NOTE(review): removed a stray VCS timestamp artifact that made the file uncompilable.
defmodule AshSqlite.DataLayer do
# DSL entity for a single custom index declaration (`index [:a, :b], ...`),
# backed by `AshSqlite.CustomIndex`.
@index %Spark.Dsl.Entity{
  name: :index,
  describe: """
  Add an index to be managed by the migration generator.
  """,
  examples: [
    "index [\"column\", \"column2\"], unique: true, where: \"thing = TRUE\""
  ],
  target: AshSqlite.CustomIndex,
  schema: AshSqlite.CustomIndex.schema(),
  transform: {AshSqlite.CustomIndex, :transform, []},
  args: [:fields]
}
# DSL section wrapping the `@index` entity; configures migration-generator indexes.
@custom_indexes %Spark.Dsl.Section{
  name: :custom_indexes,
  describe: """
  A section for configuring indexes to be created by the migration generator.

  In general, prefer to use `identities` for simple unique constraints. This is a tool to allow
  for declaring more complex indexes.
  """,
  examples: [
    """
    custom_indexes do
      index [:column1, :column2], unique: true, where: "thing = TRUE"
    end
    """
  ],
  entities: [
    @index
  ]
}
# DSL entity for a named custom migration statement with `up`/`down` SQL.
@statement %Spark.Dsl.Entity{
  name: :statement,
  describe: """
  Add a custom statement for migrations.
  """,
  examples: [
    """
    statement :pgweb_idx do
      up "CREATE INDEX pgweb_idx ON pgweb USING GIN (to_tsvector('english', title || ' ' || body));"
      down "DROP INDEX pgweb_idx;"
    end
    """
  ],
  target: AshSqlite.Statement,
  schema: AshSqlite.Statement.schema(),
  args: [:name]
}
# DSL section wrapping the `@statement` entity; explains ordering caveats for
# custom migration statements.
@custom_statements %Spark.Dsl.Section{
  name: :custom_statements,
  describe: """
  A section for configuring custom statements to be added to migrations.

  Changing custom statements may require manual intervention, because Ash can't determine what order they should run
  in (i.e if they depend on table structure that you've added, or vice versa). As such, any `down` statements we run
  for custom statements happen first, and any `up` statements happen last.

  Additionally, when changing a custom statement, we must make some assumptions, i.e that we should migrate
  the old structure down using the previously configured `down` and recreate it.

  This may not be desired, and so what you may end up doing is simply modifying the old migration and deleting whatever was
  generated by the migration generator. As always: read your migrations after generating them!
  """,
  examples: [
    """
    custom_statements do
      # the name is used to detect if you remove or modify the statement
      statement :pgweb_idx do
        up "CREATE INDEX pgweb_idx ON pgweb USING GIN (to_tsvector('english', title || ' ' || body));"
        down "DROP INDEX pgweb_idx;"
      end
    end
    """
  ],
  entities: [
    @statement
  ]
}
# DSL entity configuring the foreign-key behavior of one relationship.
@reference %Spark.Dsl.Entity{
  name: :reference,
  describe: """
  Configures the reference for a relationship in resource migrations.

  Keep in mind that multiple relationships can theoretically involve the same destination and foreign keys.
  In those cases, you only need to configure the `reference` behavior for one of them. Any conflicts will result
  in an error, across this resource and any other resources that share a table with this one. For this reason,
  instead of adding a reference configuration for `:nothing`, its best to just leave the configuration out, as that
  is the default behavior if *no* relationship anywhere has configured the behavior of that reference.
  """,
  examples: [
    "reference :post, on_delete: :delete, on_update: :update, name: \"comments_to_posts_fkey\""
  ],
  args: [:relationship],
  target: AshSqlite.Reference,
  schema: AshSqlite.Reference.schema()
}
# DSL section for relationship references (foreign keys) in migrations.
# Fixed: `polymorphic_name` previously had the `on_update` type/doc copy-pasted
# onto it ({:one_of, [:update, ...]}); it configures a *name*, so it is a string.
@references %Spark.Dsl.Section{
  name: :references,
  describe: """
  A section for configuring the references (foreign keys) in resource migrations.

  This section is only relevant if you are using the migration generator with this resource.
  Otherwise, it has no effect.
  """,
  examples: [
    """
    references do
      reference :post, on_delete: :delete, on_update: :update, name: "comments_to_posts_fkey"
    end
    """
  ],
  entities: [@reference],
  schema: [
    polymorphic_on_delete: [
      type: {:one_of, [:delete, :nilify, :nothing, :restrict]},
      doc:
        "For polymorphic resources, configures the on_delete behavior of the automatically generated foreign keys to source tables."
    ],
    polymorphic_on_update: [
      type: {:one_of, [:update, :nilify, :nothing, :restrict]},
      doc:
        "For polymorphic resources, configures the on_update behavior of the automatically generated foreign keys to source tables."
    ],
    polymorphic_name: [
      type: :string,
      doc:
        "For polymorphic resources, configures the name of the automatically generated foreign keys to source tables."
    ]
  ]
}
# NOTE(review): this is a verbatim duplicate of the `@references` attribute
# defined immediately above (a merge artifact — module attributes are simply
# re-bound, so the later definition wins). One of the two copies should be
# deleted; the same `polymorphic_name` fix is applied here so that whichever
# copy wins, the schema is correct.
@references %Spark.Dsl.Section{
  name: :references,
  describe: """
  A section for configuring the references (foreign keys) in resource migrations.

  This section is only relevant if you are using the migration generator with this resource.
  Otherwise, it has no effect.
  """,
  examples: [
    """
    references do
      reference :post, on_delete: :delete, on_update: :update, name: "comments_to_posts_fkey"
    end
    """
  ],
  entities: [@reference],
  schema: [
    polymorphic_on_delete: [
      type: {:one_of, [:delete, :nilify, :nothing, :restrict]},
      doc:
        "For polymorphic resources, configures the on_delete behavior of the automatically generated foreign keys to source tables."
    ],
    polymorphic_on_update: [
      type: {:one_of, [:update, :nilify, :nothing, :restrict]},
      doc:
        "For polymorphic resources, configures the on_update behavior of the automatically generated foreign keys to source tables."
    ],
    polymorphic_name: [
      type: :string,
      doc:
        "For polymorphic resources, configures the name of the automatically generated foreign keys to source tables."
    ]
  ]
}
# Top-level `sqlite` DSL section: repo/table configuration plus all
# migration-generator options. Stray VCS timestamp lines that had been spliced
# into the `sections:` list have been removed.
@sqlite %Spark.Dsl.Section{
  name: :sqlite,
  describe: """
  Sqlite data layer configuration
  """,
  sections: [
    @custom_indexes,
    @custom_statements,
    @references
  ],
  modules: [
    :repo
  ],
  examples: [
    """
    sqlite do
      repo MyApp.Repo
      table "organizations"
    end
    """
  ],
  schema: [
    repo: [
      type: :atom,
      required: true,
      doc:
        "The repo that will be used to fetch your data. See the `AshSqlite.Repo` documentation for more"
    ],
    migrate?: [
      type: :boolean,
      default: true,
      doc:
        "Whether or not to include this resource in the generated migrations with `mix ash.generate_migrations`"
    ],
    migration_types: [
      type: :keyword_list,
      default: [],
      doc:
        "A keyword list of attribute names to the ecto migration type that should be used for that attribute. Only necessary if you need to override the defaults."
    ],
    migration_defaults: [
      type: :keyword_list,
      default: [],
      doc: """
      A keyword list of attribute names to the ecto migration default that should be used for that attribute. The string you use will be placed verbatim in the migration. Use fragments like `fragment(\\\\"now()\\\\")`, or for `nil`, use `\\\\"nil\\\\"`.
      """
    ],
    base_filter_sql: [
      type: :string,
      doc:
        "A raw sql version of the base_filter, e.g `representative = true`. Required if trying to create a unique constraint on a resource with a base_filter"
    ],
    skip_unique_indexes: [
      # NOTE(review): a `{:wrap_list, :atom}` type with a `false` default looks
      # inconsistent — confirm whether the default should be `[]`.
      type: {:wrap_list, :atom},
      default: false,
      doc: "Skip generating unique indexes when generating migrations"
    ],
    unique_index_names: [
      type:
        {:list,
         {:or,
          [{:tuple, [{:list, :atom}, :string]}, {:tuple, [{:list, :atom}, :string, :string]}]}},
      default: [],
      doc: """
      A list of unique index names that could raise errors that are not configured in identities, or an mfa to a function that takes a changeset and returns the list. In the format `{[:affected, :keys], "name_of_constraint"}` or `{[:affected, :keys], "name_of_constraint", "custom error message"}`
      """
    ],
    exclusion_constraint_names: [
      type: :any,
      default: [],
      doc: """
      A list of exclusion constraint names that could raise errors. Must be in the format `{:affected_key, "name_of_constraint"}` or `{:affected_key, "name_of_constraint", "custom error message"}`
      """
    ],
    identity_index_names: [
      type: :any,
      default: [],
      doc: """
      A keyword list of identity names to the unique index name that they should use when being managed by the migration generator.
      """
    ],
    foreign_key_names: [
      type: {:list, {:or, [{:tuple, [:atom, :string]}, {:tuple, [:string, :string]}]}},
      default: [],
      doc: """
      A list of foreign keys that could raise errors, or an mfa to a function that takes a changeset and returns a list. In the format: `{:key, "name_of_constraint"}` or `{:key, "name_of_constraint", "custom error message"}`
      """
    ],
    migration_ignore_attributes: [
      type: {:list, :atom},
      default: [],
      doc: """
      A list of attributes that will be ignored when generating migrations.
      """
    ],
    table: [
      type: :string,
      doc: """
      The table to store and read the resource from. If this is changed, the migration generator will not remove the old table.
      """
    ],
    polymorphic?: [
      type: :boolean,
      default: false,
      doc: """
      Declares this resource as polymorphic. See the [polymorphic resources guide](/documentation/topics/polymorphic_resources.md) for more.
      """
    ]
  ]
}
alias Ash.Filter
alias Ash.Query.{BooleanExpression, Not, Ref}

@behaviour Ash.DataLayer

# `@sections` must be bound before the `use` below, which reads it.
@sections [@sqlite]

@moduledoc """
A sqlite data layer that leverages Ecto's sqlite capabilities.
"""

use Spark.Dsl.Extension,
  sections: @sections,
  transformers: [
    AshSqlite.Transformers.ValidateReferences,
    AshSqlite.Transformers.VerifyRepo,
    AshSqlite.Transformers.EnsureTableOrPolymorphic
  ]
# Thin wrappers delegating codegen/setup/teardown to the ash_sqlite mix tasks.
# The mangled paste had corrupted the task-name strings (e.g. `" ash_sqlite.migrate "`
# with embedded spaces), which would have made `Mix.Task.run/2` fail; restored.
def migrate(args) do
  # TODO: take args that we care about
  Mix.Task.run("ash_sqlite.migrate", args)
end

def codegen(args) do
  # TODO: take args that we care about
  Mix.Task.run("ash_sqlite.generate_migrations", args)
end

def setup(args) do
  # TODO: take args that we care about
  Mix.Task.run("ash_sqlite.create", args)
  Mix.Task.run("ash_sqlite.migrate", args)
end

def tear_down(args) do
  # TODO: take args that we care about
  Mix.Task.run("ash_sqlite.drop", args)
end
import Ecto.Query , only : [ from : 2 , subquery : 1 ]
# `Ash.DataLayer` capability flags. Clause order matters: specific capabilities
# first, the catch-all `false` last. Stray VCS timestamp lines between clauses
# have been removed.
@impl true
def can?(_, :async_engine), do: false
def can?(_, :bulk_create), do: true
def can?(_, {:lock, _}), do: false
def can?(_, :transact), do: true
def can?(_, :composite_primary_key), do: true
def can?(_, {:atomic, :update}), do: true
def can?(_, :upsert), do: true
def can?(_, :changeset_filter), do: true

# Joins are only possible between resources sharing this data layer and repo.
def can?(resource, {:join, other_resource}) do
  data_layer = Ash.DataLayer.data_layer(resource)
  other_data_layer = Ash.DataLayer.data_layer(other_resource)

  data_layer == other_data_layer and
    AshSqlite.DataLayer.Info.repo(resource) == AshSqlite.DataLayer.Info.repo(other_resource)
end

def can?(_resource, {:lateral_join, _}) do
  false
end

def can?(_, :boolean_filter), do: true
def can?(_, {:aggregate, _type}), do: false
def can?(_, :aggregate_filter), do: false
def can?(_, :aggregate_sort), do: false
def can?(_, :expression_calculation), do: true
def can?(_, :expression_calculation_sort), do: true
def can?(_, :create), do: true
def can?(_, :select), do: true
def can?(_, :read), do: true

# Updates/destroys require a primary key to target the row.
def can?(resource, action) when action in ~w[update destroy]a do
  resource
  |> Ash.Resource.Info.primary_key()
  |> Enum.any?()
end

def can?(_, :filter), do: true
def can?(_, :limit), do: true
def can?(_, :offset), do: true
def can?(_, :multitenancy), do: false

def can?(_, {:filter_relationship, %{manual: {module, _}}}) do
  Spark.implements_behaviour?(module, AshSqlite.ManualRelationship)
end

def can?(_, {:filter_relationship, _}), do: true
def can?(_, {:aggregate_relationship, _}), do: false
def can?(_, :timeout), do: true
def can?(_, {:filter_expr, _}), do: true
def can?(_, :nested_expressions), do: true
def can?(_, {:query_aggregate, :count}), do: false
def can?(_, :sort), do: true
def can?(_, :distinct_sort), do: true
def can?(_, :distinct), do: true
def can?(_, {:sort, _}), do: true
def can?(_, _), do: false
# Delegates to the resource's configured repo.
@impl true
def in_transaction?(resource) do
  AshSqlite.DataLayer.Info.repo(resource).in_transaction?()
end
# Applies a LIMIT; a nil limit is a no-op and returns the query unchanged.
@impl true
def limit(query, nil, _), do: {:ok, query}

def limit(query, limit, _resource) do
  {:ok, from(row in query, limit: ^limit)}
end
# The resource's table name, or "" when unset (polymorphic resources set the
# table via context instead).
@impl true
def source(resource) do
  AshSqlite.DataLayer.Info.table(resource) || ""
end
# Seeds the Ecto query with a named binding at `start_bindings_at` and swaps in
# a context-provided table (used by polymorphic resources), then installs the
# default `__ash_bindings__` bookkeeping.
@impl true
def set_context(resource, data_layer_query, context) do
  start_bindings = context[:data_layer][:start_bindings_at] || 0
  data_layer_query = from(row in data_layer_query, as: ^start_bindings)

  data_layer_query =
    if context[:data_layer][:table] do
      %{
        data_layer_query
        | from: %{data_layer_query.from | source: {context[:data_layer][:table], resource}}
      }
    else
      data_layer_query
    end

  {:ok, default_bindings(data_layer_query, resource, context)}
end
# Applies an OFFSET; nil/0 offsets are no-ops.
# Fixed: the nil clause returned a bare `query` while every other clause (and
# the matching `limit/3` callback) returns `{:ok, query}` per the
# `Ash.DataLayer` callback contract.
@impl true
def offset(query, nil, _), do: {:ok, query}

def offset(%{offset: old_offset} = query, 0, _resource) when old_offset in [0, nil] do
  {:ok, query}
end

def offset(query, offset, _resource) do
  {:ok, from(row in query, offset: ^offset)}
end
# Executes the query: applies any pending sort, rewrites the window-based
# `__order__` bookkeeping into a real ORDER BY (via a subquery when DISTINCT is
# involved, since DISTINCT must run before the final ordering), then runs the
# query on the dynamic repo. Raised errors are converted to `{:error, _}`.
@impl true
def run_query(query, resource) do
  query = default_bindings(query, resource)

  with_sort_applied =
    if query.__ash_bindings__[:sort_applied?] do
      {:ok, query}
    else
      apply_sort(query, query.__ash_bindings__[:sort], resource)
    end

  case with_sort_applied do
    {:error, error} ->
      {:error, error}

    {:ok, query} ->
      query =
        if query.__ash_bindings__[:__order__?] && query.windows[:order] do
          if query.distinct do
            # DISTINCT present: compute row order in a subquery, then order the
            # outer query by it; limit/offset are re-applied on the outside.
            query_with_order =
              from(row in query, select_merge: %{__order__: over(row_number(), :order)})

            query_without_limit_and_offset =
              query_with_order
              |> Ecto.Query.exclude(:limit)
              |> Ecto.Query.exclude(:offset)

            from(row in subquery(query_without_limit_and_offset),
              select: row,
              order_by: row.__order__
            )
            |> Map.put(:limit, query.limit)
            |> Map.put(:offset, query.offset)
          else
            # No DISTINCT: lift the window's order_by expression directly into
            # the query's order_bys.
            order_by = %{query.windows[:order] | expr: query.windows[:order].expr[:order_by]}

            %{
              query
              | windows: Keyword.delete(query.windows, :order),
                order_bys: [order_by]
            }
          end
        else
          %{query | windows: Keyword.delete(query.windows, :order)}
        end

      if AshSqlite.DataLayer.Info.polymorphic?(resource) && no_table?(query) do
        raise_table_error!(resource, :read)
      else
        {:ok, dynamic_repo(resource, query).all(query, repo_opts(nil, nil, resource))}
      end
  end
rescue
  e ->
    handle_raised_error(e, __STACKTRACE__, query, resource)
end
# True when the query's source table is the empty-string placeholder used by
# polymorphic resources that never received a concrete table.
defp no_table?(%{from: %{source: {"", _}}}), do: true
defp no_table?(_), do: false
# Builds keyword options for repo calls, currently just an optional :timeout.
# NOTE(review): the 3-arity clause only matches a `nil` middle argument (the
# sole 3-arity caller passes `nil`); confirm whether a catch-all 3-arity clause
# was intended, or whether the call sites should converge on the 2-arity form.
defp repo_opts(timeout, nil, _resource) do
  []
  |> add_timeout(timeout)
end

defp repo_opts(timeout, _resource) do
  add_timeout([], timeout)
end

defp add_timeout(opts, timeout) when not is_nil(timeout) do
  Keyword.put(opts, :timeout, timeout)
end

defp add_timeout(opts, _), do: opts
# Custom expression functions supported by this data layer.
@impl true
def functions(_resource) do
  [
    AshSqlite.Functions.Fragment,
    AshSqlite.Functions.Like
  ]
end
# Base Ecto query for a resource; "" stands in for a table on polymorphic
# resources until one is supplied via context.
@impl true
def resource_to_query(resource, _) do
  from(row in {AshSqlite.DataLayer.Info.table(resource) || "", resource}, [])
end
# Inserts a batch of changesets via `insert_all`, optionally as an upsert.
# When records are returned, each is tagged with its originating
# `:bulk_create_index` metadata so callers can correlate results.
@impl true
def bulk_create(resource, stream, options) do
  opts = repo_opts(nil, resource)

  opts =
    if options.return_records? do
      Keyword.put(opts, :returning, true)
    else
      opts
    end

  opts =
    if options[:upsert?] do
      opts
      |> Keyword.put(:on_conflict, {:replace, options[:upsert_fields] || []})
      |> Keyword.put(
        :conflict_target,
        conflict_target(
          resource,
          options[:upsert_keys] || Ash.Resource.Info.primary_key(resource)
        )
      )
    else
      opts
    end

  changesets = Enum.to_list(stream)
  ecto_changesets = Enum.map(changesets, & &1.attributes)

  # All changesets in one batch share a table, so the first one's context wins.
  source =
    if table = Enum.at(changesets, 0).context[:data_layer][:table] do
      {table, resource}
    else
      resource
    end

  repo = dynamic_repo(resource, Enum.at(changesets, 0))

  source
  |> repo.insert_all(ecto_changesets, opts)
  |> case do
    {_, nil} ->
      :ok

    {_, results} ->
      if options[:single?] do
        {:ok, results}
      else
        {:ok,
         Stream.zip_with(results, changesets, fn result, changeset ->
           Ash.Resource.put_metadata(
             result,
             :bulk_create_index,
             changeset.context.bulk_create.index
           )
         end)}
      end
  end
rescue
  e ->
    # Build a fresh changeset purely to give error translation something to
    # hang constraint information on.
    changeset = Ash.Changeset.new(resource)

    handle_raised_error(
      e,
      __STACKTRACE__,
      {:bulk_create, ecto_changeset(changeset.data, changeset, :create, false)},
      resource
    )
end
# Single-record create, implemented as a one-element bulk create so all
# constraint handling lives in one place.
@impl true
def create(resource, changeset) do
  changeset = %{
    changeset
    | data:
        Map.update!(
          changeset.data,
          :__meta__,
          &Map.put(&1, :source, table(resource, changeset))
        )
  }

  case bulk_create(resource, [changeset], %{
         single?: true,
         return_records?: true
       }) do
    {:ok, [result]} ->
      {:ok, result}

    {:error, error} ->
      {:error, error}
  end
end
# Translates Ecto changeset errors into Ash `InvalidAttribute` errors.
defp handle_errors({:error, %Ecto.Changeset{errors: errors}}) do
  {:error, Enum.map(errors, &to_ash_error/1)}
end

defp to_ash_error({field, {message, vars}}) do
  Ash.Error.Changes.InvalidAttribute.exception(
    field: field,
    message: message,
    private_vars: vars
  )
end
# Converts an Ash changeset into an Ecto changeset, carrying over primary-key
# filters (for update/delete targeting) and wiring up all constraint
# translations. `table_error?` controls whether a missing polymorphic table
# raises. Atomic attributes are excluded here; they are handled by `update/2`.
defp ecto_changeset(record, changeset, type, table_error? \\ true) do
  filters =
    if changeset.action_type == :create do
      %{}
    else
      Map.get(changeset, :filters, %{})
    end

  filters =
    if changeset.action_type == :create do
      filters
    else
      # Non-create operations also filter on the record's primary key.
      changeset.resource
      |> Ash.Resource.Info.primary_key()
      |> Enum.reduce(filters, fn key, filters ->
        Map.put(filters, key, Map.get(record, key))
      end)
    end

  attributes =
    changeset.resource
    |> Ash.Resource.Info.attributes()
    |> Enum.map(& &1.name)

  attributes_to_change =
    Enum.reject(attributes, fn attribute ->
      Keyword.has_key?(changeset.atomics, attribute)
    end)

  ecto_changeset =
    record
    |> to_ecto()
    |> set_table(changeset, type, table_error?)
    |> Ecto.Changeset.change(Map.take(changeset.attributes, attributes_to_change))
    |> Map.update!(:filters, &Map.merge(&1, filters))
    |> add_configured_foreign_key_constraints(record.__struct__)
    |> add_unique_indexes(record.__struct__, changeset)
    |> add_exclusion_constraints(record.__struct__)

  case type do
    :create ->
      ecto_changeset
      |> add_my_foreign_key_constraints(record.__struct__)

    type when type in [:upsert, :update] ->
      ecto_changeset
      |> add_my_foreign_key_constraints(record.__struct__)
      |> add_related_foreign_key_constraints(record.__struct__)

    :delete ->
      ecto_changeset
      |> add_related_foreign_key_constraints(record.__struct__)
  end
end
# Converts raised exceptions into `{:error, ash_error}` tuples. The first
# clause requires the StaleEntryError's struct module to equal the `resource`
# argument (the repeated `resource` binding in the head enforces equality).
defp handle_raised_error(
       %Ecto.StaleEntryError{changeset: %{data: %resource{}, filters: filters}},
       stacktrace,
       context,
       resource
     ) do
  handle_raised_error(
    Ash.Error.Changes.StaleRecord.exception(resource: resource, filters: filters),
    stacktrace,
    context,
    resource
  )
end

defp handle_raised_error(%Ecto.Query.CastError{} = e, stacktrace, context, resource) do
  handle_raised_error(
    Ash.Error.Query.InvalidFilterValue.exception(value: e.value, context: context),
    stacktrace,
    context,
    resource
  )
end

defp handle_raised_error(error, stacktrace, _ecto_changeset, _resource) do
  {:error, Ash.Error.to_ash_error(error, stacktrace)}
end
# For polymorphic resources, points the record's Ecto meta at the table from
# the changeset context (or the resource's configured table). A missing table
# raises only when `table_error?` is set. Non-polymorphic records pass through.
defp set_table(record, changeset, operation, table_error?) do
  if AshSqlite.DataLayer.Info.polymorphic?(record.__struct__) do
    table =
      changeset.context[:data_layer][:table] ||
        AshSqlite.DataLayer.Info.table(record.__struct__)

    if table do
      Ecto.put_meta(record, source: table)
    else
      if table_error? do
        raise_table_error!(changeset.resource, operation)
      else
        record
      end
    end
  else
    record
  end
end
# Recursively converts Ecto-shaped values back to Ash-shaped ones: any
# `%Ecto.Association.NotLoaded{}` relationship is replaced with the resource
# struct's default (an `%Ash.NotLoaded{}`). Non-resource values pass through.
def from_ecto({:ok, result}), do: {:ok, from_ecto(result)}
def from_ecto({:error, _} = other), do: other
def from_ecto(nil), do: nil

def from_ecto(value) when is_list(value) do
  Enum.map(value, &from_ecto/1)
end

def from_ecto(%resource{} = record) do
  if Spark.Dsl.is?(resource, Ash.Resource) do
    empty = struct(resource)

    resource
    |> Ash.Resource.Info.relationships()
    |> Enum.reduce(record, fn relationship, record ->
      case Map.get(record, relationship.name) do
        %Ecto.Association.NotLoaded{} ->
          Map.put(record, relationship.name, Map.get(empty, relationship.name))

        value ->
          Map.put(record, relationship.name, from_ecto(value))
      end
    end)
  else
    record
  end
end

def from_ecto(other), do: other
# Inverse of `from_ecto/1`: replaces `%Ash.NotLoaded{}` relationship values
# with `%Ecto.Association.NotLoaded{}` so Ecto treats them as unloaded.
def to_ecto(nil), do: nil

def to_ecto(value) when is_list(value) do
  Enum.map(value, &to_ecto/1)
end

def to_ecto(%resource{} = record) do
  if Spark.Dsl.is?(resource, Ash.Resource) do
    resource
    |> Ash.Resource.Info.relationships()
    |> Enum.reduce(record, fn relationship, record ->
      value =
        case Map.get(record, relationship.name) do
          %Ash.NotLoaded{} ->
            %Ecto.Association.NotLoaded{
              __field__: relationship.name,
              __cardinality__: relationship.cardinality
            }

          value ->
            to_ecto(value)
        end

      Map.put(record, relationship.name, value)
    end)
  else
    record
  end
end

def to_ecto(other), do: other
# Registers configured exclusion constraints on the Ecto changeset so their
# violations surface as changeset errors instead of raised exceptions.
defp add_exclusion_constraints(changeset, resource) do
  resource
  |> AshSqlite.DataLayer.Info.exclusion_constraint_names()
  |> Enum.reduce(changeset, fn constraint, changeset ->
    case constraint do
      {key, name} ->
        Ecto.Changeset.exclusion_constraint(changeset, key, name: name)

      {key, name, message} ->
        Ecto.Changeset.exclusion_constraint(changeset, key, name: name, message: message)
    end
  end)
end
# Registers foreign-key constraints for relationships pointing *at* this
# resource, so deleting a referenced row reports "would leave records behind".
defp add_related_foreign_key_constraints(changeset, resource) do
  # TODO: this doesn't guarantee us to get all of them, because if something is related to this
  # schema and there is no back-relation, then this won't catch it's foreign key constraints
  resource
  |> Ash.Resource.Info.relationships()
  |> Enum.map(& &1.destination)
  |> Enum.uniq()
  |> Enum.flat_map(fn related ->
    related
    |> Ash.Resource.Info.relationships()
    |> Enum.filter(&(&1.destination == resource))
    |> Enum.map(&Map.take(&1, [:source, :source_attribute, :destination_attribute, :name]))
  end)
  |> Enum.reduce(changeset, fn %{
                                 source: source,
                                 source_attribute: source_attribute,
                                 destination_attribute: destination_attribute,
                                 name: relationship_name
                               },
                               changeset ->
    case AshSqlite.DataLayer.Info.reference(resource, relationship_name) do
      %{name: name} when not is_nil(name) ->
        Ecto.Changeset.foreign_key_constraint(changeset, destination_attribute,
          name: name,
          message: "would leave records behind"
        )

      _ ->
        # Fall back to the conventional "<table>_<attribute>_fkey" name.
        Ecto.Changeset.foreign_key_constraint(changeset, destination_attribute,
          name: "#{AshSqlite.DataLayer.Info.table(source)}_#{source_attribute}_fkey",
          message: "would leave records behind"
        )
    end
  end)
end
# Registers a foreign-key constraint for each relationship owned by this
# resource, keyed on its source attribute.
defp add_my_foreign_key_constraints(changeset, resource) do
  resource
  |> Ash.Resource.Info.relationships()
  |> Enum.reduce(changeset, &Ecto.Changeset.foreign_key_constraint(&2, &1.source_attribute))
end
# Registers explicitly configured foreign-key names. The configuration may be
# a literal list or an `{m, f, a}` that receives the changeset and returns one.
defp add_configured_foreign_key_constraints(changeset, resource) do
  resource
  |> AshSqlite.DataLayer.Info.foreign_key_names()
  |> case do
    {m, f, a} -> List.wrap(apply(m, f, [changeset | a]))
    value -> List.wrap(value)
  end
  |> Enum.reduce(changeset, fn
    {key, name}, changeset ->
      Ecto.Changeset.foreign_key_constraint(changeset, key, name: name)

    {key, name, message}, changeset ->
      Ecto.Changeset.foreign_key_constraint(changeset, key, name: name, message: message)
  end)
end
# Registers unique constraints from three sources: resource identities,
# custom indexes, and explicitly configured unique index names (plus the
# implicit "<table>_pkey" constraint when a primary key and table exist).
defp add_unique_indexes(changeset, resource, ash_changeset) do
  changeset =
    resource
    |> Ash.Resource.Info.identities()
    |> Enum.reduce(changeset, fn identity, changeset ->
      name =
        AshSqlite.DataLayer.Info.identity_index_names(resource)[identity.name] ||
          "#{table(resource, ash_changeset)}_#{identity.name}_index"

      opts =
        if Map.get(identity, :message) do
          [name: name, message: identity.message]
        else
          [name: name]
        end

      Ecto.Changeset.unique_constraint(changeset, identity.keys, opts)
    end)

  changeset =
    resource
    |> AshSqlite.DataLayer.Info.custom_indexes()
    |> Enum.reduce(changeset, fn index, changeset ->
      opts =
        if index.message do
          [name: index.name, message: index.message]
        else
          [name: index.name]
        end

      Ecto.Changeset.unique_constraint(changeset, index.fields, opts)
    end)

  names =
    resource
    |> AshSqlite.DataLayer.Info.unique_index_names()
    |> case do
      {m, f, a} -> List.wrap(apply(m, f, [changeset | a]))
      value -> List.wrap(value)
    end

  names =
    case Ash.Resource.Info.primary_key(resource) do
      [] ->
        names

      fields ->
        if table = table(resource, ash_changeset) do
          [{fields, table <> "_pkey"} | names]
        else
          # NOTE(review): returning `[]` here silently drops the configured
          # `names` when no table resolves — confirm `names` wasn't intended.
          []
        end
    end

  Enum.reduce(names, changeset, fn
    {keys, name}, changeset ->
      Ecto.Changeset.unique_constraint(changeset, List.wrap(keys), name: name)

    {keys, name, message}, changeset ->
      Ecto.Changeset.unique_constraint(changeset, List.wrap(keys), name: name, message: message)
  end)
end
# Upsert via a single-element bulk create. On conflict, explicitly changed
# attributes plus any update defaults are replaced. Stray VCS timestamp lines
# interleaved with the body have been removed.
@impl true
def upsert(resource, changeset, keys \\ nil) do
  keys = keys || Ash.Resource.Info.primary_key(resource)

  # NOTE: `--` is right-associative, so this removes `defaults -- keys` from
  # the attribute list (preserved from the original expression).
  explicitly_changing_attributes =
    Enum.map(
      Map.keys(changeset.attributes) -- Map.get(changeset, :defaults, []) -- keys,
      fn key ->
        {key, Ash.Changeset.get_attribute(changeset, key)}
      end
    )

  on_conflict =
    changeset
    |> update_defaults()
    |> Keyword.merge(explicitly_changing_attributes)

  case bulk_create(resource, [changeset], %{
         single?: true,
         upsert?: true,
         upsert_keys: keys,
         upsert_fields: Keyword.keys(on_conflict),
         return_records?: true
       }) do
    {:ok, [result]} ->
      {:ok, result}

    {:error, error} ->
      {:error, error}
  end
end
# Builds the :conflict_target for upserts. Resources with a base_filter need a
# partial-index target, which requires `base_filter_sql` to be configured; the
# quoted column list plus WHERE clause is emitted as an unsafe fragment.
defp conflict_target(resource, keys) do
  if Ash.Resource.Info.base_filter(resource) do
    base_filter_sql =
      AshSqlite.DataLayer.Info.base_filter_sql(resource) ||
        raise """
        Cannot use upserts with resources that have a base_filter without also adding `base_filter_sql` in the sqlite section.
        """

    sources =
      Enum.map(keys, fn key ->
        ~s("#{Ash.Resource.Info.attribute(resource, key).source || key}")
      end)

    {:unsafe_fragment, "(" <> Enum.join(sources, ", ") <> ") WHERE (#{base_filter_sql})"}
  else
    keys
  end
end
# Computes `{attribute, value}` pairs for attributes with an update_default.
# Static defaults are used as-is; function/MFA defaults are evaluated once —
# grouped so attributes with `match_other_defaults?` share a single evaluation.
defp update_defaults(changeset) do
  attributes =
    changeset.resource
    |> Ash.Resource.Info.attributes()
    |> Enum.reject(&is_nil(&1.update_default))

  attributes
  |> static_defaults()
  |> Enum.concat(lazy_matching_defaults(attributes))
  |> Enum.concat(lazy_non_matching_defaults(attributes))
end

# Defaults that are plain values (neither a function nor an MFA).
defp static_defaults(attributes) do
  attributes
  |> Enum.reject(&get_default_fun(&1))
  |> Enum.map(&{&1.name, &1.update_default})
end

# Function/MFA defaults evaluated independently per attribute.
defp lazy_non_matching_defaults(attributes) do
  attributes
  |> Enum.filter(&(!&1.match_other_defaults? && get_default_fun(&1)))
  |> Enum.map(fn attribute ->
    default_value =
      case attribute.update_default do
        function when is_function(function) ->
          function.()

        {m, f, a} when is_atom(m) and is_atom(f) and is_list(a) ->
          apply(m, f, a)
      end

    {attribute.name, default_value}
  end)
end

# Function/MFA defaults evaluated once per distinct default so matching
# attributes (e.g. timestamps) receive the identical value.
defp lazy_matching_defaults(attributes) do
  attributes
  |> Enum.filter(&(&1.match_other_defaults? && get_default_fun(&1)))
  |> Enum.group_by(& &1.update_default)
  |> Enum.flat_map(fn {default_fun, attributes} ->
    default_value =
      case default_fun do
        function when is_function(function) ->
          function.()

        {m, f, a} when is_atom(m) and is_atom(f) and is_list(a) ->
          apply(m, f, a)
      end

    Enum.map(attributes, &{&1.name, default_value})
  end)
end

# Returns the update_default when it is callable (fun or MFA), else nil.
defp get_default_fun(attribute) do
  if is_function(attribute.update_default) or match?({_, _, _}, attribute.update_default) do
    attribute.update_default
  end
end
# Updates a single record via `update_all`, targeting it with the changeset's
# filters (primary key included). Atomic expressions are compiled to dynamics
# and merged with plain attribute changes into a single SET clause; `{0, []}`
# from update_all means the row no longer matched and is reported as stale.
@impl true
def update(resource, changeset) do
  ecto_changeset =
    changeset.data
    |> Map.update!(:__meta__, &Map.put(&1, :source, table(resource, changeset)))
    |> ecto_changeset(changeset, :update)

  try do
    query = from(row in resource, as: ^0)

    select = Keyword.keys(changeset.atomics) ++ Ash.Resource.Info.primary_key(resource)

    query =
      query
      |> default_bindings(resource, changeset.context)
      |> Ecto.Query.select(^select)

    query =
      Enum.reduce(ecto_changeset.filters, query, fn {key, value}, query ->
        from(row in query,
          where: field(row, ^key) == ^value
        )
      end)

    # Compile each atomic expression to an Ecto dynamic, joining any
    # relationships the expression references along the way.
    atomics_result =
      Enum.reduce_while(changeset.atomics, {:ok, query, []}, fn {field, expr},
                                                                {:ok, query, set} ->
        with {:ok, query} <-
               AshSqlite.Join.join_all_relationships(
                 query,
                 %Ash.Filter{
                   resource: resource,
                   expression: expr
                 },
                 left_only?: true
               ),
             dynamic <-
               AshSqlite.Expr.dynamic_expr(query, expr, query.__ash_bindings__) do
          {:cont, {:ok, query, Keyword.put(set, field, dynamic)}}
        else
          other ->
            {:halt, other}
        end
      end)

    case atomics_result do
      {:ok, query, dynamics} ->
        # Plain attribute changes become positional `^` parameters.
        {params, set, count} =
          ecto_changeset.changes
          |> Map.to_list()
          |> Enum.reduce({[], [], 0}, fn {key, value}, {params, set, count} ->
            {[{value, {0, key}} | params], [{key, {:^, [], [count]}} | set], count + 1}
          end)

        # Atomic dynamics are expanded into the same parameter sequence.
        {params, set, _} =
          Enum.reduce(dynamics, {params, set, count}, fn {key, value}, {params, set, count} ->
            case AshSqlite.Expr.dynamic_expr(query, value, query.__ash_bindings__) do
              %Ecto.Query.DynamicExpr{} = dynamic ->
                result =
                  Ecto.Query.Builder.Dynamic.partially_expand(
                    :select,
                    query,
                    dynamic,
                    params,
                    count
                  )

                expr = elem(result, 0)
                new_params = elem(result, 1)

                new_count =
                  result |> Tuple.to_list() |> List.last()

                {new_params, [{key, expr} | set], new_count}

              other ->
                {[{other, {0, key}} | params], [{key, {:^, [], [count]}} | set], count + 1}
            end
          end)

        case set do
          [] ->
            {:ok, changeset.data}

          set ->
            query =
              Map.put(query, :updates, [
                %Ecto.Query.QueryExpr{
                  # set/params are accumulated by prepending, so both must be
                  # reversed here to keep positional parameters aligned.
                  expr: [set: Enum.reverse(set)],
                  params: Enum.reverse(params)
                }
              ])

            repo_opts = repo_opts(changeset.timeout, changeset.resource)

            repo_opts =
              Keyword.put(repo_opts, :returning, Keyword.keys(changeset.atomics))

            result =
              dynamic_repo(resource, changeset).update_all(
                query,
                [],
                repo_opts
              )

            case result do
              {0, []} ->
                {:error,
                 Ash.Error.Changes.StaleRecord.exception(
                   resource: resource,
                   filters: ecto_changeset.filters
                 )}

              {1, [result]} ->
                record =
                  changeset.data
                  |> Map.merge(changeset.attributes)
                  |> Map.merge(Map.take(result, Keyword.keys(changeset.atomics)))

                {:ok, record}
            end
        end

      {:error, error} ->
        {:error, error}
    end
  rescue
    e ->
      handle_raised_error(e, __STACKTRACE__, ecto_changeset, resource)
  end
end
@impl true
# Deletes the changeset's backing record. Returns `:ok` on success; repo
# errors are normalized via `handle_errors/1`, raised exceptions via
# `handle_raised_error/4`.
def destroy(resource, %{data: record} = changeset) do
  ecto_changeset = ecto_changeset(record, changeset, :delete)

  try do
    repo_opts = repo_opts(changeset.timeout, changeset.resource)

    delete_result =
      ecto_changeset
      |> dynamic_repo(resource, changeset).delete(repo_opts)
      |> from_ecto()

    case delete_result do
      {:ok, _deleted} ->
        :ok

      {:error, error} ->
        handle_errors({:error, error})
    end
  rescue
    e ->
      handle_raised_error(e, __STACKTRACE__, ecto_changeset, resource)
  end
end
@impl true
# Stashes the requested sort in the query's ash bindings; the actual
# `order_by` is built later (see `apply_sort/4`).
def sort(query, sort, _resource) do
  updated_bindings = Map.put(query.__ash_bindings__, :sort, sort)
  {:ok, %{query | __ash_bindings__: updated_bindings}}
end
@impl true
# Narrows the query's select to only the requested fields, loading them
# into the resource struct via `struct/2`. Duplicate fields are dropped.
def select(query, select, resource) do
  query = default_bindings(query, resource)
  fields = Enum.uniq(select)

  {:ok, from(row in query, select: struct(row, ^fields))}
end
@impl true
# An empty distinct sort is a no-op.
def distinct_sort(query, sort, _) when sort in [nil, []] do
  {:ok, query}
end

# Stashes the distinct sort in the bindings; it is consumed by `distinct/3`
# when building the distinct statement.
def distinct_sort(query, sort, _) do
  updated_bindings = Map.put(query.__ash_bindings__, :distinct_sort, sort)
  {:ok, %{query | __ash_bindings__: updated_bindings}}
end
# If the order by does not match the initial sort clause, then we use a subquery
# to limit to only distinct rows. This may not perform that well, so we may need
# to come up with alternatives here.
@impl true
# No distinct fields requested: just apply any pending sort.
def distinct(query, empty, resource) when empty in [nil, []] do
  query |> apply_sort(query.__ash_bindings__[:sort], resource)
end

def distinct(query, distinct_on, resource) do
  case get_distinct_statement(query, distinct_on) do
    {:ok, distinct_statement} ->
      # The distinct fields line up with the sort, so the distinct clause can
      # be attached to the query directly.
      %{query | distinct: distinct_statement}
      |> apply_sort(query.__ash_bindings__[:sort], resource)

    {:error, distinct_statement} ->
      # Sort and distinct disagree: build a distinct subquery (sorted by the
      # distinct sort, or the regular sort) and join the outer query to it on
      # the primary key, then re-apply the outer sort.
      query
      |> Ecto.Query.exclude(:order_by)
      |> default_bindings(resource)
      |> Map.put(:distinct, distinct_statement)
      |> apply_sort(
        query.__ash_bindings__[:distinct_sort] || query.__ash_bindings__[:sort],
        resource,
        true
      )
      |> case do
        {:ok, distinct_query} ->
          # Join condition: all primary key fields equal between the outer row
          # and the distinct subquery row.
          on =
            Enum.reduce(Ash.Resource.Info.primary_key(resource), nil, fn key, dynamic ->
              if dynamic do
                Ecto.Query.dynamic(
                  [row, distinct],
                  ^dynamic and field(row, ^key) == field(distinct, ^key)
                )
              else
                Ecto.Query.dynamic([row, distinct], field(row, ^key) == field(distinct, ^key))
              end
            end)

          # Strip everything from the original query except its source, so it
          # can serve as the bare outer query of the join.
          joined_query_source =
            Enum.reduce(
              [
                :join,
                :order_by,
                :group_by,
                :having,
                :distinct,
                :select,
                :combinations,
                :with_ctes,
                :limit,
                :offset,
                :preload,
                :update,
                :where
              ],
              query,
              &Ecto.Query.exclude(&2, &1)
            )

          joined_query =
            from(row in joined_query_source,
              join: distinct in subquery(distinct_query),
              on: ^on
            )

          # Select the subquery's rows, then restore bindings and the outer sort.
          from([row, distinct] in joined_query,
            select: distinct
          )
          |> default_bindings(resource)
          |> apply_sort(query.__ash_bindings__[:sort], resource)
          |> case do
            {:ok, joined_query} ->
              # Carry the original query's `__order__?` flag across the rebuild.
              {:ok,
               Map.update!(
                 joined_query,
                 :__ash_bindings__,
                 &Map.put(&1, :__order__?, query.__ash_bindings__[:__order__?] || false)
               )}

            {:error, error} ->
              {:error, error}
          end

        {:error, error} ->
          {:error, error}
      end
  end
end
# Applies `sort` to `query`. When `directly?` is true, the sorter returns an
# order-by expression which is attached here; otherwise the sorter returns
# the (already ordered) query itself. Either way the query is marked as
# having had its sort applied.
defp apply_sort(query, sort, resource, directly? \\ false)

defp apply_sort(query, sort, _resource, _directly?) when sort in [nil, []] do
  {:ok, set_sort_applied(query)}
end

defp apply_sort(query, sort, resource, directly?) do
  case AshSqlite.Sort.sort(query, sort, resource, [], 0, directly?) do
    {:ok, order_by} when directly? ->
      {:ok, query |> Ecto.Query.order_by(^order_by) |> set_sort_applied()}

    {:ok, sorted_query} ->
      {:ok, set_sort_applied(sorted_query)}

    {:error, reason} ->
      {:error, reason}
  end
end
# Flags the query's bindings so later steps know the sort is already applied.
defp set_sort_applied(query) do
  updated_bindings = Map.put(query.__ash_bindings__, :sort_applied?, true)
  %{query | __ash_bindings__: updated_bindings}
end
# Builds the `distinct` statement for `distinct_on`.
#
# Returns `{:ok, statement}` when the distinct clause can be attached to the
# query directly, or `{:error, statement}` when the caller must fall back to
# the subquery/join strategy (see `distinct/3`).
defp get_distinct_statement(query, distinct_on) do
  # A dedicated distinct sort always forces the subquery strategy, because the
  # distinct ordering differs from the outer ordering.
  has_distinct_sort? = match?(%{__ash_bindings__: %{distinct_sort: _}}, query)

  if has_distinct_sort? do
    {:error, default_distinct_statement(query, distinct_on)}
  else
    sort = query.__ash_bindings__[:sort] || []

    distinct =
      query.distinct ||
        %Ecto.Query.QueryExpr{
          expr: [],
          params: []
        }

    if sort == [] do
      # No sort to conflict with: the distinct can be applied directly.
      {:ok, default_distinct_statement(query, distinct_on)}
    else
      # Walk the distinct fields against the sort clauses pairwise. If every
      # distinct field matches the corresponding leading sort clause (same
      # field, same position), reuse the sort's direction for the distinct
      # expression. Any mismatch — or running out of sort clauses — halts
      # with :error, meaning the subquery strategy is required.
      distinct_on
      |> Enum.reduce_while({sort, [], [], Enum.count(distinct.params)}, fn
        _, {[], _distinct_statement, _, _count} ->
          {:halt, :error}

        distinct_on, {[order_by | rest_order_by], distinct_statement, params, count} ->
          case order_by do
            {^distinct_on, order} ->
              {distinct_expr, params, count} =
                distinct_on_expr(query, distinct_on, params, count)

              {:cont,
               {rest_order_by, [{order, distinct_expr} | distinct_statement], params, count}}

            _ ->
              {:halt, :error}
          end
      end)
      |> case do
        :error ->
          {:error, default_distinct_statement(query, distinct_on)}

        {_, result, params, _} ->
          # Entries were accumulated by prepending, so reverse both lists to
          # restore declaration order before appending to any existing clause.
          {:ok,
           %{
             distinct
             | expr: distinct.expr ++ Enum.reverse(result),
               params: distinct.params ++ Enum.reverse(params)
           }}
      end
    end
  end
end
# Builds a distinct statement from `distinct_on`, appending to any existing
# distinct clause on the query. Fields given as `{field, order}` keep their
# order; bare fields default to ascending.
defp default_distinct_statement(query, distinct_on) do
  distinct =
    query.distinct ||
      %Ecto.Query.QueryExpr{
        expr: []
      }

  initial = {[], [], Enum.count(distinct.params)}

  {expr, params, _count} =
    Enum.reduce(distinct_on, initial, fn field_spec, {expr, params, count} ->
      # Normalize both accepted shapes to an {order, field} pair.
      {order, distinct_field} =
        case field_spec do
          {distinct_field, order} -> {order, distinct_field}
          distinct_field -> {:asc, distinct_field}
        end

      {distinct_expr, params, count} = distinct_on_expr(query, distinct_field, params, count)
      {[{order, distinct_expr} | expr], params, count}
    end)

  # Reverse to undo the prepend-accumulation before appending.
  %{
    distinct
    | expr: distinct.expr ++ Enum.reverse(expr),
      params: distinct.params ++ Enum.reverse(params)
  }
end
# Converts a single distinct field (an attribute name or an
# `%Ash.Query.Calculation{}`) into an expanded Ecto expression, threading the
# growing `params` list and the running parameter `count` through.
defp distinct_on_expr(query, field, params, count) do
  resource = query.__ash_bindings__.resource

  ref =
    case field do
      %Ash.Query.Calculation{} = calc ->
        %Ref{attribute: calc, relationship_path: [], resource: resource}

      field ->
        %Ref{
          attribute: Ash.Resource.Info.field(resource, field),
          relationship_path: [],
          resource: resource
        }
    end

  dynamic = AshSqlite.Expr.dynamic_expr(query, ref, query.__ash_bindings__)

  # NOTE(review): `partially_expand/5` is a private Ecto builder API; its
  # return-tuple shape is not part of Ecto's public contract.
  result =
    Ecto.Query.Builder.Dynamic.partially_expand(
      :distinct,
      query,
      dynamic,
      params,
      count
    )

  expr = elem(result, 0)
  new_params = elem(result, 1)

  # Taking the last element rather than a fixed index — presumably because the
  # tuple size differs across Ecto versions; the updated count is always last.
  new_count =
    result |> Tuple.to_list() |> List.last()

  {expr, new_params, new_count}
end
@impl true
# Joins any relationships referenced by `filter`, then adds the filter as
# `where` clauses on the query.
def filter(query, filter, resource, opts \\ []) do
  query = default_bindings(query, resource)

  case AshSqlite.Join.join_all_relationships(query, filter, opts) do
    {:ok, joined_query} ->
      {:ok, add_filter_expression(joined_query, filter)}

    {:error, reason} ->
      {:error, reason}
  end
end
@doc false
# Initializes the `__ash_bindings__` metadata on a query (no-op if already
# present). Binding 0 (or `start_bindings_at` from the context) is the root
# source; `current` points at the next free binding slot.
def default_bindings(query, resource, context \\ %{}) do
  first_binding = context[:data_layer][:start_bindings_at] || 0

  initial_bindings = %{
    resource: resource,
    current: length(query.joins) + 1 + first_binding,
    in_group?: false,
    calculations: %{},
    parent_resources: [],
    context: context,
    bindings: %{first_binding => %{path: [], type: :root, source: resource}}
  }

  Map.put_new(query, :__ash_bindings__, initial_bindings)
end
@impl true
# Delegates calculation loading to `AshSqlite.Calculation`, starting at
# select position 0.
def add_calculations(query, calculations, resource) do
  AshSqlite.Calculation.add_calculations(query, calculations, resource, 0)
end
@doc false
def get_binding(resource, path, query, type, name_match \\ nil)

# Finds the binding index whose relationship path is synonymous with `path`
# and whose type is one of `type` (optionally also matching on `:name`).
# Returns the binding integer, or nil when nothing matches.
def get_binding(resource, path, %{__ash_bindings__: _} = query, type, name_match) do
  accepted_types = List.wrap(type)

  Enum.find_value(query.__ash_bindings__.bindings, fn
    {binding, %{path: candidate_path, type: binding_type} = data} ->
      cond do
        binding_type not in accepted_types ->
          nil

        name_match && data[:name] != name_match ->
          nil

        Ash.SatSolver.synonymous_relationship_paths?(resource, candidate_path, path) ->
          binding

        true ->
          nil
      end

    _ ->
      nil
  end)
end

def get_binding(_, _, _, _, _), do: nil
# Adds each top-level AND-ed part of the filter as its own `where` clause
# (Ecto combines multiple wheres with AND).
defp add_filter_expression(query, filter) do
  filter
  |> split_and_statements()
  |> Enum.reduce(query, fn condition, acc ->
    condition_dynamic = AshSqlite.Expr.dynamic_expr(acc, condition, acc.__ash_bindings__)
    Ecto.Query.where(acc, ^condition_dynamic)
  end)
end
# Splits a filter into its top-level AND-ed sub-expressions so each can be
# added as a separate `where` clause by `add_filter_expression/2`.
defp split_and_statements(%Filter{expression: expression}) do
  split_and_statements(expression)
end

defp split_and_statements(%BooleanExpression{op: :and, left: left, right: right}) do
  split_and_statements(left) ++ split_and_statements(right)
end

# Double-negation elimination: not(not(x)) == x.
defp split_and_statements(%Not{expression: %Not{expression: expression}}) do
  split_and_statements(expression)
end

# De Morgan's law: not(a or b) == (not a) and (not b), which can then be
# split further by the :and clause above.
defp split_and_statements(%Not{
       expression: %BooleanExpression{op: :or, left: left, right: right}
     }) do
  split_and_statements(%BooleanExpression{
    op: :and,
    left: %Not{expression: left},
    right: %Not{expression: right}
  })
end

# Anything else is an atomic clause of its own.
defp split_and_statements(other), do: [other]
@doc false
# Registers `data` under the next free binding index and advances the
# counter, skipping any extra slots the caller intends to claim.
def add_binding(query, data, additional_bindings \\ 0) do
  %{current: index, bindings: existing} = ash_bindings = query.__ash_bindings__

  updated = %{
    ash_bindings
    | bindings: Map.put(existing, index, data),
      current: index + 1 + additional_bindings
  }

  %{query | __ash_bindings__: updated}
end
# Registers `data` under an explicit, already-reserved binding index without
# advancing the binding counter.
def add_known_binding(query, data, known_binding) do
  Map.update!(query, :__ash_bindings__, fn ash_bindings ->
    Map.update!(ash_bindings, :bindings, &Map.put(&1, known_binding, data))
  end)
end
@impl true
# Runs `func` inside a repo transaction, first notifying the repo of the
# transaction reason via `on_transaction_begin/1`. The repo comes from the
# reason's data-layer context when provided, otherwise from the resource.
def transaction(resource, func, timeout \\ nil, reason \\ %{type: :custom, metadata: %{}}) do
  repo =
    case reason[:data_layer_context] do
      %{repo: repo} when not is_nil(repo) -> repo
      _ -> AshSqlite.DataLayer.Info.repo(resource)
    end

  wrapped = fn ->
    repo.on_transaction_begin(reason)
    func.()
  end

  # `Ecto.Repo.transaction/2` defaults its opts to [], so passing [] when no
  # timeout was given is equivalent to calling `transaction/1`.
  opts = if timeout, do: [timeout: timeout], else: []
  repo.transaction(wrapped, opts)
end
@impl true
# Rolls back the current transaction on the resource's repo, with `term` as
# the rollback reason.
def rollback(resource, term) do
  AshSqlite.DataLayer.Info.repo(resource).rollback(term)
end
# Resolves the table for a changeset: an override in the changeset's
# data-layer context wins, otherwise the resource's configured table.
defp table(resource, changeset) do
  override = changeset.context[:data_layer][:table]
  override || AshSqlite.DataLayer.Info.table(resource)
end
# Raises a descriptive error when no table can be resolved for `resource`.
# Polymorphic resources get an extra hint about providing the
# `data_layer[:table]` context.
defp raise_table_error!(resource, operation) do
  if AshSqlite.DataLayer.Info.polymorphic?(resource) do
    raise """
    Could not determine table for #{operation} on #{inspect(resource)}.
    Polymorphic resources require that the `data_layer[:table]` context is provided.
    See the guide on polymorphic resources for more information.
    """
  else
    raise """
    Could not determine table for #{operation} on #{inspect(resource)}.
    """
  end
end
# Resolves which repo to use: an override carried in the data-layer context
# (either on query bindings or on a changeset) wins; a nil override — or no
# context at all — falls back to the resource's configured repo.
defp dynamic_repo(resource, %{__ash_bindings__: %{context: %{data_layer: %{repo: override}}}}) do
  override || AshSqlite.DataLayer.Info.repo(resource)
end

defp dynamic_repo(resource, %{context: %{data_layer: %{repo: override}}}) do
  override || AshSqlite.DataLayer.Info.repo(resource)
end

defp dynamic_repo(resource, _) do
  AshSqlite.DataLayer.Info.repo(resource)
end
end