fix: handle data layer calculations for missing records

This commit is contained in:
Zach Daniel 2024-01-20 18:52:36 -05:00
parent 7aaea7bd55
commit 892cd7c845
4 changed files with 91 additions and 24 deletions

View file

@ -16,12 +16,12 @@ defmodule Ash.Actions.Read.AsyncLimiter do
end
def async_or_inline(
%{resource: resource, context: %{private: %{async_limiter: async_limiter}}},
%{resource: resource, context: %{private: %{async_limiter: async_limiter}}} = query,
opts,
func
)
when not is_nil(async_limiter) do
if Ash.DataLayer.data_layer_can?(resource, :async_engine) do
if Ash.DataLayer.data_layer_can?(resource, :async_engine) && !in_transaction?(query) do
claimed? =
Agent.get_and_update(async_limiter, fn
{limit, limit} ->
@ -95,6 +95,13 @@ defmodule Ash.Actions.Read.AsyncLimiter do
end
end
defp in_transaction?(query) do
Enum.any?(
List.wrap(query.resource) ++ List.wrap(query.action.touches_resources),
&Ash.DataLayer.in_transaction?(&1)
)
end
defp release(async_limiter) do
Agent.update(async_limiter, fn
{count, limit} ->

View file

@ -1,10 +1,10 @@
defmodule Ash.Actions.Read.Calculations do
@moduledoc false
def run([], _, _calculations_in_query), do: {:ok, []}
def run([], _, _, _calculations_in_query), do: {:ok, []}
def run(records, ash_query, calculations_in_query) do
def run(records, ash_query, calculations_at_runtime, calculations_in_query) do
do_run_calculations(
Map.to_list(ash_query.calculations),
calculations_at_runtime,
records,
ash_query,
MapSet.new(calculations_in_query, & &1.name)
@ -16,8 +16,8 @@ defmodule Ash.Actions.Read.Calculations do
defp do_run_calculations(calculations, records, ash_query, done, tasks) do
{do_now, do_later} =
Enum.split_with(calculations, fn {key, _calc} ->
ash_query.context[:calculation_dependencies][key]
Enum.split_with(calculations, fn calc ->
ash_query.context[:calculation_dependencies][calc.name]
|> Kernel.||([])
|> Enum.all?(&(&1 in done))
end)
@ -32,12 +32,12 @@ defmodule Ash.Actions.Read.Calculations do
{newly_done, remaining} =
do_now
|> Enum.map(fn {name, calculation} ->
|> Enum.map(fn calculation ->
Ash.Actions.Read.AsyncLimiter.async_or_inline(
ash_query,
Ash.context_to_opts(calculation.context),
fn ->
{name, calculation, run_calculation(calculation, ash_query, records)}
{calculation.name, calculation, run_calculation(calculation, ash_query, records)}
end
)
end)
@ -142,6 +142,15 @@ defmodule Ash.Actions.Read.Calculations do
records
|> apply_transient_calculation_values(calculation, ash_query, [])
|> calculation.module.calculate(calculation.opts, calculation.context)
|> case do
:unknown ->
Enum.map(records, fn _ ->
nil
end)
result ->
result
end
end
defp apply_transient_calculation_values(records, calculation, ash_query, path) do
@ -743,6 +752,9 @@ defmodule Ash.Actions.Read.Calculations do
end
end
end
true ->
raise "unknown load for #{inspect(query)}: #{inspect(load)}"
end
end)
end

View file

@ -204,29 +204,23 @@ defmodule Ash.Actions.Read do
opts[:tracer],
opts[:authorize?]
),
data <-
attach_fields(
data,
opts[:initial_data],
query
),
{:ok, data} <-
Ash.Actions.Read.Relationships.load(data, query, opts[:lazy?]),
query_with_only_runtime_calcs <- %{
query
| calculations: Map.new(calculations_at_runtime, &{&1.name, &1}),
select: []
},
{:ok, data} <-
Ash.Actions.Read.Calculations.run(
data,
query_with_only_runtime_calcs,
query,
calculations_at_runtime,
calculations_in_query
),
{:ok, data} <-
load_through_attributes(
data,
query_with_only_runtime_calcs,
%{
query
| calculations: Map.new(calculations_at_runtime, &{&1.name, &1}),
select: []
},
query.api,
opts[:actor],
opts[:tracer],
@ -609,10 +603,47 @@ defmodule Ash.Actions.Read do
},
true
) do
{:ok, results, 0}
results
|> attach_fields(initial_data, query)
|> compute_expression_at_runtime_for_missing_records(query, data_layer_calculations)
|> case do
{:ok, result} ->
{:ok, result, 0}
{:error, error} ->
{:error, error}
end
end
end
defp compute_expression_at_runtime_for_missing_records(data, query, data_layer_calculations) do
if Enum.any?(data, & &1.__metadata__[:private][:missing_from_data_layer]) do
{require_calculating, rest} =
data
|> Stream.with_index()
|> Stream.map(fn {record, index} ->
Ash.Resource.put_metadata(record, :private, %{result_index: index})
end)
|> Enum.split_with(& &1.__metadata__[:private][:missing_from_data_layer])
require_calculating
|> recalculate(query, Enum.map(data_layer_calculations, &elem(&1, 0)))
|> case do
{:ok, result} ->
{:ok,
result
|> Enum.concat(rest)
|> Enum.sort_by(& &1.__metadata__[:private][:result_index])}
end
else
{:ok, data}
end
end
defp recalculate(require_calculating, query, data_layer_calculations) do
Ash.Actions.Read.Calculations.run(require_calculating, query, data_layer_calculations, [])
end
defp authorize_query(query, opts) do
if opts[:authorize?] do
case query.api.can(query, opts[:actor],
@ -678,7 +709,6 @@ defmodule Ash.Actions.Read do
tracer,
authorize?
) do
load_through =
query.resource
|> Ash.Resource.Info.attributes()

View file

@ -1343,6 +1343,24 @@ defmodule Ash.Query do
loads
|> List.wrap()
|> Enum.map(fn
{load, {args, further}} ->
if resource_calculation = Ash.Resource.Info.calculation(query.resource, load) do
case resource_calc_to_calc(query, load, resource_calculation, args) do
{:error, _} ->
{load, {args, further}}
{:ok, calc} ->
{calc, further}
end
else
if relationship = Ash.Resource.Info.relationship(query.resource, load) do
related_query = new(relationship.destination)
{{load, reify_calculations(args, related_query)}, further}
else
{load, {args, further}}
end
end
{load, args} ->
if resource_calculation = Ash.Resource.Info.calculation(query.resource, load) do
case resource_calc_to_calc(query, load, resource_calculation, args) do