diff --git a/lib/ash/actions/destroy/bulk.ex b/lib/ash/actions/destroy/bulk.ex
index a8058da1..d6550f28 100644
--- a/lib/ash/actions/destroy/bulk.ex
+++ b/lib/ash/actions/destroy/bulk.ex
@@ -28,7 +28,24 @@ defmodule Ash.Actions.Destroy.Bulk do
           for_bulk_destroy: action.name
       end
 
-      run(api, api.stream!(query), action, input, Keyword.put(opts, :resource, query.resource))
+      read_opts = Keyword.drop(opts, [:resource, :stream_batch_size, :batch_size])
+
+      # Only set `:batch_size` when a stream batch size was provided, so the
+      # stream's own default batch size applies otherwise.
+      read_opts =
+        if opts[:stream_batch_size] do
+          Keyword.put(read_opts, :batch_size, opts[:stream_batch_size])
+        else
+          read_opts
+        end
+
+      run(
+        api,
+        api.stream!(query, read_opts),
+        action,
+        input,
+        Keyword.put(opts, :resource, query.resource)
+      )
     end
 
   def run(api, stream, action, input, opts) do
diff --git a/lib/ash/actions/read.ex b/lib/ash/actions/read/read.ex
similarity index 100%
rename from lib/ash/actions/read.ex
rename to lib/ash/actions/read/read.ex
diff --git a/lib/ash/actions/read/stream.ex b/lib/ash/actions/read/stream.ex
new file mode 100644
index 00000000..78128a3f
--- /dev/null
+++ b/lib/ash/actions/read/stream.ex
@@ -0,0 +1,249 @@
+defmodule Ash.Actions.Read.Stream do
+  @moduledoc false
+
+  def run!(api, query, opts) do
+    query = Ash.Query.to_query(query)
+
+    query =
+      if query.action do
+        query
+      else
+        Ash.Query.for_read(
+          query,
+          Ash.Resource.Info.primary_action!(query.resource, :read).name
+        )
+      end
+
+    query
+    |> stream_strategy(opts[:stream_with], opts[:allow_stream_with])
+    |> do_stream(query, api, Keyword.drop(opts, [:stream_with, :allow_stream_with]))
+  end
+
+  defp do_stream(:keyset, query, api, opts) do
+    {batch_size, opts} =
+      Keyword.pop(
+        opts,
+        :batch_size,
+        query.action.pagination.default_limit || query.action.pagination.max_page_size || 250
+      )
+
+    Stream.resource(
+      fn -> nil end,
+      fn
+        false ->
+          {:halt, nil}
+
+        after_keyset ->
+          keyset = if after_keyset != nil, do: [after: after_keyset], else: []
+          page_opts = Keyword.merge(keyset, limit: batch_size)
+
+          opts =
+            Keyword.put(opts, :page, page_opts)
+
+          case api.read!(query, opts) do
+            %{more?: true, results: results} ->
+              {results, List.last(results).__metadata__.keyset}
+
+            %{results: results} ->
+              {results, false}
+          end
+      end,
+      & &1
+    )
+    |> take_query_limit(query)
+  end
+
+  defp do_stream(:offset, query, api, opts) do
+    if can_pagination_offset?(query) do
+      stream_with_offset_pagination(query, api, opts)
+    else
+      stream_with_limit_offset(query, api, opts)
+    end
+  end
+
+  defp do_stream(:full_read, query, api, opts) do
+    Stream.resource(
+      fn -> true end,
+      fn
+        false ->
+          {:halt, false}
+
+        true ->
+          {api.read!(query, opts), false}
+      end,
+      & &1
+    )
+  end
+
+  defp stream_with_offset_pagination(query, api, opts) do
+    {limit, opts} =
+      Keyword.pop(
+        opts,
+        :batch_size,
+        query.action.pagination.default_limit || query.action.pagination.max_page_size || 250
+      )
+
+    Stream.resource(
+      fn -> 0 end,
+      fn
+        false ->
+          {:halt, nil}
+
+        offset ->
+          page_opts = [limit: limit, offset: offset]
+
+          opts =
+            Keyword.put(opts, :page, page_opts)
+
+          case api.read!(query, opts) do
+            %{more?: true, results: results} ->
+              {results, offset + limit}
+
+            %{results: results} ->
+              {results, false}
+          end
+      end,
+      & &1
+    )
+    |> take_query_limit(query)
+  end
+
+  defp stream_with_limit_offset(query, api, opts) do
+    # The action may not support pagination at all here, so only consult
+    # the pagination limits when they are present.
+    {limit, opts} =
+      Keyword.pop(
+        opts,
+        :batch_size,
+        (query.action.pagination &&
+           (query.action.pagination.default_limit || query.action.pagination.max_page_size)) ||
+          250
+      )
+
+    Stream.resource(
+      fn -> 0 end,
+      fn
+        false ->
+          {:halt, nil}
+
+        offset ->
+          query =
+            query
+            |> Ash.Query.limit(limit)
+            |> Ash.Query.offset(offset)
+
+          results = api.read!(query, opts)
+
+          # A full batch may mean more records remain, so keep paging;
+          # a short batch means we've reached the end.
+          if Enum.count(results) == limit do
+            {results, offset + limit}
+          else
+            {results, false}
+          end
+      end,
+      & &1
+    )
+    |> take_query_limit(query)
+  end
+
+  @doc false
+  def stream_strategy(query, chosen_strategy, _) when not is_nil(chosen_strategy) do
+    case chosen_strategy do
+      :keyset ->
+        if can_keyset?(query) do
+          :keyset
+        else
+          raise Ash.Error.Invalid.NonStreamableAction,
+            resource: query.resource,
+            action: query.action,
+            types: [:keyset]
+        end
+
+      :offset ->
+        if can_offset?(query) do
+          :offset
+        else
+          raise Ash.Error.Invalid.NonStreamableAction,
+            resource: query.resource,
+            action: query.action,
+            types: [:offset]
+        end
+
+      :full_read ->
+        :full_read
+    end
+  end
+
+  def stream_strategy(query, nil, allowed_strategy) when not is_nil(allowed_strategy) do
+    cond do
+      can_keyset?(query) and allowed_strategy in [:keyset, :offset, :full_read] ->
+        :keyset
+
+      can_offset?(query) and allowed_strategy in [:offset, :full_read] ->
+        :offset
+
+      allowed_strategy == :full_read ->
+        :full_read
+
+      allowed_strategy == :keyset ->
+        raise Ash.Error.Invalid.NonStreamableAction,
+          resource: query.resource,
+          action: query.action,
+          types: [:keyset]
+
+      allowed_strategy == :offset ->
+        raise Ash.Error.Invalid.NonStreamableAction,
+          resource: query.resource,
+          action: query.action,
+          types: [:keyset, :offset]
+    end
+  end
+
+  def stream_strategy(query, nil, :full_read) do
+    if Ash.DataLayer.data_layer_can?(query.resource, :limit) &&
+         Ash.DataLayer.data_layer_can?(query.resource, :offset) do
+      :full_read
+    else
+      raise Ash.Error.Invalid.NonStreamableAction,
+        resource: query.resource,
+        action: query.action,
+        types: [:keyset]
+    end
+  end
+
+  defp can_offset?(query) do
+    not requires_keyset_pagination?(query) and
+      (can_pagination_offset?(query) || can_limit_offset?(query))
+  end
+
+  defp can_pagination_offset?(query) do
+    query.action.pagination && query.action.pagination.offset?
+  end
+
+  defp requires_keyset_pagination?(query) do
+    query.action.pagination && query.action.pagination.keyset? &&
+      not query.action.pagination.offset? &&
+      query.action.pagination.required?
+  end
+
+  defp can_limit_offset?(query) do
+    Ash.DataLayer.data_layer_can?(query.resource, :limit) &&
+      Ash.DataLayer.data_layer_can?(query.resource, :offset)
+  end
+
+  defp can_keyset?(query) do
+    query.action.pagination && query.action.pagination.keyset?
+  end
+
+  # This is technically an inefficient way to do this, because the last
+  # request we make will take up to `query.limit` records instead of
+  # calculating a smaller limit based on how many we've received so far.
+  defp take_query_limit(stream, query) do
+    if query.limit do
+      Stream.take(stream, query.limit)
+    else
+      stream
+    end
+  end
+end
diff --git a/lib/ash/actions/update/bulk.ex b/lib/ash/actions/update/bulk.ex
index a70449bf..82ea6935 100644
--- a/lib/ash/actions/update/bulk.ex
+++ b/lib/ash/actions/update/bulk.ex
@@ -28,7 +28,24 @@ defmodule Ash.Actions.Update.Bulk do
           for_bulk_update: action.name
       end
 
-      run(api, api.stream!(query), action, input, Keyword.put(opts, :resource, query.resource))
+      read_opts = Keyword.drop(opts, [:resource, :atomic_update, :stream_batch_size, :batch_size])
+
+      # Only set `:batch_size` when a stream batch size was provided, so the
+      # stream's own default batch size applies otherwise.
+      read_opts =
+        if opts[:stream_batch_size] do
+          Keyword.put(read_opts, :batch_size, opts[:stream_batch_size])
+        else
+          read_opts
+        end
+
+      run(
+        api,
+        api.stream!(query, read_opts),
+        action,
+        input,
+        Keyword.put(opts, :resource, query.resource)
+      )
     end
 
   def run(api, stream, action, input, opts) do
diff --git a/lib/ash/api/api.ex b/lib/ash/api/api.ex
index eef193df..fa3b5e9c 100644
--- a/lib/ash/api/api.ex
+++ b/lib/ash/api/api.ex
@@ -157,11 +157,66 @@ defmodule Ash.Api do
   @doc false
   def read_opts_schema, do: @read_opts_schema
 
+  @stream_opts [
+                 batch_size: [
+                   type: :integer,
+                   doc:
+                     "How many records to request in each query run. Defaults to the pagination limits on the resource, or 250."
+                 ],
+                 allow_stream_with: [
+                   type: {:one_of, [:keyset, :offset, :full_read]},
+                   doc:
+                     "The 'worst' strategy allowed to be used to fetch records. See `Ash.Api.stream!/2` docs for more.",
+                   default: :keyset
+                 ],
+                 stream_with: [
+                   type: {:one_of, [:keyset, :offset, :full_read]},
+                   doc:
+                     "The specific strategy to use to fetch records. See `Ash.Api.stream!/2` docs for more."
+                 ]
+               ]
+               |> merge_schemas(
+                 @read_opts_schema,
+                 "Read Options"
+               )
+
   @doc """
   Streams the results of a query.
 
-  This utilizes keyset pagination to accomplish this stream, and for that reason,
-  the action for the query must support keyset pagination.
+  ## Strategies
+
+  There are three strategies supported, and the best one available is always chosen. They are,
+  in order from best to worst:
+
+  - `:keyset`
+  - `:offset`
+  - `:full_read`
+
+  By default, only `:keyset` is supported. If you want to allow worse strategies to be used, pass
+  the worst one you wish to allow as the `allow_stream_with` option, i.e. `allow_stream_with: :full_read`.
+  If you wish to specify a specific strategy to use, pass `stream_with: :strategy_name`.
+
+  ### Keyset
+
+  This utilizes keyset pagination to accomplish the stream. The action must support keyset pagination.
+  This is the most efficient way to stream a query, because it works by using filters which can benefit
+  from indexes in the data layer.
+
+  ### Offset
+
+  This utilizes offset/limit to accomplish the stream. If the action supports offset pagination, that will
+  be used. Otherwise, if the data layer supports limit/offset, then explicit limits/offsets will be used.
+  This is a much less efficient way of streaming a resource than `keyset`. To use limit/offset to reliably
+  stream, a sort must always be applied, and limit/offset in the data layer will generally require sorting
+  the entire table to figure out what is in each batch.
+
+  ### Full Read
+
+  This reads the entire table into memory with no limit. This is, generally speaking, the least efficient.
+
+  ## Options
+
+  #{Spark.OptionsHelpers.docs(@stream_opts)}
   """
   @callback stream!(Ash.Query.t(), opts :: Keyword.t()) ::
               Enumerable.t(Ash.Resource.record())
@@ -482,12 +537,21 @@ defmodule Ash.Api do
         type: :map,
         doc:
           "A map of atomic updates to apply. See `Ash.Changeset.atomic_update/3` for more."
+      ],
+      stream_batch_size: [
+        type: :integer,
+        doc:
+          "The batch size to use if a query is provided and must be streamed"
       ]
     ]
     |> merge_schemas(
      Keyword.delete(@global_opts, :action),
      "Global options"
    )
+    |> merge_schemas(
+      Keyword.delete(@stream_opts, :batch_size),
+      "Stream Options"
+    )
    |> merge_schemas(
      @shared_created_update_and_destroy_opts_schema,
      "Shared create/update/destroy options"
@@ -502,12 +566,21 @@ defmodule Ash.Api do
       type: {:spark, Ash.Resource},
       doc:
         "The resource being updated. This must be provided if the input given is a stream, so we know ahead of time what the resource being updated is."
+      ],
+      stream_batch_size: [
+        type: :integer,
+        doc:
+          "The batch size to use if a query is provided and must be streamed"
       ]
     ]
    |> merge_schemas(
      Keyword.delete(@global_opts, :action),
      "Global options"
    )
+    |> merge_schemas(
+      Keyword.delete(@stream_opts, :batch_size),
+      "Stream Options"
+    )
    |> merge_schemas(
      @shared_created_update_and_destroy_opts_schema,
      "Shared create/update/destroy options"
@@ -2086,70 +2159,9 @@ defmodule Ash.Api do
   @spec stream!(api :: module(), query :: Ash.Query.t(), opts :: Keyword.t()) ::
           Enumerable.t(Ash.Resource.record())
   def stream!(api, query, opts \\ []) do
-    query = Ash.Query.to_query(query)
+    opts = Spark.OptionsHelpers.validate!(opts, @stream_opts)
 
-    query =
-      if query.action do
-        query
-      else
-        Ash.Query.for_read(
-          query,
-          Ash.Resource.Info.primary_action!(query.resource, :read).name
-        )
-      end
-
-    if !query.action.pagination || !query.action.pagination.keyset? do
-      raise Ash.Error.Invalid.NonStreamableAction,
-        resource: query.resource,
-        action: query.action
-    end
-
-    {batch_size, opts} =
-      Keyword.pop(
-        opts,
-        :batch_size,
-        query.action.pagination.default_limit || query.action.pagination.max_page_size || 100
-      )
-
-    Stream.resource(
-      fn -> nil end,
-      fn
-        false ->
-          {:halt, nil}
-
-        after_keyset ->
-          keyset = if after_keyset != nil, do: [after: after_keyset], else: []
-          page_opts = Keyword.merge(keyset, limit: batch_size)
-
-          opts =
-            [
-              page: page_opts
-            ]
-            |> Keyword.merge(opts)
-
-          case api.read!(query, opts) do
-            %{more?: true, results: results} ->
-              {results, List.last(results).__metadata__.keyset}
-
-            %{results: results} ->
-              {results, false}
-          end
-      end,
-      & &1
-    )
-    |> take_query_limit(query)
-  end
-
-  # This is technically an inefficient way to do this
-  # because the last request we make will take `query.limit` instead of
-  # calculating a smaller limit based on how many records we've received
-  # so far.
-  defp take_query_limit(stream, query) do
-    if query.limit do
-      Stream.take(stream, query.limit)
-    else
-      stream
-    end
+    Ash.Actions.Read.Stream.run!(api, query, opts)
   end
 
   @doc false
diff --git a/lib/ash/error/invalid/non_streamable_action.ex b/lib/ash/error/invalid/non_streamable_action.ex
index 1f2e736e..6f7f88fb 100644
--- a/lib/ash/error/invalid/non_streamable_action.ex
+++ b/lib/ash/error/invalid/non_streamable_action.ex
@@ -2,7 +2,9 @@ defmodule Ash.Error.Invalid.NonStreamableAction do
   @moduledoc "Used when Api.stream is used with an action that does not support keyset pagination"
   use Ash.Error.Exception
 
-  def_ash_error([:resource, :action, :for_bulk_update, :for_bulk_destroy], class: :invalid)
+  def_ash_error([:resource, :action, :for_bulk_update, :for_bulk_destroy, types: [:keyset]],
+    class: :invalid
+  )
 
   defimpl Ash.ErrorKind do
     def id(_), do: Ash.UUID.generate()
@@ -12,32 +14,46 @@ defmodule Ash.Error.Invalid.NonStreamableAction do
     def message(%{for_bulk_update: action} = error) when not is_nil(action) do
       """
       You are attempting to pair read action #{error.action.name} with bulk update
-      action #{action}, but #{inspect(error.resource)}.#{error.action.name} does not support streaming.
+      action #{action}, but #{inspect(error.resource)}.#{error.action.name} does not
+      support streaming with one of #{inspect(error.types)}.
 
-      To enable it, keyset pagination to the action #{error.action.name}:
-
-          pagination keyset?: true, required?: false
+      #{how_to_enable(error)}
       """
     end
 
     def message(%{for_bulk_destroy: action} = error) when not is_nil(action) do
       """
       You are attempting to pair read action #{error.action.name} with bulk destroy
-      action #{action}, but #{inspect(error.resource)}.#{error.action.name} does not support streaming.
+      action #{action}, but #{inspect(error.resource)}.#{error.action.name} does not
+      support streaming with one of #{inspect(error.types)}.
 
-      To enable it, keyset pagination to the action #{error.action.name}:
-
-          pagination keyset?: true, required?: false
+      #{how_to_enable(error)}
       """
     end
 
     def message(error) do
       """
-      Action #{inspect(error.resource)}.#{error.action.name} does not support streaming.
+      Action #{inspect(error.resource)}.#{error.action.name} does not support streaming with one of #{inspect(error.types)}.
 
-      To enable it, keyset pagination to the action #{error.action.name}:
+      #{how_to_enable(error)}
+      """
+    end
 
+    defp how_to_enable(error) do
+      """
+      There are two ways to handle this.
+
+      1.) Use the `allow_stream_with` or `stream_with` options to control what strategies are allowed.
+      2.) Enable the respective required pagination type on the action #{error.action.name}, for example:
+
+          # allow keyset
           pagination keyset?: true, required?: false
+
+          # allow offset
+          pagination offset?: true, required?: false
+
+          # allow both
+          pagination offset?: true, keyset?: true, required?: false
       """
     end
   end
diff --git a/test/actions/stream_test.exs b/test/actions/stream_test.exs
index 4366cdb0..4f16ef49 100644
--- a/test/actions/stream_test.exs
+++ b/test/actions/stream_test.exs
@@ -15,7 +15,7 @@ defmodule Ash.Test.Actions.StreamTest do
       read :read do
         primary? true
 
-        pagination keyset?: true
+        pagination keyset?: true, offset?: true, required?: false
       end
 
       read :read_with_no_pagination
@@ -60,6 +60,32 @@ defmodule Ash.Test.Actions.StreamTest do
     assert count == 10
   end
 
+  test "records can be streamed using limit/offset strategy" do
+    1..10
+    |> Stream.map(&%{title: "title#{&1}"})
+    |> Api.bulk_create!(Post, :create)
+
+    count =
+      Post
+      |> Api.stream!(batch_size: 5, stream_with: :offset)
+      |> Enum.count()
+
+    assert count == 10
+  end
+
+  test "records can be streamed using full_read strategy" do
+    1..10
+    |> Stream.map(&%{title: "title#{&1}"})
+    |> Api.bulk_create!(Post, :create)
+
+    count =
+      Post
+      |> Api.stream!(batch_size: 5, stream_with: :full_read)
+      |> Enum.count()
+
+    assert count == 10
+  end
+
   test "records can be streamed, and the overall limit will be honored" do
     1..10
     |> Stream.map(&%{title: "title#{&1}"})
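
For reference, a usage sketch of the new options, assuming the `Api` and `Post` modules defined in test/actions/stream_test.exs above (whose primary read action now enables both keyset and offset pagination):

    # Let stream!/2 choose the best available strategy, permitting a
    # fallback all the way down to a full read:
    Post
    |> Api.stream!(batch_size: 100, allow_stream_with: :full_read)
    |> Stream.map(& &1.title)
    |> Enum.take(5)

    # Or force one strategy; this raises Ash.Error.Invalid.NonStreamableAction
    # if the action cannot support it:
    Post
    |> Api.stream!(stream_with: :offset)
    |> Enum.count()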