improvement: add allow_async? to calculations, default to false

improvement: add elixir evaluation step to expression calculations
This commit is contained in:
Zach Daniel 2021-10-25 17:53:51 -04:00
parent e17daf6b02
commit 05d8eb1741
9 changed files with 112 additions and 38 deletions

View file

@ -3,6 +3,7 @@
locals_without_parens = [
accept: 1,
action: 1,
allow_async?: 1,
allow_nil?: 1,
allow_nil_input: 1,
allow_unregistered?: 1,

View file

@ -362,7 +362,8 @@ defmodule Ash.Actions.Read do
Enum.split_with(ash_query.calculations, fn {_name, calculation} ->
Enum.find(used_calculations, &(&1.name == calculation.name)) ||
calculation.name in Enum.map(ash_query.sort || [], &elem(&1, 0)) ||
!:erlang.function_exported(calculation.module, :calculate, 3)
(!:erlang.function_exported(calculation.module, :calculate, 3) &&
calculation.allow_async?)
end)
else
{[], ash_query.calculations}
@ -826,48 +827,51 @@ defmodule Ash.Actions.Read do
end)
can_be_runtime
|> Enum.reduce_while({:ok, %{}}, fn calculation, {:ok, calculation_results} ->
|> Enum.reduce_while({:ok, %{}, require_query}, fn calculation,
{:ok, calculation_results, require_query} ->
case calculation.module.calculate(results, calculation.opts, calculation.context) do
results when is_list(results) ->
{:cont, {:ok, Map.put(calculation_results, calculation, results)}}
{:cont, {:ok, Map.put(calculation_results, calculation, results), require_query}}
:unknown ->
{:cont, {:ok, calculation_results, [calculation | require_query]}}
{:ok, results} ->
{:cont, {:ok, Map.put(calculation_results, calculation, results)}}
{:cont, {:ok, Map.put(calculation_results, calculation, results), require_query}}
{:error, error} ->
{:halt, {:error, error}}
end
end)
|> case do
{:ok, calculation_results} ->
{:ok,
Enum.reduce(calculation_results, results, fn {calculation, values}, records ->
if calculation.load do
:lists.zipwith(
fn record, value -> Map.put(record, calculation.name, value) end,
records,
values
)
else
:lists.zipwith(
fn record, value ->
%{record | calculations: Map.put(record.calculations, calculation.name, value)}
end,
records,
values
)
end
end)}
{:ok, calculation_results, require_query} ->
Enum.reduce(calculation_results, results, fn {calculation, values}, records ->
if calculation.load do
:lists.zipwith(
fn record, value -> Map.put(record, calculation.name, value) end,
records,
values
)
else
:lists.zipwith(
fn record, value ->
%{record | calculations: Map.put(record.calculations, calculation.name, value)}
end,
records,
values
)
end
end)
|> run_calculation_query(require_query, query)
{:error, error} ->
{:error, error}
end
|> run_calculation_query(require_query, query)
end
defp run_calculation_query({:ok, results}, [], _), do: {:ok, results}
defp run_calculation_query(results, [], _), do: {:ok, results}
defp run_calculation_query({:ok, results}, calculations, query) do
defp run_calculation_query(results, calculations, query) do
pkey = Ash.Resource.Info.primary_key(query.resource)
pkey_filter =
@ -918,10 +922,6 @@ defmodule Ash.Actions.Read do
end
end
defp run_calculation_query({:error, error}, _, _) do
{:error, error}
end
defp add_aggregate_values(results, aggregates, resource, aggregate_values, aggregates_in_query) do
keys_to_aggregates =
Enum.reduce(aggregate_values, %{}, fn {_name, keys_to_values}, acc ->

View file

@ -26,7 +26,7 @@ defmodule Ash.Calculation do
@callback init(Keyword.t()) :: {:ok, Keyword.t()} | {:error, term}
@callback describe(Keyword.t()) :: String.t()
@callback calculate([Ash.Resource.record()], Keyword.t(), map) ::
{:ok, [term]} | [term] | {:error, term}
{:ok, [term]} | [term] | {:error, term} | :unknown
@callback expression(Keyword.t(), map) :: any
@callback load(Ash.Query.t(), Keyword.t(), map) :: Ash.Query.t()
@callback select(Ash.Query.t(), Keyword.t(), map) :: list(atom)

View file

@ -147,9 +147,6 @@ defmodule Ash.Filter.Runtime do
nil ->
{:ok, true}
boolean when is_boolean(boolean) ->
{:ok, boolean}
%op{__operator__?: true, left: left, right: right} = operator ->
with {:ok, [left, right]} <-
resolve_exprs([left, right], record),
@ -195,6 +192,9 @@ defmodule Ash.Filter.Runtime do
%BooleanExpression{op: op, left: left, right: right} ->
expression_matches(op, left, right, record)
other ->
{:ok, other}
end
end

View file

@ -1,7 +1,17 @@
defmodule Ash.Query.Calculation do
@moduledoc "Represents a calculated attribute requested on a query"
defstruct [:name, :module, :opts, :load, :type, context: %{}, select: [], sequence: 0]
defstruct [
:name,
:module,
:opts,
:load,
:type,
context: %{},
select: [],
sequence: 0,
allow_async?: false
]
@type t :: %__MODULE__{}

View file

@ -731,7 +731,12 @@ defmodule Ash.Query do
|> Kernel.||([])
|> Enum.concat(module.select(query, opts, calculation.context) || [])
calculation = %{calculation | load: field, select: fields_to_select}
calculation = %{
calculation
| load: field,
select: fields_to_select,
allow_async?: resource_calculation.allow_async?
}
query =
query

View file

@ -10,7 +10,8 @@ defmodule Ash.Resource.Calculation do
:private?,
:allow_nil?,
:select,
:load
:load,
:allow_async?
]
@schema [
@ -28,6 +29,20 @@ defmodule Ash.Resource.Calculation do
default: [],
doc: "Constraints to provide to the type."
],
allow_async?: [
type: :boolean,
default: false,
doc: """
If set to `true`, then the calculation may be run after the main query.
This is useful for calculations that are very expensive, especially when combined with complex filters/join
scenarios. By adding this, we will rerun a trimmed down version of the main query, using the primary keys for
fast access. This will be done asynchronously for each calculation that has `allow_async?: true`.
Keep in mind that if the calculation is used in a filter or sort, it cannot be done asynchrnously,
and *must* be done in the main query.
"""
],
calculation: [
type: {:custom, __MODULE__, :calculation, []},
required: true,

View file

@ -9,6 +9,47 @@ defmodule Ash.Resource.Calculation.Expression do
Ash.Filter.build_filter_from_template(expr, nil, context, context[:context] || %{})
end
def calculate([], _, _), do: []
def calculate([%resource{} | _] = records, opts, context) do
expression =
Ash.Filter.build_filter_from_template(opts[:expr], nil, context, context[:context] || %{})
Enum.reduce_while(records, {:ok, []}, fn record, {:ok, values} ->
case Ash.Filter.hydrate_refs(expression, %{
resource: resource,
aggregates: %{},
calculations: %{},
public?: false
}) do
{:ok, expression} ->
case Ash.Filter.Runtime.do_match(record, expression) do
{:ok, value} ->
{:cont, {:ok, [value | values]}}
:unknown ->
{:halt, :unknown}
{:error, error} ->
{:halt, {:error, error}}
end
{:error, error} ->
{:halt, {:error, error}}
end
end)
|> case do
{:ok, values} ->
{:ok, Enum.reverse(values)}
:unknown ->
:unknown
{:error, error} ->
{:error, error}
end
end
def load(query, opts, context) do
expr =
Ash.Filter.build_filter_from_template(opts[:expr], nil, context, context[:context] || %{})

View file

@ -53,7 +53,9 @@ defmodule Ash.Test.CalculationTest do
constraints: [allow_empty?: true, trim?: false]
end
calculate :expr_full_name, :string, expr(first_name <> " " <> last_name)
calculate :expr_full_name, :string, expr(first_name <> " " <> last_name) do
allow_async? true
end
calculate :conditional_full_name,
:string,