2023-04-28 14:07:05 +12:00
|
|
|
defmodule AshOban.Transformers.DefineSchedulers do
|
2023-09-17 03:17:40 +12:00
|
|
|
# Define scheduler and worker modules.
|
|
|
|
@moduledoc false
|
2023-04-28 14:36:29 +12:00
|
|
|
|
2023-04-28 14:07:05 +12:00
|
|
|
use Spark.Dsl.Transformer
|
|
|
|
alias Spark.Dsl.Transformer
|
|
|
|
|
2023-08-26 01:15:21 +12:00
|
|
|
# Run this transformer after all others so the trigger DSL entities are fully
# built before we generate scheduler/worker modules from them.
def after?(_), do: true
|
2023-04-28 14:07:05 +12:00
|
|
|
|
|
|
|
# For every configured trigger: compute the scheduler/worker module names,
# record them on the trigger entity, and asynchronously compile the generated
# modules. Returns `{:ok, dsl}` with the updated trigger entities.
def transform(dsl) do
  module = Transformer.get_persisted(dsl, :module)
  triggers = AshOban.Info.oban_triggers(dsl)

  updated_dsl =
    Enum.reduce(triggers, dsl, fn trigger, dsl ->
      # A scheduler module is only generated when a cron schedule is configured.
      scheduler_module_name =
        if trigger.scheduler_cron do
          module_name(module, trigger, "Scheduler")
        end

      worker_module_name = module_name(module, trigger, "Worker")

      dsl =
        dsl
        |> Transformer.replace_entity([:oban, :triggers], %{
          trigger
          | scheduler: scheduler_module_name,
            worker: worker_module_name
        })
        |> Transformer.async_compile(fn ->
          define_worker(module, worker_module_name, trigger, dsl)
        end)

      if trigger.scheduler_cron do
        Transformer.async_compile(dsl, fn ->
          define_scheduler(module, scheduler_module_name, worker_module_name, trigger, dsl)
        end)
      else
        dsl
      end
    end)

  {:ok, updated_dsl}
end
|
|
|
|
|
|
|
|
# Derives the generated module name for a trigger, e.g.
# `MyApp.User.AshOban.Worker.ProcessThings` for trigger `:process_things`
# with type "Worker".
defp module_name(base, trigger, type) do
  trigger_segment = Macro.camelize(to_string(trigger.name))

  Module.concat(List.wrap(base) ++ ["AshOban", type, trigger_segment])
end
|
|
|
|
|
|
|
|
# Builds and compiles the scheduler module for a trigger. The scheduler is an
# Oban worker (run on `trigger.scheduler_cron`) that streams the primary keys
# of all records matching the trigger and enqueues one worker job per record.
#
# Fix: the scheduler's job priority previously used `trigger.worker_priority`
# while the worker used `trigger.scheduler_priority` — the two were swapped.
# The scheduler now uses `trigger.scheduler_priority`, consistent with its use
# of `scheduler_queue` and `max_scheduler_attempts`.
defp define_scheduler(resource, scheduler_module_name, worker_module_name, trigger, dsl) do
  api = AshOban.Info.oban_api!(dsl)
  primary_key = Ash.Resource.Info.primary_key(dsl)
  pro? = AshOban.Info.pro?()

  # `filter/1` is only generated when the trigger has a `where` condition.
  filter =
    if not is_nil(trigger.where) do
      quote location: :keep do
        def filter(query) do
          Ash.Query.do_filter(query, unquote(Macro.escape(trigger.where)))
        end
      end
    end

  # Optionally caps how many records are scheduled per scheduler run.
  limit_stream =
    if trigger.record_limit do
      quote do
        def limit_stream(query) do
          Ash.Query.limit(query, unquote(trigger.record_limit))
        end
      end
    else
      quote do
        def limit_stream(query) do
          query
        end
      end
    end

  # Options forwarded to `stream!/2` when a batch size is configured.
  batch_opts =
    if trigger.stream_batch_size do
      quote do
        [batch_size: unquote(trigger.stream_batch_size)]
      end
    else
      quote do
        []
      end
    end

  # Streams only the primary key of each matching record — the worker
  # re-reads the full record itself.
  stream =
    if is_nil(trigger.where) do
      quote location: :keep do
        def stream(resource) do
          resource
          |> Ash.Query.set_context(%{private: %{ash_oban?: true}})
          |> Ash.Query.select(unquote(primary_key))
          |> limit_stream()
          |> Ash.Query.for_read(unquote(trigger.read_action))
          |> unquote(api).stream!(unquote(batch_opts))
        end
      end
    else
      quote location: :keep do
        def stream(resource) do
          resource
          |> Ash.Query.set_context(%{private: %{ash_oban?: true}})
          |> Ash.Query.select(unquote(primary_key))
          |> limit_stream()
          |> filter()
          |> Ash.Query.for_read(unquote(trigger.read_action))
          # previously dropped `batch_opts` in the filtered branch; pass it in
          # both branches so `stream_batch_size` always applies
          |> unquote(api).stream!(unquote(batch_opts))
        end
      end
    end

  # Oban Pro supports bulk insertion; OSS Oban inserts jobs one at a time.
  insert =
    if pro? do
      quote location: :keep do
        defp insert(stream) do
          count =
            stream
            |> Stream.chunk_every(100)
            |> Stream.map(fn batch ->
              Oban.insert_all(batch)
              Enum.count(batch)
            end)
            |> Enum.sum()

          AshOban.debug(
            "Scheduled #{count} jobs for trigger #{unquote(inspect(resource))}.#{unquote(trigger.name)}",
            unquote(trigger.debug?)
          )

          :ok
        end
      end
    else
      quote location: :keep do
        defp insert(stream) do
          count =
            stream
            |> Stream.each(&Oban.insert!/1)
            |> Enum.count()

          AshOban.debug(
            "Scheduled #{count} jobs for trigger #{unquote(inspect(resource))}.#{unquote(trigger.name)}",
            unquote(trigger.debug?)
          )

          :ok
        end
      end
    end

  worker =
    if pro? do
      Oban.Pro.Worker
    else
      Oban.Worker
    end

  # Oban Pro workers implement `process/1`; OSS Oban workers implement `perform/1`.
  function_name =
    if pro? do
      :process
    else
      :perform
    end

  quoted =
    quote location: :keep do
      use unquote(worker),
        # was `trigger.worker_priority` — scheduler/worker priorities were swapped
        priority: unquote(trigger.scheduler_priority),
        queue: unquote(trigger.scheduler_queue),
        unique: [
          period: :infinity,
          states: [
            :available,
            :retryable,
            :scheduled
          ]
        ],
        max_attempts: unquote(trigger.max_scheduler_attempts)

      require Logger

      @impl unquote(worker)
      if unquote(trigger.state != :active) do
        # Inactive/paused triggers discard their scheduled runs.
        def unquote(function_name)(%Oban.Job{}) do
          {:discard, unquote(trigger.state)}
        end
      else
        def unquote(function_name)(%Oban.Job{}) do
          # Look the trigger up at runtime so the currently-configured
          # `read_metadata` function is used.
          metadata =
            case AshOban.Info.oban_trigger(unquote(resource), unquote(trigger.name)) do
              %{read_metadata: read_metadata} when is_function(read_metadata) ->
                read_metadata

              _ ->
                fn _ -> %{} end
            end

          unquote(resource)
          |> stream()
          |> Stream.map(fn record ->
            unquote(worker_module_name).new(%{
              primary_key: Map.take(record, unquote(primary_key)),
              metadata: metadata.(record)
            })
          end)
          |> insert()
        rescue
          e ->
            Logger.error(
              "Error running AshOban scheduler #{__MODULE__}.\n#{Exception.format(:error, e, __STACKTRACE__)}"
            )

            reraise e, __STACKTRACE__
        end
      end

      unquote(limit_stream)
      unquote(stream)
      unquote(filter)
      unquote(insert)
    end

  Module.create(scheduler_module_name, quoted, Macro.Env.location(__ENV__))
end
|
|
|
|
|
2023-04-29 05:59:56 +12:00
|
|
|
# sobelow_skip ["SQL.Query"]
|
2023-04-28 14:07:05 +12:00
|
|
|
# Builds and compiles the worker module for a trigger. The worker receives a
# record's primary key, re-reads the record (locking it where the data layer
# supports `:for_update`), and runs the trigger's action against it, falling
# back to the trigger's `on_error` action on the final failed attempt.
#
# Fix: the worker's job priority previously used `trigger.scheduler_priority`
# while the scheduler used `trigger.worker_priority` — the two were swapped.
# The worker now uses `trigger.worker_priority`, consistent with its use of
# `trigger.queue` and `trigger.max_attempts`.
defp define_worker(resource, worker_module_name, trigger, dsl) do
  api = AshOban.Info.oban_api!(dsl)
  pro? = AshOban.Info.pro?()

  worker =
    if pro? do
      Oban.Pro.Worker
    else
      Oban.Worker
    end

  query = query(trigger, resource)

  can_transact? = Ash.DataLayer.data_layer_can?(dsl, :transact)

  # Only re-read/lock inside the error handler when the `on_error` action
  # actually runs in a transaction on a transactional data layer.
  on_error_transaction? =
    can_transact? && trigger.on_error &&
      Ash.Resource.Info.action(dsl, trigger.on_error).transaction?

  work_transaction? =
    can_transact? && Ash.Resource.Info.action(dsl, trigger.action).transaction?

  can_lock? = Ash.DataLayer.data_layer_can?(dsl, {:lock, :for_update})

  read_action =
    trigger.worker_read_action || trigger.read_action ||
      Ash.Resource.Info.primary_action!(resource, :read).name

  # Quoted `before_action` hook that re-fetches the record inside the
  # transaction (with a row lock when supported) so the action sees fresh data
  # and the trigger condition is re-checked.
  get_and_lock =
    if can_lock? do
      quote do
        Ash.Changeset.before_action(changeset, fn changeset ->
          query()
          |> Ash.Query.do_filter(primary_key)
          |> Ash.Query.set_context(%{private: %{ash_oban?: true}})
          |> Ash.Query.for_read(unquote(read_action))
          |> Ash.Query.lock(:for_update)
          |> unquote(api).read_one()
          |> case do
            {:ok, nil} ->
              Ash.Changeset.add_error(changeset, "trigger no longer applies")

            {:ok, record} ->
              %{changeset | data: record}

            {:error, error} ->
              Ash.Changeset.add_error(changeset, error)
          end
        end)
      end
    else
      quote do
        Ash.Changeset.before_action(changeset, fn changeset ->
          query()
          |> Ash.Query.do_filter(primary_key)
          |> Ash.Query.set_context(%{private: %{ash_oban?: true}})
          |> Ash.Query.for_read(unquote(read_action))
          |> unquote(api).read_one()
          |> case do
            {:ok, nil} ->
              Ash.Changeset.add_error(changeset, "trigger no longer applies")

            {:ok, record} ->
              %{changeset | data: record}

            {:error, error} ->
              Ash.Changeset.add_error(changeset, error)
          end
        end)
      end
    end

  prepare_error =
    if on_error_transaction? do
      quote location: :keep do
        defp prepare_error(changeset, primary_key) do
          unquote(get_and_lock)
        end
      end
    else
      quote location: :keep do
        defp prepare_error(changeset, primary_key), do: changeset
      end
    end

  prepare =
    if work_transaction? do
      quote location: :keep do
        defp prepare(changeset, primary_key) do
          unquote(get_and_lock)
        end
      end
    else
      quote location: :keep do
        defp prepare(changeset, primary_key), do: changeset
      end
    end

  handle_error = handle_error(trigger, resource, api, read_action)

  work = work(trigger, worker, pro?, read_action, resource, api)

  Module.create(
    worker_module_name,
    quote location: :keep do
      use unquote(worker),
        # was `trigger.scheduler_priority` — scheduler/worker priorities were swapped
        priority: unquote(trigger.worker_priority),
        max_attempts: unquote(trigger.max_attempts),
        queue: unquote(trigger.queue),
        unique: [
          period: :infinity,
          states: [
            :available,
            :retryable,
            :scheduled
          ]
        ]

      require Logger

      unquote(work)
      unquote(query)
      unquote(handle_error)
      unquote(prepare)
      unquote(prepare_error)
    end,
    Macro.Env.location(__ENV__)
  )
end
|
|
|
|
|
2023-08-05 09:11:11 +12:00
|
|
|
# Builds the quoted `handle_error/4` clauses injected into each worker module.
#
# With an `on_error` action configured: non-final attempts re-raise so Oban
# retries; on the final attempt the record is re-read and the `on_error`
# action is run against it. Without `on_error`, errors are always re-raised.
defp handle_error(trigger, resource, api, read_action) do
  if trigger.on_error do
    # We look up the record again since we have exited any potential transaction we were in before
    quote location: :keep do
      # Not the last attempt: propagate the error so Oban schedules a retry.
      def handle_error(
            %{max_attempts: max_attempts, attempt: attempt},
            error,
            _primary_key,
            stacktrace
          )
          when max_attempts != attempt do
        reraise error, stacktrace
      end

      # Final attempt (attempt == max_attempts): run the `on_error` action.
      def handle_error(
            %{max_attempts: max_attempts, attempt: max_attempts} = job,
            error,
            primary_key,
            stacktrace
          ) do
        query()
        |> Ash.Query.do_filter(primary_key)
        |> Ash.Query.set_context(%{private: %{ash_oban?: true}})
        |> Ash.Query.for_read(unquote(read_action))
        |> unquote(api).read_one()
        |> case do
          # The re-read itself failed: log (when debugging) and return the error.
          {:error, error} ->
            AshOban.debug(
              # NOTE(review): message interpolates resource and trigger name with
              # no "." separator, unlike the scheduler's messages — confirm intended.
              """
              Record with primary key #{inspect(primary_key)} encountered an error in #{unquote(inspect(resource))}#{unquote(trigger.name)}

              #{Exception.format(:error, error, AshOban.stacktrace(error))}
              """,
              unquote(trigger.debug?)
            )

            {:error, error}

          # Record is gone or no longer matches the trigger: nothing to clean up.
          {:ok, nil} ->
            AshOban.debug(
              "Record with primary key #{inspect(primary_key)} no longer applies to trigger #{unquote(inspect(resource))}#{unquote(trigger.name)}",
              unquote(trigger.debug?)
            )

            {:discard, :trigger_no_longer_applies}

          # Record still applies: run the `on_error` action with the error as input.
          {:ok, record} ->
            record
            |> Ash.Changeset.new()
            |> prepare_error(primary_key)
            |> Ash.Changeset.set_context(%{private: %{ash_oban?: true}})
            |> Ash.Changeset.for_action(unquote(trigger.on_error), %{error: error})
            |> AshOban.update_or_destroy(unquote(api))
            |> case do
              :ok ->
                AshOban.debug(
                  # NOTE(review): "no longer applies to trigger" wording looks
                  # copy-pasted from the branch above — likely should just say the
                  # action was performed; confirm before changing the message.
                  "Performed #{unquote(trigger.action)} on #{inspect(primary_key)} no longer applies to trigger #{unquote(inspect(resource))}#{unquote(trigger.name)}",
                  unquote(trigger.debug?)
                )

                :ok

              {:ok, result} ->
                AshOban.debug(
                  # NOTE(review): same copy-pasted wording as the `:ok` branch above.
                  "Performed #{unquote(trigger.action)} on #{inspect(primary_key)} no longer applies to trigger #{unquote(inspect(resource))}#{unquote(trigger.name)}",
                  unquote(trigger.debug?)
                )

                :ok

              # The error handler itself failed: log loudly and re-raise the
              # normalized error with the original stacktrace.
              {:error, error} ->
                error = Ash.Error.to_ash_error(error, stacktrace)

                Logger.error("""
                Error handler failed for #{inspect(unquote(resource))}: #{inspect(primary_key)}!

                #{inspect(Exception.format(:error, error, AshOban.stacktrace(error)))}
                """)

                reraise error, stacktrace
            end
        end
      end
    end
  else
    # No `on_error` action: every failure is simply re-raised for Oban to retry.
    quote location: :keep do
      def handle_error(_job, error, _, stacktrace) do
        reraise error, stacktrace
      end
    end
  end
end
|
|
|
|
|
|
|
|
# Builds the quoted `query/0` for a worker module: the bare resource when the
# trigger has no `where` condition, otherwise the resource filtered by it.
defp query(trigger, resource) do
  case trigger.where do
    nil ->
      quote location: :keep do
        def query do
          unquote(resource)
        end
      end

    where ->
      quote location: :keep do
        def query do
          Ash.Query.do_filter(unquote(resource), unquote(Macro.escape(where)))
        end
      end
  end
end
|
|
|
|
|
2023-09-20 04:32:50 +12:00
|
|
|
# Builds the quoted `perform/1` (or `process/1` under Oban Pro) callback for a
# worker module: re-reads the record by primary key, re-checks the trigger
# condition, and runs the trigger's action, delegating failures to
# `handle_error/4`.
defp work(trigger, worker, pro?, read_action, resource, api) do
  # Oban Pro workers implement `process/1`; OSS Oban workers implement `perform/1`.
  function_name =
    if pro? do
      :process
    else
      :perform
    end

  if trigger.state != :active do
    # Inactive/paused triggers discard their jobs without touching the record.
    quote location: :keep do
      @impl unquote(worker)
      def unquote(function_name)(_) do
        {:discard, unquote(trigger.state)}
      end
    end
  else
    # `generated: true` suppresses warnings from branches the compiler can
    # prove unreachable for a given trigger configuration.
    quote location: :keep, generated: true do
      @impl unquote(worker)
      def unquote(function_name)(%Oban.Job{args: %{"primary_key" => primary_key} = args} = job) do
        AshOban.debug(
          "Trigger #{unquote(inspect(resource))}.#{unquote(trigger.name)} triggered for primary key #{inspect(primary_key)}",
          unquote(trigger.debug?)
        )

        query()
        |> Ash.Query.do_filter(primary_key)
        |> Ash.Query.set_context(%{private: %{ash_oban?: true}})
        |> Ash.Query.for_read(unquote(read_action))
        |> unquote(api).read_one()
        |> case do
          # Record is gone or no longer matches the trigger condition.
          {:ok, nil} ->
            AshOban.debug(
              # NOTE(review): resource and trigger name interpolated with no "."
              # separator, unlike the message above — confirm intended.
              "Record with primary key #{inspect(primary_key)} no longer applies to trigger #{unquote(inspect(resource))}#{unquote(trigger.name)}",
              unquote(trigger.debug?)
            )

            {:discard, :trigger_no_longer_applies}

          {:ok, record} ->
            # Only pass metadata through when the trigger reads metadata;
            # rebinds `args` to the action input map.
            args =
              if unquote(is_nil(trigger.read_metadata)) do
                %{}
              else
                %{metadata: args["metadata"]}
              end

            record
            |> Ash.Changeset.new()
            |> prepare(primary_key)
            |> Ash.Changeset.set_context(%{private: %{ash_oban?: true}})
            |> Ash.Changeset.for_action(unquote(trigger.action), args)
            |> AshOban.update_or_destroy(unquote(api))
            |> case do
              :ok ->
                :ok

              {:ok, result} ->
                {:ok, result}

              # Raise so the `rescue` below routes the failure through
              # `handle_error/4` (retry or `on_error` action).
              {:error, error} ->
                raise Ash.Error.to_error_class(error)
            end

          # we don't have the record here, so we can't do the `on_error` behavior
          other ->
            other
        end
      rescue
        error ->
          error = Ash.Error.to_ash_error(error, __STACKTRACE__)

          Logger.error("""
          Error handler failed for #{inspect(unquote(resource))}: #{inspect(primary_key)}!

          #{inspect(Exception.format(:error, error, AshOban.stacktrace(error)))}
          """)

          handle_error(job, error, primary_key, __STACKTRACE__)
      end
    end
  end
end
|
|
|
|
end
|