improvement: do not perform atomic upgrade on destroy actions

fix: correct atomic implementation of `present` validation
fix: track keys that are set to `nil` in changesets, for use in atomic upgrade
This commit is contained in:
Zach Daniel 2024-03-05 00:01:02 -05:00
parent 9b88628b07
commit f19fa6c6c0
4 changed files with 118 additions and 194 deletions

View file

@ -24,185 +24,54 @@ defmodule Ash.Actions.Destroy do
end
def run(api, changeset, action, opts) do
primary_read = Ash.Resource.Info.primary_action(changeset.resource, :read)
{changeset, opts} = Ash.Actions.Helpers.add_process_context(api, changeset, opts)
{fully_atomic_changeset, params} =
cond do
!Ash.DataLayer.data_layer_can?(changeset.resource, :expr_error) && opts[:authorize?] ->
{{:not_atomic, "data layer does not support adding errors to a query"}, nil}
Ash.Tracer.span :action,
Ash.Api.Info.span_name(
api,
changeset.resource,
action.name
),
opts[:tracer] do
metadata = %{
api: api,
resource: changeset.resource,
resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
actor: opts[:actor],
tenant: opts[:tenant],
action: action.name,
authorize?: opts[:authorize?]
}
!Ash.DataLayer.data_layer_can?(changeset.resource, :destroy_query) ->
{{:not_atomic, "data layer does not support updating a query"}, nil}
Ash.Tracer.set_metadata(opts[:tracer], :action, metadata)
!primary_read ->
{{:not_atomic, "cannot atomically destroy a record without a primary read action"}, nil}
Ash.Tracer.telemetry_span [:ash, Ash.Api.Info.short_name(api), :destroy], metadata do
case do_run(api, changeset, action, opts) do
{:error, error} ->
if opts[:tracer] do
stacktrace =
case error do
%{stacktrace: %{stacktrace: stacktrace}} ->
stacktrace || []
true ->
params =
changeset.attributes
|> Map.merge(changeset.casted_attributes)
|> Map.merge(changeset.arguments)
|> Map.merge(changeset.casted_arguments)
_ ->
{:current_stacktrace, stacktrace} =
Process.info(self(), :current_stacktrace)
res =
Ash.Changeset.fully_atomic_changeset(
changeset.resource,
action,
params,
opts
|> Keyword.merge(
assume_casted?: true,
notify?: true,
atomics: changeset.atomics || [],
tenant: changeset.tenant
stacktrace
end
Ash.Tracer.set_handled_error(opts[:tracer], Ash.Error.to_error_class(error),
stacktrace: stacktrace
)
)
end
{res, params}
{:error, error}
other ->
other
end
end
case fully_atomic_changeset do
%Ash.Changeset{} = atomic_changeset ->
atomic_changeset =
%{atomic_changeset | data: changeset.data}
|> Ash.Changeset.set_context(%{data_layer: %{use_atomic_destroy_data?: true}})
|> Map.put(:load, changeset.load)
|> Map.put(:select, changeset.select)
|> Ash.Changeset.set_context(changeset.context)
{atomic_changeset, opts} =
Ash.Actions.Helpers.add_process_context(api, atomic_changeset, opts)
opts =
Keyword.merge(opts,
atomic_changeset: atomic_changeset,
return_records?: true,
notify?: true,
return_notifications?: opts[:return_notifications?],
return_errors?: true
)
primary_key = Ash.Resource.Info.primary_key(atomic_changeset.resource)
primary_key_filter = changeset.data |> Map.take(primary_key) |> Map.to_list()
query =
atomic_changeset.resource
|> Ash.Query.for_read(primary_read.name, %{},
actor: opts[:actor],
authorize?: false,
context: atomic_changeset.context,
tenant: atomic_changeset.tenant,
tracer: opts[:tracer]
)
|> Ash.Query.set_context(%{private: %{internal?: true}})
|> Ash.Query.do_filter(primary_key_filter)
case Ash.Actions.Destroy.Bulk.run(
api,
query,
fully_atomic_changeset.action,
params,
Keyword.merge(opts,
strategy: [:atomic],
authorize_query?: false,
atomic_changeset: atomic_changeset,
authorize_changeset_with: :error,
return_records?: true
)
) do
%Ash.BulkResult{status: :success, records: [record], notifications: notifications} ->
if opts[:return_notifications?] do
if opts[:return_destroyed?] do
{:ok, record, List.wrap(notifications)}
else
{:ok, List.wrap(notifications)}
end
else
if opts[:return_destroyed?] do
{:ok, record}
else
:ok
end
end
%Ash.BulkResult{status: :success, records: []} ->
primary_key = Ash.Resource.Info.primary_key(atomic_changeset.resource)
{:error,
Ash.Error.to_error_class(
Ash.Error.Changes.StaleRecord.exception(
resource: fully_atomic_changeset.resource,
filters: Map.take(changeset.data, primary_key)
)
)}
%Ash.BulkResult{status: :error, errors: errors} ->
{:error, Ash.Error.to_error_class(errors)}
end
other ->
if Ash.DataLayer.data_layer_can?(changeset.resource, :destroy_query) &&
action.require_atomic? &&
match?({:not_atomic, _reason}, other) do
{:not_atomic, reason} = other
{:error,
Ash.Error.Framework.MustBeAtomic.exception(
resource: changeset.resource,
action: action.name,
reason: reason
)}
else
{changeset, opts} = Ash.Actions.Helpers.add_process_context(api, changeset, opts)
Ash.Tracer.span :action,
Ash.Api.Info.span_name(
api,
changeset.resource,
action.name
),
opts[:tracer] do
metadata = %{
api: api,
resource: changeset.resource,
resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
actor: opts[:actor],
tenant: opts[:tenant],
action: action.name,
authorize?: opts[:authorize?]
}
Ash.Tracer.set_metadata(opts[:tracer], :action, metadata)
Ash.Tracer.telemetry_span [:ash, Ash.Api.Info.short_name(api), :destroy], metadata do
case do_run(api, changeset, action, opts) do
{:error, error} ->
if opts[:tracer] do
stacktrace =
case error do
%{stacktrace: %{stacktrace: stacktrace}} ->
stacktrace || []
_ ->
{:current_stacktrace, stacktrace} =
Process.info(self(), :current_stacktrace)
stacktrace
end
Ash.Tracer.set_handled_error(opts[:tracer], Ash.Error.to_error_class(error),
stacktrace: stacktrace
)
end
{:error, error}
other ->
other
end
end
end
end
end
rescue
e ->

View file

@ -37,6 +37,20 @@ defmodule Ash.Actions.Update do
!Enum.empty?(changeset.relationships) ->
{{:not_atomic, "cannot atomically manage relationships"}, nil}
!Enum.empty?(changeset.before_action) ->
{{:not_atomic, "cannot atomically run a changeset with a before_action hook"}, nil}
!Enum.empty?(changeset.before_transaction) ->
{{:not_atomic, "cannot atomically run a changeset with a before_transaction hook"},
nil}
!Enum.empty?(changeset.around_action) ->
{{:not_atomic, "cannot atomically run a changeset with an around_action hook"}, nil}
!Enum.empty?(changeset.around_transaction) ->
{{:not_atomic, "cannot atomically run a changeset with an around_transaction hook"},
nil}
!primary_read ->
{{:not_atomic, "cannot atomically update a record without a primary read action"},
nil}
@ -48,6 +62,9 @@ defmodule Ash.Actions.Update do
|> Map.merge(changeset.arguments)
|> Map.merge(changeset.casted_arguments)
params =
Enum.reduce(changeset.nil_inputs, params, &Map.put(&2, &1, nil))
res =
Ash.Changeset.fully_atomic_changeset(
changeset.resource,

View file

@ -42,6 +42,7 @@ defmodule Ash.Changeset do
phase: :validate,
relationships: %{},
select: nil,
nil_inputs: [],
load: [],
valid?: true
]
@ -4217,15 +4218,25 @@ defmodule Ash.Changeset do
%{
changeset
| attributes: Map.delete(changeset.attributes, attribute.name),
nil_inputs: [attribute.name | changeset.nil_inputs],
defaults: changeset.defaults -- [attribute.name]
}
Ash.Type.equal?(attribute.type, casted, data_value) ->
%{
changeset
| attributes: Map.delete(changeset.attributes, attribute.name),
defaults: changeset.defaults -- [attribute.name]
}
if is_nil(casted) do
%{
changeset
| attributes: Map.delete(changeset.attributes, attribute.name),
defaults: changeset.defaults -- [attribute.name],
nil_inputs: [attribute.name | changeset.nil_inputs]
}
else
%{
changeset
| attributes: Map.delete(changeset.attributes, attribute.name),
defaults: changeset.defaults -- [attribute.name]
}
end
true ->
%{

View file

@ -92,42 +92,69 @@ defmodule Ash.Resource.Validation.Present do
|> Keyword.delete(:attributes)
|> Enum.map(fn
{:exactly, exactly} ->
attribute_count = length(opts[:attributes])
message =
cond do
exactly == 0 -> "must be absent"
length(opts[:attributes]) == 1 -> "must be present"
attribute_count == 1 -> "must be present"
true -> "exactly %{exactly} of %{keys} must be present"
end
{:atomic, [opts[:attribute]], expr(^nil_count == ^exactly),
expr(
error(^InvalidAttribute, %{
field: ^opts[:attribute],
value: ^atomic_ref(opts[:attribute]),
message: ^message,
vars: %{exactly: ^exactly, keys: ^values}
})
)}
if attribute_count == 1 do
attribute = Enum.at(opts[:attributes], 0)
condition =
if exactly == 0 do
expr(not is_nil(^atomic_ref(attribute)))
else
expr(is_nil(^atomic_ref(attribute)))
end
{:atomic, opts[:attributes], condition,
expr(
error(^InvalidAttribute, %{
field: ^Enum.at(opts[:attributes], 0),
value: ^atomic_ref(Enum.at(opts[:attributes], 0)),
message: ^message,
vars: %{exactly: ^exactly, keys: ^Enum.join(opts[:attributes], ", ")}
})
)}
else
{:atomic, opts[:attributes], expr(^nil_count == ^exactly),
expr(
error(^InvalidAttribute, %{
field: ^Enum.at(opts[:attributes], 0),
value: ^atomic_ref(Enum.at(opts[:attributes], 0)),
message: ^message,
vars: %{exactly: ^exactly, keys: ^Enum.join(opts[:attributes], ", ")}
})
)}
end
{:at_least, at_least} ->
{:atomic, [opts[:attribute]], expr(count_nils(^atomic_ref(opts[:attribute])) < ^at_least),
attributes = Enum.map(opts[:attributes], fn attr -> expr(^atomic_ref(attr)) end)
{:atomic, opts[:attributes], expr(count_nils(^attributes) < ^at_least),
expr(
error(^InvalidAttribute, %{
field: ^opts[:attribute],
value: ^atomic_ref(opts[:attribute]),
field: ^Enum.at(opts[:attributes], 0),
value: ^atomic_ref(Enum.at(opts[:attributes], 0)),
message: "at least %{at_least} of %{keys} must be present",
vars: %{at_least: ^at_least, keys: ^values}
vars: %{at_least: ^at_least, keys: ^Enum.join(opts[:attributes], ", ")}
})
)}
{:at_most, at_most} ->
{:atomic, [opts[:attribute]], expr(count_nils(^atomic_ref(opts[:attribute])) > ^at_most),
attributes = Enum.map(opts[:attributes], fn attr -> expr(^atomic_ref(attr)) end)
{:atomic, opts[:attributes], expr(count_nils(^attributes) > ^at_most),
expr(
error(^InvalidAttribute, %{
field: ^opts[:attribute],
value: ^atomic_ref(opts[:attribute]),
field: ^Enum.at(opts[:attributes], 0),
value: ^atomic_ref(Enum.at(opts[:attributes], 0)),
message: "at most %{at_most} of %{keys} must be present",
vars: %{at_most: ^at_most, keys: ^values}
vars: %{at_most: ^at_most, keys: ^Enum.join(opts[:attributes], ", ")}
})
)}
end)