improvement: error handling/docs

This commit is contained in:
Zach Daniel 2023-04-28 13:59:56 -04:00
parent 871d986980
commit 230e6cb0c7
6 changed files with 349 additions and 114 deletions

View file

@ -3,6 +3,7 @@ spark_locals_without_parens = [
api: 1,
max_attempts: 1,
max_scheduler_attempts: 1,
on_error: 1,
queue: 1,
read_action: 1,
scheduler_cron: 1,

View file

@ -48,6 +48,7 @@ oban do
where expr(processed != true)
# checking for matches every minute
scheduler_cron "* * * * *"
on_error :errored
end
end
end
@ -55,8 +56,39 @@ end
See the DSL documentation for more: `AshOban`
## Handling Errors
Error handling is done by adding an `on_error` to your trigger. This is an update action that will get the error as an argument called `:error`. The error will be an Ash error class. These error classes can contain many kinds of errors, so you will need to figure out handling specific errors on your own. Be sure to add the `:error` argument to the action if you want to receive the error.
This is *not* foolproof. You want to be sure that your `on_error` action is as simple as possible, because if an exception is raised during the `on_error` action, the oban job will fail. If you are relying on your `on_error` logic to alter the resource to make it no longer apply to a trigger, consider making your action do *only that*. Then you can add another trigger watching for things in an errored state to do more rich error handling behavior.
## Changing Triggers
To remove or disable triggers, *do not just remove them from your resource*. Due to the way that oban implements cron jobs, if you just remove them from your resource, the cron will attempt to continue scheduling jobs. Instead, set `paused true` or `delete true` on the trigger. See the oban docs for more: https://getoban.pro/docs/pro/0.14.1/Oban.Pro.Plugins.DynamicCron.html#module-using-and-configuring
## Transactions
AshOban adds two new transaction reasons, as it uses explicit transactions to ensure that each triggered record is properly locked and executed serially.
```elixir
%{
type: :ash_oban_trigger,
metadata: %{
resource: Resource,
trigger: :trigger_name,
primary_key: %{primary_key_fields: value}
}
}
```
and
```elixir
%{
type: :ash_oban_trigger_error,
metadata: %{
resource: Resource,
trigger: :trigger_name,
primary_key: %{primary_key_fields: value},
error: <the error (this will be an ash error class)>
}
}
```

View file

@ -17,7 +17,8 @@ defmodule AshOban do
scheduler: module,
state: :active | :paused | :deleted,
worker: module,
__identifier__: atom
__identifier__: atom,
on_error: atom
}
defstruct [
@ -33,6 +34,7 @@ defmodule AshOban do
:state,
:scheduler,
:worker,
:on_error,
:__identifier__
]
end
@ -71,7 +73,13 @@ defmodule AshOban do
max_attempts: [
type: :pos_integer,
default: 1,
doc: "How many times to attempt the job."
doc: """
How many times to attempt the job.
Keep in mind: after all of these attempts, the scheduler will likely just reschedule the job,
leading to infinite retries. To solve for this, configure an `on_error` action that will make
the trigger no longer apply to failed jobs.
"""
],
state: [
type: {:one_of, [:active, :paused, :deleted]},
@ -102,6 +110,11 @@ defmodule AshOban do
where: [
type: :any,
doc: "The filter expression to determine if something should be triggered"
],
on_error: [
type: :atom,
doc:
"An update action to call after the last attempt has failed. See the getting started guide for more."
]
]
}
@ -149,6 +162,24 @@ defmodule AshOban do
AshOban.Transformers.DefineSchedulers
]
@doc """
Runs a given trigger for the provided record.

Accepts either an `AshOban.Trigger` struct or a trigger name (an atom,
looked up on the record's resource). Builds an oban job whose args carry
the record's primary key and inserts it.

Raises if the named trigger does not exist on the resource, or if
`Oban.insert!/1` fails.
"""
def run_trigger(%resource{} = record, trigger) do
  trigger =
    case trigger do
      %AshOban.Trigger{} ->
        trigger

      name when is_atom(name) ->
        AshOban.Info.oban_trigger(resource, name) ||
          raise ArgumentError,
                "No trigger #{inspect(name)} defined on #{inspect(resource)}"
    end

  # Fix: this previously read `changeset.resource`, but no `changeset`
  # exists in this scope — the resource comes from the record's struct.
  primary_key = Ash.Resource.Info.primary_key(resource)

  %{primary_key: Map.take(record, primary_key)}
  |> trigger.worker.new()
  |> Oban.insert!()
end
@doc "Alters your oban configuration to include the required AshOban configuration."
def config(apis, base) do
pro? = AshOban.Info.pro?()

View file

@ -7,17 +7,13 @@ defmodule AshOban.Changes.RunObanTrigger do
# Looks up the configured trigger and, after the action succeeds, schedules
# that trigger for the action's result.
#
# Fixes two leftovers in the displayed body: an unused `primary_key` binding
# and a stale direct `trigger.worker.new() |> Oban.insert!()` pipeline that
# duplicated the `AshOban.run_trigger/2` call (inserting the job twice).
def change(changeset, opts, _context) do
  trigger = AshOban.Info.oban_trigger(changeset.resource, opts[:trigger])

  # Fail loudly at changeset-build time rather than with a confusing
  # nil-dereference inside the after_action hook.
  if !trigger do
    raise "No such trigger #{opts[:trigger]} for resource #{inspect(changeset.resource)}"
  end

  Ash.Changeset.after_action(changeset, fn _changeset, result ->
    # Delegate to the shared entry point so job construction lives in one place.
    AshOban.run_trigger(result, trigger)
    {:ok, result}
  end)
end

View file

@ -49,7 +49,7 @@ defmodule AshOban.Transformers.DefineSchedulers do
filter =
if not is_nil(trigger.where) do
quote do
quote location: :keep do
def filter(query) do
Ash.Query.do_filter(query, unquote(Macro.escape(trigger.where)))
end
@ -58,7 +58,7 @@ defmodule AshOban.Transformers.DefineSchedulers do
stream =
if is_nil(trigger.where) do
quote do
quote location: :keep do
def stream(resource) do
resource
|> Ash.Query.set_context(%{private: %{ash_oban?: true}})
@ -68,7 +68,7 @@ defmodule AshOban.Transformers.DefineSchedulers do
end
end
else
quote do
quote location: :keep do
def stream(resource) do
resource
|> Ash.Query.set_context(%{private: %{ash_oban?: true}})
@ -82,7 +82,7 @@ defmodule AshOban.Transformers.DefineSchedulers do
insert =
if pro? do
quote do
quote location: :keep do
def insert(stream) do
stream
|> Stream.chunk_every(100)
@ -91,7 +91,7 @@ defmodule AshOban.Transformers.DefineSchedulers do
end
end
else
quote do
quote location: :keep do
def insert(stream) do
stream
|> Stream.each(&Oban.insert!/1)
@ -115,7 +115,7 @@ defmodule AshOban.Transformers.DefineSchedulers do
end
quoted =
quote do
quote location: :keep do
use unquote(worker),
queue: unquote(trigger.scheduler_queue),
unique: [
@ -156,12 +156,11 @@ defmodule AshOban.Transformers.DefineSchedulers do
Module.create(scheduler_module_name, quoted, Macro.Env.location(__ENV__))
end
# sobelow_skip ["SQL.Query"]
defp define_worker(resource, worker_module_name, trigger, dsl) do
api = AshOban.Info.oban_api!(dsl)
pro? = AshOban.Info.pro?()
transaction? = Ash.Resource.Info.action(dsl, trigger.action).transaction?
worker =
if pro? do
Oban.Pro.Worker
@ -169,33 +168,43 @@ defmodule AshOban.Transformers.DefineSchedulers do
Oban.Worker
end
function_name =
if pro? do
:process
else
:perform
query = query(trigger, resource)
can_transact? = Ash.DataLayer.data_layer_can?(dsl, :transact)
on_error_transaction? =
can_transact? && trigger.on_error &&
Ash.Resource.Info.action(dsl, trigger.on_error).transaction?
work_transaction? =
can_transact? && Ash.Resource.Info.action(dsl, trigger.action).transaction?
can_lock? = Ash.DataLayer.data_layer_can?(dsl, {:lock, :for_update})
lock =
if work_transaction? || on_error_transaction? do
if can_lock? do
quote location: :keep do
defp lock(query) do
Ash.Query.lock(query, :for_update)
end
end
else
quote location: :keep do
defp lock(query), do: query
end
end
end
query =
if is_nil(trigger.where) do
quote do
def query do
unquote(resource)
end
end
else
quote do
def query do
Ash.Query.do_filter(unquote(resource), unquote(Macro.escape(trigger.where)))
end
end
end
handle_error = handle_error(trigger, on_error_transaction?, resource, api)
work = work(trigger, worker, pro?, resource, api, work_transaction?)
Module.create(
worker_module_name,
quote do
quote location: :keep do
use unquote(worker),
max_attempts: 3,
max_attempts: unquote(trigger.max_attempts),
queue: unquote(trigger.queue),
unique: [
period: :infinity,
@ -208,85 +217,251 @@ defmodule AshOban.Transformers.DefineSchedulers do
require Logger
@impl unquote(worker)
if unquote(trigger.state != :active) do
def unquote(function_name)(_) do
{:discard, unquote(trigger.state)}
end
else
def unquote(function_name)(%Oban.Job{args: %{"primary_key" => primary_key}}) do
if Ash.DataLayer.data_layer_can?(unquote(resource), :transact) &&
unquote(transaction?) do
Ash.DataLayer.transaction(
unquote(resource),
fn ->
query()
|> Ash.Query.do_filter(primary_key)
|> Ash.Query.set_context(%{private: %{ash_oban?: true}})
|> then(fn query ->
if Ash.DataLayer.data_layer_can?(unquote(resource), {:lock, :for_update}) do
Ash.Query.lock(query, :for_update)
else
query
end
end)
|> Ash.Query.for_read(unquote(trigger.read_action), authorize?: false)
|> unquote(api).read_one()
|> case do
{:ok, nil} ->
{:discard, :trigger_no_longer_applies}
{:ok, record} ->
record
|> Ash.Changeset.new()
|> Ash.Changeset.set_context(%{private: %{ash_oban?: true}})
|> Ash.Changeset.for_update(unquote(trigger.action), %{})
|> unquote(api).update!()
end
end,
nil,
%{
type: :ash_oban_trigger,
metadata: %{
resource: unquote(resource),
trigger: unquote(trigger.name)
}
}
)
|> case do
{:ok, {:discard, reason}} ->
{:discard, reason}
{:ok, _} ->
:ok
other ->
other
end
else
query()
|> Ash.Query.do_filter(primary_key)
|> Ash.Query.set_context(%{private: %{ash_oban?: true}})
|> Ash.Query.for_read(unquote(trigger.read_action), authorize?: false)
|> unquote(api).read_one()
|> case do
{:ok, nil} ->
{:discard, :trigger_no_longer_applies}
{:ok, record} ->
record
|> Ash.Changeset.new()
|> Ash.Changeset.set_context(%{private: %{ash_oban?: true}})
|> Ash.Changeset.for_update(unquote(trigger.action), %{})
|> unquote(api).update()
end
end
end
end
unquote(work)
unquote(query)
unquote(handle_error)
unquote(lock)
end,
Macro.Env.location(__ENV__)
)
end
# Builds the quoted `handle_error/2` function for the generated worker.
#
# When the trigger has an `on_error` action configured, the generated
# function re-reads the record by primary key and runs the `on_error`
# update action with the error as the `:error` argument — inside a
# transaction (with locking) when that action is transactional, otherwise
# directly. When no `on_error` is configured, the error is simply returned,
# failing the oban job.
#
# Fix: the `{:ok, result, notifications}` clause bound `result` without
# using it, emitting an unused-variable warning in every generated worker.
defp handle_error(trigger, on_error_transaction?, resource, api) do
  if trigger.on_error do
    # We look up the record again since we have exited any potential transaction we were in before
    if on_error_transaction? do
      quote location: :keep do
        def handle_error(error, primary_key) do
          Ash.DataLayer.transaction(
            unquote(resource),
            fn ->
              query()
              |> Ash.Query.do_filter(primary_key)
              |> Ash.Query.set_context(%{private: %{ash_oban?: true}})
              |> lock()
              |> Ash.Query.for_read(unquote(trigger.read_action), authorize?: false)
              |> unquote(api).read_one()
              |> case do
                {:ok, nil} ->
                  {:discard, :trigger_no_longer_applies}

                {:ok, record} ->
                  record
                  |> Ash.Changeset.new()
                  |> Ash.Changeset.set_context(%{private: %{ash_oban?: true}})
                  |> Ash.Changeset.for_update(unquote(trigger.on_error), %{error: error})
                  |> unquote(api).update(return_notifications?: true)
                  |> case do
                    # Only the notifications leave the transaction; they are
                    # dispatched after commit below.
                    {:ok, _result, notifications} ->
                      notifications

                    {:error, error} ->
                      Ash.DataLayer.rollback(unquote(resource), error)
                  end
              end
            end,
            nil,
            %{
              type: :ash_oban_trigger_error,
              metadata: %{
                resource: unquote(resource),
                trigger: unquote(trigger.name),
                primary_key: primary_key,
                error: error
              }
            }
          )
          |> case do
            {:ok, {:discard, reason}} ->
              {:discard, reason}

            {:ok, notifications} ->
              Ash.Notifier.notify(notifications)
              :ok

            {:error, error} ->
              Logger.error("""
              Error handler failed for #{inspect(unquote(resource))}: #{inspect(primary_key)}!
              #{inspect(Exception.message(error))}
              """)

              {:error, error}
          end
        end
      end
    else
      quote location: :keep do
        def handle_error(error, primary_key) do
          query()
          |> Ash.Query.do_filter(primary_key)
          |> Ash.Query.set_context(%{private: %{ash_oban?: true}})
          |> Ash.Query.for_read(unquote(trigger.read_action), authorize?: false)
          |> unquote(api).read_one()
          |> case do
            {:ok, nil} ->
              {:discard, :trigger_no_longer_applies}

            {:ok, record} ->
              record
              |> Ash.Changeset.new()
              |> Ash.Changeset.set_context(%{private: %{ash_oban?: true}})
              |> Ash.Changeset.for_update(unquote(trigger.on_error), %{error: error})
              |> unquote(api).update()
              |> case do
                {:ok, _result} ->
                  :ok

                {:error, error} ->
                  Logger.error("""
                  Error handler failed for #{inspect(unquote(resource))}: #{inspect(primary_key)}!
                  #{inspect(Exception.message(error))}
                  """)

                  {:error, error}
              end
          end
        end
      end
    end
  else
    # No `on_error` configured: surface the error and let oban retry/fail.
    quote location: :keep do
      def handle_error(error, _) do
        {:error, error}
      end
    end
  end
end
# Builds the quoted `query/0` function for the generated module: the bare
# resource, filtered by the trigger's `where` expression when one is set.
defp query(trigger, resource) do
  case trigger.where do
    nil ->
      quote location: :keep do
        def query do
          unquote(resource)
        end
      end

    where ->
      quote location: :keep do
        def query do
          Ash.Query.do_filter(unquote(resource), unquote(Macro.escape(where)))
        end
      end
  end
end
# Builds the quoted job-execution callback for the generated worker module.
#
# Three shapes, chosen by trigger configuration:
#   * non-active trigger — discard the job immediately
#   * transactional action — read + lock + update inside a transaction,
#     carrying notifications out and dispatching them after commit
#   * plain action — read and update outside a transaction
#
# Any raised error is routed to the generated `handle_error/2`.
#
# Fix: both generated function heads bound `... = job` without using it,
# emitting an unused-variable warning in every generated worker.
defp work(trigger, worker, pro?, resource, api, work_transaction?) do
  # Oban.Pro.Worker implementations define `process/1`; plain Oban.Worker
  # implementations define `perform/1`.
  function_name =
    if pro? do
      :process
    else
      :perform
    end

  cond do
    trigger.state != :active ->
      # Paused/deleted triggers discard their jobs instead of running them.
      quote location: :keep do
        @impl unquote(worker)
        def unquote(function_name)(_) do
          {:discard, unquote(trigger.state)}
        end
      end

    work_transaction? ->
      quote location: :keep do
        @impl unquote(worker)
        def unquote(function_name)(%Oban.Job{args: %{"primary_key" => primary_key}}) do
          Ash.DataLayer.transaction(
            unquote(resource),
            fn ->
              query()
              |> Ash.Query.do_filter(primary_key)
              |> Ash.Query.set_context(%{private: %{ash_oban?: true}})
              |> lock()
              |> Ash.Query.for_read(unquote(trigger.read_action), authorize?: false)
              |> unquote(api).read_one()
              |> case do
                {:ok, nil} ->
                  {:discard, :trigger_no_longer_applies}

                {:ok, record} ->
                  record
                  |> Ash.Changeset.new()
                  |> Ash.Changeset.set_context(%{private: %{ash_oban?: true}})
                  |> Ash.Changeset.for_update(unquote(trigger.action), %{})
                  |> unquote(api).update(return_notifications?: true)
                  |> case do
                    # Only notifications leave the transaction; they are
                    # dispatched after commit below.
                    {:ok, _result, notifications} ->
                      notifications

                    {:error, error} ->
                      Ash.DataLayer.rollback(unquote(resource), error)
                  end
              end
            end,
            nil,
            %{
              type: :ash_oban_trigger,
              metadata: %{
                resource: unquote(resource),
                trigger: unquote(trigger.name),
                primary_key: primary_key
              }
            }
          )
          |> case do
            {:ok, {:discard, reason}} ->
              {:discard, reason}

            {:ok, notifications} ->
              Ash.Notifier.notify(notifications)
              :ok

            {:error, error} ->
              # Raised so the rescue below routes it through handle_error/2.
              raise Ash.Error.to_error_class(error)
          end
        rescue
          error ->
            handle_error(error, primary_key)
        end
      end

    true ->
      quote location: :keep do
        @impl unquote(worker)
        def unquote(function_name)(%Oban.Job{args: %{"primary_key" => primary_key}}) do
          query()
          |> Ash.Query.do_filter(primary_key)
          |> Ash.Query.set_context(%{private: %{ash_oban?: true}})
          |> Ash.Query.for_read(unquote(trigger.read_action), authorize?: false)
          |> unquote(api).read_one()
          |> case do
            {:ok, nil} ->
              {:discard, :trigger_no_longer_applies}

            {:ok, record} ->
              record
              |> Ash.Changeset.new()
              |> Ash.Changeset.set_context(%{private: %{ash_oban?: true}})
              |> Ash.Changeset.for_update(unquote(trigger.action), %{})
              |> unquote(api).update()
              |> case do
                {:ok, result} ->
                  {:ok, result}

                {:error, error} ->
                  raise Ash.Error.to_error_class(error)
              end

            # we don't have the record here, so we can't do the `on_error` behavior
            other ->
              other
          end
        rescue
          error ->
            handle_error(error, primary_key)
        end
      end
  end
end
end

View file

@ -1,5 +1,5 @@
%{
"ash": {:hex, :ash, "2.8.0", "8a19f5fdd03b833719e768d9fc08db2ca1353a28517f5aa82cfcbeda6011b8d2", [:mix], [{:comparable, "~> 1.0", [hex: :comparable, repo: "hexpm", optional: false]}, {:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:earmark, "~> 1.4", [hex: :earmark, repo: "hexpm", optional: true]}, {:ecto, "~> 3.7", [hex: :ecto, repo: "hexpm", optional: false]}, {:ets, "~> 0.8.0", [hex: :ets, repo: "hexpm", optional: false]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: false]}, {:picosat_elixir, "~> 0.2", [hex: :picosat_elixir, repo: "hexpm", optional: false]}, {:plug, ">= 0.0.0", [hex: :plug, repo: "hexpm", optional: true]}, {:spark, "~> 1.0", [hex: :spark, repo: "hexpm", optional: false]}, {:stream_data, "~> 0.5.0", [hex: :stream_data, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "aa21b95cfbfb9d070cfd15ff09e798500e4cbae824cae652ac41f5f6cdcc0bfe"},
"ash": {:git, "https://github.com/ash-project/ash.git", "587549636778c52e6d2061708000e5e34935886a", []},
"bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"},
"castore": {:hex, :castore, "1.0.1", "240b9edb4e9e94f8f56ab39d8d2d0a57f49e46c56aced8f873892df8ff64ff5a", [:mix], [], "hexpm", "b4951de93c224d44fac71614beabd88b71932d0b1dea80d2f80fb9044e01bbb3"},
"certifi": {:hex, :certifi, "2.9.0", "6f2a475689dd47f19fb74334859d460a2dc4e3252a3324bd2111b8f0429e7e21", [:rebar3], [], "hexpm", "266da46bdb06d6c6d35fde799bcb28d36d985d424ad7c08b5bb48f5b5cdd4641"},
@ -46,7 +46,7 @@
"plug_crypto": {:hex, :plug_crypto, "1.2.5", "918772575e48e81e455818229bf719d4ab4181fcbf7f85b68a35620f78d89ced", [:mix], [], "hexpm", "26549a1d6345e2172eb1c233866756ae44a9609bd33ee6f99147ab3fd87fd842"},
"sobelow": {:hex, :sobelow, "0.12.2", "45f4d500e09f95fdb5a7b94c2838d6b26625828751d9f1127174055a78542cf5", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "2f0b617dce551db651145662b84c8da4f158e7abe049a76daaaae2282df01c5d"},
"sourceror": {:hex, :sourceror, "0.12.2", "2ae55efd149193572e0eb723df7c7a1bda9ab33c43373c82642931dbb2f4e428", [:mix], [], "hexpm", "7ad74ade6fb079c71f29fae10c34bcf2323542d8c51ee1bcd77a546cfa89d59c"},
"spark": {:hex, :spark, "1.1.3", "5577146f14f7af85c9c56da283d633377428377952cdafae862e55aabdbd5133", [:mix], [{:nimble_options, "~> 0.5 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:sourceror, "~> 0.1", [hex: :sourceror, repo: "hexpm", optional: false]}], "hexpm", "4512bd2b9b9b20cc47450c83420d7e7e223fd08c4ac75ce0846d0f522d82e50b"},
"spark": {:hex, :spark, "1.1.4", "ded19c8c3431277cadc5ccf6c807df9498f21fa5c273dce479d82f533b88f788", [:mix], [{:nimble_options, "~> 0.5 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:sourceror, "~> 0.1", [hex: :sourceror, repo: "hexpm", optional: false]}], "hexpm", "8ca1e69e3cb230407cb1a0581071e9d89dca21fb7556fb5e2bf221df4774654c"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"},
"stream_data": {:hex, :stream_data, "0.5.0", "b27641e58941685c75b353577dc602c9d2c12292dd84babf506c2033cd97893e", [:mix], [], "hexpm", "012bd2eec069ada4db3411f9115ccafa38540a3c78c4c0349f151fc761b9e271"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},