improvement: add before_transaction and after_transaction

This commit is contained in:
Zach Daniel 2023-02-10 14:12:19 -05:00
parent 97799dd856
commit 85a66b1d85
7 changed files with 621 additions and 319 deletions

View file

@ -200,9 +200,14 @@ All of these actions are run in a transaction if the data layer supports it. You
- Authorization is performed on the changes
- A before action hook is added to set up belongs_to relationships that are managed. This means potentially creating/modifying the destination of the relationship, and then changing the `destination_attribute` of the relationship.
- Before transaction hooks are called (`Ash.Changeset.before_transaction/2`)
- A transaction is opened if the action is configured for it (by default they are) and the data layer supports transactions
- Before action hooks are performed in reverse order they were added. (unless `append?` option was used)
- For manual actions, a before action hook must have set
- After action hooks are performed in the order they were added (unless `prepend?` option was used)
- For [Manual Actions](/documentation/topics/manual-actions.md), one of these after action hooks must have returned a result, otherwise an error is returned.
- Non-belongs-to relationships are managed, creating/updating/destroying related records.
- A transaction is opened if the action is configured for it (by default they are) and the data layer supports transactions
- If an `after_action` option was passed when running the action, it is run with the changeset and the result. Only supported for create & update actions.
- The transaction is closed, if one was opened
- After transaction hooks are invoked with the result of the transaction (even if it was an error)

View file

@ -75,6 +75,13 @@ defmodule Ash.Actions.Create do
verbose? = opts[:verbose?]
resource = changeset.resource
engine_timeout =
if Keyword.get(opts, :transaction?, true) && action.transaction? do
nil
else
opts[:timeout] || changeset.timeout || Ash.Api.Info.timeout(api)
end
[]
|> as_requests(resource, api, action,
changeset: changeset,
@ -89,23 +96,16 @@ defmodule Ash.Actions.Create do
after_action: opts[:after_action]
)
|> Ash.Engine.run(
transaction_reason: %{
type: :create,
metadata: %{
resource: resource,
action: action.name
}
},
transaction?: false,
resource: resource,
verbose?: verbose?,
name: "#{inspect(resource)}.#{action.name}",
actor: actor,
timeout: engine_timeout,
tracer: opts[:tracer],
authorize?: authorize?,
notification_metadata: opts[:notification_metadata],
timeout: opts[:timeout] || changeset.timeout || Ash.Api.Info.timeout(api),
return_notifications?: opts[:return_notifications?],
transaction?: Keyword.get(opts, :transaction?, true)
return_notifications?: opts[:return_notifications?]
)
|> case do
{:ok, %{data: %{commit: %^resource{} = created}} = engine_result} ->
@ -270,6 +270,7 @@ defmodule Ash.Actions.Create do
end
end),
action: action,
async?: !(Keyword.get(request_opts, :transaction?, true) && action.transaction?),
authorize?: true,
data: nil,
path: path ++ [:data],
@ -280,6 +281,7 @@ defmodule Ash.Actions.Create do
Request.new(
api: api,
resource: resource,
async?: !(Keyword.get(request_opts, :transaction?, true) && action.transaction?),
error_path: error_path,
changeset:
Request.resolve([path ++ [:data, :changeset]], fn data ->
@ -314,133 +316,147 @@ defmodule Ash.Actions.Create do
result =
changeset
|> Ash.Changeset.with_hooks(fn changeset ->
case Ash.Actions.ManagedRelationships.setup_managed_belongs_to_relationships(
changeset,
actor,
authorize?: authorize?,
actor: actor
) do
{:error, error} ->
{:error, error}
|> Ash.Changeset.with_hooks(
fn changeset ->
case Ash.Actions.ManagedRelationships.setup_managed_belongs_to_relationships(
changeset,
actor,
authorize?: authorize?,
actor: actor
) do
{:error, error} ->
{:error, error}
{changeset, manage_instructions} ->
changeset =
Ash.Changeset.require_values(
changeset,
:create
)
|> Ash.Changeset.require_values(
:update,
false,
action.require_attributes
)
{changeset, manage_instructions} ->
changeset =
Ash.Changeset.require_values(
changeset,
:create
)
|> Ash.Changeset.require_values(
:update,
false,
action.require_attributes
)
if changeset.valid? do
cond do
action.manual ->
{mod, opts} = action.manual
if changeset.valid? do
cond do
action.manual ->
{mod, opts} = action.manual
if result = changeset.context[:private][:action_result] do
result
else
mod.create(changeset, opts, %{
if result = changeset.context[:private][:action_result] do
result
else
mod.create(changeset, opts, %{
actor: actor,
tenant: changeset.tenant,
authorize?: authorize?,
api: changeset.api
})
end
|> add_tenant(changeset)
|> manage_relationships(api, changeset,
actor: actor,
tenant: changeset.tenant,
authorize?: authorize?,
api: changeset.api
})
end
|> add_tenant(changeset)
|> manage_relationships(api, changeset,
actor: actor,
authorize?: authorize?,
upsert?: upsert?
)
action.manual? ->
{:ok, nil}
true ->
belongs_to_attrs =
changeset.resource
|> Ash.Resource.Info.relationships()
|> Enum.filter(&(&1.type == :belongs_to))
|> Enum.map(& &1.source_attribute)
final_check =
changeset.resource
|> Ash.Resource.Info.attributes()
|> Enum.reject(
&(&1.allow_nil? || &1.generated? || &1.name in belongs_to_attrs)
upsert?: upsert?
)
changeset =
changeset
|> Ash.Changeset.require_values(
:create,
true,
final_check
)
action.manual? ->
{:ok, nil}
{changeset, _} =
Ash.Actions.ManagedRelationships.validate_required_belongs_to(
{changeset, []},
false
)
true ->
belongs_to_attrs =
changeset.resource
|> Ash.Resource.Info.relationships()
|> Enum.filter(&(&1.type == :belongs_to))
|> Enum.map(& &1.source_attribute)
if changeset.valid? do
cond do
result = changeset.context[:private][:action_result] ->
result
|> add_tenant(changeset)
|> manage_relationships(api, changeset,
actor: actor,
authorize?: authorize?,
upsert?: upsert?
)
final_check =
changeset.resource
|> Ash.Resource.Info.attributes()
|> Enum.reject(
&(&1.allow_nil? || &1.generated? || &1.name in belongs_to_attrs)
)
upsert? ->
resource
|> Ash.DataLayer.upsert(changeset, upsert_keys)
|> add_tenant(changeset)
|> manage_relationships(api, changeset,
actor: actor,
authorize?: authorize?,
upsert?: upsert?
)
changeset =
changeset
|> Ash.Changeset.require_values(
:create,
true,
final_check
)
true ->
resource
|> Ash.DataLayer.create(changeset)
|> add_tenant(changeset)
|> manage_relationships(api, changeset,
actor: actor,
authorize?: authorize?,
upsert?: upsert?
)
{changeset, _} =
Ash.Actions.ManagedRelationships.validate_required_belongs_to(
{changeset, []},
false
)
if changeset.valid? do
cond do
result = changeset.context[:private][:action_result] ->
result
|> add_tenant(changeset)
|> manage_relationships(api, changeset,
actor: actor,
authorize?: authorize?,
upsert?: upsert?
)
upsert? ->
resource
|> Ash.DataLayer.upsert(changeset, upsert_keys)
|> add_tenant(changeset)
|> manage_relationships(api, changeset,
actor: actor,
authorize?: authorize?,
upsert?: upsert?
)
true ->
resource
|> Ash.DataLayer.create(changeset)
|> add_tenant(changeset)
|> manage_relationships(api, changeset,
actor: actor,
authorize?: authorize?,
upsert?: upsert?
)
end
|> case do
{:ok, result, instructions} ->
{:ok, result,
instructions
|> Map.update!(
:notifications,
&(&1 ++ manage_instructions.notifications)
)}
{:error, error} ->
{:error, Ash.Changeset.add_error(changeset, error)}
end
else
{:error, changeset}
end
|> case do
{:ok, result, instructions} ->
{:ok, result,
instructions
|> Map.update!(
:notifications,
&(&1 ++ manage_instructions.notifications)
)}
{:error, error} ->
{:error, Ash.Changeset.add_error(changeset, error)}
end
else
{:error, changeset}
end
end
else
{:error, changeset}
end
else
{:error, changeset}
end
end
end)
end
end,
transaction?:
Keyword.get(request_opts, :transaction?, true) && action.transaction?,
timeout: request_opts[:timeout],
tracer: request_opts[:tracer],
transaction_metadata: %{
type: :create,
metadata: %{
resource: resource,
action: action.name,
actor: actor
}
}
)
case result do
{:ok, nil, _changeset, _instructions} ->

View file

@ -81,24 +81,24 @@ defmodule Ash.Actions.Destroy do
changeset
end
engine_timeout =
if Keyword.get(opts, :transaction?, true) && action.transaction? do
nil
else
opts[:timeout] || changeset.timeout || Ash.Api.Info.timeout(api)
end
[]
|> as_requests(resource, api, action,
changeset: changeset,
authorize?: authorize?,
actor: actor,
timeout: opts[:timeout] || changeset.timeout || Ash.Api.Info.timeout(api),
tracer: opts[:tracer],
timeout: opts[:timeout],
tenant: opts[:tenant]
)
|> Ash.Engine.run(
transaction_reason: %{
type: :destroy,
metadata: %{
record: changeset.data,
resource: resource,
action: action.name
}
},
resource: resource,
verbose?: verbose?,
actor: actor,
@ -107,8 +107,8 @@ defmodule Ash.Actions.Destroy do
return_notifications?: opts[:return_notifications?],
notification_metadata: opts[:notification_metadata],
authorize?: authorize?,
timeout: opts[:timeout] || changeset.timeout || Ash.Api.Info.timeout(api),
transaction?: true
timeout: engine_timeout,
transaction?: false
)
|> case do
{:ok, %{data: data} = engine_result} ->
@ -166,6 +166,7 @@ defmodule Ash.Actions.Destroy do
path: path ++ [:data],
action: action,
error_path: error_path,
async?: !(Keyword.get(request_opts, :transaction?, true) && action.transaction?),
changeset:
Request.resolve(changeset_dependencies, fn %{actor: actor, authorize?: authorize?} =
context ->
@ -264,6 +265,7 @@ defmodule Ash.Actions.Destroy do
action: action,
authorize?: false,
error_path: error_path,
async?: !(Keyword.get(request_opts, :transaction?, true) && action.transaction?),
changeset:
Request.resolve([path ++ [:data, :changeset]], fn context ->
{:ok, get_in(context, path ++ [:data, :changeset])}
@ -280,47 +282,60 @@ defmodule Ash.Actions.Destroy do
changeset
|> Ash.Changeset.put_context(:private, %{actor: actor, authorize?: authorize?})
|> Ash.Changeset.with_hooks(fn
%{valid?: false} = changeset ->
{:error, changeset}
|> Ash.Changeset.with_hooks(
fn
%{valid?: false} = changeset ->
{:error, changeset}
changeset ->
cond do
action.manual ->
{mod, opts} = action.manual
changeset ->
cond do
action.manual ->
{mod, opts} = action.manual
if result = changeset.context[:private][:action_result] do
result
else
mod.destroy(changeset, opts, %{
actor: actor,
tenant: changeset.tenant,
authorize?: authorize?,
api: changeset.api
})
end
action.manual? ->
{:ok, record}
true ->
if result = changeset.context[:private][:action_result] do
result
else
case Ash.DataLayer.destroy(resource, changeset) do
:ok ->
{:ok,
Ash.Resource.set_meta(record, %Ecto.Schema.Metadata{
state: :deleted,
schema: resource
})}
{:error, error} ->
{:error, Ash.Changeset.add_error(changeset, error)}
if result = changeset.context[:private][:action_result] do
result
else
mod.destroy(changeset, opts, %{
actor: actor,
tenant: changeset.tenant,
authorize?: authorize?,
api: changeset.api
})
end
end
end
end)
action.manual? ->
{:ok, record}
true ->
if result = changeset.context[:private][:action_result] do
result
else
case Ash.DataLayer.destroy(resource, changeset) do
:ok ->
{:ok,
Ash.Resource.set_meta(record, %Ecto.Schema.Metadata{
state: :deleted,
schema: resource
})}
{:error, error} ->
{:error, Ash.Changeset.add_error(changeset, error)}
end
end
end
end,
transaction?:
Keyword.get(request_opts, :transaction?, true) && action.transaction?,
timeout: request_opts[:timeout],
transaction_metadata: %{
type: :destroy,
metadata: %{
record: changeset.data,
resource: resource,
action: action.name
}
}
)
|> case do
{:ok, result, changeset, instructions} ->
instructions =

View file

@ -61,25 +61,24 @@ defmodule Ash.Actions.Update do
after_action = opts[:after_action]
resource = changeset.resource
engine_timeout =
if Keyword.get(opts, :transaction?, true) && action.transaction? do
nil
else
opts[:timeout] || changeset.timeout || Ash.Api.Info.timeout(api)
end
[]
|> as_requests(resource, api, action,
changeset: changeset,
authorize?: authorize?,
actor: actor,
timeout: opts[:timeout],
timeout: opts[:timeout] || changeset.timeout || Ash.Api.Info.timeout(api),
tracer: opts[:tracer],
after_action: after_action,
tenant: opts[:tenant]
)
|> Ash.Engine.run(
transaction_reason: %{
type: :update,
metadata: %{
record: changeset.data,
resource: resource,
action: action.name
}
},
resource: resource,
verbose?: verbose?,
actor: actor,
@ -88,9 +87,9 @@ defmodule Ash.Actions.Update do
notification_metadata: opts[:notification_metadata],
return_notifications?: opts[:return_notifications?],
authorize?: authorize?,
timeout: opts[:timeout] || changeset.timeout || Ash.Api.Info.timeout(api),
timeout: engine_timeout,
default_timeout: Ash.Api.Info.timeout(api),
transaction?: Keyword.get(opts, :transaction?, true)
transaction?: false
)
|> case do
{:ok, %{data: %{commit: %^resource{} = updated}} = engine_result} ->
@ -313,6 +312,7 @@ defmodule Ash.Actions.Update do
end
end),
authorize?: true,
async?: !(Keyword.get(request_opts, :transaction?, true) && action.transaction?),
path: path ++ [:data],
name: "prepare #{inspect(resource)}.#{action.name}"
)
@ -329,6 +329,7 @@ defmodule Ash.Actions.Update do
notify?: true,
error_path: error_path,
authorize?: false,
async?: !(Keyword.get(request_opts, :transaction?, true) && action.transaction?),
data:
Request.resolve(
[path ++ [:data, :changeset]],
@ -348,113 +349,126 @@ defmodule Ash.Actions.Update do
actor: actor
)
)
|> Ash.Changeset.with_hooks(fn changeset ->
case Ash.Actions.ManagedRelationships.setup_managed_belongs_to_relationships(
changeset,
actor,
actor: actor,
authorize?: authorize?
) do
{:error, error} ->
{:error, error}
|> Ash.Changeset.with_hooks(
fn changeset ->
case Ash.Actions.ManagedRelationships.setup_managed_belongs_to_relationships(
changeset,
actor,
actor: actor,
authorize?: authorize?
) do
{:error, error} ->
{:error, error}
{changeset, manage_instructions} ->
changeset =
changeset
|> Ash.Changeset.require_values(
:update,
true
)
|> Ash.Changeset.require_values(
:update,
false,
action.require_attributes
)
{changeset, manage_instructions} ->
changeset =
changeset
|> Ash.Changeset.require_values(
:update,
true
)
|> Ash.Changeset.require_values(
:update,
false,
action.require_attributes
)
changeset = set_tenant(changeset)
changeset = set_tenant(changeset)
if changeset.valid? do
cond do
action.manual ->
{mod, opts} = action.manual
if changeset.valid? do
cond do
action.manual ->
{mod, opts} = action.manual
if result = changeset.context[:private][:action_result] do
result
else
mod.update(changeset, opts, %{
actor: actor,
tenant: changeset.tenant,
authorize?: authorize?,
api: changeset.api
})
end
|> manage_relationships(api, changeset,
actor: actor,
authorize?: authorize?
)
action.manual? ->
{:ok, changeset.data, %{notifications: []}}
true ->
cond do
result = changeset.context[:private][:action_result] ->
if result = changeset.context[:private][:action_result] do
result
|> add_tenant(changeset)
|> manage_relationships(api, changeset,
else
mod.update(changeset, opts, %{
actor: actor,
authorize?: authorize?
)
tenant: changeset.tenant,
authorize?: authorize?,
api: changeset.api
})
end
|> manage_relationships(api, changeset,
actor: actor,
authorize?: authorize?
)
Ash.Changeset.changing_attributes?(changeset) ->
changeset =
changeset
|> Ash.Changeset.set_defaults(:update, true)
|> Ash.Changeset.put_context(:changed?, true)
action.manual? ->
{:ok, changeset.data, %{notifications: []}}
resource
|> Ash.DataLayer.update(changeset)
|> add_tenant(changeset)
|> manage_relationships(api, changeset,
actor: actor,
authorize?: authorize?
)
true ->
cond do
result = changeset.context[:private][:action_result] ->
result
|> add_tenant(changeset)
|> manage_relationships(api, changeset,
actor: actor,
authorize?: authorize?
)
true ->
changeset =
Ash.Changeset.put_context(changeset, :changed?, false)
Ash.Changeset.changing_attributes?(changeset) ->
changeset =
changeset
|> Ash.Changeset.set_defaults(:update, true)
|> Ash.Changeset.put_context(:changed?, true)
{:ok, changeset.data}
|> add_tenant(changeset)
|> manage_relationships(api, changeset,
actor: actor,
authorize?: authorize?
)
end
resource
|> Ash.DataLayer.update(changeset)
|> add_tenant(changeset)
|> manage_relationships(api, changeset,
actor: actor,
authorize?: authorize?
)
true ->
changeset =
Ash.Changeset.put_context(changeset, :changed?, false)
{:ok, changeset.data}
|> add_tenant(changeset)
|> manage_relationships(api, changeset,
actor: actor,
authorize?: authorize?
)
end
end
|> case do
{:ok, result} ->
{:ok, result,
%{
notifications: manage_instructions.notifications
}}
{:ok, result, notifications} ->
{:ok, result,
Map.update!(
notifications,
:notifications,
&(&1 ++ manage_instructions.notifications)
)}
{:error, error} ->
{:error, Ash.Changeset.add_error(changeset, error)}
end
else
{:error, changeset}
end
|> case do
{:ok, result} ->
{:ok, result,
%{
notifications: manage_instructions.notifications
}}
{:ok, result, notifications} ->
{:ok, result,
Map.update!(
notifications,
:notifications,
&(&1 ++ manage_instructions.notifications)
)}
{:error, error} ->
{:error, Ash.Changeset.add_error(changeset, error)}
end
else
{:error, changeset}
end
end
end)
end
end,
transaction?:
Keyword.get(request_opts, :transaction?, true) && action.transaction?,
timeout: request_opts[:timeout],
transaction_metadata: %{
type: :update,
metadata: %{
record: changeset.data,
resource: resource,
action: action.name
}
}
)
case result do
{:ok, updated, changeset, instructions} ->

View file

@ -26,6 +26,8 @@ defmodule Ash.Changeset do
arguments: %{},
context: %{},
defaults: [],
before_transaction: [],
after_transaction: [],
after_action: [],
around_action: [],
before_action: [],
@ -134,8 +136,7 @@ defmodule Ash.Changeset do
Changes.NoSuchAttribute,
Changes.NoSuchRelationship,
Changes.Required,
Invalid.NoSuchResource,
Invalid.TimeoutNotSupported
Invalid.NoSuchResource
}
require Ash.Tracer
@ -1502,21 +1503,223 @@ defmodule Ash.Changeset do
t(),
(t() ->
{:ok, term, %{notifications: list(Ash.Notifier.Notification.t())}}
| {:error, term})
| {:error, term}),
Keyword.t()
) ::
{:ok, term, t(), %{notifications: list(Ash.Notifier.Notification.t())}} | {:error, term}
def with_hooks(%{valid?: false} = changeset, _func) do
def with_hooks(changeset, func, opts \\ [])
def with_hooks(%{valid?: false} = changeset, _func, _opts) do
{:error, changeset.errors}
end
def with_hooks(changeset, func) do
def with_hooks(changeset, func, opts) do
if changeset.valid? do
run_around_actions(changeset, func)
if opts[:transaction?] && Ash.DataLayer.data_layer_can?(changeset.resource, :transact) do
transaction_hooks(changeset, fn changeset ->
resources =
changeset.resource
|> List.wrap()
|> Enum.concat(changeset.action.touches_resources)
|> Enum.uniq()
Process.put(
:ash_after_transaction,
Process.get(:ash_after_transaction, []) ++ changeset.after_transaction
)
resources
|> Enum.reject(&Ash.DataLayer.in_transaction?/1)
|> Ash.DataLayer.transaction(
fn ->
case run_around_actions(changeset, func) do
{:error, error} ->
Ash.DataLayer.rollback(changeset.resource, error)
other ->
other
end
end,
changeset.timeout || :infinity,
opts[:transaction_metadata]
)
|> case do
{:ok, result} ->
result
{:error, error} ->
{:error, error}
end
end)
else
transaction_hooks(changeset, fn changeset ->
if changeset.timeout do
Ash.Engine.task_with_timeout(
fn ->
run_around_actions(changeset, func)
end,
changeset.resource,
changeset.timeout,
"#{inspect(changeset.resource)}.#{changeset.action.name}",
opts[:tracer]
)
else
run_around_actions(changeset, func)
end
end)
end
else
{:error, changeset.errors}
end
end
defp transaction_hooks(changeset, func) do
changeset =
Enum.reduce_while(
changeset.before_transaction,
changeset,
fn before_transaction, changeset ->
metadata = %{
api: changeset.api,
resource: changeset.resource,
resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
actor: changeset.context[:private][:actor],
tenant: changeset.context[:private][:actor],
action: changeset.action && changeset.action.name,
authorize?: changeset.context[:private][:authorize?]
}
tracer = changeset.context[:private][:tracer]
result =
Ash.Tracer.span :before_action,
"before_action",
tracer do
Ash.Tracer.set_metadata(tracer, :before_transaction, metadata)
Ash.Tracer.telemetry_span [:ash, :before_transaction], metadata do
before_transaction.(changeset)
end
end
case result do
{:error, error} ->
{:halt, {:error, error}}
changeset ->
cont =
if changeset.valid? do
:cont
else
:halt
end
{cont, changeset}
end
end
)
result =
try do
func.(changeset)
rescue
exception ->
{:raise, exception, __STACKTRACE__}
catch
:exit, reason ->
{:exit, reason}
end
case result do
{:exit, reason} ->
error = Ash.Error.to_ash_error(reason)
case run_after_transactions({:error, error}, changeset) do
{:ok, result} ->
{:ok, result, %{}}
{:error, new_error} when new_error == error ->
exit(reason)
{:error, new_error} ->
exit(new_error)
end
{:raise, exception, stacktrace} ->
case run_after_transactions({:error, exception}, changeset) do
{:ok, result} ->
{:ok, result, changeset, %{}}
{:error, error} ->
reraise error, stacktrace
end
{:ok, result, changeset, notifications} ->
case run_after_transactions({:ok, result}, changeset) do
{:ok, result} ->
{:ok, result, changeset, notifications}
{:error, error} ->
{:error, error}
end
{:ok, result, notifications} ->
case run_after_transactions({:ok, result}, changeset) do
{:ok, result} ->
{:ok, result, changeset, notifications}
{:error, error} ->
{:error, error}
end
{:error, error} ->
case run_after_transactions({:error, error}, changeset) do
{:ok, result} ->
{:ok, result, changeset, %{}}
{:error, error} ->
{:error, error}
end
end
end
defp run_after_transactions(result, changeset) do
changeset.after_transaction
|> Enum.reduce(
result,
fn after_transaction, result ->
tracer = changeset.context[:private][:tracer]
metadata = %{
api: changeset.api,
resource: changeset.resource,
resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
actor: changeset.context[:private][:actor],
tenant: changeset.context[:private][:actor],
action: changeset.action && changeset.action.name,
authorize?: changeset.context[:private][:authorize?]
}
Ash.Tracer.span :after_transaction,
"after_transaction",
tracer do
Ash.Tracer.set_metadata(tracer, :after_transaction, metadata)
Ash.Tracer.telemetry_span [:ash, :after_transaction], metadata do
after_transaction.(changeset, result)
end
end
end
)
|> case do
{:ok, new_result} ->
{:ok, new_result}
{:error, error} ->
{:error, error}
end
end
defp run_around_actions(%{around_action: []} = changeset, func) do
changeset = put_context(changeset, :private, %{in_before_action?: true})
@ -1755,11 +1958,7 @@ defmodule Ash.Changeset do
@spec timeout(t(), nil | pos_integer, nil | pos_integer) :: t()
def timeout(changeset, timeout, default \\ nil) do
if Ash.DataLayer.data_layer_can?(changeset.resource, :timeout) || is_nil(timeout) do
%{changeset | timeout: timeout || default}
else
add_error(changeset, TimeoutNotSupported.exception(resource: changeset.resource))
end
%{changeset | timeout: timeout || default}
end
@doc """
@ -3033,8 +3232,26 @@ defmodule Ash.Changeset do
end
@doc """
Adds an after_action hook to the changeset.
Adds a before_transaction hook to the changeset.
Provide the option `append?: true` to place the hook after all
other hooks instead of before.
"""
@spec before_transaction(
t(),
(t() -> t()),
Keyword.t()
) :: t()
def before_transaction(changeset, func, opts \\ []) do
if opts[:append?] do
%{changeset | before_transaction: changeset.before_transaction ++ [func]}
else
%{changeset | before_transaction: [func | changeset.before_transaction]}
end
end
@doc """
Adds an after_action hook to the changeset.
Provide the option `prepend?: true` to place the hook before all
other hooks instead of after.
@ -3055,6 +3272,30 @@ defmodule Ash.Changeset do
end
end
@doc """
Adds an after_transaction hook to the changeset.
`after_transaction` hooks differ from `after_action` hooks in that they are run
on success *and* failure of the action or some previous hook.
Provide the option `prepend?: true` to place the hook before all
other hooks instead of after.
"""
@spec after_transaction(
t(),
(t(), {:ok, Ash.Resource.record()} | {:error, term()} ->
{:ok, Ash.Resource.record()}
| {:error, term}),
Keyword.t()
) :: t()
def after_transaction(changeset, func, opts \\ []) do
if opts[:prepend?] do
%{changeset | after_transaction: [func | changeset.after_transaction]}
else
%{changeset | after_transaction: changeset.after_transaction ++ [func]}
end
end
@doc """
Adds an around_action hook to the changeset.

View file

@ -275,7 +275,7 @@ defmodule Ash.DataLayer do
data_layer.transaction(resource, func)
end
else
func.()
{:ok, func.()}
end
end
end

View file

@ -113,33 +113,15 @@ defmodule Ash.Engine do
end
true ->
if !Application.get_env(:ash, :disable_async?) &&
(is_nil(opts[:resource]) ||
Ash.DataLayer.data_layer_can?(opts[:resource], :async_engine)) && opts[:timeout] &&
opts[:timeout] != :infinity && !Ash.DataLayer.in_transaction?(opts[:resource]) do
task =
async(
fn ->
do_run(requests, opts)
end,
opts
)
try do
case Task.await(task, opts[:timeout]) do
{:__exception__, e, stacktrace} ->
reraise e, stacktrace
other ->
other
end
catch
:exit, {:timeout, {Task, :await, [^task, timeout]}} ->
{:error, Ash.Error.Invalid.Timeout.exception(timeout: timeout, name: opts[:name])}
end
else
do_run(requests, opts)
end
task_with_timeout(
fn ->
do_run(requests, opts)
end,
opts[:resource],
opts[:timeout],
opts[:name],
opts[:tracer]
)
end
|> case do
{:ok, %{resource_notifications: resource_notifications} = result} ->
@ -160,6 +142,35 @@ defmodule Ash.Engine do
end
end
@doc false
def task_with_timeout(fun, resource, timeout, name, tracer) do
if !Application.get_env(:ash, :disable_async?) &&
(is_nil(resource) ||
Ash.DataLayer.data_layer_can?(resource, :async_engine)) && timeout &&
timeout != :infinity && !Ash.DataLayer.in_transaction?(resource) do
task =
async(
fun,
tracer: tracer
)
try do
case Task.await(task, timeout) do
{:__exception__, e, stacktrace} ->
reraise e, stacktrace
other ->
other
end
catch
:exit, {:timeout, {Task, :await, [^task, timeout]}} ->
{:error, Ash.Error.Invalid.Timeout.exception(timeout: timeout, name: name)}
end
else
fun.()
end
end
defp transaction_metadata(opts) do
case opts[:transaction_reason] do
%{metadata: metadata} = reason ->