improvement: don't start processes for single items in list

chore: add some benchmarks/flame files
This commit is contained in:
Zach Daniel 2024-08-15 09:14:43 -04:00
parent e200b5b9d1
commit ce5c080492
11 changed files with 311 additions and 89 deletions

View file

@ -230,7 +230,10 @@ spark_locals_without_parens = [
[ [
import_deps: [:spark, :reactor], import_deps: [:spark, :reactor],
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"], inputs: [
"{mix,.formatter}.exs",
"{config,lib,test,benchmarks,flames}/**/*.{ex,exs}"
],
plugins: [Spark.Formatter], plugins: [Spark.Formatter],
locals_without_parens: spark_locals_without_parens, locals_without_parens: spark_locals_without_parens,
export: [ export: [

View file

@ -20,11 +20,8 @@ end
changeset = Ash.Changeset.for_create(Resource, :create, %{}) changeset = Ash.Changeset.for_create(Resource, :create, %{})
Benchee.run( Benchee.run(%{
%{
create: fn -> create: fn ->
Ash.create!(changeset) Ash.create!(changeset)
end end
})
}
)

View file

@ -30,10 +30,11 @@ defmodule Resource do
attributes do attributes do
uuid_primary_key :id uuid_primary_key :id
attribute :embeds, {:array, Embed}, public?: true attribute :embeds, {:array, Embed}, public?: true
attribute :structs, {:array, :struct} do attribute :structs, {:array, :struct} do
public? true public? true
constraints [
items: [ constraints items: [
instance_of: Embed, instance_of: Embed,
fields: [ fields: [
name: [ name: [
@ -41,8 +42,8 @@ defmodule Resource do
] ]
] ]
] ]
]
end end
attribute :maps, {:array, :map}, public?: true attribute :maps, {:array, :map}, public?: true
end end

80
benchmarks/read.exs Normal file
View file

@ -0,0 +1,80 @@
defmodule Domain do
use Ash.Domain, validate_config_inclusion?: false
resources do
allow_unregistered? true
end
end
defmodule Destination do
use Ash.Resource,
data_layer: Ash.DataLayer.Ets,
domain: Domain
actions do
defaults [:read, :destroy, create: :*, update: :*]
end
attributes do
uuid_primary_key :id
attribute :name, :string, allow_nil?: false, public?: true
end
relationships do
belongs_to :source, Source, public?: true
end
end
defmodule Source do
use Ash.Resource,
data_layer: Ash.DataLayer.Ets,
domain: Domain
actions do
defaults [:read, :destroy, create: :*, update: :*]
end
attributes do
uuid_primary_key :id
attribute :first_name, :string, allow_nil?: false, public?: true
attribute :last_name, :string, allow_nil?: false, public?: true
end
calculations do
calculate :full_name, :string, expr(first_name <> " " <> last_name)
end
aggregates do
first :first_destination_name, :destination, :name
end
relationships do
has_many :destination, Destination
end
end
source =
Source
|> Ash.Changeset.for_create(:create, %{first_name: "John", last_name: "Doe"})
|> Ash.create!()
for _ <- 1..2 do
Destination
|> Ash.Changeset.for_create(:create, %{source_id: source.id, name: "Destination"})
|> Ash.create!()
end
query =
Source
|> Ash.Query.for_read(:read, %{})
|> Ash.Query.load([:first_destination_name, :full_name, :destination])
Ash.read!(query)
Logger.configure(level: :error)
Benchee.run(%{
"read" => fn ->
Ash.read!(query)
end
})

View file

@ -16,7 +16,8 @@ mixed = fn count ->
true -> true ->
{:or, -var, expr} {:or, -var, expr}
end end
end) |> Ash.Policy.SatSolver.solve() end)
|> Ash.Policy.SatSolver.solve()
end end
Benchee.run( Benchee.run(
@ -26,14 +27,32 @@ Benchee.run(
end end
}, },
inputs: %{ inputs: %{
"3 conjunctive" => Enum.to_list(1..3) |> Enum.reduce(0, fn var, expr -> {:and, var, expr} end) |> Ash.Policy.SatSolver.solve(), "3 conjunctive" =>
"3 disjunctive" => Enum.to_list(1..3) |> Enum.reduce(0, fn var, expr -> {:or, var, expr} end) |> Ash.Policy.SatSolver.solve(), Enum.to_list(1..3)
|> Enum.reduce(0, fn var, expr -> {:and, var, expr} end)
|> Ash.Policy.SatSolver.solve(),
"3 disjunctive" =>
Enum.to_list(1..3)
|> Enum.reduce(0, fn var, expr -> {:or, var, expr} end)
|> Ash.Policy.SatSolver.solve(),
"3 mixed" => mixed.(3), "3 mixed" => mixed.(3),
"5 conjunctive" => Enum.to_list(1..5) |> Enum.reduce(0, fn var, expr -> {:and, var, expr} end) |> Ash.Policy.SatSolver.solve(), "5 conjunctive" =>
"5 disjunctive" => Enum.to_list(1..5) |> Enum.reduce(0, fn var, expr -> {:or, var, expr} end) |> Ash.Policy.SatSolver.solve(), Enum.to_list(1..5)
|> Enum.reduce(0, fn var, expr -> {:and, var, expr} end)
|> Ash.Policy.SatSolver.solve(),
"5 disjunctive" =>
Enum.to_list(1..5)
|> Enum.reduce(0, fn var, expr -> {:or, var, expr} end)
|> Ash.Policy.SatSolver.solve(),
"5 mixed" => mixed.(5), "5 mixed" => mixed.(5),
"7 conjunctive" => Enum.to_list(1..7) |> Enum.reduce(0, fn var, expr -> {:and, var, expr} end) |> Ash.Policy.SatSolver.solve(), "7 conjunctive" =>
"7 disjunctive" => Enum.to_list(1..7) |> Enum.reduce(0, fn var, expr -> {:or, var, expr} end) |> Ash.Policy.SatSolver.solve(), Enum.to_list(1..7)
"7 mixed" => mixed.(7), |> Enum.reduce(0, fn var, expr -> {:and, var, expr} end)
|> Ash.Policy.SatSolver.solve(),
"7 disjunctive" =>
Enum.to_list(1..7)
|> Enum.reduce(0, fn var, expr -> {:or, var, expr} end)
|> Ash.Policy.SatSolver.solve(),
"7 mixed" => mixed.(7)
} }
) )

View file

@ -1,5 +1,4 @@
Benchee.run( Benchee.run(%{
%{
"uuid_v7 raw" => fn -> "uuid_v7 raw" => fn ->
Ash.UUIDv7.bingenerate() Ash.UUIDv7.bingenerate()
end, end,

81
flames/read.exs Normal file
View file

@ -0,0 +1,81 @@
defmodule Domain do
use Ash.Domain, validate_config_inclusion?: false
resources do
allow_unregistered? true
end
end
defmodule Destination do
use Ash.Resource,
data_layer: Ash.DataLayer.Ets,
domain: Domain
actions do
defaults [:read, :destroy, create: :*, update: :*]
end
attributes do
uuid_primary_key :id
attribute :name, :string, allow_nil?: false, public?: true
end
relationships do
belongs_to :source, Source, public?: true
end
end
defmodule Source do
use Ash.Resource,
data_layer: Ash.DataLayer.Ets,
domain: Domain
actions do
defaults [:read, :destroy, create: :*, update: :*]
end
attributes do
uuid_primary_key :id
attribute :first_name, :string, allow_nil?: false, public?: true
attribute :last_name, :string, allow_nil?: false, public?: true
end
calculations do
calculate :full_name, :string, expr(first_name <> " " <> last_name)
end
aggregates do
first :first_destination_name, :destination, :name
end
relationships do
has_many :destination, Destination
end
end
source =
Source
|> Ash.Changeset.for_create(:create, %{first_name: "John", last_name: "Doe"})
|> Ash.create!()
for _ <- 1..2 do
Destination
|> Ash.Changeset.for_create(:create, %{source_id: source.id, name: "Destination"})
|> Ash.create!()
end
query =
Source
|> Ash.Query.for_read(:read, %{})
|> Ash.Query.load([:first_destination_name, :full_name, :destination])
Ash.read!(query)
Logger.configure(level: :error)
:eflame.apply(
fn ->
Ash.read!(query)
end,
[]
)

View file

@ -18,9 +18,10 @@ defmodule Ash.Actions.Read.AsyncLimiter do
def async_or_inline( def async_or_inline(
%{resource: resource, context: %{private: %{async_limiter: async_limiter}}} = query, %{resource: resource, context: %{private: %{async_limiter: async_limiter}}} = query,
opts, opts,
last?,
func func
) )
when not is_nil(async_limiter) do when not is_nil(async_limiter) and last? != true do
if Application.get_env(:ash, :disable_async?) do if Application.get_env(:ash, :disable_async?) do
func.() func.()
else else
@ -54,7 +55,7 @@ defmodule Ash.Actions.Read.AsyncLimiter do
end end
end end
def async_or_inline(_, _opts, func) do def async_or_inline(_, _opts, _, func) do
func.() func.()
end end

View file

@ -264,16 +264,8 @@ defmodule Ash.Actions.Read.Calculations do
{newly_done, remaining} = {newly_done, remaining} =
do_now do_now
|> Enum.map(fn calculation -> |> do_run_calcs(ash_query, records)
Ash.Actions.Read.AsyncLimiter.async_or_inline( |> Stream.concat(tasks)
ash_query,
Ash.Context.to_opts(calculation.context),
fn ->
{calculation.name, calculation, run_calculation(calculation, ash_query, records)}
end
)
end)
|> Enum.concat(tasks)
|> Ash.Actions.Read.AsyncLimiter.await_at_least_one() |> Ash.Actions.Read.AsyncLimiter.await_at_least_one()
records = records =
@ -309,6 +301,26 @@ defmodule Ash.Actions.Read.Calculations do
end end
end end
defp do_run_calcs(calcs, ash_query, records, acc \\ [])
defp do_run_calcs([], _ash_query, _records, acc) do
acc
end
defp do_run_calcs([calculation | rest], ash_query, records, acc) do
result =
Ash.Actions.Read.AsyncLimiter.async_or_inline(
ash_query,
Ash.Context.to_opts(calculation.context),
Enum.empty?(rest),
fn ->
{calculation.name, calculation, run_calculation(calculation, ash_query, records)}
end
)
do_run_calcs(rest, ash_query, records, [result | acc])
end
defp attach_calculation_results(calculation, records, nil) do defp attach_calculation_results(calculation, records, nil) do
if calculation.load do if calculation.load do
Enum.map(records, fn record -> Enum.map(records, fn record ->

View file

@ -1212,7 +1212,7 @@ defmodule Ash.Actions.Read do
end end
defp authorize_query(query, opts) do defp authorize_query(query, opts) do
if opts[:authorize?] do if opts[:authorize?] && !Enum.empty?(Ash.Resource.Info.authorizers(query.resource)) do
case Ash.can(query, opts[:actor], case Ash.can(query, opts[:actor],
return_forbidden_error?: true, return_forbidden_error?: true,
maybe_is: false, maybe_is: false,

View file

@ -39,21 +39,30 @@ defmodule Ash.Actions.Read.Relationships do
end) end)
end end
defp fetch_related_records(relationships_and_queries, records) do defp fetch_related_records(batch, records, acc \\ [])
Enum.map(relationships_and_queries, fn
defp fetch_related_records([], _records, acc) do
Ash.Actions.Read.AsyncLimiter.await_all(acc)
end
defp fetch_related_records([first | rest], records, acc) do
result =
case first do
{relationship, {:lazy, query}} -> {relationship, {:lazy, query}} ->
{relationship, {:lazy, query}, lazy_related_records(records, relationship, query)} {relationship, {:lazy, query},
lazy_related_records(records, relationship, query, Enum.empty?(rest))}
{relationship, %{valid?: true} = related_query} -> {relationship, %{valid?: true} = related_query} ->
do_fetch_related_records(records, relationship, related_query) do_fetch_related_records(records, relationship, related_query, Enum.empty?(rest))
{relationship, %{errors: errors} = related_query} -> {relationship, %{errors: errors} = related_query} ->
{relationship, related_query, {:error, errors}} {relationship, related_query, {:error, errors}}
end)
|> Ash.Actions.Read.AsyncLimiter.await_all()
end end
defp lazy_related_records(records, relationship, related_query) do fetch_related_records(rest, records, [result | acc])
end
defp lazy_related_records(records, relationship, related_query, last?) do
primary_key = Ash.Resource.Info.primary_key(relationship.source) primary_key = Ash.Resource.Info.primary_key(relationship.source)
related_records_with_lazy_join_source = related_records_with_lazy_join_source =
@ -76,6 +85,11 @@ defmodule Ash.Actions.Read.Relationships do
|> Enum.map(&Ash.Resource.set_metadata(&1, %{lazy_join_source: record_pkey})) |> Enum.map(&Ash.Resource.set_metadata(&1, %{lazy_join_source: record_pkey}))
end) end)
Ash.Actions.Read.AsyncLimiter.async_or_inline(
related_query,
Ash.Context.to_opts(related_query.context),
last?,
fn ->
Ash.load(related_records_with_lazy_join_source, related_query, Ash.load(related_records_with_lazy_join_source, related_query,
lazy?: true, lazy?: true,
domain: related_query.domain, domain: related_query.domain,
@ -84,9 +98,11 @@ defmodule Ash.Actions.Read.Relationships do
authorize?: related_query.context.private[:authorize?] authorize?: related_query.context.private[:authorize?]
) )
end end
)
end
defp with_related_queries(load, query, records, lazy?) do defp with_related_queries(load, query, records, lazy?) do
Stream.map(load, fn {relationship_name, related_query} -> Enum.map(load, fn {relationship_name, related_query} ->
lazy? = lazy? || related_query.context[:private][:lazy?] lazy? = lazy? || related_query.context[:private][:lazy?]
if lazy? && Ash.Resource.loaded?(records, relationship_name, lists: :any) do if lazy? && Ash.Resource.loaded?(records, relationship_name, lists: :any) do
@ -309,11 +325,13 @@ defmodule Ash.Actions.Read.Relationships do
defp do_fetch_related_records( defp do_fetch_related_records(
records, records,
%{manual: {module, opts}} = relationship, %{manual: {module, opts}} = relationship,
related_query related_query,
last?
) do ) do
Ash.Actions.Read.AsyncLimiter.async_or_inline( Ash.Actions.Read.AsyncLimiter.async_or_inline(
related_query, related_query,
Ash.Context.to_opts(related_query.context), Ash.Context.to_opts(related_query.context),
last?,
fn -> fn ->
result = result =
module.load(records, opts, %Ash.Resource.ManualRelationship.Context{ module.load(records, opts, %Ash.Resource.ManualRelationship.Context{
@ -365,11 +383,13 @@ defmodule Ash.Actions.Read.Relationships do
defp do_fetch_related_records( defp do_fetch_related_records(
_records, _records,
%{no_attributes?: true} = relationship, %{no_attributes?: true} = relationship,
related_query related_query,
last?
) do ) do
Ash.Actions.Read.AsyncLimiter.async_or_inline( Ash.Actions.Read.AsyncLimiter.async_or_inline(
related_query, related_query,
Ash.Context.to_opts(related_query.context), Ash.Context.to_opts(related_query.context),
last?,
fn -> fn ->
result = result =
related_query related_query
@ -387,11 +407,13 @@ defmodule Ash.Actions.Read.Relationships do
defp do_fetch_related_records( defp do_fetch_related_records(
_records, _records,
relationship, relationship,
%{context: %{data_layer: %{lateral_join_source: {_, _}}}} = related_query %{context: %{data_layer: %{lateral_join_source: {_, _}}}} = related_query,
last?
) do ) do
Ash.Actions.Read.AsyncLimiter.async_or_inline( Ash.Actions.Read.AsyncLimiter.async_or_inline(
related_query, related_query,
Ash.Context.to_opts(related_query.context), Ash.Context.to_opts(related_query.context),
last?,
fn -> fn ->
result = result =
related_query related_query
@ -406,7 +428,12 @@ defmodule Ash.Actions.Read.Relationships do
) )
end end
defp do_fetch_related_records(records, %{type: :many_to_many} = relationship, related_query) do defp do_fetch_related_records(
records,
%{type: :many_to_many} = relationship,
related_query,
last?
) do
record_ids = record_ids =
Enum.map(records, fn record -> Enum.map(records, fn record ->
Map.get(record, relationship.source_attribute) Map.get(record, relationship.source_attribute)
@ -447,6 +474,7 @@ defmodule Ash.Actions.Read.Relationships do
Ash.Actions.Read.AsyncLimiter.async_or_inline( Ash.Actions.Read.AsyncLimiter.async_or_inline(
related_query, related_query,
Ash.Context.to_opts(related_query.context), Ash.Context.to_opts(related_query.context),
last?,
fn -> fn ->
case Ash.Actions.Read.unpaginated_read(join_query, nil) do case Ash.Actions.Read.unpaginated_read(join_query, nil) do
{:ok, join_records} -> {:ok, join_records} ->
@ -526,12 +554,13 @@ defmodule Ash.Actions.Read.Relationships do
) )
end end
defp do_fetch_related_records(records, relationship, related_query) do defp do_fetch_related_records(records, relationship, related_query, last?) do
destination_attributes = Enum.map(records, &Map.get(&1, relationship.source_attribute)) destination_attributes = Enum.map(records, &Map.get(&1, relationship.source_attribute))
Ash.Actions.Read.AsyncLimiter.async_or_inline( Ash.Actions.Read.AsyncLimiter.async_or_inline(
related_query, related_query,
Ash.Context.to_opts(related_query.context), Ash.Context.to_opts(related_query.context),
last?,
fn -> fn ->
result = result =
related_query related_query