mirror of
https://github.com/ash-project/ash_oban.git
synced 2024-09-19 21:03:00 +12:00
improvement: properly schedule scheduled actions
This commit is contained in:
parent
b82b41a228
commit
0dd7f68cf2
4 changed files with 89 additions and 11 deletions
|
@ -214,6 +214,7 @@ defmodule AshOban do
|
|||
max_attempts: non_neg_integer(),
|
||||
queue: atom,
|
||||
debug?: boolean,
|
||||
state: :active | :paused | :deleted,
|
||||
priority: non_neg_integer()
|
||||
}
|
||||
|
||||
|
@ -228,6 +229,7 @@ defmodule AshOban do
|
|||
:queue,
|
||||
:worker,
|
||||
:debug?,
|
||||
:state,
|
||||
:__identifier__
|
||||
]
|
||||
end
|
||||
|
@ -440,6 +442,7 @@ defmodule AshOban do
|
|||
#{Spark.OptionsHelpers.docs(@config_schema)}
|
||||
"""
|
||||
def config(apis, base, opts \\ []) do
|
||||
apis = List.wrap(apis)
|
||||
opts = Spark.OptionsHelpers.validate!(opts, @config_schema)
|
||||
pro? = AshOban.Info.pro?()
|
||||
|
||||
|
@ -465,13 +468,19 @@ defmodule AshOban do
|
|||
|> Enum.uniq()
|
||||
|> Enum.flat_map(fn resource ->
|
||||
resource
|
||||
|> AshOban.Info.oban_triggers()
|
||||
|> AshOban.Info.oban_triggers_and_scheduled_actions()
|
||||
|> tap(fn triggers ->
|
||||
if opts[:require?] do
|
||||
Enum.each(triggers, &require_queues!(base, resource, &1))
|
||||
end
|
||||
end)
|
||||
|> Enum.filter(& &1.scheduler_cron)
|
||||
|> Enum.filter(fn
|
||||
%{scheduler_cron: scheduler_cron} ->
|
||||
scheduler_cron
|
||||
|
||||
_ ->
|
||||
true
|
||||
end)
|
||||
|> Enum.map(&{resource, &1})
|
||||
end)
|
||||
|> case do
|
||||
|
@ -505,7 +514,15 @@ defmodule AshOban do
|
|||
[]
|
||||
end
|
||||
|
||||
cron = {trigger.scheduler_cron, trigger.scheduler, opts}
|
||||
cron =
|
||||
case trigger do
|
||||
%{scheduler_cron: scheduler_cron} ->
|
||||
{scheduler_cron, trigger.scheduler, opts}
|
||||
|
||||
%{cron: cron} ->
|
||||
{cron, trigger.worker, opts}
|
||||
end
|
||||
|
||||
{cron_plugin, Keyword.update(config, :crontab, [cron], &[cron | &1])}
|
||||
|
||||
other ->
|
||||
|
@ -522,18 +539,37 @@ defmodule AshOban do
|
|||
"""
|
||||
end
|
||||
|
||||
unless config[:queues][trigger.scheduler_queue] do
|
||||
raise """
|
||||
Must configure the queue `:#{trigger.scheduler_queue}`, required for
|
||||
the scheduler of the trigger `:#{trigger.name}` on #{inspect(resource)}
|
||||
"""
|
||||
if Map.has_key?(trigger, :scheduler_queue) do
|
||||
unless config[:queues][trigger.scheduler_queue] do
|
||||
raise """
|
||||
Must configure the queue `:#{trigger.scheduler_queue}`, required for
|
||||
the scheduler of the trigger `:#{trigger.name}` on #{inspect(resource)}
|
||||
"""
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
defp require_cron!(config, name) do
|
||||
unless Enum.find(config[:plugins] || [], &match?({^name, _}, &1)) do
|
||||
ideal =
|
||||
if Keyword.keyword?(config[:plugins]) do
|
||||
Keyword.update!(config, :plugins, fn plugins ->
|
||||
Keyword.put(plugins, name, [])
|
||||
end)
|
||||
end
|
||||
|
||||
ideal =
|
||||
if ideal do
|
||||
"""
|
||||
|
||||
Example:
|
||||
|
||||
#{inspect(ideal)}
|
||||
"""
|
||||
end
|
||||
|
||||
raise """
|
||||
Must configure cron plugin #{name}.
|
||||
Must configure cron plugin #{inspect(name)}.
|
||||
|
||||
See oban's documentation for more. AshOban will
|
||||
add cron jobs to the configuration, but will not
|
||||
|
@ -542,6 +578,7 @@ defmodule AshOban do
|
|||
Configuration received:
|
||||
|
||||
#{inspect(config)}
|
||||
#{ideal}
|
||||
"""
|
||||
end
|
||||
end
|
||||
|
|
|
@ -5,10 +5,41 @@ defmodule AshObanTest do
|
|||
alias AshOban.Test.Api
|
||||
alias AshOban.Test.Triggered
|
||||
|
||||
test "foo" do
|
||||
test "dsl introspection" do
|
||||
assert [
|
||||
%AshOban.Trigger{action: :process},
|
||||
%AshOban.Trigger{action: :process, scheduler: nil}
|
||||
] = AshOban.Info.oban_triggers(Triggered)
|
||||
end
|
||||
|
||||
test "cron configuration" do
|
||||
config =
|
||||
AshOban.config([Api],
|
||||
plugins: [
|
||||
{Oban.Plugins.Cron, []}
|
||||
],
|
||||
queues: [
|
||||
triggered_process: 10,
|
||||
triggered_process_2: 10,
|
||||
triggered_say_hello: 10
|
||||
]
|
||||
)
|
||||
|
||||
assert [
|
||||
plugins: [
|
||||
{Oban.Plugins.Cron,
|
||||
[
|
||||
crontab: [
|
||||
{"0 0 1 1 *", AshOban.Test.Triggered.AshOban.ActionWorker.SayHello, []},
|
||||
{"* * * * *", AshOban.Test.Triggered.AshOban.Scheduler.Process, []}
|
||||
]
|
||||
]}
|
||||
],
|
||||
queues: [
|
||||
triggered_process: 10,
|
||||
triggered_process_2: 10,
|
||||
triggered_say_hello: 10
|
||||
]
|
||||
] = config
|
||||
end
|
||||
end
|
||||
|
|
|
@ -3,6 +3,6 @@ defmodule AshOban.Test.Api do
|
|||
use Ash.Api
|
||||
|
||||
resources do
|
||||
allow_unregistered? true
|
||||
resource AshOban.Test.Triggered
|
||||
end
|
||||
end
|
||||
|
|
|
@ -21,6 +21,10 @@ defmodule AshOban.Test.Triggered do
|
|||
scheduler_cron false
|
||||
end
|
||||
end
|
||||
|
||||
scheduled_actions do
|
||||
schedule :say_hello, "0 0 1 1 *"
|
||||
end
|
||||
end
|
||||
|
||||
actions do
|
||||
|
@ -34,6 +38,12 @@ defmodule AshOban.Test.Triggered do
|
|||
update :process do
|
||||
change set_attribute(:processed, true)
|
||||
end
|
||||
|
||||
action :say_hello, :string do
|
||||
run fn input, _ ->
|
||||
{:ok, "Hello"}
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
ets do
|
||||
|
|
Loading…
Reference in a new issue