mirror of
https://github.com/ash-project/ash.git
synced 2024-09-20 21:43:02 +12:00
fix: properly reraise errors raised in tasks
This commit is contained in:
parent
f084d346ad
commit
d7b446bc7f
2 changed files with 30 additions and 13 deletions
|
@ -942,7 +942,13 @@ defmodule Ash.Actions.Read do
|
||||||
end
|
end
|
||||||
|
|
||||||
defp maybe_await(%Task{} = task) do
|
defp maybe_await(%Task{} = task) do
|
||||||
Task.await(task)
|
case Task.await(task) do
|
||||||
|
{:__exception__, e, stacktrace} ->
|
||||||
|
reraise e, stacktrace
|
||||||
|
|
||||||
|
other ->
|
||||||
|
other
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
defp maybe_await(other), do: other
|
defp maybe_await(other), do: other
|
||||||
|
@ -958,15 +964,12 @@ defmodule Ash.Actions.Read do
|
||||||
{:error, error} -> {:error, error}
|
{:error, error} -> {:error, error}
|
||||||
end
|
end
|
||||||
else
|
else
|
||||||
{:ok,
|
Ash.Engine.async(
|
||||||
Task.async(fn ->
|
fn ->
|
||||||
try do
|
{:ok, do_fetch_count(ash_query, query, initial_limit, initial_offset)}
|
||||||
do_fetch_count(ash_query, query, initial_limit, initial_offset)
|
end,
|
||||||
rescue
|
opts
|
||||||
exception ->
|
)
|
||||||
{:error, exception}
|
|
||||||
end
|
|
||||||
end)}
|
|
||||||
end
|
end
|
||||||
else
|
else
|
||||||
{:ok, {:ok, nil}}
|
{:ok, {:ok, nil}}
|
||||||
|
|
|
@ -110,7 +110,13 @@ defmodule Ash.Engine do
|
||||||
)
|
)
|
||||||
|
|
||||||
try do
|
try do
|
||||||
Task.await(task, opts[:timeout])
|
case Task.await(task, opts[:timeout]) do
|
||||||
|
{:__exception__, e, stacktrace} ->
|
||||||
|
raise e, stacktrace
|
||||||
|
|
||||||
|
other ->
|
||||||
|
other
|
||||||
|
end
|
||||||
catch
|
catch
|
||||||
:exit, {:timeout, {Task, :await, [^task, timeout]}} ->
|
:exit, {:timeout, {Task, :await, [^task, timeout]}} ->
|
||||||
{:error, Ash.Error.Invalid.Timeout.exception(timeout: timeout, name: opts[:name])}
|
{:error, Ash.Error.Invalid.Timeout.exception(timeout: timeout, name: opts[:name])}
|
||||||
|
@ -355,10 +361,17 @@ defmodule Ash.Engine do
|
||||||
state.tasks
|
state.tasks
|
||||||
|> Enum.reduce(state, fn task, state ->
|
|> Enum.reduce(state, fn task, state ->
|
||||||
case Task.yield(task, 0) do
|
case Task.yield(task, 0) do
|
||||||
|
{:ok, {:__exception__, exception, stacktrace}} ->
|
||||||
|
reraise exception, stacktrace
|
||||||
|
|
||||||
{:ok, {request_path, result}} ->
|
{:ok, {request_path, result}} ->
|
||||||
state = %{state | tasks: state.tasks -- [task]}
|
state = %{state | tasks: state.tasks -- [task]}
|
||||||
request = Enum.find(state.requests, &(&1.path == request_path))
|
request = Enum.find(state.requests, &(&1.path == request_path))
|
||||||
|
|
||||||
|
if is_nil(request) do
|
||||||
|
dbg()
|
||||||
|
end
|
||||||
|
|
||||||
new_request = %{request | async_fetch_state: {:fetched, result}}
|
new_request = %{request | async_fetch_state: {:fetched, result}}
|
||||||
|
|
||||||
replace_request(state, new_request)
|
replace_request(state, new_request)
|
||||||
|
@ -439,7 +452,8 @@ defmodule Ash.Engine do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
defp async(func, opts) do
|
@doc false
|
||||||
|
def async(func, opts) do
|
||||||
ash_context = Ash.get_context_for_transfer(opts)
|
ash_context = Ash.get_context_for_transfer(opts)
|
||||||
|
|
||||||
Task.async(fn ->
|
Task.async(fn ->
|
||||||
|
@ -449,7 +463,7 @@ defmodule Ash.Engine do
|
||||||
func.()
|
func.()
|
||||||
rescue
|
rescue
|
||||||
e ->
|
e ->
|
||||||
{:error, e}
|
{:__exception__, e, __STACKTRACE__}
|
||||||
end
|
end
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in a new issue