improvement: support streaming without keyset pagination

Use the new `allow_stream_with` and `stream_with` options to control
which strategy may be used to stream query results.
Zach Daniel 2023-12-21 08:35:49 -05:00
parent 0932cf32a7
commit eb05e9be63
7 changed files with 394 additions and 79 deletions
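
A quick usage sketch of the new options (the `Api` and `Post` names mirror the test module at the bottom of this diff; the pipelines are illustrative, not taken from the commit):

    # Default: only keyset pagination may be used to stream.
    Post |> Api.stream!() |> Enum.each(&IO.inspect/1)

    # Allow the strategy to degrade as far as offset pagination or a full read.
    Post |> Api.stream!(allow_stream_with: :full_read) |> Enum.to_list()

    # Force a specific strategy and read batch size.
    Post |> Api.stream!(batch_size: 5, stream_with: :offset) |> Enum.count()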


@@ -28,7 +28,18 @@ defmodule Ash.Actions.Destroy.Bulk do
          for_bulk_destroy: action.name
      end

-    run(api, api.stream!(query), action, input, Keyword.put(opts, :resource, query.resource))
+    read_opts =
+      opts
+      |> Keyword.drop([:resource, :stream_batch_size, :batch_size])
+      |> Keyword.put(:batch_size, opts[:stream_batch_size])
+
+    run(
+      api,
+      api.stream!(query, read_opts),
+      action,
+      input,
+      Keyword.put(opts, :resource, query.resource)
+    )
   end

   def run(api, stream, action, input, opts) do


@@ -0,0 +1,239 @@
defmodule Ash.Actions.Read.Stream do
@moduledoc false
def run!(api, query, opts) do
query = Ash.Query.to_query(query)
query =
if query.action do
query
else
Ash.Query.for_read(
query,
Ash.Resource.Info.primary_action!(query.resource, :read).name
)
end
query
|> stream_strategy(opts[:stream_with], opts[:allow_stream_with])
|> do_stream(query, api, Keyword.drop(opts, [:batch_size, :stream_with, :allow_stream_with]))
end
defp do_stream(:keyset, query, api, opts) do
{batch_size, opts} =
Keyword.pop(
opts,
:batch_size,
query.action.pagination.default_limit || query.action.pagination.max_page_size || 250
)
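# The accumulator is the keyset of the last record seen: nil means "first page",
# and false means "no more pages", halting the stream before the next read.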
Stream.resource(
fn -> nil end,
fn
false ->
{:halt, nil}
after_keyset ->
keyset = if after_keyset != nil, do: [after: after_keyset], else: []
page_opts = Keyword.merge(keyset, limit: batch_size)
opts =
Keyword.put(opts, :page, page_opts)
case api.read!(query, opts) do
%{more?: true, results: results} ->
{results, List.last(results).__metadata__.keyset}
%{results: results} ->
{results, false}
end
end,
& &1
)
|> take_query_limit(query)
end
defp do_stream(:offset, query, api, opts) do
if can_pagination_offset?(query) do
stream_with_offset_pagination(query, api, opts)
else
stream_with_limit_offset(query, api, opts)
end
end
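# Full read: no pagination at all. The entire result set is fetched with a single
# read and emitted once, then the stream halts.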
defp do_stream(:full_read, query, api, opts) do
Stream.resource(
fn -> true end,
fn
false ->
{:halt, false}
true ->
{api.read!(query, opts), false}
end,
& &1
)
end
defp stream_with_offset_pagination(query, api, opts) do
{limit, opts} =
Keyword.pop(
opts,
:batch_size,
query.action.pagination.default_limit || query.action.pagination.max_page_size || 250
)
Stream.resource(
fn -> 0 end,
fn
false ->
{:halt, nil}
offset ->
page_opts = [limit: limit, offset: offset]
opts =
Keyword.put(opts, :page, page_opts)
case api.read!(query, opts) do
%{more?: true, results: results} ->
{results, offset + limit}
%{results: results} ->
{results, false}
end
end,
& &1
)
|> take_query_limit(query)
end
defp stream_with_limit_offset(query, api, opts) do
{limit, opts} =
Keyword.pop(
opts,
:batch_size,
query.action.pagination.default_limit || query.action.pagination.max_page_size || 250
)
Stream.resource(
fn -> 0 end,
fn
false ->
{:halt, nil}
offset ->
query =
query
|> Ash.Query.limit(limit)
|> Ash.Query.offset(offset)
results = api.read!(query, opts)
if Enum.count(results) == limit do
{results, offset + limit}
else
{results, false}
end
end,
& &1
)
|> take_query_limit(query)
end
@doc false
def stream_strategy(query, chosen_strategy, _) when not is_nil(chosen_strategy) do
case chosen_strategy do
:keyset ->
if can_keyset?(query) do
:keyset
else
raise Ash.Error.Invalid.NonStreamableAction,
resource: query.resource,
action: query.action,
types: [:keyset]
end
:offset ->
if can_offset?(query) do
:offset
else
raise Ash.Error.Invalid.NonStreamableAction,
resource: query.resource,
action: query.action,
types: [:offset]
end
:full_read ->
:full_read
end
end
def stream_strategy(query, nil, allowed_strategy) when not is_nil(allowed_strategy) do
cond do
can_keyset?(query) and allowed_strategy in [:keyset, :offset, :full_read] ->
:keyset
can_offset?(query) and allowed_strategy in [:offset, :full_read] ->
:offset
allowed_strategy == :full_read ->
:full_read
allowed_strategy == :keyset ->
raise Ash.Error.Invalid.NonStreamableAction,
resource: query.resource,
action: query.action,
types: [:keyset]
allowed_strategy == :offset ->
raise Ash.Error.Invalid.NonStreamableAction,
resource: query.resource,
action: query.action,
types: [:keyset, :offset]
end
end
def stream_strategy(query, nil, :full_read) do
if Ash.DataLayer.data_layer_can?(query.resource, :limit) &&
Ash.DataLayer.data_layer_can?(query.resource, :offset) do
:full_read
else
raise Ash.Error.Invalid.NonStreamableAction,
resource: query.resource,
action: query.action,
types: [:keyset]
end
end
defp can_offset?(query) do
not requires_keyset_pagination?(query) and
(can_pagination_offset?(query) || can_limit_offset?(query))
end
defp can_pagination_offset?(query) do
query.action.pagination && query.action.pagination.offset?
end
def requires_keyset_pagination?(query) do
query.action.pagination.keyset? && not query.action.pagination.offset? &&
query.action.pagination.required?
end
defp can_limit_offset?(query) do
Ash.DataLayer.data_layer_can?(query.resource, :limit) &&
Ash.DataLayer.data_layer_can?(query.resource, :offset)
end
defp can_keyset?(query) do
query.action.pagination && query.action.pagination.keyset?
end
defp take_query_limit(stream, query) do
if query.limit do
Stream.take(stream, query.limit)
else
stream
end
end
end


@@ -28,7 +28,18 @@ defmodule Ash.Actions.Update.Bulk do
          for_bulk_update: action.name
      end

-    run(api, api.stream!(query), action, input, Keyword.put(opts, :resource, query.resource))
+    read_opts =
+      opts
+      |> Keyword.drop([:resource, :atomic_update, :stream_batch_size, :batch_size])
+      |> Keyword.put(:batch_size, opts[:stream_batch_size])
+
+    run(
+      api,
+      api.stream!(query, read_opts),
+      action,
+      input,
+      Keyword.put(opts, :resource, query.resource)
+    )
   end

   def run(api, stream, action, input, opts) do


@@ -157,11 +157,66 @@ defmodule Ash.Api do
   @doc false
   def read_opts_schema, do: @read_opts_schema

+  @stream_opts [
+    batch_size: [
+      type: :integer,
+      doc:
+        "How many records to request in each query run. Defaults to the pagination limits on the resource, or 250."
+    ],
+    allow_stream_with: [
+      type: {:one_of, [:keyset, :offset, :full_read]},
+      doc:
+        "The 'worst' strategy allowed to be used to fetch records. See `Ash.Api.stream!/2` docs for more.",
+      default: :keyset
+    ],
+    stream_with: [
+      type: {:one_of, [:keyset, :offset, :full_read]},
+      doc:
+        "The specific strategy to use to fetch records. See `Ash.Api.stream!/2` docs for more."
+    ]
+  ]
+  |> merge_schemas(
+    @read_opts_schema,
+    "Read Options"
+  )
+
   @doc """
   Streams the results of a query.

-  This utilizes keyset pagination to accomplish this stream, and for that reason,
-  the action for the query must support keyset pagination.
+  ## Strategies
+
+  There are three strategies supported, and the best one available is always chosen. They are,
+  in order from best to worst:
+
+  - `:keyset`
+  - `:offset`
+  - `:full_read`
+
+  By default, only `:keyset` is supported. If you want to allow worse strategies to be used, pass
+  the worst one you wish to allow as the `allow_stream_with` option, i.e `allow_stream_with: :full_read`.
+
+  If you wish to specify a specific strategy to use, pass `stream_with: :strategy_name`.
+
+  ### Keyset
+
+  This utilizes keyset pagination to accomplish this stream. The action must support keyset pagination.
+  This is the most efficient way to stream a query, because it works by using filters which can benefit
+  from indexes in the data layer.
+
+  ### Offset
+
+  This utilizes offset/limit to accomplish this stream. If the action supports offset pagination, that will
+  be used. Otherwise, if the data layer supports limit/offset, then explicit limits/offsets will be used.
+  This is a much less efficient way of streaming a resource than `keyset`. To use limit/offset to reliably
+  stream, a sort must always be applied, and limit/offset in the data layer will generally require sorting
+  the entire table to figure out what is in each batch.
+
+  ### Full Read
+
+  This reads the entire table into memory with no limit. This is, generally speaking, the least efficient.
+
+  ## Options
+
+  #{Spark.OptionsHelpers.docs(@stream_opts)}
   """
   @callback stream!(Ash.Query.t(), opts :: Keyword.t()) :: Enumerable.t(Ash.Resource.record())
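
A short usage sketch of the strategy selection documented above (the `Post` resource, `Api` module, and `:read_with_no_pagination` action mirror the test file at the end of this diff; the pipelines themselves are illustrative):

    # Forcing a strategy the action cannot provide raises Ash.Error.Invalid.NonStreamableAction.
    Post
    |> Ash.Query.for_read(:read_with_no_pagination)
    |> Api.stream!(stream_with: :keyset)

    # The same action can still be streamed if a full table read is acceptable.
    Post
    |> Ash.Query.for_read(:read_with_no_pagination)
    |> Api.stream!(stream_with: :full_read)
    |> Enum.to_list()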
@@ -482,12 +537,21 @@ defmodule Ash.Api do
         type: :map,
         doc:
           "A map of atomic updates to apply. See `Ash.Changeset.atomic_update/3` for more."
-      ]
+      ],
+      stream_batch_size: [
+        type: :integer,
+        doc:
+          "Batch size to use if provided a query and the query must be streamed"
+      ]
     ]
     |> merge_schemas(
       Keyword.delete(@global_opts, :action),
       "Global options"
     )
+    |> merge_schemas(
+      Keyword.delete(@stream_opts, :batch_size),
+      "Stream Options"
+    )
     |> merge_schemas(
       @shared_created_update_and_destroy_opts_schema,
       "Shared create/update/destroy options"
@@ -502,12 +566,21 @@ defmodule Ash.Api do
         type: {:spark, Ash.Resource},
         doc:
           "The resource being updated. This must be provided if the input given is a stream, so we know ahead of time what the resource being updated is."
-      ]
+      ],
+      stream_batch_size: [
+        type: :integer,
+        doc:
+          "Batch size to use if provided a query and the query must be streamed"
+      ]
     ]
     |> merge_schemas(
       Keyword.delete(@global_opts, :action),
       "Global options"
     )
+    |> merge_schemas(
+      Keyword.delete(@stream_opts, :batch_size),
+      "Stream Options"
+    )
     |> merge_schemas(
       @shared_created_update_and_destroy_opts_schema,
       "Shared create/update/destroy options"
@@ -2086,70 +2159,9 @@ defmodule Ash.Api do
   @spec stream!(api :: module(), query :: Ash.Query.t(), opts :: Keyword.t()) ::
           Enumerable.t(Ash.Resource.record())
   def stream!(api, query, opts \\ []) do
-    query = Ash.Query.to_query(query)
-
-    query =
-      if query.action do
-        query
-      else
-        Ash.Query.for_read(
-          query,
-          Ash.Resource.Info.primary_action!(query.resource, :read).name
-        )
-      end
-
-    if !query.action.pagination || !query.action.pagination.keyset? do
-      raise Ash.Error.Invalid.NonStreamableAction,
-        resource: query.resource,
-        action: query.action
-    end
-
-    {batch_size, opts} =
-      Keyword.pop(
-        opts,
-        :batch_size,
-        query.action.pagination.default_limit || query.action.pagination.max_page_size || 100
-      )
-
-    Stream.resource(
-      fn -> nil end,
-      fn
-        false ->
-          {:halt, nil}
-
-        after_keyset ->
-          keyset = if after_keyset != nil, do: [after: after_keyset], else: []
-          page_opts = Keyword.merge(keyset, limit: batch_size)
-
-          opts =
-            [
-              page: page_opts
-            ]
-            |> Keyword.merge(opts)
-
-          case api.read!(query, opts) do
-            %{more?: true, results: results} ->
-              {results, List.last(results).__metadata__.keyset}
-
-            %{results: results} ->
-              {results, false}
-          end
-      end,
-      & &1
-    )
-    |> take_query_limit(query)
-  end
-
-  # This is technically an inefficient way to do this
-  # because the last request we make will take `query.limit` instead of
-  # calculating a smaller limit based on how many records we've received
-  # so far.
-  defp take_query_limit(stream, query) do
-    if query.limit do
-      Stream.take(stream, query.limit)
-    else
-      stream
-    end
+    opts = Spark.OptionsHelpers.validate!(opts, @stream_opts)
+
+    Ash.Actions.Read.Stream.run!(api, query, opts)
   end

   @doc false


@@ -2,7 +2,9 @@ defmodule Ash.Error.Invalid.NonStreamableAction do
   @moduledoc "Used when Api.stream is used with an action that does not support keyset pagination"
   use Ash.Error.Exception

-  def_ash_error([:resource, :action, :for_bulk_update, :for_bulk_destroy], class: :invalid)
+  def_ash_error([:resource, :action, :for_bulk_update, :for_bulk_destroy, types: [:keyset]],
+    class: :invalid
+  )

   defimpl Ash.ErrorKind do
     def id(_), do: Ash.UUID.generate()
@@ -12,32 +14,46 @@ defmodule Ash.Error.Invalid.NonStreamableAction do
     def message(%{for_bulk_update: action} = error) when not is_nil(action) do
       """
       You are attempting to pair read action #{error.action.name} with bulk update
-      action #{action}, but #{inspect(error.resource)}.#{error.action.name} does not support streaming.
+      action #{action}, but #{inspect(error.resource)}.#{error.action.name} does not
+      support streaming with one of #{inspect(error.types)}.

-      To enable it, keyset pagination to the action #{error.action.name}:
-
-          pagination keyset?: true, required?: false
+      #{how_to_enable(error)}
       """
     end

     def message(%{for_bulk_destroy: action} = error) when not is_nil(action) do
       """
       You are attempting to pair read action #{error.action.name} with bulk destroy
-      action #{action}, but #{inspect(error.resource)}.#{error.action.name} does not support streaming.
+      action #{action}, but #{inspect(error.resource)}.#{error.action.name} does not
+      support streaming with one of #{inspect(error.types)}.

-      To enable it, keyset pagination to the action #{error.action.name}:
-
-          pagination keyset?: true, required?: false
+      #{how_to_enable(error)}
       """
     end

     def message(error) do
       """
-      Action #{inspect(error.resource)}.#{error.action.name} does not support streaming.
+      Action #{inspect(error.resource)}.#{error.action.name} does not support streaming with one of #{inspect(error.types)}.

-      To enable it, keyset pagination to the action #{error.action.name}:
-
-          pagination keyset?: true, required?: false
+      #{how_to_enable(error)}
+      """
+    end
+
+    defp how_to_enable(error) do
+      """
+      There are two ways to handle this.
+
+      1.) Use the `allow_stream_with` or `stream_with` options to control what strategies are allowed.
+      2.) Enable the respective required pagination type on the action #{error.action.name}, for example:
+
+          # allow keyset
+          pagination keyset?: true, required?: false
+
+          # allow offset
+          pagination offset?: true, required?: false
+
+          # allow both
+          pagination offset?: true, keyset?: true, required?: false
       """
     end
   end
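
For reference, the pagination settings the error message asks for live on the read action in the resource DSL. A minimal sketch, mirroring the test resource below:

    actions do
      read :read do
        primary? true

        # keyset?/offset? determine which streaming strategies the action can support
        pagination keyset?: true, offset?: true, required?: false
      end
    end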


@@ -15,7 +15,7 @@ defmodule Ash.Test.Actions.StreamTest do
       read :read do
         primary? true
-        pagination keyset?: true
+        pagination keyset?: true, offset?: true, required?: false
       end

       read :read_with_no_pagination
@@ -60,6 +60,32 @@ defmodule Ash.Test.Actions.StreamTest do
     assert count == 10
   end

+  test "records can be streamed using limit/offset strategy" do
+    1..10
+    |> Stream.map(&%{title: "title#{&1}"})
+    |> Api.bulk_create!(Post, :create)
+
+    count =
+      Post
+      |> Api.stream!(batch_size: 5, stream_with: :offset)
+      |> Enum.count()
+
+    assert count == 10
+  end
+
+  test "records can be streamed using full_read strategy" do
+    1..10
+    |> Stream.map(&%{title: "title#{&1}"})
+    |> Api.bulk_create!(Post, :create)
+
+    count =
+      Post
+      |> Api.stream!(batch_size: 5, stream_with: :full_read)
+      |> Enum.count()
+
+    assert count == 10
+  end
+
   test "records can be streamed, and the overall limit will be honored" do
     1..10
     |> Stream.map(&%{title: "title#{&1}"})