fix: move to simpler transaction logic

This commit is contained in:
Zach Daniel 2020-06-29 17:42:01 -04:00
parent 05f2d9ad64
commit 807b16e268
No known key found for this signature in database
GPG key ID: C377365383138D4B
8 changed files with 291 additions and 295 deletions

View file

@ -27,12 +27,14 @@ locals_without_parens = [
many_to_many: 3,
primary?: 1,
primary_key?: 1,
private?: 1,
read: 1,
read: 2,
resource: 1,
resource: 2,
source_field: 1,
source_field_on_join_table: 1,
table: 1,
through: 1,
type: 1,
update: 1,

View file

@ -11,7 +11,10 @@ defmodule Ash.Actions.Create do
side_load = opts[:side_load] || []
upsert? = opts[:upsert?] || false
engine_opts = Keyword.take(opts, [:verbose?, :actor, :authorize?])
engine_opts =
opts
|> Keyword.take([:verbose?, :actor, :authorize?])
|> Keyword.put(:transaction?, true)
action =
if is_atom(action) and not is_nil(action) do
@ -143,7 +146,7 @@ defmodule Ash.Actions.Create do
[authorization_request | [commit_request | relationship_read_requests]] ++
side_load_requests,
api,
Keyword.put(engine_opts, :transaction?, true)
engine_opts
)
end

View file

@ -6,7 +6,10 @@ defmodule Ash.Actions.Destroy do
@spec run(Ash.api(), Ash.record(), Ash.action(), Keyword.t()) ::
:ok | {:error, Ecto.Changeset.t()} | {:error, Ash.error()}
def run(api, %resource{} = record, action, opts) do
engine_opts = Keyword.take(opts, [:verbose?, :actor, :authorize?])
engine_opts =
opts
|> Keyword.take([:verbose?, :actor, :authorize?])
|> Keyword.put(:transaction?, true)
action =
if is_atom(action) and not is_nil(action) do

View file

@ -11,7 +11,11 @@ defmodule Ash.Actions.Update do
attributes = Keyword.get(opts, :attributes, %{})
relationships = Keyword.get(opts, :relationships, %{})
side_load = opts[:side_load] || []
engine_opts = Keyword.take(opts, [:verbose?, :actor, :authorize?])
engine_opts =
opts
|> Keyword.take([:verbose?, :actor, :authorize?])
|> Keyword.put(:transaction?, true)
action =
if is_atom(action) and not is_nil(action) do

View file

@ -45,7 +45,7 @@ defmodule Ash.Engine do
end)
transaction_result =
maybe_transact(opts, requests, fn innermost_resource ->
maybe_transact(opts, api, fn innermost_resource ->
{local_requests, async_requests} = split_local_async_requests(requests)
opts =
@ -55,38 +55,7 @@ defmodule Ash.Engine do
|> Keyword.put(:runner_pid, self())
|> Keyword.put(:api, api)
if async_requests == [] do
case Runner.run(local_requests, opts[:verbose?]) do
%{errors: errors} = runner when errors == [] ->
runner
%{errors: errors} ->
if innermost_resource do
Ash.rollback(innermost_resource, errors)
else
{:error, errors}
end
end
else
Process.flag(:trap_exit, true)
{:ok, pid} = GenServer.start(__MODULE__, opts)
_ = Process.monitor(pid)
receive do
{:pid_info, pid_info} ->
case Runner.run(local_requests, opts[:verbose?], pid, pid_info) do
%{errors: errors} = runner when errors == [] ->
runner
%{errors: errors} ->
if innermost_resource do
Ash.rollback(innermost_resource, errors)
else
{:error, errors}
end
end
end
end
run_requests(async_requests, local_requests, opts, innermost_resource)
end)
case transaction_result do
@ -99,13 +68,57 @@ defmodule Ash.Engine do
end
end
defp maybe_transact(opts, requests, func) do
defp run_requests(async_requests, local_requests, opts, innermost_resource) do
if async_requests == [] do
run_and_return_or_rollback(local_requests, opts, innermost_resource)
else
Process.flag(:trap_exit, true)
{:ok, pid} = GenServer.start(__MODULE__, opts)
_ = Process.monitor(pid)
receive do
{:pid_info, pid_info} ->
run_and_return_or_rollback(
local_requests,
opts,
innermost_resource,
pid,
pid_info
)
end
end
end
defp run_and_return_or_rollback(
local_requests,
opts,
innermost_resource,
pid \\ nil,
pid_info \\ %{}
) do
case Runner.run(local_requests, opts[:verbose?], pid, pid_info) do
%{errors: errors} = runner when errors == [] ->
runner
%{errors: errors} ->
rollback_or_return(innermost_resource, errors)
end
end
defp rollback_or_return(innermost_resource, errors) do
if innermost_resource do
Ash.rollback(innermost_resource, errors)
else
{:error, errors}
end
end
defp maybe_transact(opts, api, func) do
if opts[:transaction?] do
resources =
requests
|> Enum.map(& &1.resource)
api
|> Ash.Api.resources()
|> Enum.filter(&Ash.data_layer_can?(&1, :transact))
|> Enum.uniq()
do_in_transaction(resources, func)
else

View file

@ -48,227 +48,126 @@ defmodule Ash.Filter do
end
end
def attach_other_data_layer_filters(filter, context) do
context
|> Map.get(:datalayer_filter)
|> Kernel.||(%{})
|> Map.values()
|> Enum.flat_map(&Map.values/1)
|> Enum.reduce_while({:ok, filter}, fn %{data: new_filter}, {:ok, filter} ->
case add_to_filter(filter, new_filter) do
{:ok, new_filter} -> {:cont, {:ok, new_filter}}
{:error, error} -> {:halt, {:error, error}}
end
end)
end
def run_other_data_layer_filters(resource, api, filter) do
reduce(filter, {:ok, filter}, fn
%Expression{op: :or}, {:ok, filter} ->
{:halt, {:ok, filter}}
def cross_datalayer_filter_requests(resource, api, filter, filter_requests, authorize?) do
map_reduce(filter, {[], true}, fn
%Expression{op: :or}, {requests, _} ->
{filter, requests, true}
%Predicate{} = expression, {requests, true} ->
%Predicate{} = expression, {:ok, filter} ->
expression
|> relationship_paths(:ands_only)
|> filter_paths_that_change_data_layers(resource)
|> Enum.uniq()
|> Enum.reduce({expression, requests}, fn path, {expression, requests} ->
{for_path, _without_path} = split_expression_by_relationship_path(expression, path)
|> Enum.reduce_while({:halt, {:ok, filter}}, fn path, {:halt, {:ok, filter}} ->
{for_path, without_path} = split_expression_by_relationship_path(filter, path)
relationship = Ash.relationship(resource, path)
query =
relationship.destination
|> api.query()
|> Map.put(:filter, %__MODULE__{
expression: for_path,
resource: Ash.related(resource, path),
api: api
})
|> Map.put(:filter, for_path)
{new_requests, replace_with_path} =
other_datalayer_requests(query, relationship, path, filter_requests, authorize?)
{Map.put(expression, :__replace__, replace_with_path), {requests ++ new_requests, true}}
add_other_datalayer_read_results(query, relationship, path, without_path)
end)
# TODO: this is an optimization. We can take all predicates that appear in a single nested `and` statement
# and run those together
# %Expression{op: :and} = expression, {requests, false} ->
# expression
# |> relationship_paths(:ands_only)
# |> filter_paths_that_change_data_layers(resource)
# |> Enum.reduce({filter, requests}, fn path, {filter, requests} ->
# {for_path, without_path} = split_expression_by_relationship_path(filter, path)
%Expression{op: :and} = expression, {:ok, filter} ->
expression
|> relationship_paths(:ands_only)
|> filter_paths_that_change_data_layers(resource)
|> Enum.reduce_while({:halt, {:ok, filter}}, fn path, {:halt, {:ok, filter}} ->
{for_path, without_path} = split_expression_by_relationship_path(filter, path)
# relationship = Ash.relationship(resource, path)
relationship = Ash.relationship(resource, path)
# query =
# relationship.destination
# |> api.query()
# |> Map.put(:filter, %__MODULE__{
# expression: for_path,
# resource: Ash.related(resource, path),
# api: api
# })
query =
relationship.destination
|> api.query()
|> Map.put(:filter, for_path)
# {new_requests, replace_with_path} =
# other_datalayer_requests(query, relationship, path, filter_requests, authorize?)
add_other_datalayer_read_results(query, relationship, path, without_path)
end)
# case without_path do
# end
# if is_map(without_path) do
# {Map.put(without_path, :__replace__, replace_with_path), {requests ++ new_requests, false}}
# end
# end)
expression, {requests, add_predicates?} ->
{expression, {requests, add_predicates?}}
_, {:ok, filter} ->
{:ok, filter}
end)
end
defp other_datalayer_requests(query, relationship, path, filter_requests, authorize?) do
# Using System.unique_integer is essentially just a hack here
# because there are multiple requests possible per path, adding
# a unique integer here ensures that they all get a unique path
request_path =
if relationship.type == :many_to_many do
[:datalayer_filter_join, System.unique_integer(), path]
else
[:datalayer_filter, System.unique_integer(), path]
end
defp add_other_datalayer_read_results(query, relationship, path, filter_without_path) do
case query.api.read(query) do
{:ok, results} ->
new_filter =
case relationship.type do
:many_to_many ->
many_to_many_read_results(results, relationship, query, path)
{requests, replace_with_path} =
if relationship.type == :many_to_many do
id = System.unique_integer()
_ ->
results
|> Enum.map(&Map.get(&1, relationship.destination_field))
|> Enum.reject(&is_nil/1)
|> record_filters_or_false(relationship)
|> put_at_path(:lists.droplast(path))
end
{[many_to_many_datalayer_request(query, relationship, path, request_path, id)],
[:datalayer_filter, id, path]}
else
{[], request_path}
end
case add_to_filter(filter_without_path, new_filter) do
{:ok, filter} -> {:cont, {:halt, {:ok, filter}}}
{:error, error} -> {:halt, {:return, {:error, error}}}
end
action = Ash.primary_action!(query.resource, :read)
query_path = request_path ++ [:query]
request =
Request.new(
resource: query.resource,
api: query.api,
query:
attach_other_datalayer_authorization_filter(query, path, filter_requests, authorize?),
path: request_path,
authorize?: false,
action: action,
name: "cross data layer filter: #{Enum.join(path, ",")}",
data:
Request.resolve([query_path], fn context ->
query = get_in(context, query_path)
if relationship.type == :many_to_many do
query.api.read(query)
else
case query.api.read(query) do
{:ok, results} ->
values =
results
|> Enum.map(&Map.get(&1, relationship.destination_field))
|> Enum.reject(&is_nil/1)
Predicate.new(
query.resource,
Ash.attribute(query.resource, relationship.destination_field),
In,
values,
:lists.droplast(path)
)
{:error, error} ->
{:error, error}
end
end
end)
)
{[request | requests], replace_with_path}
end
defp attach_other_datalayer_authorization_filter(query, _path, _filter_requests, false),
do: query
defp attach_other_datalayer_authorization_filter(query, _path, [], _), do: query
defp attach_other_datalayer_authorization_filter(query, path, filter_requests, true) do
if [:filter, path] in Enum.map(filter_requests, & &1.path) do
filter_dependency = [:filter, path, :authorization_filter]
Request.resolve([filter_dependency], fn context ->
authorization_filter = get_in(context, filter_dependency)
{:ok, Ash.Query.filter(query, authorization_filter)}
end)
else
query
{:error, error} ->
{:halt, {:return, {:error, error}}}
end
end
defp many_to_many_datalayer_request(query, relationship, path, many_to_many_request_path, id) do
action = Ash.primary_action!(relationship.through, :read)
dependency = many_to_many_request_path ++ [:data]
request_path = [:datalayer_filter, id, path]
query_dependency = request_path ++ [:query]
defp record_filters_or_false(records, relationship) do
case records do
[] ->
false
Request.new(
resource: relationship.through,
api: query.api,
path: request_path,
authorize?: false,
action: action,
name: "cross data layer `through` filter: #{Enum.join(path, ",")}",
query:
Request.resolve([dependency], fn context ->
data = get_in(context, dependency) || []
[value] ->
[{relationship.source_field, value}]
destination_values =
data
|> Enum.map(&Map.get(&1, relationship.destination_field))
|> Enum.reject(&is_nil/1)
values ->
[{relationship.source_field, [in: values]}]
end
end
{:ok,
relationship.through
|> query.api.query()
|> Ash.Query.filter([
{relationship.destination_field_on_join_table, [in: destination_values]}
])}
end),
data:
Request.resolve([query_dependency], fn context ->
query = get_in(context, query_dependency)
defp many_to_many_read_results(results, relationship, query, path) do
destination_values =
results
|> Enum.map(&Map.get(&1, relationship.destination_field))
|> Enum.reject(&is_nil/1)
case query.api.read(query) do
{:ok, results} ->
values =
results
|> Enum.map(&Map.get(&1, relationship.source_field_on_join_table))
|> Enum.reject(&is_nil/1)
join_query =
relationship.through
|> query.api.query()
|> Ash.Query.filter([
{relationship.destination_field_on_join_table, [in: destination_values]}
])
Predicate.new(
query.resource,
Ash.attribute(query.resource, relationship.destination_field),
In,
values,
:lists.droplast(path)
)
case query.api.read(join_query) do
{:ok, results} ->
results
|> Enum.map(&Map.get(&1, relationship.source_field_on_join_table))
|> Enum.reject(&is_nil/1)
|> case do
[] ->
false
{:error, error} ->
{:error, error}
end
end)
)
[value] ->
[{relationship.source_field, value}]
values ->
[{relationship.source_field, [in: values]}]
end
|> put_at_path(:lists.droplast(path))
{:error, error} ->
{:error, error}
end
end
defp filter_paths_that_change_data_layers(paths, resource, acc \\ [])
defp filter_paths_that_change_data_layers([], _resource, acc), do: Enum.uniq(acc)
defp filter_paths_that_change_data_layers([], _resource, acc), do: acc
defp filter_paths_that_change_data_layers([path | rest], resource, acc) do
case shortest_path_to_changed_data_layer(resource, path) do
@ -281,10 +180,6 @@ defmodule Ash.Filter do
end
end
def path_crosses_datalayer?(resource, path) do
shortest_path_to_changed_data_layer(resource, path) != :error
end
defp shortest_path_to_changed_data_layer(resource, path, acc \\ [])
defp shortest_path_to_changed_data_layer(_resource, [], _acc), do: :error
@ -292,14 +187,24 @@ defmodule Ash.Filter do
relationship = Ash.relationship(resource, relationship)
data_layer = Ash.data_layer(relationship.destination)
if Ash.data_layer_can?(resource, :join) && data_layer == Ash.data_layer(resource) &&
(relationship.type != :many_to_many ||
data_layer == Ash.data_layer(relationship.through)) do
shortest_path_to_changed_data_layer(relationship.destination, rest, [
relationship.name | acc
])
if relationship.type == :many_to_many do
if data_layer == Ash.data_layer(resource) &&
data_layer == Ash.data_layer(relationship.through) &&
Ash.data_layer_can?(resource, :join) do
shortest_path_to_changed_data_layer(relationship.destination, rest, [
relationship.name | acc
])
else
{:ok, Enum.reverse([relationship.name | acc])}
end
else
{:ok, Enum.reverse([relationship.name | acc])}
if data_layer == Ash.data_layer(resource) && Ash.data_layer_can?(resource, :join) do
shortest_path_to_changed_data_layer(relationship.destination, rest, [
relationship.name | acc
])
else
{:ok, Enum.reverse([relationship.name | acc])}
end
end
end
@ -453,72 +358,134 @@ defmodule Ash.Filter do
end
end
def reduce(filter, acc \\ nil, func) do
map_reduce(filter, acc, fn expression, acc ->
case func.(expression, acc) do
{:halt, acc} ->
{:halt, {expression, acc}}
def reduce(filter, acc \\ nil, func)
def reduce(%__MODULE__{expression: nil}, acc, _), do: acc
acc ->
{expression, acc}
end
end)
def reduce(%__MODULE__{expression: expression}, acc, func) do
case func.(expression, acc) do
{:halt, acc} ->
acc
{:return, value} ->
value
acc ->
case do_reduce(expression, acc, func) do
{:halt, acc} -> acc
{:return, value} -> value
acc -> acc
end
end
end
def map_reduce(filter, acc \\ nil, func)
def map_reduce(%__MODULE__{expression: nil} = filter, acc, _), do: {filter, acc}
def reduce(expression, acc, func) do
case func.(expression, acc) do
{:halt, acc} ->
acc
def map_reduce(%__MODULE__{expression: expression} = filter, acc, func) do
{expression, acc} = map_reduce(expression, acc, func)
{:return, value} ->
value
{%{filter | expression: expression}, acc}
acc ->
case do_reduce(expression, acc, func) do
{:halt, acc} -> acc
{:return, value} -> value
acc -> acc
end
end
end
def map_reduce(expression, acc, func) do
{expression, acc} = func.(expression, acc)
do_map_reduce(expression, acc, func)
end
def do_map_reduce(expression, acc, func) do
def do_reduce(expression, acc, func) do
case expression do
%Expression{left: left, right: right} = expression ->
{new_left, acc} = func.(left, acc)
{new_right, acc} = func.(right, acc)
%Expression{} = expression ->
do_reduce_expression(expression, acc, func)
{new_left, acc} = do_map_reduce(new_left, acc, func)
{new_right, acc} = do_map_reduce(new_right, acc, func)
%Not{expression: not_expr} ->
case func.(not_expr, acc) do
{:halt, acc} ->
acc
{%{expression | left: new_left, right: new_right}, acc}
{:return, value} ->
{:return, value}
%Not{expression: not_expr} = not_struct ->
{expression, acc} = func.(not_expr, acc)
{expression, acc} = do_map_reduce(expression, acc, func)
{%{not_struct | expression: expression}, acc}
acc ->
do_reduce(not_expr, acc, func)
end
{:return, value} ->
{:return, value}
{:halt, value} ->
{:halt, value}
other ->
func.(other, acc)
end
end
# defp split_expression_by_relationship_path(%{expression: expression} = filter, _path)
# when expression in [nil, true, false] do
# {filter, filter}
# end
defp do_reduce_expression(%Expression{left: left, right: right}, acc, func) do
case func.(right, acc) do
{:halt, acc} ->
case func.(left, acc) do
{:return, value} ->
{:return, value}
# defp split_expression_by_relationship_path(filter, path) do
# {for_path, without_path} = do_split_expression_by_relationship_path(filter.expression, path)
{:halt, acc} ->
acc
# {%__MODULE__{
# api: filter.api,
# resource: Ash.related(filter.resource, path),
# expression: for_path
# },
# %__MODULE__{
# api: filter.api,
# resource: filter.resource,
# expression: without_path
# }}
# end
acc ->
do_reduce(left, acc, func)
end
{:return, value} ->
{:return, value}
acc ->
continue_reduce(left, right, acc, func)
end
end
defp continue_reduce(left, right, acc, func) do
case func.(left, acc) do
{:halt, acc} ->
do_reduce(right, acc, func)
{:return, value} ->
{:return, value}
acc ->
case do_reduce(left, acc, func) do
{:halt, acc} ->
{:halt, acc}
{:return, acc} ->
{:return, acc}
acc ->
do_reduce(right, acc, func)
end
end
end
defp split_expression_by_relationship_path(%{expression: expression} = filter, _path)
when expression in [nil, true, false] do
{filter, filter}
end
defp split_expression_by_relationship_path(filter, path) do
{for_path, without_path} = do_split_expression_by_relationship_path(filter.expression, path)
{%__MODULE__{
api: filter.api,
resource: Ash.related(filter.resource, path),
expression: for_path
},
%__MODULE__{
api: filter.api,
resource: filter.resource,
expression: without_path
}}
end
defp filter_expression_by_relationship_path(filter, path) do
%__MODULE__{
@ -528,25 +495,26 @@ defmodule Ash.Filter do
}
end
defp split_expression_by_relationship_path(
defp do_split_expression_by_relationship_path(
%Expression{op: op, left: left, right: right},
path
) do
{new_for_path_left, new_without_path_left} = split_expression_by_relationship_path(left, path)
{new_for_path_left, new_without_path_left} =
do_split_expression_by_relationship_path(left, path)
{new_for_path_right, new_without_path_right} =
split_expression_by_relationship_path(right, path)
do_split_expression_by_relationship_path(right, path)
{Expression.new(op, new_for_path_left, new_for_path_right),
Expression.new(op, new_without_path_left, new_without_path_right)}
end
defp split_expression_by_relationship_path(%Not{expression: expression}, path) do
{new_for_path, new_without_path} = split_expression_by_relationship_path(expression, path)
defp do_split_expression_by_relationship_path(%Not{expression: expression}, path) do
{new_for_path, new_without_path} = do_split_expression_by_relationship_path(expression, path)
{Not.new(new_for_path), Not.new(new_without_path)}
end
defp split_expression_by_relationship_path(
defp do_split_expression_by_relationship_path(
%Predicate{relationship_path: predicate_path} = predicate,
path
) do

View file

@ -18,7 +18,7 @@ defmodule Ash.MixProject do
elixirc_paths: elixirc_paths(Mix.env()),
package: package(),
deps: deps(),
dialyzer: [plt_add_apps: [:mix]],
dialyzer: [plt_add_apps: [:mix, :mnesia]],
test_coverage: [tool: ExCoveralls],
preferred_cli_env: [
coveralls: :test,

View file

@ -1,6 +1,8 @@
defmodule Ash.Test.Filter.FilterInteractionTest do
use ExUnit.Case, async: false
alias Ash.DataLayer.Mnesia
defmodule Profile do
@moduledoc false
use Ash.Resource, data_layer: Ash.DataLayer.Ets
@ -118,7 +120,7 @@ defmodule Ash.Test.Filter.FilterInteractionTest do
end
setup do
Ash.DataLayer.Mnesia.start(Api)
Mnesia.start(Api)
on_exit(fn ->
:mnesia.stop()
@ -171,8 +173,7 @@ defmodule Ash.Test.Filter.FilterInteractionTest do
post1 =
Api.create!(Post,
attributes: %{title: "one"},
relationships: %{related_posts: [post2]},
verbose?: true
relationships: %{related_posts: [post2]}
)
query =
@ -180,6 +181,8 @@ defmodule Ash.Test.Filter.FilterInteractionTest do
|> Api.query()
|> Ash.Query.filter(related_posts: [title: "two"])
post1 = Api.reload!(post1)
assert [^post1] = Api.read!(query)
end
end