From d7b446bc7face0b47e5d57a8394a037049ee4137 Mon Sep 17 00:00:00 2001 From: Zach Daniel Date: Thu, 15 Sep 2022 02:08:30 -0400 Subject: [PATCH] fix: properly reraise errors raised in tasks --- lib/ash/actions/read.ex | 23 +++++++++++++---------- lib/ash/engine/engine.ex | 20 +++++++++++++++++--- 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/lib/ash/actions/read.ex b/lib/ash/actions/read.ex index f7904a44..e40205c1 100644 --- a/lib/ash/actions/read.ex +++ b/lib/ash/actions/read.ex @@ -942,7 +942,13 @@ defmodule Ash.Actions.Read do end defp maybe_await(%Task{} = task) do - Task.await(task) + case Task.await(task) do + {:__exception__, e, stacktrace} -> + reraise e, stacktrace + + other -> + other + end end defp maybe_await(other), do: other @@ -958,15 +964,12 @@ defmodule Ash.Actions.Read do {:error, error} -> {:error, error} end else - {:ok, - Task.async(fn -> - try do - do_fetch_count(ash_query, query, initial_limit, initial_offset) - rescue - exception -> - {:error, exception} - end - end)} + Ash.Engine.async( + fn -> + {:ok, do_fetch_count(ash_query, query, initial_limit, initial_offset)} + end, + opts + ) end else {:ok, {:ok, nil}} diff --git a/lib/ash/engine/engine.ex b/lib/ash/engine/engine.ex index 2017ef42..dbf3b329 100644 --- a/lib/ash/engine/engine.ex +++ b/lib/ash/engine/engine.ex @@ -110,7 +110,13 @@ defmodule Ash.Engine do ) try do - Task.await(task, opts[:timeout]) + case Task.await(task, opts[:timeout]) do + {:__exception__, e, stacktrace} -> + raise e, stacktrace + + other -> + other + end catch :exit, {:timeout, {Task, :await, [^task, timeout]}} -> {:error, Ash.Error.Invalid.Timeout.exception(timeout: timeout, name: opts[:name])} @@ -355,10 +361,17 @@ defmodule Ash.Engine do state.tasks |> Enum.reduce(state, fn task, state -> case Task.yield(task, 0) do + {:ok, {:__exception__, exception, stacktrace}} -> + reraise exception, stacktrace + {:ok, {request_path, result}} -> state = %{state | tasks: state.tasks -- [task]} request = Enum.find(state.requests, &(&1.path == request_path)) + if is_nil(request) do + dbg() + end + new_request = %{request | async_fetch_state: {:fetched, result}} replace_request(state, new_request) @@ -439,7 +452,8 @@ defmodule Ash.Engine do end end - defp async(func, opts) do + @doc false + def async(func, opts) do ash_context = Ash.get_context_for_transfer(opts) Task.async(fn -> @@ -449,7 +463,7 @@ defmodule Ash.Engine do func.() rescue e -> - {:error, e} + {:__exception__, e, __STACKTRACE__} end end) end