2023-04-22 16:46:04 +12:00
defmodule AshOban do
2023-04-22 16:58:26 +12:00
defmodule Trigger do
  @moduledoc """
  A configured trigger.

  This struct is the target of the `trigger` DSL entity, so it carries one field
  per trigger option, plus `scheduler`, `worker` and `__identifier__`.
  """

  @type t :: %__MODULE__{
          name: atom,
          action: atom,
          read_action: atom,
          # Filled in from `read_action` by `transform/1` when left unset.
          worker_read_action: atom,
          queue: atom,
          # `false` disables the scheduler for this trigger.
          scheduler_cron: String.t() | false,
          scheduler_queue: atom,
          max_attempts: pos_integer(),
          record_limit: pos_integer() | nil,
          max_scheduler_attempts: pos_integer(),
          read_metadata: (Ash.Resource.record() -> map) | nil,
          stream_batch_size: pos_integer() | nil,
          scheduler_priority: non_neg_integer(),
          worker_priority: non_neg_integer(),
          where: Ash.Expr.t(),
          scheduler: module | nil,
          state: :active | :paused | :deleted,
          worker: module,
          __identifier__: atom,
          on_error: atom
        }

  defstruct [
    :name,
    :action,
    :read_action,
    :worker_read_action,
    :queue,
    :read_metadata,
    :scheduler_cron,
    :scheduler_queue,
    :scheduler_priority,
    :worker_priority,
    :max_attempts,
    :stream_batch_size,
    :max_scheduler_attempts,
    :record_limit,
    :where,
    :state,
    :scheduler,
    :worker,
    :on_error,
    :__identifier__
  ]

  @doc """
  Entity transform hook for a trigger.

  When `worker_read_action` is `nil` it is set to the trigger's `read_action`;
  any other value is returned untouched. Always returns `{:ok, trigger}`.
  """
  @spec transform(map) :: {:ok, map}
  def transform(%{read_action: read_action, worker_read_action: nil} = trigger) do
    {:ok, %{trigger | worker_read_action: read_action}}
  end

  def transform(other), do: {:ok, other}
end
# DSL entity describing a single trigger. Each `trigger` declaration is built into a
# `Trigger` struct and then passed through `Trigger.transform/1`.
@trigger %Spark.Dsl.Entity{
  name: :trigger,
  target: Trigger,
  args: [:name],
  identifier: :name,
  imports: [Ash.Filter.TemplateHelpers],
  transform: {Trigger, :transform, []},
  schema: [
    name: [
      type: :atom,
      doc: "A unique identifier for this trigger."
    ],
    scheduler_queue: [
      type: :atom,
      doc:
        "The queue to place the scheduler job in. The same queue as job is used by default (but with a priority of 1 so schedulers run first)."
    ],
    scheduler_cron: [
      type: {:or, [:string, {:literal, false}]},
      # No surrounding whitespace: this literal is handed to the cron plugin as-is.
      default: "* * * * *",
      doc: """
      A crontab configuration for when the job should run. Defaults to once per minute (\"* * * * *\").
      Use `false` to disable the scheduler entirely.
      """
    ],
    stream_batch_size: [
      type: :pos_integer,
      doc:
        "The batch size to pass when streaming records from using `Ash.Api.stream/2`. No batch size is passed if none is provided here, so the default is used."
    ],
    queue: [
      type: :atom,
      doc: "The queue to place the worker job in. The trigger name is used by default."
    ],
    record_limit: [
      type: :pos_integer,
      doc:
        "If set, any given run of the scheduler will only ever schedule this many items maximum"
    ],
    worker_priority: [
      type: :non_neg_integer,
      doc: "A number from 0 to 3, where 0 is the highest priority and 3 is the lowest.",
      default: 2
    ],
    scheduler_priority: [
      type: :non_neg_integer,
      doc: "A number from 0 to 3, where 0 is the highest priority and 3 is the lowest.",
      default: 3
    ],
    max_scheduler_attempts: [
      type: :pos_integer,
      default: 1,
      doc: "How many times to attempt scheduling of the triggered action."
    ],
    max_attempts: [
      type: :pos_integer,
      default: 1,
      doc: """
      How many times to attempt the job.
      Keep in mind: after all of these attempts, the scheduler will likely just reschedule the job,
      leading to infinite retries. To solve for this, configure an `on_error` action that will make
      the trigger no longer apply to failed jobs.
      """
    ],
    read_metadata: [
      type: {:fun, 1},
      doc: """
      Takes a record, and returns additional data of records from the read action.
      This metadata will be stored in the database and serialized to json before
      being passed to the update action as an argument called `metadata`.
      """
    ],
    state: [
      type: {:one_of, [:active, :paused, :deleted]},
      default: :active,
      doc: """
      Describes the state of the cron job.
      See the getting started guide for an explanation on the need for this field.
      The most important thing is that you *do not remove a trigger from a resource*.
      Oban's cron jobs are persisted, meaning you will get repeated errors whenever the cron
      job tries to fire.
      """
    ],
    read_action: [
      type: :atom,
      doc: """
      The read action to use when querying records. Defaults to the primary read.
      This action *must* support keyset pagination.
      """
    ],
    worker_read_action: [
      type: :atom,
      doc: """
      The read action to use when fetching the individual records for the trigger.
      This defaults to `read_action`, allowing us to discard records that are no longer relevant.
      You may need to change this, and if so make sure your action handles the scenario where the
      trigger is no longer relevant.
      """
    ],
    action: [
      type: :atom,
      # Required, so there is no default to advertise in the doc string.
      required: true,
      doc: "The action to be triggered."
    ],
    where: [
      type: :any,
      doc: "The filter expression to determine if something should be triggered"
    ],
    on_error: [
      type: :atom,
      doc:
        "An update action to call after the last attempt has failed. See the getting started guide for more."
    ]
  ]
}
# The `triggers` section: a container for the `trigger` entities of a resource.
@triggers %Spark.Dsl.Section{
  name: :triggers,
  entities: [@trigger]
}

# The top-level `oban` section: requires the `api` option and nests the `triggers` section.
@oban %Spark.Dsl.Section{
  name: :oban,
  schema: [
    api: [
      type: {:behaviour, Ash.Api},
      doc: "The Api module to use when calling actions on this resource",
      required: true
    ]
  ],
  sections: [@triggers]
}
2023-04-28 14:07:05 +12:00
# Every top-level DSL section of this extension; the generated documentation below is
# rendered from this list.
@sections [@oban]

@moduledoc """
Dsl documentation for AshOban

<!--- ash-hq-hide-start--> <!--- -->

## DSL Documentation

### Index

#{Spark.Dsl.Extension.doc_index(@sections)}

### Docs

#{Spark.Dsl.Extension.doc(@sections)}
<!--- ash-hq-hide-stop--> <!--- -->
"""
2023-04-22 17:35:21 +12:00
# Turns this module into a Spark DSL extension exposing the sections listed in
# `@sections`. The transformers are listed in the order SetDefaults, DefineSchedulers.
use Spark.Dsl.Extension,
  transformers: [
    AshOban.Transformers.SetDefaults,
    AshOban.Transformers.DefineSchedulers
  ],
  imports: [AshOban.Changes.BuiltinChanges],
  sections: @sections
2023-04-28 14:07:05 +12:00
2023-08-05 08:40:21 +12:00
@doc """
Inserts a job for the scheduler of `trigger` on `resource`.

`trigger` is either an `%AshOban.Trigger{}` struct or the name (an atom) of a
trigger, which is looked up with `AshOban.Info.oban_trigger/2`.

Returns whatever `Oban.insert!/1` returns.

Raises `ArgumentError` when no trigger is found for the given name, or when the
trigger has no scheduler module.
"""
def schedule(resource, trigger) do
  trigger =
    case trigger do
      %AshOban.Trigger{} ->
        trigger

      name when is_atom(name) ->
        # Guard against a lookup that comes back empty, so the caller gets a
        # readable error instead of an undefined-function crash on `nil`.
        AshOban.Info.oban_trigger(resource, name) ||
          raise(
            ArgumentError,
            "No trigger named #{inspect(name)} found on #{inspect(resource)}"
          )
    end

  case trigger.scheduler do
    nil ->
      raise ArgumentError,
            "Trigger #{inspect(trigger.name)} on #{inspect(resource)} has no scheduler " <>
              "module, so it cannot be scheduled"

    scheduler ->
      %{}
      |> scheduler.new()
      |> Oban.insert!()
  end
end
2023-09-03 23:01:28 +12:00
@doc """
Inserts a worker job that runs `trigger` for a single `record`.

`trigger` is either an `%AshOban.Trigger{}` struct or the name (an atom) of a
trigger on the record's resource, looked up with `AshOban.Info.oban_trigger/2`.
`oban_job_opts` is passed as the second argument to the worker's `new/2`.

The job args contain the record's primary key fields under `primary_key`, and
under `metadata` the result of the trigger's `read_metadata` function (an empty
map when the trigger has none).

Returns whatever `Oban.insert!/1` returns. Raises `ArgumentError` when no
trigger is found for the given name.
"""
def run_trigger(%resource{} = record, trigger, oban_job_opts \\ []) do
  trigger =
    case trigger do
      %AshOban.Trigger{} ->
        trigger

      name when is_atom(name) ->
        # Guard against a lookup that comes back empty, so the caller gets a
        # readable error instead of an undefined-function crash on `nil`.
        AshOban.Info.oban_trigger(resource, name) ||
          raise(
            ArgumentError,
            "No trigger named #{inspect(name)} found on #{inspect(resource)}"
          )
    end

  primary_key = Ash.Resource.Info.primary_key(resource)

  metadata =
    case trigger do
      %{read_metadata: read_metadata} when is_function(read_metadata) ->
        read_metadata.(record)

      _ ->
        %{}
    end

  %{primary_key: Map.take(record, primary_key), metadata: metadata}
  |> trigger.worker.new(oban_job_opts)
  |> Oban.insert!()
end
@doc """
Alters your oban configuration to include the required AshOban configuration.

`apis` is an api module or a list of api modules whose resources are scanned for
triggers. `base` is the Oban configuration keyword list to extend.

For every trigger with a truthy `scheduler_cron`, its queues are checked and a
crontab entry is added to the cron plugin found in `base[:plugins]`. The cron
plugin is `Oban.Pro.Plugins.DynamicCron` when `AshOban.Info.pro?/0` is true and
`Oban.Plugins.Cron` otherwise.

Raises when the pro engine is expected but not configured, when the cron plugin
is missing from `base`, or when a required queue is not configured.
"""
def config(apis, base) do
  pro? = AshOban.Info.pro?()

  cron_plugin =
    if pro? do
      Oban.Pro.Plugins.DynamicCron
    else
      Oban.Plugins.Cron
    end

  if pro? && base[:engine] not in [Oban.Pro.Queue.SmartEngine, Oban.Pro.Engines.Smart] do
    raise """
    Expected oban engine to be Oban.Pro.Queue.SmartEngine or Oban.Pro.Engines.Smart, but got #{inspect(base[:engine])}.
    This expectation is because you've set `config :ash_oban, pro?: true`.
    """
  end

  require_cron!(base, cron_plugin)

  apis
  # Accept a single api module as well as a list of them.
  |> List.wrap()
  |> Enum.flat_map(&Ash.Api.Info.resources/1)
  |> Enum.uniq()
  |> Enum.flat_map(fn resource ->
    resource
    |> AshOban.Info.oban_triggers()
    # Triggers whose cron is `false` (or unset) get no crontab entry.
    |> Enum.filter(& &1.scheduler_cron)
    |> Enum.map(&{resource, &1})
  end)
  |> Enum.reduce(base, fn {resource, trigger}, config ->
    require_queues!(config, resource, trigger)

    add_job(config, cron_plugin, resource, trigger)
  end)
end
# Adds the trigger's scheduler to the crontab of the cron plugin.
#
# Every `{cron_plugin, opts}` entry under `:plugins` gets the tuple
# `{scheduler_cron, scheduler, job_opts}` prepended to its `:crontab` (created when
# absent). All other plugin entries are passed through unchanged. Raises if `config`
# has no `:plugins` key (`Keyword.update!/3`).
defp add_job(config, cron_plugin, _resource, trigger) do
  Keyword.update!(config, :plugins, fn plugins ->
    Enum.map(plugins, fn plugin ->
      case plugin do
        {^cron_plugin, plugin_opts} ->
          job_opts = cron_entry_opts(trigger.state)
          entry = {trigger.scheduler_cron, trigger.scheduler, job_opts}

          crontab_added =
            Keyword.update(plugin_opts, :crontab, [entry], fn existing ->
              [entry | existing]
            end)

          {cron_plugin, crontab_added}

        untouched ->
          untouched
      end
    end)
  end)
end

# Maps a trigger state to the options attached to its crontab entry.
defp cron_entry_opts(:paused), do: [paused: true]
defp cron_entry_opts(:deleted), do: [delete: true]
defp cron_entry_opts(_state), do: []
# Ensures both queues used by `trigger` are present in `config[:queues]`.
#
# Raises a `RuntimeError` naming the missing queue, the trigger and the resource when
# either `trigger.queue` or `trigger.scheduler_queue` has no truthy entry under
# `:queues`. Returns `nil` when both are configured.
defp require_queues!(config, resource, trigger) do
  queues = config[:queues]

  if !queues[trigger.queue] do
    raise """
    Must configure the queue `:#{trigger.queue}`, required for
    the trigger `:#{trigger.name}` on #{inspect(resource)}
    """
  end

  if !queues[trigger.scheduler_queue] do
    raise """
    Must configure the queue `:#{trigger.scheduler_queue}`, required for
    the scheduler of the trigger `:#{trigger.name}` on #{inspect(resource)}
    """
  end
end
# Ensures the cron plugin `name` is configured in `config[:plugins]`.
#
# Only `{name, opts}` tuple entries count as configured; a bare module entry does not
# match. Raises a `RuntimeError` that includes the received configuration when the
# plugin is missing (or when there is no `:plugins` key). Returns `nil` otherwise.
defp require_cron!(config, name) do
  plugins = config[:plugins] || []

  if !Enum.any?(plugins, &match?({^name, _}, &1)) do
    # `inspect/1` renders the module without the internal `Elixir.` prefix.
    raise """
    Must configure cron plugin #{inspect(name)}.

    See oban's documentation for more. AshOban will
    add cron jobs to the configuration, but will not
    add the basic configuration for you.

    Configuration received:

    #{inspect(config)}
    """
  end
end
2023-09-01 03:25:05 +12:00
@doc false
# Dispatches the changeset to `api.update/1` when its action type is `:update`,
# and to `api.destroy/1` for every other action type.
def update_or_destroy(changeset, api) do
  case changeset.action.type do
    :update -> api.update(changeset)
    _other -> api.destroy(changeset)
  end
end
2023-04-22 16:46:04 +12:00
end