improvement: support fully atomic actions

This commit is contained in:
Zach Daniel 2024-09-12 09:04:51 -04:00
parent 4b92d54bc4
commit 2f55f7a61b
5 changed files with 273 additions and 138 deletions

View file

@ -27,9 +27,9 @@ if Mix.env() == :test do
config :ash_oban, actor_persister: AshOban.Test.ActorPersister
# sobelow_skip ["Config.Secrets"]
config :ash_oban, AshOban.Test.Repo,
username: "postgres",
# sobelow_skip ["Config.Secrets"]
password: "postgres",
database: "ash_oban_test",
hostname: "localhost",

View file

@ -16,10 +16,9 @@ defmodule AshOban.Transformers.DefineActionWorkers do
worker_module_name =
module_name(module, scheduled_action)
define_worker(module, worker_module_name, scheduled_action, dsl)
dsl
|> Transformer.async_compile(fn ->
define_worker(module, worker_module_name, scheduled_action, dsl)
end)
|> Transformer.replace_entity([:oban, :scheduled_actions], %{
scheduled_action
| worker: worker_module_name

View file

@ -20,15 +20,14 @@ defmodule AshOban.Transformers.DefineSchedulers do
worker_module_name = module_name(module, trigger, "Worker")
define_worker(module, worker_module_name, trigger, dsl)
dsl
|> Transformer.replace_entity([:oban, :triggers], %{
trigger
| scheduler: scheduler_module_name,
worker: worker_module_name
})
|> Transformer.async_compile(fn ->
define_worker(module, worker_module_name, trigger, dsl)
end)
|> then(fn dsl ->
if trigger.scheduler_cron do
Transformer.async_compile(dsl, fn ->
@ -266,70 +265,82 @@ defmodule AshOban.Transformers.DefineSchedulers do
can_transact? && trigger.on_error &&
Ash.Resource.Info.action(dsl, trigger.on_error).transaction?
trigger_action = Ash.Resource.Info.action(dsl, trigger.action)
work_transaction? =
can_transact? && Ash.Resource.Info.action(dsl, trigger.action).transaction?
can_transact? && trigger_action.transaction?
atomic? = Map.get(trigger_action, :require_atomic?, false)
can_lock? = Ash.DataLayer.data_layer_can?(dsl, {:lock, :for_update})
read_action =
trigger.worker_read_action || trigger.read_action ||
Map.get(trigger_action, :atomic_upgrade_with) ||
Ash.Resource.Info.primary_action!(resource, :read).name
get_and_lock =
if can_lock? do
if atomic? do
quote do
Ash.Changeset.before_action(changeset, fn changeset ->
query()
|> Ash.Query.do_filter(primary_key)
|> Ash.Query.set_context(%{private: %{ash_oban?: true}})
|> Ash.Query.for_read(unquote(read_action), %{},
authorize?: authorize?,
actor: actor,
domain: unquote(domain)
)
|> Ash.Query.lock(:for_update)
|> Ash.read_one()
|> case do
{:ok, nil} ->
Ash.Changeset.add_error(
changeset,
AshOban.Errors.TriggerNoLongerApplies.exception([])
)
{:ok, record} ->
%{changeset | data: record}
{:error, error} ->
Ash.Changeset.add_error(changeset, error)
end
end)
filter = query().filter
Ash.Changeset.filter(changeset, filter)
end
else
quote do
Ash.Changeset.before_action(changeset, fn changeset ->
query()
|> Ash.Query.do_filter(primary_key)
|> Ash.Query.set_context(%{private: %{ash_oban?: true}})
|> Ash.Query.for_read(unquote(read_action), %{},
authorize?: authorize?,
actor: actor,
domain: unquote(domain)
)
|> Ash.read_one()
|> case do
{:ok, nil} ->
Ash.Changeset.add_error(
changeset,
AshOban.Errors.TriggerNoLongerApplies.exception([])
)
if can_lock? do
quote do
Ash.Changeset.before_action(changeset, fn changeset ->
query()
|> Ash.Query.do_filter(primary_key)
|> Ash.Query.set_context(%{private: %{ash_oban?: true}})
|> Ash.Query.for_read(unquote(read_action), %{},
authorize?: authorize?,
actor: actor,
domain: unquote(domain)
)
|> Ash.Query.lock(:for_update)
|> Ash.read_one()
|> case do
{:ok, nil} ->
Ash.Changeset.add_error(
changeset,
AshOban.Errors.TriggerNoLongerApplies.exception([])
)
{:ok, record} ->
%{changeset | data: record}
{:ok, record} ->
%{changeset | data: record}
{:error, error} ->
Ash.Changeset.add_error(changeset, error)
end
end)
{:error, error} ->
Ash.Changeset.add_error(changeset, error)
end
end)
end
else
quote do
Ash.Changeset.before_action(changeset, fn changeset ->
query()
|> Ash.Query.do_filter(primary_key)
|> Ash.Query.set_context(%{private: %{ash_oban?: true}})
|> Ash.Query.for_read(unquote(read_action), %{},
authorize?: authorize?,
actor: actor,
domain: unquote(domain)
)
|> Ash.read_one()
|> case do
{:ok, nil} ->
Ash.Changeset.add_error(
changeset,
AshOban.Errors.TriggerNoLongerApplies.exception([])
)
{:ok, record} ->
%{changeset | data: record}
{:error, error} ->
Ash.Changeset.add_error(changeset, error)
end
end)
end
end
end
@ -361,7 +372,8 @@ defmodule AshOban.Transformers.DefineSchedulers do
handle_error = handle_error(trigger, resource, domain, read_action)
work = work(trigger, worker, pro?, read_action, resource, domain)
work =
work(trigger, worker, atomic?, trigger_action.type, pro?, read_action, resource, domain)
Module.create(
worker_module_name,
@ -562,7 +574,7 @@ defmodule AshOban.Transformers.DefineSchedulers do
end
end
defp work(trigger, worker, pro?, read_action, resource, domain) do
defp work(trigger, worker, atomic?, trigger_action_type, pro?, read_action, resource, domain) do
function_name =
if pro? do
:process
@ -578,100 +590,197 @@ defmodule AshOban.Transformers.DefineSchedulers do
end
end
else
quote location: :keep, generated: true do
@impl unquote(worker)
def unquote(function_name)(%Oban.Job{args: %{"primary_key" => primary_key} = args} = job) do
AshOban.debug(
"Trigger #{unquote(inspect(resource))}.#{unquote(trigger.name)} triggered for primary key #{inspect(primary_key)}",
unquote(trigger.debug?)
)
if atomic? do
quote location: :keep, generated: true do
@impl unquote(worker)
def unquote(function_name)(
%Oban.Job{args: %{"primary_key" => primary_key} = args} = job
) do
AshOban.debug(
"Trigger #{unquote(inspect(resource))}.#{unquote(trigger.name)} triggered for primary key #{inspect(primary_key)}",
unquote(trigger.debug?)
)
case AshOban.lookup_actor(args["actor"]) do
{:ok, actor} ->
authorize? = AshOban.authorize?()
case AshOban.lookup_actor(args["actor"]) do
{:ok, actor} ->
authorize? = AshOban.authorize?()
query()
|> Ash.Query.do_filter(primary_key)
|> Ash.Query.set_context(%{private: %{ash_oban?: true}})
|> Ash.Query.for_read(unquote(read_action), %{},
authorize?: authorize?,
actor: actor,
domain: unquote(domain)
)
|> Ash.read_one()
|> case do
{:ok, nil} ->
AshOban.debug(
"Record with primary key #{inspect(primary_key)} no longer applies to trigger #{unquote(inspect(resource))}#{unquote(trigger.name)}",
unquote(trigger.debug?)
args =
if unquote(is_nil(trigger.read_metadata)) do
%{}
else
%{metadata: args["metadata"]}
end
query =
query()
|> Ash.Query.do_filter(primary_key)
|> Ash.Query.set_context(%{private: %{ash_oban?: true}})
|> Ash.Query.for_read(unquote(read_action), %{},
authorize?: authorize?,
actor: actor,
domain: unquote(domain)
)
{:discard, :trigger_no_longer_applies}
{:ok, record} ->
args =
if unquote(is_nil(trigger.read_metadata)) do
%{}
else
%{metadata: args["metadata"]}
end
record
|> Ash.Changeset.new()
|> prepare(primary_key, authorize?, actor)
|> Ash.Changeset.set_context(%{private: %{ash_oban?: true}})
|> Ash.Changeset.for_action(
if unquote(trigger_action_type) == :update do
Ash.bulk_update!(
query,
unquote(trigger.action),
Map.merge(unquote(Macro.escape(trigger.action_input || %{})), args),
authorize?: authorize?,
actor: actor,
domain: unquote(domain),
skip_unknown_input: [:metadata]
skip_unknown_inputs: [:metadata],
return_errors?: true,
notify?: true,
return_records?: true
)
|> AshOban.update_or_destroy()
|> case do
:ok ->
:ok
else
Ash.bulk_destroy!(
query,
unquote(trigger.action),
Map.merge(unquote(Macro.escape(trigger.action_input || %{})), args),
authorize?: authorize?,
actor: actor,
domain: unquote(domain),
skip_unknown_inputs: [:metadata],
return_errors?: true,
notify?: true
)
end
{:ok, result} ->
{:ok, result}
:ok
{:error,
%Ash.Error.Invalid{errors: [%AshOban.Errors.TriggerNoLongerApplies{}]}} ->
AshOban.debug(
"Record with primary key #{inspect(primary_key)} no longer applies to trigger #{unquote(inspect(resource))}#{unquote(trigger.name)}",
unquote(trigger.debug?)
)
{:error, error} ->
AshOban.debug(
"""
Record with primary key #{inspect(primary_key)} encountered an error in #{unquote(inspect(resource))}#{unquote(trigger.name)}
{:discard, :trigger_no_longer_applies}
Could not lookup actor with #{inspect(args["actor"])}
{:error, error} ->
raise Ash.Error.to_error_class(error)
end
end
#{Exception.format(:error, error, AshOban.stacktrace(error))}
""",
unquote(trigger.debug?)
)
{:error, error} ->
AshOban.debug(
"""
Record with primary key #{inspect(primary_key)} encountered an error in #{unquote(inspect(resource))}#{unquote(trigger.name)}
Could not lookup actor with #{inspect(args["actor"])}
#{Exception.format(:error, error, AshOban.stacktrace(error))}
""",
unquote(trigger.debug?)
raise Ash.Error.to_error_class(error)
end
rescue
error ->
handle_error(
job,
Ash.Error.to_ash_error(error, __STACKTRACE__),
primary_key,
__STACKTRACE__
)
raise Ash.Error.to_error_class(error)
end
rescue
error ->
handle_error(
job,
Ash.Error.to_ash_error(error, __STACKTRACE__),
primary_key,
__STACKTRACE__
end
else
quote location: :keep, generated: true do
@impl unquote(worker)
def unquote(function_name)(
%Oban.Job{args: %{"primary_key" => primary_key} = args} = job
) do
AshOban.debug(
"Trigger #{unquote(inspect(resource))}.#{unquote(trigger.name)} triggered for primary key #{inspect(primary_key)}",
unquote(trigger.debug?)
)
case AshOban.lookup_actor(args["actor"]) do
{:ok, actor} ->
authorize? = AshOban.authorize?()
query()
|> Ash.Query.do_filter(primary_key)
|> Ash.Query.set_context(%{private: %{ash_oban?: true}})
|> Ash.Query.for_read(unquote(read_action), %{},
authorize?: authorize?,
actor: actor,
domain: unquote(domain)
)
|> Ash.read_one()
|> case do
{:ok, nil} ->
AshOban.debug(
"Record with primary key #{inspect(primary_key)} no longer applies to trigger #{unquote(inspect(resource))}#{unquote(trigger.name)}",
unquote(trigger.debug?)
)
{:discard, :trigger_no_longer_applies}
{:ok, record} ->
args =
if unquote(is_nil(trigger.read_metadata)) do
%{}
else
%{metadata: args["metadata"]}
end
record
|> Ash.Changeset.new()
|> prepare(primary_key, authorize?, actor)
|> Ash.Changeset.set_context(%{private: %{ash_oban?: true}})
|> Ash.Changeset.for_action(
unquote(trigger.action),
Map.merge(unquote(Macro.escape(trigger.action_input || %{})), args),
authorize?: authorize?,
actor: actor,
domain: unquote(domain),
skip_unknown_input: [:metadata]
)
|> AshOban.update_or_destroy()
|> case do
:ok ->
:ok
{:ok, result} ->
{:ok, result}
{:error, %Ash.Error.Invalid{errors: [%Ash.Error.Changes.StaleRecord{}]}} ->
AshOban.debug(
"Record with primary key #{inspect(primary_key)} no longer applies to trigger #{unquote(inspect(resource))}#{unquote(trigger.name)}",
unquote(trigger.debug?)
)
{:discard, :trigger_no_longer_applies}
{:error,
%Ash.Error.Invalid{errors: [%AshOban.Errors.TriggerNoLongerApplies{}]}} ->
AshOban.debug(
"Record with primary key #{inspect(primary_key)} no longer applies to trigger #{unquote(inspect(resource))}#{unquote(trigger.name)}",
unquote(trigger.debug?)
)
{:discard, :trigger_no_longer_applies}
{:error, error} ->
raise Ash.Error.to_error_class(error)
end
end
{:error, error} ->
AshOban.debug(
"""
Record with primary key #{inspect(primary_key)} encountered an error in #{unquote(inspect(resource))}#{unquote(trigger.name)}
Could not lookup actor with #{inspect(args["actor"])}
#{Exception.format(:error, error, AshOban.stacktrace(error))}
""",
unquote(trigger.debug?)
)
raise Ash.Error.to_error_class(error)
end
rescue
error ->
handle_error(
job,
Ash.Error.to_ash_error(error, __STACKTRACE__),
primary_key,
__STACKTRACE__
)
end
end
end
end

View file

@ -20,7 +20,7 @@ defmodule AshObanTest do
end
test "nothing happens if no records exist" do
assert %{success: 1} = AshOban.Test.schedule_and_run_triggers(Triggered)
assert %{success: 2} = AshOban.Test.schedule_and_run_triggers(Triggered)
end
test "if a record exists, it is processed" do
@ -29,23 +29,35 @@ defmodule AshObanTest do
|> Ash.create!()
assert %{success: 2} =
AshOban.Test.schedule_and_run_triggers(Triggered,
AshOban.Test.schedule_and_run_triggers({Triggered, :process},
actor: %AshOban.Test.ActorPersister.FakeActor{id: 1}
)
end
test "actions done atomically will be done atomically" do
Triggered
|> Ash.Changeset.for_create(:create, %{})
|> Ash.create!()
assert %{success: 2} =
AshOban.Test.schedule_and_run_triggers({Triggered, :process_atomically})
assert Ash.read_first!(Triggered).processed
end
test "if an actor is not set, it is nil when executing the job" do
Triggered
|> Ash.Changeset.for_create(:create)
|> Ash.create!()
assert %{success: 1, failure: 1} =
assert %{success: 3, failure: 1} =
AshOban.Test.schedule_and_run_triggers(Triggered)
end
test "dsl introspection" do
assert [
%AshOban.Trigger{action: :process},
%AshOban.Trigger{action: :process_atomically},
%AshOban.Trigger{action: :process, scheduler: nil}
] = AshOban.Info.oban_triggers(Triggered)
end
@ -69,6 +81,7 @@ defmodule AshObanTest do
[
crontab: [
{"0 0 1 1 *", AshOban.Test.Triggered.AshOban.ActionWorker.SayHello, []},
{"* * * * *", AshOban.Test.Triggered.AshOban.Scheduler.ProcessAtomically, []},
{"* * * * *", AshOban.Test.Triggered.AshOban.Scheduler.Process, []}
]
]}
@ -112,6 +125,8 @@ defmodule AshObanTest do
crontab: [
{"0 0 1 1 *", AshOban.Test.Triggered.AshOban.ActionWorker.SayHello,
[paused: false]},
{"* * * * *", AshOban.Test.Triggered.AshOban.Scheduler.ProcessAtomically,
[paused: false]},
{"* * * * *", AshOban.Test.Triggered.AshOban.Scheduler.Process,
[paused: false]}
]

View file

@ -15,6 +15,14 @@ defmodule AshOban.Test.Triggered do
worker_read_action(:read)
end
trigger :process_atomically do
action :process_atomically
queue :triggered_process
where expr(processed != true)
max_attempts 2
worker_read_action(:read)
end
trigger :process_2 do
action :process
where expr(processed != true)
@ -47,6 +55,10 @@ defmodule AshOban.Test.Triggered do
pagination keyset?: true
end
update :process_atomically do
change set_attribute(:processed, true)
end
update :process do
require_atomic? false
change set_attribute(:processed, true)