feat: add Api.stream!/1

This commit is contained in:
Zach Daniel 2023-04-26 17:12:21 -04:00
parent 0398883079
commit fbc341b3a0
6 changed files with 155 additions and 2 deletions

View file

@ -146,6 +146,14 @@ defmodule Ash.Api do
@doc false
def read_opts_schema, do: @read_opts_schema
@doc """
Streams the results of a query.
This utilizes keyset pagination to accomplish this stream, and for that reason,
the action for the query must support keyset pagination.
"""
@callback stream!(Ash.Query.t(), opts :: Keyword.t()) :: Enumerable.t(Ash.Resource.record())
@offset_page_opts [
offset: [
type: :non_neg_integer,
@ -1406,6 +1414,62 @@ defmodule Ash.Api do
end
end
@spec stream!(api :: module(), query :: Ash.Query.t(), opts :: Keyword.t()) ::
Enumerable.t(Ash.Resource.record())
def stream!(api, query, opts \\ []) do
query = Ash.Query.to_query(query)
query =
if query.action do
query
else
Ash.Query.for_read(
query,
Ash.Resource.Info.primary_action!(query.resource, :read).name
)
end
{batch_size, opts} =
Keyword.pop(
opts,
:batch_size,
query.action.pagination.default_limit || query.action.pagination.max_page_size || 100
)
Stream.resource(
fn -> nil end,
fn
false ->
{:halt, nil}
after_keyset ->
if is_nil(query.action.pagination) || !query.action.pagination.keyset? do
raise Ash.Error.Invalid.NonStreamableAction,
resource: query.resource,
action: query.action
end
keyset = if after_keyset != nil, do: [after: after_keyset], else: []
page_opts = Keyword.merge(keyset, limit: batch_size)
opts =
[
page: page_opts
]
|> Keyword.merge(opts)
case api.read!(query, opts) do
%{more?: true, results: results} ->
{results, List.last(results).__metadata__.keyset}
%{results: results} ->
{results, false}
end
end,
& &1
)
end
@doc false
@spec read!(Ash.Api.t(), Ash.Query.t() | Ash.Resource.t(), Keyword.t()) ::
list(Ash.Resource.record()) | Ash.Page.page() | no_return
@ -1512,8 +1576,6 @@ defmodule Ash.Api do
| {Ash.Resource.record(), list(Ash.Notifier.Notification.t())}
| no_return
def create!(api, changeset, opts) do
opts = Spark.OptionsHelpers.validate!(opts, @create_opts_schema)
api
|> create(changeset, opts)
|> unwrap_or_raise!(opts[:stacktraces?])

View file

@ -212,6 +212,10 @@ defmodule Ash.Api.Interface do
end
end
def stream!(query, opts \\ []) do
Ash.Api.stream!(__MODULE__, query, opts)
end
def read!(query, opts \\ [])
def read!(query, opts) do

View file

@ -0,0 +1,22 @@
defmodule Ash.Error.Invalid.NonStreamableAction do
@moduledoc "Used when Api.stream is used with an action that does not support keyset pagination"
use Ash.Error.Exception
def_ash_error([:resource, :action], class: :invalid)
defimpl Ash.ErrorKind do
def id(_), do: Ash.UUID.generate()
def code(_), do: "non_streamable_action"
def message(error) do
"""
Action #{inspect(error.resource)}.#{error.action.name} does not support streaming.
To enable it, keyset pagination to the action #{error.action.name}:
pagination keyset?: true
"""
end
end
end

View file

@ -0,0 +1,65 @@
defmodule Ash.Test.Actions.BulkCreateTest do
@moduledoc false
use ExUnit.Case, async: true
defmodule Post do
@moduledoc false
use Ash.Resource, data_layer: Ash.DataLayer.Ets
ets do
private? true
end
actions do
defaults [:create, :update, :destroy]
read :read do
primary? true
pagination keyset?: true
end
read :read_with_no_pagination
end
attributes do
uuid_primary_key :id
attribute :title, :string, allow_nil?: false
timestamps()
end
end
defmodule Registry do
@moduledoc false
use Ash.Registry
entries do
entry Post
end
end
defmodule Api do
@moduledoc false
use Ash.Api
resources do
registry Registry
end
end
test "records can be streamed" do
1..10
|> Enum.each(fn i ->
Post
|> Ash.Changeset.for_create(:create, %{title: "title#{i}"})
|> Api.create!()
end)
count =
Post
|> Api.stream!(batch_size: 5)
|> Enum.count()
assert count == 10
end
end