# ash_oban/lib/ash_oban.ex
defmodule AshOban do
require Logger
defmodule Trigger do
@moduledoc """
A configured trigger.
"""
@type t :: %__MODULE__{
name: atom,
action: atom,
read_action: atom,
worker_read_action: atom,
queue: atom,
scheduler_cron: String.t(),
scheduler_queue: atom,
action_input: map(),
max_attempts: pos_integer(),
record_limit: pos_integer(),
log_final_error?: boolean(),
log_errors?: boolean(),
debug?: boolean(),
max_scheduler_attempts: pos_integer(),
read_metadata: (Ash.Resource.record() -> map),
stream_batch_size: pos_integer(),
scheduler_priority: non_neg_integer(),
worker_priority: non_neg_integer(),
where: Ash.Expr.t(),
scheduler: module | nil,
state: :active | :paused | :deleted,
worker: module,
__identifier__: atom,
on_error: atom
}
defstruct [
:name,
:action,
:read_action,
:action_input,
:worker_read_action,
:queue,
:debug?,
:read_metadata,
:scheduler_cron,
:scheduler_queue,
:scheduler_priority,
:worker_priority,
:max_attempts,
:stream_batch_size,
:max_scheduler_attempts,
:record_limit,
:where,
:state,
:scheduler,
:worker,
:on_error,
:log_final_error?,
:log_errors?,
:__identifier__
]
def transform(%{read_action: read_action, worker_read_action: nil} = trigger) do
{:ok, %{trigger | worker_read_action: read_action}}
end
def transform(other), do: {:ok, other}
end
@trigger %Spark.Dsl.Entity{
name: :trigger,
target: Trigger,
args: [:name],
identifier: :name,
imports: [Ash.Filter.TemplateHelpers],
transform: {Trigger, :transform, []},
examples: [
"""
trigger :process do
action :process
where expr(processed != true)
worker_read_action(:read)
end
"""
],
schema: [
name: [
type: :atom,
doc: "A unique identifier for this trigger."
],
action_input: [
type: :map,
doc:
"Static inputs to supply to the update/destroy action when it is called. Any metadata produced by `read_metadata` will overwrite these values."
],
scheduler_queue: [
type: :atom,
doc:
"The queue to place the scheduler job in. The same queue as job is used by default (but with a priority of 1 so schedulers run first)."
],
debug?: [
type: :boolean,
default: false,
doc:
"If set to `true`, detailed debug logging will be enabled for this trigger. You can also set `config :ash_oban, debug_all_triggers?: true` to enable debug logging for all triggers."
],
scheduler_cron: [
type: {:or, [:string, {:literal, false}]},
default: "* * * * *",
doc: """
A crontab configuration for when the job should run. Defaults to once per minute (\"* * * * *\"). Use `false` to disable the scheduler entirely.
"""
],
stream_batch_size: [
type: :pos_integer,
doc:
"The batch size to pass when streaming records from using `c:Ash.Api.stream!/2`. No batch size is passed if none is provided here, so the default is used."
],
queue: [
type: :atom,
doc: "The queue to place the worker job in. The trigger name is used by default."
],
record_limit: [
type: :pos_integer,
doc:
"If set, any given run of the scheduler will only ever schedule this many items maximum"
],
log_errors?: [
type: :boolean,
default: true,
doc: "Whether or not to log errors that occur when performing an action."
],
log_final_error?: [
type: :boolean,
default: true,
doc:
"If true, logs that an error occurred on the final attempt to perform an action even if `log_errors?` is set to false."
],
worker_priority: [
type: :non_neg_integer,
doc: "A number from 0 to 3, where 0 is the highest priority and 3 is the lowest.",
default: 2
],
scheduler_priority: [
type: :non_neg_integer,
doc: "A number from 0 to 3, where 0 is the highest priority and 3 is the lowest.",
default: 3
],
max_scheduler_attempts: [
type: :pos_integer,
default: 1,
doc: "How many times to attempt scheduling of the triggered action."
],
max_attempts: [
type: :pos_integer,
default: 1,
doc: """
How many times to attempt the job. After all attempts have been exhausted, the scheduler may just reschedule it. Use the `on_error` action to update the record to make the scheduler no longer apply.
"""
],
read_metadata: [
type: {:fun, 1},
doc: """
Takes a record and returns metadata to be given to the update action as an argument called `metadata`.
"""
],
state: [
type: {:one_of, [:active, :paused, :deleted]},
default: :active,
doc: """
Describes the state of the cron job. See the getting started guide for more information. The most important thing is that you *do not remove a trigger from a resource if you are using Oban Pro*.
"""
],
read_action: [
type: :atom,
doc: """
The read action to use when querying records. Defaults to the primary read. This action *must* support keyset pagination.
"""
],
worker_read_action: [
type: :atom,
doc: """
The read action to use when fetching the individual records for the trigger. Defaults to `read_action`. If you customize this, ensure your action handles scenarios where the trigger is no longer relevant.
"""
],
action: [
type: :atom,
required: true,
doc:
"The action to be triggered. Defaults to the identifier of the resource plus the name of the trigger"
],
where: [
type: :any,
doc: "The filter expression to determine if something should be triggered"
],
on_error: [
type: :atom,
doc:
"An update action to call after the last attempt has failed. See the getting started guide for more."
]
]
}
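# A fuller trigger, as a sketch of the options above (the :mark_failed
# action and the metadata shape are hypothetical, shown only for illustration):
#
#   trigger :process do
#     action :process
#     where expr(processed != true)
#     read_metadata fn record -> %{author: record.author} end
#     on_error :mark_failed
#     max_attempts 3
#   end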
@triggers %Spark.Dsl.Section{
name: :triggers,
entities: [@trigger],
examples: [
"""
triggers do
trigger :process do
action :process
where expr(processed != true)
worker_read_action(:read)
end
end
"""
]
}
@oban %Spark.Dsl.Section{
name: :oban,
examples: [
"""
oban do
api AshOban.Test.Api
triggers do
trigger :process do
action :process
where expr(processed != true)
worker_read_action(:read)
end
end
end
"""
],
schema: [
api: [
type: {:behaviour, Ash.Api},
doc: "The Api module to use when calling actions on this resource",
required: true
]
],
sections: [@triggers]
}
@sections [@oban]
@moduledoc """
Tools for working with AshOban triggers.
"""
use Spark.Dsl.Extension,
sections: @sections,
imports: [AshOban.Changes.BuiltinChanges],
transformers: [
AshOban.Transformers.SetDefaults,
AshOban.Transformers.DefineSchedulers
]
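@doc """
Runs the scheduler for a trigger immediately by inserting its scheduler job,
rather than waiting for the trigger's cron to fire.

A minimal sketch, assuming a hypothetical `MyApp.Tweet` resource with a
`:process` trigger defined:

    AshOban.schedule(MyApp.Tweet, :process)
"""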
def schedule(resource, trigger) do
trigger =
case trigger do
%AshOban.Trigger{} ->
trigger
name when is_atom(name) ->
AshOban.Info.oban_trigger(resource, name)
end
%{}
|> trigger.scheduler.new()
|> Oban.insert!()
end
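@doc """
Enqueues the worker job for a trigger against a single record, bypassing the
scheduler. Extra options are passed through to the worker's `new/2`, so any
Oban job option (such as `:priority`) is accepted.

A minimal sketch, assuming the same hypothetical `MyApp.Tweet` resource:

    tweet
    |> AshOban.run_trigger(:process, priority: 0)
"""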
def run_trigger(%resource{} = record, trigger, oban_job_opts \\ []) do
trigger =
case trigger do
%AshOban.Trigger{} ->
trigger
name when is_atom(name) ->
AshOban.Info.oban_trigger(resource, name)
end
primary_key = Ash.Resource.Info.primary_key(resource)
metadata =
case trigger do
%{read_metadata: read_metadata} when is_function(read_metadata) ->
read_metadata.(record)
_ ->
%{}
end
%{primary_key: Map.take(record, primary_key), metadata: metadata}
|> trigger.worker.new(oban_job_opts)
|> Oban.insert!()
end
@config_schema [
require?: [
type: :boolean,
default: true,
doc: """
Whether to require queues and plugins to be defined in your oban config. Setting this to
`false` can be helpful when splitting queues between nodes. See https://hexdocs.pm/oban/splitting-queues.html
"""
]
]
@doc """
Alters your oban configuration to include the required AshOban configuration.
## Options
#{Spark.OptionsHelpers.docs(@config_schema)}
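
## Example

A sketch of typical usage in `runtime.exs`, assuming a hypothetical `MyApp.Api`
with a `:process` trigger (queue names must match your triggers while
`require?` is left on):

    config :my_app, Oban,
      AshOban.config([MyApp.Api],
        repo: MyApp.Repo,
        plugins: [{Oban.Plugins.Cron, []}],
        queues: [process: 10]
      )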
"""
def config(apis, base, opts \\ []) do
opts = Spark.OptionsHelpers.validate!(opts, @config_schema)
pro? = AshOban.Info.pro?()
cron_plugin =
if pro? do
Oban.Pro.Plugins.DynamicCron
else
Oban.Plugins.Cron
end
if pro? && base[:engine] not in [Oban.Pro.Queue.SmartEngine, Oban.Pro.Engines.Smart] do
raise """
Expected oban engine to be Oban.Pro.Queue.SmartEngine or Oban.Pro.Engines.Smart, but got #{inspect(base[:engine])}.
This expectation is because you've set `config :ash_oban, pro?: true`.
"""
end
apis
|> Enum.flat_map(fn api ->
api
|> Ash.Api.Info.resources()
end)
|> Enum.uniq()
|> Enum.flat_map(fn resource ->
resource
|> AshOban.Info.oban_triggers()
|> tap(fn triggers ->
if opts[:require?] do
Enum.each(triggers, &require_queues!(base, resource, &1))
end
end)
|> Enum.filter(& &1.scheduler_cron)
|> Enum.map(&{resource, &1})
end)
|> case do
[] ->
base
resources_and_triggers ->
if opts[:require?] do
require_cron!(base, cron_plugin)
end
Enum.reduce(resources_and_triggers, base, fn {resource, trigger}, config ->
add_job(config, cron_plugin, resource, trigger)
end)
end
end
defp add_job(config, cron_plugin, _resource, trigger) do
Keyword.update!(config, :plugins, fn plugins ->
Enum.map(plugins, fn
{^cron_plugin, config} ->
opts =
case trigger.state do
:paused ->
[events: [paused: true]]
:deleted ->
[events: [delete: true]]
_ ->
[]
end
cron = {trigger.scheduler_cron, trigger.scheduler, opts}
{cron_plugin, Keyword.update(config, :crontab, [cron], &[cron | &1])}
other ->
other
end)
end)
end
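# As a sketch, for a trigger on its default cron the entry prepended above
# looks roughly like the following (the scheduler module is generated by a
# transformer, so the exact name varies):
#
#   {Oban.Plugins.Cron,
#    crontab: [{"* * * * *", MyApp.Tweet.AshOban.Scheduler.Process, []}]}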
defp require_queues!(config, resource, trigger) do
unless config[:queues][trigger.queue] do
raise """
Must configure the queue `:#{trigger.queue}`, required for
the trigger `:#{trigger.name}` on #{inspect(resource)}
"""
end
unless config[:queues][trigger.scheduler_queue] do
raise """
Must configure the queue `:#{trigger.scheduler_queue}`, required for
the scheduler of the trigger `:#{trigger.name}` on #{inspect(resource)}
"""
end
end
defp require_cron!(config, name) do
unless Enum.find(config[:plugins] || [], &match?({^name, _}, &1)) do
raise """
Must configure cron plugin #{name}.
See Oban's documentation for more. AshOban will
add cron jobs to the configuration, but will not
add the basic configuration for you.
Configuration received:
#{inspect(config)}
"""
end
end
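# As a sketch, the check above is satisfied by including the plugin in the
# base config before calling `AshOban.config/3`:
#
#   plugins: [{Oban.Plugins.Cron, []}]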
@doc false
def update_or_destroy(changeset, api) do
if changeset.action.type == :update do
api.update(changeset)
else
api.destroy(changeset)
end
end
@doc false
def debug(message, true) do
Logger.debug(message)
end
def debug(message, false) do
if Application.get_env(:ash_oban, :debug_all_triggers?) do
Logger.debug(message)
else
:ok
end
end
def stacktrace(%{stacktrace: %{stacktrace: stacktrace}}) when not is_nil(stacktrace) do
stacktrace
end
def stacktrace(_), do: nil
end