improvement: finishing initial touches on bulk actions

This commit is contained in:
Zach Daniel 2023-05-02 17:24:24 -04:00
parent 0d6a86e54e
commit 1e61ae2d3c
5 changed files with 553 additions and 145 deletions

View file

@ -8,6 +8,15 @@ defmodule Ash.Actions.Create.Bulk do
def run(api, resource, action, inputs, opts) do
action = Ash.Resource.Info.action(resource, action)
if opts[:transaction] == :all && opts[:return_stream?] do
raise ArgumentError,
"Cannot specify `transaction: :all` and `return_stream?: true` together"
end
if opts[:return_stream?] && opts[:sorted?] do
raise ArgumentError, "Cannot specify `sorted?: true` and `return_stream?: true` together"
end
if opts[:transaction] == :all &&
Ash.DataLayer.data_layer_can?(resource, :transact) do
notify? =
@ -73,7 +82,6 @@ defmodule Ash.Actions.Create.Bulk do
raise "Cannot upsert bulk actions currently"
end
# TODO: add process context without a changeset
{_, opts} = Ash.Actions.Helpers.add_process_context(api, Ash.Changeset.new(resource), opts)
manual_action_can_bulk? =
@ -111,17 +119,7 @@ defmodule Ash.Actions.Create.Bulk do
|> Ash.Changeset.prepare_changeset_for_action(action, opts, input)
|> Ash.Changeset.run_before_transaction_hooks()
end)
|> Stream.map(fn changeset ->
if changeset.valid? do
{:ok, changeset}
else
if opts[:stop_on_error?] do
throw({:error, Ash.Error.to_error_class(changeset.error), 0, []})
else
{:error, changeset}
end
end
end)
|> transform_and_stop_on_errors(opts)
|> Stream.transform(
fn -> %{batch: [], count: 0, must_return_records?: opts[:notify?]} end,
fn
@ -195,6 +193,8 @@ defmodule Ash.Actions.Create.Bulk do
opts[:tracer]
)
batch = authorize(batch, api, opts)
{batch, changesets_by_index} =
Enum.reduce(batch, {[], %{}}, fn changeset,
{changesets, changesets_by_index} ->
@ -263,6 +263,8 @@ defmodule Ash.Actions.Create.Bulk do
opts[:tracer]
)
batch = authorize(batch, api, opts)
{batch, changesets_by_index} =
Enum.reduce(batch, {[], %{}}, fn changeset, {changesets, changesets_by_index} ->
{[changeset | changesets],
@ -290,51 +292,138 @@ defmodule Ash.Actions.Create.Bulk do
end)
if opts[:return_stream?] do
raise "can't do this yet"
Stream.transform(
result,
fn -> nil end,
fn item, nil ->
case item do
{:error, error} ->
if opts[:stop_on_error?] do
{:halt, {[], error}}
else
{error
|> List.wrap()
|> Stream.map(&{:error, &1}), nil}
end
{:error, notifications, error} ->
if opts[:stop_on_error?] do
{:halt, {[], error}}
else
{error
|> List.wrap()
|> Stream.map(&{:error, &1})
|> notify_stream(notifications, resource, action, opts), nil}
end
{:ok, invalid, notifications} ->
if opts[:stop_on_error?] && !Enum.empty?(invalid) do
{:halt, {[], invalid}}
else
{invalid
|> Stream.map(&{:error, &1})
|> notify_stream(notifications, resource, action, opts), nil}
end
{:ok, batch_result, invalid, notifications} ->
if opts[:stop_on_error?] && !Enum.empty?(invalid) do
{:halt, {invalid, batch_result}}
else
if opts[:return_records?] do
{batch_result
|> Stream.map(&{:ok, &1})
|> Stream.concat(Stream.map(invalid, &{:error, &1}))
|> notify_stream(notifications, resource, action, opts), nil}
else
{invalid
|> Stream.map(&{:error, &1})
|> notify_stream(notifications, resource, action, opts), nil}
end
end
:ok ->
{[], nil}
end
end,
fn
{errors, successes} ->
remaining_items =
successes
|> Stream.map(&{:ok, &1})
|> Stream.concat(Stream.map(errors, &{:error, &1}))
{remaining_items, nil}
nil ->
{[], nil}
end,
& &1
)
else
result
|> Enum.reduce(%Ash.BulkResult{status: :success, records: [], errors: []}, fn
|> Enum.reduce(%Ash.BulkResult{status: :empty, records: [], errors: []}, fn
{:error, error}, result ->
%{
result
| errors: [error | result.errors],
status: errored(result.status)
}
{:error, notifications, error}, result ->
%{
result
| errors: [error | result.errors],
status: :partial_success,
notifications: notifications
status: errored(result.status),
notifications: (result.notifications || []) ++ notifications
}
{:ok, batch_result, notifications}, result ->
notifications =
if opts[:notify?] do
Ash.Notifier.notify(notifications)
else
[]
{:ok, [], notifications}, result ->
%{
result
| notifications: (result.notifications || []) ++ notifications,
status: success(result.status)
}
{:ok, invalid, notifications}, result ->
%{
result
| errors: invalid ++ result.errors,
status: :partial_success,
notifications: (result.notifications || []) ++ notifications
}
{:ok, batch_result, invalid, notifications}, result ->
status =
case invalid do
[] -> success(result.status)
_ -> :partial_success
end
records = Enum.concat(Enum.to_list(batch_result), result.records)
records =
if opts[:return_records?] do
Enum.concat(Enum.to_list(batch_result), result.records)
else
result.records
end
if opts[:return_notifications] do
%{result | records: records, notifications: notifications}
else
notifications =
if Process.get(:ash_started_transaction?) do
current_notifications = Process.get(:ash_notifications, [])
Process.put(
:ash_notifications,
current_notifications ++ notifications
)
[]
else
notifications
end
%{result | records: records, notifications: notifications}
end
%{
result
| records: records,
errors: invalid ++ result.errors,
status: status,
notifications: (result.notifications || []) ++ notifications
}
:ok, result ->
result
end)
|> case do
%{status: :empty} = result ->
%{result | status: :success}
other ->
other
end
end
catch
{:error, error, batch_number, notifications} ->
@ -352,12 +441,83 @@ defmodule Ash.Actions.Create.Bulk do
}
end
defp handle_bulk_result(bulk_result, resource, action, opts) do
defp notify_stream(stream, notifications, resource, action, opts) do
if opts[:notify?] do
notifications = List.wrap(notifications)
if opts[:return_notifications?] do
Stream.concat(stream, Stream.map(notifications, &{:notification, &1}))
else
remaining = Ash.Notifier.notify(notifications)
Ash.Actions.Helpers.warn_missed!(resource, action, %{
resource_notifications: remaining
})
stream
end
else
stream
end
end
defp errored(:empty), do: :error
defp errored(:partial_success), do: :partial_success
defp errored(:success), do: :partial_success
defp errored(:error), do: :error
defp success(:empty), do: :success
defp success(:partial_success), do: :partial_success
defp success(:error), do: :partial_success
defp success(:success), do: :success
defp transform_and_stop_on_errors(stream, opts) do
Stream.map(stream, fn changeset ->
if changeset.valid? do
{:ok, changeset}
else
if opts[:stop_on_error?] && !opts[:return_stream?] do
throw({:error, Ash.Error.to_error_class(changeset.error), 0, []})
else
changeset
end
end
end)
end
defp authorize(batch, api, opts) do
if opts[:authorize?] do
batch
|> Stream.map(fn changeset ->
if changeset.valid? do
case api.can(changeset, opts[:actor], return_forbidden_error?: true, maybe_is: false) do
{:ok, true} ->
changeset
{:ok, false, error} ->
Ash.Changeset.add_error(changeset, error)
{:error, error} ->
Ash.Changeset.add_error(changeset, error)
end
else
changeset
end
end)
else
batch
end
end
defp handle_bulk_result(%Ash.BulkResult{} = bulk_result, resource, action, opts) do
bulk_result
|> notify(resource, action, opts)
|> sort(opts)
end
# for when we return a stream
defp handle_bulk_result(stream, _, _, _), do: stream
defp sort(%{records: records} = result, opts) when is_list(records) do
if opts[:sorted?] do
%{result | records: Enum.sort_by(records, & &1.__metadata__.bulk_create_index)}
@ -399,93 +559,116 @@ defmodule Ash.Actions.Create.Bulk do
data_layer_can_bulk?,
api
) do
{batch, notifications} =
Enum.reduce(batch, {[], []}, fn changeset, {changesets, notifications} ->
{changeset, %{notifications: new_notifications}} =
Ash.Changeset.run_before_actions(changeset)
{batch, invalid, notifications} =
Enum.reduce(batch, {[], [], []}, fn changeset, {changesets, invalid, notifications} ->
if changeset.valid? do
{changeset, %{notifications: new_notifications}} =
Ash.Changeset.run_before_actions(changeset)
{[changeset | changesets], notifications ++ new_notifications}
if changeset.valid? do
{[changeset | changesets], invalid, notifications ++ new_notifications}
else
if opts[:stop_on_error?] && !opts[:return_stream?] do
throw({:error, Ash.Error.to_error_class(changeset.error), 0, []})
end
{changesets, [changeset | invalid], notifications ++ new_notifications}
end
else
{changesets, [changeset | invalid], notifications}
end
end)
case action.manual do
{mod, opts} ->
if function_exported?(mod, :bulk_create, 3) do
mod.bulk_create(batch, opts, %{
actor: opts[:actor],
authorize?: opts[:authorize?],
tracer: opts[:tracer],
api: api,
batch_size: count,
return_records?:
opts[:return_records?] || must_return_records? ||
must_return_records_for_changes?,
tenant: opts[:tenant]
})
else
[changeset] = batch
case batch do
[] ->
{:ok, [], invalid, notifications}
case mod.create(changeset, opts, %{
actor: opts[:actor],
tenant: opts[:tenant],
authorize?: opts[:authorize?],
tracer: opts[:tracer],
api: api
}) do
{:ok, result} ->
{:ok,
[
Ash.Resource.put_metadata(
result,
:bulk_create_index,
changeset.context.bulk_create.index
)
]}
batch ->
case action.manual do
{mod, opts} ->
if function_exported?(mod, :bulk_create, 3) do
mod.bulk_create(batch, opts, %{
actor: opts[:actor],
authorize?: opts[:authorize?],
tracer: opts[:tracer],
api: api,
batch_size: count,
return_records?:
opts[:return_records?] || must_return_records? ||
must_return_records_for_changes?,
tenant: opts[:tenant]
})
else
[changeset] = batch
{:error, error} ->
{:error, error}
end
case mod.create(changeset, opts, %{
actor: opts[:actor],
tenant: opts[:tenant],
authorize?: opts[:authorize?],
tracer: opts[:tracer],
api: api
}) do
{:ok, result} ->
{:ok,
[
Ash.Resource.put_metadata(
result,
:bulk_create_index,
changeset.context.bulk_create.index
)
]}
{:error, error} ->
{:error, error}
end
end
_ ->
if data_layer_can_bulk? do
Ash.DataLayer.bulk_create(resource, batch, %{
batch_size: count,
return_records?:
opts[:return_records?] || must_return_records? ||
must_return_records_for_changes?,
tenant: opts[:tenant]
})
else
[changeset] = batch
case Ash.DataLayer.create(resource, changeset) do
{:ok, result} ->
{:ok,
[
Ash.Resource.put_metadata(
result,
:bulk_create_index,
changeset.context.bulk_create.index
)
]}
{:error, error} ->
{:error, error}
end
end
end
|> case do
{:ok, result} ->
{:ok, result, invalid, notifications}
_ ->
if data_layer_can_bulk? do
Ash.DataLayer.bulk_create(resource, batch, %{
batch_size: count,
return_records?:
opts[:return_records?] || must_return_records? ||
must_return_records_for_changes?,
tenant: opts[:tenant]
})
else
[changeset] = batch
:ok ->
{:ok, invalid, notifications}
case Ash.DataLayer.create(resource, changeset) do
{:ok, result} ->
{:ok,
[
Ash.Resource.put_metadata(
result,
:bulk_create_index,
changeset.context.bulk_create.index
)
]}
{:error, error} ->
{:error, error}
end
other ->
other
end
end
|> case do
{:ok, result} ->
{:ok, result, notifications}
other ->
other
end
end
defp run_after_action_hooks(:ok, _), do: :ok
defp run_after_action_hooks({:ok, invalid, notifications}, changesets_by_index) do
{:ok, invalid, notifications, changesets_by_index}
end
defp run_after_action_hooks({:ok, batch_results, notifications}, changesets_by_index) do
defp run_after_action_hooks({:ok, batch_results, invalid, notifications}, changesets_by_index) do
batch_results
|> Enum.reduce_while(
{:ok, [], notifications, changesets_by_index},
@ -505,7 +688,7 @@ defmodule Ash.Actions.Create.Bulk do
)
|> case do
{:ok, results, notifications, changesets_by_index} ->
{:ok, results, notifications, changesets_by_index}
{:ok, results, invalid, notifications, changesets_by_index}
other ->
other
@ -516,10 +699,11 @@ defmodule Ash.Actions.Create.Bulk do
{:error, error}
end
defp process_results(:ok, _, _, _), do: :ok
defp process_results({:ok, invalid, notifications, _changesets_by_index}, _, _, _),
do: {:ok, invalid, notifications}
defp process_results(
{:ok, batch, notifications, changesets_by_index},
{:ok, batch, invalid, notifications, changesets_by_index},
changes,
all_changes,
opts
@ -549,7 +733,7 @@ defmodule Ash.Actions.Create.Bulk do
{results, notifications, changesets_by_index, []} ->
case run_bulk_after_changes(changes, all_changes, results, changesets_by_index, opts) do
{results, new_notifications, []} ->
{:ok, results, new_notifications ++ notifications}
{:ok, results, invalid, new_notifications ++ notifications}
{_results, _new_notifications, errors} ->
{:error, notifications, Ash.Error.to_ash_error(errors)}
@ -630,7 +814,6 @@ defmodule Ash.Actions.Create.Bulk do
end
defp run_action_changes(batch, all_changes, _action, actor, authorize?, tracer) do
# TODO: support action.delay_global_validations?
Enum.reduce(
all_changes,
%{must_return_records?: false, batch: batch, changes: %{}, notifications: []},
@ -661,7 +844,7 @@ defmodule Ash.Actions.Create.Bulk do
changes: state.changes
}
{%{change: {module, opts}} = change, change_index}, state ->
{%{change: {module, change_opts}} = change, change_index}, state ->
if Enum.empty?(change.where) && !change.only_when_valid? do
context = %{
actor: actor,
@ -669,7 +852,7 @@ defmodule Ash.Actions.Create.Bulk do
tracer: tracer
}
batch = module.batch_change(batch, opts, context)
batch = batch_change(module, batch, change_opts, context, actor)
must_return_records? =
state.must_return_records? || function_exported?(module, :after_batch, 3)
@ -719,7 +902,7 @@ defmodule Ash.Actions.Create.Bulk do
tracer: tracer
}
matches = module.batch_change(matches, opts, context)
matches = batch_change(module, matches, change_opts, context, actor)
must_return_records? =
state.must_return_records? || function_exported?(module, :after_batch, 3)
@ -740,6 +923,35 @@ defmodule Ash.Actions.Create.Bulk do
)
end
defp batch_change(module, batch, change_opts, context, actor) do
built_change_opts =
Ash.Filter.build_filter_from_template(
change_opts,
actor,
%{},
context
)
# TODO: We should figure out how to remove this requirement
# the basic problem is that if someone writes `set_attribute(:foo, ^arg(:arg))`
# we can't use `.batch_change/3` (because which argument value do we pass in?)
if function_exported?(module, :batch_change, 3) && built_change_opts == change_opts do
module.batch_change(batch, change_opts, context)
else
Stream.map(batch, fn changeset ->
change_opts =
Ash.Filter.build_filter_from_template(
change_opts,
actor,
changeset.arguments,
changeset.context
)
module.change(changeset, change_opts, context)
end)
end
end
defp batch(state) do
{:batch,
%{

View file

@ -371,6 +371,19 @@ defmodule Ash.Api do
doc:
"Wether or not to return all of the records that were inserted. Defaults to false to account for large inserts."
],
return_stream?: [
type: :boolean,
default: false,
doc: """
If set to `true`, instead of an `Ash.BulkResult`, a mixed stream is returned.
Potential elements:
`{:notification, notification}` - if `return_notifications?` is set to `true`
`{:ok, record}` - if `return_records?` is set to `true`
`{:error, error}` - an error that occurred. May be changeset or an invidual error.
"""
],
stop_on_error?: [
type: :boolean,
default: false,
@ -488,7 +501,7 @@ defmodule Ash.Api do
actor :: term,
opts :: Keyword.t()
) ::
{:ok, boolean | :maybe} | {:error, term}
{:ok, boolean | :maybe} | {:ok, false, Exception.t()} | {:error, term}
def can(api, action_or_query_or_changeset, actor, opts \\ []) do
opts = Keyword.put_new(opts, :maybe_is, :maybe)
opts = Keyword.put_new(opts, :run_queries?, true)
@ -598,7 +611,7 @@ defmodule Ash.Api do
case authorizer.strict_check(authorizer_state, context) do
{:error, %{class: :forbidden} = e} when is_exception(e) ->
{:halt, {false, nil}}
{:halt, {false, e}}
{:error, error} ->
{:halt, {:error, error}}
@ -618,14 +631,22 @@ defmodule Ash.Api do
{:cont, {true, Ash.Query.filter(query, ^filter)}}
{:continue, _} ->
{:halt, {:maybe, nil}}
{:continue, authorizer_state} ->
if opts[:maybe_is] == false do
{:halt, {false, Ash.Authorizer.exception(authorizer, :forbidden, authorizer_state)}}
else
{:halt, {:maybe, nil}}
end
{:filter_and_continue, _, _} ->
{:halt, {:maybe, nil}}
{:filter_and_continue, _, authorizer_state} ->
if opts[:maybe_is] == false do
{:halt, {false, Ash.Authorizer.exception(authorizer, :forbidden, authorizer_state)}}
else
{:halt, {:maybe, nil}}
end
:forbidden ->
{:halt, {false, nil}}
{:halt, {false, Ash.Authorizer.exception(authorizer, :forbidden, authorizer_state)}}
end
end
)
@ -655,7 +676,15 @@ defmodule Ash.Api do
{:ok, data_layer_query} ->
case Ash.DataLayer.run_query(data_layer_query, query.resource) do
{:ok, results} ->
{:ok, Enum.count(results) == Enum.count(data)}
if Enum.count(results) == Enum.count(data) do
{:ok, true}
else
if opts[:return_forbidden_error?] do
{:ok, false, Ash.Error.Forbidden.exception([])}
else
{:ok, false}
end
end
{:error, error} ->
{:error, error}
@ -687,18 +716,33 @@ defmodule Ash.Api do
{:error, error}
_ ->
{:ok, false}
if opts[:return_forbidden_error?] do
{:ok, false, Ash.Error.Forbidden.exception([])}
else
{:ok, false}
end
end
end
end
%Ash.Changeset{} ->
{:ok, false}
if opts[:return_forbidden_error?] do
{:ok, false, Ash.Error.Forbidden.exception([])}
else
{:ok, false}
end
end
else
{:ok, :maybe}
end
{false, error} ->
if opts[:return_forbidden_error?] do
{:ok, false, error}
else
{:ok, false}
end
{other, _} ->
{:ok, other}
end
@ -1224,6 +1268,11 @@ defmodule Ash.Api do
params :: Keyword.t()
) ::
Ash.BulkResult.t()
| Enumerable.t(
{:ok, Ash.Resource.record()}
| {:error, Ash.Changeset.t() | Ash.Error.t()}
| {:notification, Ash.Notifier.Notification.t()}
)
@doc """
Creates many records, raising on any errors. See `bulk_create/2` for more.

View file

@ -7,7 +7,7 @@ defmodule Ash.BulkResult do
status: :success | :partial_success | :error,
notifications: list(Ash.Notifier.Notification.t()) | nil,
records: list(Ash.Resource.record()) | nil,
errors: list(term) | nil
errors: list(Ash.Error.t() | Ash.Changeset.t()) | nil
}
defstruct [

View file

@ -118,7 +118,7 @@ defmodule Ash.Resource.Change do
| Ash.Notifier.Notification.t()
)
@optional_callbacks after_batch: 3
@optional_callbacks after_batch: 3, batch_change: 3
defmacro __using__(_) do
quote do
@ -126,13 +126,7 @@ defmodule Ash.Resource.Change do
def init(opts), do: {:ok, opts}
def batch_change(changesets, opts, context) do
Stream.map(changesets, fn changeset ->
change(changeset, opts, context)
end)
end
defoverridable init: 1, batch_change: 3
defoverridable init: 1
end
end
end

View file

@ -4,7 +4,9 @@ defmodule Ash.Test.Actions.BulkCreateTest do
defmodule Post do
@moduledoc false
use Ash.Resource, data_layer: Ash.DataLayer.Ets
use Ash.Resource,
data_layer: Ash.DataLayer.Ets,
authorizers: [Ash.Policy.Authorizer]
ets do
private? true
@ -31,6 +33,18 @@ defmodule Ash.Test.Actions.BulkCreateTest do
{:ok, %{result | title: result.title <> "_stuff"}}
end)
end
create :create_with_policy do
argument :authorize?, :boolean, allow_nil?: false
change set_context(%{authorize?: arg(:authorize?)})
end
end
policies do
policy action(:create_with_policy) do
authorize_if context_equals(:authorize?, true)
end
end
attributes do
@ -96,4 +110,143 @@ defmodule Ash.Test.Actions.BulkCreateTest do
sorted?: true
)
end
describe "authorization" do
test "policy success results in successes" do
assert %Ash.BulkResult{records: [%{title: "title1"}, %{title: "title2"}]} =
Api.bulk_create!(
[%{title: "title1", authorize?: true}, %{title: "title2", authorize?: true}],
Post,
:create_with_policy,
authorize?: true,
return_records?: true,
sorted?: true
)
end
test "policy failure results in failures" do
assert %Ash.BulkResult{errors: [_, _]} =
Api.bulk_create!(
[%{title: "title1", authorize?: false}, %{title: "title2", authorize?: false}],
Post,
:create_with_policy,
authorize?: true,
return_records?: true,
sorted?: true
)
end
end
describe "streaming" do
test "by default nothing is returned in the stream" do
assert [] =
[%{title: "title1", authorize?: true}, %{title: "title2", authorize?: true}]
|> Api.bulk_create!(
Post,
:create_with_policy,
authorize?: true,
return_stream?: true
)
|> Enum.to_list()
end
test "by returning notifications, you get the notifications in the stream" do
assert [{:notification, _}, {:notification, _}] =
[%{title: "title1", authorize?: true}, %{title: "title2", authorize?: true}]
|> Api.bulk_create!(
Post,
:create_with_policy,
authorize?: true,
return_stream?: true,
notify?: true,
return_notifications?: true
)
|> Enum.to_list()
end
test "by returning records, you get the records in the stream" do
assert [{:ok, %{title: "title1"}}, {:ok, %{title: "title2"}}] =
[%{title: "title1", authorize?: true}, %{title: "title2", authorize?: true}]
|> Api.bulk_create!(
Post,
:create_with_policy,
authorize?: true,
return_stream?: true,
return_records?: true
)
|> Enum.to_list()
|> Enum.sort_by(fn
{:ok, v} ->
v.title
_ ->
nil
end)
end
test "by returning notifications and records, you get them both in the stream" do
assert [
{:notification, _},
{:notification, _},
{:ok, %{title: "title1"}},
{:ok, %{title: "title2"}}
] =
[%{title: "title1", authorize?: true}, %{title: "title2", authorize?: true}]
|> Api.bulk_create!(
Post,
:create_with_policy,
authorize?: true,
notify?: true,
return_stream?: true,
return_notifications?: true,
return_records?: true
)
|> Enum.to_list()
|> Enum.sort_by(fn
{:ok, v} ->
v.title
{:notification, _} ->
true
_ ->
nil
end)
end
test "any errors are also returned in the stream" do
assert [
{:error, %Ash.Changeset{}},
{:notification, _},
{:ok, %{title: "title1"}}
] =
[
%{title: "title1", authorize?: true},
%{title: "title2", authorize?: false}
]
|> Api.bulk_create!(
Post,
:create_with_policy,
authorize?: true,
notify?: true,
return_stream?: true,
return_notifications?: true,
return_records?: true
)
|> Enum.to_list()
|> Enum.sort_by(fn
{:ok, v} ->
v.title
{:notification, _} ->
true
{:error, _} ->
false
_ ->
nil
end)
end
end
end