fix: don't start async limiter tasks if async is disabled

fix: properly set default timeout to `:infinity`

This avoids starting unnecessary processes, since in the vast majority of cases some external mechanism is already imposing a timeout.

fix: pass down `identity` when doing upserts, for new feature support
This commit is contained in:
Zach Daniel 2024-06-17 10:56:48 -04:00
parent 1fe799c7bb
commit fa5d4a1a96
12 changed files with 136 additions and 57 deletions

View file

@ -33,7 +33,7 @@ defmodule Ash do
timeout: [
type: :timeout,
doc: """
A positive integer, or `:infinity`. If none is provided, the timeout configured on the domain is used (which defaults to `30_000`).
A positive integer, or `:infinity`. If none is provided, the timeout configured on the domain is used.
"""
],
tracer: [

View file

@ -1031,6 +1031,12 @@ defmodule Ash.Actions.Create.Bulk do
domain: domain,
upsert?: opts[:upsert?] || action.upsert?,
upsert_keys: upsert_keys,
identity:
(opts[:upsert_identity] || action.upsert_identity) &&
Ash.Resource.Info.identity(
resource,
opts[:upsert_identity] || action.upsert_identity
),
upsert_fields:
Ash.Changeset.expand_upsert_fields(
opts[:upsert_fields] || action.upsert_fields,
@ -1101,6 +1107,12 @@ defmodule Ash.Actions.Create.Bulk do
%{
select: opts[:select],
batch_size: opts[:batch_size],
identity:
(opts[:upsert_identity] || action.upsert_identity) &&
Ash.Resource.Info.identity(
resource,
opts[:upsert_identity] || action.upsert_identity
),
return_records?:
opts[:return_records?] || must_return_records? ||
must_return_records_for_changes?,

View file

@ -346,7 +346,15 @@ defmodule Ash.Actions.Create do
opts[:upsert?] ->
changeset.resource
|> Ash.DataLayer.upsert(changeset, upsert_keys)
|> Ash.DataLayer.upsert(
changeset,
upsert_keys,
(opts[:upsert_identity] || changeset.action.upsert_identity) &&
Ash.Resource.Info.identity(
changeset.resource,
opts[:upsert_identity] || changeset.action.upsert_identity
)
)
|> Helpers.rollback_if_in_transaction(
changeset.resource,
changeset

View file

@ -21,32 +21,36 @@ defmodule Ash.Actions.Read.AsyncLimiter do
func
)
when not is_nil(async_limiter) do
if Ash.DataLayer.data_layer_can?(resource, :async_engine) && !in_transaction?(query) do
claimed? =
Agent.get_and_update(async_limiter, fn
{limit, limit} ->
{false, {limit, limit}}
if Application.get_env(:ash, :disable_async?) do
func.()
else
if Ash.DataLayer.data_layer_can?(resource, :async_engine) && !in_transaction?(query) do
claimed? =
Agent.get_and_update(async_limiter, fn
{limit, limit} ->
{false, {limit, limit}}
{count, limit} ->
{true, {count + 1, limit}}
end)
{count, limit} ->
{true, {count + 1, limit}}
end)
if claimed? do
try do
Ash.ProcessHelpers.async(
fn ->
func.()
end,
opts
)
after
release(async_limiter)
if claimed? do
try do
Ash.ProcessHelpers.async(
fn ->
func.()
end,
opts
)
after
release(async_limiter)
end
else
func.()
end
else
func.()
end
else
func.()
end
end

View file

@ -169,6 +169,7 @@ defmodule Ash.DataLayer do
return_records?: boolean,
upsert?: boolean,
upsert_keys: nil | list(atom),
identity: Ash.Resource.Identity.t() | nil,
select: list(atom),
upsert_fields:
nil
@ -199,6 +200,13 @@ defmodule Ash.DataLayer do
{:ok, Ash.Resource.record()} | {:error, term} | {:error, :no_rollback, term}
@callback upsert(Ash.Resource.t(), Ash.Changeset.t(), list(atom)) ::
{:ok, Ash.Resource.record()} | {:error, term} | {:error, :no_rollback, term}
@callback upsert(
Ash.Resource.t(),
Ash.Changeset.t(),
list(atom),
Ash.Resource.Identity.t() | nil
) ::
{:ok, Ash.Resource.record()} | {:error, term} | {:error, :no_rollback, term}
@callback update(Ash.Resource.t(), Ash.Changeset.t()) ::
{:ok, Ash.Resource.record()} | {:error, term} | {:error, :no_rollback, term}
@ -292,6 +300,7 @@ defmodule Ash.DataLayer do
transaction: 4,
rollback: 2,
upsert: 3,
upsert: 4,
functions: 1,
in_transaction?: 1,
prefer_lateral_join_for_many_to_many?: 0,
@ -516,12 +525,22 @@ defmodule Ash.DataLayer do
Ash.DataLayer.data_layer(resource).set_tenant(resource, query, term)
end
@spec upsert(
        Ash.Resource.t(),
        Ash.Changeset.t(),
        list(atom),
        identity :: Ash.Resource.Identity.t() | nil
      ) ::
        {:ok, Ash.Resource.record()} | {:error, term}
def upsert(resource, changeset, keys, identity \\ nil) do
  # Resolve the effective tenant before handing the changeset to the data layer.
  changeset = %{changeset | tenant: changeset.to_tenant}
  data_layer = Ash.DataLayer.data_layer(resource)

  # Prefer the newer `upsert/4` callback (which receives the identity, enabling
  # identity options like `where`/`nils_distinct?`) when the data layer
  # implements it; fall back to `upsert/3` for older data layers.
  if function_exported?(data_layer, :upsert, 4) do
    data_layer.upsert(resource, changeset, keys, identity)
  else
    data_layer.upsert(resource, changeset, keys)
  end
end
@spec set_context(Ash.Resource.t(), data_layer_query(), map) ::

View file

@ -956,11 +956,12 @@ defmodule Ash.DataLayer.Ets do
@doc false
@impl true
def upsert(resource, changeset, keys, from_bulk_create? \\ false) do
def upsert(resource, changeset, keys, identity, from_bulk_create? \\ false) do
pkey = Ash.Resource.Info.primary_key(resource)
keys = keys || pkey
if Enum.any?(keys, &is_nil(Ash.Changeset.get_attribute(changeset, &1))) do
if (is_nil(identity) || !identity.nils_distinct?) &&
Enum.any?(keys, &is_nil(Ash.Changeset.get_attribute(changeset, &1))) do
create(resource, changeset, from_bulk_create?)
else
key_filters =
@ -970,7 +971,16 @@ defmodule Ash.DataLayer.Ets do
Map.get(changeset.params, to_string(key))}
end)
query = Ash.Query.do_filter(resource, and: [key_filters])
query =
resource
|> Ash.Query.do_filter(and: [key_filters])
|> then(fn query ->
if is_nil(identity) || is_nil(identity.where) do
query
else
Ash.Query.do_filter(query, identity.where)
end
end)
query =
if is_nil(changeset.filter) do
@ -1024,7 +1034,13 @@ defmodule Ash.DataLayer.Ets do
private: %{upsert_fields: options[:upsert_fields] || []}
})
case upsert(resource, changeset, options.upsert_keys, true) do
case upsert(
resource,
changeset,
options.upsert_keys,
options.identity,
true
) do
{:ok, result} ->
{:cont,
{:ok,
@ -1523,10 +1539,22 @@ defmodule Ash.DataLayer.Ets do
end
# Builds a human-readable description of a bulk upsert operation for logging,
# including the identity's `where` filter (if any), the upsert keys, and the
# fields being set.
defp bulk_create_operation(
       %{
         upsert?: true,
         upsert_keys: upsert_keys,
         upsert_fields: upsert_fields,
         upsert_where: expr
       },
       stream
     ) do
  # Only mention the filter when the identity actually has one.
  where_expr =
    if is_nil(expr) do
      ""
    else
      "where #{inspect(expr)}"
    end

  "Upserting #{Enum.count(stream)} on #{inspect(upsert_keys)} #{where_expr}, setting #{inspect(List.wrap(upsert_fields))}"
end
defp bulk_create_operation(_options, stream) do

View file

@ -201,7 +201,7 @@ defmodule Ash.Domain.Info do
@doc "The execution timeout for a domain"
@spec timeout(Ash.Domain.t()) :: nil | :infinity | integer()
def timeout(domain) do
  # Default is :infinity — in most deployments an external mechanism (caller,
  # web server, etc.) already imposes a timeout, so no extra timeout processes
  # need to be started on our side.
  Extension.get_opt(domain, [:execution], :timeout, :infinity, true)
end
@doc "The short name for a domain"

View file

@ -60,30 +60,34 @@ defmodule Ash.ProcessHelpers do
Creates a task that will properly transfer the ash context to the new process, and timeout if it takes longer than the given timeout
"""
def task_with_timeout(fun, resource, timeout, name, tracer) do
  if Application.get_env(:ash, :disable_async?) do
    # Async is globally disabled: run inline and impose no timeout, so we
    # never start a task process at all.
    fun.()
  else
    if (is_nil(resource) ||
          Ash.DataLayer.data_layer_can?(resource, :async_engine)) && timeout &&
         timeout != :infinity &&
         !Ash.DataLayer.in_transaction?(resource) do
      # Run in a task (with the ash context transferred) so the timeout can be
      # enforced via Task.await/2.
      task =
        async(
          fun,
          tracer: tracer
        )

      try do
        case Task.await(task, timeout) do
          {:__exception__, e, stacktrace} ->
            # The task captured an exception; re-raise it in the caller with
            # the original stacktrace.
            reraise e, stacktrace

          other ->
            other
        end
      catch
        # Task.await exits on timeout; translate that into a typed Ash error.
        :exit, {:timeout, {Task, :await, [^task, timeout]}} ->
          {:error, Ash.Error.Invalid.Timeout.exception(timeout: timeout, name: name)}
      end
    else
      # No enforceable timeout (nil/:infinity), the data layer can't run async,
      # or we're inside a transaction: call the function directly.
      fun.()
    end
  end
end

View file

@ -44,7 +44,7 @@ defmodule Ash.Reactor.Dsl.BulkUpdate do
stream_with: nil,
success_state: :success,
tenant: [],
timeout: 30_000,
timeout: :infinity,
transaction: false,
transform: nil,
type: :bulk_update,

View file

@ -89,6 +89,8 @@ defmodule Ash.Resource.Identity do
name: atom(),
keys: list(atom()),
description: String.t() | nil,
where: nil | Ash.Expr.t(),
nils_distinct?: boolean(),
all_tenants?: boolean()
}
end

View file

@ -15,6 +15,7 @@ defmodule Ash.Resource.ManualCreate do
:domain,
:upsert?,
:upsert_keys,
:identity,
:upsert_fields,
:return_records?,
:batch_size
@ -27,6 +28,7 @@ defmodule Ash.Resource.ManualCreate do
tracer: list(module),
authorize?: boolean(),
domain: Ash.Domain.t(),
identity: Ash.Resource.Identity.t() | nil,
upsert?: boolean(),
upsert_keys: list(atom),
upsert_fields: list(atom),

View file

@ -35,7 +35,7 @@ defmodule Ash.Test.Resource.Changes.LifecycleHooksTest do
change before_transaction(fn changeset, _context ->
send(changeset.arguments.caller, changeset.phase)
Ash.Changeset.add_error(changeset, "WHAT")
changeset
end)
end