diff --git a/lib/ash_oban.ex b/lib/ash_oban.ex index 38a8e87..ea3eeac 100644 --- a/lib/ash_oban.ex +++ b/lib/ash_oban.ex @@ -200,6 +200,97 @@ defmodule AshOban do ] } + defmodule Schedule do + @moduledoc """ + A configured scheduled action. + """ + + @type t :: %__MODULE__{ + name: atom, + action: atom, + cron: String.t(), + action_input: map(), + worker: module(), + queue: atom, + debug?: boolean, + priority: non_neg_integer() + } + + defstruct [ + :name, + :action, + :cron, + :debug, + :priority, + :action_input, + :queue, + :worker, + :debug?, + :__identifier__ + ] + end + + @schedule %Spark.Dsl.Entity{ + name: :schedule, + target: Schedule, + args: [:name, :cron], + identifier: :name, + schema: [ + name: [ + type: :atom, + doc: "A unique identifier for this scheduled action." + ], + action_input: [ + type: :map, + doc: "Inputs to supply to the action when it is called." + ], + action: [ + type: :atom, + doc: "The generic or create action to call when the schedule is triggered." + ], + queue: [ + type: :atom, + doc: + "The queue to place the job in. Defaults to the resources short name plus the name of the scheduled action (not the action name)." + ], + state: [ + type: {:one_of, [:active, :paused, :deleted]}, + default: :active, + doc: """ + Describes the state of the cron job. See the getting started guide for more information. The most important thing is that you *do not remove a scheduled action from a resource if you are using oban pro*. + """ + ], + max_attempts: [ + type: :pos_integer, + default: 1, + doc: """ + How many times to attempt the job. The action will receive a `last_oban_attempt?` argument on the last attempt, and you should handle errors accordingly. + """ + ], + debug?: [ + type: :boolean, + default: false, + doc: + "If set to `true`, detailed debug logging will be enabled for this trigger. You can also set `config :ash_oban, debug_all_triggers?: true` to enable debug logging for all triggers." + ] + ] + } + + @scheduled_actions %Spark.Dsl.Section{ + name: :scheduled_actions, + entities: [@schedule], + describe: """ + A section for configured scheduled actions. Supports generic and create actions. + """, + examples: [ + """ + scheduled_actions do + schedule :import, "0 */6 * * *", action: :import + end + """ + ] + } + @triggers %Spark.Dsl.Section{ name: :triggers, entities: [@trigger], @@ -240,7 +331,7 @@ defmodule AshOban do required: true ] ], - sections: [@triggers] + sections: [@triggers, @scheduled_actions] } @sections [@oban] @@ -254,7 +345,8 @@ defmodule AshOban do imports: [AshOban.Changes.BuiltinChanges], transformers: [ AshOban.Transformers.SetDefaults, - AshOban.Transformers.DefineSchedulers + AshOban.Transformers.DefineSchedulers, + AshOban.Transformers.DefineActionWorkers ] def schedule(resource, trigger) do diff --git a/lib/transformers/define_action_workers.ex b/lib/transformers/define_action_workers.ex new file mode 100644 index 0000000..399c969 --- /dev/null +++ b/lib/transformers/define_action_workers.ex @@ -0,0 +1,92 @@ +defmodule AshOban.Transformers.DefineActionWorkers do + # Define scheduler and worker modules. + @moduledoc false + + use Spark.Dsl.Transformer + alias Spark.Dsl.Transformer + + def after?(_), do: true + + def transform(dsl) do + module = Transformer.get_persisted(dsl, :module) + + dsl + |> AshOban.Info.oban_scheduled_actions() + |> Enum.reduce(dsl, fn scheduled_action, dsl -> + worker_module_name = module_name(module, scheduled_action) + + dsl + |> Transformer.replace_entity([:oban, :scheduled_actions], %{ + scheduled_action + | worker: worker_module_name + }) + |> Transformer.async_compile(fn -> + define_worker(module, worker_module_name, scheduled_action, dsl) + end) + end) + |> then(&{:ok, &1}) + end + + defp module_name(module, trigger) do + module + |> List.wrap() + |> Enum.concat(["AshOban", "ActionWorker"]) + |> Enum.concat([Macro.camelize(to_string(trigger.name))]) + |> Module.concat() + end + + # sobelow_skip ["SQL.Query"] + defp define_worker(resource, worker_module_name, scheduled_action, dsl) do + api = AshOban.Info.oban_api!(dsl) + pro? = AshOban.Info.pro?() + + function_name = + if pro? do + :process + else + :perform + end + + worker = + if pro? do + Oban.Pro.Worker + else + Oban.Worker + end + + Module.create( + worker_module_name, + quote location: :keep do + use unquote(worker), + priority: unquote(scheduled_action.priority), + max_attempts: unquote(scheduled_action.max_attempts), + queue: unquote(scheduled_action.queue), + unique: [ + period: :infinity, + states: [ + :available, + :retryable, + :scheduled + ] + ] + + require Logger + @impl unquote(worker) + def unquote(function_name)(%Oban.Job{} = job) do + AshOban.debug( + "Scheduled action #{unquote(inspect(resource))}.#{unquote(scheduled_action.name)} triggered.", + unquote(scheduled_action.debug?) + ) + + unquote(resource) + |> Ash.ActionInput.for_action( + unquote(scheduled_action.action), + unquote(scheduled_action.action_input || %{}) + ) + |> unquote(api).run_action!() + end + end, + Macro.Env.location(__ENV__) + ) + end +end diff --git a/lib/transformers/set_defaults.ex b/lib/transformers/set_defaults.ex index 29d5058..f65bb97 100644 --- a/lib/transformers/set_defaults.ex +++ b/lib/transformers/set_defaults.ex @@ -16,66 +16,113 @@ defmodule AshOban.Transformers.SetDefaults do {:ok, dsl - |> Transformer.get_entities([:oban, :triggers]) - |> Enum.reduce(dsl, fn trigger, dsl -> - read_action = - case trigger.read_action do - nil -> - Ash.Resource.Info.primary_action(dsl, :read) || - raise Spark.Error.DslError, - path: [ - :oban, - :triggers, - trigger.name, - :read_action - ], - module: module, - message: """ - No read action was configured for this trigger, and no primary read action exists - """ + |> set_trigger_defaults(module) + |> set_scheduled_action_defaults(module)} + end - read_action -> - Ash.Resource.Info.action(dsl, read_action) - end + defp set_scheduled_action_defaults(dsl, module) do + dsl + |> Transformer.get_entities([:oban, :scheduled_actions]) + |> Enum.reduce(dsl, fn scheduled_action, dsl -> + action_name = scheduled_action.action || scheduled_action.name - action_name = trigger.action || trigger.name + case Ash.Resource.Info.action(dsl, action_name) do + nil -> + key_name = + if scheduled_action.action do + :action + else + :name + end - unless Ash.Resource.Info.action(dsl, action_name) do - key_name = - if trigger.action do - :action - else - :name - end + raise Spark.Error.DslError, + path: [:oban, :scheduled_actions, scheduled_action.name, key_name], + module: module, + message: """ + No such action #{inspect(action_name)} on #{inspect(module)}. + """ - raise Spark.Error.DslError, - path: [:oban, :triggers, trigger.name, key_name], - module: module, - message: """ - No such action #{inspect(action_name)} on #{inspect(module)}. - """ - end + %{type: bad_type} when bad_type in [:update, :destroy] -> + raise Spark.Error.DslError, + path: [:oban, :scheduled_actions, scheduled_action.name], + module: module, + message: """ + Scheduled actions of type #{inspect(bad_type)} are not supported. + """ + end - unless read_action.pagination && read_action.pagination.keyset? do - raise Spark.Error.DslError, - path: [:oban, :triggers, trigger.name, :read_action], - module: module, - message: """ - The read action `:#{read_action.name}` must support keyset pagination in order to be - used by an AshOban trigger. - """ - end + queue = scheduled_action.queue || default_queue_name(dsl, scheduled_action) - queue = trigger.queue || default_queue_name(dsl, trigger) + Transformer.replace_entity(dsl, [:oban, :scheduled_actions], %{ + scheduled_action + | action: action_name, + queue: queue + }) + end) + end - Transformer.replace_entity(dsl, [:oban, :triggers], %{ - trigger - | read_action: read_action.name, - queue: queue, - scheduler_queue: trigger.scheduler_queue || queue, - action: trigger.action || trigger.name - }) - end)} + defp set_trigger_defaults(dsl, module) do + dsl + |> Transformer.get_entities([:oban, :triggers]) + |> Enum.reduce(dsl, fn trigger, dsl -> + read_action = + case trigger.read_action do + nil -> + Ash.Resource.Info.primary_action(dsl, :read) || + raise Spark.Error.DslError, + path: [ + :oban, + :triggers, + trigger.name, + :read_action + ], + module: module, + message: """ + No read action was configured for this trigger, and no primary read action exists + """ + + read_action -> + Ash.Resource.Info.action(dsl, read_action) + end + + action_name = trigger.action || trigger.name + + unless Ash.Resource.Info.action(dsl, action_name) do + key_name = + if trigger.action do + :action + else + :name + end + + raise Spark.Error.DslError, + path: [:oban, :triggers, trigger.name, key_name], + module: module, + message: """ + No such action #{inspect(action_name)} on #{inspect(module)}. + """ + end + + unless read_action.pagination && read_action.pagination.keyset? do + raise Spark.Error.DslError, + path: [:oban, :triggers, trigger.name, :read_action], + module: module, + message: """ + The read action `:#{read_action.name}` must support keyset pagination in order to be + used by an AshOban trigger. + """ + end + + queue = trigger.queue || default_queue_name(dsl, trigger) + + Transformer.replace_entity(dsl, [:oban, :triggers], %{ + trigger + | read_action: read_action.name, + queue: queue, + scheduler_queue: trigger.scheduler_queue || queue, + action: trigger.action || trigger.name + }) + end) end # sobelow_skip ["DOS.BinToAtom"]