defmodule AshPostgres.DataLayer do
  @manage_tenant %Ash.Dsl.Section{
    name: :manage_tenant,
    describe: """
    Configuration for the behavior of a resource that manages a tenant
    """,
    examples: [
      """
      manage_tenant do
        template ["organization_", :id]
        create? true
        update? false
      end
      """
    ],
    schema: [
      template: [
        type: {:custom, __MODULE__, :tenant_template, []},
        required: true,
        doc: """
        A template that will cause the resource to create/manage the specified schema.

        Use this if you have a resource that, when created, it should create a new tenant
        for you. For example, if you have a `customer` resource, and you want to create
        a schema for each customer based on their id, e.g `customer_10` set this option
        to `["customer_", :id]`. Then, when this is created, it will create a schema called
        `customer_10`, and run your tenant migrations on it. Then, if you were to change
        that customer's id to `20`, it would rename the schema to `customer_20`. Generally speaking
        you should avoid changing the tenant id.
        """
      ],
      create?: [
        type: :boolean,
        default: true,
        doc: "Whether or not to automatically create a tenant when a record is created"
      ],
      update?: [
        type: :boolean,
        default: true,
        doc: "Whether or not to automatically update the tenant name if the record is updated"
      ]
    ]
  }

  @postgres %Ash.Dsl.Section{
    name: :postgres,
    describe: """
    Postgres data layer configuration
    """,
    sections: [
      @manage_tenant
    ],
    modules: [
      :repo
    ],
    examples: [
      """
      postgres do
        repo MyApp.Repo
        table "organizations"
      end
      """
    ],
    schema: [
      repo: [
        type: :atom,
        required: true,
        doc:
          "The repo that will be used to fetch your data. See the `AshPostgres.Repo` documentation for more"
      ],
      migrate?: [
        type: :boolean,
        default: true,
        doc:
          "Whether or not to include this resource in the generated migrations with `mix ash.generate_migrations`"
      ],
      base_filter_sql: [
        type: :string,
        doc:
          "A raw sql version of the base_filter, e.g `representative = true`. Required if trying to create a unique constraint on a resource with a base_filter"
      ],
      skip_unique_indexes: [
        type: {:custom, __MODULE__, :validate_skip_unique_indexes, []},
        default: false,
        doc: "Skip generating unique indexes when generating migrations"
      ],
      unique_index_names: [
        type: :any,
        default: [],
        doc: """
        A list of unique index names that could raise errors, or an mfa to a function that takes a changeset
        and returns a list of names in the format `{[:affected, :keys], "name_of_constraint"}`
        """
      ],
      table: [
        type: :string,
        required: true,
        doc: "The table to store and read the resource from"
      ]
    ]
  }

  alias Ash.Filter
  alias Ash.Query.{BooleanExpression, Not, Ref}
  alias Ash.Query.Function.{Ago, Contains}
  alias Ash.Query.Operator.IsNil
  alias AshPostgres.Functions.{Fragment, TrigramSimilarity, Type}

  import AshPostgres, only: [table: 1, repo: 1]

  @behaviour Ash.DataLayer

  @sections [@postgres]

  @moduledoc """
  A postgres data layer that leverages Ecto's postgres capabilities.

  # Table of Contents
  #{Ash.Dsl.Extension.doc_index(@sections)}

  #{Ash.Dsl.Extension.doc(@sections)}
  """

  use Ash.Dsl.Extension,
    sections: @sections,
    transformers: [AshPostgres.Transformers.VerifyRepo]

  # Custom option validator for `manage_tenant`'s `template`: accepts a single
  # value or a list, and requires every element to be a string or an atom.
  @doc false
  def tenant_template(value) do
    value = List.wrap(value)

    if Enum.all?(value, &(is_binary(&1) || is_atom(&1))) do
      {:ok, value}
    else
      {:error, "Expected all values for `manage_tenant` to be strings or atoms"}
    end
  end

  # Custom option validator for `skip_unique_indexes`: accepts a single atom or
  # a list of atoms and always returns the list form.
  @doc false
  def validate_skip_unique_indexes(indexes) do
    indexes = List.wrap(indexes)

    if Enum.all?(indexes, &is_atom/1) do
      {:ok, indexes}
    else
      {:error, "All indexes to skip must be atoms"}
    end
  end

  import Ecto.Query, only: [from: 2, subquery: 1]

  # Feature flags reported to Ash. Anything not listed explicitly is unsupported.
  @impl true
  def can?(_, :async_engine), do: true
  def can?(_, :transact), do: true
  def can?(_, :composite_primary_key), do: true
  def can?(_, :upsert), do: true

  def can?(resource, {:join, other_resource}) do
    same_data_layer_and_repo?(resource, other_resource)
  end

  def can?(resource, {:lateral_join, other_resource}) do
    same_data_layer_and_repo?(resource, other_resource)
  end

  def can?(_, :boolean_filter), do: true
  def can?(_, {:aggregate, :count}), do: true
  def can?(_, :aggregate_filter), do: true
  def can?(_, :aggregate_sort), do: true
  def can?(_, :create), do: true
  def can?(_, :read), do: true
  def can?(_, :update), do: true
  def can?(_, :destroy), do: true
  def can?(_, :filter), do: true
  def can?(_, :limit), do: true
  def can?(_, :offset), do: true
  def can?(_, :multitenancy), do: true
  def can?(_, {:filter_expr, _}), do: true
  def can?(_, :nested_expressions), do: true
  def can?(_, {:query_aggregate, :count}), do: true
  def can?(_, :sort), do: true
  def can?(_, {:sort, _}), do: true
  def can?(_, _), do: false

  # Two resources can be joined only when they use the same data layer and the
  # same repo. The repo is looked up on the *resources*; `and` short-circuits,
  # so `repo/1` is only called when both resources share a data layer.
  defp same_data_layer_and_repo?(resource, other_resource) do
    Ash.Resource.data_layer(resource) == Ash.Resource.data_layer(other_resource) and
      repo(resource) == repo(other_resource)
  end

  @impl true
  def in_transaction?(resource) do
    repo(resource).in_transaction?()
  end

  @impl true
  def limit(query, nil, _), do: {:ok, query}

  def limit(query, limit, _resource) do
    {:ok, from(row in query, limit: ^limit)}
  end

  @impl true
  def source(resource) do
    table(resource)
  end

  # A `nil` offset leaves the query untouched. Returns `{:ok, query}` like every
  # other clause (and like `limit/3`).
  @impl true
  def offset(query, nil, _), do: {:ok, query}

  # Skip emitting `OFFSET 0` when the query has no meaningful offset already.
  def offset(%{offset: old_offset} = query, 0, _resource) when old_offset in [0, nil] do
    {:ok, query}
  end

  def offset(query, offset, _resource) do
    {:ok, from(row in query, offset: ^offset)}
  end

  @impl true
  def run_query(query, resource) do
    {:ok, repo(resource).all(query, repo_opts(query))}
  end

  # Builds the options passed to repo calls. A `:prefix` is only set when a
  # tenant is present and the resource uses the `:context` multitenancy strategy.
  defp repo_opts(%Ash.Changeset{tenant: tenant, resource: resource}) do
    repo_opts(%{tenant: tenant, resource: resource})
  end

  defp repo_opts(%{tenant: tenant, resource: resource}) when not is_nil(tenant) do
    if Ash.Resource.multitenancy_strategy(resource) == :context do
      [prefix: tenant]
    else
      []
    end
  end

  defp repo_opts(_), do: []

  # The trigram similarity function is only exposed when the repo config lists
  # "pg_trgm" under `:installed_extensions`.
  @impl true
  def functions(resource) do
    config = repo(resource).config()

    functions = [AshPostgres.Functions.Type, AshPostgres.Functions.Fragment]

    if "pg_trgm" in (config[:installed_extensions] || []) do
      functions ++
        [
          AshPostgres.Functions.TrigramSimilarity
        ]
    else
      functions
    end
  end

  # Wraps the query in a subquery selecting an empty map, merges one key per
  # aggregate into that select, and fetches the single resulting row.
  @impl true
  def run_aggregate_query(query, aggregates, resource) do
    subquery = from(row in subquery(query), select: %{})

    query =
      Enum.reduce(
        aggregates,
        subquery,
        &add_subquery_aggregate_select(&2, &1, resource)
      )

    {:ok, repo(resource).one(query, repo_opts(query))}
  end

  @impl true
  def set_tenant(_resource, query, tenant) do
    {:ok, Ecto.Query.put_query_prefix(query, to_string(tenant))}
  end

  # Same as `run_aggregate_query/3`, but aggregates over the lateral join of the
  # source records with `query`. Returns `{:error, error}` if the lateral join
  # query cannot be built.
  @impl true
  def run_aggregate_query_with_lateral_join(
        query,
        aggregates,
        root_data,
        source_resource,
        destination_resource,
        source_field,
        destination_field
      ) do
    case lateral_join_query(query, root_data, source_resource, source_field, destination_field) do
      {:ok, lateral_join_query} ->
        subquery = from(row in subquery(lateral_join_query), select: %{})

        query =
          Enum.reduce(
            aggregates,
            subquery,
            &add_subquery_aggregate_select(&2, &1, destination_resource)
          )

        {:ok, repo(source_resource).one(query, repo_opts(query))}

      {:error, error} ->
        {:error, error}
    end
  end

  # Runs `query` once per source record via a lateral join. Returns
  # `{:error, error}` if the lateral join query cannot be built.
  @impl true
  def run_query_with_lateral_join(
        query,
        root_data,
        source_resource,
        _destination_resource,
        source_field,
        destination_field
      ) do
    case lateral_join_query(query, root_data, source_resource, source_field, destination_field) do
      {:ok, query} ->
        {:ok, repo(source_resource).all(query, repo_opts(query))}

      {:error, error} ->
        {:error, error}
    end
  end

  # Builds a query over the source resource, restricted to the source field
  # values found in `root_data`, inner-lateral-joined to `query` (correlated on
  # source_field == destination_field) and selecting the destination rows.
  #
  # Returns `{:ok, ecto_query}` or `{:error, error}` when the source resource's
  # data layer query cannot be built.
  defp lateral_join_query(
         query,
         root_data,
         source_resource,
         source_field,
         destination_field
       ) do
    source_values = Enum.map(root_data, &Map.get(&1, source_field))

    subquery =
      subquery(
        from(destination in query,
          where:
            field(destination, ^destination_field) ==
              field(parent_as(:source_record), ^source_field)
        )
      )

    source_resource
    |> Ash.Query.new()
    |> Ash.Query.data_layer_query()
    |> case do
      {:ok, data_layer_query} ->
        {:ok,
         from(source in data_layer_query,
           as: :source_record,
           where: field(source, ^source_field) in ^source_values,
           inner_lateral_join: destination in ^subquery,
           on: field(source, ^source_field) == field(destination, ^destination_field),
           select: destination
         )}

      {:error, error} ->
        {:error, error}
    end
  end

  @impl true
  def resource_to_query(resource, _),
    do: Ecto.Queryable.to_query({table(resource), resource})

  # Inserts the record and, when the resource manages tenants, creates the
  # tenant afterwards. Any raised exception is returned as `{:error, exception}`.
  @impl true
  def create(resource, changeset) do
    changeset.data
    |> Map.update!(:__meta__, &Map.put(&1, :source, table(resource)))
    |> ecto_changeset(changeset)
    |> repo(resource).insert(repo_opts(changeset))
    |> handle_errors()
    |> case do
      {:ok, result} ->
        maybe_create_tenant!(resource, result)

        {:ok, result}

      {:error, error} ->
        {:error, error}
    end
  rescue
    e ->
      {:error, e}
  end

  defp maybe_create_tenant!(resource, result) do
    if AshPostgres.manage_tenant_create?(resource) do
      tenant_name = tenant_name(resource, result)

      AshPostgres.MultiTenancy.create_tenant!(tenant_name, repo(resource))
    else
      :ok
    end
  end

  # Renames the tenant when tenant updates are enabled and any attribute used in
  # the tenant template is being changed. Always returns `:ok`.
  defp maybe_update_tenant(resource, changeset, result) do
    if AshPostgres.manage_tenant_update?(resource) do
      changing_tenant_name? =
        resource
        |> AshPostgres.manage_tenant_template()
        |> Enum.filter(&is_atom/1)
        |> Enum.any?(&Ash.Changeset.changing_attribute?(changeset, &1))

      if changing_tenant_name? do
        old_tenant_name = tenant_name(resource, changeset.data)

        new_tenant_name = tenant_name(resource, result)

        AshPostgres.MultiTenancy.rename_tenant(repo(resource), old_tenant_name, new_tenant_name)
      end
    end

    :ok
  end

  # Renders the tenant template: strings are used verbatim, any other item is
  # treated as a key into the record and its value is stringified.
  defp tenant_name(resource, result) do
    resource
    |> AshPostgres.manage_tenant_template()
    |> Enum.map_join(fn item ->
      if is_binary(item) do
        item
      else
        result
        |> Map.get(item)
        |> to_string()
      end
    end)
  end

  # Converts Ecto changeset errors into Ash `InvalidAttribute` errors.
  defp handle_errors({:error, %Ecto.Changeset{errors: errors}}) do
    {:error, Enum.map(errors, &to_ash_error/1)}
  end

  defp handle_errors({:ok, val}), do: {:ok, val}

  defp to_ash_error({field, {message, vars}}) do
    Ash.Error.Changes.InvalidAttribute.exception(field: field, message: message, vars: vars)
  end

  defp ecto_changeset(record, changeset) do
    record
    |> Ecto.Changeset.change(changeset.attributes)
    |> add_unique_indexes(record.__struct__, changeset.tenant)
  end

  # Registers unique constraints on the Ecto changeset: one per resource
  # identity, every configured `unique_index_names` entry (a literal list or an
  # mfa called with the changeset prepended to its args), and the primary key.
  defp add_unique_indexes(changeset, resource, tenant) do
    changeset =
      resource
      |> Ash.Resource.identities()
      |> Enum.reduce(changeset, fn identity, changeset ->
        name =
          if tenant do
            "#{tenant}_#{table(resource)}_#{identity.name}_unique_index"
          else
            "#{table(resource)}_#{identity.name}_unique_index"
          end

        opts =
          if Map.get(identity, :message) do
            [name: name, message: identity.message]
          else
            [name: name]
          end

        Ecto.Changeset.unique_constraint(changeset, identity.keys, opts)
      end)

    names =
      resource
      |> AshPostgres.unique_index_names()
      |> case do
        {m, f, a} ->
          List.wrap(apply(m, f, [changeset | a]))

        value ->
          List.wrap(value)
      end

    names = [
      {Ash.Resource.primary_key(resource), table(resource) <> "_pkey"} | names
    ]

    Enum.reduce(names, changeset, fn {keys, name}, changeset ->
      Ecto.Changeset.unique_constraint(changeset, List.wrap(keys), name: name)
    end)
  end

  # Insert with `ON CONFLICT` on the primary key, replacing the changed
  # attributes. Refused for resources with tenant updates enabled.
  @impl true
  def upsert(resource, changeset) do
    repo_opts =
      changeset
      |> repo_opts()
      |> Keyword.put(:on_conflict, {:replace, Map.keys(changeset.attributes)})
      |> Keyword.put(:conflict_target, Ash.Resource.primary_key(resource))

    if AshPostgres.manage_tenant_update?(resource) do
      {:error, "Cannot currently upsert a resource that owns a tenant"}
    else
      changeset.data
      |> Map.update!(:__meta__, &Map.put(&1, :source, table(resource)))
      |> ecto_changeset(changeset)
      |> repo(resource).insert(repo_opts)
      |> handle_errors()
    end
  rescue
    e ->
      {:error, e}
  end

  @impl true
  def update(resource, changeset) do
    changeset.data
    |> Map.update!(:__meta__, &Map.put(&1, :source, table(resource)))
    |> ecto_changeset(changeset)
    |> repo(resource).update(repo_opts(changeset))
    |> handle_errors()
    |> case do
      {:ok, result} ->
        maybe_update_tenant(resource, changeset, result)

        {:ok, result}

      {:error, error} ->
        {:error, error}
    end
  rescue
    e ->
      {:error, e}
  end

  @impl true
  def destroy(resource, %{data: record} = changeset) do
    case repo(resource).delete(record, repo_opts(changeset)) do
      {:ok, _record} ->
        :ok

      {:error, error} ->
        handle_errors({:error, error})
    end
  rescue
    e ->
      {:error, e}
  end

  # Appends one ORDER BY expression per sort entry. A sort on a known aggregate
  # name uses that aggregate's binding; everything else sorts on binding 0.
  @impl true
  def sort(query, sort, resource) do
    query = default_bindings(query, resource)

    sort
    |> sanitize_sort()
    |> Enum.reduce({:ok, query}, fn {order, sort}, {:ok, query} ->
      binding =
        case Map.fetch(query.__ash_bindings__.aggregates, sort) do
          {:ok, binding} ->
            binding

          :error ->
            0
        end

      new_query =
        Map.update!(query, :order_bys, fn order_bys ->
          order_bys = order_bys || []

          sort_expr = %Ecto.Query.QueryExpr{
            expr: [
              {order, {{:., [], [{:&, [], [binding]}, sort]}, [], []}}
            ]
          }

          order_bys ++ [sort_expr]
        end)

      {:ok, new_query}
    end)
  end

  # Turns `{field, order}` pairs into `{order, field}` pairs, renaming the
  # `*_nils_*` orders to their `*_nulls_*` equivalents.
  defp sanitize_sort(sort) do
    sort
    |> List.wrap()
    |> Enum.map(fn
      {sort, :asc_nils_last} -> {:asc_nulls_last, sort}
      {sort, :asc_nils_first} -> {:asc_nulls_first, sort}
      {sort, :desc_nils_last} -> {:desc_nulls_last, sort}
      {sort, :desc_nils_first} -> {:desc_nulls_first, sort}
      {sort, order} -> {order, sort}
      sort -> sort
    end)
  end

  # A literally-false filter short-circuits to `WHERE false` and tags the query
  # with `:__impossible__`.
  @impl true
  def filter(query, %{expression: false}, _resource) do
    impossible_query = from(row in query, where: false)
    {:ok, Map.put(impossible_query, :__impossible__, true)}
  end

  # Joins every relationship path the filter references (inner when
  # `can_inner_join?/3` allows it, left otherwise), then adds the WHERE clauses.
  def filter(query, filter, _resource) do
    relationship_paths =
      filter
      |> Filter.relationship_paths()
      |> Enum.map(fn path ->
        if can_inner_join?(path, filter) do
          {:inner, relationship_path_to_relationships(filter.resource, path)}
        else
          {:left, relationship_path_to_relationships(filter.resource, path)}
        end
      end)

    new_query =
      query
      |> join_all_relationships(relationship_paths)
      |> add_filter_expression(filter)

    {:ok, new_query}
  end

  # Ensures the query carries the `:__ash_bindings__` bookkeeping map: the next
  # free binding index, aggregate name => binding, and binding index => metadata.
  # `Map.put_new/3` keeps any bookkeeping that is already present.
  defp default_bindings(query, resource) do
    Map.put_new(query, :__ash_bindings__, %{
      current: Enum.count(query.joins) + 1,
      aggregates: %{},
      bindings: %{0 => %{path: [], type: :root, source: resource}}
    })
  end

  # Walks the filter expression to decide whether the relationship at `path`
  # may be inner joined. `seen_an_or?` records that an `or` was crossed on the
  # way down; a reference to `path` underneath an `or` yields false.
  defp can_inner_join?(path, expr, seen_an_or? \\ false)

  defp can_inner_join?(path, %{expression: expr}, seen_an_or?),
    do: can_inner_join?(path, expr, seen_an_or?)

  defp can_inner_join?(_path, expr, _seen_an_or?) when expr in [nil, true, false], do: true

  defp can_inner_join?(path, %BooleanExpression{op: :and, left: left, right: right}, seen_an_or?) do
    can_inner_join?(path, left, seen_an_or?) || can_inner_join?(path, right, seen_an_or?)
  end

  defp can_inner_join?(path, %BooleanExpression{op: :or, left: left, right: right}, _) do
    can_inner_join?(path, left, true) && can_inner_join?(path, right, true)
  end

  # not (a or b) is handled as (not a) and (not b).
  defp can_inner_join?(
         path,
         %Not{expression: %BooleanExpression{op: :or, left: left, right: right}},
         seen_an_or?
       ) do
    can_inner_join?(
      path,
      %BooleanExpression{
        op: :and,
        left: %Not{expression: left},
        right: %Not{expression: right}
      },
      seen_an_or?
    )
  end

  defp can_inner_join?(path, %Not{expression: expression}, seen_an_or?) do
    can_inner_join?(path, expression, seen_an_or?)
  end

  defp can_inner_join?(
         search_path,
         %{__operator__?: true, left: %Ref{relationship_path: relationship_path}},
         seen_an_or?
       )
       when search_path == relationship_path do
    not seen_an_or?
  end

  defp can_inner_join?(
         search_path,
         %{__function__?: true, arguments: arguments},
         seen_an_or?
       ) do
    if Enum.any?(arguments, &match?(%Ref{relationship_path: ^search_path}, &1)) do
      not seen_an_or?
    else
      true
    end
  end

  defp can_inner_join?(_, _, _), do: true

  # Adds an aggregate to the query: reuses the aggregate join for the
  # relationship path when one exists (otherwise joins a fresh aggregate
  # subquery), records the aggregate's binding, adds the aggregate expression to
  # the joined subquery's select and finally selects it on the outer query.
  @impl true
  def add_aggregate(query, aggregate, _resource) do
    resource = aggregate.resource
    query = default_bindings(query, resource)

    {query, binding} =
      case get_binding(resource, aggregate.relationship_path, query, :aggregate) do
        nil ->
          relationship = Ash.Resource.relationship(resource, aggregate.relationship_path)
          subquery = aggregate_subquery(relationship, aggregate)

          new_query =
            join_all_relationships(
              query,
              [
                {{:aggregate, aggregate.name, subquery},
                 relationship_path_to_relationships(resource, aggregate.relationship_path)}
              ]
            )

          {new_query, get_binding(resource, aggregate.relationship_path, new_query, :aggregate)}

        binding ->
          {query, binding}
      end

    query_with_aggregate_binding =
      put_in(
        query.__ash_bindings__.aggregates,
        Map.put(query.__ash_bindings__.aggregates, aggregate.name, binding)
      )

    new_query =
      query_with_aggregate_binding
      |> add_aggregate_to_subquery(resource, aggregate, binding)
      |> select_aggregate(resource, aggregate)

    {:ok, new_query}
  end

  # Makes sure the query has a select (defaulting to the row merged with an
  # empty `aggregates` map) and adds the aggregate to it.
  defp select_aggregate(query, resource, aggregate) do
    binding = get_binding(resource, aggregate.relationship_path, query, :aggregate)

    query =
      if query.select do
        query
      else
        from(row in query,
          select: row,
          select_merge: %{aggregates: %{}}
        )
      end

    %{query | select: add_to_select(query.select, binding, aggregate)}
  end

  # Aggregate without a `load` target whose select already merges an
  # `aggregates` map: the value is added under `aggregates.<name>`.
  defp add_to_select(
         %{expr: {:merge, _, [first, {:%{}, _, [{:aggregates, {:%{}, [], fields}}]}]}} = select,
         binding,
         %{load: nil} = aggregate
       ) do
    new_fields = [
      {aggregate.name, aggregate_select_field(binding, aggregate)}
      | fields
    ]

    %{select | expr: {:merge, [], [first, {:%{}, [], [{:aggregates, {:%{}, [], new_fields}}]}]}}
  end

  # Otherwise the value is merged at the top level of the select under `load`.
  defp add_to_select(
         %{expr: expr} = select,
         binding,
         %{load: load_as} = aggregate
       ) do
    field = aggregate_select_field(binding, aggregate)

    %{select | expr: {:merge, [], [expr, {:%{}, [], [{load_as, field}]}]}}
  end

  # Builds the select expression that reads an aggregate from its join binding:
  # `:first` aggregates take element `[1]` of the aggregated array, the value is
  # cast to the aggregate's ecto type, and a non-nil `default_value` is applied
  # through `coalesce`.
  defp aggregate_select_field(binding, aggregate) do
    reference = {{:., [], [{:&, [], [binding]}, aggregate.name]}, [], []}

    accessed =
      if aggregate.kind == :first do
        {:fragment, [],
         [
           raw: "",
           expr: reference,
           raw: "[1]"
         ]}
      else
        reference
      end

    field =
      {:type, [],
       [
         accessed,
         Ash.Type.ecto_type(aggregate.type)
       ]}

    if is_nil(aggregate.default_value) do
      field
    else
      {:coalesce, [],
       [
         field,
         aggregate.default_value
       ]}
    end
  end

  # Rewrites the subquery behind the join at `binding` so that it also selects
  # this aggregate, applying the aggregate's authorization filter first if any.
  defp add_aggregate_to_subquery(query, resource, aggregate, binding) do
    new_joins =
      List.update_at(query.joins, binding - 1, fn join ->
        aggregate_query =
          if aggregate.authorization_filter do
            {:ok, filter} =
              filter(
                join.source.from.source.query,
                aggregate.authorization_filter,
                Ash.Resource.related(resource, aggregate.relationship_path)
              )

            filter
          else
            join.source.from.source.query
          end

        new_aggregate_query = add_subquery_aggregate_select(aggregate_query, aggregate, resource)

        put_in(join.source.from.source.query, new_aggregate_query)
      end)

    %{
      query
      | joins: new_joins
    }
  end

  # Base subquery for an aggregate: destination rows grouped by the destination
  # field, scoped to the aggregate query's tenant when one is set.
  defp aggregate_subquery(relationship, aggregate) do
    query =
      from(row in relationship.destination,
        group_by: ^relationship.destination_field,
        select: field(row, ^relationship.destination_field)
      )

    if aggregate.query && aggregate.query.tenant do
      Ecto.Query.put_query_prefix(query, aggregate.query.tenant)
    else
      query
    end
  end

  # SQL suffix for a sort direction; `nil` means no suffix is emitted.
  defp order_to_postgres_order(dir) do
    case dir do
      :asc -> nil
      :asc_nils_last -> " ASC NULLS LAST"
      :asc_nils_first -> " ASC NULLS FIRST"
      :desc -> " DESC"
      :desc_nils_last -> " DESC NULLS LAST"
      :desc_nils_first -> " DESC NULLS FIRST"
    end
  end

  # `:first` aggregate: `array_agg(field [ORDER BY ...])`, optionally wrapped in
  # a FILTER clause, cast to an array of the aggregate's type.
  defp add_subquery_aggregate_select(query, %{kind: :first} = aggregate, _resource) do
    query = default_bindings(query, aggregate.resource)
    key = aggregate.field
    type = Ash.Type.ecto_type(aggregate.type)

    field =
      if aggregate.query && aggregate.query.sort && aggregate.query.sort != [] do
        sort_expr =
          aggregate.query.sort
          |> Enum.map(fn {sort, order} ->
            case order_to_postgres_order(order) do
              nil ->
                [expr: {{:., [], [{:&, [], [0]}, sort]}, [], []}]

              order ->
                [expr: {{:., [], [{:&, [], [0]}, sort]}, [], []}, raw: order]
            end
          end)
          |> Enum.intersperse(raw: ", ")
          |> List.flatten()

        # The leading space keeps `ORDER BY` separated from the aggregated
        # expression in the generated SQL.
        {:fragment, [],
         [
           raw: "array_agg(",
           expr: {{:., [], [{:&, [], [0]}, key]}, [], []},
           raw: " ORDER BY "
         ] ++
           sort_expr ++ [raw: ")"]}
      else
        {:fragment, [],
         [
           raw: "array_agg(",
           expr: {{:., [], [{:&, [], [0]}, key]}, [], []},
           raw: ")"
         ]}
      end

    merge_aggregate_into_select(query, aggregate, field, {:array, type})
  end

  # `:count` aggregate: `count(field)`, falling back to the first primary key
  # field of `resource` when the aggregate names no field.
  defp add_subquery_aggregate_select(query, %{kind: :count} = aggregate, resource) do
    query = default_bindings(query, aggregate.resource)
    key = aggregate.field || List.first(Ash.Resource.primary_key(resource))
    type = Ash.Type.ecto_type(aggregate.type)

    field = {:count, [], [{{:., [], [{:&, [], [0]}, key]}, [], []}]}

    merge_aggregate_into_select(query, aggregate, field, type)
  end

  # Wraps `field` in a FILTER clause when the aggregate has a non-empty filter,
  # casts it to `cast_type` and merges it into the query's select under the
  # aggregate's name.
  #
  # Select params that are already present are always preserved, so adding an
  # unfiltered aggregate after a filtered one does not drop the earlier params.
  defp merge_aggregate_into_select(query, aggregate, field, cast_type) do
    {params, filtered} =
      if aggregate.query && aggregate.query.filter &&
           not match?(%Ash.Filter{expression: nil}, aggregate.query.filter) do
        {params, expr} =
          filter_to_expr(
            aggregate.query.filter,
            query.__ash_bindings__.bindings,
            query.select.params
          )

        {params, {:filter, [], [field, expr]}}
      else
        {query.select.params, field}
      end

    cast = {:type, [], [filtered, cast_type]}

    new_expr = {:merge, [], [query.select.expr, {:%{}, [], [{aggregate.name, cast}]}]}

    %{query | select: %{query.select | expr: new_expr, params: params}}
  end

  # Resolves a path of relationship names into relationship structs, following
  # each relationship's destination resource.
  defp relationship_path_to_relationships(resource, path, acc \\ [])
  defp relationship_path_to_relationships(_resource, [], acc), do: Enum.reverse(acc)

  defp relationship_path_to_relationships(resource, [relationship | rest], acc) do
    relationship = Ash.Resource.relationship(resource, relationship)

    relationship_path_to_relationships(relationship.destination, rest, [relationship | acc])
  end

  # Joins each `{join_type, relationships}` path one hop at a time, skipping
  # hops for which `has_binding?/4` already finds a binding. Intermediate hops
  # of an aggregate path are left joined; only the last hop gets the aggregate
  # join itself.
  defp join_all_relationships(query, relationship_paths, path \\ [], source \\ nil) do
    query = default_bindings(query, source)

    Enum.reduce(relationship_paths, query, fn
      {_join_type, []}, query ->
        query

      {join_type, [relationship | rest_rels]}, query ->
        source = source || relationship.source

        current_path = path ++ [relationship]

        current_join_type =
          case join_type do
            {:aggregate, _name, _agg} when rest_rels != [] ->
              :left

            other ->
              other
          end

        if has_binding?(source, Enum.reverse(current_path), query, current_join_type) do
          query
        else
          joined_query =
            join_relationship(
              query,
              relationship,
              Enum.map(path, & &1.name),
              current_join_type,
              source
            )

          joined_query_with_distinct = add_distinct(relationship, join_type, joined_query)

          join_all_relationships(
            joined_query_with_distinct,
            [{join_type, rest_rels}],
            current_path,
            source
          )
        end
    end)
  end

  # True when the query already has a binding of `type` whose path is synonymous
  # with `candidate_path`.
  defp has_binding?(resource, path, query, {:aggregate, _, _}),
    do: has_binding?(resource, path, query, :aggregate)

  defp has_binding?(resource, candidate_path, %{__ash_bindings__: _} = query, type) do
    Enum.any?(query.__ash_bindings__.bindings, fn
      {_, %{path: path, source: source, type: ^type}} ->
        Ash.SatSolver.synonymous_relationship_paths?(resource, path, candidate_path, source)

      _ ->
        false
    end)
  end

  defp has_binding?(_, _, _, _), do: false

  # Returns the index of a binding of `type` whose path is synonymous with
  # `path`, or `nil` when there is none.
  defp get_binding(resource, path, %{__ash_bindings__: _} = query, type) do
    paths =
      Enum.flat_map(query.__ash_bindings__.bindings, fn
        {binding, %{path: bound_path, type: ^type}} ->
          [{binding, bound_path}]

        _ ->
          []
      end)

    Enum.find_value(paths, fn {binding, candidate_path} ->
      Ash.SatSolver.synonymous_relationship_paths?(resource, candidate_path, path) && binding
    end)
  end

  defp get_binding(_, _, _, _), do: nil

  # Left joining a to-many relationship can duplicate rows, so make the query
  # distinct (unless it already has a distinct clause).
  defp add_distinct(relationship, join_type, joined_query) do
    if relationship.cardinality == :many and join_type == :left && !joined_query.distinct do
      from(row in joined_query,
        distinct: ^Ash.Resource.primary_key(relationship.destination)
      )
    else
      joined_query
    end
  end

  # NOTE(review): the bindings map is keyed by binding index (see
  # `default_bindings/2` and `add_binding/2`), but it is looked up here with a
  # path list, so this appears to always take the `nil` branch — confirm.
  defp join_relationship(query, relationship, path, join_type, source) do
    case Map.get(query.__ash_bindings__.bindings, path) do
      %{type: existing_join_type} when join_type != existing_join_type ->
        raise "unreachable?"

      nil ->
        do_join_relationship(query, relationship, path, join_type, source)

      _ ->
        query
    end
  end

  # many_to_many: joins the join table ("through") and then the destination.
  # Aggregates left join the through table and lateral-join the aggregate
  # subquery. Two bindings are registered: the join table, then the destination.
  defp do_join_relationship(query, %{type: :many_to_many} = relationship, path, kind, source) do
    relationship_through = maybe_get_resource_query(relationship.through)

    relationship_destination =
      Ecto.Queryable.to_query(maybe_get_resource_query(relationship.destination))

    # Binding to join from; defaults to the root binding (0).
    current_binding =
      Enum.find_value(query.__ash_bindings__.bindings, 0, fn {binding, data} ->
        if data.type == kind && data.path == Enum.reverse(path) do
          binding
        end
      end)

    new_query =
      case kind do
        {:aggregate, _, subquery} ->
          subquery =
            subquery(
              from(destination in subquery,
                where:
                  field(destination, ^relationship.destination_field) ==
                    field(
                      parent_as(:rel_through),
                      ^relationship.destination_field_on_join_table
                    )
              )
            )

          from([{row, current_binding}] in query,
            left_join: through in ^relationship_through,
            as: :rel_through,
            on:
              field(row, ^relationship.source_field) ==
                field(through, ^relationship.source_field_on_join_table),
            left_lateral_join: destination in ^subquery,
            on:
              field(destination, ^relationship.destination_field) ==
                field(through, ^relationship.destination_field_on_join_table)
          )

        :inner ->
          from([{row, current_binding}] in query,
            join: through in ^relationship_through,
            on:
              field(row, ^relationship.source_field) ==
                field(through, ^relationship.source_field_on_join_table),
            join: destination in ^relationship_destination,
            on:
              field(destination, ^relationship.destination_field) ==
                field(through, ^relationship.destination_field_on_join_table)
          )

        _ ->
          from([{row, current_binding}] in query,
            left_join: through in ^relationship_through,
            on:
              field(row, ^relationship.source_field) ==
                field(through, ^relationship.source_field_on_join_table),
            left_join: destination in ^relationship_destination,
            on:
              field(destination, ^relationship.destination_field) ==
                field(through, ^relationship.destination_field_on_join_table)
          )
      end

    join_path =
      Enum.reverse([String.to_existing_atom(to_string(relationship.name) <> "_join_assoc") | path])

    full_path = Enum.reverse([relationship.name | path])

    binding_data =
      case kind do
        {:aggregate, name, _agg} ->
          %{type: :aggregate, name: name, path: full_path, source: source}

        _ ->
          %{type: kind, path: full_path, source: source}
      end

    new_query
    |> add_binding(%{path: join_path, type: :left, source: source})
    |> add_binding(binding_data)
  end

  # Every other relationship type: a single join from the current binding to the
  # destination (a lateral join onto the aggregate subquery for aggregates).
  defp do_join_relationship(query, relationship, path, kind, source) do
    relationship_destination =
      Ecto.Queryable.to_query(maybe_get_resource_query(relationship.destination))

    # Binding to join from; defaults to the root binding (0).
    current_binding =
      Enum.find_value(query.__ash_bindings__.bindings, 0, fn {binding, data} ->
        if data.type == kind && data.path == Enum.reverse(path) do
          binding
        end
      end)

    new_query =
      case kind do
        {:aggregate, _, subquery} ->
          subquery =
            from(
              sub in subquery(
                from(destination in subquery,
                  where:
                    field(destination, ^relationship.destination_field) ==
                      field(parent_as(:rel_source), ^relationship.source_field)
                )
              ),
              select: field(sub, ^relationship.destination_field)
            )

          from([{row, current_binding}] in query,
            as: :rel_source,
            left_lateral_join: destination in ^subquery,
            on:
              field(row, ^relationship.source_field) ==
                field(destination, ^relationship.destination_field)
          )

        :inner ->
          from([{row, current_binding}] in query,
            join: destination in ^relationship_destination,
            on:
              field(row, ^relationship.source_field) ==
                field(destination, ^relationship.destination_field)
          )

        _ ->
          from([{row, current_binding}] in query,
            left_join: destination in ^relationship_destination,
            on:
              field(row, ^relationship.source_field) ==
                field(destination, ^relationship.destination_field)
          )
      end

    full_path = Enum.reverse([relationship.name | path])

    binding_data =
      case kind do
        {:aggregate, name, _agg} ->
          %{type: :aggregate, name: name, path: full_path, source: source}

        _ ->
          %{type: kind, path: full_path, source: source}
      end

    new_query
    |> add_binding(binding_data)
  end

  # Splits the filter on its top-level `and`s and appends one WHERE expression
  # per statement.
  defp add_filter_expression(query, filter) do
    wheres =
      filter
      |> split_and_statements()
      |> Enum.map(fn filter ->
        {params, expr} = filter_to_expr(filter, query.__ash_bindings__.bindings, [])

        %Ecto.Query.BooleanExpr{
          expr: expr,
          op: :and,
          params: params
        }
      end)

    %{query | wheres: query.wheres ++ wheres}
  end

  # Flattens nested `and`s into a list of statements. Double negation is
  # removed and `not (a or b)` is rewritten to `(not a) and (not b)` first.
  defp split_and_statements(%Filter{expression: expression}) do
    split_and_statements(expression)
  end

  defp split_and_statements(%BooleanExpression{op: :and, left: left, right: right}) do
    split_and_statements(left) ++ split_and_statements(right)
  end

  defp split_and_statements(%Not{expression: %Not{expression: expression}}) do
    split_and_statements(expression)
  end

  defp split_and_statements(%Not{
         expression: %BooleanExpression{op: :or, left: left, right: right}
       }) do
    split_and_statements(%BooleanExpression{
      op: :and,
      left: %Not{expression: left},
      right: %Not{expression: right}
    })
  end

  defp split_and_statements(other), do: [other]

  # Converts an Ash filter into `{params, ecto_expr_ast}`. New params are
  # appended to `params`; the trivial filters below add none and return the
  # incoming `params` unchanged so callers never lose params they already hold.
  defp filter_to_expr(expr, bindings, params, embedded? \\ false, type \\ nil)

  defp filter_to_expr(%Filter{expression: expression}, bindings, params, embedded?, type) do
    filter_to_expr(expression, bindings, params, embedded?, type)
  end

  # A nil filter means "everything"
  defp filter_to_expr(nil, _, params, _, _), do: {params, true}
  # A true filter means "everything"
  defp filter_to_expr(true, _, params, _, _), do: {params, true}
  # A false filter means "nothing"
  defp filter_to_expr(false, _, params, _, _), do: {params, false}

  defp filter_to_expr(expression, bindings, params, embedded?, type) do
    do_filter_to_expr(expression, bindings, params, embedded?, type)
  end

  # Recursive worker for `filter_to_expr/5`. `embedded?` means literal values
  # are placed directly in the expression instead of being added as params;
  # `type` is the param type to use for a literal, when known.
  defp do_filter_to_expr(expr, bindings, params, embedded?, type \\ nil)

  defp do_filter_to_expr(
         %BooleanExpression{op: op, left: left, right: right},
         bindings,
         params,
         embedded?,
         _type
       ) do
    {params, left_expr} = do_filter_to_expr(left, bindings, params, embedded?)
    {params, right_expr} = do_filter_to_expr(right, bindings, params, embedded?)
    {params, {op, [], [left_expr, right_expr]}}
  end

  defp do_filter_to_expr(%Not{expression: expression}, bindings, params, embedded?, _type) do
    {params, new_expression} = do_filter_to_expr(expression, bindings, params, embedded?)
    {params, {:not, [], [new_expression]}}
  end

  defp do_filter_to_expr(
         %TrigramSimilarity{arguments: [arg1, arg2], embedded?: pred_embedded?},
         bindings,
         params,
         embedded?,
         _type
       ) do
    {params, arg1} = do_filter_to_expr(arg1, bindings, params, pred_embedded? || embedded?)
    {params, arg2} = do_filter_to_expr(arg2, bindings, params, pred_embedded? || embedded?)

    {params, {:fragment, [], [raw: "similarity(", expr: arg1, raw: ", ", expr: arg2, raw: ")"]}}
  end

  defp do_filter_to_expr(
         %Type{arguments: [arg1, arg2], embedded?: pred_embedded?},
         bindings,
         params,
         embedded?,
         _type
       ) do
    {params, arg1} = do_filter_to_expr(arg1, bindings, params, pred_embedded? || embedded?)
    {params, arg2} = do_filter_to_expr(arg2, bindings, params, pred_embedded? || embedded?)

    {params, {:type, [], [arg1, arg2]}}
  end

  # Fragment: `:raw` parts are passed through, `:expr` parts are converted.
  defp do_filter_to_expr(
         %Fragment{arguments: arguments, embedded?: pred_embedded?},
         bindings,
         params,
         embedded?,
         _type
       ) do
    {params, fragment_data} =
      Enum.reduce(arguments, {params, []}, fn
        {:raw, str}, {params, fragment_data} ->
          {params, fragment_data ++ [{:raw, str}]}

        {:expr, expr}, {params, fragment_data} ->
          {params, expr} = do_filter_to_expr(expr, bindings, params, pred_embedded? || embedded?)
          {params, fragment_data ++ [{:expr, expr}]}
      end)

    {params, {:fragment, [], fragment_data}}
  end

  # is_nil(left) == right
  defp do_filter_to_expr(
         %IsNil{left: left, right: right, embedded?: pred_embedded?},
         bindings,
         params,
         embedded?,
         _type
       ) do
    {params, left_expr} = do_filter_to_expr(left, bindings, params, pred_embedded? || embedded?)
    {params, right_expr} = do_filter_to_expr(right, bindings, params, pred_embedded? || embedded?)

    {params,
     {:==, [],
      [
        {:is_nil, [], [left_expr]},
        right_expr
      ]}}
  end

  # ago(n, unit): the current UTC time is added as a param and `n` units are
  # subtracted from it with `datetime_add`.
  defp do_filter_to_expr(
         %Ago{arguments: [left, right], embedded?: _pred_embedded?},
         _bindings,
         params,
         _embedded?,
         _type
       )
       when is_integer(left) and (is_binary(right) or is_atom(right)) do
    {params ++ [{DateTime.utc_now(), {:param, :any_datetime}}],
     {:datetime_add, [], [{:^, [], [Enum.count(params)]}, left * -1, to_string(right)]}}
  end

  # defp do_filter_to_expr(
  #        %In{left: [left, right], embedded?: _pred_embedded?},
  #        _bindings,
  #        params,
  #        _embedded?,
  #        _type
  #      )
  #      when is_integer(left) and (is_binary(right) or is_atom(right)) do
  #   {params ++ [{DateTime.utc_now(), {:param, :any_datetime}}],
  #    {:datetime_add, [], [{:^, [], [Enum.count(params)]}, left * -1, to_string(right)]}}
  # end

  # contains/2 with a case-insensitive needle: the haystack is cast to citext.
  defp do_filter_to_expr(
         %Contains{arguments: [left, %Ash.CiString{} = right], embedded?: pred_embedded?},
         bindings,
         params,
         embedded?,
         type
       ) do
    do_filter_to_expr(
      %Fragment{
        embedded?: pred_embedded?,
        arguments: [
          raw: "strpos(",
          expr: left,
          raw: "::citext, ",
          expr: right,
          raw: ") > 0"
        ]
      },
      bindings,
      params,
      embedded?,
      type
    )
  end

  defp do_filter_to_expr(
         %Contains{arguments: [left, right], embedded?: pred_embedded?},
         bindings,
         params,
         embedded?,
         type
       ) do
    do_filter_to_expr(
      %Fragment{
        embedded?: pred_embedded?,
        arguments: [
          raw: "strpos(",
          expr: left,
          raw: ", ",
          expr: right,
          raw: ") > 0"
        ]
      },
      bindings,
      params,
      embedded?,
      type
    )
  end

  # Generic binary predicate. The param type for each side is derived from the
  # attribute referenced on the *other* side; when the type derived from `left`
  # is vague, the type derived from `right` is preferred if there is one.
  defp do_filter_to_expr(
         %mod{
           __predicate__?: _,
           left: left,
           right: right,
           embedded?: pred_embedded?,
           operator: op
         },
         bindings,
         params,
         embedded?,
         _type
       ) do
    {left_type, right_type} =
      case determine_type(mod, left) do
        nil ->
          case determine_type(mod, right, true) do
            nil ->
              {nil, nil}

            left_type ->
              {left_type, nil}
          end

        right_type ->
          if vague?(right_type) do
            case determine_type(mod, right, true) do
              nil ->
                {nil, right_type}

              left_type ->
                {left_type, nil}
            end
          else
            {nil, right_type}
          end
      end

    {params, left_expr} =
      do_filter_to_expr(left, bindings, params, pred_embedded? || embedded?, left_type)

    {params, right_expr} =
      do_filter_to_expr(right, bindings, params, pred_embedded? || embedded?, right_type)

    {params,
     {op, [],
      [
        left_expr,
        right_expr
      ]}}
  end

  # Attribute/aggregate reference: `binding.name`.
  defp do_filter_to_expr(
         %Ref{attribute: %{name: name}} = ref,
         bindings,
         params,
         _embedded?,
         _type
       ) do
    {params, {{:., [], [{:&, [], [ref_binding(ref, bindings)]}, name]}, [], []}}
  end

  defp do_filter_to_expr({:embed, other}, _bindings, params, _true, _type) do
    {params, other}
  end

  defp do_filter_to_expr(%Ash.CiString{string: string}, bindings, params, embedded?, type) do
    do_filter_to_expr(
      %Fragment{
        embedded?: embedded?,
        arguments: [
          raw: "",
          expr: string,
          raw: "::citext"
        ]
      },
      bindings,
      params,
      embedded?,
      type
    )
  end

  defp do_filter_to_expr(%MapSet{} = mapset, bindings, params, embedded?, type) do
    do_filter_to_expr(Enum.to_list(mapset), bindings, params, embedded?, type)
  end

  # Embedded literal: placed in the expression as-is.
  defp do_filter_to_expr(other, _bindings, params, true, _type) do
    {params, other}
  end

  # Non-embedded literal: appended as a param and referenced by its index.
  defp do_filter_to_expr(value, _bindings, params, false, type) do
    type = type || :any
    value = last_ditch_cast(value, type)

    {params ++ [{value, type}], {:^, [], [Enum.count(params)]}}
  end

  # Atoms compared against string columns are sent as strings.
  defp last_ditch_cast(value, :string) when is_atom(value) do
    to_string(value)
  end

  defp last_ditch_cast(value, _type) do
    value
  end

  # Derives a param type from the predicate module's `types/0` and the type of
  # the referenced attribute. For each entry of `mod.types()` the least vague
  # candidate is chosen (with `flip?`, list entries are reversed first); the
  # first entry yielding a type wins. Ash types are mapped to their storage
  # type. Returns `nil` when the argument is not an attribute reference.
  defp determine_type(mod, ref, flip? \\ false)

  defp determine_type(mod, %Ref{attribute: %{type: type}}, flip?) do
    Enum.find_value(mod.types(), fn types ->
      types =
        case types do
          :same ->
            [type]

          :any ->
            []

          other when is_list(other) ->
            other =
              if flip? do
                Enum.reverse(other)
              else
                other
              end

            Enum.map(other, fn
              {:array, :any} ->
                {:in, :any}

              {:array, :same} ->
                {:in, type}

              {:array, type} ->
                {:in, type}

              type ->
                type
            end)

          other ->
            [other]
        end

      types
      |> Enum.sort_by(&vague?/1)
      |> Enum.at(0)
      |> case do
        nil ->
          nil

        {:in, :any} ->
          {:in, :any}

        {:in, type} ->
          if Ash.Type.ash_type?(type) do
            {:in, Ash.Type.storage_type(type)}
          else
            {:in, type}
          end

        type ->
          if Ash.Type.ash_type?(type) do
            Ash.Type.storage_type(type)
          else
            type
          end
      end
    end)
  end

  defp determine_type(_mod, _, _), do: nil

  defp vague?({:in, :any}), do: true
  defp vague?(:any), do: true
  defp vague?(_), do: false

  # Finds the binding index for a reference: attributes resolve to a
  # root/inner/left binding with the same relationship path, aggregates to the
  # aggregate binding for the aggregate's relationship path.
  defp ref_binding(ref, bindings) do
    case ref.attribute do
      %Ash.Resource.Attribute{} ->
        Enum.find_value(bindings, fn {binding, data} ->
          data.path == ref.relationship_path && data.type in [:inner, :left, :root] && binding
        end)

      %Ash.Query.Aggregate{} = aggregate ->
        Enum.find_value(bindings, fn {binding, data} ->
          data.path == aggregate.relationship_path && data.type == :aggregate && binding
        end)
    end
  end

  # Registers `data` under the next free binding index and advances the counter.
  defp add_binding(query, data) do
    current = query.__ash_bindings__.current
    bindings = query.__ash_bindings__.bindings

    new_ash_bindings = %{
      query.__ash_bindings__
      | bindings: Map.put(bindings, current, data),
        current: current + 1
    }

    %{query | __ash_bindings__: new_ash_bindings}
  end

  @impl true
  def transaction(resource, func) do
    repo(resource).transaction(func)
  end

  @impl true
  def rollback(resource, term) do
    repo(resource).rollback(term)
  end

  # NOTE(review): on failure this returns the `{:error, error}` tuple itself,
  # which the callers in this module then use as a queryable — confirm whether
  # an explicit error should be raised or propagated instead.
  defp maybe_get_resource_query(resource) do
    case Ash.Query.data_layer_query(Ash.Query.new(resource), only_validate_filter?: false) do
      {:ok, query} -> query
      {:error, error} -> {:error, error}
    end
  end
end