feat: add named aggregates

This commit is contained in:
Zach Daniel 2020-08-08 13:55:36 -04:00
parent 094b39d27f
commit be74d1e7eb
No known key found for this signature in database
GPG key ID: C377365383138D4B
29 changed files with 526 additions and 241 deletions

View file

@ -7,6 +7,8 @@ locals_without_parens = [
belongs_to: 2,
belongs_to: 3,
constraints: 1,
count: 2,
count: 3,
create: 1,
create: 2,
create_timestamp: 1,
@ -20,6 +22,7 @@ locals_without_parens = [
destroy: 2,
expensive?: 1,
field_type: 1,
filter: 1,
generated?: 1,
has_many: 2,
has_many: 3,
@ -27,6 +30,7 @@ locals_without_parens = [
has_one: 3,
join_attributes: 1,
join_relationship: 1,
kind: 1,
many_to_many: 2,
many_to_many: 3,
on: 1,

View file

@ -21,25 +21,6 @@ defmodule MyApp.Api do
end
```
Then, add `MyApp.Api` to your `application.ex`'s start function, which should look something like this:
```elixir
def start(_type, _args) do
children = [
# Start the Ecto repository
MyApp.Repo,
# Start the Telemetry supervisor
MyApp.Telemetry,
# Start the PubSub system
{Phoenix.PubSub, name: MyApp.PubSub},
# Start the Endpoint (http/https)
MyApp.Endpoint,
MyApp.Api # <- Add your API here
]
...
end
```
## Create a resource
A resource is the primary entity in Ash. Your Api module ties your resources together and gives them an interface, but the vast majority if your configuration will live in a resource. In your typical setup, you might have a resource per database table. For those already familiar with ecto, a resource and an ecto schema are very similar. In fact, all resources define an ecto schema under the hood. This can be leveraged when you need to do things that are not yet implemented or fall outside of the scope of Ash. The current reccomendation for where to put your resources is in `lib/resources/<resource_name>.ex`. Here are a few examples:
@ -146,5 +127,5 @@ end
- `Ash.Api` for what you can do with your resources.
- `Ash.Query` for the kinds of queries you can make.
- `Ash.Dsl` for the resource DSL documentation.
- `Ash.Resource.Dsl` for the resource DSL documentation.
- `Ash.Api.Dsl` for the API DSL documentation.

View file

@ -10,7 +10,7 @@ defmodule Ash do
## Quick Links
- [Resource DSL Documentation](Ash.Dsl.html)
- [Resource DSL Documentation](Ash.Resource.Dsl.html)
- [Api DSL Documentation](Ash.Api.Dsl.html)
- [Api interface documentation](Ash.Api.html)
- [Query Documentation](Ash.Query.html)
@ -68,7 +68,7 @@ defmodule Ash do
@type action_type :: :read | :create | :update | :destroy
@type actor :: Ash.record()
@type aggregate :: Ash.Query.Aggregate.t()
@type aggregate_type :: Ash.Query.Aggregate.type()
@type aggregate_kind :: Ash.Query.Aggregate.kind()
@type api :: module
@type attribute :: Ash.Resource.Attribute.t()
@type cardinality_many_relationship() :: HasMany.t() | ManyToMany.t()
@ -77,7 +77,7 @@ defmodule Ash do
@type data_layer :: module
@type data_layer_query :: struct
@type error :: struct
@type filter :: map()
@type filter :: Ash.Filter.t()
@type params :: Keyword.t()
@type primary_key :: record() | map | term
@type query :: Ash.Query.t()
@ -85,7 +85,7 @@ defmodule Ash do
@type relationship :: cardinality_one_relationship() | cardinality_many_relationship()
@type relationship_cardinality :: :many | :one
@type resource :: module
@type side_loads :: Keyword.t()
@type side_loads :: [{atom, [atom | query()]} | atom] | atom
@type sort :: Keyword.t()
@type validation :: Ash.Resource.Validation.t()

View file

@ -1,12 +1,13 @@
defmodule Ash.Actions.Create do
@moduledoc false
alias Ash.Actions.Relationships
alias Ash.Engine
alias Ash.Engine.Request
alias Ash.Actions.{Relationships, SideLoad}
require Logger
@spec run(Ash.api(), Ash.changeset(), Ash.action(), Keyword.t()) ::
{:ok, Ash.record()} | {:error, Ash.error()}
def run(api, changeset, action, opts) do
side_load = opts[:side_load] || []
upsert? = opts[:upsert?] || false
resource = changeset.resource
@ -17,12 +18,8 @@ defmodule Ash.Actions.Create do
with %{valid?: true} = changeset <- changeset(changeset, api),
:ok <- check_upsert_support(changeset.resource, upsert?),
{:ok, side_load_query} <-
side_loads_as_query(changeset.api, changeset.resource, side_load),
side_load_requests <-
SideLoad.requests(side_load_query),
%{
data: %{commit: %^resource{} = created} = state,
data: %{commit: %^resource{} = created},
errors: []
} <-
do_run_requests(
@ -31,10 +28,9 @@ defmodule Ash.Actions.Create do
engine_opts,
action,
resource,
api,
side_load_requests
api
) do
{:ok, SideLoad.attach_side_loads(created, state)}
{:ok, created}
else
%Ash.Changeset{errors: errors} ->
{:error, Ash.Error.to_ash_error(errors)}
@ -92,8 +88,7 @@ defmodule Ash.Actions.Create do
engine_opts,
action,
resource,
api,
side_load_requests
api
) do
authorization_request =
Request.new(
@ -135,8 +130,7 @@ defmodule Ash.Actions.Create do
)
Engine.run(
[authorization_request | [commit_request | relationship_read_requests]] ++
side_load_requests,
[authorization_request | [commit_request | relationship_read_requests]],
api,
engine_opts
)
@ -152,20 +146,6 @@ defmodule Ash.Actions.Create do
end
end
defp side_loads_as_query(_api, _resource, nil), do: {:ok, nil}
defp side_loads_as_query(_api, _resource, %Ash.Query{errors: []} = query), do: {:ok, query}
defp side_loads_as_query(_api, _resource, %Ash.Query{errors: errors}), do: {:error, errors}
defp side_loads_as_query(api, resource, side_loads) when is_list(side_loads) do
resource
|> Ash.Query.new(api)
|> Ash.Query.side_load(side_loads)
|> case do
%{errors: []} = query -> {:ok, query}
%{errors: errors} -> {:error, errors}
end
end
defp default({:constant, value}), do: value
defp default({mod, func, args}), do: apply(mod, func, args)
defp default(function) when is_function(function, 0), do: function.()

View file

@ -10,6 +10,8 @@ defmodule Ash.Actions.Read do
def run(query, action, opts \\ []) do
engine_opts = Keyword.take(opts, [:verbose?, :actor, :authorize?])
query = query_with_initial_data(query, opts)
with %{errors: []} <- query,
{:ok, requests} <- requests(query, action, opts),
side_load_requests <- SideLoad.requests(query),
@ -19,6 +21,7 @@ defmodule Ash.Actions.Read do
data_with_aggregates <-
add_aggregate_values(
data_with_side_loads,
query.aggregates,
query.resource,
Map.get(all_data, :aggregate_values, %{})
) do
@ -32,18 +35,49 @@ defmodule Ash.Actions.Read do
end
end
defp query_with_initial_data(query, opts) do
case Keyword.fetch(opts, :initial_data) do
:error ->
query
{:ok, nil} ->
Ash.Query.filter(query, false)
{:ok, []} ->
Ash.Query.filter(query, false)
{:ok, [record]} ->
pkey_value = record |> Map.take(Ash.Resource.primary_key(query.resource)) |> Map.to_list()
Ash.Query.filter(query, pkey_value)
{:ok, %{} = record} ->
pkey_value = record |> Map.take(Ash.Resource.primary_key(query.resource)) |> Map.to_list()
Ash.Query.filter(query, pkey_value)
{:ok, records} when is_list(records) ->
pkey = Ash.Resource.primary_key(query.resource)
pkey_value = Enum.map(records, fn record -> record |> Map.take(pkey) |> Map.to_list() end)
Ash.Query.filter(query, or: pkey_value)
end
end
defp requests(query, action, opts) do
filter_requests =
if Keyword.has_key?(opts, :actor) || opts[:authorize?] do
if not Keyword.has_key?(opts, :initial_data) &&
(Keyword.has_key?(opts, :actor) || opts[:authorize?]) do
Filter.read_requests(query.api, query.filter)
else
{:ok, []}
end
authorizing? = Keyword.has_key?(opts, :actor) || opts[:authorize?]
can_be_in_query? = not Keyword.has_key?(opts, :initial_data)
{aggregate_auth_requests, aggregate_value_requests, aggregates_in_query} =
Aggregate.requests(query, authorizing?)
Aggregate.requests(query, can_be_in_query?, authorizing?)
case filter_requests do
{:ok, filter_requests} ->
@ -53,6 +87,7 @@ defmodule Ash.Actions.Read do
api: query.api,
query: query,
action: action,
authorize?: not Keyword.has_key?(opts, :initial_data),
data:
data_field(
opts,
@ -125,10 +160,11 @@ defmodule Ash.Actions.Read do
end
end
defp add_aggregate_values(results, _resource, aggregate_values) when aggregate_values == %{},
defp add_aggregate_values(results, _aggregates, _resource, aggregate_values)
when aggregate_values == %{},
do: Enum.map(results, &Map.update!(&1, :aggregates, fn agg -> agg || %{} end))
defp add_aggregate_values(results, resource, aggregate_values) do
defp add_aggregate_values(results, aggregates, resource, aggregate_values) do
keys_to_aggregates =
Enum.reduce(aggregate_values, %{}, fn {_name, keys_to_values}, acc ->
Enum.reduce(keys_to_values, acc, fn {pkey, values}, acc ->
@ -138,9 +174,17 @@ defmodule Ash.Actions.Read do
pkey = Ash.Resource.primary_key(resource)
loaded =
aggregates
|> Enum.map(fn {_, aggregate} -> aggregate.load end)
|> Enum.reject(&is_nil/1)
Enum.map(results, fn result ->
aggregate_values = Map.get(keys_to_aggregates, Map.take(result, pkey), %{})
%{result | aggregates: Map.merge(result.aggregates || %{}, aggregate_values)}
{top_level, nested} = Map.split(aggregate_values || %{}, loaded)
Map.merge(%{result | aggregates: Map.merge(result.aggregates, nested)}, top_level)
end)
end

View file

@ -584,14 +584,14 @@ defmodule Ash.Actions.Relationships do
defp add_to_set_relationship(record, relationship_name, to_relate) do
Map.update!(record, relationship_name, fn
%Ecto.Association.NotLoaded{} -> [clear_relationships(to_relate)]
%Ash.NotLoaded{type: :relationship} -> [clear_relationships(to_relate)]
set_relationship -> [clear_relationships(to_relate) | set_relationship]
end)
end
defp remove_from_set_relationship(record, relationship_name, to_remove, pkey) do
Map.update!(record, relationship_name, fn
%Ecto.Association.NotLoaded{} ->
%Ash.NotLoaded{type: :relationship} ->
[]
set_relationship ->
@ -777,10 +777,9 @@ defmodule Ash.Actions.Relationships do
resource
|> Ash.Resource.relationships()
|> Enum.reduce(record, fn relationship, record ->
not_loaded = %Ecto.Association.NotLoaded{
__cardinality__: relationship.cardinality,
__field__: relationship.name,
__owner__: relationship.source
not_loaded = %Ash.NotLoaded{
type: :relationship,
field: relationship.name
}
Map.put(record, relationship.name, not_loaded)

View file

@ -54,41 +54,6 @@ defmodule Ash.Actions.SideLoad do
end)
end
def side_load(data, query, opts \\ [])
def side_load([], _query, _opts), do: {:ok, []}
def side_load(nil, _query, _opts), do: {:ok, nil}
def side_load(data, %{side_load: []}, _opts), do: {:ok, data}
def side_load(data, query, opts) when not is_list(data) do
data
|> List.wrap()
|> side_load(query, opts)
|> case do
{:ok, [record]} -> {:ok, record}
{:error, error} -> {:error, error}
end
end
def side_load([%resource{} | _] = data, side_load_query, opts) do
api = side_load_query.api
pkey = Ash.Resource.primary_key(resource)
pkey_filters = Enum.map(data, &Map.take(&1, pkey))
new_query = Ash.Query.filter(side_load_query, or: pkey_filters)
requests = requests(new_query, false, data)
case Engine.run(requests, api, opts) do
%{data: %{side_load: _} = state, errors: errors} when errors == [] ->
{:ok, attach_side_loads(data, state)}
%{errors: errors} ->
{:error, errors}
end
end
def attach_side_loads([%resource{} | _] = data, %{side_load: side_loads})
when is_list(data) do
side_loads

View file

@ -1,15 +1,13 @@
defmodule Ash.Actions.Update do
@moduledoc false
alias Ash.Actions.Relationships
alias Ash.Engine
alias Ash.Engine.Request
alias Ash.Actions.{Relationships, SideLoad}
require Logger
@spec run(Ash.api(), Ash.record(), Ash.action(), Keyword.t()) ::
{:ok, Ash.record()} | {:error, Ecto.Changeset.t()} | {:error, Ash.error()}
def run(api, changeset, action, opts) do
side_load = opts[:side_load] || []
engine_opts =
opts
|> Keyword.take([:verbose?, :actor, :authorize?])
@ -18,20 +16,15 @@ defmodule Ash.Actions.Update do
resource = changeset.resource
with %{valid?: true} = changeset <- changeset(changeset, api),
{:ok, side_load_query} <-
side_loads_as_query(changeset.api, changeset.resource, side_load),
side_load_requests <-
SideLoad.requests(side_load_query),
%{data: %{commit: %^resource{} = updated}, errors: []} = state <-
%{data: %{commit: %^resource{} = updated}, errors: []} <-
do_run_requests(
changeset,
engine_opts,
action,
resource,
api,
side_load_requests
api
) do
{:ok, SideLoad.attach_side_loads(updated, state)}
{:ok, updated}
else
%Ash.Changeset{errors: errors} ->
{:error, Ash.Error.to_ash_error(errors)}
@ -82,8 +75,7 @@ defmodule Ash.Actions.Update do
engine_opts,
action,
resource,
api,
side_load_requests
api
) do
authorization_request =
Request.new(
@ -121,26 +113,12 @@ defmodule Ash.Actions.Update do
relationship_requests = changeset.requests
Engine.run(
[authorization_request | [commit_request | relationship_requests]] ++ side_load_requests,
[authorization_request | [commit_request | relationship_requests]],
api,
engine_opts
)
end
defp side_loads_as_query(_api, _resource, nil), do: {:ok, nil}
defp side_loads_as_query(_api, _resource, %Ash.Query{errors: []} = query), do: {:ok, query}
defp side_loads_as_query(_api, _resource, %Ash.Query{errors: errors}), do: {:error, errors}
defp side_loads_as_query(api, resource, side_loads) when is_list(side_loads) do
resource
|> Ash.Query.new(api)
|> Ash.Query.side_load(side_loads)
|> case do
%{errors: []} = query -> {:ok, query}
%{errors: errors} -> {:error, errors}
end
end
defp default({:constant, value}), do: value
defp default({mod, func, args}), do: apply(mod, func, args)
defp default(function) when is_function(function, 0), do: function.()

View file

@ -23,7 +23,7 @@ defmodule Ash.Api do
import Ash.OptionsHelpers, only: [merge_schemas: 3]
alias Ash.Actions.{Create, Destroy, Read, SideLoad, Update}
alias Ash.Actions.{Create, Destroy, Read, Update}
alias Ash.Error.Invalid.{InvalidPrimaryKey, NoSuchAction, NoSuchResource}
@global_opts [
@ -54,24 +54,17 @@ defmodule Ash.Api do
@doc false
def read_opts_schema, do: @read_opts_schema
@side_load_opts_schema merge_schemas([], @global_opts, "Global Options")
@load_opts_schema merge_schemas([], @global_opts, "Global Options")
@get_opts_schema [
side_load: [
load: [
type: :any,
doc:
"Side loads to include in the query, same as you would pass to `Ash.Query.side_load/2`"
doc: "Fields or relationships to load in the query. See `Ash.Query.load/2`"
]
]
|> merge_schemas(@global_opts, "Global Options")
@shared_create_and_update_opts_schema [
side_load: [
type: :any,
doc:
"Side loads to include in the query, same as you would pass to `Ash.Query.side_load/2`"
]
]
@shared_create_and_update_opts_schema []
|> merge_schemas(@global_opts, "Global Options")
@create_opts_schema [
@ -135,23 +128,31 @@ defmodule Ash.Api do
{:ok, list(Ash.resource())} | {:error, Ash.error()}
@doc """
Side load on already fetched records. See `c:side_load/2` for more information.
Load fields or relationships on already fetched records. See `c:load/2` for more information.
#{NimbleOptions.docs(@side_load_opts_schema)}
#{NimbleOptions.docs(@load_opts_schema)}
"""
@callback side_load!(resource :: Ash.resource(), params :: Keyword.t() | Ash.query()) ::
list(Ash.resource()) | no_return
@callback load!(
record_or_records :: Ash.record() | [Ash.record()],
params :: Keyword.t() | Ash.query()
) ::
Ash.record() | [Ash.record()] | no_return
@doc """
Side load on already fetched records.
Load fields or relationships on already fetched records.
Accepts a keyword list of side loads as they would be passed into `Ash.Query.side_load/2`
or an `%Ash.Query{}`, in which case that query's side loads are used.
Accepts a list of non-loaded fields and loads them on the provided records or a query, in
which case the loaded fields of the query are used. Relationship loads can be nested, for
example: `MyApi.load(record, [posts: [:comments]])`. See `Ash.Query.side_load/2` for more
information on specifically loading relationships.
#{NimbleOptions.docs(@side_load_opts_schema)}
#{NimbleOptions.docs(@load_opts_schema)}
"""
@callback side_load(resource :: Ash.resource(), params :: Keyword.t() | Ash.query()) ::
{:ok, list(Ash.resource())} | {:error, Ash.error()}
@callback load(
record_or_records :: Ash.record() | [Ash.record()],
params :: Keyword.t() | Ash.query()
) ::
{:ok, Ash.record() | [Ash.record()]} | {:error, Ash.error()}
@doc """
Create a record. See `c:create/2` for more information.
@ -282,8 +283,8 @@ defmodule Ash.Api do
resource
|> Ash.Query.new(api)
|> Ash.Query.filter(filter)
|> Ash.Query.side_load(opts[:side_load] || [])
|> api.read(Keyword.delete(opts, :side_load))
|> Ash.Query.load(opts[:load] || [])
|> api.read(Keyword.delete(opts, :load))
|> case do
{:ok, [single_result]} ->
{:ok, single_result}
@ -321,38 +322,38 @@ defmodule Ash.Api do
end
@doc false
@spec side_load!(
@spec load!(
Ash.api(),
Ash.record() | list(Ash.record()),
Ash.query() | list(atom | {atom, list()}),
Keyword.t()
) ::
list(Ash.record()) | Ash.record() | no_return
def side_load!(api, data, query, opts \\ []) do
opts = NimbleOptions.validate!(opts, @side_load_opts_schema)
def load!(api, data, query, opts \\ []) do
opts = NimbleOptions.validate!(opts, @load_opts_schema)
api
|> side_load(data, query, opts)
|> load(data, query, opts)
|> unwrap_or_raise!()
end
@doc false
@spec side_load(Ash.api(), Ash.query(), Keyword.t()) ::
@spec load(Ash.api(), Ash.query(), Keyword.t()) ::
{:ok, list(Ash.resource())} | {:error, Ash.error()}
def side_load(api, data, query, opts \\ [])
def side_load(_, [], _, _), do: {:ok, []}
def side_load(_, nil, _, _), do: {:ok, nil}
def load(api, data, query, opts \\ [])
def load(_, [], _, _), do: {:ok, []}
def load(_, nil, _, _), do: {:ok, nil}
def side_load(api, data, query, opts) when not is_list(data) do
def load(api, data, query, opts) when not is_list(data) do
api
|> side_load(List.wrap(data), query, opts)
|> load(List.wrap(data), query, opts)
|> case do
{:ok, [data]} -> {:ok, data}
{:error, error} -> {:error, error}
end
end
def side_load(api, [%resource{} | _] = data, query, opts) do
def load(api, [%resource{} | _] = data, query, opts) do
query =
case query do
%Ash.Query{} = query ->
@ -361,12 +362,13 @@ defmodule Ash.Api do
keyword ->
resource
|> Ash.Query.new(api)
|> Ash.Query.side_load(keyword)
|> Ash.Query.load(keyword)
end
with %{valid?: true} <- query,
{:ok, opts} <- NimbleOptions.validate(opts, @side_load_opts_schema) do
SideLoad.side_load(data, query, opts)
{:ok, action} <- get_action(query.resource, opts, :read),
{:ok, opts} <- NimbleOptions.validate(opts, @load_opts_schema) do
Read.run(query, action, Keyword.put(opts, :initial_data, data))
else
{:error, error} ->
{:error, error}
@ -404,7 +406,7 @@ defmodule Ash.Api do
@doc false
@spec create!(Ash.api(), Ash.changeset(), Keyword.t()) ::
Ash.record() | {:error, Ash.error()}
Ash.record() | no_return
def create!(api, changeset, opts) do
opts = NimbleOptions.validate!(opts, @create_opts_schema)
@ -415,7 +417,7 @@ defmodule Ash.Api do
@doc false
@spec create(Ash.api(), Ash.changeset(), Keyword.t()) ::
{:ok, Ash.resource()} | {:error, Ash.error()}
{:ok, Ash.record()} | {:error, Ash.error()}
def create(api, changeset, opts) do
with {:ok, opts} <- NimbleOptions.validate(opts, @create_opts_schema),
{:ok, resource} <- Ash.Api.resource(api, changeset.resource),

View file

@ -36,13 +36,13 @@ defmodule Ash.Api.Interface do
end
@impl true
def side_load!(data, query, opts \\ []) do
Api.side_load!(__MODULE__, data, query, opts)
def load!(data, query, opts \\ []) do
Api.load!(__MODULE__, data, query, opts)
end
@impl true
def side_load(data, query, opts \\ []) do
case Api.side_load(__MODULE__, data, query, opts) do
def load(data, query, opts \\ []) do
case Api.load(__MODULE__, data, query, opts) do
{:ok, results} -> {:ok, results}
{:error, error} -> {:error, List.wrap(error)}
end

View file

@ -9,7 +9,7 @@ defmodule Ash.DataLayer do
"""
@type feature() ::
:transact
| {:aggregate, Ash.aggregate_type()}
| {:aggregate, Ash.aggregate_kind()}
| :aggregate_filter
| :aggregate_sort
| :boolean_filter

View file

@ -61,8 +61,12 @@ defmodule Ash.Dsl.Entity do
opts,
nested_entities
) do
with {:ok, opts} <- NimbleOptions.validate(opts, schema),
opts <- Keyword.merge(opts, auto_set_fields || []),
{before_validate_auto, after_validate_auto} =
Keyword.split(auto_set_fields || [], Keyword.keys(schema))
with {:ok, opts} <-
NimbleOptions.validate(Keyword.merge(opts, before_validate_auto), schema),
opts <- Keyword.merge(opts, after_validate_auto),
built <- struct(target, opts),
built <- struct(built, nested_entities),
{:ok, built} <-

View file

@ -18,7 +18,6 @@ defmodule Ash.Dsl.Transformer do
@callback transform(module, map) :: {:ok, map} | {:error, term} | :halt
@callback before?(module) :: boolean
@callback after?(module) :: boolean
@callback compile_time_only? :: boolean
defmacro __using__(_) do
quote do
@ -26,29 +25,8 @@ defmodule Ash.Dsl.Transformer do
def before?(_), do: false
def after?(_), do: false
def compile_time_only?, do: false
defoverridable before?: 1, after?: 1, compile_time_only?: 0
end
end
def wait_for_transformer(resource, transformers, timeout \\ :timer.seconds(15), wait \\ 50)
def wait_for_transformer(resource, transformers, 0, _) do
raise "Timed out waiting for #{inspect(transformers)} for #{inspect(resource)}"
end
def wait_for_transformer(resource, transformers, timeout, wait) do
transformers_run = :persistent_term.get({resource, :ash, :transformers}, [])
transformers = List.wrap(transformers)
if Enum.all?(transformers, &(&1 in transformers_run)) do
:ok
else
wait = min(timeout, wait)
:timer.sleep(wait)
wait_for_transformer(resource, transformers, timeout - wait, wait + 50)
defoverridable before?: 1, after?: 1
end
end

View file

@ -0,0 +1,16 @@
defmodule Ash.Error.Query.InvalidLoad do
@moduledoc "Used when an invalid load is provided"
use Ash.Error
def_ash_error([:load], class: :invalid)
defimpl Ash.ErrorKind do
def id(_), do: Ecto.UUID.generate()
def code(_), do: "invalid_load"
def message(%{load: load}) do
"#{inspect(load)} is not a valid load"
end
end
end

View file

@ -32,6 +32,8 @@ defmodule Ash.Filter do
defstruct [:resource, :expression]
@type t :: %__MODULE__{}
def parse!(resource, statement, aggregates \\ %{}) do
case parse(resource, statement, aggregates) do
{:ok, filter} ->

View file

@ -123,8 +123,11 @@ defmodule Ash.Filter.Runtime do
defp get_related(records, [key | rest]) when is_list(records) do
Enum.flat_map(records, fn record ->
case Map.get(record, key) do
%Ecto.Association.NotLoaded{} -> []
value -> get_related(value, rest)
%Ash.NotLoaded{type: :relationship} ->
[]
value ->
get_related(value, rest)
end
end)
end
@ -133,7 +136,7 @@ defmodule Ash.Filter.Runtime do
Enum.all?(records, &loaded?(&1, path))
end
defp loaded?(%Ecto.Association.NotLoaded{}, _), do: false
defp loaded?(%Ash.NotLoaded{}, _), do: false
defp loaded?(_, []), do: true

12
lib/ash/not_loaded.ex Normal file
View file

@ -0,0 +1,12 @@
defmodule Ash.NotLoaded do
@moduledoc "Used when an aggregate or relationship hasn't been loaded."
defstruct [:field, :type]
defimpl Inspect do
import Inspect.Algebra
def inspect(not_loaded, opts) do
concat(["#Ash.NotLoaded<", to_doc(not_loaded.type, opts), ">"])
end
end
end

View file

@ -7,11 +7,17 @@ defmodule Ash.Query.Aggregate do
:query,
:kind,
:type,
:authorization_filter
:authorization_filter,
:load
]
@kinds [:count]
@type t :: %__MODULE__{}
@type type :: :count
@type kind :: unquote(Enum.reduce(@kinds, &{:|, [], [&1, &2]}))
@doc false
def kinds, do: @kinds
alias Ash.Actions.SideLoad
alias Ash.Engine.Request
@ -55,10 +61,68 @@ defmodule Ash.Query.Aggregate do
end
end
defp kind_to_type(:count), do: {:ok, Ash.Type.Integer}
defp kind_to_type(kind), do: {:error, "Invalid aggregate kind: #{kind}"}
@doc false
def kind_to_type(:count), do: {:ok, Ash.Type.Integer}
def kind_to_type(kind), do: {:error, "Invalid aggregate kind: #{kind}"}
def requests(initial_query, authorizing?) do
# def requests_with_initial_data(query, authorizing?) do
# query.aggregates
# |> Map.values()
# |> Enum.group_by(& &1.relationship_path)
# |> Enum.reduce({[], []}, fn {relationship_path, aggregates},
# {auth_requests, value_requests} ->
# related = Ash.Resource.related(initial_query.resource, relationship_path)
# relationship =
# Ash.Resource.relationship(
# initial_query.resource,
# List.first(relationship_path)
# )
# remaining_path = List.delete_at(relationship_path, 0)
# reverse_relationship =
# case SideLoad.reverse_relationship_path(relationship, remaining_path) do
# :error ->
# nil
# {:ok, reverse_relationship} ->
# reverse_relationship
# end
# auth_request =
# if authorizing? do
# auth_request(related, initial_query, reverse_relationship, relationship_path)
# else
# nil
# end
# new_auth_requests =
# if auth_request do
# [auth_request | auth_requests]
# else
# auth_requests
# end
# if reverse_relationship do
# request =
# value_request(
# initial_query,
# related,
# reverse_relationship,
# relationship_path,
# aggregates,
# auth_request
# )
# {new_auth_requests, [request | value_requests]}
# else
# raise "Unimplemented"
# end
# end)
# end
def requests(initial_query, can_be_in_query?, authorizing?) do
initial_query.aggregates
|> Map.values()
|> Enum.group_by(& &1.relationship_path)
@ -77,26 +141,17 @@ defmodule Ash.Query.Aggregate do
{in_query?, reverse_relationship} =
case SideLoad.reverse_relationship_path(relationship, remaining_path) do
:error ->
{true, nil}
{can_be_in_query?, nil}
{:ok, reverse_relationship} ->
{any_aggregate_matching_path_used_in_query?(initial_query, relationship_path),
{can_be_in_query? &&
any_aggregate_matching_path_used_in_query?(initial_query, relationship_path),
reverse_relationship}
end
auth_request =
if authorizing? do
Request.new(
resource: related,
api: initial_query.api,
async?: false,
query: aggregate_query(related, reverse_relationship),
path: [:aggregate, relationship_path],
strict_check_only?: true,
action: Ash.Resource.primary_action!(related, :read),
name: "authorize aggregate: #{Enum.join(relationship_path, ".")}",
data: []
)
auth_request(related, initial_query, reverse_relationship, relationship_path)
else
nil
end
@ -126,6 +181,20 @@ defmodule Ash.Query.Aggregate do
end)
end
defp auth_request(related, initial_query, reverse_relationship, relationship_path) do
Request.new(
resource: related,
api: initial_query.api,
async?: false,
query: aggregate_query(related, reverse_relationship),
path: [:aggregate, relationship_path],
strict_check_only?: true,
action: Ash.Resource.primary_action!(related, :read),
name: "authorize aggregate: #{Enum.join(relationship_path, ".")}",
data: []
)
end
defp value_request(
initial_query,
related,
@ -154,13 +223,15 @@ defmodule Ash.Query.Aggregate do
Request.resolve(
deps,
fn data ->
if data.data.data == [] do
records = get_in(data, [:data, :data])
if records == [] do
{:ok, %{}}
else
initial_query = Ash.Query.unset(initial_query, [:filter, :sort, :aggregates])
query =
case data.data.data do
case records do
[record] ->
Ash.Query.filter(
initial_query,
@ -195,12 +266,27 @@ defmodule Ash.Query.Aggregate do
data_layer_query,
query.resource
) do
loaded_aggregates =
aggregates
|> Enum.map(& &1.load)
|> Enum.reject(&is_nil/1)
all_aggregates = Enum.map(aggregates, & &1.name)
aggregate_values =
Enum.reduce(results, %{}, fn result, acc ->
loaded_aggregate_values = Map.take(result, loaded_aggregates)
all_aggregate_values =
result.aggregates
|> Kernel.||(%{})
|> Map.take(all_aggregates)
|> Map.merge(loaded_aggregate_values)
Map.put(
acc,
Map.take(result, pkey),
Map.take(result.aggregates || %{}, Enum.map(aggregates, & &1.name))
all_aggregate_values
)
end)
@ -235,9 +321,13 @@ defmodule Ash.Query.Aggregate do
fn data ->
data_query = data.data.query
if reverse_relationship do
filter = Ash.Filter.put_at_path(data_query.filter, reverse_relationship)
{:ok, Ash.Query.filter(resource, filter)}
else
{:ok, data_query}
end
end
)
end

View file

@ -70,6 +70,120 @@ defmodule Ash.Query do
|> set_data_layer_query()
end
@spec load(t(), atom | list(atom)) :: t()
def load(query, fields) when not is_list(fields) do
load(query, List.wrap(fields))
end
def load(query, fields) do
query = to_query(query)
Enum.reduce(fields, query, fn
{field, rest}, query ->
side_load(query, [{field, rest}])
field, query ->
do_load(query, field)
end)
end
defp do_load(query, field) do
cond do
Ash.Resource.attribute(query.resource, field) ->
query
Ash.Resource.relationship(query.resource, field) ->
side_load(query, field)
aggregate = Ash.Resource.aggregate(query.resource, field) ->
with %{valid?: true} = aggregate_query <-
build(query.resource, filter: aggregate.filter),
{:ok, query_aggregate} <-
Aggregate.new(
query.resource,
aggregate.name,
aggregate.kind,
aggregate.relationship_path,
aggregate_query
) do
query_aggregate = %{query_aggregate | load: field}
new_aggregates = Map.put(query.aggregates, aggregate.name, query_aggregate)
%{query | aggregates: new_aggregates}
else
%{errors: errors} ->
add_error(query, :aggregates, Ash.Error.to_ash_error(errors))
{:error, error} ->
add_error(query, :aggregates, Ash.Error.to_ash_error(error))
end
end
end
def unload(query, fields) do
query = to_query(query)
Enum.reduce(fields, query, fn field, query ->
case field do
{field, rest} ->
new_side_loads = do_unload_side_load(query.side_load, {field, rest})
%{query | side_load: new_side_loads}
field ->
do_unload(query, field)
end
end)
end
defp do_unload(query, field) do
cond do
Ash.Resource.attribute(query.resource, field) ->
query
Ash.Resource.relationship(query.resource, field) ->
%{query | side_load: Keyword.delete(query.side_load, field)}
Ash.Resource.aggregate(query.resource, field) ->
new_aggregates =
Enum.reduce(query.aggregates, %{}, fn
{_field, %{load: ^field}}, acc ->
acc
{field, aggregate}, acc ->
Map.put(acc, field, aggregate)
end)
%{query | aggregates: new_aggregates}
end
end
defp do_unload_side_load(%__MODULE__{} = query, unload) do
%{query | side_load: do_unload_side_load(query.side_load, unload)}
end
defp do_unload_side_load(side_loads, {field, rest}) do
Enum.reduce(side_loads, [], fn
^field, acc ->
acc
{^field, value}, acc ->
new_value =
rest
|> List.wrap()
|> Enum.reduce(value, &do_unload_side_load(&2, &1))
[{field, new_value} | acc]
value, acc ->
[value | acc]
end)
|> Enum.reverse()
end
defp do_unload_side_load(side_loads, field) do
do_unload_side_load(side_loads, {field, []})
end
@spec build(Ash.resource(), Ash.api() | nil, Keyword.t()) :: t()
def build(resource, api \\ nil, keyword) do
Enum.reduce(keyword, new(resource, api), fn
@ -111,8 +225,13 @@ defmodule Ash.Query do
either a filter or a keyword list of options to supply to build a limiting query for that aggregate.
However, currently only filters are accepted.
"""
@spec aggregate(Ash.query(), atom(), Ash.aggregate_type(), atom | list(atom), Ash.query() | nil) ::
Ash.Query.t()
@spec aggregate(
t() | Ash.resource(),
atom(),
Ash.aggregate_kind(),
atom | list(atom),
Ash.query() | nil
) :: t()
def aggregate(query, name, type, relationship, agg_query \\ nil) do
query = to_query(query)
relationship = List.wrap(relationship)
@ -149,7 +268,8 @@ defmodule Ash.Query do
end
@doc "Limit the results returned from the query"
def limit(query, nil), do: query
@spec limit(t() | Ash.resource(), nil | integer()) :: t()
def limit(query, nil), do: to_query(query)
def limit(query, limit) when is_integer(limit) do
query
@ -163,7 +283,8 @@ defmodule Ash.Query do
end
@doc "Skip the first n records"
def offset(query, nil), do: query
@spec offset(t() | Ash.resource(), nil | integer()) :: t()
def offset(query, nil), do: to_query(query)
def offset(query, offset) when is_integer(offset) do
query
@ -179,6 +300,7 @@ defmodule Ash.Query do
end
@doc "Side loads related entities"
@spec side_load(t() | Ash.resource(), Ash.side_loads()) :: t()
def side_load(query, statement) do
query = to_query(query)
@ -248,6 +370,7 @@ defmodule Ash.Query do
end)
end
@spec filter(t() | Ash.resource(), nil | false | Ash.filter() | Keyword.t()) :: t()
def filter(query, nil), do: to_query(query)
def filter(query, %Ash.Filter{} = filter) do
@ -292,6 +415,7 @@ defmodule Ash.Query do
end
end
@spec sort(t() | Ash.resource(), Ash.sort()) :: t()
def sort(query, sorts) do
query = to_query(query)
@ -308,11 +432,12 @@ defmodule Ash.Query do
|> set_data_layer_query()
end
@spec unset(Ash.resource() | t(), atom | [atom]) :: t()
def unset(query, keys) when is_list(keys) do
keys
|> Enum.reduce(query, fn key, query ->
if key in [:api, :resource] do
query
to_query(query)
else
query
|> to_query()
@ -324,7 +449,7 @@ defmodule Ash.Query do
def unset(query, key) do
if key in [:api, :resource] do
query
to_query(query)
else
query
|> to_query()

View file

@ -2,7 +2,7 @@ defmodule Ash.Resource do
@moduledoc """
A resource is a static definition of an entity in your system.
Resource DSL documentation: `Ash.Dsl`
Resource DSL documentation: `Ash.Resource.Dsl`
"""
alias Ash.Dsl.Extension
@ -17,9 +17,9 @@ defmodule Ash.Resource do
extensions =
if data_layer && Ash.implements_behaviour?(data_layer, Ash.Dsl.Extension) do
[data_layer, Ash.Dsl]
[data_layer, Ash.Resource.Dsl]
else
[Ash.Dsl]
[Ash.Resource.Dsl]
end
authorizer_extensions =
@ -137,6 +137,17 @@ defmodule Ash.Resource do
|> Enum.find(&(&1.name == relationship_name))
end
@spec aggregates(Ash.resource()) :: list(Ash.relationship())
def aggregates(resource) do
Extension.get_entities(resource, [:aggregates])
end
def aggregate(resource, name) do
resource
|> aggregates()
|> Enum.find(&(&1.name == name))
end
@doc "Returns the primary action of the given type"
@spec primary_action!(Ash.resource(), Ash.action_type()) :: Ash.action() | no_return
def primary_action!(resource, type) do

View file

@ -0,0 +1,40 @@
defmodule Ash.Resource.Aggregate do
@moduledoc "Represents a named aggregate on the resource that can be loaded"
defstruct [:name, :relationship_path, :filter, :kind, :query]
@schema [
name: [
type: :atom,
doc: "The field to place the aggregate in",
required: true
],
relationship_path: [
type: {:custom, __MODULE__, :relationship_path, []},
doc: "The relationship or relationship path to use for the aggregate",
required: true
],
kind: [
type: {:one_of, Ash.Query.Aggregate.kinds()},
doc: "The kind of the aggregate",
required: true
],
filter: [
type: :keyword_list,
doc: "A filter to apply to the aggregate",
default: []
]
]
@doc false
def schema, do: @schema
def relationship_path(value) do
value = List.wrap(value)
if Enum.all?(value, &is_atom/1) do
{:ok, value}
else
{:error, "relationship path must be atoms"}
end
end
end

View file

@ -1,5 +1,5 @@
defmodule Ash.Resource.Attribute do
@moduledoc false
@moduledoc "Represents an attribute on a resource"
defstruct [
:name,

View file

@ -1,4 +1,4 @@
defmodule Ash.Dsl do
defmodule Ash.Resource.Dsl do
@moduledoc """
The built in resource DSL. The four core DSL components of a resource are:
@ -282,7 +282,34 @@ defmodule Ash.Dsl do
]
}
@sections [@attributes, @relationships, @actions, @resource, @validations]
@count %Ash.Dsl.Entity{
name: :count,
describe: """
Declares a named aggregate on the resource
""",
examples: [
"count :assigned_ticket_count, :reported_tickets, filter: [active: true]"
],
target: Ash.Resource.Aggregate,
args: [:name, :relationship_path],
schema: Ash.Resource.Aggregate.schema(),
auto_set_fields: [kind: :count]
}
@aggregates %Ash.Dsl.Section{
name: :aggregates,
describe: """
Declare named aggregates on the resource.
These are aggregates that can be fetched only by name using `Ash.Query.fetch/2`.
They are also available as top level fields on the resource.
""",
entities: [
@count
]
}
@sections [@attributes, @relationships, @actions, @resource, @validations, @aggregates]
@transformers [
Ash.Resource.Transformers.SetRelationshipSource,
@ -291,6 +318,7 @@ defmodule Ash.Dsl do
Ash.Resource.Transformers.CreateJoinRelationship,
Ash.Resource.Transformers.CachePrimaryKey,
Ash.Resource.Transformers.SetPrimaryActions
# Ash.Resource.Transformers.SetAggregateQueries
]
use Ash.Dsl.Extension,

View file

@ -8,6 +8,7 @@ defmodule Ash.Schema do
defmacro define_schema do
quote unquote: false do
alias Ash.Query.Aggregate
use Ecto.Schema
@primary_key false
@ -22,6 +23,13 @@ defmodule Ash.Schema do
end
field(:aggregates, :map, virtual: true, default: %{})
for aggregate <- Ash.Resource.aggregates(__MODULE__) do
{:ok, type} = Aggregate.kind_to_type(aggregate.kind)
field(aggregate.name, Ash.Type.ecto_type(type), virtual: true)
end
relationships = Ash.Resource.relationships(__MODULE__)
for relationship <- Enum.filter(relationships, &(&1.type == :belongs_to)) do
@ -55,6 +63,21 @@ defmodule Ash.Schema do
]
)
end
for relationship <- relationships do
new_struct_fields =
Enum.reject(@struct_fields, fn {name, _} -> name == relationship.name end) ++
[{relationship.name, %Ash.NotLoaded{field: relationship.name, type: :relationship}}]
Module.delete_attribute(__MODULE__, :struct_fields)
Module.register_attribute(__MODULE__, :struct_fields, accumulate: true)
Enum.each(
Enum.reverse(new_struct_fields),
&Module.put_attribute(__MODULE__, :struct_fields, &1)
)
end
end
end
end

View file

@ -7,7 +7,7 @@ defmodule Ash.Resource.Transformers.BelongsToAttribute do
alias Ash.Dsl.Transformer
alias Ash.Error.Dsl.DslError
@extension Ash.Dsl
@extension Ash.Resource.Dsl
def transform(_resource, dsl_state) do
dsl_state

View file

@ -9,7 +9,7 @@ defmodule Ash.Resource.Transformers.CreateJoinRelationship do
alias Ash.Dsl.Transformer
@extension Ash.Dsl
@extension Ash.Resource.Dsl
def transform(_resource, dsl_state) do
dsl_state

View file

@ -44,7 +44,7 @@ defmodule Mix.Tasks.Ash.Formatter do
contents =
@formatter_exs_template
|> String.replace("__replace_me__", inspect(locals_without_parens))
|> String.replace("__replace_me__", inspect(locals_without_parens, limit: :infinity))
|> Code.format_string!()
contents_with_newline = [contents, "\n"]

View file

@ -63,8 +63,8 @@ defmodule Ash.MixProject do
Ash.Dsl.Extension,
Ash.Dsl.Section
],
"resource dsl": ~r/Ash.Resource.Dsl/,
"resource dsl transformers": ~r/Ash.Resource.Transformers/,
"resource dsl": ~r/Ash.Dsl/,
"api dsl transformers": ~r/Ash.Api.Transformers/,
"api dsl": ~r/Ash.Api.Dsl/,
"filter predicates": ~r/Ash.Filter.Predicate/,
@ -113,7 +113,7 @@ defmodule Ash.MixProject do
sobelow: "sobelow --skip",
credo: "credo --strict",
"ash.formatter":
"ash.formatter --extensions Ash.Dsl,Ash.Api.Dsl,Ash.DataLayer.Ets,Ash.DataLayer.Mnesia"
"ash.formatter --extensions Ash.Resource.Dsl,Ash.Api.Dsl,Ash.DataLayer.Ets,Ash.DataLayer.Mnesia"
]
end
end

View file

@ -468,7 +468,7 @@ defmodule Ash.Test.Actions.UpdateTest do
|> replace_relationship(:author, author2)
|> Api.update!()
assert Api.get!(Author, author2.id, side_load: [:posts]).posts == [Api.get!(Post, post.id)]
assert Api.get!(Author, author2.id, load: [:posts]).posts == [Api.get!(Post, post.id)]
end
test "it responds with the relationship field filled in" do