# ash_oban/lib/ash_oban.ex

defmodule AshOban do
  defmodule Trigger do
    @moduledoc """
    A configured trigger.
    """

    @type t :: %__MODULE__{
            name: atom,
            action: atom,
            read_action: atom,
            worker_read_action: atom,
            queue: atom,
            scheduler_cron: String.t(),
            scheduler_queue: atom,
            max_attempts: pos_integer(),
            record_limit: pos_integer(),
            max_scheduler_attempts: pos_integer(),
            read_metadata: (Ash.Resource.record() -> map),
            stream_batch_size: pos_integer(),
            scheduler_priority: non_neg_integer(),
            worker_priority: non_neg_integer(),
            where: Ash.Expr.t(),
            scheduler: module | nil,
            state: :active | :paused | :deleted,
            worker: module,
            __identifier__: atom,
            on_error: atom
          }

    defstruct [
      :name,
      :action,
      :read_action,
      :worker_read_action,
      :queue,
      :read_metadata,
      :scheduler_cron,
      :scheduler_queue,
      :scheduler_priority,
      :worker_priority,
      :max_attempts,
      :stream_batch_size,
      :max_scheduler_attempts,
      :record_limit,
      :where,
      :state,
      :scheduler,
      :worker,
      :on_error,
      :__identifier__
    ]

    # If no worker-specific read action is configured, fall back to the
    # trigger's read action.
    def transform(%{read_action: read_action, worker_read_action: nil} = trigger) do
      {:ok, %{trigger | worker_read_action: read_action}}
    end

    def transform(other), do: {:ok, other}
  end

  @trigger %Spark.Dsl.Entity{
    name: :trigger,
    target: Trigger,
    args: [:name],
    identifier: :name,
    imports: [Ash.Filter.TemplateHelpers],
    transform: {Trigger, :transform, []},
    schema: [
      name: [
        type: :atom,
        doc: "A unique identifier for this trigger."
      ],
      scheduler_queue: [
        type: :atom,
        doc:
          "The queue to place the scheduler job in. The same queue as the job is used by default (but with a priority of 1 so schedulers run first)."
      ],
      scheduler_cron: [
        type: {:or, [:string, {:literal, false}]},
        default: "* * * * *",
        doc: """
        A crontab configuration for when the job should run. Defaults to once per minute ("* * * * *").
        Use `false` to disable the scheduler entirely.
        """
      ],
      stream_batch_size: [
        type: :pos_integer,
        doc:
          "The batch size to pass when streaming records with `Ash.Api.stream/2`. If not set here, no batch size is passed and the stream's default is used."
      ],
      queue: [
        type: :atom,
        doc: "The queue to place the worker job in. The trigger name is used by default."
      ],
      record_limit: [
        type: :pos_integer,
        doc: "If set, any given run of the scheduler will schedule at most this many items."
      ],
      worker_priority: [
        type: :non_neg_integer,
        doc: "A number from 0 to 3, where 0 is the highest priority and 3 is the lowest.",
        default: 2
      ],
      scheduler_priority: [
        type: :non_neg_integer,
        doc: "A number from 0 to 3, where 0 is the highest priority and 3 is the lowest.",
        default: 3
      ],
      max_scheduler_attempts: [
        type: :pos_integer,
        default: 1,
        doc: "How many times to attempt scheduling of the triggered action."
      ],
      max_attempts: [
        type: :pos_integer,
        default: 1,
        doc: """
        How many times to attempt the job.
        Keep in mind: after all of these attempts, the scheduler will likely just reschedule the job,
        leading to infinite retries. To avoid this, configure an `on_error` action that makes
        the trigger no longer apply to failed records.
        """
      ],
      read_metadata: [
        type: {:fun, 1},
        doc: """
        Takes a record and returns additional metadata gathered from the read action.
        This metadata will be stored in the database and serialized to JSON before
        being passed to the update action as an argument called `metadata`.
        """
      ],
      state: [
        type: {:one_of, [:active, :paused, :deleted]},
        default: :active,
        doc: """
        Describes the state of the cron job.
        See the getting started guide for an explanation of why this field is needed.
        The most important thing is that you *do not remove a trigger from a resource*:
        Oban's cron jobs are persisted, so you will get repeated errors whenever the cron
        job tries to fire.
        """
      ],
      read_action: [
        type: :atom,
        doc: """
        The read action to use when querying records. Defaults to the primary read.
        This action *must* support keyset pagination.
        """
      ],
      worker_read_action: [
        type: :atom,
        doc: """
        The read action to use when fetching the individual records for the trigger.
        This defaults to `read_action`, allowing us to discard records that are no longer relevant.
        You may need to change this; if so, make sure your action handles the scenario where the
        trigger is no longer relevant.
        """
      ],
      action: [
        type: :atom,
        required: true,
        doc: "The action to be triggered."
      ],
      where: [
        type: :any,
        doc: "The filter expression that determines if a record should be triggered."
      ],
      on_error: [
        type: :atom,
        doc:
          "An update action to call after the last attempt has failed. See the getting started guide for more."
      ]
    ]
  }

  @triggers %Spark.Dsl.Section{
    name: :triggers,
    entities: [@trigger]
  }

  @oban %Spark.Dsl.Section{
    name: :oban,
    schema: [
      api: [
        type: {:behaviour, Ash.Api},
        doc: "The Api module to use when calling actions on this resource.",
        required: true
      ]
    ],
    sections: [@triggers]
  }
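
  # For illustration, the DSL defined by these sections is used on a resource
  # like the following (a sketch; `MyApp.Api`, the `:process` trigger, the
  # `:process` action, and the `processed` attribute are hypothetical):
  #
  #     oban do
  #       api MyApp.Api
  #
  #       triggers do
  #         trigger :process do
  #           action :process
  #           where expr(processed != true)
  #         end
  #       end
  #     end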

  @sections [@oban]

  @moduledoc """
  DSL documentation for AshOban.

  <!--- ash-hq-hide-start --> <!--- -->

  ## DSL Documentation

  ### Index

  #{Spark.Dsl.Extension.doc_index(@sections)}

  ### Docs

  #{Spark.Dsl.Extension.doc(@sections)}

  <!--- ash-hq-hide-stop --> <!--- -->
  """

  use Spark.Dsl.Extension,
    sections: [@oban],
    imports: [AshOban.Changes.BuiltinChanges],
    transformers: [
      AshOban.Transformers.SetDefaults,
      AshOban.Transformers.DefineSchedulers
    ]
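
  @doc """
  Immediately schedules the scheduler job for a trigger, rather than waiting
  for its next cron tick.

  For example (`MyApp.Customer` and the `:process` trigger are hypothetical):

      AshOban.schedule(MyApp.Customer, :process)
  """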
  def schedule(resource, trigger) do
    trigger =
      case trigger do
        %AshOban.Trigger{} ->
          trigger

        name when is_atom(name) ->
          AshOban.Info.oban_trigger(resource, name)
      end

    %{}
    |> trigger.scheduler.new()
    |> Oban.insert!()
  end
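
  @doc """
  Enqueues the worker job for a trigger against a single record, bypassing
  the scheduler.

  For example (the `customer` record and the `:process` trigger are hypothetical):

      AshOban.run_trigger(customer, :process)
  """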
  def run_trigger(%resource{} = record, trigger) do
    trigger =
      case trigger do
        %AshOban.Trigger{} ->
          trigger

        name when is_atom(name) ->
          AshOban.Info.oban_trigger(resource, name)
      end

    primary_key = Ash.Resource.Info.primary_key(resource)

    %{primary_key: Map.take(record, primary_key)}
    |> trigger.worker.new()
    |> Oban.insert!()
  end
@doc "Alters your oban configuration to include the required AshOban configuration."
  def config(apis, base) do
    pro? = AshOban.Info.pro?()

    cron_plugin =
      if pro? do
        Oban.Pro.Plugins.DynamicCron
      else
        Oban.Plugins.Cron
      end

    if pro? && base[:engine] not in [Oban.Pro.Queue.SmartEngine, Oban.Pro.Engines.Smart] do
      raise """
      Expected oban engine to be Oban.Pro.Queue.SmartEngine or Oban.Pro.Engines.Smart, but got #{inspect(base[:engine])}.

      This expectation is because you've set `config :ash_oban, pro?: true`.
      """
    end

    require_cron!(base, cron_plugin)

    # Register a cron entry for every trigger that has a scheduler, across all
    # resources of all apis.
    apis
    |> Enum.flat_map(fn api ->
      api
      |> Ash.Api.Info.resources()
    end)
    |> Enum.uniq()
    |> Enum.flat_map(fn resource ->
      resource
      |> AshOban.Info.oban_triggers()
      |> Enum.filter(& &1.scheduler_cron)
      |> Enum.map(&{resource, &1})
    end)
    |> Enum.reduce(base, fn {resource, trigger}, config ->
      require_queues!(config, resource, trigger)
      add_job(config, cron_plugin, resource, trigger)
    end)
  end
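
  # For illustration, `config/2` is typically applied to an existing Oban
  # configuration at boot (a sketch; `MyApp.Api`, `MyApp.Repo`, and the
  # `:process` queue are placeholders for your own configuration):
  #
  #     base = [
  #       engine: Oban.Engines.Basic,
  #       plugins: [{Oban.Plugins.Cron, []}],
  #       queues: [process: 10],
  #       repo: MyApp.Repo
  #     ]
  #
  #     AshOban.config([MyApp.Api], base)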

  # Appends this trigger's scheduler to the cron plugin's crontab, honoring
  # the trigger's state.
  defp add_job(config, cron_plugin, _resource, trigger) do
    Keyword.update!(config, :plugins, fn plugins ->
      Enum.map(plugins, fn
        {^cron_plugin, config} ->
          opts =
            case trigger.state do
              :paused ->
                [paused: true]

              :deleted ->
                [delete: true]

              _ ->
                []
            end

          cron = {trigger.scheduler_cron, trigger.scheduler, opts}
          {cron_plugin, Keyword.update(config, :crontab, [cron], &[cron | &1])}

        other ->
          other
      end)
    end)
  end

  defp require_queues!(config, resource, trigger) do
    unless config[:queues][trigger.queue] do
      raise """
      Must configure the queue `:#{trigger.queue}`, required for
      the trigger `:#{trigger.name}` on #{inspect(resource)}
      """
    end

    unless config[:queues][trigger.scheduler_queue] do
      raise """
      Must configure the queue `:#{trigger.scheduler_queue}`, required for
      the scheduler of the trigger `:#{trigger.name}` on #{inspect(resource)}
      """
    end
  end

  defp require_cron!(config, name) do
    unless Enum.find(config[:plugins] || [], &match?({^name, _}, &1)) do
      raise """
      Must configure cron plugin #{name}.

      See Oban's documentation for more. AshOban will
      add cron jobs to the configuration, but will not
      add the basic configuration for you.

      Configuration received:

      #{inspect(config)}
      """
    end
  end
end