fix(Ash.Reactor): crash when calling an ash reactor for the first time.

Thanks to @carlgleisner for the [detailed reproduction](https://github.com/carlgleisner/reactor_notification_worker_issue).

The problem was caused by an attempt to not have nested reactors indepdently publish their notifications separate to the parents but contained a logic flaw which caused the agent to not start, but only the first time you try and use a given reactor.

The fix involves _always_ starting a notification agent for each reactor, but nesting them.  When a reactor completes it either publishes it's notifications to the parent reactor or to ash if there are no parent reactors.
This commit is contained in:
James Harton 2024-04-09 14:46:11 +12:00 committed by James Harton
parent 1854880def
commit 139058d4c0
6 changed files with 53 additions and 60 deletions

View file

@ -9,8 +9,6 @@ defimpl Reactor.Dsl.Build, for: Ash.Reactor.Dsl.Transaction do
@impl true
def build(transaction, reactor) do
sub_reactor = Builder.new({Ash.Reactor.TransactionStep, transaction.name})
# force the sub-reactor to not be hooked.
sub_reactor = %{sub_reactor | context: Map.put(sub_reactor.context, :__ash_hooked__, true)}
with {:ok, reactor} <- ensure_hooked(reactor),
{:ok, sub_reactor} <- build_nested_steps(sub_reactor, transaction.steps),

View file

@ -61,12 +61,6 @@ defmodule Ash.Reactor.BuilderUtils do
@doc false
@spec ensure_hooked(Reactor.t()) :: {:ok, Reactor.t()} | {:error, any}
def ensure_hooked(reactor) when is_map_key(reactor.context, :__ash_hooked__),
do: {:ok, reactor}
def ensure_hooked(reactor) do
with {:ok, reactor} <- Reactor.Builder.ensure_middleware(reactor, Ash.Reactor.Notifications) do
{:ok, %{reactor | context: Map.put(reactor.context, :__ash_hooked__, true)}}
end
end
def ensure_hooked(reactor),
do: Reactor.Builder.ensure_middleware(reactor, Ash.Reactor.Notifications)
end

View file

@ -8,14 +8,16 @@ defmodule Ash.Reactor.Notifications do
require Logger
@context_agent_key :__ash_notification_agent__
@context_notification_key :__unpublished_ash_notifications__
@agent_key :ash_notification_agent
@notification_key :ash_notifications
defguardp has_agent?(context) when is_map_key(context, @context_agent_key)
defguardp has_agent?(context)
when is_map_key(context, @agent_key) and
:erlang.map_get(@agent_key, context) != []
defguardp has_notifications?(context)
when is_map_key(context, @context_notification_key) and
length(:erlang.map_get(@context_notification_key, context)) > 0
when is_map_key(context, @notification_key) and
:erlang.map_get(@notification_key, context) != []
@doc """
When starting a reactor, start an agent to act as a temporary store of
@ -23,10 +25,13 @@ defmodule Ash.Reactor.Notifications do
"""
@impl true
def init(context) when has_notifications?(context) do
with {:ok, notifications} <- Map.fetch(context, @context_notification_key),
with {:ok, notifications} <- Map.fetch(context, @notification_key),
{:ok, context} <- agent_start(context),
{:ok, context} <- agent_put(context, notifications) do
context = Map.delete(context, @context_notification_key)
context =
context
|> Map.delete(@notification_key)
{:ok, context}
end
end
@ -45,7 +50,7 @@ defmodule Ash.Reactor.Notifications do
{:ok,
Map.update(
context,
@context_notification_key,
@notification_key,
notifications,
&Enum.concat(&1, notifications)
)}
@ -61,8 +66,8 @@ defmodule Ash.Reactor.Notifications do
@impl true
def complete(result, context) when has_agent?(context) do
with {:ok, notifications} <- agent_get(context),
{:ok, _context} <- agent_stop(context),
[] <- __MODULE__.publish(notifications) do
{:ok, context} <- agent_stop(context),
[] <- __MODULE__.publish(context, notifications) do
{:ok, result}
else
{:error, reason} ->
@ -110,51 +115,47 @@ defmodule Ash.Reactor.Notifications do
@doc """
Dispatch notifications.
"""
@spec publish(Ash.Notifier.Notification.t() | [Ash.Notifier.Notification.t()]) ::
@spec publish(
Reactor.context(),
Ash.Notifier.Notification.t() | [Ash.Notifier.Notification.t()]
) ::
[Ash.Notifier.Notification.t()]
def publish(notifications), do: Ash.Notifier.notify(notifications)
defp agent_start(context) when has_agent?(context) do
case agent_get(context) do
{:ok, _} -> {:ok, context}
_ -> agent_start(Map.delete(context, @context_agent_key))
def publish(context, notifications) when has_agent?(context) do
case agent_put(context, notifications) do
{:ok, _} -> []
{:error, _} -> notifications
end
end
def publish(_, notifications), do: Ash.Notifier.notify(notifications)
defp agent_start(context) do
case Agent.start_link(fn -> [] end) do
{:ok, pid} -> {:ok, Map.put(context, :__ash_notification_agent__, pid)}
{:ok, pid} -> {:ok, Map.update(context, @agent_key, [pid], &[pid | &1])}
{:error, reason} -> {:error, reason}
end
end
defp agent_get(context) do
notifications =
context
|> Map.fetch!(@context_agent_key)
|> Agent.get(fn notifications -> notifications end, 100)
defp agent_get(%{@agent_key => [pid | _]}) do
notifications = Agent.get(pid, & &1, 100)
{:ok, notifications}
rescue
error -> {:error, error}
end
defp agent_stop(context) do
:ok =
context
|> Map.fetch!(@context_agent_key)
|> Agent.stop(:normal)
defp agent_get(%{@agent_key => []}),
do: {:error, "Context does not contain a notification agent"}
{:ok, Map.delete(context, @context_agent_key)}
defp agent_stop(%{@agent_key => [pid | agents]} = context) do
:ok = Agent.stop(pid, :normal)
{:ok, %{context | @agent_key => agents}}
rescue
error -> {:error, error}
end
defp agent_put(context, notifications) do
:ok =
context
|> Map.fetch!(@context_agent_key)
|> Agent.update(&Enum.concat(&1, notifications))
defp agent_put(%{@agent_key => [pid | _]} = context, notifications) do
:ok = Agent.update(pid, &Enum.concat(&1, notifications))
{:ok, context}
rescue

View file

@ -9,16 +9,16 @@ defmodule Ash.Test.Reactor.NotificationsTest do
test "it starts an agent" do
{:ok, context} = Notifications.init(%{})
assert [] == agent_get(context.__ash_notification_agent__)
assert [] == agent_get(context.ash_notification_agent)
end
test "when there are already notifications in the context it stores them in the agent" do
notifications = build_notifications()
{:ok, context} =
Notifications.init(%{__unpublished_ash_notifications__: notifications})
Notifications.init(%{ash_notifications: notifications})
enqueued = agent_get(context.__ash_notification_agent__)
enqueued = agent_get(context.ash_notification_agent)
assert enqueued == notifications
end
@ -26,9 +26,9 @@ defmodule Ash.Test.Reactor.NotificationsTest do
notifications = build_notifications()
{:ok, context} =
Notifications.init(%{__unpublished_ash_notifications__: notifications})
Notifications.init(%{ash_notifications: notifications})
refute is_map_key(context, :__unpublished_ash_notifications__)
refute is_map_key(context, :ash_notifications)
end
end
@ -39,10 +39,10 @@ defmodule Ash.Test.Reactor.NotificationsTest do
end
test "it stops the agent", %{context: context} do
agent = context.__ash_notification_agent__
[agent | _] = context.ash_notification_agent
{:ok, context} = Notifications.halt(context)
refute is_map_key(context, :__ash_notification_agent__)
assert context.ash_notification_agent == []
refute Process.alive?(agent)
end
@ -51,7 +51,7 @@ defmodule Ash.Test.Reactor.NotificationsTest do
:ok = Notifications.enqueue_notifications(context, notifications)
{:ok, context} = Notifications.halt(context)
assert context.__unpublished_ash_notifications__ == notifications
assert context.ash_notifications == notifications
end
end
@ -65,7 +65,7 @@ defmodule Ash.Test.Reactor.NotificationsTest do
notifications = build_notifications()
:ok = Notifications.enqueue_notifications(context, notifications)
expect(Notifications, :publish, fn actual ->
expect(Notifications, :publish, fn _context, actual ->
assert actual == notifications
[]
end)
@ -77,7 +77,7 @@ defmodule Ash.Test.Reactor.NotificationsTest do
notifications = build_notifications()
:ok = Notifications.enqueue_notifications(context, notifications)
expect(Notifications, :publish, & &1)
expect(Notifications, :publish, fn _context, notifications -> notifications end)
assert capture_log(fn ->
assert {:ok, :result} = Notifications.complete(:result, context)
@ -85,7 +85,7 @@ defmodule Ash.Test.Reactor.NotificationsTest do
end
test "it stops the agent", %{context: context} do
agent = context.__ash_notification_agent__
[agent | _] = context.ash_notification_agent
{:ok, :result} = Notifications.complete(:result, context)
refute Process.alive?(agent)
@ -99,7 +99,7 @@ defmodule Ash.Test.Reactor.NotificationsTest do
end
test "it stops the agent", %{context: context} do
agent = context.__ash_notification_agent__
[agent | _] = context.ash_notification_agent
:ok = Notifications.error([:errors], context)
refute Process.alive?(agent)
@ -112,5 +112,5 @@ defmodule Ash.Test.Reactor.NotificationsTest do
end
end
defp agent_get(agent), do: Agent.get(agent, & &1)
defp agent_get([agent | _]), do: Agent.get(agent, & &1)
end

View file

@ -55,7 +55,7 @@ defmodule Ash.Test.ReactorTest do
end
end
expect(Ash.Reactor.Notifications, :publish, fn notifications ->
expect(Ash.Reactor.Notifications, :publish, fn _context, notifications ->
assert [
%Ash.Notifier.Notification{
resource: Ash.Test.ReactorTest.Post,

View file

@ -79,7 +79,7 @@ defmodule Ash.Test.Reactor.TransactionTest do
end
assert {:ok, %{title: "About Marty McFly"}} =
Reactor.run(SuccessfulNamedReturnTransactionReactor)
Reactor.run!(SuccessfulNamedReturnTransactionReactor)
end
test "when the transaction fails it is rolled back" do