2023-04-22 16:46:04 +12:00
defmodule AshOban do
2023-09-20 04:32:50 +12:00
require Logger
2024-02-27 04:31:28 +13:00
@pro Application . compile_env ( :ash_oban , :pro? ) || false
2023-04-22 16:58:26 +12:00
defmodule Trigger do
  @moduledoc """
  A configured trigger.

  Instances are built by the `trigger` DSL entity; `transform/1` is applied to
  each one after it is built.
  """

  @type t :: %__MODULE__{
          name: atom,
          action: atom,
          read_action: atom,
          # Filled in from `read_action` by `transform/1` when left unset.
          worker_read_action: atom,
          queue: atom,
          # The entity schema also accepts the literal `false` here.
          scheduler_cron: String.t() | false,
          scheduler_queue: atom,
          action_input: map(),
          max_attempts: pos_integer(),
          record_limit: pos_integer(),
          log_final_error?: boolean(),
          log_errors?: boolean(),
          debug?: boolean(),
          max_scheduler_attempts: pos_integer(),
          read_metadata: (Ash.Resource.record() -> map),
          stream_batch_size: pos_integer(),
          scheduler_priority: non_neg_integer(),
          worker_priority: non_neg_integer(),
          where: Ash.Expr.t(),
          scheduler: module | nil,
          state: :active | :paused | :deleted,
          worker: module,
          __identifier__: atom,
          on_error: atom
        }

  defstruct [
    :name,
    :action,
    :read_action,
    :action_input,
    :worker_read_action,
    :queue,
    :debug?,
    :read_metadata,
    :scheduler_cron,
    :scheduler_queue,
    :scheduler_priority,
    :worker_priority,
    :max_attempts,
    :stream_batch_size,
    :max_scheduler_attempts,
    :record_limit,
    :where,
    :state,
    :scheduler,
    :worker,
    :on_error,
    :log_final_error?,
    :log_errors?,
    :__identifier__
  ]

  @doc """
  Defaults `worker_read_action` to the trigger's `read_action` when it is `nil`.

  Any other value is returned untouched. Always returns `{:ok, value}`.
  """
  @spec transform(term) :: {:ok, term}
  def transform(%{read_action: read_action, worker_read_action: nil} = trigger) do
    {:ok, %{trigger | worker_read_action: read_action}}
  end

  def transform(other), do: {:ok, other}
end
# Spark DSL entity for a single `trigger`. Each use builds a `Trigger` struct
# (the `target`), takes the trigger name as its only positional argument, is
# identified by that name, and is passed through `Trigger.transform/1`.
@trigger % Spark.Dsl.Entity {
name : :trigger ,
target : Trigger ,
2023-04-28 14:07:05 +12:00
args : [ :name ] ,
identifier : :name ,
2023-04-22 16:58:26 +12:00
imports : [ Ash.Filter.TemplateHelpers ] ,
2023-07-20 06:18:15 +12:00
transform : { Trigger , :transform , [ ] } ,
2023-09-17 03:17:40 +12:00
examples : [
"""
trigger :process do
action :process
where expr ( processed != true )
worker_read_action ( :read )
end
"""
] ,
2023-04-22 16:58:26 +12:00
schema : [
2023-04-28 14:07:05 +12:00
name : [
type : :atom ,
doc : " A unique identifier for this trigger. "
] ,
2023-10-28 08:38:14 +13:00
action_input : [
type : :map ,
doc :
" Static inputs to supply to the update/destroy action when it is called. Any metadata produced by `read_metadata` will overwrite these values. "
] ,
2023-04-28 14:07:05 +12:00
# NOTE(review): the doc string mentions "a priority of 1", but
# `scheduler_priority` below defaults to 3 - confirm which is current.
scheduler_queue : [
type : :atom ,
doc :
2023-05-09 06:49:22 +12:00
" The queue to place the scheduler job in. The same queue as job is used by default (but with a priority of 1 so schedulers run first). "
2023-04-28 14:07:05 +12:00
] ,
2023-09-20 04:32:50 +12:00
debug? : [
type : :boolean ,
default : false ,
doc :
" If set to `true`, detailed debug logging will be enabled for this trigger. You can also set `config :ash_oban, debug_all_triggers?: true` to enable debug logging for all triggers. "
] ,
2023-04-28 14:07:05 +12:00
# Accepts either a crontab string or the literal `false`.
scheduler_cron : [
2023-08-25 08:15:04 +12:00
type : { :or , [ :string , { :literal , false } ] } ,
2023-04-28 14:07:05 +12:00
default : " * * * * * " ,
2023-08-25 08:15:04 +12:00
doc : """
2023-09-17 03:17:40 +12:00
A crontab configuration for when the job should run . Defaults to once per minute ( \ " * * * * * \" ). Use `false` to disable the scheduler entirely.
2023-08-25 08:15:04 +12:00
"""
] ,
stream_batch_size : [
type : :pos_integer ,
2023-04-28 14:07:05 +12:00
doc :
2023-09-17 03:17:40 +12:00
" The batch size to pass when streaming records from using `c:Ash.Api.stream!/2`. No batch size is passed if none is provided here, so the default is used. "
2023-04-28 14:07:05 +12:00
] ,
queue : [
type : :atom ,
doc : " The queue to place the worker job in. The trigger name is used by default. "
] ,
2023-06-11 05:15:29 +12:00
record_limit : [
type : :pos_integer ,
doc :
" If set, any given run of the scheduler will only ever schedule this many items maximum "
] ,
2023-12-06 05:28:38 +13:00
log_errors? : [
type : :boolean ,
default : true ,
doc : " Whether or not to log errors that occur when performing an action. "
] ,
2023-12-05 12:18:11 +13:00
log_final_error? : [
type : :boolean ,
default : true ,
2023-12-06 05:28:38 +13:00
doc :
" If true, logs that an error occurred on the final attempt to perform an action even if `log_errors?` is set to false. "
2023-12-05 12:18:11 +13:00
] ,
2023-08-24 10:24:53 +12:00
worker_priority : [
2023-08-24 10:34:02 +12:00
type : :non_neg_integer ,
2023-08-24 10:24:53 +12:00
doc : " A number from 0 to 3, where 0 is the highest priority and 3 is the lowest. " ,
default : 2
] ,
scheduler_priority : [
2023-08-24 10:34:02 +12:00
type : :non_neg_integer ,
2023-08-24 10:24:53 +12:00
doc : " A number from 0 to 3, where 0 is the highest priority and 3 is the lowest. " ,
default : 3
] ,
2023-04-28 14:07:05 +12:00
max_scheduler_attempts : [
type : :pos_integer ,
default : 1 ,
doc : " How many times to attempt scheduling of the triggered action. "
] ,
max_attempts : [
type : :pos_integer ,
default : 1 ,
2023-04-29 05:59:56 +12:00
doc : """
2023-09-17 03:17:40 +12:00
How many times to attempt the job . After all attempts have been exhausted , the scheduler may just reschedule it . Use the ` on_error ` action to update the record to make the scheduler no longer apply .
2023-04-29 05:59:56 +12:00
"""
2023-04-28 14:07:05 +12:00
] ,
2023-08-25 08:15:04 +12:00
read_metadata : [
type : { :fun , 1 } ,
doc : """
2023-09-17 03:17:40 +12:00
Takes a record , and returns metadata to be given to the update action as an argument called ` metadata ` .
2023-08-25 08:15:04 +12:00
"""
] ,
2023-04-28 14:07:05 +12:00
state : [
type : { :one_of , [ :active , :paused , :deleted ] } ,
2023-04-28 17:39:57 +12:00
default : :active ,
2023-04-28 14:07:05 +12:00
doc : """
2023-09-17 03:17:40 +12:00
Describes the state of the cron job . See the getting started guide for more information . The most important thing is that you * do not remove a trigger from a resource if you are using oban pro * .
2023-04-28 14:07:05 +12:00
"""
] ,
read_action : [
type : :atom ,
doc : """
2023-09-17 03:17:40 +12:00
The read action to use when querying records . Defaults to the primary read . This action * must * support keyset pagination .
2023-04-28 14:07:05 +12:00
"""
] ,
2023-07-20 06:18:15 +12:00
worker_read_action : [
type : :atom ,
doc : """
2023-09-17 03:17:40 +12:00
The read action to use when fetching the individual records for the trigger . Defaults to ` read_action ` . If you customize this , ensure your action handles scenarios where the trigger is no longer relevant .
2023-07-20 06:18:15 +12:00
"""
] ,
2023-04-22 16:58:26 +12:00
# NOTE(review): `required: true` conflicts with the doc string's claim that
# the action has a default - confirm which is intended.
action : [
type : :atom ,
2023-04-28 14:36:29 +12:00
required : true ,
2023-04-28 14:07:05 +12:00
doc :
" The action to be triggered. Defaults to the identifier of the resource plus the name of the trigger "
2023-04-22 16:58:26 +12:00
] ,
2023-04-22 17:35:21 +12:00
where : [
2023-04-22 16:58:26 +12:00
type : :any ,
doc : " The filter expression to determine if something should be triggered "
2023-04-29 05:59:56 +12:00
] ,
on_error : [
type : :atom ,
doc :
" An update action to call after the last attempt has failed. See the getting started guide for more. "
2023-04-22 16:58:26 +12:00
]
]
}
2023-12-08 07:24:16 +13:00
defmodule Schedule do
  @moduledoc """
  A configured scheduled action.

  Instances are built by the `schedule` DSL entity.
  """

  @type t :: %__MODULE__{
          name: atom,
          action: atom,
          cron: String.t(),
          action_input: map(),
          worker: module(),
          # The entity schema declares this option as `:pos_integer`.
          max_attempts: pos_integer(),
          queue: atom,
          debug?: boolean,
          state: :active | :paused | :deleted,
          priority: non_neg_integer(),
          __identifier__: atom
        }

  defstruct [
    :name,
    :action,
    :cron,
    # NOTE(review): `:debug` has no matching schema option (the option is
    # `debug?`); it looks like a leftover. Kept so existing struct literals
    # and pattern matches keep compiling - confirm before removing.
    :debug,
    :priority,
    :action_input,
    :max_attempts,
    :queue,
    :worker,
    :debug?,
    :state,
    :__identifier__
  ]
end
# Spark DSL entity for a single `schedule`. Each use builds a `Schedule` struct
# (the `target`), takes the name and the cron expression as positional
# arguments, and is identified by its name.
@schedule % Spark.Dsl.Entity {
name : :schedule ,
target : Schedule ,
args : [ :name , :cron ] ,
identifier : :name ,
schema : [
name : [
type : :atom ,
doc : " A unique identifier for this scheduled action. "
] ,
2023-12-08 07:29:49 +13:00
cron : [
type : :string ,
doc : " The schedule in crontab notation "
] ,
2023-12-08 07:24:16 +13:00
action_input : [
type : :map ,
doc : " Inputs to supply to the action when it is called. "
] ,
# NOTE(review): unlike the trigger entity's `action`, this option is not
# marked `required: true` - confirm whether a default is applied elsewhere.
action : [
type : :atom ,
doc : " The generic or create action to call when the schedule is triggered. "
] ,
queue : [
type : :atom ,
doc :
" The queue to place the job in. Defaults to the resources short name plus the name of the scheduled action (not the action name). "
] ,
state : [
type : { :one_of , [ :active , :paused , :deleted ] } ,
default : :active ,
doc : """
Describes the state of the cron job . See the getting started guide for more information . The most important thing is that you * do not remove a scheduled action from a resource if you are using oban pro * .
"""
] ,
max_attempts : [
type : :pos_integer ,
default : 1 ,
doc : """
How many times to attempt the job . The action will receive a ` last_oban_attempt? ` argument on the last attempt , and you should handle errors accordingly .
"""
] ,
2023-12-08 08:24:23 +13:00
priority : [
type : :non_neg_integer ,
doc : " A number from 0 to 3, where 0 is the highest priority and 3 is the lowest. " ,
default : 3
] ,
2023-12-08 07:24:16 +13:00
# NOTE(review): the doc string below still says "trigger"; it appears to be
# shared wording with the trigger entity's `debug?` option.
debug? : [
type : :boolean ,
default : false ,
doc :
" If set to `true`, detailed debug logging will be enabled for this trigger. You can also set `config :ash_oban, debug_all_triggers?: true` to enable debug logging for all triggers. "
]
]
}
# `scheduled_actions` DSL section: a container whose only entity is `schedule`.
@scheduled_actions % Spark.Dsl.Section {
name : :scheduled_actions ,
entities : [ @schedule ] ,
describe : """
A section for configured scheduled actions . Supports generic and create actions .
""" ,
examples : [
"""
scheduled_actions do
schedule :import , " 0 */6 * * * " , action : :import
end
"""
]
}
2023-04-22 16:58:26 +12:00
# `triggers` DSL section: a container whose only entity is `trigger`.
@triggers % Spark.Dsl.Section {
name : :triggers ,
2023-09-17 03:17:40 +12:00
entities : [ @trigger ] ,
examples : [
"""
triggers do
trigger :process do
action :process
where expr ( processed != true )
worker_read_action ( :read )
end
end
"""
]
2023-04-22 16:58:26 +12:00
}
# Top-level `oban` DSL section. It requires an `api` option and nests the
# `triggers` and `scheduled_actions` sections.
@oban % Spark.Dsl.Section {
name : :oban ,
2023-09-17 03:17:40 +12:00
examples : [
"""
oban do
api AshOban.Test.Api
triggers do
trigger :process do
action :process
where expr ( processed != true )
worker_read_action ( :read )
end
end
end
"""
] ,
2023-04-28 14:07:05 +12:00
schema : [
api : [
type : { :behaviour , Ash.Api } ,
doc : " The Api module to use when calling actions on this resource " ,
required : true
]
] ,
2023-12-08 07:24:16 +13:00
sections : [ @triggers , @scheduled_actions ]
2023-04-22 16:58:26 +12:00
}
2023-04-28 14:07:05 +12:00
@sections [ @oban ]
@moduledoc """
2023-09-17 03:17:40 +12:00
Tools for working with AshOban triggers .
2023-04-28 14:07:05 +12:00
"""
2023-04-22 17:35:21 +12:00
use Spark.Dsl.Extension ,
2023-09-17 03:17:40 +12:00
sections : @sections ,
2023-04-28 14:07:05 +12:00
imports : [ AshOban.Changes.BuiltinChanges ] ,
transformers : [
AshOban.Transformers.SetDefaults ,
2023-12-08 07:24:16 +13:00
AshOban.Transformers.DefineSchedulers ,
AshOban.Transformers.DefineActionWorkers
2023-04-22 17:35:21 +12:00
]
2023-04-28 14:07:05 +12:00
2023-12-13 03:40:03 +13:00
# Anything accepted by `schedule_and_run_triggers/2`: a resource, a
# `{resource, name}` tuple, an api, or a bare atom.
@type triggerable :: Ash.Resource . t ( ) | { Ash.Resource . t ( ) , atom ( ) } | Ash.Api . t ( ) | atom ( )
# Aggregated result returned by `schedule_and_run_triggers/2`: five counters
# plus the list of queues that were scheduled into but not drained.
@type result :: %{
discard : non_neg_integer ( ) ,
cancelled : non_neg_integer ( ) ,
success : non_neg_integer ( ) ,
failure : non_neg_integer ( ) ,
2023-12-13 04:14:32 +13:00
snoozed : non_neg_integer ( ) ,
queues_not_drained : list ( atom )
2023-12-13 03:40:03 +13:00
}
2024-02-21 07:02:43 +13:00
@doc """
Schedules all relevant jobs for the provided trigger or scheduled action.

`trigger` may be an `AshOban.Trigger` struct, an `AshOban.Schedule` struct, or
the name (an atom) of a trigger or scheduled action configured on `resource`.
A name is looked up as a trigger first, then as a scheduled action.

## Options

- `:actor` - the actor to set on the job. Requires configuring an actor persister.

Raises `ArgumentError` when the name matches nothing on `resource`, or when the
trigger has no scheduler module.
"""
def schedule(resource, trigger, opts \\ []) do
  job_module =
    case resolve_schedulable(resource, trigger) do
      %AshOban.Schedule{worker: worker} ->
        worker

      %AshOban.Trigger{scheduler: scheduler} when not is_nil(scheduler) ->
        scheduler

      # A trigger without a scheduler used to crash on `nil.new/1`.
      %AshOban.Trigger{name: name} ->
        raise ArgumentError,
              "Cannot schedule trigger #{inspect(name)} on #{inspect(resource)}: " <>
                "it has no scheduler module"
    end

  %{}
  |> store_actor(opts[:actor])
  |> job_module.new()
  |> Oban.insert!()
end

# Turns a struct or a configured name into the trigger/schedule struct to run.
defp resolve_schedulable(_resource, %AshOban.Trigger{} = trigger), do: trigger
defp resolve_schedulable(_resource, %AshOban.Schedule{} = schedule), do: schedule

defp resolve_schedulable(resource, name) when is_atom(name) do
  AshOban.Info.oban_trigger(resource, name) ||
    AshOban.Info.oban_scheduled_action(resource, name) ||
    raise(
      ArgumentError,
      "No trigger or scheduled action named #{inspect(name)} on #{inspect(resource)}"
    )
end
2024-02-21 07:28:32 +13:00
# Reads the `:authorize?` key of the `:ash_oban` application environment,
# falling back to `true` when the key is not set.
@spec authorize? :: boolean
def authorize? do
  case Application.fetch_env(:ash_oban, :authorize?) do
    {:ok, configured} -> configured
    :error -> true
  end
end
# Adds the persisted form of `actor` to the job `args`.
#
# `args` is returned unchanged when the actor is `nil` or when no
# `:actor_persister` is configured for `:ash_oban`; otherwise the persister's
# `store/1` result is put into `args`.
@spec store_actor(args :: map, actor :: any) :: any
def store_actor(args, nil), do: args

def store_actor(args, actor) do
  persister = Application.get_env(:ash_oban, :actor_persister)

  if is_nil(persister) do
    args
  else
    Map.put(args, " actor ", persister.store(actor))
  end
end
# Resolves a stored actor through the configured `:actor_persister`.
#
# Returns `{:ok, nil}` when no persister is configured; otherwise returns
# whatever the persister's `lookup/1` returns.
@spec lookup_actor(actor_json :: any) :: any
def lookup_actor(actor_json) do
  persister = Application.get_env(:ash_oban, :actor_persister)

  if is_nil(persister) do
    {:ok, nil}
  else
    persister.lookup(actor_json)
  end
end
@doc """
Runs a specific trigger for the record provided.

`trigger` is either an `AshOban.Trigger` struct or the name (an atom) of a
trigger configured on the record's resource.

## Options

- `:actor` - the actor to set on the job. Requires configuring an actor persister.

All other options are passed through to `c:Oban.Worker.new/2`.

Raises `ArgumentError` when a trigger name is given that is not configured on
the record's resource.
"""
def run_trigger(%resource{} = record, trigger, opts \\ []) do
  {opts, oban_job_opts} = Keyword.split(opts, [:actor])

  trigger =
    case trigger do
      %AshOban.Trigger{} ->
        trigger

      name when is_atom(name) ->
        # An unknown name used to surface later as a confusing `nil.worker` error.
        AshOban.Info.oban_trigger(resource, name) ||
          raise(
            ArgumentError,
            "No trigger named #{inspect(name)} on #{inspect(resource)}"
          )
    end

  primary_key = Ash.Resource.Info.primary_key(resource)

  # `read_metadata` is optional; without it the job carries empty metadata.
  metadata =
    case trigger do
      %{read_metadata: read_metadata} when is_function(read_metadata) ->
        read_metadata.(record)

      _ ->
        %{}
    end

  %{primary_key: Map.take(record, primary_key), metadata: metadata}
  |> AshOban.store_actor(opts[:actor])
  |> trigger.worker.new(oban_job_opts)
  |> Oban.insert!()
end
2023-11-14 10:00:08 +13:00
# Option schema for `config/3`; validated with `Spark.OptionsHelpers.validate!/2`
# and rendered into that function's `@doc`.
@config_schema [
require? : [
type : :boolean ,
default : true ,
doc : """
Whether to require queues and plugins to be defined in your oban config . This can be helpful to
allow the ability to split queues between nodes . See https :/ / hexdocs . pm / oban / splitting - queues . html
"""
]
]
@doc """
Alters your oban configuration to include the required AshOban configuration.

`apis` is an api module or a list of them, and `base` is the Oban configuration
to extend. One crontab entry is added to the cron plugin for every trigger
whose `scheduler_cron` is set and for every scheduled action. `base` is
returned unchanged when there is nothing to add.

# Options

#{Spark.OptionsHelpers.docs(@config_schema)}
"""
def config(apis, base, opts \\ []) do
  apis = List.wrap(apis)
  opts = Spark.OptionsHelpers.validate!(opts, @config_schema)
  pro? = AshOban.Info.pro?()

  cron_plugin = if pro?, do: Oban.Pro.Plugins.DynamicCron, else: Oban.Plugins.Cron

  if pro? && base[:engine] not in [Oban.Pro.Queue.SmartEngine, Oban.Pro.Engines.Smart] do
    raise """
    Expected oban engine to be Oban.Pro.Queue.SmartEngine or Oban.Pro.Engines.Smart , but got #{inspect(base[:engine])}.
    This expectation is because you ' ve set `config :ash_oban, pro?: true`.
    """
  end

  resources =
    apis
    |> Enum.flat_map(&Ash.Api.Info.resources/1)
    |> Enum.uniq()

  jobs =
    Enum.flat_map(resources, fn resource ->
      entries = AshOban.Info.oban_triggers_and_scheduled_actions(resource)

      # Queue requirements are checked for every entry, including triggers
      # whose scheduler is disabled.
      if opts[:require?] do
        Enum.each(entries, &require_queues!(base, resource, &1))
      end

      # Entries without a `scheduler_cron` key (scheduled actions) always pass.
      for entry <- entries, Map.get(entry, :scheduler_cron, true) do
        {resource, entry}
      end
    end)

  case jobs do
    [] ->
      base

    _ ->
      if opts[:require?] do
        require_cron!(base, cron_plugin)
      end

      Enum.reduce(jobs, base, fn {resource, entry}, acc ->
        add_job(acc, cron_plugin, resource, entry)
      end)
  end
end
# Prepends a crontab entry for `trigger` to the `cron_plugin` tuple inside
# `config[:plugins]`. Every other plugin entry is passed through untouched.
defp add_job(config, cron_plugin, _resource, trigger) do
  Keyword.update!(config, :plugins, fn plugins ->
    Enum.map(plugins, fn plugin ->
      case plugin do
        {^cron_plugin, plugin_config} ->
          # The trigger state becomes per-job options on the crontab entry.
          job_opts =
            cond do
              trigger.state == :paused -> [paused: true]
              trigger.state == :deleted -> [delete: true]
              true -> []
            end

          # Triggers run their scheduler module; scheduled actions run the worker.
          job =
            case trigger do
              %{scheduler_cron: scheduler_cron} ->
                {scheduler_cron, trigger.scheduler, job_opts}

              %{cron: cron} ->
                {cron, trigger.worker, job_opts}
            end

          crontab_config =
            Keyword.update(plugin_config, :crontab, [job], fn existing -> [job | existing] end)

          {cron_plugin, crontab_config}

        other ->
          other
      end
    end)
  end)
end
# Raises unless the oban `config` defines the queue used by `trigger` and,
# for entries that have a `scheduler_queue` key, that queue as well.
defp require_queues!(config, resource, trigger) do
  if !config[:queues][trigger.queue] do
    raise """
    Must configure the queue ` : #{trigger.queue}`, required for
    the trigger ` : #{trigger.name}` on #{inspect(resource)}
    """
  end

  if Map.has_key?(trigger, :scheduler_queue) && !config[:queues][trigger.scheduler_queue] do
    raise """
    Must configure the queue ` : #{trigger.scheduler_queue}`, required for
    the scheduler of the trigger ` : #{trigger.name}` on #{inspect(resource)}
    """
  end
end
# Raises unless `config[:plugins]` contains a `{name, _}` entry for the cron
# plugin. When the plugin list is a keyword list, the error message also
# shows an example config with the plugin added.
defp require_cron!(config, name) do
  configured? =
    Enum.any?(config[:plugins] || [], fn plugin -> match?({^name, _}, plugin) end)

  if !configured? do
    example =
      if Keyword.keyword?(config[:plugins]) do
        suggested = Keyword.update!(config, :plugins, &Keyword.put(&1, name, []))

        """
        Example :
        #{inspect(suggested)}
        """
      end

    raise """
    Must configure cron plugin #{inspect(name)}.
    See oban ' s documentation for more. AshOban will
    add cron jobs to the configuration , but will not
    add the basic configuration for you .
    Configuration received :
    #{inspect(config)}
    #{example}
    """
  end
end
2023-09-01 03:25:05 +12:00
# Dispatches the changeset to `api.update/1` when its action type is
# `:update`, and to `api.destroy/1` for any other action type.
@doc false
def update_or_destroy(changeset, api) do
  case changeset.action.type do
    :update -> api.update(changeset)
    _other -> api.destroy(changeset)
  end
end
2023-09-20 04:32:50 +12:00
# Logs `message` at debug level when the flag is `true`, or when the
# `:debug_all_triggers?` key of the `:ash_oban` environment is truthy.
# Only boolean flags are accepted.
@doc false
def debug(message, enabled?) when is_boolean(enabled?) do
  if enabled? || Application.get_env(:ash_oban, :debug_all_triggers?) do
    Logger.debug(message)
  else
    :ok
  end
end
# Extracts the nested `stacktrace.stacktrace` value from an error-like map.
# Returns `nil` when that value is absent or `nil`.
def stacktrace(error) do
  case error do
    %{stacktrace: %{stacktrace: trace}} when not is_nil(trace) -> trace
    _ -> nil
  end
end
2023-12-13 03:40:03 +13:00
@doc """
Runs the schedulers for the given resource, api, or otp_app, or list of resources, apis, or otp_apps.

## Options

- `drain_queues?` - Defaults to false. Drains the queues after scheduling. This is primarily for testing.
- `queue`, `with_limit`, `with_recursion`, `with_safety`, `with_scheduled` - passed through to `Oban.drain_queue/2`, if it is called.
- `scheduled_actions?` - Defaults to false, unless a scheduled action name was explicitly provided. Schedules all applicable scheduled actions.
- `triggers?` - Defaults to true. Schedules all applicable triggers.
- `actor` - The actor to schedule and run the triggers with.
- `oban` - The oban module to use. Defaults to `Oban`.

## Inputs

If the input is:

* a list - each item is passed into `schedule_and_run_triggers/1`, and the results are merged together.
* an otp_app - each api configured in the `ash_apis` of that otp_app is passed into `schedule_and_run_triggers/1`, and the results are merged together.
* an api - each resource configured in that api is passed into `schedule_and_run_triggers/1`, and the results are merged together.
* a tuple of `{resource, :trigger_name}` - that trigger is scheduled, and the results are merged together.
* a resource - each trigger configured in that resource is scheduled, and the results are merged together.
"""
@spec schedule_and_run_triggers(triggerable | list(triggerable), keyword()) :: result
def schedule_and_run_triggers(resources_or_apis_or_otp_apps, opts \\ []) do
  defaults = [
    scheduled_actions?: false,
    triggers?: true,
    drain_queues?: false,
    oban: Oban
  ]

  # Caller-supplied values always win; defaults only fill in missing keys.
  opts =
    Enum.reduce(defaults, opts, fn {key, value}, acc ->
      Keyword.put_new(acc, key, value)
    end)

  do_schedule_and_run_triggers(resources_or_apis_or_otp_apps, opts)
end
# Recursive worker behind `schedule_and_run_triggers/2`.
#
# Accepts a list, a `{resource, name}` tuple, an api, a resource, or an otp_app
# name. Every path returns a full result map built on `default_acc/0`.
def do_schedule_and_run_triggers(resources_or_apis_or_otp_apps, opts)
    when is_list(resources_or_apis_or_otp_apps) do
  merge_all(resources_or_apis_or_otp_apps, opts)
end

def do_schedule_and_run_triggers({resource, trigger_name}, opts) do
  # An explicitly named entry is scheduled whatever `scheduled_actions?` and
  # `triggers?` say.
  resource
  |> AshOban.Info.oban_triggers_and_scheduled_actions()
  |> Enum.filter(fn
    %AshOban.Schedule{name: name} ->
      name == trigger_name

    trigger ->
      trigger.scheduler && trigger.name == trigger_name
  end)
  |> schedule_and_drain(resource, opts)
end

def do_schedule_and_run_triggers(resource_or_api_or_otp_app, opts) do
  cond do
    Spark.Dsl.is?(resource_or_api_or_otp_app, Ash.Api) ->
      # Folding into `default_acc/0` (not `%{}`) keeps the result well-formed
      # even for an api without resources.
      resource_or_api_or_otp_app
      |> Ash.Api.Info.resources()
      |> merge_all(opts)

    Spark.Dsl.is?(resource_or_api_or_otp_app, Ash.Resource) ->
      resource_or_api_or_otp_app
      |> AshOban.Info.oban_triggers_and_scheduled_actions()
      |> Enum.filter(fn
        %AshOban.Schedule{} ->
          opts[:scheduled_actions?] && true

        trigger ->
          # Honour the documented `triggers?` option; a missing key means
          # `true`, so direct callers are unaffected.
          Keyword.get(opts, :triggers?, true) && trigger.scheduler
      end)
      |> schedule_and_drain(resource_or_api_or_otp_app, opts)

    true ->
      # Anything else is treated as an otp_app name.
      resource_or_api_or_otp_app
      |> Application.get_env(:ash_apis, [])
      |> List.wrap()
      |> merge_all(opts)
  end
end

# Runs every item through `do_schedule_and_run_triggers/2` and merges the results.
defp merge_all(items, opts) do
  Enum.reduce(items, default_acc(), fn item, acc ->
    item
    |> do_schedule_and_run_triggers(opts)
    |> merge_results(acc)
  end)
end

# Schedules each selected trigger/schedule, then drains (or reports) its queues.
defp schedule_and_drain(triggers, resource, opts) do
  Enum.each(triggers, fn trigger ->
    AshOban.schedule(resource, trigger, actor: opts[:actor])
  end)

  # `drain_queues/2` drains each queue twice: schedulers first, then workers.
  triggers
  |> Enum.map(& &1.queue)
  |> Enum.uniq()
  |> drain_queues(opts)
end
2023-12-13 04:14:32 +13:00
# Drains `queues` when `opts[:drain_queues?]` is set and returns the merged
# result; otherwise returns an empty result listing the queues as not drained.
defp drain_queues(queues, opts) do
  if opts[:drain_queues?] do
    # `:oban` is not a drain option, so it is passed separately; previously it
    # never reached `drain_queue`.
    oban = opts[:oban] || Oban

    passthrough =
      Keyword.take(opts, [
        :queue,
        :with_limit,
        :with_recursion,
        :with_safety,
        :with_scheduled
      ])

    # Each queue is drained twice: schedulers first, then the workers they enqueue.
    Enum.reduce(queues ++ queues, default_acc(), fn queue, acc ->
      drain_opts = Keyword.merge([queue: queue], passthrough)

      oban
      |> drain_queue(drain_opts)
      |> Map.put(:queues_not_drained, [])
      |> merge_results(acc)
    end)
  else
    %{default_acc() | queues_not_drained: Enum.uniq(queues)}
  end
end

# `drain_queue/2` is chosen at compile time: Oban Pro's testing helper when
# `pro?` is configured, `Oban.drain_queue/2` in the test environment, and a
# raising stub everywhere else.
if @pro do
  defp drain_queue(oban, opts) do
    conf = Oban.config(oban)
    opts = Keyword.put_new(opts, :repo, conf.repo)
    # `apply/3` avoids a compile-time reference to the optional Pro module.
    apply(Oban.Pro.Testing, :drain_jobs, [opts])
  end
else
  if Application.compile_env(:ash_oban, :test) || Mix.env() == :test do
    defp drain_queue(oban, opts) do
      Oban.drain_queue(oban, opts)
    end
  else
    defp drain_queue(_oban, _opts) do
      raise ArgumentError, """
      Cannot use the ` drain_queues? : true ` option outside of the test environment , unless you are also using oban pro .
      For more information , see this github issue : https :/ / github . com / sorentwo / oban / issues / 1037 #issuecomment-1962928460
      """
    end
  end
end
2024-02-21 07:14:02 +13:00
# Empty result map: every counter at zero and no undrained queues.
defp default_acc do
  [:discard, :cancelled, :success, :failure, :snoozed]
  |> Map.new(fn counter -> {counter, 0} end)
  |> Map.put(:queues_not_drained, [])
end
2023-12-13 04:14:32 +13:00
# Merges two result maps. For keys present in both, counters are summed and
# `:queues_not_drained` lists are concatenated and de-duplicated.
defp merge_results(results, acc) do
  Map.merge(results, acc, fn key, from_results, from_acc ->
    if key == :queues_not_drained do
      Enum.uniq(from_results ++ from_acc)
    else
      from_results + from_acc
    end
  end)
end
2023-04-22 16:46:04 +12:00
end