improvement: rewrite read actions to not use Ash.Engine (#836)

Zach Daniel 2024-01-19 00:05:42 -05:00 committed by GitHub
parent 20ceee7a92
commit c48cb08026
56 changed files with 3759 additions and 5088 deletions

View file

@ -5,15 +5,6 @@ See [Conventional Commits](https://conventionalcommits.org) for commit guideline
<!-- changelog -->
## [v2.18.2](https://github.com/ash-project/ash/compare/v2.18.1...v2.18.2) (2024-01-18)
### Bug Fixes:
* don't ignore stream batch size
## [v2.18.1](https://github.com/ash-project/ash/compare/v2.18.0...v2.18.1) (2024-01-12)

View file

@ -10,7 +10,7 @@
```elixir
def deps do
[
{:ash, "~> 2.18.2"}
{:ash, "~> 2.18.1"}
]
end
```

View file

@ -1136,6 +1136,7 @@ end
| [`get?`](#actions-read-get?){: #actions-read-get? } | `boolean` | `false` | Expresses that this action innately only returns a single result. Used by extensions to validate and/or modify behavior. Causes code interfaces to return a single value instead of a list. See the [code interface guide](/documentation/topics/code-interface.md) for more. |
| [`modify_query`](#actions-read-modify_query){: #actions-read-modify_query } | `mfa \| (any, any -> any)` | | Allows direct manipulation of the data layer query via an MFA. The ash query and the data layer query will be provided as additional arguments. The result must be `{:ok, new_data_layer_query} \| {:error, error}`. |
| [`get_by`](#actions-read-get_by){: #actions-read-get_by } | `atom \| list(atom)` | | A helper to automatically generate a "get by X" action. Sets `get?` to true, adds args for each of the specified fields, and adds a filter for each of the arguments. |
| [`timeout`](#actions-read-timeout){: #actions-read-timeout } | `pos_integer` | | The maximum amount of time, in milliseconds, that the action is allowed to run for. Ignored if the data layer doesn't support transactions *and* async is disabled. |
| [`primary?`](#actions-read-primary?){: #actions-read-primary? } | `boolean` | `false` | Whether or not this action should be used when no action is specified by the caller. |
| [`description`](#actions-read-description){: #actions-read-description } | `String.t` | | An optional description for the action |
| [`transaction?`](#actions-read-transaction?){: #actions-read-transaction? } | `boolean` | | Whether or not the action should be run in transactions. Reads default to false, while create/update/destroy actions default to `true`. |
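For context, here is a minimal sketch (not part of this commit) of a read action using a few of the options documented above; the action and field names are hypothetical.

```elixir
actions do
  read :get_by_email do
    # get_by implies get?: true, adds an :email argument, and filters on it
    get_by :email
    # milliseconds; ignored when the data layer lacks transactions and async is disabled
    timeout 5_000
    description "Fetch a single user by email"
  end

  read :read do
    # used when the caller does not specify an action
    primary? true
  end
end
```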

View file

@ -219,10 +219,6 @@ you would see that no SQL queries are run. The calculation is run directly in El
Ash.Query.filter(exists(open_tickets, severity >= parent(severity_threshold)))
```
### COMING SOON
The following two examples do not work currently, but are being worked on
```elixir
has_many :relevant_tickets, Ticket do
filter expr(status == :open and severity >= parent(severity_threshold))

View file

@ -87,7 +87,7 @@ defp deps do
[
# {:dep_from_hexpm, "~> 0.3.0"},
# {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"},
{:ash, "~> 2.18.2"} # <-- add this line
{:ash, "~> 2.18.1"} # <-- add this line
]
end
```

View file

@ -55,7 +55,8 @@ defmodule Ash.Actions.Aggregate do
query = Map.put(query, :aggregates, Map.new(aggregates, &{&1.name, &1}))
with {:ok, query} <- authorize_query(query, opts, agg_authorize?),
{:ok, data_layer_query} <- Ash.Query.data_layer_query(query),
{:ok, data_layer_query} <-
Ash.Query.data_layer_query(Ash.Query.unset(query, :aggregates)),
{:ok, result} <-
Ash.DataLayer.run_aggregate_query(data_layer_query, aggregates, query.resource) do
{:cont, {:ok, Map.merge(acc, result)}}

View file

@ -108,7 +108,7 @@ defmodule Ash.Actions.Helpers do
def add_context(query_or_changeset, opts) do
context = Process.get(:ash_context, %{}) || %{}
private_context = Map.new(Keyword.take(opts, [:actor, :authorize?]))
private_context = Map.new(Keyword.take(opts, [:actor, :authorize?, :tracer]))
context =
context
@ -437,6 +437,9 @@ defmodule Ash.Actions.Helpers do
defp runtime_calculations(query) do
query.calculations
|> Kernel.||(%{})
|> Enum.filter(fn {_name, calc} ->
calc.type
end)
|> Enum.reject(fn {_name, calc} ->
constraints = Map.get(calc, :constraints, [])

File diff suppressed because it is too large

View file

@ -913,7 +913,7 @@ defmodule Ash.Actions.ManagedRelationships do
defp sort_and_filter(query, relationship) do
# We cannot use relationship filters that reference the `parent`
# because the parent is not yet related.
if Ash.Actions.Load.do_has_parent_expr?(relationship.filter) do
if Ash.Actions.Read.Relationships.do_has_parent_expr?(relationship.filter) do
query
else
Ash.Query.do_filter(query, relationship.filter, parent_stack: relationship.source)
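For illustration, the kind of relationship filter this check guards against is one that references the parent record, as in the documentation example earlier in this diff; the snippet below is illustrative only and not part of the hunk.

```elixir
has_many :relevant_tickets, Ticket do
  # references parent(), so do_has_parent_expr?/1 returns true and
  # sort_and_filter/2 leaves the query untouched here
  filter expr(status == :open and severity >= parent(severity_threshold))
end
```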

View file

@ -0,0 +1,104 @@
defmodule Ash.Actions.Read.AsyncLimiter do
@moduledoc """
A utility for limiting the number of concurrent async operations
Because this is an optimization, we opt to run something synchronously
if there is no async slot available. The idea here is that
the *vast* majority of things we do async will be fast enough not to
warrant always waiting for an async slot to be free. We may add in some
smarter heuristics later (i.e. choosing to wait for a task instead of
doing the work synchronously), but for now this is a good start.
"""
use Agent
def start_link(limit) do
Agent.start_link(fn -> {1, limit} end)
end
def async_or_inline(
%{resource: resource, context: %{private: %{async_limiter: async_limiter}}},
opts,
func
)
when not is_nil(async_limiter) do
if Ash.DataLayer.data_layer_can?(resource, :async_engine) do
claimed? =
Agent.get_and_update(async_limiter, fn
{limit, limit} ->
{false, {limit, limit}}
{count, limit} ->
{true, {count + 1, limit}}
end)
if claimed? do
try do
Ash.Engine.async(
fn ->
func.()
end,
opts
)
after
release(async_limiter)
end
else
func.()
end
else
func.()
end
end
def async_or_inline(_, _opts, func) do
func.()
end
def await_all(list) do
list
|> Enum.map(fn
%Task{} = task ->
Task.await(task, :infinity)
other ->
other
end)
end
def await_at_least_one([]), do: {[], []}
def await_at_least_one(list) do
list
|> Enum.map(fn
%Task{} = task ->
case Task.yield(task, 0) do
{:ok, term} ->
term
{:exit, term} ->
{:error, term}
nil ->
task
end
other ->
other
end)
|> Enum.split_with(&(!match?(%Task{}, &1)))
|> case do
{[], remaining} ->
await_at_least_one(remaining)
{complete, remaining} ->
{complete, remaining}
end
end
defp release(async_limiter) do
Agent.update(async_limiter, fn
{count, limit} ->
{count - 1, limit}
end)
end
end
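A hypothetical sketch of the calling convention (not taken from the diff): `async_or_inline/3` hands back a `%Task{}` when it claims one of the limited async slots and a plain result when it falls back to running inline, and `await_all/1` normalizes both shapes for the caller.

```elixir
# The limiter pid would normally be stored under
# query.context.private.async_limiter by the read action.
{:ok, _limiter} = Ash.Actions.Read.AsyncLimiter.start_link(2)

[
  # stands in for work that claimed an async slot
  Task.async(fn -> {:ok, :ran_async} end),
  # stands in for work that ran inline because no slot was free
  {:ok, :ran_inline}
]
|> Ash.Actions.Read.AsyncLimiter.await_all()
# => [{:ok, :ran_async}, {:ok, :ran_inline}]
```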

File diff suppressed because it is too large

File diff suppressed because it is too large

View file

@ -0,0 +1,897 @@
defmodule Ash.Actions.Read.Relationships do
@moduledoc false
require Ash.Query
def load([], _query, _lazy?) do
{:ok, []}
end
def load(record, query, lazy?) when not is_list(record) do
case load([record], query, lazy?) do
{:ok, [record]} -> {:ok, record}
{:error, error} -> {:error, error}
end
end
def load(records, %{load: load}, _lazy?) when load in [%{}, [], nil] do
{:ok, records}
end
def load(records, query, lazy?) do
query.load
|> with_related_queries(query, records, lazy?)
|> fetch_related_records(records)
|> attach_related_records(records)
end
defp attach_related_records(relationships_queries_and_related_records, records) do
Enum.reduce_while(relationships_queries_and_related_records, {:ok, records}, fn
{relationship, related_query, {:ok, related_records}}, {:ok, records} ->
{:cont,
{:ok, do_attach_related_records(records, relationship, related_records, related_query)}}
{relationship, _related_query, {:error, error}}, _ ->
{:halt, {:error, Ash.Error.set_path(error, relationship.name)}}
end)
end
defp fetch_related_records(relationships_and_queries, records) do
Enum.map(relationships_and_queries, fn
{relationship, {:lazy, query}} ->
{relationship, {:lazy, query}, lazy_related_records(records, relationship, query)}
{relationship, %{valid?: true} = related_query} ->
do_fetch_related_records(records, relationship, related_query)
{relationship, %{errors: errors} = related_query} ->
{relationship, related_query, {:error, errors}}
end)
|> Ash.Actions.Read.AsyncLimiter.await_all()
end
defp lazy_related_records(records, relationship, related_query) do
primary_key = Ash.Resource.Info.primary_key(relationship.source)
related_records_with_lazy_join_source =
Enum.flat_map(records, fn record ->
record_pkey = Map.take(record, primary_key)
record
|> Map.get(relationship.name)
|> case do
%Ash.NotLoaded{} ->
[]
%Ash.ForbiddenField{} ->
[]
record_or_records ->
record_or_records
end
|> List.wrap()
|> Enum.map(&Ash.Resource.set_metadata(&1, %{lazy_join_source: record_pkey}))
end)
related_query.api.load(related_records_with_lazy_join_source, related_query,
lazy?: true,
actor: related_query.context.private[:actor],
tenant: related_query.tenant,
authorize?: related_query.context.private[:authorize?]
)
end
defp with_related_queries(load, query, records, lazy?) do
Stream.map(load, fn {relationship_name, related_query} ->
if lazy? && Ash.Resource.loaded?(records, relationship_name, lists: :any) do
relationship = Ash.Resource.Info.relationship(query.resource, relationship_name)
related_query =
case related_query do
[] -> Ash.Query.new(relationship.destination)
query -> query
end
{relationship,
{:lazy,
related_query
|> Map.put(:api, query.api)
|> Ash.Query.set_context(%{
private: %{async_limiter: query.context[:private][:async_limiter]}
})}}
else
related_query(relationship_name, records, related_query, query)
end
end)
end
defp related_query(relationship_name, records, related_query, query) do
relationship = Ash.Resource.Info.relationship(query.resource, relationship_name)
read_action_name =
relationship.read_action ||
Ash.Resource.Info.primary_action!(relationship.destination, :read).name
related_query =
related_query
|> case do
%Ash.Query{} = related_query ->
related_query
[] ->
Ash.Query.new(relationship.destination)
end
|> Ash.Query.set_context(%{
private: %{async_limiter: query.context[:private][:async_limiter]}
})
|> Ash.Query.set_tenant(query.tenant)
|> Ash.Query.for_read(
read_action_name,
%{},
authorize?: query.context[:private][:authorize?],
actor: query.context[:private][:actor],
tracer: query.context[:private][:tracer]
)
|> Map.put(:api, relationship.api || query.api)
|> Ash.Query.sort(relationship.sort)
|> Ash.Query.do_filter(relationship.filter)
|> Ash.Query.set_context(relationship.context)
|> hydrate_refs(relationship.source)
|> with_lateral_join_query(query, relationship, records)
if !related_query.context[:data_layer][:lateral_join_source] &&
related_query.distinct not in [[], nil] do
raise ArgumentError, message: "Cannot yet use `distinct` when loading related records"
end
{relationship, related_query}
end
defp with_lateral_join_query(related_query, source_query, relationship, records) do
if lateral_join?(related_query, relationship, records) do
lateral_join_source_path =
if relationship.type == :many_to_many do
join_relationship =
Ash.Resource.Info.relationship(source_query.resource, relationship.join_relationship)
through_query =
relationship.through
|> Ash.Query.set_context(%{
accessing_from: %{source: relationship.source, name: relationship.join_relationship}
})
|> Ash.Query.select([
relationship.source_attribute_on_join_resource,
relationship.destination_attribute_on_join_resource
])
|> Map.put(:api, join_relationship.api || related_query.api)
|> hydrate_refs(relationship.source)
case through_query.api.can(through_query,
alter_source?: true,
run_queries?: false,
base_query: through_query
) do
{:ok, true} ->
{:ok,
[
{clear_lateral_join_source(source_query), relationship.source_attribute,
relationship.source_attribute_on_join_resource, relationship},
{clear_lateral_join_source(through_query),
relationship.destination_attribute_on_join_resource,
relationship.destination_attribute, join_relationship}
]}
{:ok, true, authorized_through_query} ->
{:ok,
[
{clear_lateral_join_source(source_query), relationship.source_attribute,
relationship.source_attribute_on_join_resource, relationship},
{clear_lateral_join_source(authorized_through_query),
relationship.destination_attribute_on_join_resource,
relationship.destination_attribute, join_relationship}
]}
{:error, error} ->
{:error, Ash.Error.set_path(error, join_relationship.name)}
end
else
{:ok,
[
{clear_lateral_join_source(source_query), relationship.source_attribute,
relationship.destination_attribute, relationship}
]}
end
case lateral_join_source_path do
{:ok, lateral_join_source_path} ->
Ash.Query.set_context(related_query, %{
data_layer: %{
lateral_join_source: {records, lateral_join_source_path}
}
})
{:error, error} ->
Ash.Query.add_error(related_query, error)
end
else
related_query
end
end
defp hydrate_refs(query, parent) do
case Ash.Filter.hydrate_refs(query.filter, %{
resource: query.resource,
parent_stack: [parent],
public?: false
}) do
{:ok, hydrated} ->
%{query | filter: hydrated}
{:error, error} ->
Ash.Query.add_error(query, error)
end
end
defp clear_lateral_join_source(source_query) do
Ash.Query.unset(source_query, [:load, :select, :sort])
end
defp do_fetch_related_records(
records,
%{manual: {module, opts}} = relationship,
related_query
) do
Ash.Actions.Read.AsyncLimiter.async_or_inline(
related_query,
Ash.context_to_opts(related_query.context),
fn ->
result =
module.load(records, opts, %{
relationship: relationship,
query:
related_query
|> Ash.Query.sort(relationship.sort)
|> Ash.Query.do_filter(relationship.filter)
|> Map.put(:load, [])
|> Ash.Query.set_context(%{
accessing_from: %{source: relationship.source, name: relationship.name}
}),
actor: related_query.context[:private][:actor],
authorize?: related_query.context[:private][:authorize?],
api: related_query.api,
tenant: related_query.tenant
})
|> case do
{:ok, records} ->
records
|> Enum.flat_map(fn {key, value} ->
value
|> List.wrap()
|> Enum.map(&Ash.Resource.put_metadata(&1, :manual_key, key))
end)
|> related_query.api.load(related_query,
actor: related_query.context[:private][:actor],
authorize?: related_query.context[:private][:authorize?],
tenant: related_query.tenant
)
|> case do
{:ok, results} ->
{:ok, regroup_manual_results(results, relationship)}
{:error, error} ->
{:error, error}
end
{:error, error} ->
{:error, error}
end
{relationship, related_query, result}
end
)
end
defp do_fetch_related_records(
_records,
%{no_attributes?: true} = relationship,
related_query
) do
Ash.Actions.Read.AsyncLimiter.async_or_inline(
related_query,
Ash.context_to_opts(related_query.context),
fn ->
result =
related_query
|> select_destination_attribute(relationship)
|> Ash.Query.set_context(%{
accessing_from: %{source: relationship.source, name: relationship.name}
})
|> Ash.Actions.Read.unpaginated_read()
{relationship, related_query, result}
end
)
end
defp do_fetch_related_records(
_records,
relationship,
%{context: %{data_layer: %{lateral_join_source: {_, _}}}} = related_query
) do
Ash.Actions.Read.AsyncLimiter.async_or_inline(
related_query,
Ash.context_to_opts(related_query.context),
fn ->
result =
related_query
|> select_destination_attribute(relationship)
|> Ash.Query.set_context(%{
accessing_from: %{source: relationship.source, name: relationship.name}
})
|> Ash.Actions.Read.unpaginated_read()
{relationship, related_query, result}
end
)
end
defp do_fetch_related_records(records, %{type: :many_to_many} = relationship, related_query) do
record_ids =
Enum.map(records, fn record ->
Map.get(record, relationship.source_attribute)
end)
join_relationship =
Ash.Resource.Info.relationship(relationship.source, relationship.join_relationship)
join_query =
relationship.through
|> Ash.Query.filter(ref(^relationship.source_attribute_on_join_resource) in ^record_ids)
|> Ash.Query.set_context(%{
accessing_from: %{source: relationship.source, name: relationship.join_relationship}
})
|> Ash.Query.select([
relationship.source_attribute_on_join_resource,
relationship.destination_attribute_on_join_resource
])
|> Map.put(:api, join_relationship.api || related_query.api)
Ash.Actions.Read.AsyncLimiter.async_or_inline(
related_query,
Ash.context_to_opts(related_query.context),
fn ->
case Ash.Actions.Read.unpaginated_read(join_query, nil,
authorize?: related_query.context[:private][:authorize?],
actor: related_query.context[:private][:actor],
tracer: related_query.context[:private][:tracer]
) do
{:ok, join_records} ->
{join_id_mapping, destination_ids} =
Enum.reduce(join_records, {%{}, MapSet.new()}, fn join_record,
{mapping, destination_ids} ->
destination_value =
Map.get(join_record, relationship.destination_attribute_on_join_resource)
source_value =
Map.get(join_record, relationship.source_attribute_on_join_resource)
new_destination_ids = MapSet.put(destination_ids, destination_value)
new_mapping =
Map.update(
mapping,
destination_value,
[
source_value
],
&[source_value | &1]
)
{new_mapping, new_destination_ids}
end)
related_query
|> select_destination_attribute(relationship)
|> Ash.Query.sort(relationship.sort)
|> Ash.Query.do_filter(relationship.filter)
|> Ash.Query.filter(ref(^relationship.destination_attribute) in ^destination_ids)
|> Ash.Actions.Read.unpaginated_read()
|> case do
{:ok, records} ->
{relationship, related_query,
{:ok,
Enum.flat_map(records, fn record ->
Enum.map(
join_id_mapping[Map.get(record, relationship.destination_attribute)] || [],
fn lateral_join_source ->
Map.put(
record,
:__lateral_join_source__,
lateral_join_source
)
end
)
end)}}
{:error, error} ->
{relationship, related_query, {:error, error}}
end
{:error, error} ->
{relationship, related_query, {:error, error}}
end
end
)
end
defp do_fetch_related_records(records, relationship, related_query) do
destination_attributes = Enum.map(records, &Map.get(&1, relationship.source_attribute))
Ash.Actions.Read.AsyncLimiter.async_or_inline(
related_query,
Ash.context_to_opts(related_query.context),
fn ->
result =
related_query
|> select_destination_attribute(relationship)
|> Ash.Query.filter(ref(^relationship.destination_attribute) in ^destination_attributes)
|> Ash.Query.unset([:limit, :offset, :distinct, :distinct_sort])
|> Ash.Query.set_context(%{
accessing_from: %{source: relationship.source, name: relationship.name}
})
|> Ash.Actions.Read.unpaginated_read()
{relationship, related_query, result}
end
)
end
defp regroup_manual_results(records, %{cardinality: :many}) do
Enum.group_by(records, & &1.__metadata__.manual_key, &delete_manual_key/1)
end
defp regroup_manual_results(records, %{cardinality: :one}) do
Map.new(records, &{&1.__metadata__.manual_key, delete_manual_key(&1)})
end
defp delete_manual_key(record) do
Map.update!(record, :__metadata__, &Map.delete(&1, :manual_key))
end
defp select_destination_attribute(related_query, relationship) do
if Map.get(relationship, :manual) &&
!Ash.Resource.Info.attribute(
relationship.destination,
relationship.destination_attribute
) do
related_query
else
Ash.Query.ensure_selected(related_query, [relationship.destination_attribute])
end
end
defp do_attach_related_records(
[%resource{} | _] = records,
relationship,
related_records,
{:lazy, _related_query}
) do
pkey = Ash.Resource.Info.primary_key(resource)
Enum.map(records, fn record ->
record_pkey = Map.take(record, pkey)
related = Enum.filter(related_records, &(&1.__metadata__.lazy_join_source == record_pkey))
related =
case relationship.cardinality do
:many ->
related
:one ->
Enum.at(related, 0)
end
case Map.get(record, relationship.name) do
%Ash.ForbiddenField{} -> record
%Ash.NotLoaded{} -> record
_ -> Map.put(record, relationship.name, related)
end
end)
end
defp do_attach_related_records(
[%resource{} | _] = records,
relationship,
related_records,
%{context: %{data_layer: %{lateral_join_source: {_, _}}}}
) do
source_attribute =
Ash.Resource.Info.attribute(relationship.source, relationship.source_attribute)
pkey_simple_equality? = Ash.Resource.Info.primary_key_simple_equality?(relationship.source)
source_attribute_simple_equality? = Ash.Type.simple_equality?(source_attribute.type)
primary_key = Ash.Resource.Info.primary_key(resource)
if pkey_simple_equality? && source_attribute_simple_equality? do
values =
if relationship.cardinality == :many do
Enum.group_by(related_records, & &1.__lateral_join_source__)
else
Map.new(Enum.reverse(related_records), &{&1.__lateral_join_source__, &1})
end
default =
if relationship.cardinality == :many do
[]
else
nil
end
Enum.map(records, fn record ->
with :error <- Map.fetch(values, Map.take(record, primary_key)),
:error <- Map.fetch(values, Map.get(record, relationship.source_attribute)) do
Map.put(record, relationship.name, default)
else
{:ok, value} ->
Map.put(record, relationship.name, value)
end
end)
else
Enum.map(records, fn record ->
func =
if relationship.cardinality == :one do
:find
else
:filter
end
related =
apply(Enum, func, [
related_records,
fn related_record ->
if is_map(related_record.__lateral_join_source__) do
resource.primary_key_matches?(record, related_record.__lateral_join_source__)
else
Ash.Type.equal?(
source_attribute.type,
related_record.__lateral_join_source__,
Map.get(record, relationship.source_attribute)
)
end
end
])
Map.put(record, relationship.name, related)
end)
end
end
defp do_attach_related_records(
records,
%{type: :many_to_many} = relationship,
related_records,
related_query
) do
do_attach_related_records(
records,
relationship,
related_records,
Ash.Query.set_context(related_query, %{data_layer: %{lateral_join_source: {nil, nil}}})
)
end
defp do_attach_related_records(
[%resource{} | _] = records,
%{manual: {_module, _opts}} = relationship,
map,
_related_query
) do
default =
case relationship.cardinality do
:one ->
nil
:many ->
[]
end
if Ash.Resource.Info.primary_key_simple_equality?(resource) do
pkey = Ash.Resource.Info.primary_key(resource)
single_match? =
case pkey do
[_] -> true
_ -> false
end
Enum.map(records, fn record ->
value =
if single_match? do
case Map.fetch(map, Map.get(record, Enum.at(pkey, 0))) do
{:ok, value} -> {:ok, value}
:error -> Map.fetch(map, Map.take(record, pkey))
end
else
Map.fetch(map, Map.take(record, pkey))
end
case value do
{:ok, result} ->
Map.put(record, relationship.name, result)
:error ->
Map.put(record, relationship.name, default)
end
end)
else
pkey = Ash.Resource.Info.primary_key(resource)
Enum.map(records, fn record ->
pkey_values = Map.take(record, pkey)
value =
Enum.find_value(map, fn {key, value} ->
if resource.primary_key_matches?(key, pkey_values) do
{:ok, value}
end
end) || :error
case value do
{:ok, result} ->
Map.put(record, relationship.name, result)
:error ->
Map.put(record, relationship.name, default)
end
end)
end
end
defp do_attach_related_records(
records,
%{no_attributes?: true} = relationship,
related_records,
_related_query
) do
Enum.map(records, fn record ->
Map.put(record, relationship.name, related_records)
end)
end
defp do_attach_related_records(records, relationship, related_records, related_query) do
attribute = Ash.Resource.Info.attribute(relationship.source, relationship.source_attribute)
simple_equality? = Ash.Type.simple_equality?(attribute.type)
related =
if simple_equality? do
if relationship.cardinality == :one do
Map.new(
Enum.reverse(related_records),
&{Map.get(&1, relationship.destination_attribute), &1}
)
else
Enum.group_by(related_records, &Map.get(&1, relationship.destination_attribute))
end
else
related_records
end
default =
if relationship.cardinality == :one do
nil
else
[]
end
if simple_equality? do
Enum.map(records, fn record ->
value = Map.get(record, relationship.source_attribute)
Map.put(
record,
relationship.name,
apply_runtime_query_operations(
Map.get(related, value) || default,
related_query
)
)
end)
else
Enum.map(records, fn record ->
value = Map.get(record, relationship.source_attribute)
related
|> Enum.find_value(:error, fn result ->
destination_value = Map.get(result, relationship.destination_attribute)
if Ash.Type.equal?(attribute.type, value, destination_value) do
{:ok, result}
end
end)
|> then(fn result ->
result =
apply_runtime_query_operations(result, related_query)
put_result(record, result, relationship.name, default)
end)
end)
end
end
defp apply_runtime_query_operations({:ok, value}, related_query) do
{:ok, apply_runtime_query_operations(value, related_query)}
end
defp apply_runtime_query_operations(:error, _related_query), do: :error
defp apply_runtime_query_operations(empty, _related_query) when empty in [nil, []], do: empty
defp apply_runtime_query_operations(value, related_query) when not is_list(value) do
[value] |> apply_runtime_query_operations(related_query) |> Enum.at(0)
end
defp apply_runtime_query_operations(value, related_query) do
value
|> apply_runtime_offset(related_query)
# |> apply_runtime_distinct(related_query)
|> apply_runtime_limit(related_query)
end
defp apply_runtime_offset(value, %{offset: nil}), do: value
defp apply_runtime_offset(value, %{offset: offset}), do: Enum.drop(value, offset)
defp apply_runtime_limit(value, %{limit: nil}), do: value
defp apply_runtime_limit(value, %{limit: limit}), do: Enum.take(value, limit)
defp put_result(record, {:ok, match}, key, _default) do
Map.put(
record,
key,
match
)
end
defp put_result(record, :error, key, default) do
Map.put(record, key, default)
end
defp lateral_join?(%{action: action} = query, relationship, source_data) do
if action.manual do
raise_if_parent_expr!(relationship, "manual actions")
false
else
{offset, limit} = offset_and_limit(query)
resources =
[relationship.source, Map.get(relationship, :through), relationship.destination]
|> Enum.reject(&is_nil/1)
has_distinct? = query.distinct not in [[], nil]
cond do
is_many_to_many_not_unique_on_join?(relationship) ->
raise_if_parent_expr!(
relationship,
"many to many relationships that don't have unique constraints on their join resource attributes"
)
false
limit == 1 && is_nil(relationship.context) && is_nil(relationship.filter) &&
is_nil(relationship.sort) && relationship.type != :many_to_many ->
has_parent_expr?(relationship)
limit == 1 && (source_data == :unknown || Enum.count_until(source_data, 2) == 1) &&
relationship.type != :many_to_many ->
has_parent_expr?(relationship)
has_parent_expr?(relationship) ->
true
!Ash.DataLayer.data_layer_can?(
relationship.source,
{:lateral_join, resources}
) ->
false
relationship.type == :many_to_many &&
Ash.DataLayer.prefer_lateral_join_for_many_to_many?(
Ash.DataLayer.data_layer(relationship.source)
) ->
true
limit || offset || has_distinct? ->
true
true ->
false
end
end
end
defp raise_if_parent_expr!(relationship, reason) do
if has_parent_expr?(relationship) do
raise ArgumentError, "Found `parent_expr` in unsupported context: #{reason}"
end
end
@doc false
def has_parent_expr?(%{destination: destination, filter: filter, sort: sort, context: context}) do
{:ok, sort} = Ash.Actions.Sort.process(destination, sort, %{}, context)
do_has_parent_expr?(filter) || has_parent_expr_in_sort?(sort)
end
defp has_parent_expr_in_sort?(sort) do
sort
|> List.wrap()
|> Enum.any?(fn
atom when is_atom(atom) ->
false
{atom, _} when is_atom(atom) ->
false
%Ash.Query.Calculation{} = calculation ->
expression = calculation.module.expression(calculation.opts, calculation.context)
do_has_parent_expr?(expression)
{%Ash.Query.Calculation{} = calculation, _} ->
expression = calculation.module.expression(calculation.opts, calculation.context)
do_has_parent_expr?(expression)
end)
end
@doc false
def do_has_parent_expr?(filter, depth \\ 0) do
not is_nil(
Ash.Filter.find(filter, fn
%Ash.Query.Call{name: :parent, args: [expr]} ->
if depth == 0 do
true
else
do_has_parent_expr?(expr, depth - 1)
end
%Ash.Query.Exists{expr: expr} ->
do_has_parent_expr?(expr, depth + 1)
%Ash.Query.Parent{expr: expr} ->
if depth == 0 do
true
else
do_has_parent_expr?(expr, depth - 1)
end
_ ->
false
end)
)
end
defp offset_and_limit(query) do
if query.offset == 0 do
{nil, query.limit}
else
{query.offset, query.limit}
end
end
defp is_many_to_many_not_unique_on_join?(%{type: :many_to_many} = relationship) do
join_keys =
Enum.sort([
relationship.source_attribute_on_join_resource,
relationship.destination_attribute_on_join_resource
])
primary_key_is_join_keys? =
Enum.sort(Ash.Resource.Info.primary_key(relationship.through)) == join_keys
is_unique_on_join_keys? =
Enum.any?(Ash.Resource.Info.identities(relationship.through), fn identity ->
Enum.sort(identity.keys) == join_keys
end)
not (primary_key_is_join_keys? || is_unique_on_join_keys?)
end
defp is_many_to_many_not_unique_on_join?(_), do: false
end
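For orientation, a hypothetical usage sketch (the api and resource names are not from this diff): this module is what ultimately services a relationship load like the one below, building one read query per relationship, running the queries (possibly concurrently via the AsyncLimiter above), and attaching the results back onto the parent records.

```elixir
{:ok, users} = MyApp.Api.read(MyApp.User)

posts_query =
  MyApp.Post
  |> Ash.Query.sort(inserted_at: :desc)
  |> Ash.Query.limit(5)

# loads the :posts relationship on each user using the query above
{:ok, users} = MyApp.Api.load(users, posts: posts_query)
```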

View file

@ -60,7 +60,7 @@ defmodule Ash.Api do
require Ash.Query
@dialyzer {:nowarn_function, unwrap_or_raise!: 3}
@dialyzer {:nowarn_function, unwrap_or_raise!: 2}
@type t() :: module
@ -125,6 +125,11 @@ defmodule Ash.Api do
type: :any,
doc: "A load statement to add onto the query"
],
max_concurrency: [
type: :non_neg_integer,
doc:
"The maximum number of processes allowed to be started for parallel loading of relationships and calculations. Defaults to `System.schedulers_online() * 2`"
],
lock: [
type: :any,
doc: "A lock statement to add onto the query"
@ -157,6 +162,19 @@ defmodule Ash.Api do
@doc false
def read_opts_schema, do: @read_opts_schema
@read_one_opts_schema merge_schemas(
[
not_found_error?: [
type: :boolean,
default: false,
doc:
"Whether or not to return an `Ash.Error.Query.NotFound` if no record is found."
]
],
@read_opts_schema,
"Read Options"
)
@stream_opts [
batch_size: [
type: :integer,
@ -820,6 +838,10 @@ defmodule Ash.Api do
end
end
defp alter_source({:ok, true, query}, api, actor, _subject, opts) do
alter_source({:ok, true}, api, actor, query, opts)
end
defp alter_source({:ok, true}, api, actor, subject, opts) do
if opts[:alter_source?] do
subject.resource
@ -926,6 +948,15 @@ defmodule Ash.Api do
{authorizer, authorizer_state, context}
end)
base_query =
case subject do
%Ash.Query{} = query ->
opts[:base_query] || query
_ ->
opts[:base_query]
end
case authorizers do
[] ->
{:ok, true}
@ -933,7 +964,7 @@ defmodule Ash.Api do
authorizers ->
authorizers
|> Enum.reduce_while(
{false, opts[:base_query]},
{false, base_query},
fn {authorizer, authorizer_state, context}, {_authorized?, query} ->
case authorizer.strict_check(authorizer_state, context) do
{:error, %{class: :forbidden} = e} when is_exception(e) ->
@ -957,29 +988,59 @@ defmodule Ash.Api do
"""
{:filter, _authorizer, filter} ->
query = query || Ash.Query.new(subject.resource, api) |> Ash.Query.select([])
{:cont, {true, query |> Ash.Query.filter(^filter)}}
{:cont, {true, Ash.Query.filter(or_query(query, subject.resource, api), ^filter)}}
{:filter, filter} ->
query = query || Ash.Query.new(subject.resource, api) |> Ash.Query.select([])
{:cont, {true, Ash.Query.filter(query, ^filter)}}
{:cont, {true, Ash.Query.filter(or_query(query, subject.resource, api), ^filter)}}
{:continue, authorizer_state} ->
if opts[:maybe_is] == false do
{:halt,
{false, Ash.Authorizer.exception(authorizer, :forbidden, authorizer_state)}}
if opts[:alter_source?] do
query_with_hook =
Ash.Query.authorize_results(or_query(query, subject.resource, api), fn query,
results ->
context = Map.merge(context, %{data: results, query: query})
case authorizer.check(authorizer_state, context) do
:authorized -> {:ok, results}
{:error, error} -> {:error, error}
{:data, data} -> {:ok, data}
end
end)
{:cont, {true, query_with_hook}}
else
{:halt, {:maybe, nil}}
if opts[:maybe_is] == false do
{:halt,
{false, Ash.Authorizer.exception(authorizer, :forbidden, authorizer_state)}}
else
{:halt, {:maybe, nil}}
end
end
{:filter_and_continue, _, authorizer_state} ->
if opts[:maybe_is] == false do
{:halt,
{false, Ash.Authorizer.exception(authorizer, :forbidden, authorizer_state)}}
{:filter_and_continue, filter, authorizer_state} ->
if opts[:alter_source?] do
query_with_hook =
query
|> or_query(subject.resource, api)
|> Ash.Query.filter(^filter)
|> Ash.Query.authorize_results(fn query, results ->
context = Map.merge(context, %{data: results, query: query})
case authorizer.check(authorizer_state, context) do
:authorized -> {:ok, results}
{:error, error} -> {:error, error}
{:data, data} -> {:ok, data}
end
end)
{:cont, {true, query_with_hook}}
else
{:halt, {:maybe, nil}}
if opts[:maybe_is] == false do
{:halt,
{false, Ash.Authorizer.exception(authorizer, :forbidden, authorizer_state)}}
else
{:halt, {:maybe, nil}}
end
end
end
end
@ -990,89 +1051,7 @@ defmodule Ash.Api do
{true, query} when not is_nil(query) ->
if opts[:run_queries?] do
case subject do
%Ash.Query{} ->
if opts[:data] do
data = List.wrap(opts[:data])
pkey = Ash.Resource.Info.primary_key(query.resource)
pkey_values = Enum.map(data, &Map.take(&1, pkey))
if Enum.any?(pkey_values, fn pkey_value ->
pkey_value |> Map.values() |> Enum.any?(&is_nil/1)
end) do
{:ok, :maybe}
else
query
|> Ash.Query.do_filter(or: pkey_values)
|> Ash.Query.data_layer_query()
|> case do
{:ok, data_layer_query} ->
data_layer_query
|> Ash.DataLayer.run_query(query.resource)
|> Ash.Actions.Helpers.rollback_if_in_transaction(query.resource, query)
|> case do
{:ok, results} ->
if Enum.count(results) == Enum.count(data) do
{:ok, true}
else
if opts[:return_forbidden_error?] do
{:ok, false, authorizer_exception(authorizers)}
else
{:ok, false}
end
end
{:error, error} ->
{:error, error}
end
end
end
else
{:ok, true}
end
%Ash.Changeset{data: data, action_type: type, resource: resource, tenant: tenant}
when type in [:update, :destroy] ->
pkey = Ash.Resource.Info.primary_key(resource)
pkey_value = Map.take(data, pkey)
if pkey_value |> Map.values() |> Enum.any?(&is_nil/1) do
{:ok, :maybe}
else
query
|> Ash.Query.do_filter(pkey_value)
|> Ash.Query.set_tenant(tenant)
|> Ash.Query.data_layer_query()
|> case do
{:ok, data_layer_query} ->
data_layer_query
|> Ash.DataLayer.run_query(resource)
|> Ash.Actions.Helpers.rollback_if_in_transaction(query.resource, query)
|> case do
{:ok, [_]} ->
{:ok, true}
{:error, error} ->
{:error, error}
_ ->
if opts[:return_forbidden_error?] do
{:ok, false, authorizer_exception(authorizers)}
else
{:ok, false}
end
end
end
end
%Ash.Changeset{} ->
if opts[:return_forbidden_error?] do
{:ok, false, authorizer_exception(authorizers)}
else
{:ok, false}
end
end
run_queries(subject, opts, authorizers, query)
else
if opts[:alter_source?] do
{:ok, true, query}
@ -1105,6 +1084,119 @@ defmodule Ash.Api do
end
end
defp run_queries(subject, opts, authorizers, query) do
case subject do
%Ash.Query{} ->
if opts[:data] do
data = List.wrap(opts[:data])
pkey = Ash.Resource.Info.primary_key(query.resource)
pkey_values = Enum.map(data, &Map.take(&1, pkey))
if Enum.any?(pkey_values, fn pkey_value ->
pkey_value |> Map.values() |> Enum.any?(&is_nil/1)
end) do
{:ok, :maybe}
else
query
|> Ash.Query.do_filter(or: pkey_values)
|> Ash.Query.data_layer_query()
|> case do
{:ok, data_layer_query} ->
data_layer_query
|> Ash.DataLayer.run_query(query.resource)
|> Ash.Actions.Helpers.rollback_if_in_transaction(query.resource, query)
|> case do
{:ok, results} ->
case Ash.Actions.Read.run_authorize_results(query, results) do
{:ok, results} ->
if Enum.count(results) == Enum.count(data) do
{:ok, true}
else
if opts[:return_forbidden_error?] do
{:ok, false, authorizer_exception(authorizers)}
else
{:ok, false}
end
end
{:error, error} ->
{:error, error}
end
{:error, error} ->
{:error, error}
end
end
end
else
{:ok, true}
end
%Ash.Changeset{data: data, action_type: type, resource: resource, tenant: tenant}
when type in [:update, :destroy] ->
pkey = Ash.Resource.Info.primary_key(resource)
pkey_value = Map.take(data, pkey)
if pkey_value |> Map.values() |> Enum.any?(&is_nil/1) do
{:ok, :maybe}
else
query
|> Ash.Query.do_filter(pkey_value)
|> Ash.Query.set_tenant(tenant)
|> Ash.Query.data_layer_query()
|> case do
{:ok, data_layer_query} ->
data_layer_query
|> Ash.DataLayer.run_query(resource)
|> Ash.Actions.Helpers.rollback_if_in_transaction(query.resource, query)
|> case do
{:ok, results} ->
case Ash.Actions.Read.run_authorize_results(query, results) do
{:ok, []} ->
if opts[:return_forbidden_error?] do
{:ok, false, authorizer_exception(authorizers)}
else
{:ok, false}
end
{:ok, [_]} ->
{:ok, true}
{:error, error} ->
if opts[:return_forbidden_error?] do
{:ok, false, error}
else
{:ok, false}
end
end
{:error, error} ->
{:error, error}
_ ->
if opts[:return_forbidden_error?] do
{:ok, false, authorizer_exception(authorizers)}
else
{:ok, false}
end
end
end
end
%Ash.Changeset{} ->
if opts[:return_forbidden_error?] do
{:ok, false, authorizer_exception(authorizers)}
else
{:ok, false}
end
end
end
defp or_query(query, resource, api) do
query || Ash.Query.new(resource, api)
end
defp authorizer_exception([{authorizer, authorizer_state, _context}]) do
Ash.Authorizer.exception(authorizer, :forbidden, authorizer_state)
end
@ -1195,7 +1287,7 @@ defmodule Ash.Api do
def run_action!(api, input, opts \\ []) do
api
|> run_action(input, opts)
|> unwrap_or_raise!(opts[:stacktraces?])
|> unwrap_or_raise!()
end
@doc """
@ -1494,6 +1586,10 @@ defmodule Ash.Api do
This is useful if you have a query that doesn't include a primary key
but you know that it will only ever return a single result.
## Options
#{Spark.OptionsHelpers.docs(@read_one_opts_schema)}
"""
@callback read_one(Ash.Query.t() | Ash.Resource.t(), opts :: Keyword.t()) ::
{:ok, Ash.Resource.record()}
@ -1871,7 +1967,7 @@ defmodule Ash.Api do
api
|> get(resource, id, opts)
|> unwrap_or_raise!(opts[:stacktraces?])
|> unwrap_or_raise!()
end
@doc false
@ -1956,11 +2052,9 @@ defmodule Ash.Api do
@doc false
def page!(api, keyset, request) do
{_, opts} = keyset.rerun
api
|> page(keyset, request)
|> unwrap_or_raise!(opts[:stacktraces?])
|> unwrap_or_raise!()
end
@doc false
@ -2097,7 +2191,7 @@ defmodule Ash.Api do
api
|> load(data, query, opts)
|> unwrap_or_raise!(opts[:stacktraces?])
|> unwrap_or_raise!()
end
@doc false
@ -2235,7 +2329,7 @@ defmodule Ash.Api do
api
|> read(query, opts)
|> unwrap_or_raise!(opts[:stacktraces?])
|> unwrap_or_raise!()
end
@doc false
@ -2271,7 +2365,7 @@ defmodule Ash.Api do
def read_one!(api, query, opts) do
api
|> read_one(query, opts)
|> unwrap_or_raise!(opts[:stacktraces?])
|> unwrap_or_raise!()
end
@doc false
@ -2279,7 +2373,7 @@ defmodule Ash.Api do
query = Ash.Query.to_query(query)
query = Ash.Query.set_api(query, api)
with {:ok, opts} <- Spark.OptionsHelpers.validate(opts, @read_opts_schema),
with {:ok, opts} <- Spark.OptionsHelpers.validate(opts, @read_one_opts_schema),
{:ok, action} <- get_action(query.resource, opts, :read, query.action),
{:ok, action} <- pagination_check(action, query.resource, opts) do
query
@ -2343,7 +2437,7 @@ defmodule Ash.Api do
def create!(api, changeset, opts) do
api
|> create(changeset, opts)
|> unwrap_or_raise!(opts[:stacktraces?])
|> unwrap_or_raise!()
end
@doc false
@ -2578,7 +2672,7 @@ defmodule Ash.Api do
api
|> update(changeset, opts)
|> unwrap_or_raise!(opts[:stacktraces?])
|> unwrap_or_raise!()
end
@doc false
@ -2606,10 +2700,7 @@ defmodule Ash.Api do
api
|> destroy(changeset, opts)
|> unwrap_or_raise!(
opts[:stacktraces?],
!(opts[:return_notifications?] || opts[:return_destroyed?])
)
|> unwrap_or_raise!(!(opts[:return_notifications?] || opts[:return_destroyed?]))
end
@doc false
@ -2682,34 +2773,22 @@ defmodule Ash.Api do
end
end
defp unwrap_or_raise!(first, second, destroy? \\ false)
defp unwrap_or_raise!(:ok, _, _), do: :ok
defp unwrap_or_raise!({:ok, result}, _, false), do: result
defp unwrap_or_raise!({:ok, _result}, _, true), do: :ok
defp unwrap_or_raise!({:ok, result, other}, _, _), do: {result, other}
defp unwrap_or_raise!(first, destroy? \\ false)
defp unwrap_or_raise!(:ok, _), do: :ok
defp unwrap_or_raise!({:ok, result}, false), do: result
defp unwrap_or_raise!({:ok, _result}, true), do: :ok
defp unwrap_or_raise!({:ok, result, other}, _), do: {result, other}
defp unwrap_or_raise!({:error, error}, stacktraces?, destroy?) when is_list(error) do
unwrap_or_raise!({:error, Ash.Error.to_error_class(error)}, stacktraces?, destroy?)
defp unwrap_or_raise!({:error, error}, destroy?) when is_list(error) do
unwrap_or_raise!({:error, Ash.Error.to_error_class(error)}, destroy?)
end
defp unwrap_or_raise!({:error, error}, stacktraces?, _) do
defp unwrap_or_raise!({:error, error}, _) do
exception = Ash.Error.to_error_class(error)
exception =
if stacktraces? do
exception
else
Ash.Error.clear_stacktraces(exception)
end
case exception do
%{stacktraces?: _} ->
if stacktraces? do
reraise %{exception | stacktraces?: stacktraces?},
Map.get(exception.stacktrace || %{}, :stacktrace)
else
raise %{exception | stacktraces?: stacktraces?}
end
%{stacktrace: %{stacktrace: stacktrace}} = exception ->
reraise exception, stacktrace
_ ->
raise exception
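A hedged sketch of the two new read options added in this file (the api and resource names are hypothetical): `max_concurrency` caps the number of processes used for parallel loading of relationships and calculations, and `not_found_error?` makes `read_one` return a not-found error instead of `{:ok, nil}`.

```elixir
{:ok, posts} =
  MyApp.Post
  |> Ash.Query.load(:author)
  |> MyApp.Api.read(max_concurrency: 4)

{:error, _not_found} =
  MyApp.Post
  |> Ash.Query.do_filter(id: Ash.UUID.generate())
  |> MyApp.Api.read_one(not_found_error?: true)
```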

View file

@ -1,6 +1,7 @@
defmodule Ash.DataLayer.Ets do
@behaviour Ash.DataLayer
require Ash.Query
require Ash.Expr
@ets %Spark.Dsl.Section{
name: :ets,
@ -398,7 +399,7 @@ defmodule Ash.DataLayer.Ets do
|> Ash.Query.set_context(%{private: %{internal?: true}})
|> Ash.Query.unset(:load)
|> Ash.Query.unset(:select)
|> query.api.read(authorize?: false)
|> Ash.Actions.Read.unpaginated_read(nil, authorize?: false)
|> case do
{:error, error} ->
{:error, error}
@ -410,9 +411,20 @@ defmodule Ash.DataLayer.Ets do
if Map.get(relationship, :no_attributes?) do
query.filter
else
Ash.Filter.add_to_filter(query.filter, [
{destination_attribute, Map.get(parent, source_attribute)}
])
filter =
if is_nil(query.filter) do
%Ash.Filter{resource: query.resource, expression: true}
else
query.filter
end
Ash.Filter.add_to_filter!(
filter,
Ash.Filter.parse!(
query.resource,
Ash.Expr.expr(ref(^destination_attribute) == ^Map.get(parent, source_attribute))
)
)
end
query = %{query | filter: new_filter}

View file

@ -413,7 +413,36 @@ defmodule Ash.EmbeddableType do
attribute_loads = __MODULE__ |> Ash.Resource.Info.attributes() |> Enum.map(& &1.name)
api.load(record, attribute_loads ++ List.wrap(load), opts)
load =
case load do
%Ash.Query{} -> Ash.Query.ensure_selected(load, attribute_loads)
load_statement -> attribute_loads ++ List.wrap(load_statement)
end
api.load(record, load, opts)
end
def merge_load(left, right, constraints, context) do
right = Ash.Query.load(__MODULE__, right)
__MODULE__
|> Ash.Query.new()
|> Ash.Query.load(left)
|> case do
%{valid?: true} = left ->
{:ok, Ash.Query.merge_query_load(left, right, context)}
query ->
{:error, Ash.Error.to_ash_error(query.errors)}
end
end
def get_rewrites(merged_load, calculation, path, _) do
Ash.Actions.Read.Calculations.get_all_rewrites(merged_load, calculation, path)
end
def rewrite(value, rewrites, _constraints) do
Ash.Actions.Read.Calculations.rewrite(rewrites, value)
end
def array_constraints, do: Ash.EmbeddableType.embedded_resource_array_constraints()

View file

@ -307,7 +307,7 @@ defmodule Ash.Engine do
end
end
defp stacktrace(%{stacktraces?: true, stacktrace: %{stacktrace: stacktrace}})
defp stacktrace(%{stacktrace: %{stacktrace: stacktrace}})
when not is_nil(stacktrace) do
stacktrace
end

View file

@ -141,7 +141,6 @@ defmodule Ash.Error do
}
],
stacktrace: #Stacktrace<>,
stacktraces?: true,
vars: []
}
@ -183,7 +182,6 @@ defmodule Ash.Error do
path: [],
query: nil,
stacktrace: #Stacktrace<>,
stacktraces?: true,
vars: []
}
@ -636,7 +634,7 @@ defmodule Ash.Error do
end
end
def error_messages(errors, custom_message, stacktraces?) do
def error_messages(errors, custom_message, stacktraces? \\ true) do
errors = Enum.map(errors, &to_ash_error/1)
generic_message =

View file

@ -46,14 +46,19 @@ defmodule Ash.Error.Exception do
def exception(opts) do
opts =
Keyword.put_new_lazy(opts, :stacktrace, fn ->
if is_nil(opts[:stacktrace]) do
{:current_stacktrace, stacktrace} = Process.info(self(), :current_stacktrace)
%{
__struct__: Ash.Error.Stacktrace,
stacktrace: Enum.drop(stacktrace, 4)
}
end)
stacktrace =
%{
__struct__: Ash.Error.Stacktrace,
stacktrace: stacktrace
}
Keyword.put(opts, :stacktrace, stacktrace)
else
opts
end
super(opts) |> Map.update(:vars, [], &clean_vars/1)
end

View file

@ -3,19 +3,24 @@ defmodule Ash.Error.Forbidden do
use Ash.Error.Exception
def_ash_error([:errors, stacktraces?: true], class: :forbidden)
def_ash_error([:errors], class: :forbidden)
@type t :: %__MODULE__{}
defimpl Ash.ErrorKind do
def id(_), do: Ash.UUID.generate()
def message(%{errors: errors, stacktraces?: stacktraces?}) when not is_nil(errors) do
Ash.Error.error_messages(errors, nil, stacktraces?)
def message(%{errors: errors, stacktrace: %{stacktrace: stacktrace}} = error)
when is_nil(errors) or errors == [] do
Ash.Error.breadcrumb(error.error_context) <>
"* Forbidden\n" <>
Enum.map_join(stacktrace || "", "\n", fn stack_item ->
" " <> Exception.format_stacktrace_entry(stack_item)
end)
end
def message(%{errors: errors}) do
Ash.Error.error_descriptions(errors)
Ash.Error.error_messages(errors || [], nil)
end
def code(_), do: "forbidden"

View file

@ -2,7 +2,7 @@ defmodule Ash.Error.Framework do
@moduledoc "Used when an unknown/generic framework error occurs"
use Ash.Error.Exception
def_ash_error([:errors, stacktraces?: true], class: :framework)
def_ash_error([:errors], class: :framework)
@type t :: %__MODULE__{}
@ -11,8 +11,8 @@ defmodule Ash.Error.Framework do
def code(_), do: "framework"
def message(%{errors: errors, stacktraces?: stacktraces?}) do
Ash.Error.error_messages(errors, nil, stacktraces?)
def message(%{errors: errors}) do
Ash.Error.error_messages(errors, nil)
end
end
end

View file

@ -2,7 +2,7 @@ defmodule Ash.Error.Invalid do
@moduledoc "The top level invalid error"
use Ash.Error.Exception
def_ash_error([:errors, stacktraces?: true], class: :invalid)
def_ash_error([:errors], class: :invalid)
@type t :: %__MODULE__{}
@ -11,8 +11,8 @@ defmodule Ash.Error.Invalid do
def code(_), do: "invalid"
def message(%{errors: errors, stacktraces?: stacktraces?}) do
Ash.Error.error_messages(errors, nil, stacktraces?)
def message(%{errors: errors}) do
Ash.Error.error_messages(errors, nil)
end
end
end

View file

@ -9,6 +9,10 @@ defmodule Ash.Error.Page.InvalidKeyset do
def code(_), do: "invalid_keyset"
def message(%{value: value, key: nil}) do
"Invalid value provided as a keyset: #{inspect(value)}"
end
def message(%{value: value, key: key}) do
"Invalid value provided as a keyset for #{to_string(key)}: #{inspect(value)}"
end

View file

@ -2,7 +2,7 @@ defmodule Ash.Error.Unknown do
@moduledoc "The top level unknown error container"
use Ash.Error.Exception
def_ash_error([:errors, stacktraces?: true], class: :unknown)
def_ash_error([:errors], class: :unknown)
@type t :: %__MODULE__{}
@ -19,8 +19,8 @@ defmodule Ash.Error.Unknown do
def code(_), do: "unknown"
def message(%{errors: errors, stacktraces?: stacktraces?}) do
Ash.Error.error_messages(errors, nil, stacktraces?)
def message(%{errors: errors}) do
Ash.Error.error_messages(errors, nil)
end
end
end

View file

@ -1431,25 +1431,24 @@ defmodule Ash.Filter do
end
end
def run_other_data_layer_filters(api, resource, %{expression: expression} = filter, data) do
case do_run_other_data_layer_filters(expression, api, resource, data) do
{:filter_requests, requests} -> {:filter_requests, requests}
def run_other_data_layer_filters(api, resource, %{expression: expression} = filter, tenant) do
case do_run_other_data_layer_filters(expression, api, resource, tenant) do
{:ok, new_expression} -> {:ok, %{filter | expression: new_expression}}
{:error, error} -> {:error, error}
end
end
def run_other_data_layer_filters(_, _, filter, _data) when filter in [nil, true, false],
def run_other_data_layer_filters(_, _, filter, _tenant) when filter in [nil, true, false],
do: {:ok, filter}
defp do_run_other_data_layer_filters(
%BooleanExpression{op: op, left: left, right: right},
api,
resource,
data
tenant
) do
left_result = do_run_other_data_layer_filters(left, api, resource, data)
right_result = do_run_other_data_layer_filters(right, api, resource, data)
left_result = do_run_other_data_layer_filters(left, api, resource, tenant)
right_result = do_run_other_data_layer_filters(right, api, resource, tenant)
case {left_result, right_result} do
{{:ok, left}, {:ok, right}} ->
@ -1460,22 +1459,12 @@ defmodule Ash.Filter do
{_, {:error, error}} ->
{:error, error}
{{:filter_requests, left_filter_requests}, {:filter_requests, right_filter_requests}} ->
{:filter_requests, left_filter_requests ++ right_filter_requests}
{{:filter_requests, left_filter_requests}, _} ->
{:filter_requests, left_filter_requests}
{_, {:filter_requests, right_filter_requests}} ->
{:filter_requests, right_filter_requests}
end
end
defp do_run_other_data_layer_filters(%Not{expression: expression}, api, resource, data) do
case do_run_other_data_layer_filters(expression, api, resource, data) do
defp do_run_other_data_layer_filters(%Not{expression: expression}, api, resource, tenant) do
case do_run_other_data_layer_filters(expression, api, resource, tenant) do
{:ok, expr} -> {:ok, Not.new(expr)}
{:filter_requests, requests} -> {:filter_requests, requests}
{:error, error} -> {:error, error}
end
end
@ -1484,15 +1473,28 @@ defmodule Ash.Filter do
%Ash.Query.Exists{path: path, expr: expr, at_path: at_path} = exists,
api,
resource,
{request_path, tenant, data}
tenant
) do
case shortest_path_to_changed_data_layer(resource, at_path ++ path) do
{:ok, shortest_path} ->
request_path = request_path ++ [:other_data_layer_filter_exists, at_path] ++ path
related = Ash.Resource.Info.related(resource, shortest_path)
case get_in(data, request_path ++ [:data]) do
%{data: data} ->
# We should do these asynchronously in parallel
# We used to, but this was changed to happen in place as part
# of an architecture simplification (removal of Ash.Engine)
{relationship, context, _action} =
last_relationship_context_and_action(resource, at_path ++ path)
query =
related
|> Ash.Query.do_filter(expr)
|> Ash.Query.set_context(context)
|> Ash.Query.set_tenant(tenant)
|> Map.put(:api, api)
|> Ash.Query.set_context(%{private: %{internal?: true}})
case Ash.Actions.Read.unpaginated_read(query, relationship.read_action) do
{:ok, data} ->
pkey = Ash.Resource.Info.primary_key(related)
expr =
@ -1525,50 +1527,16 @@ defmodule Ash.Filter do
{:ok, expr}
nil ->
{context, action} = last_relationship_context_and_action(resource, at_path ++ path)
query =
related
|> Ash.Query.do_filter(expr)
|> Ash.Query.set_context(context)
{:filter_requests,
Ash.Actions.Read.as_requests(
request_path,
query.resource,
api,
action,
query: query,
page: false,
tenant: tenant
)
|> Enum.map(fn request ->
# By returning the request and a key, we register a dependency on that key
{request, :data}
end)}
{:error, error} ->
{:error, error}
end
:error ->
case do_run_other_data_layer_filters(
expr,
api,
Ash.Resource.Info.related(resource, at_path ++ path),
data
) do
{:ok, new_nested} ->
{:ok, %{exists | expr: new_nested}}
{:error, error} ->
{:error, error}
{:filter_requests, requests} ->
{:filter_requests, requests}
end
{:ok, exists}
end
end
defp do_run_other_data_layer_filters(%{__predicate__?: _} = predicate, api, resource, data) do
defp do_run_other_data_layer_filters(%{__predicate__?: _} = predicate, api, resource, tenant) do
predicate
|> relationship_paths()
|> filter_paths_that_change_data_layers(resource)
@ -1588,7 +1556,7 @@ defmodule Ash.Filter do
{path, new_predicate} ->
relationship = Ash.Resource.Info.relationship(resource, path)
fetch_related_data(resource, path, new_predicate, api, relationship, data)
fetch_related_data(resource, path, new_predicate, api, relationship, tenant)
end
end
@ -1597,7 +1565,7 @@ defmodule Ash.Filter do
defp last_relationship_context_and_action(resource, [name]) do
relationship = Ash.Resource.Info.relationship(resource, name)
{relationship.context,
{relationship, relationship.context,
relationship.read_action ||
Ash.Resource.Info.primary_action!(relationship.destination, :read)}
end
@ -1607,7 +1575,7 @@ defmodule Ash.Filter do
relationship = Ash.Resource.Info.relationship(second_to_last, List.last(path))
{relationship.context, relationship.read_action}
{relationship, relationship.context, relationship.read_action}
end
defp split_expression_by_relationship_path(%{expression: expression}, path) do
@ -1733,7 +1701,7 @@ defmodule Ash.Filter do
api,
%{type: :many_to_many, join_relationship: join_relationship, through: through} =
relationship,
{_, _, context} = data
tenant
) do
if Ash.DataLayer.data_layer(through) == Ash.DataLayer.data_layer(resource) &&
Ash.DataLayer.data_layer_can?(resource, {:join, through}) do
@ -1749,7 +1717,7 @@ defmodule Ash.Filter do
relationship,
:lists.droplast(path) ++ [join_relationship],
api,
data
tenant
)
else
filter = %__MODULE__{
@ -1760,10 +1728,10 @@ defmodule Ash.Filter do
relationship.destination
|> Ash.Query.new(ShadowApi)
|> Ash.Query.do_filter(filter)
|> Ash.Actions.Read.unpaginated_read(
authorize?: context[:authorize?],
actor: context[:actor]
)
|> Ash.Query.do_filter(relationship.filter, parent_stack: [relationship.source])
|> Ash.Query.sort(relationship.sort, prepend?: true)
|> Ash.Query.set_context(relationship.context)
|> Ash.Actions.Read.unpaginated_read()
|> case do
{:ok, results} ->
relationship.through
@ -1776,7 +1744,7 @@ defmodule Ash.Filter do
Ash.Resource.Info.relationship(resource, join_relationship),
:lists.droplast(path),
api,
data
tenant
)
{:error, error} ->
@ -1791,7 +1759,7 @@ defmodule Ash.Filter do
new_predicate,
api,
relationship,
data
tenant
) do
filter = %__MODULE__{
resource: relationship.destination,
@ -1804,44 +1772,30 @@ defmodule Ash.Filter do
|> Ash.Query.do_filter(relationship.filter, parent_stack: [relationship.source])
|> Ash.Query.sort(relationship.sort, prepend?: true)
|> Ash.Query.set_context(relationship.context)
|> filter_related_in(relationship, :lists.droplast(path), api, data)
|> Ash.Query.set_context(%{private: %{internal?: true}})
|> filter_related_in(relationship, :lists.droplast(path), api, tenant)
end
defp filter_related_in(
query,
relationship,
path,
api,
{request_path, tenant, data}
_api,
tenant
) do
query = Ash.Query.set_tenant(query, tenant)
request_path = request_path ++ [:other_data_layer_filter, path ++ [relationship.name], query]
case get_in(data, request_path ++ [:data]) do
%{data: records} ->
records_to_expression(
records,
relationship,
path
)
case Ash.Actions.Read.unpaginated_read(query) do
{:ok, data} ->
{:ok,
records_to_expression(
data,
relationship,
path
)}
_ ->
action =
Ash.Resource.Info.action(relationship.destination, relationship.read_action) ||
Ash.Resource.Info.primary_action!(relationship.destination, :read)
action = %{action | pagination: false}
{:filter_requests,
Ash.Actions.Read.as_requests(request_path, query.resource, api, action,
query: query,
page: false,
tenant: tenant
)
|> Enum.map(fn request ->
# By returning the request and a key, we register a dependency on that key
{request, :data}
end)}
{:error, error} ->
{:error, error}
end
end

View file

@ -931,43 +931,59 @@ defmodule Ash.Flow.Executor.AshEngine do
[wait_for, halt_if]
)
Ash.Actions.Read.as_requests([name], resource, api, action,
error_path: List.wrap(name),
authorize?: opts[:authorize?],
actor: opts[:actor],
tracer: opts[:tracer],
query_dependencies: request_deps,
get?: get? || action.get?,
not_found_error?: not_found_error?,
tenant: fn context ->
context = Ash.Helpers.deep_merge_maps(context, additional_context)
results = results(dep_paths, context)
Ash.Engine.Request.new(
resource: resource,
path: [name],
name: inspect([name]),
authorize?: false,
data:
Ash.Engine.Request.resolve(request_deps, fn data ->
data = Ash.Helpers.deep_merge_maps(data, additional_context)
results = results(dep_paths, data)
tenant
|> Ash.Flow.Template.set_dependent_values(%{
results: results,
elements: Map.get(context, :_ash_engine_elements)
})
|> Ash.Flow.handle_modifiers()
end,
modify_query: fn query, context ->
context = Ash.Helpers.deep_merge_maps(context, additional_context)
results = results(dep_paths, context)
case halt_if(halt_if, halt_reason, name, results, data, fn ->
{:ok, Ash.Query.new(resource)}
end) do
{:error, error} ->
{:error, error}
halt_if(halt_if, halt_reason, name, results, context, fn -> query end)
end,
query_input: fn context ->
context = Ash.Helpers.deep_merge_maps(context, additional_context)
{:ok, query} ->
tenant =
tenant
|> Ash.Flow.Template.set_dependent_values(%{
results: results,
elements: Map.get(data, :_ash_engine_elements)
})
|> Ash.Flow.handle_modifiers()
results = results(dep_paths, context)
action_input =
action_input
|> Ash.Flow.Template.set_dependent_values(%{
results: results,
elements: Map.get(data, :_ash_engine_elements)
})
|> Ash.Flow.handle_modifiers()
|> Kernel.||(%{})
action_input
|> Ash.Flow.Template.set_dependent_values(%{
results: results,
elements: Map.get(context, :_ash_engine_elements)
})
|> Ash.Flow.handle_modifiers()
end
query
|> Ash.Query.for_read(action.name, action_input,
actor: data[:actor],
tenant: tenant,
authorize?: data[:authorize?],
tracer: data[:tracer]
)
|> Ash.Query.set_context(%{
private: %{get?: get?, not_found_error?: not_found_error?}
})
|> then(fn query ->
if get? do
api.read_one(query, not_found_error?: not_found_error?)
else
api.read(query)
end
end)
end
end)
)
end
)
@ -1917,7 +1933,7 @@ defmodule Ash.Flow.Executor.AshEngine do
end
defp result_path(%Ash.Flow.Step.Read{name: name}) do
[name, :data, :data]
[name, :data]
end
defp result_path(%Ash.Flow.Step.Create{name: name}) do
@ -1961,7 +1977,7 @@ defmodule Ash.Flow.Executor.AshEngine do
end
defp completion_path(%Ash.Flow.Step.Read{name: name}) do
[name, :data, :completion]
[name, :completion]
end
defp completion_path(%Ash.Flow.Step.Create{name: name}) do
@ -2013,7 +2029,7 @@ defmodule Ash.Flow.Executor.AshEngine do
end
defp data_path(%Ash.Flow.Step.Read{name: name}) do
[name, :data]
[name]
end
defp data_path(%Ash.Flow.Step.Create{name: name}) do

View file

@ -11,7 +11,7 @@ defmodule Ash.Page.Keyset do
def new(results, count, _sort, original_query, more?, opts) do
%__MODULE__{
results: Enum.take(results, (original_query.limit || 1) - 1),
results: results,
count: count,
before: opts[:page][:before],
after: opts[:page][:after],
@ -33,7 +33,7 @@ defmodule Ash.Page.Keyset do
def filter(resource, values, sort, after_or_before) when after_or_before in [:after, :before] do
with {:ok, decoded} <- decode_values(values, after_or_before),
{:ok, zipped} <- zip_fields(sort, decoded) do
{:ok, zipped} <- zip_fields(sort, decoded, values) do
{:ok, filters(zipped, resource, after_or_before)}
end
end
@ -144,14 +144,15 @@ defmodule Ash.Page.Keyset do
defp operator(:before, :desc_nils_first), do: {:gt, false}
defp operator(:before, :desc_nils_last), do: {:gt, true}
defp zip_fields(pkey, values, acc \\ [])
defp zip_fields([], [], acc), do: {:ok, Enum.reverse(acc)}
defp zip_fields(pkey, values, full_value, acc \\ [])
defp zip_fields([], [], _full_value, acc), do: {:ok, Enum.reverse(acc)}
defp zip_fields([{pkey, direction} | rest_pkey], [value | rest_values], acc) do
zip_fields(rest_pkey, rest_values, [{pkey, direction, value} | acc])
defp zip_fields([{pkey, direction} | rest_pkey], [value | rest_values], full_value, acc) do
zip_fields(rest_pkey, rest_values, full_value, [{pkey, direction, value} | acc])
end
defp zip_fields(_, _, _), do: {:error, "Invalid keyset"}
defp zip_fields(_, _, full_value, _),
do: {:error, Ash.Error.Page.InvalidKeyset.exception(value: full_value)}
defp keyset(record, fields) do
record

View file

@ -11,7 +11,7 @@ defmodule Ash.Page.Offset do
def new(results, count, original_query, more?, opts) do
%__MODULE__{
results: Enum.take(results, (original_query.limit || 1) - 1),
results: results,
limit: opts[:page][:limit],
count: count,
offset: opts[:page][:offset] || 0,

View file

@ -570,6 +570,8 @@ defmodule Ash.Policy.Authorizer do
field
end
type = get_type(authorizer.resource, field)
field =
case {field_name, field} do
{nil, %Ash.Query.Calculation{} = calculation} ->
@ -624,7 +626,8 @@ defmodule Ash.Policy.Authorizer do
^field
else
nil
end
end,
type
)
{{expr, data}, acc}
@ -634,6 +637,24 @@ defmodule Ash.Policy.Authorizer do
end
end
defp get_type(_resource, %{type: type, constraints: constraints}), do: {type, constraints}
defp get_type(resource, field) do
case Ash.Resource.Info.field(resource, field) do
%Ash.Resource.Aggregate{kind: kind, field: field, relationship_path: relationship_path} ->
if field do
related = Ash.Resource.Info.related(resource, relationship_path)
{field_type, constraints} = get_type(related, field)
Ash.Query.Aggregate.kind_to_type(kind, field_type, constraints)
else
Ash.Query.Aggregate.kind_to_type(kind, nil, nil)
end
%{type: type, constraints: constraints} ->
{type, constraints}
end
end
defp replace_refs(expression, acc) do
case expression do
%Ash.Query.BooleanExpression{op: op, left: left, right: right} ->

View file

@ -12,7 +12,6 @@ defmodule Ash.Query.Calculation do
context: %{},
required_loads: [],
select: [],
sequence: 0,
filterable?: true
]

View file

@ -44,6 +44,7 @@ defmodule Ash.Query do
load_through: %{},
action_failed?: false,
after_action: [],
authorize_results: [],
aggregates: %{},
arguments: %{},
before_action: [],
@ -77,6 +78,11 @@ defmodule Ash.Query do
| {:ok, [Ash.Resource.record()], [Ash.Notifier.Notification.t()]}
| {:error, any})
],
authorize_results: [
(t, [Ash.Resource.record()] ->
{:ok, [Ash.Resource.record()]}
| {:error, any})
],
aggregates: %{optional(atom) => Ash.Filter.t()},
arguments: %{optional(atom) => any},
before_action: [(t -> t)],
@ -606,6 +612,9 @@ defmodule Ash.Query do
Enum.any?(action.arguments, &(&1.private? == false && to_string(&1.name) == name))
end
defp has_key?(map, key) when is_map(map), do: Map.has_key?(map, key)
defp has_key?(keyword, key), do: Keyword.has_key?(keyword, key)
defp run_preparations(query, action, actor, authorize?, tracer, metadata) do
query.resource
|> Ash.Resource.Info.preparations()
@ -685,6 +694,18 @@ defmodule Ash.Query do
%{query | before_action: [func | query.before_action]}
end
@spec authorize_results(
t(),
(t(), [Ash.Resource.record()] ->
{:ok, [Ash.Resource.record()]}
| {:ok, [Ash.Resource.record()], list(Ash.Notifier.Notification.t())}
| {:error, term})
) :: t()
def authorize_results(query, func) do
query = to_query(query)
%{query | authorize_results: [func | query.authorize_results]}
end
@spec after_action(
t(),
(t(), [Ash.Resource.record()] ->
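The new `authorize_results/2` hook registered above runs after records have been fetched. A minimal sketch of wiring it up from a preparation (the `MyApp.OnlyOwnRecords` module and `owner_id` field are illustrative assumptions):

```elixir
defmodule MyApp.OnlyOwnRecords do
  @moduledoc "Illustrative preparation using Ash.Query.authorize_results/2."
  use Ash.Resource.Preparation

  def prepare(query, _opts, %{actor: actor}) do
    Ash.Query.authorize_results(query, fn _query, records ->
      # Called with the fetched records; return {:ok, records} to keep
      # (a subset of) them, or {:error, error} to fail the read.
      {:ok, Enum.filter(records, &(&1.owner_id == actor.id))}
    end)
  end
end
```

A preparation like this would be attached with `prepare MyApp.OnlyOwnRecords` in an action or in the resource's `preparations` block.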
@ -1022,7 +1043,7 @@ defmodule Ash.Query do
is_nil(query.select) || item in query.select
end
selecting? || Keyword.has_key?(query.load || [], item) ||
selecting? || has_key?(query.load || [], item) ||
Enum.any?(query.calculations, fn
{_, %{module: Ash.Resource.Calculation.LoadRelationship, opts: opts}} ->
opts[:relationship] == item
@ -1073,13 +1094,58 @@ defmodule Ash.Query do
Uses `Ash.Type.load/5` to request that the type load nested data.
"""
def load_through(query, type, name, load) when type in [:attribute, :calculation] do
Map.update!(query, :load_through, fn load_through ->
load_through
|> Map.put_new(type, %{})
|> Map.update!(type, fn loads ->
Map.update(loads, name, load, &(List.wrap(&1) ++ load))
end)
end)
{attr_type, constraints} =
if type == :calculation do
calc = Map.get(query.calculations, name)
{calc.type, calc.constraints}
else
attr =
Ash.Resource.Info.attribute(query.resource, name)
{attr.type, attr.constraints}
end
case Ash.Type.merge_load(
attr_type,
query.load_through[type][name] || [],
load,
constraints,
nil
) do
{:ok, new_value} ->
Map.update!(query, :load_through, fn types ->
types
|> Map.put_new(type, %{})
|> Map.update!(type, fn load_through ->
Map.put(load_through, name, new_value)
end)
end)
{:error, error} ->
Ash.Query.add_error(query, error)
end
end
@doc """
Merges two queries' load statements, for the purpose of handling calculation requirements.
This should only be used if you are writing a custom type that is loadable.
See the callback documentation for `c:Ash.Type.merge_load/4` for more.
"""
def merge_query_load(left, right, context) do
if context do
Ash.Actions.Read.Calculations.merge_query_load(
left,
right,
context.api,
context[:calc_path],
context[:calc_name],
context[:calc_load],
context[:relationship_path]
)
else
load(left, right)
end
end
@doc """
@ -1111,10 +1177,17 @@ defmodule Ash.Query do
load(query, List.wrap(fields))
end
def load(query, %Ash.Query{} = new) do
merge_load(query, new)
end
def load(query, fields) do
query = to_query(query)
Enum.reduce(fields, query, fn
%Ash.Query{} = new, query ->
merge_load(query, new)
[], query ->
query
@ -2589,7 +2662,7 @@ defmodule Ash.Query do
end
defp do_unset(query, :load, new) do
query = unset(query, [:calculations, :aggregates])
query = unset(query, [:calculations, :aggregates, :load_through])
struct(query, [{:load, Map.get(new, :load)}])
end
@ -2625,6 +2698,12 @@ defmodule Ash.Query do
Ash.DataLayer.sort(query, ash_query.sort, resource),
{:ok, query} <-
Ash.DataLayer.distinct_sort(query, ash_query.distinct_sort, resource),
{:ok, query} <-
Ash.DataLayer.add_aggregates(
query,
Map.values(ash_query.aggregates),
ash_query.resource
),
{:ok, query} <- maybe_filter(query, ash_query, opts),
{:ok, query} <- Ash.DataLayer.distinct(query, ash_query.distinct, resource),
{:ok, query} <-
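The `merge_query_load/3` helper above is intended for custom loadable types. A minimal sketch of a `merge_load/4` callback delegating to it, modeled on the `Ash.Type.Struct` implementation elsewhere in this commit (`MyApp.Types.Wrapped` and its `:instance_of` constraint are assumptions; the other required `Ash.Type` callbacks are omitted):

```elixir
defmodule MyApp.Types.Wrapped do
  use Ash.Type

  # storage_type/0, cast_input/2, cast_stored/2 and dump_to_native/2 would
  # also be needed for a working type; they are omitted from this sketch.

  @impl Ash.Type
  def merge_load(left, right, constraints, context) do
    resource = constraints[:instance_of]
    right = Ash.Query.load(resource, right)

    resource
    |> Ash.Query.new()
    |> Ash.Query.load(left)
    |> case do
      %{valid?: true} = query ->
        # Reconciles calculation requirements from both load statements
        # instead of letting one clobber the other.
        {:ok, Ash.Query.merge_query_load(query, right, context)}

      query ->
        {:error, Ash.Error.to_ash_error(query.errors)}
    end
  end
end
```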

View file

@ -42,12 +42,16 @@ defmodule Ash.Query.Type do
end
def try_cast(value, type, constraints) do
case Ash.Type.cast_input(type, value, constraints) do
{:ok, value} ->
{:ok, value}
if Ash.Filter.TemplateHelpers.expr?(value) do
:error
else
case Ash.Type.cast_input(type, value, constraints) do
{:ok, value} ->
{:ok, value}
_ ->
:error
_ ->
:error
end
end
end
end

View file

@ -102,6 +102,7 @@ defmodule Ash.Resource do
Ash.EmbeddableType.define_embeddable_type(embed_nil_values?: embed_nil_values?)
else
# TODO: remove this in 3.0
use Ash.Type
@impl true

View file

@ -14,6 +14,7 @@ defmodule Ash.Resource.Actions.Read do
preparations: [],
primary?: nil,
touches_resources: [],
timeout: nil,
transaction?: false,
type: :read
@ -30,6 +31,7 @@ defmodule Ash.Resource.Actions.Read do
pagination: any,
primary?: boolean,
touches_resources: [atom],
timeout: pos_integer() | nil,
transaction?: boolean,
type: :read
}
@ -72,6 +74,12 @@ defmodule Ash.Resource.Actions.Read do
doc: """
A helper to automatically generate a "get by X" action. Sets `get?` to true, adds args for each of the specified fields, and adds a filter for each of the arguments.
"""
],
timeout: [
type: :pos_integer,
doc: """
The maximum amount of time, in milliseconds, that the action is allowed to run for. Ignored if the data layer doesn't support transactions *and* async is disabled.
"""
]
],
@global_opts,

View file

@ -0,0 +1,16 @@
defmodule Ash.Resource.Calculation.FetchAgg do
@moduledoc false
use Ash.Calculation
def calculate(records, opts, _context) do
if load = opts[:load] do
Enum.map(records, fn record -> Map.get(record, load) end)
else
name = opts[:name]
Enum.map(records, fn record ->
record.aggregates[name]
end)
end
end
end

View file

@ -0,0 +1,16 @@
defmodule Ash.Resource.Calculation.FetchCalc do
@moduledoc false
use Ash.Calculation
def calculate(records, opts, _context) do
if load = opts[:load] do
Enum.map(records, fn record -> Map.get(record, load) end)
else
name = opts[:name]
Enum.map(records, fn record ->
record.calculations[name]
end)
end
end
end

View file

@ -0,0 +1,12 @@
defmodule Ash.Resource.Calculation.Literal do
@moduledoc false
use Ash.Calculation
def calculate(records, opts, _context) do
Enum.map(records, fn _ -> opts[:value] end)
end
def expression(_records, opts, _context) do
opts[:value]
end
end

View file

@ -52,3 +52,40 @@ defmodule Ash.Resource.Calculation.LoadRelationship do
end
end
end
# defmodule Ash.Resource.Calculation.LoadRelationship do
# @moduledoc """
# Loads a relationship as a calculation.
# Can be used to load the same relationship with a different query.
# """
# use Ash.Calculation
# def load(query, opts, _) do
# relationship = Ash.Resource.Info.relationship(query.resource, opts[:relationship])
# query =
# opts[:query] ||
# query.resource
# |> Ash.Resource.Info.relationship(opts[:relationship])
# |> Map.get(:destination)
# |> Ash.Query.new()
# query = Ash.Query.to_query(query)
# [relationship.source_attribute, {relationship.name, query}]
# end
# # We should be doing this in the load callback, not the `calculate/3` callback
# # however, we don't have much of a choice currently. We need to rewrite data loading
# # from the ground up, and a byproduct of that will be making data loading more efficient
# # across the board.
# def calculate([], _, _), do: {:ok, []}
# def calculate(results, opts, _context) do
# {:ok,
# Enum.map(results, fn result ->
# Map.get(result, opts[:relationship])
# end)}
# end
# end

View file

@ -0,0 +1,8 @@
defmodule Ash.Resource.Calculation.RuntimeExpression do
@moduledoc false
use Ash.Calculation
def calculate(records, opts, context) do
Ash.Resource.Calculation.Expression.calculate(records, opts, context)
end
end

View file

@ -30,14 +30,14 @@ defmodule Ash.Sort do
For example:
```elixir
Ash.Query.sort(Ash.Sort.expr_sort(author.full_name))
Ash.Query.sort(Ash.Sort.expr_sort(author.full_name, :string))
Ash.Query.sort([{Ash.Sort.expr_sort(author.full_name), :desc_nils_first}])
Ash.Query.sort([{Ash.Sort.expr_sort(author.full_name, :string), :desc_nils_first}])
```
"""
@spec expr_sort(Ash.Expr.t(), Ash.Type.t() | nil) :: Ash.Expr.t()
defmacro expr_sort(expression, type \\ nil) do
quote do
quote generated: true do
require Ash.Expr
type = unquote(type)
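A slightly fuller sketch of the updated examples, with the query pipeline and api call included (the `Post` resource and `MyApp.Api` are assumptions):

```elixir
require Ash.Sort

Post
|> Ash.Query.sort([{Ash.Sort.expr_sort(author.full_name, :string), :desc_nils_first}])
|> MyApp.Api.read!()
```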

View file

@ -108,6 +108,27 @@ defmodule Ash.Type.NewType do
end
end
if function_exported?(subtype_of, :merge_load, 4) do
@impl Ash.Type
def merge_load(left, right, constraints, context) do
unquote(subtype_of).merge_load(left, right, constraints, context)
end
end
if function_exported?(subtype_of, :get_rewrites, 4) do
@impl Ash.Type
def get_rewrites(merged_load, calculation, path, constraints) do
unquote(subtype_of).get_rewrites(merged_load, calculation, path, constraints)
end
end
if function_exported?(subtype_of, :rewrite, 3) do
@impl Ash.Type
def rewrite(value, rewrites, constraints) do
unquote(subtype_of).rewrite(value, rewrites, constraints)
end
end
@impl Ash.Type
def cast_input(value, constraints) do
with {:ok, value} <- unquote(subtype_of).cast_input(value, constraints) do

View file

@ -46,6 +46,33 @@ defmodule Ash.Type.Struct do
api.load(record, load, opts)
end
@impl Ash.Type
def merge_load(left, right, constraints, context) do
instance_of = constraints[:instance_of]
right = Ash.Query.load(instance_of, right)
instance_of
|> Ash.Query.new()
|> Ash.Query.load(left)
|> case do
%{valid?: true} = query ->
{:ok, Ash.Query.merge_query_load(query, right, context)}
query ->
{:error, Ash.Error.to_ash_error(query.errors)}
end
end
@impl Ash.Type
def get_rewrites(merged_load, calculation, path, _) do
Ash.Actions.Read.Calculations.get_all_rewrites(merged_load, calculation, path)
end
@impl Ash.Type
def rewrite(value, rewrites, _constraints) do
Ash.Actions.Read.Calculations.rewrite(rewrites, value)
end
@impl Ash.Type
def can_load?(constraints) do
constraints[:instance_of] && Ash.Resource.Info.resource?(constraints[:instance_of])

View file

@ -173,6 +173,14 @@ defmodule Ash.Type do
authorize?: boolean | nil
}
@type merge_load_context :: %{
api: Ash.Api.t(),
calc_name: term(),
calc_load: term(),
calc_path: list(atom),
relationship_path: list(atom)
}
@callback storage_type() :: Ecto.Type.t()
@callback storage_type(constraints) :: Ecto.Type.t()
@callback include_source(constraints, Ash.Changeset.t()) :: constraints
@ -254,6 +262,27 @@ defmodule Ash.Type do
context :: load_context()
) ::
{:ok, list(term)} | {:error, Ash.Error.t()}
@callback merge_load(
left :: term,
right :: term,
constraints :: Keyword.t(),
context :: merge_load_context() | nil
) ::
{:ok, term} | {:error, error}
@type rewrite_data ::
{type :: :calc | :agg, rewriting_name :: atom, rewriting_load :: atom}
| {:rel, rewriting_name :: atom}
@type rewrite :: {{list(atom), rewrite_data, atom, atom}, source :: term}
@callback get_rewrites(
merged_load :: term,
calculation :: Ash.Query.Calculation.t(),
path :: list(atom),
constraints :: Keyword.t()
) :: [rewrite]
@callback rewrite(value :: term, [rewrite], constraints :: Keyword.t()) :: value :: term
@callback prepare_change_array?() :: boolean()
@callback handle_change_array?() :: boolean()
@ -272,6 +301,9 @@ defmodule Ash.Type do
dump_to_embedded_array: 2,
include_source: 2,
load: 4,
merge_load: 4,
get_rewrites: 4,
rewrite: 3,
operator_overloads: 0,
evaluate_operator: 1
]
@ -873,6 +905,32 @@ defmodule Ash.Type do
type.include_source(constraints, changeset_or_query)
end
@spec merge_load(
type :: Ash.Type.t(),
left :: term(),
right :: term(),
constraints :: Keyword.t(),
context :: merge_load_context() | nil
) ::
{:ok, list(term)} | :error | {:error, Ash.Error.t()}
def merge_load({:array, type}, left, right, constraints, context) do
merge_load(type, left, right, constraints[:items] || [], context)
end
def merge_load(
type,
left,
right,
constraints,
context
) do
if function_exported?(type, :merge_load, 4) do
type.merge_load(left, right, constraints, context)
else
:error
end
end
@spec load(
type :: Ash.Type.t(),
values :: list(term),
@ -907,6 +965,30 @@ defmodule Ash.Type do
end)
end
def get_rewrites({:array, type}, merged_load, calculation, path, constraints) do
get_rewrites(type, merged_load, calculation, path, constraints[:items] || [])
end
def get_rewrites(type, merged_load, calculation, path, constraints) do
type = get_type(type)
type.get_rewrites(merged_load, calculation, path, constraints)
end
def rewrite(_type, nil, _rewrites, _constraints), do: nil
def rewrite(_type, [], _rewrites, _constraints), do: []
def rewrite({:array, type}, value, rewrites, constraints) when is_list(value) do
item_constraints = constraints[:items] || []
Enum.map(value, fn value ->
rewrite(type, value, rewrites, item_constraints)
end)
end
def rewrite(type, item, rewrites, constraints) when not is_list(item) do
type.rewrite(item, rewrites, constraints)
end
@doc false
def splicing_nil_values(values, callback) when is_list(values) do
values

View file

@ -213,6 +213,90 @@ defmodule Ash.Type.Union do
end
end
@impl Ash.Type
def merge_load(left, right, constraints, context) do
constraints[:types]
|> Enum.reduce_while({:ok, []}, fn {name, config}, {:ok, acc} ->
merged =
[left[name], right[name], left[:*], right[:*]]
|> Enum.reject(&is_nil/1)
|> case do
[] ->
{:ok, acc}
load_statements ->
Enum.reduce_while(load_statements, {:ok, []}, fn load_statement, {:ok, merged} ->
if merged in [nil, [], %{}] do
{:cont, {:ok, load_statement}}
else
case Ash.Type.merge_load(
config[:type],
merged,
load_statement,
config[:constraints],
context
) do
{:ok, merged} -> {:cont, {:ok, merged}}
{:error, error} -> {:halt, {:error, error}}
end
end
end)
end
case merged do
{:ok, merged} ->
{:cont, {:ok, Keyword.put(acc, name, merged)}}
{:error, error} ->
{:halt, {:error, error}}
end
end)
end
@impl Ash.Type
def get_rewrites(merged_load, calculation, path, constraints) do
merged_load
|> Enum.flat_map(fn {key, type_load} ->
constraints[:types][key][:type]
|> Ash.Type.get_rewrites(
type_load,
calculation,
path,
constraints[:types][key][:constraints]
)
|> Enum.map(fn {{rewrite_path, data, name, load}, source} ->
{{[key | rewrite_path], data, name, load}, source}
end)
end)
end
@impl Ash.Type
def rewrite(value, rewrites, constraints) do
type_rewrites =
Enum.flat_map(rewrites, fn
{{[first | path_rest], data, name, load}, source} ->
if first == value.type do
[{{path_rest, data, name, load}, source}]
else
[]
end
_ ->
[]
end)
Map.update!(
value,
:value,
&Ash.Type.rewrite(
constraints[:types][value.type][:type],
&1,
type_rewrites,
constraints[:types][value.type][:constraints]
)
)
end
@impl true
def cast_input(nil, _), do: {:ok, nil}

View file

@ -7,7 +7,7 @@ defmodule Ash.MixProject do
designed to be used by multiple front ends.
"""
@version "2.18.2"
@version "2.18.1"
def project do
[
@ -304,7 +304,10 @@ defmodule Ash.MixProject do
files: ~w(lib .formatter.exs mix.exs README* LICENSE*
CHANGELOG* documentation),
links: %{
GitHub: "https://github.com/ash-project/ash"
GitHub: "https://github.com/ash-project/ash",
Discord: "https://discord.gg/HTHRaaVPUc",
Website: "https://ash-hq.org",
Forum: "https://elixirforum.com/c/elixir-framework-forums/ash-framework-forum"
}
]
end

View file

@ -831,7 +831,7 @@ defmodule Ash.Test.Actions.CreateTest do
on_lookup: :relate,
on_match: :ignore
)
|> Ash.create!(stacktraces?: true)
|> Ash.create!()
end
end

View file

@ -229,7 +229,9 @@ defmodule Ash.Test.Actions.LoadTest do
end
relationships do
belongs_to(:author, Author)
belongs_to :author, Author do
attribute_writable? true
end
has_many :posts_in_same_category, __MODULE__ do
manual(PostsInSameCategory)
@ -463,7 +465,7 @@ defmodule Ash.Test.Actions.LoadTest do
} = Api.load!(author, :campaign)
end
test "it allows loading related data" do
test "it allows loading data" do
author =
Author
|> new(%{name: "zerg"})
@ -471,14 +473,36 @@ defmodule Ash.Test.Actions.LoadTest do
post1 =
Post
|> new(%{title: "post1"})
|> manage_relationship(:author, author, type: :append_and_remove)
|> new(%{title: "post1", author_id: author.id})
|> Api.create!()
post2 =
Post
|> new(%{title: "post2"})
|> manage_relationship(:author, author, type: :append_and_remove)
|> new(%{title: "post2", author_id: author.id})
|> Api.create!()
assert [fetched_post1, fetched_post2] =
author
|> Api.load!(:posts)
|> Map.get(:posts)
assert Enum.sort([post1.id, post2.id]) == Enum.sort([fetched_post1.id, fetched_post2.id])
end
test "it allows loading nested related data" do
author =
Author
|> new(%{name: "zerg"})
|> Api.create!()
post1 =
Post
|> new(%{title: "post1", author_id: author.id})
|> Api.create!()
post2 =
Post
|> new(%{title: "post2", author_id: author.id})
|> Api.create!()
[author] =
@ -808,11 +832,10 @@ defmodule Ash.Test.Actions.LoadTest do
|> new(%{name: "zerg"})
|> Api.create!()
post1 =
Post
|> new(%{title: "post1"})
|> manage_relationship(:author, author, type: :append_and_remove)
|> Api.create!()
Post
|> new(%{title: "post1"})
|> manage_relationship(:author, author, type: :append_and_remove)
|> Api.create!()
post2 =
Post
@ -825,7 +848,6 @@ defmodule Ash.Test.Actions.LoadTest do
|> Ash.Query.load(:latest_post)
|> Api.read!()
refute author.latest_post.id == post1.id
assert author.latest_post.id == post2.id
end
end
@ -917,25 +939,6 @@ defmodule Ash.Test.Actions.LoadTest do
Author
|> Ash.Query.load(bio_union_calc: {%{}, [*: [:full_name, :forbidden_name]]})
|> Api.read!(actor: %{name: "zerg"}, authorize?: true)
# assert [
# %{
# bio_union_calc: %Ash.Union{
# value: %{full_name: "donald duck", forbidden_name: %Ash.ForbiddenField{}}
# }
# },
# %{
# bio_union_calc: %Ash.Union{
# value: %{full_name: "donald duck", forbidden_name: %Ash.ForbiddenField{}}
# }
# }
# ] =
# Author
# |> Ash.Query.load(
# bio_union_calc:
# {%{}, [bio: :full_name, other_kind_of_bio: [:full_name, :forbidden_name]]}
# )
# |> Api.read!(actor: %{name: "zerg"})
end
end
end

View file

@ -283,6 +283,14 @@ defmodule Ash.Test.CalculationTest do
load([:user_is_active])
end
calculate :active_elixir, :boolean do
calculation fn record, _ ->
record.is_active && record.user_is_active
end
load [:is_active, :user_is_active]
end
calculate :has_user, :boolean, RoleHasUser
calculate :user_is_active_with_calc, :boolean, expr(user.is_active || false)
@ -303,6 +311,20 @@ defmodule Ash.Test.CalculationTest do
end
end
defmodule Bio do
use Ash.Resource, data_layer: :embedded
attributes do
attribute :greeting, :string
end
calculations do
calculate :say_hello, :string, expr(greeting <> " " <> ^arg(:to)) do
argument :to, :string, allow_nil?: false
end
end
end
defmodule User do
@moduledoc false
use Ash.Resource, data_layer: Ash.DataLayer.Ets
@ -329,9 +351,26 @@ defmodule Ash.Test.CalculationTest do
attribute(:prefix, :string)
attribute(:special, :boolean)
attribute(:is_active, :boolean)
attribute(:bio, Bio)
end
calculations do
calculate :say_hello_to_fred, :string do
calculation fn record, _ ->
record.bio.say_hello
end
load bio: [say_hello: %{to: "Fred"}]
end
calculate :say_hello_to_george, :string do
calculation fn record, _ ->
record.bio.say_hello
end
load bio: [say_hello: %{to: "George"}]
end
calculate(:active, :boolean, expr(is_active))
calculate :full_name, :string, {Concat, keys: [:first_name, :last_name]} do
@ -539,7 +578,7 @@ defmodule Ash.Test.CalculationTest do
setup do
user1 =
User
|> Ash.Changeset.new(%{first_name: "zach", last_name: "daniel"})
|> Ash.Changeset.new(%{first_name: "zach", last_name: "daniel", bio: %{greeting: "Yo! "}})
|> Api.create!()
admin_role =
@ -1034,4 +1073,10 @@ defmodule Ash.Test.CalculationTest do
User
|> Ash.Query.load(full_name: %{separator: %{foo: :bar}})
end
test "calculation dependencies with conflicting load throughs still receive the appropriate values",
%{user1: user1} do
user1
|> Api.load!([:say_hello_to_fred, :say_hello_to_george])
end
end

View file

@ -11,8 +11,8 @@ defmodule Ash.Test.Authorizer do
Agent.start_link(
fn ->
%{
strict_check_result: maybe_forbidden(opts[:strict_check]),
check_result: maybe_forbidden(opts[:check]),
strict_check_result: maybe_forbidden(opts[:strict_check] || :authorized),
check_result: maybe_forbidden(opts[:check] || :authorized),
strict_check_context: opts[:strict_check_context]
}
end,

View file

@ -25,6 +25,8 @@ defmodule Ash.Test.Flow.Flows.BranchingTransactionMapping do
steps do
transaction :create_or_update_users, User do
read :get_org, Org, :by_name do
get? true
input %{
name: arg(:org_name)
}

View file

@ -14,6 +14,8 @@ defmodule Ash.Test.Flow.Flows.GetOrgAndUsers do
steps do
read :get_org, Ash.Test.Flow.Org, :by_name do
get? true
input(%{
name: arg(:org_name)
})

View file

@ -14,6 +14,8 @@ defmodule Ash.Test.Flow.Flows.GetOrgByName do
steps do
read :get_org, Ash.Test.Flow.Org, :by_name do
get? true
input(%{
name: arg(:org_name)
})

View file

@ -22,6 +22,8 @@ defmodule Ash.Test.Flow.Flows.SignUpAndDeleteUser do
steps do
read :get_org, Ash.Test.Flow.Org, :by_name do
get? true
input %{
name: arg(:org_name)
}

View file

@ -22,6 +22,8 @@ defmodule Ash.Test.Flow.Flows.SignUpUser do
steps do
read :get_org, Ash.Test.Flow.Org, :by_name do
get? true
input %{
name: arg(:org_name)
}

View file

@ -18,6 +18,8 @@ defmodule Ash.Test.Flow.Flows.UnapproveAllUsers do
steps do
read :get_org, Ash.Test.Flow.Org, :by_name do
get? true
input(%{
name: arg(:org_name)
})

View file

@ -214,61 +214,14 @@ defmodule Ash.Test.TracerTest.AsyncLoadTest do
{[:ash, :api, :read, :start], %{system_time: _},
%{resource_short_name: :post}, []}}
assert_receive {:telemetry,
{[:ash, :request_step, :start], %{system_time: _},
%{name: "fetch Ash.Test.TracerTest.AsyncLoadTest.Post.read"}, []}}
assert_receive {:telemetry,
{[:ash, :request_step, :stop], %{duration: _},
%{name: "fetch Ash.Test.TracerTest.AsyncLoadTest.Post.read"}, []}}
assert_receive {:telemetry,
{[:ash, :request_step, :start], %{system_time: _},
%{name: "process Ash.Test.TracerTest.AsyncLoadTest.Post.read"}, []}}
assert_receive {:telemetry,
{[:ash, :request_step, :stop], %{duration: _, system_time: _},
%{name: "process Ash.Test.TracerTest.AsyncLoadTest.Post.read"}, []}}
assert_receive {:telemetry,
{[:ash, :request_step, :start], %{system_time: _}, %{name: "load author"}, []}}
assert_receive {:telemetry,
{[:ash, :api, :read, :start], %{system_time: _},
%{resource_short_name: :author}, []}}
assert_receive {:telemetry,
{[:ash, :request_step, :start], %{system_time: _},
%{name: "fetch Ash.Test.TracerTest.AsyncLoadTest.Author.read"}, []}}
assert_receive {:telemetry,
{[:ash, :request_step, :stop], %{duration: _, system_time: _},
%{name: "fetch Ash.Test.TracerTest.AsyncLoadTest.Author.read"}, []}}
assert_receive {:telemetry,
{[:ash, :request_step, :start], %{system_time: _},
%{name: "process Ash.Test.TracerTest.AsyncLoadTest.Author.read"}, []}}
assert_receive {:telemetry,
{[:ash, :request_step, :stop], %{duration: _, system_time: _},
%{name: "process Ash.Test.TracerTest.AsyncLoadTest.Author.read"}, []}}
assert_receive {:telemetry,
{[:ash, :api, :read, :stop], %{duration: _, system_time: _},
%{resource_short_name: :author}, []}}
assert_receive {:telemetry,
{[:ash, :request_step, :stop], %{duration: _, system_time: _},
%{name: "load author"}, []}}
assert_receive {:telemetry,
{[:ash, :request_step, :start], %{system_time: _},
%{name: "process Ash.Test.TracerTest.AsyncLoadTest.Post.read"}, []}}
assert_receive {:telemetry,
{[:ash, :request_step, :stop], %{duration: _, system_time: _},
%{name: "process Ash.Test.TracerTest.AsyncLoadTest.Post.read"}, []}}
assert_receive {:telemetry,
{[:ash, :api, :read, :stop], %{duration: _, system_time: _},
%{resource_short_name: :post}, []}}
@ -287,13 +240,6 @@ defmodule Ash.Test.TracerTest.AsyncLoadTest do
}
] = Ash.Tracer.Simple.gather_spans()
assert load =
Enum.find(spans, fn span ->
span.name == "load author"
end)
assert Enum.find(load.spans, fn span ->
span.name == "api:author.read"
end)
assert Enum.any?(spans, &(&1.name == "api:author.read"))
end
end