improvement: add scheduled_action for scheduling create/generic actions

This commit is contained in:
Zach Daniel 2023-12-07 13:24:16 -05:00
parent 870c4ff447
commit 1a957248ae
3 changed files with 286 additions and 55 deletions

View file

@ -200,6 +200,97 @@ defmodule AshOban do
]
}
defmodule Schedule do
@moduledoc """
A configured scheduled action.
"""
@type t :: %__MODULE__{
name: atom,
action: atom,
cron: String.t(),
action_input: map(),
worker: module(),
queue: atom,
debug?: boolean,
priority: non_neg_integer()
}
defstruct [
:name,
:action,
:cron,
:debug,
:priority,
:action_input,
:queue,
:worker,
:debug?,
:__identifier__
]
end
@schedule %Spark.Dsl.Entity{
name: :schedule,
target: Schedule,
args: [:name, :cron],
identifier: :name,
schema: [
name: [
type: :atom,
doc: "A unique identifier for this scheduled action."
],
action_input: [
type: :map,
doc: "Inputs to supply to the action when it is called."
],
action: [
type: :atom,
doc: "The generic or create action to call when the schedule is triggered."
],
queue: [
type: :atom,
doc:
"The queue to place the job in. Defaults to the resources short name plus the name of the scheduled action (not the action name)."
],
state: [
type: {:one_of, [:active, :paused, :deleted]},
default: :active,
doc: """
Describes the state of the cron job. See the getting started guide for more information. The most important thing is that you *do not remove a scheduled action from a resource if you are using oban pro*.
"""
],
max_attempts: [
type: :pos_integer,
default: 1,
doc: """
How many times to attempt the job. The action will receive a `last_oban_attempt?` argument on the last attempt, and you should handle errors accordingly.
"""
],
debug?: [
type: :boolean,
default: false,
doc:
"If set to `true`, detailed debug logging will be enabled for this trigger. You can also set `config :ash_oban, debug_all_triggers?: true` to enable debug logging for all triggers."
]
]
}
@scheduled_actions %Spark.Dsl.Section{
name: :scheduled_actions,
entities: [@schedule],
describe: """
A section for configured scheduled actions. Supports generic and create actions.
""",
examples: [
"""
scheduled_actions do
schedule :import, "0 */6 * * *", action: :import
end
"""
]
}
@triggers %Spark.Dsl.Section{
name: :triggers,
entities: [@trigger],
@ -240,7 +331,7 @@ defmodule AshOban do
required: true
]
],
sections: [@triggers]
sections: [@triggers, @scheduled_actions]
}
@sections [@oban]
@ -254,7 +345,8 @@ defmodule AshOban do
imports: [AshOban.Changes.BuiltinChanges],
transformers: [
AshOban.Transformers.SetDefaults,
AshOban.Transformers.DefineSchedulers
AshOban.Transformers.DefineSchedulers,
AshOban.Transformers.DefineActionWorkers
]
def schedule(resource, trigger) do

View file

@ -0,0 +1,92 @@
defmodule AshOban.Transformers.DefineActionWorkers do
# Define scheduler and worker modules.
@moduledoc false
use Spark.Dsl.Transformer
alias Spark.Dsl.Transformer
def after?(_), do: true
def transform(dsl) do
module = Transformer.get_persisted(dsl, :module)
dsl
|> AshOban.Info.oban_scheduled_actions()
|> Enum.reduce(dsl, fn scheduled_action, dsl ->
worker_module_name = module_name(module, scheduled_action)
dsl
|> Transformer.replace_entity([:oban, :scheduled_actions], %{
scheduled_action
| worker: worker_module_name
})
|> Transformer.async_compile(fn ->
define_worker(module, worker_module_name, scheduled_action, dsl)
end)
end)
|> then(&{:ok, &1})
end
defp module_name(module, trigger) do
module
|> List.wrap()
|> Enum.concat(["AshOban", "ActionWorker"])
|> Enum.concat([Macro.camelize(to_string(trigger.name))])
|> Module.concat()
end
# sobelow_skip ["SQL.Query"]
defp define_worker(resource, worker_module_name, scheduled_action, dsl) do
api = AshOban.Info.oban_api!(dsl)
pro? = AshOban.Info.pro?()
function_name =
if pro? do
:process
else
:perform
end
worker =
if pro? do
Oban.Pro.Worker
else
Oban.Worker
end
Module.create(
worker_module_name,
quote location: :keep do
use unquote(worker),
priority: unquote(scheduled_action.priority),
max_attempts: unquote(scheduled_action.max_attempts),
queue: unquote(scheduled_action.queue),
unique: [
period: :infinity,
states: [
:available,
:retryable,
:scheduled
]
]
require Logger
@impl unquote(worker)
def unquote(function_name)(%Oban.Job{} = job) do
AshOban.debug(
"Scheduled action #{unquote(inspect(resource))}.#{unquote(scheduled_action.name)} triggered.",
unquote(scheduled_action.debug?)
)
unquote(resource)
|> Ash.ActionInput.for_action(
unquote(scheduled_action.action),
unquote(scheduled_action.action_input || %{})
)
|> unquote(api).run_action!()
end
end,
Macro.Env.location(__ENV__)
)
end
end

View file

@ -16,66 +16,113 @@ defmodule AshOban.Transformers.SetDefaults do
{:ok,
dsl
|> Transformer.get_entities([:oban, :triggers])
|> Enum.reduce(dsl, fn trigger, dsl ->
read_action =
case trigger.read_action do
nil ->
Ash.Resource.Info.primary_action(dsl, :read) ||
raise Spark.Error.DslError,
path: [
:oban,
:triggers,
trigger.name,
:read_action
],
module: module,
message: """
No read action was configured for this trigger, and no primary read action exists
"""
|> set_trigger_defaults(module)
|> set_scheduled_action_defaults(module)}
end
read_action ->
Ash.Resource.Info.action(dsl, read_action)
end
defp set_scheduled_action_defaults(dsl, module) do
dsl
|> Transformer.get_entities([:oban, :scheduled_actions])
|> Enum.reduce(dsl, fn scheduled_action, dsl ->
action_name = scheduled_action.action || scheduled_action.name
action_name = trigger.action || trigger.name
case Ash.Resource.Info.action(dsl, action_name) do
nil ->
key_name =
if scheduled_action.action do
:action
else
:name
end
unless Ash.Resource.Info.action(dsl, action_name) do
key_name =
if trigger.action do
:action
else
:name
end
raise Spark.Error.DslError,
path: [:oban, :scheduled_actions, scheduled_action.name, key_name],
module: module,
message: """
No such action #{inspect(action_name)} on #{inspect(module)}.
"""
raise Spark.Error.DslError,
path: [:oban, :triggers, trigger.name, key_name],
module: module,
message: """
No such action #{inspect(action_name)} on #{inspect(module)}.
"""
end
%{type: bad_type} when bad_type in [:update, :destroy] ->
raise Spark.Error.DslError,
path: [:oban, :scheduled_actions, scheduled_action.name],
module: module,
message: """
Scheduled actions of type #{inspect(bad_type)} are not supported.
"""
end
unless read_action.pagination && read_action.pagination.keyset? do
raise Spark.Error.DslError,
path: [:oban, :triggers, trigger.name, :read_action],
module: module,
message: """
The read action `:#{read_action.name}` must support keyset pagination in order to be
used by an AshOban trigger.
"""
end
queue = scheduled_action.queue || default_queue_name(dsl, scheduled_action)
queue = trigger.queue || default_queue_name(dsl, trigger)
Transformer.replace_entity(dsl, [:oban, :scheduled_actions], %{
scheduled_action
| action: action_name,
queue: queue
})
end)
end
Transformer.replace_entity(dsl, [:oban, :triggers], %{
trigger
| read_action: read_action.name,
queue: queue,
scheduler_queue: trigger.scheduler_queue || queue,
action: trigger.action || trigger.name
})
end)}
defp set_trigger_defaults(dsl, module) do
dsl
|> Transformer.get_entities([:oban, :triggers])
|> Enum.reduce(dsl, fn trigger, dsl ->
read_action =
case trigger.read_action do
nil ->
Ash.Resource.Info.primary_action(dsl, :read) ||
raise Spark.Error.DslError,
path: [
:oban,
:triggers,
trigger.name,
:read_action
],
module: module,
message: """
No read action was configured for this trigger, and no primary read action exists
"""
read_action ->
Ash.Resource.Info.action(dsl, read_action)
end
action_name = trigger.action || trigger.name
unless Ash.Resource.Info.action(dsl, action_name) do
key_name =
if trigger.action do
:action
else
:name
end
raise Spark.Error.DslError,
path: [:oban, :triggers, trigger.name, key_name],
module: module,
message: """
No such action #{inspect(action_name)} on #{inspect(module)}.
"""
end
unless read_action.pagination && read_action.pagination.keyset? do
raise Spark.Error.DslError,
path: [:oban, :triggers, trigger.name, :read_action],
module: module,
message: """
The read action `:#{read_action.name}` must support keyset pagination in order to be
used by an AshOban trigger.
"""
end
queue = trigger.queue || default_queue_name(dsl, trigger)
Transformer.replace_entity(dsl, [:oban, :triggers], %{
trigger
| read_action: read_action.name,
queue: queue,
scheduler_queue: trigger.scheduler_queue || queue,
action: trigger.action || trigger.name
})
end)
end
# sobelow_skip ["DOS.BinToAtom"]