fix: fix ets lateral join source field usage

fix: properly apply distinct in ets
This commit is contained in:
Zach Daniel 2024-08-13 22:07:37 -04:00
parent 93775109c7
commit 7cb3e04b2a
4 changed files with 137 additions and 37 deletions

View file

@ -1236,11 +1236,15 @@ defmodule Ash.Actions.Read.Relationships do
end
primary_key_is_join_keys? =
Enum.all?(Ash.Resource.Info.primary_key(relationship.through), &(&1 in join_keys))
Enum.all?(
Ash.Resource.Info.primary_key(relationship.through),
&(&1 in join_keys)
)
is_unique_on_join_keys? =
Enum.any?(Ash.Resource.Info.identities(relationship.through), fn identity ->
Enum.all?(identity.keys, &(&1 in join_keys))
is_nil(identity.where) && identity.nils_distinct? &&
Enum.all?(identity.keys, &(&1 in join_keys))
end)
not (primary_key_is_join_keys? || is_unique_on_join_keys?)

View file

@ -407,6 +407,10 @@ defmodule Ash.Actions.Sort do
end
def runtime_sort(results, [{field, direction} | rest], opts) do
# we need check if the field supports simple equality, and if so then we can use
# uniq_by
#
# otherwise, we need to do our own matching
resource = get_resource(results, opts)
results
@ -451,16 +455,18 @@ defmodule Ash.Actions.Sort do
def runtime_distinct(results, empty, _) when empty in [nil, []], do: results
def runtime_distinct([single_result], _, _), do: [single_result]
def runtime_distinct([%resource{} | _] = results, [{field, direction} | rest], opts) do
def runtime_distinct([%resource{} | _] = results, distinct, opts) do
# we need check if the field supports simple equality, and if so then we can use
# uniq_by
#
# otherwise, we need to do our own matching
fields = Enum.map(distinct, &elem(&1, 0))
results
|> load_field(field, resource, opts)
|> Enum.group_by(&resolve_field(&1, field))
|> Enum.sort_by(fn {key, _value} -> key end, to_sort_by_fun(direction))
|> Enum.map(fn {_key, [first | _]} ->
first
end)
|> runtime_distinct(rest, Keyword.put(opts, :rekey?, false))
|> maybe_rekey(results, resource, Keyword.get(opts, :rekey?, true))
|> load_field(fields, resource, opts)
|> Enum.to_list()
|> runtime_sort(distinct, opts)
|> Enum.uniq_by(&Map.take(&1, fields))
end
defp load_field(records, field, resource, opts) do

View file

@ -416,26 +416,44 @@ defmodule Ash.DataLayer.Ets do
{source_query, source_attribute, destination_attribute, relationship}
]
) do
source_attributes = Enum.map(root_data, &Map.get(&1, source_attribute))
source_query =
source_query
|> Ash.Query.unset(:load)
|> Ash.Query.unset(:page)
|> Ash.Query.set_context(%{private: %{internal?: true}})
|> Ash.Query.set_domain(query.domain)
primary_key = Ash.Resource.Info.primary_key(source_query.resource)
source_query =
case primary_key do
[] ->
source_attributes = Enum.map(root_data, &Map.get(&1, source_attribute))
Ash.Query.filter(source_query, ^ref(source_attribute) in ^source_attributes)
[field] ->
source_attributes = Enum.map(root_data, &Map.get(&1, field))
Ash.Query.filter(source_query, ^ref(field) in ^source_attributes)
fields ->
filter = [
or:
Enum.map(root_data, fn record ->
[and: Map.take(record, fields) |> Map.to_list()]
end)
]
Ash.Query.do_filter(source_query, filter)
end
source_query
|> Ash.Query.filter(^ref(source_attribute) in ^source_attributes)
|> Ash.Query.set_context(%{private: %{internal?: true}})
|> Ash.Query.unset(:load)
|> Ash.Query.unset(:select)
|> Ash.Query.unset(:page)
|> Ash.Actions.Read.unpaginated_read(nil, authorize?: false)
|> case do
{:error, error} ->
{:error, error}
{:ok, root_data} ->
parent_pkey =
case root_data do
[%resource{} | _] -> Ash.Resource.Info.primary_key(resource)
[] -> []
end
root_data
|> Enum.reduce_while({:ok, []}, fn parent, {:ok, results} ->
new_filter =
@ -465,7 +483,7 @@ defmodule Ash.DataLayer.Ets do
new_results =
Enum.map(
new_results,
&Map.put(&1, :__lateral_join_source__, Map.take(parent, parent_pkey))
&Map.put(&1, :__lateral_join_source__, Map.take(parent, primary_key))
)
{:cont, {:ok, new_results ++ results}}
@ -482,26 +500,44 @@ defmodule Ash.DataLayer.Ets do
{through_query, destination_attribute_on_join_resource, destination_attribute,
_through_relationship}
]) do
source_attributes = Enum.map(root_data, &Map.get(&1, source_attribute))
source_query =
source_query
|> Ash.Query.unset(:load)
|> Ash.Query.unset(:page)
|> Ash.Query.set_context(%{private: %{internal?: true}})
|> Ash.Query.set_domain(query.domain)
primary_key = Ash.Resource.Info.primary_key(source_query.resource)
source_query =
case primary_key do
[] ->
source_attributes = Enum.map(root_data, &Map.get(&1, source_attribute))
Ash.Query.filter(source_query, ^ref(source_attribute) in ^source_attributes)
[field] ->
source_attributes = Enum.map(root_data, &Map.get(&1, field))
Ash.Query.filter(source_query, ^ref(field) in ^source_attributes)
fields ->
filter = [
or:
Enum.map(root_data, fn record ->
[and: Map.take(record, fields) |> Map.to_list()]
end)
]
Ash.Query.do_filter(source_query, filter)
end
source_query
|> Ash.Query.unset(:load)
|> Ash.Query.unset(:page)
|> Ash.Query.filter(^ref(source_attribute) in ^source_attributes)
|> Ash.Query.set_context(%{private: %{internal?: true}})
|> Ash.Query.set_domain(query.domain)
|> Ash.read(authorize?: false)
|> case do
{:error, error} ->
{:error, error}
{:ok, root_data} ->
parent_pkey =
case root_data do
[%resource{} | _] -> Ash.Resource.Info.primary_key(resource)
[] -> []
end
root_data
|> Enum.reduce_while({:ok, []}, fn parent, {:ok, results} ->
through_query
@ -536,13 +572,14 @@ defmodule Ash.DataLayer.Ets do
Enum.flat_map(new_results, fn result ->
join_data
|> Enum.flat_map(fn join_row ->
# TODO: use `Ash.Type.equal?`
if Map.get(join_row, destination_attribute_on_join_resource) ==
Map.get(result, destination_attribute) do
[
Map.put(
result,
:__lateral_join_source__,
Map.take(parent, parent_pkey)
Map.take(parent, primary_key)
)
]
else

View file

@ -346,6 +346,25 @@ defmodule Ash.Test.Actions.LoadTest do
end
end
defmodule LinkedCategory do
@moduledoc false
use Ash.Resource, domain: Domain, data_layer: Ash.DataLayer.Ets
ets do
private?(true)
end
actions do
default_accept :*
defaults([:read, :destroy, create: :*, update: :*])
end
attributes do
attribute(:source_name, :string, public?: true, primary_key?: true, allow_nil?: false)
attribute(:dest_name, :string, public?: true, primary_key?: true, allow_nil?: false)
end
end
defmodule Category do
@moduledoc false
use Ash.Resource, domain: Domain, data_layer: Ash.DataLayer.Ets
@ -378,6 +397,14 @@ defmodule Ash.Test.Actions.LoadTest do
destination_attribute_on_join_resource: :post_id,
source_attribute_on_join_resource: :category_id
)
many_to_many :linked_categories, __MODULE__,
public?: true,
through: LinkedCategory,
source_attribute: :name,
destination_attribute: :name,
destination_attribute_on_join_resource: :dest_name,
source_attribute_on_join_resource: :source_name
end
end
@ -822,6 +849,32 @@ defmodule Ash.Test.Actions.LoadTest do
assert Enum.sort([category1.id, category2.id]) == Enum.sort([id1, id2])
end
test "ETS lateral join handles more records with matching source attribute" do
category1 =
Category
|> Ash.Changeset.for_create(:create, %{name: "lame"})
|> Ash.create!()
category2 =
Category
|> Ash.Changeset.for_create(:create, %{name: "lame"})
|> Ash.create!()
category3 =
Category
|> Ash.Changeset.for_create(:create, %{name: "cool"})
|> Ash.create!()
LinkedCategory
|> Ash.Changeset.for_create(:create, %{source_name: "lame", dest_name: "cool"})
|> Ash.create!()
category1
|> Ash.load!(linked_categories: Ash.Query.limit(Category, 2))
|> Map.get(:linked_categories)
|> IO.inspect()
end
test "it allows loading filtered many to many relationships with lateral joins" do
category1 =
Category