ash_oban/lib/ash_oban.ex

374 lines
10 KiB
Elixir
Raw Normal View History

2023-04-22 16:46:04 +12:00
defmodule AshOban do
2023-04-22 16:58:26 +12:00
defmodule Trigger do
2023-04-28 14:36:29 +12:00
@moduledoc """
A configured trigger.
"""
2023-04-22 16:58:26 +12:00
@type t :: %__MODULE__{
2023-04-28 14:07:05 +12:00
name: atom,
2023-04-22 16:58:26 +12:00
action: atom,
2023-04-28 14:07:05 +12:00
read_action: atom,
queue: atom,
scheduler_cron: String.t(),
scheduler_queue: atom,
max_attempts: pos_integer(),
record_limit: pos_integer(),
2023-04-28 14:07:05 +12:00
max_scheduler_attempts: pos_integer(),
read_metadata: (Ash.Resource.record() -> map),
stream_batch_size: pos_integer(),
scheduler_priority: non_neg_integer(),
worker_priority: non_neg_integer(),
2023-04-28 14:07:05 +12:00
where: Ash.Expr.t(),
scheduler: module | nil,
2023-04-28 14:36:29 +12:00
state: :active | :paused | :deleted,
2023-04-28 14:07:05 +12:00
worker: module,
2023-04-29 05:59:56 +12:00
__identifier__: atom,
on_error: atom
2023-04-22 16:58:26 +12:00
}
2023-04-28 14:07:05 +12:00
defstruct [
:name,
:action,
:read_action,
:worker_read_action,
2023-04-28 14:07:05 +12:00
:queue,
:read_metadata,
2023-04-28 14:07:05 +12:00
:scheduler_cron,
:scheduler_queue,
:scheduler_priority,
:worker_priority,
2023-04-28 14:07:05 +12:00
:max_attempts,
:stream_batch_size,
2023-04-28 14:07:05 +12:00
:max_scheduler_attempts,
:record_limit,
2023-04-28 14:07:05 +12:00
:where,
2023-04-28 14:36:29 +12:00
:state,
2023-04-28 14:07:05 +12:00
:scheduler,
:worker,
2023-04-29 05:59:56 +12:00
:on_error,
2023-04-28 14:07:05 +12:00
:__identifier__
]
def transform(%{read_action: read_action, worker_read_action: nil} = trigger) do
{:ok, %{trigger | worker_read_action: read_action}}
end
def transform(other), do: {:ok, other}
2023-04-22 16:58:26 +12:00
end
@trigger %Spark.Dsl.Entity{
name: :trigger,
target: Trigger,
2023-04-28 14:07:05 +12:00
args: [:name],
identifier: :name,
2023-04-22 16:58:26 +12:00
imports: [Ash.Filter.TemplateHelpers],
transform: {Trigger, :transform, []},
examples: [
"""
trigger :process do
action :process
where expr(processed != true)
worker_read_action(:read)
end
"""
],
2023-04-22 16:58:26 +12:00
schema: [
2023-04-28 14:07:05 +12:00
name: [
type: :atom,
doc: "A unique identifier for this trigger."
],
scheduler_queue: [
type: :atom,
doc:
"The queue to place the scheduler job in. The same queue as job is used by default (but with a priority of 1 so schedulers run first)."
2023-04-28 14:07:05 +12:00
],
scheduler_cron: [
type: {:or, [:string, {:literal, false}]},
2023-04-28 14:07:05 +12:00
default: "* * * * *",
doc: """
A crontab configuration for when the job should run. Defaults to once per minute (\"* * * * *\"). Use `false` to disable the scheduler entirely.
"""
],
stream_batch_size: [
type: :pos_integer,
2023-04-28 14:07:05 +12:00
doc:
"The batch size to pass when streaming records from using `c:Ash.Api.stream!/2`. No batch size is passed if none is provided here, so the default is used."
2023-04-28 14:07:05 +12:00
],
queue: [
type: :atom,
doc: "The queue to place the worker job in. The trigger name is used by default."
],
record_limit: [
type: :pos_integer,
doc:
"If set, any given run of the scheduler will only ever schedule this many items maximum"
],
worker_priority: [
type: :non_neg_integer,
doc: "A number from 0 to 3, where 0 is the highest priority and 3 is the lowest.",
default: 2
],
scheduler_priority: [
type: :non_neg_integer,
doc: "A number from 0 to 3, where 0 is the highest priority and 3 is the lowest.",
default: 3
],
2023-04-28 14:07:05 +12:00
max_scheduler_attempts: [
type: :pos_integer,
default: 1,
doc: "How many times to attempt scheduling of the triggered action."
],
max_attempts: [
type: :pos_integer,
default: 1,
2023-04-29 05:59:56 +12:00
doc: """
How many times to attempt the job. After all attempts have been exhausted, the scheduler may just reschedule it. Use the `on_error` action to update the record to make the scheduler no longer apply.
2023-04-29 05:59:56 +12:00
"""
2023-04-28 14:07:05 +12:00
],
read_metadata: [
type: {:fun, 1},
doc: """
Takes a record, and returns metadata to be given to the update action as an argument called `metadata`.
"""
],
2023-04-28 14:07:05 +12:00
state: [
type: {:one_of, [:active, :paused, :deleted]},
default: :active,
2023-04-28 14:07:05 +12:00
doc: """
Describes the state of the cron job. See the getting started guide for more information. The most important thing is that you *do not remove a trigger from a resource if you are using oban pro*.
2023-04-28 14:07:05 +12:00
"""
],
read_action: [
type: :atom,
doc: """
The read action to use when querying records. Defaults to the primary read. This action *must* support keyset pagination.
2023-04-28 14:07:05 +12:00
"""
],
worker_read_action: [
type: :atom,
doc: """
The read action to use when fetching the individual records for the trigger. Defaults to `read_action`. If you customize this, ensure your action handles scenarios where the trigger is no longer relevant.
"""
],
2023-04-22 16:58:26 +12:00
action: [
type: :atom,
2023-04-28 14:36:29 +12:00
required: true,
2023-04-28 14:07:05 +12:00
doc:
"The action to be triggered. Defaults to the identifier of the resource plus the name of the trigger"
2023-04-22 16:58:26 +12:00
],
2023-04-22 17:35:21 +12:00
where: [
2023-04-22 16:58:26 +12:00
type: :any,
doc: "The filter expression to determine if something should be triggered"
2023-04-29 05:59:56 +12:00
],
on_error: [
type: :atom,
doc:
"An update action to call after the last attempt has failed. See the getting started guide for more."
2023-04-22 16:58:26 +12:00
]
]
}
@triggers %Spark.Dsl.Section{
name: :triggers,
entities: [@trigger],
examples: [
"""
triggers do
trigger :process do
action :process
where expr(processed != true)
worker_read_action(:read)
end
end
"""
]
2023-04-22 16:58:26 +12:00
}
@oban %Spark.Dsl.Section{
name: :oban,
examples: [
"""
oban do
api AshOban.Test.Api
triggers do
trigger :process do
action :process
where expr(processed != true)
worker_read_action(:read)
end
end
end
"""
],
2023-04-28 14:07:05 +12:00
schema: [
api: [
type: {:behaviour, Ash.Api},
doc: "The Api module to use when calling actions on this resource",
required: true
]
],
2023-04-22 16:58:26 +12:00
sections: [@triggers]
}
2023-04-28 14:07:05 +12:00
@sections [@oban]
@moduledoc """
Tools for working with AshOban triggers.
2023-04-28 14:07:05 +12:00
"""
2023-04-22 17:35:21 +12:00
use Spark.Dsl.Extension,
sections: @sections,
2023-04-28 14:07:05 +12:00
imports: [AshOban.Changes.BuiltinChanges],
transformers: [
AshOban.Transformers.SetDefaults,
AshOban.Transformers.DefineSchedulers
2023-04-22 17:35:21 +12:00
]
2023-04-28 14:07:05 +12:00
def schedule(resource, trigger) do
trigger =
case trigger do
%AshOban.Trigger{} ->
trigger
name when is_atom(name) ->
AshOban.Info.oban_trigger(resource, name)
end
%{}
|> trigger.scheduler.new()
|> Oban.insert!()
end
def run_trigger(%resource{} = record, trigger, oban_job_opts \\ []) do
2023-04-29 05:59:56 +12:00
trigger =
case trigger do
%AshOban.Trigger{} ->
trigger
name when is_atom(name) ->
AshOban.Info.oban_trigger(resource, name)
end
2023-04-29 06:01:22 +12:00
primary_key = Ash.Resource.Info.primary_key(resource)
2023-04-29 05:59:56 +12:00
metadata =
case trigger do
%{read_metadata: read_metadata} when is_function(read_metadata) ->
read_metadata.(record)
_ ->
%{}
end
%{primary_key: Map.take(record, primary_key), metadata: metadata}
|> trigger.worker.new(oban_job_opts)
2023-04-29 05:59:56 +12:00
|> Oban.insert!()
end
@doc "Alters your oban configuration to include the required AshOban configuration."
def config(apis, base) do
2023-04-28 14:07:05 +12:00
pro? = AshOban.Info.pro?()
cron_plugin =
if pro? do
Oban.Pro.Plugins.DynamicCron
else
Oban.Plugins.Cron
2023-04-28 14:07:05 +12:00
end
if pro? && base[:engine] not in [Oban.Pro.Queue.SmartEngine, Oban.Pro.Engines.Smart] do
2023-04-28 14:07:05 +12:00
raise """
Expected oban engine to be Oban.Pro.Queue.SmartEngine or Oban.Pro.Engines.Smart, but got #{inspect(base[:engine])}.
2023-04-28 14:07:05 +12:00
This expectation is because you've set `config :ash_oban, pro?: true`.
"""
end
require_cron!(base, cron_plugin)
apis
|> Enum.flat_map(fn api ->
api
|> Ash.Api.Info.resources()
end)
|> Enum.uniq()
|> Enum.flat_map(fn resource ->
resource
|> AshOban.Info.oban_triggers()
|> Enum.filter(& &1.scheduler_cron)
2023-04-28 14:07:05 +12:00
|> Enum.map(&{resource, &1})
end)
|> Enum.reduce(base, fn {resource, trigger}, config ->
require_queues!(config, resource, trigger)
2023-04-28 14:07:05 +12:00
add_job(config, cron_plugin, resource, trigger)
end)
end
defp add_job(config, cron_plugin, _resource, trigger) do
Keyword.update!(config, :plugins, fn plugins ->
Enum.map(plugins, fn
{^cron_plugin, config} ->
opts =
case trigger.state do
:paused ->
[paused: true]
2023-04-28 14:07:05 +12:00
:deleted ->
[delete: true]
2023-04-28 14:07:05 +12:00
_ ->
[]
end
2023-04-28 14:07:05 +12:00
cron = {trigger.scheduler_cron, trigger.scheduler, opts}
{cron_plugin, Keyword.update(config, :crontab, [cron], &[cron | &1])}
other ->
other
2023-04-28 14:07:05 +12:00
end)
end)
end
defp require_queues!(config, resource, trigger) do
unless config[:queues][trigger.queue] do
raise """
Must configure the queue `:#{trigger.queue}`, required for
2023-04-28 14:07:05 +12:00
the trigger `:#{trigger.name}` on #{inspect(resource)}
"""
end
unless config[:queues][trigger.scheduler_queue] do
raise """
Must configure the queue `:#{trigger.scheduler_queue}`, required for
2023-04-28 14:07:05 +12:00
the scheduler of the trigger `:#{trigger.name}` on #{inspect(resource)}
"""
end
end
defp require_cron!(config, name) do
unless Enum.find(config[:plugins] || [], &match?({^name, _}, &1)) do
2023-04-28 14:07:05 +12:00
raise """
Must configure cron plugin #{name}.
See oban's documentation for more. AshOban will
add cron jobs to the configuration, but will not
add the basic configuration for you.
Configuration received:
#{inspect(config)}
2023-04-28 14:07:05 +12:00
"""
end
end
@doc false
def update_or_destroy(changeset, api) do
if changeset.action.type == :update do
api.update(changeset)
else
api.destroy(changeset)
end
end
2023-04-22 16:46:04 +12:00
end