From 892cd7c8450079cca7cb62cef4cb826ecefbc844 Mon Sep 17 00:00:00 2001 From: Zach Daniel Date: Sat, 20 Jan 2024 18:52:36 -0500 Subject: [PATCH] fix: handle data layer calculations for missing records --- lib/ash/actions/read/async_limiter.ex | 11 ++++- lib/ash/actions/read/calculations.ex | 26 ++++++++---- lib/ash/actions/read/read.ex | 60 ++++++++++++++++++++------- lib/ash/query/query.ex | 18 ++++++++ 4 files changed, 91 insertions(+), 24 deletions(-) diff --git a/lib/ash/actions/read/async_limiter.ex b/lib/ash/actions/read/async_limiter.ex index 67b5366f..57333e3d 100644 --- a/lib/ash/actions/read/async_limiter.ex +++ b/lib/ash/actions/read/async_limiter.ex @@ -16,12 +16,12 @@ defmodule Ash.Actions.Read.AsyncLimiter do end def async_or_inline( - %{resource: resource, context: %{private: %{async_limiter: async_limiter}}}, + %{resource: resource, context: %{private: %{async_limiter: async_limiter}}} = query, opts, func ) when not is_nil(async_limiter) do - if Ash.DataLayer.data_layer_can?(resource, :async_engine) do + if Ash.DataLayer.data_layer_can?(resource, :async_engine) && !in_transaction?(query) do claimed? = Agent.get_and_update(async_limiter, fn {limit, limit} -> @@ -95,6 +95,13 @@ defmodule Ash.Actions.Read.AsyncLimiter do end end + defp in_transaction?(query) do + Enum.any?( + List.wrap(query.resource) ++ List.wrap(query.action.touches_resources), + &Ash.DataLayer.in_transaction?(&1) + ) + end + defp release(async_limiter) do Agent.update(async_limiter, fn {count, limit} -> diff --git a/lib/ash/actions/read/calculations.ex b/lib/ash/actions/read/calculations.ex index c0566574..854cf716 100644 --- a/lib/ash/actions/read/calculations.ex +++ b/lib/ash/actions/read/calculations.ex @@ -1,10 +1,10 @@ defmodule Ash.Actions.Read.Calculations do @moduledoc false - def run([], _, _calculations_in_query), do: {:ok, []} + def run([], _, _, _calculations_in_query), do: {:ok, []} - def run(records, ash_query, calculations_in_query) do + def run(records, ash_query, calculations_at_runtime, calculations_in_query) do do_run_calculations( - Map.to_list(ash_query.calculations), + calculations_at_runtime, records, ash_query, MapSet.new(calculations_in_query, & &1.name) @@ -16,8 +16,8 @@ defmodule Ash.Actions.Read.Calculations do defp do_run_calculations(calculations, records, ash_query, done, tasks) do {do_now, do_later} = - Enum.split_with(calculations, fn {key, _calc} -> - ash_query.context[:calculation_dependencies][key] + Enum.split_with(calculations, fn calc -> + ash_query.context[:calculation_dependencies][calc.name] |> Kernel.||([]) |> Enum.all?(&(&1 in done)) end) @@ -32,12 +32,12 @@ defmodule Ash.Actions.Read.Calculations do {newly_done, remaining} = do_now - |> Enum.map(fn {name, calculation} -> + |> Enum.map(fn calculation -> Ash.Actions.Read.AsyncLimiter.async_or_inline( ash_query, Ash.context_to_opts(calculation.context), fn -> - {name, calculation, run_calculation(calculation, ash_query, records)} + {calculation.name, calculation, run_calculation(calculation, ash_query, records)} end ) end) @@ -142,6 +142,15 @@ defmodule Ash.Actions.Read.Calculations do records |> apply_transient_calculation_values(calculation, ash_query, []) |> calculation.module.calculate(calculation.opts, calculation.context) + |> case do + :unknown -> + Enum.map(records, fn _ -> + nil + end) + + result -> + result + end end defp apply_transient_calculation_values(records, calculation, ash_query, path) do @@ -743,6 +752,9 @@ defmodule Ash.Actions.Read.Calculations do end end end + + true -> + raise "unknown load for #{inspect(query)}: #{inspect(load)}" end end) end diff --git a/lib/ash/actions/read/read.ex b/lib/ash/actions/read/read.ex index 1627310d..638907e0 100644 --- a/lib/ash/actions/read/read.ex +++ b/lib/ash/actions/read/read.ex @@ -204,29 +204,23 @@ defmodule Ash.Actions.Read do opts[:tracer], opts[:authorize?] ), - data <- - attach_fields( - data, - opts[:initial_data], - query - ), {:ok, data} <- Ash.Actions.Read.Relationships.load(data, query, opts[:lazy?]), - query_with_only_runtime_calcs <- %{ - query - | calculations: Map.new(calculations_at_runtime, &{&1.name, &1}), - select: [] - }, {:ok, data} <- Ash.Actions.Read.Calculations.run( data, - query_with_only_runtime_calcs, + query, + calculations_at_runtime, calculations_in_query ), {:ok, data} <- load_through_attributes( data, - query_with_only_runtime_calcs, + %{ + query + | calculations: Map.new(calculations_at_runtime, &{&1.name, &1}), + select: [] + }, query.api, opts[:actor], opts[:tracer], @@ -609,10 +603,47 @@ defmodule Ash.Actions.Read do }, true ) do - {:ok, results, 0} + results + |> attach_fields(initial_data, query) + |> compute_expression_at_runtime_for_missing_records(query, data_layer_calculations) + |> case do + {:ok, result} -> + {:ok, result, 0} + + {:error, error} -> + {:error, error} + end end end + defp compute_expression_at_runtime_for_missing_records(data, query, data_layer_calculations) do + if Enum.any?(data, & &1.__metadata__[:private][:missing_from_data_layer]) do + {require_calculating, rest} = + data + |> Stream.with_index() + |> Stream.map(fn {record, index} -> + Ash.Resource.put_metadata(record, :private, %{result_index: index}) + end) + |> Enum.split_with(& &1.__metadata__[:private][:missing_from_data_layer]) + + require_calculating + |> recalculate(query, Enum.map(data_layer_calculations, &elem(&1, 0))) + |> case do + {:ok, result} -> + {:ok, + result + |> Enum.concat(rest) + |> Enum.sort_by(& &1.__metadata__[:private][:result_index])} + end + else + {:ok, data} + end + end + + defp recalculate(require_calculating, query, data_layer_calculations) do + Ash.Actions.Read.Calculations.run(require_calculating, query, data_layer_calculations, []) + end + defp authorize_query(query, opts) do if opts[:authorize?] do case query.api.can(query, opts[:actor], @@ -678,7 +709,6 @@ defmodule Ash.Actions.Read do tracer, authorize? ) do - load_through = query.resource |> Ash.Resource.Info.attributes() diff --git a/lib/ash/query/query.ex b/lib/ash/query/query.ex index 80b2151c..31c5b86c 100644 --- a/lib/ash/query/query.ex +++ b/lib/ash/query/query.ex @@ -1343,6 +1343,24 @@ defmodule Ash.Query do loads |> List.wrap() |> Enum.map(fn + {load, {args, further}} -> + if resource_calculation = Ash.Resource.Info.calculation(query.resource, load) do + case resource_calc_to_calc(query, load, resource_calculation, args) do + {:error, _} -> + {load, {args, further}} + + {:ok, calc} -> + {calc, further} + end + else + if relationship = Ash.Resource.Info.relationship(query.resource, load) do + related_query = new(relationship.destination) + {{load, reify_calculations(args, related_query)}, further} + else + {load, {args, further}} + end + end + {load, args} -> if resource_calculation = Ash.Resource.Info.calculation(query.resource, load) do case resource_calc_to_calc(query, load, resource_calculation, args) do