This commit is contained in:
Zach Daniel 2020-01-14 15:02:30 -05:00
parent ed2b24d590
commit bed0a39cf2
No known key found for this signature in database
GPG key ID: A57053A671EE649E
5 changed files with 42 additions and 253 deletions

View file

@ -1,21 +1,19 @@
defmodule Ash.Actions.Create do defmodule Ash.Actions.Create do
alias Ash.Engine alias Ash.Engine
alias Ash.Actions.{Attributes, Relationships} alias Ash.Actions.{Attributes, Relationships, SideLoad}
@spec run(Ash.api(), Ash.resource(), Ash.action(), Ash.params()) :: @spec run(Ash.api(), Ash.resource(), Ash.action(), Ash.params()) ::
{:ok, Ash.record()} | {:error, Ecto.Changeset.t()} | {:error, Ash.error()} {:ok, Ash.record()} | {:error, Ecto.Changeset.t()} | {:error, Ash.error()}
def run(api, resource, action, params) do def run(api, resource, action, params) do
if Keyword.get(params, :side_load, []) in [[], nil] do Ash.DataLayer.transact(resource, fn ->
Ash.DataLayer.transact(resource, fn -> do_run(api, resource, action, params)
do_run(api, resource, action, params) end)
end)
else
{:error, "Cannot side load on create currently"}
end
end end
defp do_run(api, resource, action, params) do defp do_run(api, resource, action, params) do
attributes = Keyword.get(params, :attributes, %{}) attributes = Keyword.get(params, :attributes, %{})
side_loads = Keyword.get(params, :side_load, [])
relationships = Keyword.get(params, :relationships, %{}) relationships = Keyword.get(params, :relationships, %{})
with {:ok, relationships} <- with {:ok, relationships} <-
@ -32,9 +30,10 @@ defmodule Ash.Actions.Create do
), ),
params <- Keyword.merge(params, attributes: attributes, relationships: relationships), params <- Keyword.merge(params, attributes: attributes, relationships: relationships),
%{valid?: true} = changeset <- changeset(api, resource, params), %{valid?: true} = changeset <- changeset(api, resource, params),
{:ok, %{data: created}} <- {:ok, side_load_requests} <- SideLoad.requests(api, resource, side_loads),
do_authorized(changeset, params, action, resource, api) do {:ok, %{data: created} = state} <-
{:ok, created} do_authorized(changeset, params, action, resource, api, side_load_requests) do
{:ok, SideLoad.attach_side_loads(created, state)}
else else
%Ecto.Changeset{} = changeset -> %Ecto.Changeset{} = changeset ->
{:error, changeset} {:error, changeset}
@ -53,7 +52,7 @@ defmodule Ash.Actions.Create do
|> Relationships.handle_relationship_changes(api, relationships, :create) |> Relationships.handle_relationship_changes(api, relationships, :create)
end end
defp do_authorized(changeset, params, action, resource, api) do defp do_authorized(changeset, params, action, resource, api, side_load_requests) do
relationships = Keyword.get(params, :relationships, %{}) relationships = Keyword.get(params, :relationships, %{})
create_request = create_request =
@ -113,7 +112,7 @@ defmodule Ash.Actions.Create do
Engine.run( Engine.run(
params[:authorization][:user], params[:authorization][:user],
[create_request | attribute_requests] ++ [create_request | attribute_requests] ++
relationship_read_requests ++ relationship_change_requests, relationship_read_requests ++ relationship_change_requests ++ side_load_requests,
strict_access?: strict_access?, strict_access?: strict_access?,
log_final_report?: params[:authorization][:log_final_report?] || false log_final_report?: params[:authorization][:log_final_report?] || false
) )
@ -123,7 +122,7 @@ defmodule Ash.Actions.Create do
Engine.run( Engine.run(
authorization[:user], authorization[:user],
[create_request | attribute_requests] ++ [create_request | attribute_requests] ++
relationship_read_requests ++ relationship_change_requests, relationship_read_requests ++ relationship_change_requests ++ side_load_requests,
fetch_only?: true fetch_only?: true
) )
end end

View file

@ -3,6 +3,12 @@ defmodule Ash.Actions.Read do
alias Ash.Actions.SideLoad alias Ash.Actions.SideLoad
def run(api, resource, action, params) do def run(api, resource, action, params) do
Ash.DataLayer.transact(resource, fn ->
do_run(api, resource, action, params)
end)
end
defp do_run(api, resource, action, params) do
filter = Keyword.get(params, :filter, []) filter = Keyword.get(params, :filter, [])
sort = Keyword.get(params, :sort, []) sort = Keyword.get(params, :sort, [])
side_loads = Keyword.get(params, :side_load, []) side_loads = Keyword.get(params, :side_load, [])
@ -10,7 +16,7 @@ defmodule Ash.Actions.Read do
with %Ash.Filter{errors: [], requests: filter_requests} = filter <- with %Ash.Filter{errors: [], requests: filter_requests} = filter <-
Ash.Filter.parse(resource, filter, api), Ash.Filter.parse(resource, filter, api),
{:ok, side_load_requests} <- SideLoad.requests(api, resource, side_loads, filter), {:ok, side_load_requests} <- SideLoad.requests(api, resource, side_loads),
query <- Ash.DataLayer.resource_to_query(resource), query <- Ash.DataLayer.resource_to_query(resource),
{:ok, sort} <- Ash.Actions.Sort.process(resource, sort), {:ok, sort} <- Ash.Actions.Sort.process(resource, sort),
{:ok, sorted_query} <- Ash.DataLayer.sort(query, sort, resource), {:ok, sorted_query} <- Ash.DataLayer.sort(query, sort, resource),

View file

@ -1,21 +1,21 @@
defmodule Ash.Actions.SideLoad do defmodule Ash.Actions.SideLoad do
def requests(api, resource, side_load, source_filter, path \\ []) def requests(api, resource, side_load, path \\ [])
def requests(_, _, [], _, _), do: {:ok, []} def requests(_, _, [], _), do: {:ok, []}
def requests(api, resource, side_load, source_filter, path) do def requests(api, resource, side_load, path) do
# TODO: return authorizations here. # TODO: return authorizations here.
Enum.reduce(side_load, {:ok, []}, fn Enum.reduce(side_load, {:ok, []}, fn
_, {:error, error} -> _, {:error, error} ->
{:error, error} {:error, error}
{key, true}, {:ok, acc} -> {key, true}, {:ok, acc} ->
do_requests(api, resource, key, [], source_filter, path, acc) do_requests(api, resource, key, [], path, acc)
{key, further}, {:ok, acc} -> {key, further}, {:ok, acc} ->
do_requests(api, resource, key, further, source_filter, path, acc) do_requests(api, resource, key, further, path, acc)
key, {:ok, acc} -> key, {:ok, acc} ->
do_requests(api, resource, key, [], source_filter, path, acc) do_requests(api, resource, key, [], path, acc)
end) end)
end end
@ -35,8 +35,8 @@ defmodule Ash.Actions.SideLoad do
end) end)
end end
def side_load(api, resource, data, side_load, source_filter) do def side_load(api, resource, data, side_load) do
requests = requests(api, resource, side_load, source_filter) requests = requests(api, resource, side_load)
case Ash.Engine.run(nil, requests, state: %{data: data}) do case Ash.Engine.run(nil, requests, state: %{data: data}) do
{:ok, state} -> {:ok, state} ->
@ -134,12 +134,12 @@ defmodule Ash.Actions.SideLoad do
last_relationship!(relationship.destination, rest) last_relationship!(relationship.destination, rest)
end end
defp do_requests(api, resource, key, further, source_filter, path, acc) do defp do_requests(api, resource, key, further, path, acc) do
with {:rel, relationship} when not is_nil(relationship) <- with {:rel, relationship} when not is_nil(relationship) <-
{:rel, Ash.relationship(resource, key)}, {:rel, Ash.relationship(resource, key)},
nested_path <- path ++ [relationship], nested_path <- path ++ [relationship],
{:ok, requests} <- {:ok, requests} <-
requests(api, relationship.destination, further, source_filter, nested_path) do requests(api, relationship.destination, further, nested_path) do
default_read = default_read =
Ash.primary_action(resource, :read) || Ash.primary_action(resource, :read) ||
raise "Must set default read for #{inspect(resource)}" raise "Must set default read for #{inspect(resource)}"
@ -178,7 +178,7 @@ defmodule Ash.Actions.SideLoad do
else else
# Because we have the records, we can optimize the filter by nillifying the reverse relationship. # Because we have the records, we can optimize the filter by nillifying the reverse relationship.
# The reverse relationship is useful if you don't have the relationship keys for the related items (only pkeys) # The reverse relationship is useful if you don't have the relationship keys for the related items (only pkeys)
# or for doing many to many joins. # or for doing many to many joins, but can be slower.
filter = filter =
side_load_filter(%{relationship | reverse_relationship: nil}, state, path) side_load_filter(%{relationship | reverse_relationship: nil}, state, path)
@ -197,53 +197,6 @@ defmodule Ash.Actions.SideLoad do
end end
end end
# defp add_results_to_data(data, results, relationship, path) do
# {results_by_destination_key, default} =
# case relationship.cardinality do
# :many ->
# {Enum.group_by(results, &Map.get(&1, relationship.destination_field)), []}
# :one ->
# keyed =
# Enum.into(results, %{}, fn result ->
# {Map.get(result, relationship.destination_field), result}
# end)
# {keyed, nil}
# end
# do_add_results_to_data(data, results_by_destination_key, default, relationship, path)
# end
# defp do_add_results_to_data(data, results_by_destination_key, default, relationship, [])
# when is_list(data) do
# Enum.map(data, fn item ->
# source_value = Map.get(item, relationship.source_field)
# relationship_value = Map.get(results_by_destination_key, source_value, default)
# Map.put(item, relationship.name, relationship_value)
# end)
# end
# defp do_add_results_to_data(data, results_by_destination_key, default, relationship, [
# first | rest
# ])
# when is_list(data) do
# Enum.map(data, fn item ->
# Map.update!(
# item,
# first,
# &do_add_results_to_data(&1, results_by_destination_key, default, relationship, rest)
# )
# end)
# end
# defp do_add_results_to_data(data, results_by_destination_key, default, relationship, path) do
# [data]
# |> do_add_results_to_data(results_by_destination_key, default, relationship, path)
# |> List.first()
# end
defp relationship_already_loaded?(data, relationship, path) do defp relationship_already_loaded?(data, relationship, path) do
Enum.all?(get_field(data, relationship.name, path), fn item -> Enum.all?(get_field(data, relationship.name, path), fn item ->
not match?(%Ecto.Association.NotLoaded{}, item) not match?(%Ecto.Association.NotLoaded{}, item)
@ -321,12 +274,7 @@ defmodule Ash.Actions.SideLoad do
defp reverse_relationship_path_and_values(relationship, data, prior_path, acc \\ []) defp reverse_relationship_path_and_values(relationship, data, prior_path, acc \\ [])
defp reverse_relationship_path_and_values( defp reverse_relationship_path_and_values(%{reverse_relationship: nil}, _, _, _) do
%{reverse_relationship: nil},
_data,
_prior_path,
_acc
) do
:error :error
end end
@ -381,22 +329,6 @@ defmodule Ash.Actions.SideLoad do
|> get_field(name, rest) |> get_field(name, rest)
end end
# defp is_fetched?(_, []), do: true
# defp is_fetched?(records, [rel | rest]) do
# Enum.all?(records, fn record ->
# case Map.get(record, rel) do
# %Ecto.Association.NotLoaded{} ->
# false
# value ->
# value
# |> List.wrap()
# |> is_fetched?(rest)
# end
# end)
# end
defp put_nested_relationship([rel | rest], value) do defp put_nested_relationship([rel | rest], value) do
[ [
{rel, put_nested_relationship(rest, value)} {rel, put_nested_relationship(rest, value)}
@ -407,93 +339,6 @@ defmodule Ash.Actions.SideLoad do
value value
end end
# def side_load(resource, record, keyword, api, global_params \\ [])
# def side_load(_resource, [], _side_loads, _api, _global_params) do
# {:ok, []}
# end
# def side_load(_resource, record_or_records, [], _api, _global_params),
# do: {:ok, record_or_records}
# def side_load(
# resource,
# %Ash.Actions.Paginator{results: results} = paginator,
# side_loads,
# api,
# global_params
# ) do
# case side_load(resource, results, side_loads, api, global_params) do
# {:ok, side_loaded} -> {:ok, %{paginator | results: side_loaded}}
# {:error, error} -> {:error, error}
# end
# end
# def side_load(resource, record, side_loads, api, global_params)
# when not is_list(record) do
# case side_load(resource, [record], side_loads, api, global_params) do
# {:ok, [side_loaded]} -> {:ok, side_loaded}
# {:error, error} -> {:error, error}
# end
# end
# def side_load(resource, records, side_loads, api, global_params) do
# {side_load_type, config} = Ash.side_load_config(api)
# side_loads = sanitize_side_loads(side_loads)
# side_load_results =
# side_loads
# |> maybe_async_stream(config, async?, fn relationship_name, further ->
# relationship = Ash.relationship(resource, relationship_name)
# # Combining filters, and handling boolean filters is
# # going to come into play here. #TODO
# # need to be able to configure options specific to the path of the preload!
# unless relationship.reverse_relationship do
# raise "no reverse relationship for #{inspect(relationship)}. This should be validated at compile time."
# end
# action_params =
# global_params
# |> Keyword.put(
# :filter,
# [{relationship.reverse_relationship, reverse_relationship_filter(records)}]
# )
# |> Keyword.put_new(:paginate?, false)
# with {:ok, %{results: related_records}} <-
# api.read(relationship.destination, action_params),
# {:ok, side_loaded_related} <-
# side_load(relationship.destination, related_records, further, api, global_params) do
# keyed_by_id =
# Enum.group_by(side_loaded_related, fn record ->
# # This is required for many to many relationships
# Map.get(record, :__related_id__) ||
# Map.get(record, relationship.destination_field)
# end)
# {:ok, {relationship, keyed_by_id}}
# else
# {:error, error} -> {:error, error}
# end
# end)
# |> Enum.to_list()
# # This is dumb, should handle these errors better
# first_error =
# Enum.find(side_load_results, fn side_loaded ->
# match?({:error, _error}, side_loaded)
# end)
# if first_error do
# first_error
# else
# {:ok, link_records(Enum.map(side_load_results, &elem(&1, 1)), records)}
# end
# end
defp sanitize_side_loads(side_loads) do defp sanitize_side_loads(side_loads) do
Enum.map(side_loads, fn side_load_part -> Enum.map(side_loads, fn side_load_part ->
if is_atom(side_load_part) do if is_atom(side_load_part) do
@ -503,68 +348,4 @@ defmodule Ash.Actions.SideLoad do
end end
end) end)
end end
# defp reverse_relationship_filter(records) when is_list(records) do
# [or: records |> List.wrap() |> Enum.map(&reverse_relationship_filter/1)]
# end
# defp reverse_relationship_filter(%resource{} = record) do
# record |> Map.take(Ash.primary_key(resource)) |> Map.to_list()
# end
# defp link_records(results, records) do
# Enum.reduce(results, records, fn {relationship, keyed_by_id}, records ->
# Enum.map(records, fn record ->
# related_to_this_record =
# Map.get(keyed_by_id, Map.get(record, relationship.source_field)) || []
# unwrapped =
# if relationship.cardinality == :many do
# related_to_this_record
# else
# List.first(related_to_this_record)
# end
# related_ids = Enum.map(related_to_this_record, fn record -> record.id end)
# linked_record =
# record
# |> Map.put(relationship.name, unwrapped)
# |> Map.put_new(:__linkage__, %{})
# |> Map.update!(:__linkage__, &Map.put(&1, relationship.name, related_ids))
# linked_record
# end)
# end)
# end
# defp maybe_async_stream(preloads, _opts, false, function) do
# Stream.map(preloads, fn {association, further} ->
# function.(association, further)
# end)
# end
# defp maybe_async_stream(preloads, opts, true, function) do
# # We could theoretically do one of them outside of a task whlie we wait for the rest
# # Not worth implementing to start, IMO.
# async_opts = [
# opts[:max_concurrency] || System.schedulers_online(),
# ordered: false,
# timeout: opts[:timeout] || :timer.seconds(5),
# on_timeout: :kill_task,
# shutdown: opts[:shutdown] || :timer.seconds(5)
# ]
# Task.Supervisor.async_stream_nolink(
# opts[:supervisor],
# preloads,
# fn {key, further} -> function.(key, further) end,
# async_opts
# )
# |> Stream.map(&to_result/1)
# end
# defp to_result({:exit, reason}), do: {:error, {:exit, reason}}
# defp to_result({:ok, {:ok, value}}), do: {:ok, value}
# defp to_result({:ok, {:error, error}}), do: {:error, error}
end end

View file

@ -1,6 +1,6 @@
defmodule Ash.Actions.Update do defmodule Ash.Actions.Update do
alias Ash.Engine alias Ash.Engine
alias Ash.Actions.{Attributes, Relationships} alias Ash.Actions.{Attributes, Relationships, SideLoad}
@spec run(Ash.api(), Ash.record(), Ash.action(), Ash.params()) :: @spec run(Ash.api(), Ash.record(), Ash.action(), Ash.params()) ::
{:ok, Ash.record()} | {:error, Ecto.Changeset.t()} | {:error, Ash.error()} {:ok, Ash.record()} | {:error, Ecto.Changeset.t()} | {:error, Ash.error()}
@ -18,6 +18,7 @@ defmodule Ash.Actions.Update do
defp do_run(api, %resource{} = record, action, params) do defp do_run(api, %resource{} = record, action, params) do
attributes = Keyword.get(params, :attributes, %{}) attributes = Keyword.get(params, :attributes, %{})
relationships = Keyword.get(params, :relationships, %{}) relationships = Keyword.get(params, :relationships, %{})
side_loads = Keyword.get(params, :side_load, [])
with {:ok, relationships} <- with {:ok, relationships} <-
Relationships.validate_not_changing_relationship_and_source_field( Relationships.validate_not_changing_relationship_and_source_field(
@ -33,8 +34,10 @@ defmodule Ash.Actions.Update do
), ),
params <- Keyword.merge(params, attributes: attributes, relationships: relationships), params <- Keyword.merge(params, attributes: attributes, relationships: relationships),
%{valid?: true} = changeset <- changeset(record, api, params), %{valid?: true} = changeset <- changeset(record, api, params),
{:ok, %{data: updated}} <- do_authorized(changeset, params, action, resource, api) do {:ok, side_load_requests} <- SideLoad.requests(api, resource, side_loads),
{:ok, updated} {:ok, %{data: updated}} = state <-
do_authorized(changeset, params, action, resource, api, side_load_requests) do
{:ok, SideLoad.attach_side_loads(updated, state)}
else else
%Ecto.Changeset{} = changeset -> %Ecto.Changeset{} = changeset ->
{:error, changeset} {:error, changeset}
@ -53,7 +56,7 @@ defmodule Ash.Actions.Update do
|> Relationships.handle_relationship_changes(api, relationships, :update) |> Relationships.handle_relationship_changes(api, relationships, :update)
end end
defp do_authorized(changeset, params, action, resource, api) do defp do_authorized(changeset, params, action, resource, api, side_load_requests) do
relationships = Keyword.get(params, :relationships) relationships = Keyword.get(params, :relationships)
update_request = update_request =
@ -102,7 +105,7 @@ defmodule Ash.Actions.Update do
Engine.run( Engine.run(
params[:authorization][:user], params[:authorization][:user],
[update_request | attribute_requests] ++ relationship_requests, [update_request | attribute_requests] ++ relationship_requests ++ side_load_requests,
strict_access?: strict_access?, strict_access?: strict_access?,
log_final_report?: params[:authorization][:log_final_report?] || false log_final_report?: params[:authorization][:log_final_report?] || false
) )
@ -111,7 +114,7 @@ defmodule Ash.Actions.Update do
Engine.run( Engine.run(
authorization[:user], authorization[:user],
[update_request | attribute_requests] ++ relationship_requests, [update_request | attribute_requests] ++ relationship_requests ++ side_load_requests,
fetch_only?: true fetch_only?: true
) )
end end

View file

@ -271,7 +271,7 @@ defmodule Ash.Authorization.Checker do
defp run_preparation(_, nil, :side_load, _), do: {:ok, nil} defp run_preparation(_, nil, :side_load, _), do: {:ok, nil}
defp run_preparation(request, data, :side_load, side_load) do defp run_preparation(request, data, :side_load, side_load) do
SideLoad.side_load(request.api, request.resource, data, side_load, request.api) SideLoad.side_load(request.api, request.resource, data, side_load)
end end
defp run_preparation(_, _, preparation, _), do: {:error, "Unknown preparation #{preparation}"} defp run_preparation(_, _, preparation, _), do: {:error, "Unknown preparation #{preparation}"}