This commit is contained in:
Zach Daniel 2019-12-09 14:28:09 -05:00
parent 6ba4afa1d6
commit 82cd7c00b1
No known key found for this signature in database
GPG key ID: A57053A671EE649E
6 changed files with 188 additions and 43 deletions

View file

@ -96,3 +96,7 @@ end
* Don't let users declare `has_one` relationships without claiming that there is a unique constraint on the destination field. * Don't let users declare `has_one` relationships without claiming that there is a unique constraint on the destination field.
* Set up "atomic updates" (upserts). If an adapter supports them, and the auth passes precheck, we could turn `get + update` combos into `upserts` * Set up "atomic updates" (upserts). If an adapter supports them, and the auth passes precheck, we could turn `get + update` combos into `upserts`
* Use data layer compatibility features to disallow incompatible setups. For instance, if the data layer can't transact, then they can't have an editable `has_one` or `many_to_many` resource. * Use data layer compatibility features to disallow incompatible setups. For instance, if the data layer can't transact, then they can't have an editable `has_one` or `many_to_many` resource.
* Add `can?(:bulk_update)` to data layers, so we can more efficiently update relationships
* Figure out under what circumstances we can bulk fetch when reading before updating many_to_many and to_many relationships, and do so.
* most relationship stuff can't be done w/o primary keys
* includer errors are super obscure because you can't tell what action they are about

View file

@ -112,4 +112,118 @@ defmodule Ash.Actions.ChangesetHelpers do
end) end)
end end
end end
def has_many_assoc_update(changeset, %{name: rel_name}, identifier, _, _)
when not is_list(identifier) do
Ecto.Changeset.add_error(changeset, rel_name, "Invalid value")
end
def has_many_assoc_update(
%{__ash_api__: api} = changeset,
%{
destination: destination,
destination_field: destination_field,
source_field: source_field,
name: rel_name
},
identifiers,
authorize?,
user
) do
case values_to_primary_key_filters(destination, identifiers) do
{:error, _error} ->
Ecto.Changeset.add_error(changeset, rel_name, "Invalid primary key supplied")
{:ok, filters} ->
after_change(changeset, fn _changeset, %resource{} = result ->
value = Map.get(result, source_field)
with {:ok, %{results: related}} <-
api.read(destination, %{
filter: %{from_related: {result, rel_name}},
paginate?: false,
authorize?: authorize?,
user: user
}),
{:ok, to_relate} <-
get_to_relate(api, filters, destination, authorize?, user),
to_clear <- get_no_longer_present(resource, related, to_relate),
:ok <- clear_related(api, resource, to_clear, destination_field, authorize?, user),
{:ok, now_related} <-
relate_items(api, to_relate, destination_field, value, authorize?, user) do
Map.put(result, rel_name, now_related)
end
end)
end
end
defp relate_items(api, to_relate, destination_field, destination_field_value, authorize?, user) do
Enum.reduce(to_relate, {:ok, []}, fn
to_be_related, {:ok, now_related} ->
case api.update(to_be_related, %{
attributes: %{destination_field => destination_field_value},
authorize?: authorize?,
user: user
}) do
{:ok, newly_related} -> [newly_related | now_related]
{:error, error} -> {:error, error}
end
_, {:error, error} ->
{:error, error}
end)
end
defp clear_related(api, resource, to_clear, destination_key, authorize?, user) do
Enum.reduce(to_clear, :ok, fn
record, :ok ->
case api.update(resource, record, %{
attributes: %{destination_key => nil},
authorize?: authorize?,
user: user
}) do
{:ok, _} -> :ok
{:error, error} -> {:error, error}
end
_record, {:error, error} ->
{:error, error}
end)
end
defp get_no_longer_present(resource, currently_related, to_relate) do
primary_key = Ash.primary_key(resource)
to_relate_pkeys =
to_relate
|> Enum.map(&Map.take(&1, primary_key))
|> MapSet.new()
Enum.reject(currently_related, fn related_item ->
MapSet.member?(to_relate_pkeys, Map.take(related_item, primary_key))
end)
end
defp get_to_relate(api, filters, destination, authorize?, user) do
Enum.reduce(filters, {:ok, nil}, fn
filter, {:ok, _} ->
api.get(destination, filter, %{authorize?: authorize?, user: user})
_, {:error, error} ->
{:error, error}
end)
end
defp values_to_primary_key_filters(destination, identifiers) do
Enum.reduce(identifiers, {:ok, []}, fn
identifier, {:ok, filters} ->
case Filter.value_to_primary_key_filter(destination, identifier) do
{:ok, filter} -> [filter | filters]
{:error, error} -> {:error, error}
end
_, {:error, error} ->
{:error, error}
end)
end
end end

View file

@ -123,8 +123,8 @@ defmodule Ash.Actions.Create do
%{type: :has_one} = rel -> %{type: :has_one} = rel ->
ChangesetHelpers.has_one_assoc_update(changeset, rel, value, authorize?, user) ChangesetHelpers.has_one_assoc_update(changeset, rel, value, authorize?, user)
# %{type: :has_many} = rel -> %{type: :has_many} = rel ->
# has_many_assoc_update(changeset, rel, value) ChangesetHelpers.has_many_assoc_update(changeset, rel, value, authorize?, user)
# %{type: :many_to_many} = rel -> # %{type: :many_to_many} = rel ->
# many_to_many_assoc_update(changeset, rel, value, repo) # many_to_many_assoc_update(changeset, rel, value, repo)

View file

@ -76,6 +76,24 @@ defmodule Ash.Actions.Filter do
end end
end end
defp process_filter(_resource, :from_related, {_related, relationship}, {filter, errors})
when is_atom(relationship) do
{filter,
[
"Must provide structs, or a relationship struct. Cannot pass ids and an atom relationship. #{
relationship
}"
| errors
]}
end
defp process_filter(_resource, :from_related, {related, relationship}, {filter, errors}) do
{Map.put(filter, :from_related, {related, relationship}), errors}
end
# {:from_related, {[_ | _] = related, %{} = relationship}} ->
# {Map.put(filter, :from_related, {related, relationship}), errors}
defp process_filter(resource, field, value, {filter, errors}) do defp process_filter(resource, field, value, {filter, errors}) do
cond do cond do
attr = Ash.attribute(resource, field) -> attr = Ash.attribute(resource, field) ->

View file

@ -12,21 +12,20 @@ defmodule Ash.Actions.SideLoader do
global_params global_params
) do ) do
case side_load(resource, results, side_loads, api, global_params) do case side_load(resource, results, side_loads, api, global_params) do
{:ok, side_loaded} -> %{paginator | results: side_loaded} {:ok, side_loaded} -> {:ok, %{paginator | results: side_loaded}}
{:error, error} -> {:error, error} {:error, error} -> {:error, error}
end end
end end
def side_load(resource, record, side_loads, api, global_params) when not is_list(record) do def side_load(resource, record, side_loads, api, global_params) when not is_list(record) do
case side_load(resource, [record], side_loads, api, global_params) do case side_load(resource, [record], side_loads, api, global_params) do
{:ok, [side_loaded]} -> side_loaded {:ok, [side_loaded]} -> {:ok, side_loaded}
{:error, error} -> {:error, error} {:error, error} -> {:error, error}
end end
end end
def side_load(resource, records, side_loads, api, global_params) do def side_load(resource, records, side_loads, api, global_params) do
# TODO: No global config! {side_load_type, config} = Ash.side_load_config(api)
{side_load_type, config} = Ash.side_load_config(resource)
async? = side_load_type == :parallel async? = side_load_type == :parallel
side_loads = side_loads =
@ -38,7 +37,7 @@ defmodule Ash.Actions.SideLoader do
end end
end) end)
side_loaded = side_load_results =
side_loads side_loads
|> maybe_async_stream(config, async?, fn relationship_name, further -> |> maybe_async_stream(config, async?, fn relationship_name, further ->
relationship = Ash.relationship(resource, relationship_name) relationship = Ash.relationship(resource, relationship_name)
@ -50,14 +49,14 @@ defmodule Ash.Actions.SideLoader do
action_params = action_params =
global_params global_params
|> Map.put(:filter, %{ |> Map.put(:filter, %{
# TODO: This filter needs to be supported and documented, e.g for authorization
from_related: {records, relationship} from_related: {records, relationship}
}) })
|> Map.put_new(:paginate?, false) |> Map.put_new(:paginate?, false)
with {:ok, related_records} <- api.read(relationship.destination, action_params), with {:ok, %{results: related_records}} <-
{:ok, %{results: side_loaded_related}} <- api.read(relationship.destination, action_params),
side_load(relationship.destination, related_records, further, global_params) do {:ok, side_loaded_related} <-
side_load(relationship.destination, related_records, further, api, global_params) do
keyed_by_id = keyed_by_id =
Enum.group_by(side_loaded_related, fn record -> Enum.group_by(side_loaded_related, fn record ->
# This is required for many to many relationships # This is required for many to many relationships
@ -65,6 +64,28 @@ defmodule Ash.Actions.SideLoader do
Map.get(record, relationship.destination_field) Map.get(record, relationship.destination_field)
end) end)
{:ok, {relationship, keyed_by_id}}
else
{:error, error} -> {:error, error}
end
end)
|> Enum.to_list()
# This is dumb, should handle these errors better
first_error =
Enum.find(side_load_results, fn side_loaded ->
match?({:error, _error}, side_loaded)
end)
if first_error do
first_error
else
{:ok, link_records(Enum.map(side_load_results, &elem(&1, 1)), records)}
end
end
defp link_records(results, records) do
Enum.reduce(results, records, fn {relationship, keyed_by_id}, records ->
Enum.map(records, fn record -> Enum.map(records, fn record ->
related_to_this_record = related_to_this_record =
Map.get(keyed_by_id, Map.get(record, relationship.source_field)) || [] Map.get(keyed_by_id, Map.get(record, relationship.source_field)) || []
@ -80,29 +101,17 @@ defmodule Ash.Actions.SideLoader do
linked_record = linked_record =
record record
|> Map.put(relationship_name, unwrapped) |> Map.put(relationship.name, unwrapped)
|> Map.put_new(:__linkage__, %{}) |> Map.put_new(:__linkage__, %{})
|> Map.update!(:__linkage__, &Map.put(&1, relationship_name, related_ids)) |> Map.update!(:__linkage__, &Map.put(&1, relationship.name, related_ids))
{:ok, linked_record} linked_record
end) end)
else
{:error, error} -> {:error, error}
end
end) end)
|> List.flatten()
# This is dumb, should handle these errors better
first_error =
Enum.find(side_loaded, fn side_loaded ->
match?({:error, _error}, side_loaded)
end)
first_error || {:ok, Enum.map(side_loaded, &elem(&1, 1))}
end end
defp maybe_async_stream(preloads, _opts, false, function) do defp maybe_async_stream(preloads, _opts, false, function) do
Enum.map(preloads, fn {association, further} -> Stream.map(preloads, fn {association, further} ->
function.(association, further) function.(association, further)
end) end)
end end
@ -110,7 +119,7 @@ defmodule Ash.Actions.SideLoader do
defp maybe_async_stream(preloads, opts, true, function) do defp maybe_async_stream(preloads, opts, true, function) do
# We could theoretically do one of them outside of a task whlie we wait for the rest # We could theoretically do one of them outside of a task whlie we wait for the rest
# Not worth implementing to start, IMO. # Not worth implementing to start, IMO.
opts = [ async_opts = [
opts[:max_concurrency] || System.schedulers_online(), opts[:max_concurrency] || System.schedulers_online(),
ordered: false, ordered: false,
timeout: opts[:timeout] || :timer.seconds(5), timeout: opts[:timeout] || :timer.seconds(5),
@ -118,11 +127,11 @@ defmodule Ash.Actions.SideLoader do
shutdown: opts[:shutdown] || :timer.seconds(5) shutdown: opts[:shutdown] || :timer.seconds(5)
] ]
opts[:supervisor] Task.Supervisor.async_stream_nolink(
|> Task.Supervisor.async_stream_nolink( opts[:supervisor],
preloads, preloads,
fn {key, further} -> function.(key, further) end, fn {key, further} -> function.(key, further) end,
opts async_opts
) )
|> Stream.map(&to_result/1) |> Stream.map(&to_result/1)
end end

View file

@ -104,8 +104,8 @@ defmodule Ash.Actions.Update do
%{type: :has_one} = rel -> %{type: :has_one} = rel ->
ChangesetHelpers.has_one_assoc_update(changeset, rel, value, authorize?, user) ChangesetHelpers.has_one_assoc_update(changeset, rel, value, authorize?, user)
# %{type: :has_many} = rel -> %{type: :has_many} = rel ->
# has_many_assoc_update(changeset, rel, value) ChangesetHelpers.has_many_assoc_update(changeset, rel, value, authorize?, user)
# %{type: :many_to_many} = rel -> # %{type: :many_to_many} = rel ->
# many_to_many_assoc_update(changeset, rel, value, repo) # many_to_many_assoc_update(changeset, rel, value, repo)