From f8fadc67e67c955bb68b3a8d642be13e2b7e8ca9 Mon Sep 17 00:00:00 2001 From: Zach Daniel Date: Sun, 11 Oct 2020 23:55:47 -0400 Subject: [PATCH] feat: Add pagination (#131) --- .formatter.exs | 1 + coveralls.json | 10 + documentation/topics/pagination.md | 61 +++ lib/ash.ex | 1 + lib/ash/actions/read.ex | 372 ++++++++++++++-- lib/ash/actions/relationships.ex | 9 +- lib/ash/actions/side_load.ex | 6 +- lib/ash/actions/sort.ex | 27 ++ lib/ash/api/api.ex | 279 +++++++++++- lib/ash/api/interface.ex | 27 ++ lib/ash/data_layer/data_layer.ex | 47 ++ lib/ash/data_layer/ets.ex | 21 +- lib/ash/engine/request.ex | 3 +- lib/ash/error/invalid/limit_required.ex | 20 + lib/ash/error/invalid/multiple_results.ex | 9 +- lib/ash/error/invalid/pagination_required.ex | 20 + lib/ash/filter/filter.ex | 10 +- lib/ash/page/keyset.ex | 173 ++++++++ lib/ash/page/offset.ex | 21 + lib/ash/query/operator/not_eq.ex | 42 ++ lib/ash/query/query.ex | 16 + lib/ash/resource/actions/read.ex | 75 +++- lib/ash/resource/dsl.ex | 7 +- lib/ash/resource/schema.ex | 1 + .../transformers/countable_actions.ex | 34 ++ mix.exs | 6 +- test/actions/pagination_test.exs | 402 ++++++++++++++++++ test/ash/data_layer/ets_test.exs | 3 +- 28 files changed, 1623 insertions(+), 80 deletions(-) create mode 100644 coveralls.json create mode 100644 documentation/topics/pagination.md create mode 100644 lib/ash/error/invalid/limit_required.ex create mode 100644 lib/ash/error/invalid/pagination_required.ex create mode 100644 lib/ash/page/keyset.ex create mode 100644 lib/ash/page/offset.ex create mode 100644 lib/ash/query/operator/not_eq.ex create mode 100644 lib/ash/resource/transformers/countable_actions.ex create mode 100644 test/actions/pagination_test.exs diff --git a/.formatter.exs b/.formatter.exs index 19654cd8..08cc622e 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -44,6 +44,7 @@ locals_without_parens = [ many_to_many: 2, many_to_many: 3, on: 1, + pagination: 1, primary?: 1, primary_key?: 1, private?: 1, diff --git a/coveralls.json b/coveralls.json new file mode 100644 index 00000000..e7ca68b5 --- /dev/null +++ b/coveralls.json @@ -0,0 +1,10 @@ +{ + "skip_files": [ + "lib/ash/dsl", + "lib/ash/api/dsl.ex", + "lib/ash/api/interface.ex", + "lib/ash/error", + "lib/ash/resource/dsl.ex", + "lib/mix/tasks" + ] +} diff --git a/documentation/topics/pagination.md b/documentation/topics/pagination.md new file mode 100644 index 00000000..71818ad9 --- /dev/null +++ b/documentation/topics/pagination.md @@ -0,0 +1,61 @@ +# Pagination + +Pagination is configured at the action level. There are two kinds of pagination supported: `keyset` and `offset`. There are +pros and cons to each. An action can support both at the same time, or only one (or none). A full count of records can be +requested by passing `page: [count: true]`, but it should be kept in mind that doing this requires running the same query +twice, one of which is a count of all records. Ash does these in parallel, but it can still be quite expensive on large +datasets. For more information on the options for configuring actions to support pagination, see the pagination section in `Ash.Resource.Dsl.Read.read/2` + +## Offset Pagination + +Offset pagination is done via providing a `limit` and an `offset`. A `limit` is how many records that should be returned on the page. +An `offset` is how many records from the beginning should be skipped. Using this, you might make requests like the following: + +```elixir +# Get the first ten records +Api.read(Resource, page: [limit: 10]) +# Get the second ten records +Api.read(Resource, page: [limit: 10, offset: 10]) +# No need to do this in practice, see `c:Ash.Api.page/2` +``` + +### Offset Pros + +- Simple to think about +- Possible to skip to a page by number. E.g the 5th page of 10 records is `offset: 40` +- Easy to reason about what page you are currently on (if the total number of records is requested) +- Can go to the last page (even though, if done by using the full count, the data could have changed) + +### Offset Cons + +- Does not perform well on large datasets +- When moving between pages, if data was created or deleted, records may appear on multiple pages or be skipped + +## Keyset Pagination + +Keyset pagination is done via providing an `after` or `before` option, as well as a `limit`. The value of this option should be +a `keyset` that has been returned from a previous request. Keysets are returned when a request is made with a `limit` to an action +that supports `keyset` pagination, and they are stored in the `metadata` key of each record. The `keyset` is a special value that +can be passed into the `after` or `before` options, to get records that occur after or before. + +For example: + +```elixir +page = Api.read(Resource, page: [limit: 10]) + +last_record = List.last(page.results) + +# No need to do this in practice, see `c:Ash.Api.page/2` +next_page = Api.read(Resource, page: [limit: 10, after: last_record.metadata.keyset]) +``` + +### Keyset Pros + +- Performs very well on large datasets (assuming indices exist on the columns being sorted on) +- Behaves well as data changes. The record specified will always be the first or last item in the page + +### Keyset Cons + +- A bit more complex +- Can't go to a specific page number +- Going to the last page requires diff --git a/lib/ash.ex b/lib/ash.ex index f74babf4..bb197806 100644 --- a/lib/ash.ex +++ b/lib/ash.ex @@ -26,6 +26,7 @@ defmodule Ash do @type relationship_cardinality :: :many | :one @type resource :: module @type side_loads :: term + @type page :: Ash.Page.Keyset.t() | Ash.Page.Offset.t() @type sort :: list(atom | {atom, :asc} | {atom, :desc}) @type validation :: Ash.Resource.Validation.t() diff --git a/lib/ash/actions/read.ex b/lib/ash/actions/read.ex index e04ea057..40e72a07 100644 --- a/lib/ash/actions/read.ex +++ b/lib/ash/actions/read.ex @@ -3,22 +3,45 @@ defmodule Ash.Actions.Read do alias Ash.Actions.SideLoad alias Ash.Engine alias Ash.Engine.Request + alias Ash.Error.Invalid.{LimitRequired, PaginationRequired} alias Ash.Filter alias Ash.Query.Aggregate require Logger require Ash.Query + def unpaginated_read(query, action \\ nil, opts \\ []) do + action = action || Ash.Resource.primary_action!(query.resource, :read) + + if action.pagination do + opts = Keyword.put(opts, :page, false) + run(query, %{action | pagination: %{action.pagination | required?: false}}, opts) + else + run(query, action, opts) + end + end + def run(query, action, opts \\ []) do if Ash.Resource.data_layer_can?(query.resource, :read) do engine_opts = Keyword.take(opts, [:verbose?, :actor, :authorize?]) + original_query = query + + initial_offset = query.offset + initial_limit = query.limit with %{errors: []} = query <- query_with_initial_data(query, opts), %{errors: []} = query <- add_action_filters(query, action, engine_opts[:actor]), - {:ok, requests} <- requests(query, action, opts), + {:ok, filter_requests} <- filter_requests(query, opts), + {:ok, query, page_opts, count_request} <- + paginate(query, action, filter_requests, initial_offset, initial_limit, opts), + {:ok, requests} <- requests(query, action, filter_requests, opts), side_load_requests <- SideLoad.requests(query), %{data: %{data: data} = all_data, errors: []} <- - Engine.run(requests ++ side_load_requests, query.api, engine_opts), + Engine.run( + requests ++ side_load_requests ++ List.wrap(count_request), + query.api, + engine_opts + ), data_with_side_loads <- SideLoad.attach_side_loads(data, all_data), data_with_aggregates <- add_aggregate_values( @@ -27,7 +50,19 @@ defmodule Ash.Actions.Read do query.resource, Map.get(all_data, :aggregate_values, %{}) ) do - {:ok, data_with_aggregates} + if opts[:page] do + {:ok, + to_page( + data_with_aggregates, + action, + Map.get(all_data, :count), + query.sort, + original_query, + Keyword.put(opts, :page, page_opts) + )} + else + {:ok, data_with_aggregates} + end else %{errors: errors} -> {:error, Ash.Error.to_ash_error(errors)} @@ -40,6 +75,42 @@ defmodule Ash.Actions.Read do end end + defp to_page(data, action, count, sort, original_query, opts) do + page_opts = opts[:page] + + if page_opts[:offset] do + if action.pagination.keyset? do + data + |> Ash.Page.Keyset.data_with_keyset(sort) + |> Ash.Page.Offset.new(count, original_query, opts) + else + Ash.Page.Offset.new(data, count, original_query, opts) + end + else + cond do + action.pagination.offset? && action.pagination.keyset? -> + data + |> Ash.Page.Keyset.data_with_keyset(sort) + |> Ash.Page.Offset.new(count, original_query, opts) + + action.pagination.offset? -> + Ash.Page.Offset.new(data, count, original_query, opts) + + true -> + Ash.Page.Keyset.new(data, count, sort, original_query, opts) + end + end + end + + defp filter_requests(query, opts) do + if not Keyword.has_key?(opts, :initial_data) && + (Keyword.has_key?(opts, :actor) || opts[:authorize?]) do + Filter.read_requests(query.api, query.filter) + else + {:ok, []} + end + end + defp add_action_filters(query, %{filter: nil}, _actor), do: query defp add_action_filters(query, action, actor) do @@ -82,47 +153,46 @@ defmodule Ash.Actions.Read do end end - defp requests(query, action, opts) do - filter_requests = - if not Keyword.has_key?(opts, :initial_data) && - (Keyword.has_key?(opts, :actor) || opts[:authorize?]) do - Filter.read_requests(query.api, query.filter) - else - {:ok, []} - end - + defp requests(query, action, filter_requests, opts) do authorizing? = Keyword.has_key?(opts, :actor) || opts[:authorize?] can_be_in_query? = not Keyword.has_key?(opts, :initial_data) {aggregate_auth_requests, aggregate_value_requests, aggregates_in_query} = Aggregate.requests(query, can_be_in_query?, authorizing?) - case filter_requests do - {:ok, filter_requests} -> - request = - Request.new( - resource: query.resource, - api: query.api, - query: query, - action: action, - authorize?: not Keyword.has_key?(opts, :initial_data), - data: - data_field( - opts, - filter_requests, - aggregate_auth_requests, - aggregates_in_query, - query - ), - path: [:data], - name: "#{action.type} - `#{action.name}`" - ) + request = + Request.new( + resource: query.resource, + api: query.api, + query: + Request.resolve([], fn _ -> + case Filter.run_other_data_layer_filters( + query.api, + query.resource, + query.filter + ) do + {:ok, filter} -> + {:ok, %{query | filter: filter}} - {:ok, [request | filter_requests] ++ aggregate_auth_requests ++ aggregate_value_requests} + {:error, error} -> + {:error, error} + end + end), + action: action, + authorize?: not Keyword.has_key?(opts, :initial_data), + data: + data_field( + opts, + filter_requests, + aggregate_auth_requests, + aggregates_in_query, + query + ), + path: [:data], + name: "#{action.type} - `#{action.name}`" + ) - {:error, error} -> - {:error, error} - end + {:ok, [request | filter_requests] ++ aggregate_auth_requests ++ aggregate_value_requests} end defp data_field( @@ -157,12 +227,6 @@ defmodule Ash.Actions.Read do with {:ok, filter} <- filter_with_related(relationship_filter_paths, ash_query, data), - {:ok, filter} <- - Filter.run_other_data_layer_filters( - ash_query.api, - ash_query.resource, - filter - ), {:ok, query} <- add_aggregates( query, @@ -182,6 +246,194 @@ defmodule Ash.Actions.Read do end end + defp paginate(starting_query, action, filter_requests, initial_offset, initial_limit, opts) do + if action.pagination == false do + {:ok, starting_query, opts[:page], nil} + else + case opts[:page] do + false -> + if action.pagination.required? do + {:error, PaginationRequired.exception([])} + else + {:ok, starting_query, false, nil} + end + + nil -> + if action.pagination.default_limit do + paginate( + starting_query, + action, + filter_requests, + initial_offset, + initial_limit, + Keyword.put(opts, :page, limit: action.pagination.default_limit) + ) + else + {:error, LimitRequired.exception([])} + end + + page_params -> + case do_paginate(starting_query, action.pagination, opts) do + {:ok, query} -> + count_request = + count_request( + starting_query, + action, + filter_requests, + initial_offset, + initial_limit, + opts + ) + + {:ok, query, page_params, count_request} + + {:error, error} -> + {:error, error} + end + end + end + end + + defp do_paginate(query, pagination, opts) do + cond do + opts[:page][:before] || opts[:page][:after] -> + keyset_pagination(query, opts[:page]) + + opts[:page][:offset] -> + limit_offset_pagination(query, opts[:page]) + + pagination.offset? && pagination.keyset? -> + keyset_pagination(query, opts[:page]) + + pagination.offset? -> + limit_offset_pagination(query, opts[:page]) + + true -> + keyset_pagination(query, opts[:page]) + end + end + + defp keyset_pagination(query, opts) do + sorted = + if Ash.Actions.Sort.sorting_on_identity?(query) do + query + else + Ash.Query.sort(query, Ash.Resource.primary_key(query.resource)) + end + + limited = + cond do + opts[:limit] && sorted.limit -> + Ash.Query.limit(sorted, min(opts[:limit], sorted.limit)) + + opts[:limit] -> + Ash.Query.limit(sorted, opts[:limit]) + + true -> + sorted + end + + if opts[:before] || opts[:after] do + after_or_before = + if opts[:before] do + :before + else + :after + end + + case Ash.Page.Keyset.filter( + opts[:before] || opts[:after], + sorted.sort, + after_or_before + ) do + {:ok, filter} -> + {:ok, Ash.Query.filter(limited, ^filter)} + + {:error, error} -> + {:error, error} + end + else + {:ok, limited} + end + end + + defp limit_offset_pagination(query, opts) do + limited = + cond do + opts[:limit] && query.limit -> + Ash.Query.limit(query, min(opts[:limit], query.limit)) + + opts[:limit] -> + Ash.Query.limit(query, opts[:limit]) + + true -> + query + end + + with_offset = + cond do + opts[:offset] && query.offset -> + Ash.Query.offset(limited, max(opts[:offset], query.offset)) + + opts[:offset] -> + Ash.Query.offset(limited, opts[:offset]) + + true -> + limited + end + + {:ok, with_offset} + end + + defp count_request(_, %{pagination: %{countable: false}}, _, _, _, _), do: nil + + defp count_request(initial_query, action, filter_requests, initial_offset, initial_limit, opts) do + if opts[:page][:count] == true || + (opts[:page][:count] != false and action.pagination.countable == :by_default) do + relationship_filter_paths = + Enum.map(filter_requests, fn request -> + request.path ++ [:authorization_filter] + end) + + Request.new( + resource: initial_query.resource, + api: initial_query.api, + query: initial_query, + action: action, + authorize?: false, + data: + Request.resolve( + [[:data, :authorization_filter]] ++ relationship_filter_paths, + fn %{ + data: %{ + authorization_filter: auth_filter + } + } = data -> + query = + initial_query + |> Ash.Query.unset([:filter, :aggregates, :sort, :limit, :offset]) + |> Ash.Query.limit(initial_limit) + |> Ash.Query.offset(initial_offset) + |> Ash.Query.filter(^auth_filter) + |> Map.get(:data_layer_query) + + with {:ok, filter} <- + filter_with_related(relationship_filter_paths, initial_query, data), + {:ok, query} <- + Ash.DataLayer.filter(query, filter, initial_query.resource), + {:ok, query} <- + Ash.DataLayer.sort(query, initial_query.sort, initial_query.resource), + {:ok, %{count: count}} <- run_count_query(initial_query, query) do + {:ok, count} + end + end + ), + path: [:count], + name: "#{action.type} - `#{action.name}`" + ) + end + end + defp run_query( %{ resource: destination_resource, @@ -207,6 +459,42 @@ defmodule Ash.Actions.Read do Ash.DataLayer.run_query(query, resource) end + defp run_count_query( + %{ + resource: destination_resource, + context: %{ + data_layer: %{lateral_join_source: {root_data, resource, source, destination}} + } + }, + query + ) do + case Ash.Query.Aggregate.new(destination_resource, :count, :count, [], nil) do + {:ok, aggregate} -> + Ash.DataLayer.run_aggregate_query_with_lateral_join( + query, + [aggregate], + root_data, + resource, + destination_resource, + source, + destination + ) + + {:error, error} -> + {:error, error} + end + end + + defp run_count_query(ash_query, query) do + case Ash.Query.Aggregate.new(ash_query.resource, :count, :count, [], nil) do + {:ok, aggregate} -> + Ash.DataLayer.run_aggregate_query(query, [aggregate], ash_query.resource) + + {:error, error} -> + {:error, error} + end + end + defp add_calculation_values(query, results, calculations) do calculations |> Enum.reduce_while({:ok, %{}}, fn {_name, calculation}, {:ok, calculation_results} -> diff --git a/lib/ash/actions/relationships.ex b/lib/ash/actions/relationships.ex index 306764c5..b339dcda 100644 --- a/lib/ash/actions/relationships.ex +++ b/lib/ash/actions/relationships.ex @@ -159,7 +159,7 @@ defmodule Ash.Actions.Relationships do if possible? do query = get_in(data, [:relationships, relationship_name, type, :query]) - case changeset.api.read(query) do + case Ash.Actions.Read.unpaginated_read(query) do {:ok, results} -> {:ok, add_changes_to_results(changeset.resource, results, identifiers)} @@ -703,7 +703,8 @@ defmodule Ash.Actions.Relationships do data: Request.resolve([[:relationships, relationship.name, :current, :query]], fn data -> query = get_in(data, [:relationships, relationship.name, :current, :query]) - changeset.api.read(query) + + Ash.Actions.Read.unpaginated_read(query) end), name: "Read related #{relationship.name} before replace" ) @@ -729,7 +730,7 @@ defmodule Ash.Actions.Relationships do data: Request.resolve([[:relationships, relationship.name, :current_join, :query]], fn data -> query = get_in(data, [:relationships, relationship.name, :current_join, :query]) - changeset.api.read(query) + Ash.Actions.Read.unpaginated_read(query) end), name: "Read related join for #{relationship.name} before replace" ) @@ -767,7 +768,7 @@ defmodule Ash.Actions.Relationships do ^name => %{current: %{query: query}} } } -> - api.read(query) + Ash.Actions.Read.unpaginated_read(query) end ), name: "Read related join for #{name} before replace" diff --git a/lib/ash/actions/side_load.ex b/lib/ash/actions/side_load.ex index c8b051c7..4d1451f3 100644 --- a/lib/ash/actions/side_load.ex +++ b/lib/ash/actions/side_load.ex @@ -452,18 +452,18 @@ defmodule Ash.Actions.SideLoad do relationship.destination_field} } }) - |> query.api.read() + |> Ash.Actions.Read.unpaginated_read() (query.limit || offset?) && relationship.type != :many_to_many -> artificial_limit_and_offset(query, relationship) true -> - query.api.read(query) + Ash.Actions.Read.unpaginated_read(query) end end defp artificial_limit_and_offset(query, relationship) do - case query.api.read(query) do + case Ash.Actions.Read.unpaginated_read(query) do {:ok, results} -> new_results = results diff --git a/lib/ash/actions/sort.ex b/lib/ash/actions/sort.ex index eec0ce30..29feae27 100644 --- a/lib/ash/actions/sort.ex +++ b/lib/ash/actions/sort.ex @@ -42,6 +42,33 @@ defmodule Ash.Actions.Sort do end end + def sorting_on_identity?(%{sort: nil}), do: false + + def sorting_on_identity?(query) do + identity_keys = + query.resource + |> Ash.Resource.identities() + |> Enum.map(& &1.keys) + + sort_fields = Keyword.keys(query.sort) + + Enum.any?([Ash.Resource.primary_key(query.resource) | identity_keys], fn keyset -> + Enum.all?(keyset, &(&1 in sort_fields)) + end) + end + + def reverse(sort) do + Enum.map(sort, fn {field, direction} -> + case direction do + :asc -> + {field, :desc} + + :desc -> + {field, :asc} + end + end) + end + defp aggregate_sort(aggregates, field, order, resource, sorts, errors) do aggregate = Map.get(aggregates, field) diff --git a/lib/ash/api/api.ex b/lib/ash/api/api.ex index f36592ff..e816bd57 100644 --- a/lib/ash/api/api.ex +++ b/lib/ash/api/api.ex @@ -25,10 +25,18 @@ defmodule Ash.Api do import Ash.OptionsHelpers, only: [merge_schemas: 3] alias Ash.Actions.{Create, Destroy, Read, Update} - alias Ash.Error.Invalid.{InvalidPrimaryKey, NoPrimaryAction, NoSuchAction, NoSuchResource} + + alias Ash.Error.Invalid.{ + InvalidPrimaryKey, + NoPrimaryAction, + NoSuchAction, + NoSuchResource + } require Ash.Query + @type page_request :: :next | :prev | :first | :last | integer + @global_opts [ verbose?: [ type: :boolean, @@ -52,10 +60,75 @@ defmodule Ash.Api do ] ] - @read_opts_schema merge_schemas([], @global_opts, "Global Options") + @read_opts_schema merge_schemas( + [ + page: [ + doc: + "Nested pagination options, see the section on pagination for more", + type: {:custom, __MODULE__, :page_opts, []} + ] + ], + @global_opts, + "Global Options" + ) + + @offset_page_opts [ + offset: [ + type: :non_neg_integer, + doc: "The number of records to skip from the beginning of the query" + ], + limit: [ + type: :pos_integer, + doc: "The number of records to include in the page" + ], + count: [ + type: :boolean, + doc: "Whether or not to return the page with a full count of all records" + ] + ] + + @keyset_page_opts [ + before: [ + type: :string, + doc: "Get records that appear before the provided keyset (mutually exclusive with `after`)" + ], + after: [ + type: :string, + doc: "Get records that appear after the provided keyset (mutually exclusive with `before`)" + ], + limit: [ + type: :pos_integer, + doc: "How many records to include in the page" + ], + count: [ + type: :boolean, + doc: "Whether or not to return the page with a full count of all records" + ] + ] @doc false - def read_opts_schema, do: @read_opts_schema + def page_opts(page_opts) do + if page_opts == false do + {:ok, false} + else + if page_opts[:after] || page_opts[:before] do + validate_or_error(page_opts, @keyset_page_opts) + else + if page_opts[:offset] do + validate_or_error(page_opts, @offset_page_opts) + else + validate_or_error(page_opts, @keyset_page_opts) + end + end + end + end + + defp validate_or_error(opts, schema) do + case NimbleOptions.validate(opts, schema) do + {:ok, value} -> {:ok, value} + {:error, error} -> {:error, Exception.message(error)} + end + end @load_opts_schema merge_schemas([], @global_opts, "Global Options") @@ -95,8 +168,6 @@ defmodule Ash.Api do @doc """ Get a record by a primary key. See `c:get/3` for more. - - #{NimbleOptions.docs(@get_opts_schema)} """ @callback get!(resource :: Ash.resource(), id_or_filter :: term(), params :: Keyword.t()) :: Ash.record() | no_return @@ -114,8 +185,6 @@ defmodule Ash.Api do @doc """ Run an ash query. See `c:read/2` for more. - - #{NimbleOptions.docs(@read_opts_schema)} """ @callback read!(Ash.query(), params :: Keyword.t()) :: list(Ash.resource()) | no_return @@ -126,14 +195,34 @@ defmodule Ash.Api do For more information, on building a query, see `Ash.Query`. #{NimbleOptions.docs(@read_opts_schema)} + + ## Pagination + + #### keyset pagination + #{NimbleOptions.docs(@offset_page_opts)} + + #### Limit/offset pagination + #{NimbleOptions.docs(@keyset_page_opts)} """ @callback read(Ash.query(), params :: Keyword.t()) :: {:ok, list(Ash.resource())} | {:error, Ash.error()} @doc """ - Load fields or relationships on already fetched records. See `c:load/2` for more information. + Fetch a page relative to the provided page. + """ + @callback page!(Ash.page(), page_request) :: + Ash.page() | no_return - #{NimbleOptions.docs(@load_opts_schema)} + @doc """ + Fetch a page relative to the provided page. + + A page is the return value of a paginated action called via `c:read/2`. + """ + @callback page(Ash.page(), page_request) :: + {:ok, Ash.page()} | {:error, Ash.error()} + + @doc """ + Load fields or relationships on already fetched records. See `c:load/2` for more information. """ @callback load!( record_or_records :: Ash.record() | [Ash.record()], @@ -160,8 +249,6 @@ defmodule Ash.Api do @doc """ Create a record. See `c:create/2` for more information. - - #{NimbleOptions.docs(@create_opts_schema)} """ @callback create!(Ash.changeset(), params :: Keyword.t()) :: Ash.record() | no_return @@ -176,8 +263,6 @@ defmodule Ash.Api do @doc """ Update a record. See `c:update/2` for more information. - - #{NimbleOptions.docs(@update_opts_schema)} """ @callback update!(Ash.changeset(), params :: Keyword.t()) :: Ash.record() | no_return @@ -192,8 +277,6 @@ defmodule Ash.Api do @doc """ Destroy a record. See `c:destroy/2` for more information. - - #{NimbleOptions.docs(@destroy_opts_schema)} """ @callback destroy!(Ash.changeset() | Ash.record(), params :: Keyword.t()) :: :ok | no_return @@ -297,6 +380,20 @@ defmodule Ash.Api do query |> api.read(Keyword.delete(opts, :load)) |> case do + {:ok, %{results: [single_result]}} -> + {:ok, single_result} + + {:ok, %{results: []}} -> + {:ok, nil} + + {:ok, %{results: results}} -> + {:error, + Ash.Error.Invalid.MultipleResults.exception( + count: Enum.count(results), + query: query, + at_least?: true + )} + {:ok, [single_result]} -> {:ok, single_result} @@ -316,6 +413,156 @@ defmodule Ash.Api do end end + def page!(api, keyset, request) do + api + |> page(keyset, request) + |> unwrap_or_raise!() + end + + def page(_, %Ash.Page.Keyset{results: []} = page, :next) do + {:ok, page} + end + + def page(_, %Ash.Page.Keyset{results: []} = page, :prev) do + {:ok, page} + end + + def page(_, %Ash.Page.Keyset{}, n) when is_integer(n) do + {:error, "Cannot seek to a specific page with keyset based pagination"} + end + + def page( + api, + %Ash.Page.Keyset{results: results, rerun: {query, opts}}, + :next + ) do + last_keyset = + results + |> :lists.last() + |> Map.get(:metadata) + |> Map.get(:keyset) + + new_page_opts = + opts[:page] + |> Keyword.delete(:before) + |> Keyword.put(:after, last_keyset) + + read(api, query, Keyword.put(opts, :page, new_page_opts)) + end + + def page(api, %Ash.Page.Keyset{results: results, rerun: {query, opts}}, :prev) do + first_keyset = + results + |> List.first() + |> Map.get(:metadata) + |> Map.get(:keyset) + + new_page_opts = + opts[:page] + |> Keyword.put(:before, first_keyset) + |> Keyword.delete(:after) + + read(api, query, Keyword.put(opts, :page, new_page_opts)) + end + + def page(api, %Ash.Page.Keyset{rerun: {query, opts}}, :first) do + page_opts = + if opts[:page][:count] do + [count: true] + else + [] + end + + read(api, query, Keyword.put(opts, :page, page_opts)) + end + + def page(api, %Ash.Page.Keyset{rerun: {query, opts}}, :last) do + query_reverse_sorted = + case query.sort do + nil -> + sort = + query.resource + |> Ash.Resource.primary_key() + |> Enum.map(&{&1, :desc}) + + Ash.Query.sort(query, sort) + + sort -> + new_sorted = + query + |> Ash.Query.unset(:sort) + |> Ash.Query.sort(Ash.Actions.Sort.reverse(sort)) + + if Ash.Actions.Sort.sorting_on_identity?(new_sorted) do + new_sorted + else + sort = + query.resource + |> Ash.Resource.primary_key() + |> Enum.map(&{&1, :desc}) + + Ash.Query.sort(new_sorted, sort) + end + end + + new_page_params = Keyword.drop(opts[:page] || [], [:before, :after]) + + case read(api, query_reverse_sorted, Keyword.put(opts, :page, new_page_params)) do + {:ok, page} -> + {:ok, Map.update!(page, :results, &Enum.reverse/1)} + + {:error, error} -> + {:error, error} + end + end + + def page( + api, + %Ash.Page.Offset{count: count, limit: limit, offset: offset, rerun: {query, opts}}, + request + ) do + page_opts = + case request do + :next -> + [offset: offset + limit, limit: limit] + + :prev -> + [offset: min(offset - limit, 0), limit: limit] + + :first -> + [offset: 0, limit: limit] + + :last -> + if count do + [offset: count - limit, limit: limit] + else + [offset: 0, limit: limit] + end + + page_num when is_integer(page_num) -> + [offset: page_num * limit, limit: limit] + end + + page_opts = + if opts[:page][:count] do + Keyword.put(page_opts, :count, true) + else + page_opts + end + + if request == :last && !count do + case read(api, Ash.Query.reverse(query), Keyword.put(opts, :page, page_opts)) do + {:ok, page} -> + {:ok, Map.update!(page, :results, &Enum.reverse/1)} + + {:error, error} -> + {:error, error} + end + else + read(api, query, Keyword.put(opts, :page, page_opts)) + end + end + defp get_filter(resource, id) do primary_key = Ash.Resource.primary_key(resource) keyword? = Keyword.keyword?(id) @@ -438,7 +685,7 @@ defmodule Ash.Api do @doc false @spec read(Ash.api(), Ash.query(), Keyword.t()) :: - {:ok, list(Ash.resource())} | {:error, Ash.error()} + {:ok, list(Ash.resource()) | Ash.page()} | {:error, Ash.error()} def read(api, query, opts \\ []) do query = Ash.Query.set_api(query, api) diff --git a/lib/ash/api/interface.ex b/lib/ash/api/interface.ex index ce89449b..f248da0d 100644 --- a/lib/ash/api/interface.ex +++ b/lib/ash/api/interface.ex @@ -5,10 +5,12 @@ defmodule Ash.Api.Interface do quote do alias Ash.Api + @impl Ash.Api def get!(resource, id, params \\ []) do Api.get!(__MODULE__, resource, id, params) end + @impl Ash.Api def get(resource, id, params \\ []) do case Api.get(__MODULE__, resource, id, params) do {:ok, instance} -> {:ok, instance} @@ -16,12 +18,14 @@ defmodule Ash.Api.Interface do end end + @impl Ash.Api def read!(query, opts \\ []) def read!(query, opts) do Api.read!(__MODULE__, query, opts) end + @impl Ash.Api def read(query, opts \\ []) def read(query, opts) do @@ -31,10 +35,25 @@ defmodule Ash.Api.Interface do end end + @impl Ash.Api + def page!(page, request) do + Api.page!(__MODULE__, page, request) + end + + @impl Ash.Api + def page(page, request) do + case Api.page(__MODULE__, page, request) do + {:ok, page} -> {:ok, page} + {:error, error} -> {:error, Ash.Error.to_ash_error(error)} + end + end + + @impl Ash.Api def load!(data, query, opts \\ []) do Api.load!(__MODULE__, data, query, opts) end + @impl Ash.Api def load(data, query, opts \\ []) do case Api.load(__MODULE__, data, query, opts) do {:ok, results} -> {:ok, results} @@ -42,10 +61,12 @@ defmodule Ash.Api.Interface do end end + @impl Ash.Api def create!(changeset, params \\ []) do Api.create!(__MODULE__, changeset, params) end + @impl Ash.Api def create(changeset, params \\ []) do case Api.create(__MODULE__, changeset, params) do {:ok, instance} -> {:ok, instance} @@ -53,10 +74,12 @@ defmodule Ash.Api.Interface do end end + @impl Ash.Api def update!(changeset, params \\ []) do Api.update!(__MODULE__, changeset, params) end + @impl Ash.Api def update(changeset, params \\ []) do case Api.update(__MODULE__, changeset, params) do {:ok, instance} -> {:ok, instance} @@ -64,10 +87,12 @@ defmodule Ash.Api.Interface do end end + @impl Ash.Api def destroy!(record, params \\ []) do Api.destroy!(__MODULE__, record, params) end + @impl Ash.Api def destroy(record, params \\ []) do case Api.destroy(__MODULE__, record, params) do :ok -> :ok @@ -75,11 +100,13 @@ defmodule Ash.Api.Interface do end end + @impl Ash.Api def reload!(%resource{} = record, params \\ []) do id = record |> Map.take(Ash.Resource.primary_key(resource)) |> Enum.to_list() get!(resource, id, params) end + @impl Ash.Api def reload(%resource{} = record, params \\ []) do id = record |> Map.take(Ash.Resource.primary_key(resource)) |> Enum.to_list() get(resource, id, params) diff --git a/lib/ash/data_layer/data_layer.ex b/lib/ash/data_layer/data_layer.ex index eda57357..6588fff4 100644 --- a/lib/ash/data_layer/data_layer.ex +++ b/lib/ash/data_layer/data_layer.ex @@ -12,6 +12,7 @@ defmodule Ash.DataLayer do | {:lateral_join, Ash.resource()} | {:join, Ash.resource()} | {:aggregate, Ash.aggregate_kind()} + | {:query_aggregate, Ash.aggregate_kind()} | :aggregate_filter | :aggregate_sort | :boolean_filter @@ -49,6 +50,18 @@ defmodule Ash.DataLayer do @callback run_query(Ash.data_layer_query(), Ash.resource()) :: {:ok, list(Ash.resource())} | {:error, term} @callback equal?(Ash.data_layer()) :: boolean + @callback run_aggregate_query(Ash.data_layer_query(), list(Ash.aggregate()), Ash.resource()) :: + {:ok, map} | {:error, term} + @callback run_aggregate_query_with_lateral_join( + Ash.data_layer_query(), + list(Ash.aggregate()), + [Ash.record()], + source_resource :: Ash.resource(), + destination_resource :: Ash.resource(), + source :: atom, + destination :: atom + ) :: + {:ok, list(Ash.resource())} | {:error, term} @callback run_query_with_lateral_join( Ash.data_layer_query(), [Ash.record()], @@ -93,6 +106,8 @@ defmodule Ash.DataLayer do functions: 1, in_transaction?: 1, add_aggregate: 3, + run_aggregate_query: 3, + run_aggregate_query_with_lateral_join: 7, transform_query: 1, resource_to_query: 1 @@ -215,12 +230,44 @@ defmodule Ash.DataLayer do data_layer.can?(resource, feature) end + @spec run_aggregate_query(Ash.data_layer_query(), list(Ash.aggregate()), Ash.resource()) :: + {:ok, map} | {:error, term} + def run_aggregate_query(query, aggregates, resource) do + data_layer = Ash.Resource.data_layer(resource) + + if :erlang.function_exported(data_layer, :run_aggregate_query, 3) do + data_layer.run_aggregate_query(query, aggregates, resource) + else + {:error, "Aggregate queries not supported"} + end + end + @spec run_query(Ash.data_layer_query(), central_resource :: Ash.resource()) :: {:ok, list(Ash.record())} | {:error, term} def run_query(query, central_resource) do Ash.Resource.data_layer(central_resource).run_query(query, central_resource) end + def run_aggregate_query_with_lateral_join( + query, + aggregates, + root_data, + source_resource, + destination_resource, + source, + destination + ) do + Ash.Resource.data_layer(source_resource).run_query_with_lateral_join( + query, + aggregates, + root_data, + source_resource, + destination_resource, + source, + destination + ) + end + def run_query_with_lateral_join( query, root_data, diff --git a/lib/ash/data_layer/ets.ex b/lib/ash/data_layer/ets.ex index 46b4a01c..2cc21607 100644 --- a/lib/ash/data_layer/ets.ex +++ b/lib/ash/data_layer/ets.ex @@ -42,7 +42,7 @@ defmodule Ash.DataLayer.Ets do defmodule Query do @moduledoc false - defstruct [:resource, :filter, :limit, :sort, relationships: %{}, offset: 0, aggregates: []] + defstruct [:resource, :filter, :limit, :sort, relationships: %{}, offset: 0] end @impl true @@ -69,6 +69,7 @@ defmodule Ash.DataLayer.Ets do def can?(_, {:filter_operator, %LessThanOrEqual{}}), do: true def can?(_, {:filter_operator, %GreaterThanOrEqual{}}), do: true def can?(_, {:filter_operator, %IsNil{}}), do: true + def can?(_, {:query_aggregate, :count}), do: true def can?(_, {:sort, _}), do: true def can?(_, _), do: false @@ -96,8 +97,22 @@ defmodule Ash.DataLayer.Ets do end @impl true - def add_aggregate(query, aggregate, _) do - {:ok, %{query | aggregates: [aggregate | query.aggregates]}} + def run_aggregate_query(query, aggregates, resource) do + case run_query(query, resource) do + {:ok, results} -> + Enum.reduce_while(aggregates, {:ok, %{}}, fn + %{kind: :count, name: name, query: query}, {:ok, acc} -> + value = + results + |> filter_matches(Map.get(query || %{}, :filter)) + |> Enum.count() + + {:cont, {:ok, Map.put(acc, name, value)}} + + _, _ -> + {:halt, {:error, "unsupported aggregate"}} + end) + end end @impl true diff --git a/lib/ash/engine/request.ex b/lib/ash/engine/request.ex index 84caa9dd..3aeccdcb 100644 --- a/lib/ash/engine/request.ex +++ b/lib/ash/engine/request.ex @@ -616,7 +616,8 @@ defmodule Ash.Engine.Request do |> Ash.Query.filter(^primary_key_filter) |> Ash.Query.filter(^filter) - request.api.read(new_query) + new_query + |> Ash.Actions.Read.unpaginated_read() |> case do {:ok, results} -> pkey = Ash.Resource.primary_key(request.resource) diff --git a/lib/ash/error/invalid/limit_required.ex b/lib/ash/error/invalid/limit_required.ex new file mode 100644 index 00000000..bc1af38f --- /dev/null +++ b/lib/ash/error/invalid/limit_required.ex @@ -0,0 +1,20 @@ +defmodule Ash.Error.Invalid.LimitRequired do + @moduledoc "Used when no limit is provided, pagination is required, and no default page size is configured" + use Ash.Error + + def_ash_error([], class: :invalid) + + defimpl Ash.ErrorKind do + def id(_), do: Ecto.UUID.generate() + + def code(_), do: "limit_is_required" + + def class(_), do: :invalid + + def message(_) do + "Limit is required" + end + + def stacktrace(_), do: nil + end +end diff --git a/lib/ash/error/invalid/multiple_results.ex b/lib/ash/error/invalid/multiple_results.ex index 50a7cd00..f0ba6fda 100644 --- a/lib/ash/error/invalid/multiple_results.ex +++ b/lib/ash/error/invalid/multiple_results.ex @@ -2,19 +2,22 @@ defmodule Ash.Error.Invalid.MultipleResults do @moduledoc "Used when multiple requests with the same path are passed to the internal engine" use Ash.Error - def_ash_error([:count, :query], class: :invalid) + def_ash_error([:count, :query, :at_least?], class: :invalid) defimpl Ash.ErrorKind do def id(_), do: Ecto.UUID.generate() def code(_), do: "multiple_results" - def message(%{count: count, query: query}) do + def message(%{count: count, query: query, at_least?: at_least?}) do """ - expected at most one result but got #{count} in query: + expected at most one result but got #{at_least(at_least?)}#{count} in query: #{inspect(query)} """ end + + defp at_least(true), do: "at least " + defp at_least(_), do: "" end end diff --git a/lib/ash/error/invalid/pagination_required.ex b/lib/ash/error/invalid/pagination_required.ex new file mode 100644 index 00000000..d9c55823 --- /dev/null +++ b/lib/ash/error/invalid/pagination_required.ex @@ -0,0 +1,20 @@ +defmodule Ash.Error.Invalid.PaginationRequired do + @moduledoc "Used when `page: false` is provided but pagination is required" + use Ash.Error + + def_ash_error([], class: :invalid) + + defimpl Ash.ErrorKind do + def id(_), do: Ecto.UUID.generate() + + def code(_), do: "pagination_required" + + def class(_), do: :invalid + + def message(_) do + "Pagination is required" + end + + def stacktrace(_), do: nil + end +end diff --git a/lib/ash/filter/filter.ex b/lib/ash/filter/filter.ex index ff040505..cc541863 100644 --- a/lib/ash/filter/filter.ex +++ b/lib/ash/filter/filter.ex @@ -18,7 +18,8 @@ defmodule Ash.Filter do GreaterThanOrEqual, In, LessThan, - LessThanOrEqual + LessThanOrEqual, + NotEq } alias Ash.Query.{Expression, Not, Ref} @@ -31,6 +32,7 @@ defmodule Ash.Filter do @operators [ Ash.Query.Operator.IsNil, Eq, + NotEq, In, LessThan, GreaterThan, @@ -43,6 +45,8 @@ defmodule Ash.Filter do @operator_aliases [ eq: Eq, equals: Eq, + not_eq: NotEq, + not_equals: NotEq, gt: GreaterThan, greater_than: GreaterThan, lt: LessThan, @@ -441,7 +445,7 @@ defmodule Ash.Filter do relationship.destination |> Ash.Query.new(api) |> Ash.Query.do_filter(filter) - |> api.read() + |> Ash.Actions.Read.unpaginated_read() |> case do {:ok, results} -> relationship.through @@ -480,7 +484,7 @@ defmodule Ash.Filter do end defp filter_related_in(query, relationship, path) do - case query.api.read(query) do + case Ash.Actions.Read.unpaginated_read(query) do {:error, error} -> {:error, error} diff --git a/lib/ash/page/keyset.ex b/lib/ash/page/keyset.ex new file mode 100644 index 00000000..43fd476c --- /dev/null +++ b/lib/ash/page/keyset.ex @@ -0,0 +1,173 @@ +defmodule Ash.Page.Keyset do + @moduledoc """ + A page of results from `keyset` based pagination. + + The results are generated with a `keyset` metadata, + which can be used to fetch the next/previous pages. + """ + defstruct [:results, :count, :before, :after, :limit, :rerun] + + @type t :: %__MODULE__{} + + def new(results, count, sort, original_query, opts) do + %__MODULE__{ + results: data_with_keyset(results, sort), + count: count, + before: opts[:page][:before], + after: opts[:page][:after], + limit: opts[:page][:limit], + rerun: {original_query, opts} + } + end + + def data_with_keyset(results, sort) do + fields_in_keyset = + sort + |> Keyword.keys() + |> Enum.sort() + + Enum.map(results, fn result -> + Map.update!( + result, + :metadata, + &Map.put(&1, :keyset, keyset(result, fields_in_keyset)) + ) + end) + end + + def filter(values, sort, after_or_before) when after_or_before in [:after, :before] do + sort_fields = + sort + |> Keyword.keys() + |> Enum.sort() + + with {:ok, decoded} <- decode_values(values), + {:ok, zipped} <- zip_fields(sort_fields, decoded) do + field_values = + Enum.map(sort, fn {field, direction} -> + {field, direction, Keyword.get(zipped, field)} + end) + + {:ok, filters(field_values, after_or_before)} + end + end + + defp decode_values(values) do + {:ok, + values + |> Base.decode64!() + |> non_executable_binary_to_term([:safe])} + rescue + e -> + {:error, e} + end + + defp filters([{field, direction, value} | rest], after_or_before) do + operator = operator(after_or_before, direction) + + case rest do + [] -> + [{field, [{operator, value}]}] + + rest -> + [ + and: [ + [{field, [{operator, value}]}], + [or: [[{field, [{operator, value}]}], filters(rest, after_or_before)]] + ] + ] + end + end + + defp operator(:after, :asc), do: :gt + defp operator(:after, :desc), do: :lt + defp operator(:before, :asc), do: :lt + defp operator(:before, :desc), do: :gt + + defp zip_fields(pkey, values, acc \\ []) + defp zip_fields([], [], acc), do: {:ok, Enum.reverse(acc)} + + defp zip_fields([pkey | rest_pkey], [value | rest_values], acc) do + zip_fields(rest_pkey, rest_values, [{pkey, value} | acc]) + end + + defp zip_fields(_, _, _), do: {:error, "Invalid keyset"} + + defp keyset(record, fields) do + record + |> field_values(fields) + |> :erlang.term_to_binary() + |> Base.encode64() + end + + defp field_values(record, fields) do + Enum.map(fields, &Map.get(record, &1)) + end + + # A restricted version of `:erlang.binary_to_term/2` that forbids + # *executable* terms, such as anonymous functions. + # The `opts` are given to the underlying `:erlang.binary_to_term/2` + # call, with an empty list as a default. + # By default this function does not restrict atoms, as an atom + # interned in one node may not yet have been interned on another + # (except for releases, which preload all code). + # If you want to avoid atoms from being created, then you can pass + # `[:safe]` as options, as that will also enable the safety mechanisms + # from `:erlang.binary_to_term/2` itself. + # Ripped from https://github.com/elixir-plug/plug_crypto/blob/v1.2.0/lib/plug/crypto.ex + + # sobelow_skip ["Misc.BinToTerm"] + defp non_executable_binary_to_term(binary, opts) when is_binary(binary) do + term = :erlang.binary_to_term(binary, opts) + non_executable_terms(term) + term + end + + defp non_executable_terms(list) when is_list(list) do + non_executable_list(list) + end + + defp non_executable_terms(tuple) when is_tuple(tuple) do + non_executable_tuple(tuple, tuple_size(tuple)) + end + + defp non_executable_terms(map) when is_map(map) do + folder = fn key, value, acc -> + non_executable_terms(key) + non_executable_terms(value) + acc + end + + :maps.fold(folder, map, map) + end + + defp non_executable_terms(other) + when is_atom(other) or is_number(other) or is_bitstring(other) or is_pid(other) or + is_reference(other) do + other + end + + defp non_executable_terms(other) do + raise ArgumentError, + "cannot deserialize #{inspect(other)}, the term is not safe for deserialization" + end + + defp non_executable_list([]), do: :ok + + defp non_executable_list([h | t]) when is_list(t) do + non_executable_terms(h) + non_executable_list(t) + end + + defp non_executable_list([h | t]) do + non_executable_terms(h) + non_executable_terms(t) + end + + defp non_executable_tuple(_tuple, 0), do: :ok + + defp non_executable_tuple(tuple, n) do + non_executable_terms(:erlang.element(n, tuple)) + non_executable_tuple(tuple, n - 1) + end +end diff --git a/lib/ash/page/offset.ex b/lib/ash/page/offset.ex new file mode 100644 index 00000000..083fcd7f --- /dev/null +++ b/lib/ash/page/offset.ex @@ -0,0 +1,21 @@ +defmodule Ash.Page.Offset do + @moduledoc """ + A page of results from `offset` based pagination. + + If a resource supports `keyset` pagination as well, + it will also have the `keyset` metadata. + """ + defstruct [:results, :limit, :offset, :count, :rerun] + + @type t :: %__MODULE__{} + + def new(results, count, original_query, opts) do + %__MODULE__{ + results: results, + limit: opts[:page][:limit], + count: count, + offset: opts[:page][:offset] || 0, + rerun: {original_query, opts} + } + end +end diff --git a/lib/ash/query/operator/not_eq.ex b/lib/ash/query/operator/not_eq.ex new file mode 100644 index 00000000..5997317c --- /dev/null +++ b/lib/ash/query/operator/not_eq.ex @@ -0,0 +1,42 @@ +defmodule Ash.Query.Operator.NotEq do + @moduledoc """ + left != right + + In comparison, simplifies to `not(left == right)` + """ + use Ash.Query.Operator, operator: :!=, predicate?: true + + alias Ash.Query.Not + alias Ash.Query.Operator.Eq + + def new(%Ref{} = ref, nil) do + Ash.Query.Operator.new(Ash.Query.Operator.IsNil, ref, false) + end + + def new(%Ref{attribute: %{type: type}} = left, right) do + case Ash.Type.cast_input(type, right) do + {:ok, casted} -> + {:ok, left, casted} + + _ -> + {:error, + Ash.Error.Query.InvalidFilterValue.exception( + value: right, + message: "Could not be casted to type #{inspect(type)}", + context: %__MODULE__{left: left, right: right} + )} + end + end + + def new(left, right) do + {:known, left != right} + end + + def evaluate(%{left: left, right: right}) do + left != right + end + + def simplify(%__MODULE__{left: left, right: right}) do + %Not{expression: %Eq{left: left, right: right}} + end +end diff --git a/lib/ash/query/query.ex b/lib/ash/query/query.ex index 0d243ee5..f7c8ee35 100644 --- a/lib/ash/query/query.ex +++ b/lib/ash/query/query.ex @@ -792,6 +792,22 @@ defmodule Ash.Query do end end + @doc """ + Reverse the sort order of a query. + + If the query has no sort, an error is added indicating that. + """ + @spec reverse(t()) :: t() + def reverse(%{sort: nil} = query) do + add_error(query, :sort, "Unreversable sort") + end + + def reverse(query) do + query + |> Ash.Query.unset(:sort) + |> Ash.Query.sort(Ash.Actions.Sort.reverse(query.sort)) + end + @spec unset(Ash.resource() | t(), atom | [atom]) :: t() def unset(query, keys) when is_list(keys) do query = to_query(query) diff --git a/lib/ash/resource/actions/read.ex b/lib/ash/resource/actions/read.ex index eb895b5d..11880089 100644 --- a/lib/ash/resource/actions/read.ex +++ b/lib/ash/resource/actions/read.ex @@ -1,7 +1,7 @@ defmodule Ash.Resource.Actions.Read do @moduledoc "Represents a read action on a resource." - defstruct [:name, :primary?, :filter, :description, type: :read] + defstruct [:name, :pagination, :primary?, :filter, :description, type: :read] @type t :: %__MODULE__{ type: :read, @@ -20,12 +20,85 @@ defmodule Ash.Resource.Actions.Read do type: :any, doc: "A filter template, that may contain actor references. See `Ash.Filter` for more on templates" + ], + pagination: [ + type: {:custom, __MODULE__, :pagination, []}, + doc: + "Options for how the action should support pagination. See the pagination section for more information.", + default: false ] ], @global_opts, "Action Options" ) + @pagination_schema [ + keyset?: [ + type: :boolean, + doc: "Whether or not keyset based pagination is supported", + default: false + ], + offset?: [ + type: :boolean, + doc: "Whether or not offset based pagination is supported", + default: false + ], + default_limit: [ + type: :pos_integer, + doc: "The default page size to apply, if one is not supplied" + ], + countable: [ + type: {:one_of, [true, false, :by_default]}, + doc: + "Whether not a returned page will have a full count of all records. Use `:by_default` to do it automatically.", + default: false + ], + max_page_size: [ + type: :pos_integer, + doc: "The maximum amount of records that can be requested in a single page", + default: 250 + ], + required?: [ + type: :boolean, + doc: + "Whether or not pagination can be disabled. Only relevant if some pagination configuration is supplied.", + default: true + ] + ] + + defmodule Pagination do + @moduledoc "Represents the pagination configuration of a read action" + defstruct [ + :default_limit, + :max_page_size, + countable: false, + required?: false, + keyset?: false, + offset?: false + ] + end + + def pagination(false) do + {:ok, %Pagination{}} + end + + def pagination(opts) do + case NimbleOptions.validate(opts, @pagination_schema) do + {:ok, result} -> + pagination = struct(Pagination, result) + + if pagination.keyset? or pagination.offset? do + {:ok, pagination} + else + {:error, "Must enable `keyset?` or `offset?`"} + end + + {:error, error} -> + {:error, error} + end + end + @doc false def opt_schema, do: @opt_schema + def pagination_schema, do: @pagination_schema end diff --git a/lib/ash/resource/dsl.ex b/lib/ash/resource/dsl.ex index 43898a00..799333d3 100644 --- a/lib/ash/resource/dsl.ex +++ b/lib/ash/resource/dsl.ex @@ -217,6 +217,10 @@ defmodule Ash.Resource.Dsl do name: :read, describe: """ Declares a `read` action. For calling this action, see the `Ash.Api` documentation. + + ## Pagination + + #{NimbleOptions.docs(Ash.Resource.Actions.Read.pagination_schema())} """, examples: [ "read :read_all, primary?: true" @@ -463,7 +467,8 @@ defmodule Ash.Resource.Dsl do Ash.Resource.Transformers.CreateJoinRelationship, Ash.Resource.Transformers.CachePrimaryKey, Ash.Resource.Transformers.SetPrimaryActions, - Ash.Resource.Transformers.ValidateActionTypesSupported + Ash.Resource.Transformers.ValidateActionTypesSupported, + Ash.Resource.Transformers.CountableActions ] use Ash.Dsl.Extension, diff --git a/lib/ash/resource/schema.ex b/lib/ash/resource/schema.ex index bdf2e53a..a1daddf7 100644 --- a/lib/ash/resource/schema.ex +++ b/lib/ash/resource/schema.ex @@ -29,6 +29,7 @@ defmodule Ash.Schema do field(:aggregates, :map, virtual: true, default: %{}) field(:calculations, :map, virtual: true, default: %{}) + field(:metadata, :map, virtual: true, default: %{}) for aggregate <- Ash.Resource.aggregates(__MODULE__) do {:ok, type} = Aggregate.kind_to_type(aggregate.kind) diff --git a/lib/ash/resource/transformers/countable_actions.ex b/lib/ash/resource/transformers/countable_actions.ex new file mode 100644 index 00000000..dd0e789f --- /dev/null +++ b/lib/ash/resource/transformers/countable_actions.ex @@ -0,0 +1,34 @@ +defmodule Ash.Resource.Transformers.CountableActions do + @moduledoc """ + Ensures that countable paginated actions do not exist for resources that are not countable + """ + use Ash.Dsl.Transformer + + alias Ash.Dsl.Transformer + + # sobelow_skip ["DOS.BinToAtom"] + def transform(resource, dsl_state) do + dsl_state + |> Transformer.get_entities([:actions]) + |> Enum.filter(&(&1.type == :read)) + |> Enum.filter(& &1.pagination) + |> Enum.filter(& &1.pagination.countable) + |> case do + [] -> + {:ok, dsl_state} + + [action | _] -> + if Ash.Resource.data_layer_can?(resource, {:query_aggregate, :count}) do + {:ok, dsl_state} + else + {:error, + Ash.Error.Dsl.DslError.exception( + module: __MODULE__, + path: [:actions, action.name], + message: + "Action cannot be countable, as the datalayer does not support counting queries" + )} + end + end + end +end diff --git a/mix.exs b/mix.exs index c1892cc8..b0185955 100644 --- a/mix.exs +++ b/mix.exs @@ -44,6 +44,7 @@ defmodule Ash.MixProject do "README.md", "documentation/introduction/getting_started.md", "documentation/topics/authorization.md", + "documentation/topics/pagination.md", "documentation/topics/validation.md", "documentation/topics/error_handling.md", "documentation/topics/aggregates.md", @@ -59,7 +60,9 @@ defmodule Ash.MixProject do Ash, Ash.Api, Ash.Query, - Ash.Changeset + Ash.Changeset, + Ash.Resource.Dsl, + Ash.Api.Dsl ], validations: ~r/Ash.Resource.Validation/, changes: ~r/Ash.Resource.Change/, @@ -71,6 +74,7 @@ defmodule Ash.MixProject do type: ~r/Ash.Type/, data_layer: ~r/Ash.DataLayer/, authorizer: ~r/Ash.Authorizer/, + pagination: ~r/Ash.Page/, extension: [ Ash.Dsl.Entity, Ash.Dsl.Extension, diff --git a/test/actions/pagination_test.exs b/test/actions/pagination_test.exs new file mode 100644 index 00000000..0a4072fd --- /dev/null +++ b/test/actions/pagination_test.exs @@ -0,0 +1,402 @@ +defmodule Ash.Actions.PaginationTest do + use ExUnit.Case, async: true + + require Ash.Query + + defmodule User do + @moduledoc false + use Ash.Resource, data_layer: Ash.DataLayer.Ets + + ets do + private?(true) + end + + actions do + read :offset do + pagination offset?: true, countable: true + end + + read :optional_offset do + pagination offset?: true, countable: true, required?: false + end + + read :offset_countable_by_default do + pagination offset?: true, countable: :by_default, required?: false + end + + read :required_offset_with_default do + pagination offset?: true, countable: true, required?: false, default_limit: 25 + end + + read :keyset do + pagination keyset?: true, countable: true + end + + read :optional_keyset do + pagination keyset?: true, countable: true, required?: false + end + + read :keyset_countable_by_default do + pagination keyset?: true, countable: :by_default, required?: false + end + + read :required_keyset_with_default do + pagination keyset?: true, countable: true, required?: false, default_limit: 25 + end + + read :both_required do + primary? true + pagination keyset?: true, offset?: true, countable: true + end + + read :both_optional do + pagination keyset?: true, offset?: true, countable: true, default_limit: 25 + end + + create :default + update :default + end + + attributes do + attribute :id, :uuid, primary_key?: true, default: &Ecto.UUID.generate/0 + attribute :name, :string + end + end + + defmodule Api do + use Ash.Api + + resources do + resource User + end + end + + test "pagination is required by default" do + assert_raise Ash.Error.Invalid.PaginationRequired, fn -> + Api.read!(User, page: false) + end + end + + test "a default limit allows not specifying page parameters" do + assert_raise Ash.Error.Invalid.LimitRequired, fn -> + Api.read!(User) + end + + Api.read!(User, action: :required_offset_with_default) + end + + describe "offset pagination" do + setup do + for i <- 0..9 do + Api.create!(Ash.Changeset.new(User, %{name: "#{i}"})) + end + + :ok + end + + test "can be limited" do + assert Enum.count(Api.read!(User, action: :optional_offset, page: false)) == 10 + assert Enum.count(Api.read!(User, action: :optional_offset, page: [limit: 5]).results) == 5 + end + + test "can be offset" do + assert Enum.count(Api.read!(User, action: :optional_offset, page: false)) == 10 + assert Enum.count(Api.read!(User, action: :optional_offset, page: [offset: 5]).results) == 5 + end + + test "can include a full count" do + assert Api.read!(User, action: :optional_offset, page: [limit: 1, count: true]).count == 10 + end + + test "can default to including a count" do + assert Api.read!(User, action: :offset_countable_by_default, page: [limit: 1]).count == 10 + end + + test "count is not included by default otherwise" do + assert is_nil(Api.read!(User, action: :optional_offset, page: [limit: 1]).count) + end + + test "`count: false` prevents the count from occurring even if it is on `by_default`" do + assert is_nil( + Api.read!(User, + action: :offset_countable_by_default, + page: [limit: 1, count: false] + ).count + ) + end + + test "pagination works with a sort applied" do + names = + User + |> Ash.Query.sort(:name) + |> Api.read!(page: [offset: 5]) + |> Map.get(:results) + |> Enum.map(& &1.name) + + assert names == ["5", "6", "7", "8", "9"] + end + + test "pagination works with a reversed sort applied" do + names = + User + |> Ash.Query.sort(name: :desc) + |> Api.read!(page: [offset: 5]) + |> Map.get(:results) + |> Enum.map(& &1.name) + + assert names == ["4", "3", "2", "1", "0"] + end + + test "pagination works with a filter" do + names = + User + |> Ash.Query.sort(name: :desc) + |> Ash.Query.filter(name in ["4", "3", "2", "1", "0"]) + |> Api.read!(page: [offset: 1]) + |> Map.get(:results) + |> Enum.map(& &1.name) + + assert names == ["3", "2", "1", "0"] + end + + test "the next page can be fetched" do + assert %{results: [%{name: "3"}]} = + page = + User + |> Ash.Query.sort(name: :desc) + |> Ash.Query.filter(name in ["4", "3", "2", "1", "0"]) + |> Api.read!(page: [offset: 1, limit: 1]) + + assert %{results: [%{name: "2"}]} = Api.page!(page, :next) + end + + test "the previous page can be fetched" do + assert %{results: [%{name: "3"}]} = + page = + User + |> Ash.Query.sort(name: :desc) + |> Ash.Query.filter(name in ["4", "3", "2", "1", "0"]) + |> Api.read!(page: [offset: 1, limit: 1]) + + assert %{results: [%{name: "4"}]} = Api.page!(page, :prev) + end + + test "the first page can be fetched" do + assert %{results: [%{name: "2"}]} = + page = + User + |> Ash.Query.sort(name: :desc) + |> Ash.Query.filter(name in ["4", "3", "2", "1", "0"]) + |> Api.read!(page: [offset: 2, limit: 1]) + + assert %{results: [%{name: "4"}]} = Api.page!(page, :first) + end + + test "the last page can be fetched if the count was not requested" do + assert %{results: [%{name: "3"}]} = + page = + User + |> Ash.Query.sort(name: :desc) + |> Ash.Query.filter(name in ["4", "3", "2", "1", "0"]) + |> Api.read!(page: [offset: 1, limit: 1]) + + assert %{results: [%{name: "0"}]} = Api.page!(page, :last) + end + + test "the last page can be fetched if the count was requested" do + assert %{results: [%{name: "3"}]} = + page = + User + |> Ash.Query.sort(name: :desc) + |> Ash.Query.filter(name in ["4", "3", "2", "1", "0"]) + |> Api.read!(page: [offset: 1, limit: 1, count: true]) + + assert %{results: [%{name: "0"}]} = Api.page!(page, :last) + end + end + + describe "keyset pagination" do + setup do + for i <- 0..9 do + Api.create!(Ash.Changeset.new(User, %{name: "#{i}"})) + end + + :ok + end + + test "can be limited" do + assert Enum.count(Api.read!(User, action: :optional_keyset, page: false)) == 10 + assert Enum.count(Api.read!(User, action: :optional_keyset, page: [limit: 5]).results) == 5 + end + + test "can include a full count" do + assert Api.read!(User, action: :optional_keyset, page: [limit: 1, count: true]).count == 10 + end + + test "can default to including a count" do + assert Api.read!(User, action: :keyset_countable_by_default, page: [limit: 1]).count == 10 + end + + test "count is not included by default otherwise" do + assert is_nil(Api.read!(User, action: :optional_keyset, page: [limit: 1]).count) + end + + test "`count: false` prevents the count from occurring even if it is on `by_default`" do + assert is_nil( + Api.read!(User, + action: :keyset_countable_by_default, + page: [limit: 1, count: false] + ).count + ) + end + + test "can ask for records after a specific keyset" do + %{results: [%{id: id, metadata: %{keyset: keyset}}]} = + Api.read!(User, action: :keyset, page: [limit: 1]) + + %{results: [%{id: next_id}]} = + Api.read!(User, action: :keyset, page: [limit: 1, after: keyset]) + + refute id == next_id + end + + test "can ask for records before a specific keyset" do + %{results: [%{id: id, metadata: %{keyset: keyset}}]} = + Api.read!(User, action: :keyset, page: [limit: 1]) + + %{results: [%{id: next_id, metadata: %{keyset: keyset2}}]} = + Api.read!(User, action: :keyset, page: [limit: 1, after: keyset]) + + refute id == next_id + + %{results: [%{id: before_id}]} = + Api.read!(User, action: :keyset, page: [limit: 1, before: keyset2]) + + assert id == before_id + end + + test "pagination works with a sort applied" do + page = + User + |> Ash.Query.filter(name == "4") + |> Ash.Query.sort(:name) + |> Api.read!(page: [limit: 1]) + + keyset = Enum.at(page.results, 0).metadata.keyset + + names = + User + |> Ash.Query.sort(:name) + |> Api.read!(page: [after: keyset, limit: 5]) + |> Map.get(:results) + |> Enum.map(& &1.name) + + assert names == ["5", "6", "7", "8", "9"] + end + + test "pagination works with a reversed sort applied" do + page = + User + |> Ash.Query.filter(name == "5") + |> Ash.Query.sort(name: :desc) + |> Api.read!(page: [limit: 1]) + + keyset = Enum.at(page.results, 0).metadata.keyset + + names = + User + |> Ash.Query.sort(name: :desc) + |> Api.read!(page: [after: keyset, limit: 5]) + |> Map.get(:results) + |> Enum.map(& &1.name) + + assert names == ["4", "3", "2", "1", "0"] + end + + test "pagination works with a filter" do + page = + User + |> Ash.Query.filter(name == "5") + |> Ash.Query.sort(name: :desc) + |> Api.read!(page: [limit: 1]) + + keyset = Enum.at(page.results, 0).metadata.keyset + + names = + User + |> Ash.Query.sort(name: :desc) + |> Ash.Query.filter(name != "4") + |> Api.read!(page: [after: keyset, limit: 5]) + |> Map.get(:results) + |> Enum.map(& &1.name) + + assert names == ["3", "2", "1", "0"] + end + + test "the next page can be fetched" do + assert %{results: [%{name: "4"}]} = + page = + User + |> Ash.Query.sort(name: :desc) + |> Ash.Query.filter(name in ["4", "3", "2", "1", "0"]) + |> Api.read!(page: [limit: 1]) + + assert %{results: [%{name: "3"}]} = Api.page!(page, :next) + end + + test "the previous page can be fetched" do + assert %{results: [%{name: "4"}]} = + page = + User + |> Ash.Query.sort(name: :desc) + |> Ash.Query.filter(name in ["4", "3", "2", "1", "0"]) + |> Api.read!(page: [limit: 1]) + + assert %{results: [%{name: "3"}]} = page = Api.page!(page, :next) + assert %{results: [%{name: "4"}]} = Api.page!(page, :prev) + end + + test "the first page can be fetched" do + assert %{results: [%{name: "4"}]} = + page = + User + |> Ash.Query.sort(name: :desc) + |> Ash.Query.filter(name in ["4", "3", "2", "1", "0"]) + |> Api.read!(page: [limit: 1]) + + assert %{results: [%{name: "3"}]} = page = Api.page!(page, :next) + assert %{results: [%{name: "4"}]} = Api.page!(page, :first) + end + + test "the last page can be fetched" do + assert %{results: [%{name: "3"}]} = + page = + User + |> Ash.Query.sort(name: :desc) + |> Ash.Query.filter(name in ["4", "3", "2", "1", "0"]) + |> Api.read!(page: [offset: 1, limit: 1]) + + assert %{results: [%{name: "0"}]} = Api.page!(page, :last) + end + end + + describe "when both are supported" do + setup do + for i <- 0..9 do + Api.create!(Ash.Changeset.new(User, %{name: "#{i}"})) + end + + :ok + end + + test "it defaults to offset pagination" do + assert %Ash.Page.Offset{} = Api.read!(User, action: :both_optional, page: [limit: 10]) + end + + test "it adds a keyset to the records, even though it returns an offset page" do + for result <- Api.read!(User, action: :both_optional, page: [limit: 10]).results do + refute is_nil(result.metadata.keyset) + end + end + end +end diff --git a/test/ash/data_layer/ets_test.exs b/test/ash/data_layer/ets_test.exs index ec883360..e940d4de 100644 --- a/test/ash/data_layer/ets_test.exs +++ b/test/ash/data_layer/ets_test.exs @@ -72,13 +72,12 @@ defmodule Ash.DataLayer.EtsTest do assert %Query{resource: EtsTestUser} = EtsDataLayer.resource_to_query(EtsTestUser) end - test "limit, offset, filter, sortm, aggregate" do + test "limit, offset, filter, sort" do query = EtsDataLayer.resource_to_query(EtsTestUser) assert {:ok, %Query{limit: 3}} = EtsDataLayer.limit(query, 3, :foo) assert {:ok, %Query{offset: 10}} = EtsDataLayer.offset(query, 10, :foo) assert {:ok, %Query{filter: :all}} = EtsDataLayer.filter(query, :all, :foo) assert {:ok, %Query{sort: :asc}} = EtsDataLayer.sort(query, :asc, :foo) - assert {:ok, %Query{aggregates: [:foo]}} = EtsDataLayer.add_aggregate(query, :foo, :bar) end test "create" do