fix: oban pro configuration (#21)

This commit is contained in:
Alessio Montagnani 2024-04-02 20:06:41 +02:00 committed by GitHub
parent a15acf77d3
commit cc78a8e980
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 86 additions and 6 deletions

View file

@ -528,19 +528,28 @@ defmodule AshOban do
def config(apis, base, opts \\ []) do
apis = List.wrap(apis)
opts = Spark.OptionsHelpers.validate!(opts, @config_schema)
pro? = AshOban.Info.pro?()
pro_dynamic_cron_plugin? =
base
|> Keyword.get(:plugins, [])
|> Enum.any?(fn {plugin, _opts} -> plugin == Oban.Pro.Plugins.DynamicCron end)
pro_dynamic_queues_plugin? =
base
|> Keyword.get(:plugins, [])
|> Enum.any?(fn {plugin, _opts} -> plugin == Oban.Pro.Plugins.DynamicQueues end)
cron_plugin =
if pro? do
if pro_dynamic_cron_plugin? do
Oban.Pro.Plugins.DynamicCron
else
Oban.Plugins.Cron
end
if pro? && base[:engine] not in [Oban.Pro.Queue.SmartEngine, Oban.Pro.Engines.Smart] do
if (pro_dynamic_cron_plugin? || pro_dynamic_queues_plugin?) && base[:engine] not in [Oban.Pro.Queue.SmartEngine, Oban.Pro.Engines.Smart] do
raise """
Expected oban engine to be Oban.Pro.Queue.SmartEngine or Oban.Pro.Engines.Smart, but got #{inspect(base[:engine])}.
This expectation is because you've set `config :ash_oban, pro?: true`.
This expectation is because you're using at least one Oban.Pro plugin`.
"""
end
@ -555,7 +564,7 @@ defmodule AshOban do
|> AshOban.Info.oban_triggers_and_scheduled_actions()
|> tap(fn triggers ->
if opts[:require?] do
Enum.each(triggers, &require_queues!(base, resource, &1))
Enum.each(triggers, &require_queues!(base, resource, pro_dynamic_queues_plugin?, &1))
end
end)
|> Enum.filter(fn
@ -615,7 +624,7 @@ defmodule AshOban do
end)
end
defp require_queues!(config, resource, trigger) do
defp require_queues!(config, resource, false, trigger) do
unless config[:queues][trigger.queue] do
raise """
Must configure the queue `:#{trigger.queue}`, required for
@ -633,6 +642,35 @@ defmodule AshOban do
end
end
defp require_queues!(config, resource, true, trigger) do
{_plugin_name, plugin_config} =
config[:plugins]
|> Enum.find({nil, nil}, fn {plugin, _opts} -> plugin == Oban.Pro.Plugins.DynamicQueues end)
if !is_list(plugin_config) || !Keyword.has_key?(plugin_config, :queues) || !is_list(plugin_config[:queues]) || !Keyword.has_key?(plugin_config[:queues], trigger.queue) do
raise """
Must configure the queue `:#{trigger.queue}`, required for
the trigger `:#{trigger.name}` on #{inspect(resource)}
"""
end
if !is_nil(config[:queues]) && config[:queues] != false do
raise """
Must configure the queue through Oban.Pro.Plugins.DynamicQueues plugin
when Oban Pro is used
"""
end
if Map.has_key?(trigger, :scheduler_queue) do
unless plugin_config[:queues][trigger.scheduler_queue] do
raise """
Must configure the queue `:#{trigger.scheduler_queue}`, required for
the scheduler of the trigger `:#{trigger.name}` on #{inspect(resource)}
"""
end
end
end
defp require_cron!(config, name) do
unless Enum.find(config[:plugins] || [], &match?({^name, _}, &1)) do
ideal =

View file

@ -80,4 +80,46 @@ defmodule AshObanTest do
]
] = config
end
test "oban pro configuration" do
config =
AshOban.config([Api], [
engine: Oban.Pro.Engines.Smart,
plugins: [
{Oban.Pro.Plugins.DynamicCron, [
timezone: "Europe/Rome",
sync_mode: :automatic,
crontab: []
]},
{Oban.Pro.Plugins.DynamicQueues,
queues: [
triggered_process: 10,
triggered_process_2: 10,
triggered_say_hello: 10
]}
],
queues: false
])
assert [
engine: Oban.Pro.Engines.Smart,
plugins: [
{Oban.Pro.Plugins.DynamicCron, [
timezone: "Europe/Rome",
sync_mode: :automatic,
crontab: [
{"0 0 1 1 *", AshOban.Test.Triggered.AshOban.ActionWorker.SayHello, []},
{"* * * * *", AshOban.Test.Triggered.AshOban.Scheduler.Process, []}
]
]},
{Oban.Pro.Plugins.DynamicQueues,
queues: [
triggered_process: 10,
triggered_process_2: 10,
triggered_say_hello: 10
]}
],
queues: false
] = config
end
end