2023-12-08 07:24:16 +13:00
|
|
|
defmodule AshOban.Transformers.DefineActionWorkers do
  # Defines one worker module per scheduled action declared on a resource and
  # records the generated module name on the scheduled action entity.
  @moduledoc false

  use Spark.Dsl.Transformer

  alias Spark.Dsl.Transformer

  # Answers `true` for any argument.
  # NOTE(review): presumably this makes the transformer run after all others
  # per Spark's ordering rules - confirm against the Spark documentation.
  def after?(_), do: true

  # Walks every scheduled action reported by `AshOban.Info`, and for each one:
  #
  #   * derives the worker module name from the persisted `:module`,
  #   * registers an async-compile callback that defines that worker module,
  #   * replaces the scheduled action entity with a copy whose `:worker`
  #     field is the derived module name.
  #
  # Returns `{:ok, dsl}` with the accumulated DSL state.
  def transform(dsl) do
    resource = Transformer.get_persisted(dsl, :module)

    updated_dsl =
      for scheduled_action <- AshOban.Info.oban_scheduled_actions(dsl), reduce: dsl do
        acc ->
          worker_module_name = module_name(resource, scheduled_action)

          acc
          |> Transformer.async_compile(fn ->
            # The callback captures the DSL state as it was before this
            # scheduled action's entity was replaced.
            define_worker(resource, worker_module_name, scheduled_action, acc)
          end)
          |> Transformer.replace_entity(
            [:oban, :scheduled_actions],
            %{scheduled_action | worker: worker_module_name}
          )
      end

    {:ok, updated_dsl}
  end

  # Builds `<module>.AshOban.ActionWorker.<CamelizedName>` from the resource
  # module and the `name` of the given scheduled action.
  defp module_name(module, trigger) do
    camelized_name = Macro.camelize(to_string(trigger.name))

    Module.concat(List.wrap(module) ++ ["AshOban", "ActionWorker", camelized_name])
  end

  # Creates the worker module for a single scheduled action.
  #
  # The worker behaviour and its callback name depend on `AshOban.Info.pro?/0`:
  # `Oban.Pro.Worker` with `process/1` when it is truthy, otherwise
  # `Oban.Worker` with `perform/1`. The generated callback builds an action
  # input from the configured `action_input` (an empty map when unset), adds
  # `:last_oban_attempt?`, and runs the action through the configured API.
  # sobelow_skip ["SQL.Query"]
  defp define_worker(resource, worker_module_name, scheduled_action, dsl) do
    api = AshOban.Info.oban_api!(dsl)

    {worker, function_name} =
      if AshOban.Info.pro?() do
        {Oban.Pro.Worker, :process}
      else
        {Oban.Worker, :perform}
      end

    worker_body =
      quote location: :keep do
        use unquote(worker),
          priority: unquote(scheduled_action.priority),
          max_attempts: unquote(scheduled_action.max_attempts),
          queue: unquote(scheduled_action.queue),
          unique: [
            period: :infinity,
            states: [
              :available,
              :retryable,
              :scheduled
            ]
          ]

        require Logger

        @impl unquote(worker)
        def unquote(function_name)(%Oban.Job{} = job) do
          AshOban.debug(
            "Scheduled action #{unquote(inspect(resource))}.#{unquote(scheduled_action.name)} triggered.",
            unquote(scheduled_action.debug?)
          )

          configured_input = unquote(Macro.escape(scheduled_action.action_input || %{}))

          # True only when the current attempt number equals the job's
          # configured maximum number of attempts.
          action_input =
            Map.put(configured_input, :last_oban_attempt?, job.max_attempts == job.attempt)

          unquote(resource)
          |> Ash.ActionInput.for_action(unquote(scheduled_action.action), action_input)
          |> unquote(api).run_action()
        end
      end

    Module.create(worker_module_name, worker_body, Macro.Env.location(__ENV__))
  end
end
|