mirror of
https://github.com/ash-project/ash.git
synced 2024-09-19 21:13:10 +12:00
improvement: add cascade update built in change (#1398)
--------- Co-authored-by: Alykhan Jetha <aj@marketcircle.com>
This commit is contained in:
parent
69b5e20b2f
commit
a43624c9f3
3 changed files with 609 additions and 0 deletions
|
@ -322,6 +322,46 @@ defmodule Ash.Resource.Change.Builtins do
|
|||
{Ash.Resource.Change.CascadeDestroy, Keyword.put(opts, :relationship, relationship)}
|
||||
end
|
||||
|
||||
@doc """
|
||||
Cascade a resource's update action to a related resource's update action.
|
||||
|
||||
Adds an after-action hook that explicitly calls update on any records related
|
||||
via the named relationship. It will optimise for bulk updates where
|
||||
possible.
|
||||
|
||||
Allows you to copy fields from the arguments or changes to the destination,
|
||||
this way you can cascade a bunch of changes downstream.
|
||||
|
||||
> #### Beware database constraints {: .warning}
|
||||
>
|
||||
> Think carefully before using this change with data layers which enforce
|
||||
> referential integrity (ie PostgreSQL and SQLite) and you may need to defer
|
||||
> constraints for the relationship in question.
|
||||
>
|
||||
> See also:
|
||||
> 1. [`postgres.references.reference.deferrable` DSL](https://hexdocs.pm/ash_postgres/dsl-ashpostgres-datalayer.html#postgres-references-reference-deferrable)
|
||||
> 2. [`sqlite.references.reference.deferrable` DSL](https://hexdocs.pm/ash_sqlite/dsl-ashsqlite-datalayer.html#sqlite-references-reference-deferrable)
|
||||
> 3. [PostgreSQL's `SET CONSTRAINTS` documentation](https://www.postgresql.org/docs/current/sql-set-constraints.html)
|
||||
> 4. [SQLite's `PRAGMA defer_foreign_keys` documentation](https://www.sqlite.org/pragma.html#pragma_defer_foreign_keys)
|
||||
|
||||
> #### Cascading notifications {: .tip}
|
||||
>
|
||||
> By default notifications are disabled for the related updates. This is to avoid potentially sending a **lot** of notifications for high-cardinality relationships.
|
||||
|
||||
## Options
|
||||
|
||||
#{Ash.Resource.Change.CascadeUpdate.opt_schema() |> Keyword.delete(:resource) |> Spark.Options.docs()}
|
||||
|
||||
## Example
|
||||
|
||||
change cascade_update(:relationship1)
|
||||
change cascade_update(:relationship2, copy_inputs: [:field1, :field2])
|
||||
"""
|
||||
@spec cascade_update(relationship :: atom) :: Ash.Resource.Change.ref()
|
||||
def cascade_update(relationship, opts \\ []) do
|
||||
{Ash.Resource.Change.CascadeUpdate, Keyword.put(opts, :relationship, relationship)}
|
||||
end
|
||||
|
||||
@doc ~S"""
|
||||
Directly attach an `after_action` function to the current change.
|
||||
|
||||
|
|
277
lib/ash/resource/change/cascade_update.ex
Normal file
277
lib/ash/resource/change/cascade_update.ex
Normal file
|
@ -0,0 +1,277 @@
|
|||
defmodule Ash.Resource.Change.CascadeUpdate do
|
||||
@option_schema [
|
||||
relationship: [
|
||||
type: :atom,
|
||||
doc: "The name of the relationship to work on",
|
||||
required: true
|
||||
],
|
||||
action: [
|
||||
type: :atom,
|
||||
doc:
|
||||
"The name of the update action to call on the related resource. Uses the primary update by default.",
|
||||
required: false
|
||||
],
|
||||
copy_inputs: [
|
||||
type: {:list, :atom},
|
||||
doc:
|
||||
"A list of fields to copy & pass on to the downstream update. The source action cannot be atomic.",
|
||||
required: false,
|
||||
default: []
|
||||
],
|
||||
read_action: [
|
||||
type: :atom,
|
||||
doc:
|
||||
"The name of the read action to call on the related resource to find results to be updated",
|
||||
required: false
|
||||
],
|
||||
return_notifications?: [
|
||||
type: :boolean,
|
||||
doc: "Return notifications for all updated records?",
|
||||
required: false,
|
||||
default: false
|
||||
],
|
||||
domain: [
|
||||
type: {:spark, Ash.Domain},
|
||||
private?: true
|
||||
]
|
||||
]
|
||||
|
||||
option_schema = @option_schema
|
||||
|
||||
defmodule Opts do
|
||||
@moduledoc false
|
||||
use Spark.Options.Validator, schema: option_schema
|
||||
end
|
||||
|
||||
@moduledoc """
|
||||
Cascade a resource's update action to a related resource's update action.
|
||||
|
||||
Adds an after-action hook that explicitly calls update on any records related
|
||||
via the named relationship. It will optimise for bulk updates where
|
||||
possible.
|
||||
|
||||
Allows you to copy fields from the arguments or changes to the destination,
|
||||
this way you can cascade a bunch of changes downstream.
|
||||
|
||||
|
||||
> #### Beware database constraints {: .warning}
|
||||
>
|
||||
> Think carefully before using this change with data layers which enforce
|
||||
> referential integrity (ie PostgreSQL and SQLite) and you may need to defer
|
||||
> constraints for the relationship in question.
|
||||
>
|
||||
> See also:
|
||||
> 1. [`postgres.references.reference.deferrable` DSL](https://hexdocs.pm/ash_postgres/dsl-ashpostgres-datalayer.html#postgres-references-reference-deferrable)
|
||||
> 2. [`sqlite.references.reference.deferrable` DSL](https://hexdocs.pm/ash_sqlite/dsl-ashsqlite-datalayer.html#sqlite-references-reference-deferrable)
|
||||
> 3. [PostgreSQL's `SET CONSTRAINTS` documentation](https://www.postgresql.org/docs/current/sql-set-constraints.html)
|
||||
> 4. [SQLite's `PRAGMA defer_foreign_keys` documentation](https://www.sqlite.org/pragma.html#pragma_defer_foreign_keys)
|
||||
|
||||
> #### Cascading notifications {: .tip}
|
||||
>
|
||||
> By default notifications are disabled for the related updates. This is to avoid potentially sending a **lot** of notifications for high-cardinality relationships.
|
||||
|
||||
## Options
|
||||
|
||||
#{Opts.docs()}
|
||||
|
||||
## Example
|
||||
|
||||
change {Ash.Resource.Change.CascadeUpdate, relationship: :comments, action: :update_all, copy_inputs: [:name]}
|
||||
|
||||
or, equivalently using `Ash.Resource.Change.Builtins.cascade_update/2`:
|
||||
|
||||
change cascade_update(:comments, action: :update_all, copy_inputs: [:name])
|
||||
|
||||
"""
|
||||
use Ash.Resource.Change
|
||||
require Ash.Query
|
||||
|
||||
@doc false
|
||||
@impl true
|
||||
def change(changeset, opts, context) do
|
||||
with {:ok, %Opts{} = opts} <- Opts.validate(opts),
|
||||
{:ok, opts} <- validate_relationship_and_action(opts, changeset) do
|
||||
Ash.Changeset.after_action(changeset, fn _changeset, result ->
|
||||
case {update_related(changeset, [result], opts, context), opts.return_notifications?} do
|
||||
{_, false} -> {:ok, result}
|
||||
{%{notifications: []}, true} -> {:ok, result}
|
||||
{%{notifications: notifications}, true} -> {:ok, result, notifications}
|
||||
end
|
||||
end)
|
||||
else
|
||||
{:error, reason} ->
|
||||
Ash.Changeset.add_error(changeset, reason)
|
||||
end
|
||||
end
|
||||
|
||||
@doc false
|
||||
@impl true
|
||||
def atomic(_, _, _), do: :ok
|
||||
|
||||
@doc false
|
||||
@impl true
|
||||
def after_batch([{changeset, _} | _] = changesets_and_results, opts, context) do
|
||||
with {:ok, %Opts{} = opts} <- Opts.validate(opts),
|
||||
{:ok, opts} <- validate_relationship_and_action(opts, changeset) do
|
||||
records = Enum.map(changesets_and_results, &elem(&1, 1))
|
||||
result = Enum.map(records, &{:ok, &1})
|
||||
|
||||
case {update_related(changeset, records, opts, context), opts.return_notifications?} do
|
||||
{_, false} -> result
|
||||
{%{notifications: empty}, true} when empty in [nil, []] -> result
|
||||
{%{notifications: notifications}, true} -> Enum.concat(result, notifications)
|
||||
end
|
||||
else
|
||||
{:error, reason} -> [{:error, reason}]
|
||||
end
|
||||
end
|
||||
|
||||
@doc false
|
||||
@impl true
|
||||
def batch_callbacks?([], _, _), do: false
|
||||
def batch_callbacks?(_, _, _), do: true
|
||||
|
||||
@doc false
|
||||
def opt_schema, do: @option_schema
|
||||
|
||||
defp validate_relationship_and_action(opts, changeset) do
|
||||
relationship = Ash.Resource.Info.relationship(changeset.resource, opts.relationship)
|
||||
|
||||
destination_action =
|
||||
if relationship do
|
||||
if opts.action do
|
||||
Ash.Resource.Info.action(relationship.destination, opts.action)
|
||||
else
|
||||
Ash.Resource.Info.primary_action!(relationship.destination, :update)
|
||||
end
|
||||
end
|
||||
|
||||
is_update? =
|
||||
if destination_action != nil and destination_action.type == :update, do: true, else: false
|
||||
|
||||
case {relationship, is_update?} do
|
||||
{nil, _} ->
|
||||
{:error,
|
||||
Ash.Error.Changes.InvalidRelationship.exception(
|
||||
relationship: opts.relationship,
|
||||
message: "Relationship doesn't exist."
|
||||
)}
|
||||
|
||||
{_, false} ->
|
||||
{:error,
|
||||
Ash.Error.Invalid.NoSuchAction.exception(
|
||||
resource: relationship.destination,
|
||||
action: opts.action,
|
||||
update: :update
|
||||
)}
|
||||
|
||||
{_, _} ->
|
||||
{:ok,
|
||||
%{
|
||||
opts
|
||||
| action: destination_action,
|
||||
relationship: relationship,
|
||||
domain: relationship.domain || Ash.Resource.Info.domain(relationship.destination)
|
||||
}}
|
||||
end
|
||||
end
|
||||
|
||||
defp update_related(_, [], _, _), do: :ok
|
||||
|
||||
defp update_related(changeset, data, opts, context) do
|
||||
action = opts.action
|
||||
relationship = opts.relationship
|
||||
copies = opts.copy_inputs
|
||||
|
||||
inputs =
|
||||
Enum.reduce(copies, %{}, fn key, downstream_input ->
|
||||
with :error <- Map.fetch(changeset.params, key),
|
||||
:error <- Map.fetch(changeset.params, to_string(key)) do
|
||||
downstream_input
|
||||
else
|
||||
{:ok, value} ->
|
||||
Map.put(downstream_input, key, value)
|
||||
end
|
||||
end)
|
||||
|
||||
context_opts =
|
||||
context
|
||||
|> Ash.Context.to_opts(
|
||||
domain: opts.domain,
|
||||
return_errors?: true,
|
||||
strategy: [:stream, :atomic, :atomic_batches],
|
||||
skip_unknown_inputs: :*,
|
||||
return_notifications?: opts.return_notifications?
|
||||
)
|
||||
|
||||
case related_query(data, opts) do
|
||||
{:ok, query} ->
|
||||
Ash.bulk_update(query, action.name, inputs, context_opts)
|
||||
|
||||
:error ->
|
||||
data
|
||||
|> List.wrap()
|
||||
|> Ash.load!(
|
||||
[
|
||||
{relationship.name,
|
||||
Ash.Query.set_context(relationship.destination, %{cascade_update: true})}
|
||||
],
|
||||
authorize?: false
|
||||
)
|
||||
|> Enum.flat_map(fn record ->
|
||||
record
|
||||
|> Map.get(relationship.name)
|
||||
|> List.wrap()
|
||||
end)
|
||||
|> Ash.bulk_update!(
|
||||
action.name,
|
||||
inputs,
|
||||
Keyword.update(
|
||||
context_opts,
|
||||
:context,
|
||||
%{cascade_update: true},
|
||||
&Map.put(&1, :cascade_update, true)
|
||||
)
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
defp related_query(_records, opts) when opts.relationship.type == :many_to_many, do: :error
|
||||
|
||||
defp related_query(records, opts) do
|
||||
if Ash.Actions.Read.Relationships.has_parent_expr?(opts.relationship) do
|
||||
:error
|
||||
else
|
||||
related_query =
|
||||
if opts.read_action do
|
||||
Ash.Query.for_read(opts.relationship.destination, opts.read_action, %{})
|
||||
else
|
||||
Ash.Query.new(opts.relationship.destination)
|
||||
end
|
||||
|
||||
{:ok,
|
||||
Ash.Actions.Read.Relationships.related_query(
|
||||
opts.relationship.name,
|
||||
records,
|
||||
related_query,
|
||||
Ash.Query.new(opts.relationship.source)
|
||||
)
|
||||
|> elem(1)
|
||||
|> filter_by_keys(opts.relationship, records)}
|
||||
end
|
||||
end
|
||||
|
||||
defp filter_by_keys(query, %{no_attributes?: true}, _records) do
|
||||
query
|
||||
end
|
||||
|
||||
defp filter_by_keys(
|
||||
query,
|
||||
%{source_attribute: source_attribute, destination_attribute: destination_attribute},
|
||||
records
|
||||
) do
|
||||
source_values = Enum.map(records, &Map.get(&1, source_attribute))
|
||||
|
||||
Ash.Query.filter(query, ^ref(destination_attribute) in ^source_values)
|
||||
end
|
||||
end
|
292
test/resource/changes/cascade_update_test.exs
Normal file
292
test/resource/changes/cascade_update_test.exs
Normal file
|
@ -0,0 +1,292 @@
|
|||
defmodule Ash.Test.Resource.Change.CascadeUpdate do
|
||||
@moduledoc false
|
||||
use ExUnit.Case, async: true
|
||||
|
||||
alias Ash.Test.Domain
|
||||
alias Ash.Test.Resource.Change.CascadeUpdate, as: Test
|
||||
|
||||
defmodule Notifier do
|
||||
@moduledoc false
|
||||
use Ash.Notifier
|
||||
|
||||
def notify(notification) do
|
||||
if notification.action.name == :update do
|
||||
Agent.update(
|
||||
Test.Agent,
|
||||
&%{&1 | notifications: MapSet.put(&1.notifications, notification.data.id)}
|
||||
)
|
||||
end
|
||||
|
||||
:ok
|
||||
end
|
||||
end
|
||||
|
||||
defmodule Author do
|
||||
@moduledoc false
|
||||
use Ash.Resource, domain: Domain, data_layer: Ash.DataLayer.Ets
|
||||
|
||||
attributes do
|
||||
uuid_primary_key :id
|
||||
attribute :name, :string, public?: true
|
||||
end
|
||||
|
||||
actions do
|
||||
default_accept :*
|
||||
defaults [:read, create: :*]
|
||||
|
||||
update :update do
|
||||
primary? true
|
||||
require_atomic? false
|
||||
skip_unknown_inputs [:content]
|
||||
|
||||
change cascade_update(:posts,
|
||||
copy_inputs: [:name, :content],
|
||||
return_notifications?: true
|
||||
)
|
||||
end
|
||||
|
||||
update :update_atomic do
|
||||
change cascade_update(:posts,
|
||||
copy_inputs: [:name]
|
||||
)
|
||||
end
|
||||
|
||||
update :wrong_relationship_cascade do
|
||||
change cascade_update(:postssss,
|
||||
copy_inputs: [:name]
|
||||
)
|
||||
end
|
||||
|
||||
update :no_notification_update do
|
||||
change cascade_update(:posts,
|
||||
return_notifications?: true,
|
||||
action: :no_notification_update
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
relationships do
|
||||
has_many :posts, Test.Post, public?: true
|
||||
end
|
||||
|
||||
code_interface do
|
||||
define :create
|
||||
define :update
|
||||
define :no_notification_update
|
||||
define :read
|
||||
end
|
||||
end
|
||||
|
||||
defmodule Post do
|
||||
@moduledoc false
|
||||
use Ash.Resource, domain: Domain, data_layer: Ash.DataLayer.Ets, notifiers: [Test.Notifier]
|
||||
|
||||
attributes do
|
||||
uuid_primary_key :id
|
||||
attribute :name, :string, public?: true
|
||||
end
|
||||
|
||||
actions do
|
||||
default_accept :*
|
||||
defaults [:read, create: :*]
|
||||
|
||||
update :update do
|
||||
primary? true
|
||||
require_atomic? false
|
||||
skip_unknown_inputs [:content]
|
||||
|
||||
change cascade_update(:comments,
|
||||
copy_inputs: [:name]
|
||||
)
|
||||
|
||||
change cascade_update(:summary,
|
||||
copy_inputs: [:name, :content]
|
||||
)
|
||||
|
||||
change before_action(fn changeset, _ ->
|
||||
Agent.update(
|
||||
Test.Agent,
|
||||
&%{&1 | updates: MapSet.put(&1.updates, changeset.data.id)}
|
||||
)
|
||||
|
||||
changeset
|
||||
end)
|
||||
end
|
||||
|
||||
update :no_notification_update do
|
||||
end
|
||||
end
|
||||
|
||||
relationships do
|
||||
has_many :comments, Test.Comment, public?: true
|
||||
has_one :summary, Test.Summary, public?: true
|
||||
belongs_to :author, Test.Author, public?: true, attribute_writable?: true
|
||||
end
|
||||
|
||||
code_interface do
|
||||
define :create
|
||||
define :update
|
||||
define :read
|
||||
end
|
||||
end
|
||||
|
||||
defmodule Comment do
|
||||
@moduledoc false
|
||||
use Ash.Resource, domain: Domain, data_layer: Ash.DataLayer.Ets
|
||||
|
||||
actions do
|
||||
default_accept :*
|
||||
defaults [:read, create: :*]
|
||||
|
||||
update :update do
|
||||
primary? true
|
||||
end
|
||||
end
|
||||
|
||||
attributes do
|
||||
uuid_primary_key :id
|
||||
attribute :name, :string, public?: true
|
||||
end
|
||||
|
||||
relationships do
|
||||
belongs_to :post, Test.Post, public?: true, attribute_writable?: true
|
||||
end
|
||||
|
||||
code_interface do
|
||||
define :create
|
||||
define :update
|
||||
define :read
|
||||
end
|
||||
end
|
||||
|
||||
defmodule Summary do
|
||||
@moduledoc false
|
||||
use Ash.Resource, domain: Domain, data_layer: Ash.DataLayer.Ets
|
||||
|
||||
actions do
|
||||
default_accept :*
|
||||
defaults [:read, create: :*]
|
||||
|
||||
update :update do
|
||||
primary? true
|
||||
end
|
||||
end
|
||||
|
||||
attributes do
|
||||
uuid_primary_key :id
|
||||
attribute :name, :string, public?: true
|
||||
attribute :content, :string, public?: true
|
||||
end
|
||||
|
||||
relationships do
|
||||
belongs_to :post, Test.Post, public?: true, attribute_writable?: true
|
||||
end
|
||||
|
||||
code_interface do
|
||||
define :create
|
||||
define :update
|
||||
define :read
|
||||
end
|
||||
end
|
||||
|
||||
setup do
|
||||
{:ok, pid} =
|
||||
start_supervised({Agent, fn -> %{updates: MapSet.new(), notifications: MapSet.new()} end})
|
||||
|
||||
Process.register(pid, Test.Agent)
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
test "when updating an author with name & content, all downstream relationships are also updated" do
|
||||
author = Author.create!(%{})
|
||||
post1 = Post.create!(%{author_id: author.id})
|
||||
post2 = Post.create!(%{author_id: author.id})
|
||||
|
||||
comment1 = Comment.create!(%{post_id: post1.id})
|
||||
comment2 = Comment.create!(%{post_id: post1.id})
|
||||
|
||||
summary = Summary.create!(%{post_id: post1.id})
|
||||
|
||||
name = "Ash Framework"
|
||||
content = "Is great!"
|
||||
|
||||
# Cascade the values down the chain...
|
||||
author |> Ash.update!(%{name: name, content: content})
|
||||
|
||||
a = Ash.get!(Author, author.id)
|
||||
p = Ash.get!(Post, post1.id)
|
||||
p2 = Ash.get!(Post, post2.id)
|
||||
c1 = Ash.get!(Comment, comment1.id)
|
||||
c2 = Ash.get!(Comment, comment2.id)
|
||||
s1 = Ash.get!(Summary, summary.id)
|
||||
|
||||
assert ^name = a.name
|
||||
assert ^name = p.name
|
||||
assert ^name = p2.name
|
||||
assert ^name = c1.name
|
||||
assert ^name = c2.name
|
||||
assert ^name = s1.name
|
||||
assert ^content = s1.content
|
||||
end
|
||||
|
||||
test "wrong relationship in cascade should error" do
|
||||
author = Author.create!(%{})
|
||||
Post.create!(%{author_id: author.id})
|
||||
|
||||
catch_error(
|
||||
author
|
||||
|> Ash.Changeset.for_update(:wrong_relationship_cascade, %{name: "Ash"})
|
||||
|> Ash.update!()
|
||||
)
|
||||
end
|
||||
|
||||
test "updated records are notified" do
|
||||
author = Author.create!(%{})
|
||||
|
||||
post_ids =
|
||||
1..Enum.random(3..25)
|
||||
|> Enum.map(fn _ -> Post.create!(%{author_id: author.id}) end)
|
||||
|> MapSet.new(& &1.id)
|
||||
|
||||
name = "Ash Framework"
|
||||
author |> Ash.update!(%{name: name})
|
||||
|
||||
notified_ids = Agent.get(Test.Agent, & &1.notifications)
|
||||
|
||||
assert MapSet.equal?(post_ids, notified_ids)
|
||||
end
|
||||
|
||||
test "does not error when notifications are requested but none are returned - bulk" do
|
||||
author = Author.create!(%{})
|
||||
|
||||
1..Enum.random(3..5)
|
||||
|> Enum.map(fn _ -> Post.create!(%{author_id: author.id}) end)
|
||||
|
||||
Ash.bulk_update!([author], :no_notification_update, %{name: "Ash Framework"})
|
||||
|
||||
assert [_ | _] = Post.read!()
|
||||
assert [_ | _] = Author.read!()
|
||||
end
|
||||
|
||||
test "does not error when notifications are requested but none are returned - single" do
|
||||
author = Author.create!(%{})
|
||||
|
||||
1..Enum.random(3..5)
|
||||
|> Enum.map(fn _ -> Post.create!(%{author_id: author.id}) end)
|
||||
|
||||
Author.no_notification_update!(author, %{name: "Ash Framework"})
|
||||
|
||||
assert [_ | _] = Post.read!()
|
||||
assert [_ | _] = Author.read!()
|
||||
end
|
||||
|
||||
test "does not error when there is nothing to cascade update" do
|
||||
author = Author.create!(%{})
|
||||
name = "Ash Framework"
|
||||
Author.update!(author, %{name: name})
|
||||
|
||||
a2 = Ash.get!(Author, author.id)
|
||||
assert ^name = a2.name
|
||||
end
|
||||
end
|
Loading…
Reference in a new issue