feat(Ash.Reactor): Add a Reactor extension that makes working with resources easy. (#683)

* feat: Add `Ash.Reactor` with create support.

* improvement: Add `Ash.Reactor` update support.

* improvement: Add `Ash.Reactor` destroy support.

* improvement(Ash.Reactor): Support for transactional handling of notifications.

* improvement(Ash.Reactor): Add `read` and `get` steps.

* docs: Add the beginnings of a Reactor topic.

* improvement(Ash.Reactor): add support for generic actions.

* improvement: Add `undo` capability to `create` step.

* improvement: transaction and undo working.

* docs: Start documenting undo behaviour.

* chore: Update to Reactor 0.6.

* improvement: Automatically thread Ash tracers through Reactor.

* improvement(Ash.Reactor): Add undo to generic actions.

* docs: Improve reactor documentation.

* fix: Mimic copying `Ash.Notifier` seems to break the compiler for some reason.
This commit is contained in:
James Harton 2024-03-02 10:26:25 +13:00 committed by GitHub
parent e328653867
commit 86d74ed789
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
50 changed files with 5612 additions and 8 deletions

View file

@ -64,6 +64,7 @@ spark_locals_without_parens = [
default: 1, default: 1,
default_accept: 1, default_accept: 1,
default_access_type: 1, default_access_type: 1,
default_api: 1,
default_context: 1, default_context: 1,
default_limit: 1, default_limit: 1,
defaults: 1, defaults: 1,
@ -244,7 +245,9 @@ spark_locals_without_parens = [
] ]
[ [
import_deps: [:spark, :reactor],
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"], inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"],
plugins: [Spark.Formatter],
locals_without_parens: spark_locals_without_parens, locals_without_parens: spark_locals_without_parens,
export: [ export: [
locals_without_parens: spark_locals_without_parens locals_without_parens: spark_locals_without_parens

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,157 @@
# Reactor
`Ash.Reactor` is an extension for [`Reactor`](https://github.com/ash-project/reactor) which adds explicit support for interacting with resources via their defined actions.
See [Getting started with Reactor](https://hexdocs.pm/reactor/getting-started-with-reactor.html) for more information about Reactor.
## Usage
You can either add the `Ash.Reactor` extension to your existing reactors eg:
```elixir
defmodule MyExistingReactor do
use Reactor, extensions: [Ash.Reactor]
end
```
or for your convenience you can use `use Ash.Reactor` which expands to exactly the same as above.
## Example
An example is worth 1000 words of prose:
```elixir
defmodule ExampleReactor do
use Ash.Reactor
ash do
default_api ExampleApi
end
input :customer_name
input :customer_email
input :plan_name
input :payment_nonce
create :create_customer, Customer do
inputs %{name: input(:customer_name), email: input(:customer_email)}
end
get :get_plan, Plan, :get_plan_by_name do
inputs %{name: input(:plan_name)}
fail_on_not_found? true
end
action :take_payment, PaymentProvider do
inputs %{
nonce: input(:payment_nonce),
amount: result(:get_plan, [:price])
}
end
create :subscription, Subscription do
inputs %{
plan_id: result(:get_plan, [:id]),
payment_provider_id: result(:take_payment, :id)
}
end
end
```
## Actions
For each action type there is a corresponding step DSL, which needs a name (used
to refer to the result of the step by other steps), a resource and optional
action name (defaults to the primary action if one is not provided).
Actions have several common options and some specific to their particular type.
See the [DSL documentation](dsl-ash-reactor.html) for
details.
### Action inputs
Ash actions take a map of input parameters which are usually a combination of
resource attributes and action arguments. You can provide these values as a
single map using the [`inputs` DSL entity](dsl-ash-reactor.html#reactor-action-inputs) with a map or keyword list which refers to Reactor inputs, results and hard-coded values via Reactor's [predefined template functions](https://hexdocs.pm/reactor/Reactor.Dsl.Argument.html#functions).
For action types that act on a specific resource (ie `update` and `destroy`) you can provide the value using the [`initial` DSL option](dsl-ash-reactor.html#reactor-update-initial).
#### Example
```elixir
input :blog_title
input :blog_body
input :author_email
read :get_author, MyBlog.Author, :get_author_by_email do
inputs %{email: input(:author_email)}
end
create :create_post, MyBlog.Post, :create do
inputs %{
title: input(:blog, [:title]),
body: input(:blog, [:body]),
author_id: result(:get_author, [:email])
}
end
update :author_post_count, MyBlog.Author, :update_post_count do
wait_for :create_post
initial result(:get_author)
end
return :create_post
```
## Handling failure.
Reactor is a saga executor, which means that when failure occurs it tries to
clean up any intermediate state left behind. By default the `create`, `update`
and `destroy` steps do not specify any behaviour for what to do when there is a
failure downstream in the reactor. This can be changed by providing both an
`undo_action` and changing the step's `undo` option to either
`:outside_transaction` or `:always` depending on your resource and datalayer
semantics.
### The `undo` option.
- `:never` - this is the default, and means that the reactor will never try and
undo the action's work. This is the most performant option, as it means that
the reactor doesn't need to store as many intermediate values.
- `:outside_transaction` - this option allows the step to decide at runtime
whether it should support undo based on whether the action is being run within
a transaction. If it is, then no undo is required because the transaction
will rollback.
- `:always` - this forces the step to always undo it's work on failure.
### Transactions
You can use the `transaction` step type to wrap a group of steps inside a data layer transaction, however the following caveats apply:
- All steps inside a transaction must happen in the same process, so the steps
inside the transaction will only ever be executed synchronously.
- Notifications will be sent only when the transaction is committed.
## Notifications
Because a reactor has transaction-like semantics notifications are automatically batched and only sent upon successful completion of the reactor.
## Running Reactors as an action
Currently the best way to expose a Reactor as an action is to use a [Generic Action](actions.html#generic-actions).
### Example
```elixir
action :run_reactor, :struct do
constraints instance_of: MyBlog.Post
argument :blog_title, :string, allow_nil?: false
argument :blog_body, :string, allow_nil?: false
argument :author_email, :ci_string, allow_nil?: false
run fn input, _context ->
Reactor.run(MyBlog.CreatePostReactor, input.arguments)
end
end
```

View file

@ -0,0 +1,73 @@
defimpl Reactor.Dsl.Build, for: Ash.Reactor.Dsl.Action do
@moduledoc false
alias Ash.{Reactor.ActionStep, Resource.Info}
alias Reactor.Builder
alias Spark.{Dsl.Transformer, Error.DslError}
import Ash.Reactor.BuilderUtils
@doc false
@impl true
def build(action, reactor) do
with {:ok, reactor} <- ensure_hooked(reactor),
{:ok, reactor, arguments} <- build_input_arguments(reactor, action) do
arguments =
arguments
|> maybe_append(action.actor)
|> maybe_append(action.tenant)
|> Enum.concat(action.wait_for)
action_options =
action
|> Map.take([:action, :api, :authorize?, :resource, :undo_action, :undo])
|> Enum.to_list()
step_options =
action
|> Map.take([:async?])
|> Map.put(:ref, :step_name)
|> Enum.to_list()
Builder.add_step(
reactor,
action.name,
{ActionStep, action_options},
arguments,
step_options
)
end
end
@doc false
@impl true
def transform(_action, dsl_state), do: {:ok, dsl_state}
@doc false
@impl true
def verify(action, _dsl_state) when action.undo == :never, do: :ok
def verify(action, dsl_state) do
case Info.action(action.resource, action.undo_action) do
action when is_struct(action, Ash.Resource.Actions.Action) ->
:ok
nil ->
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:action, :undo_action],
message:
"No action found matching the name `#{action.undo_action}` on resource `#{inspect(action.resource)}`"
)}
_action ->
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:action, :undo_action],
message: "The undo action for a generic action step should also be a generic action."
)}
end
end
end

View file

@ -0,0 +1,6 @@
defimpl Reactor.Argument.Build, for: Ash.Reactor.Dsl.Actor do
@doc false
@impl true
def build(actor),
do: {:ok, [%Reactor.Argument{name: :actor, source: actor.source, transform: actor.transform}]}
end

View file

@ -0,0 +1,82 @@
defimpl Reactor.Dsl.Build, for: Ash.Reactor.Dsl.Create do
@moduledoc false
alias Ash.Reactor.CreateStep
alias Ash.Resource.Info
alias Reactor.Builder
alias Spark.{Dsl.Transformer, Error.DslError}
import Ash.Reactor.BuilderUtils
@doc false
@impl true
def build(create, reactor) do
with {:ok, reactor} <- ensure_hooked(reactor),
{:ok, reactor, arguments} <- build_input_arguments(reactor, create) do
arguments =
arguments
|> maybe_append(create.actor)
|> maybe_append(create.tenant)
|> Enum.concat(create.wait_for)
action_options =
create
|> Map.take([
:action,
:api,
:authorize?,
:resource,
:undo_action,
:undo,
:upsert_identity,
:upsert?
])
|> Enum.to_list()
step_options =
create
|> Map.take([:async?])
|> Map.put(:ref, :step_name)
|> Enum.to_list()
Builder.add_step(
reactor,
create.name,
{CreateStep, action_options},
arguments,
step_options
)
end
end
@doc false
@impl true
def transform(_create, dsl_state), do: {:ok, dsl_state}
@doc false
@impl true
def verify(create, _dsl_state) when create.undo == :never, do: :ok
def verify(create, dsl_state) do
case Info.action(create.resource, create.undo_action) do
action when is_struct(action, Ash.Resource.Actions.Destroy) ->
:ok
nil ->
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:create, :undo_action],
message:
"No action found matching the name `#{create.undo_action}` on resource `#{inspect(create.resource)}`."
)}
_action ->
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:create, :undo_action],
message: "The undo action for a create step should be a destroy."
)}
end
end
end

View file

@ -0,0 +1,104 @@
defimpl Reactor.Dsl.Build, for: Ash.Reactor.Dsl.Destroy do
@moduledoc false
alias Ash.Reactor.DestroyStep
alias Ash.Resource.Info
alias Reactor.{Argument, Builder}
alias Spark.{Dsl.Transformer, Error.DslError}
import Ash.Reactor.BuilderUtils
@doc false
@impl true
def build(destroy, reactor) do
with {:ok, reactor} <- ensure_hooked(reactor),
{:ok, reactor, arguments} <- build_input_arguments(reactor, destroy) do
arguments =
arguments
|> maybe_append(destroy.actor)
|> maybe_append(destroy.tenant)
|> Enum.concat(destroy.wait_for)
|> Enum.concat([%Argument{name: :initial, source: destroy.initial}])
action_options =
destroy
|> Map.take([
:action,
:api,
:authorize?,
:resource,
:return_destroyed?,
:undo,
:undo_action
])
|> Enum.to_list()
step_options =
destroy
|> Map.take([:async?])
|> Map.put(:ref, :step_name)
|> Enum.to_list()
Builder.add_step(
reactor,
destroy.name,
{DestroyStep, action_options},
arguments,
step_options
)
end
end
@doc false
@impl true
def transform(_destroy, dsl_state), do: {:ok, dsl_state}
@doc false
@impl true
def verify(destroy, dsl_state) do
verify_undo(destroy, dsl_state)
end
def verify_undo(destroy, _dsl_state) when destroy.undo == :never, do: :ok
def verify_undo(destroy, dsl_state) do
with :ok <- verify_returning_destroyed?(destroy, dsl_state) do
verify_undo_action(destroy, dsl_state)
end
end
defp verify_returning_destroyed?(destroy, _dsl_state) when destroy.return_destroyed? == true,
do: :ok
defp verify_returning_destroyed?(_destroy, dsl_state) do
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:destroy, :return_destroyed?],
message: "`return_destroyed?` must be true when undo is enabled."
)}
end
defp verify_undo_action(destroy, dsl_state) do
case Info.action(destroy.resource, destroy.undo_action) do
action when is_struct(action, Ash.Resource.Actions.Create) ->
:ok
nil ->
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:destroy, :undo_action],
message:
"No action found matching the name `#{destroy.undo_action}` on resource `#{inspect(destroy.resource)}`."
)}
_action ->
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:destroy, :undo_action],
message: "The undo action for a destroy step should be a create."
)}
end
end
end

View file

@ -0,0 +1,44 @@
defimpl Reactor.Dsl.Build, for: Ash.Reactor.Dsl.Get do
alias Ash.Reactor.GetStep
alias Reactor.Builder
import Ash.Reactor.BuilderUtils
@doc false
@impl true
def build(get, reactor) do
with {:ok, reactor, arguments} <- build_input_arguments(reactor, get) do
arguments =
arguments
|> maybe_append(get.actor)
|> maybe_append(get.tenant)
|> Enum.concat(get.wait_for)
action_options =
get
|> Map.take([:action, :api, :authorize?, :resource, :fail_on_not_found?])
|> Enum.to_list()
step_options =
get
|> Map.take([:async?])
|> Map.put(:ref, :step_name)
|> Enum.to_list()
Builder.add_step(
reactor,
get.name,
{GetStep, action_options},
arguments,
step_options
)
end
end
@doc false
@impl true
def transform(_create, dsl_state), do: {:ok, dsl_state}
@doc false
@impl true
def verify(_create, _dsl_state), do: :ok
end

View file

@ -0,0 +1,44 @@
defimpl Reactor.Dsl.Build, for: Ash.Reactor.Dsl.Read do
alias Ash.Reactor.ReadStep
alias Reactor.Builder
import Ash.Reactor.BuilderUtils
@doc false
@impl true
def build(read, reactor) do
with {:ok, reactor, arguments} <- build_input_arguments(reactor, read) do
arguments =
arguments
|> maybe_append(read.actor)
|> maybe_append(read.tenant)
|> Enum.concat(read.wait_for)
action_options =
read
|> Map.take([:action, :api, :authorize?, :resource])
|> Enum.to_list()
step_options =
read
|> Map.take([:async?])
|> Map.put(:ref, :step_name)
|> Enum.to_list()
Builder.add_step(
reactor,
read.name,
{ReadStep, action_options},
arguments,
step_options
)
end
end
@doc false
@impl true
def transform(_create, dsl_state), do: {:ok, dsl_state}
@doc false
@impl true
def verify(_create, _dsl_state), do: :ok
end

View file

@ -0,0 +1,8 @@
defimpl Reactor.Argument.Build, for: Ash.Reactor.Dsl.Tenant do
@doc false
@impl true
def build(tenant),
do:
{:ok,
[%Reactor.Argument{name: :tenant, source: tenant.source, transform: tenant.transform}]}
end

View file

@ -0,0 +1,149 @@
defimpl Reactor.Dsl.Build, for: Ash.Reactor.Dsl.Transaction do
@moduledoc false
alias Reactor.{Builder, Dsl.Build}
import Ash.Reactor.BuilderUtils
import Reactor.Utils
@doc false
@impl true
def build(transaction, reactor) do
sub_reactor = Builder.new({Ash.Reactor.TransactionStep, transaction.name})
# force the sub-reactor to hot be hooked.
sub_reactor = %{sub_reactor | context: Map.put(sub_reactor.context, :__ash_hooked__, true)}
with {:ok, reactor} <- ensure_hooked(reactor),
{:ok, sub_reactor} <- build_nested_steps(sub_reactor, transaction.steps),
{:ok, sources} <- extract_inner_sources(sub_reactor),
{:ok, arguments} <- build_transaction_arguments(sources),
{:ok, sub_reactor} <- build_sub_reactor_inputs(sub_reactor, arguments),
{:ok, sub_reactor} <- maybe_add_return(sub_reactor, transaction),
{:ok, sub_reactor} <- Reactor.Planner.plan(sub_reactor) do
arguments =
transaction.wait_for
|> Enum.concat(arguments)
transaction_options =
[
sub_reactor: sub_reactor,
resources: transaction.resources,
timeout: transaction.timeout
]
Builder.add_step(
reactor,
transaction.name,
{Ash.Reactor.TransactionStep, transaction_options},
arguments,
async?: false,
ref: :step_name,
max_retries: 0
)
end
end
@doc false
@impl true
def transform(_transaction, dsl_state), do: {:ok, dsl_state}
@doc false
@impl true
def verify(transaction, _dsl_state) do
transaction.resources
|> Enum.reject(&Ash.DataLayer.data_layer_can?(&1, :transact))
|> case do
[] ->
:ok
[resource] ->
raise ArgumentError, "The `#{inspect(resource)}` resource does not support transactions."
resources ->
resources = Enum.map_join(resources, ", ", &"`#{inspect(&1)}`")
raise ArgumentError, "The following resources do not support transactions: #{resources}."
end
end
defp build_nested_steps(sub_reactor, steps),
do: reduce_while_ok(steps, sub_reactor, &Build.build/2)
# Iterates the nested steps and returns any argument sources which are not
# transaction local.
defp extract_inner_sources(sub_reactor) do
inside_step_names =
sub_reactor.steps
|> Enum.map(& &1.name)
|> MapSet.new()
sources_to_raise =
sub_reactor.steps
|> Enum.flat_map(& &1.arguments)
|> Enum.map(& &1.source)
|> Enum.filter(fn
source when is_struct(source, Reactor.Template.Input) ->
true
source when is_struct(source, Reactor.Template.Result) ->
!MapSet.member?(inside_step_names, source.name)
_ ->
false
end)
|> Enum.map(fn
source when is_map_key(source, :sub_path) -> %{source | sub_path: []}
source -> source
end)
|> Enum.uniq()
{:ok, sources_to_raise}
end
# sobelow_skip ["DOS.StringToAtom"]
defp build_transaction_arguments(sources) do
transaction_arguments =
sources
|> Enum.with_index()
|> Enum.map(fn
{source, _idx} when is_struct(source, Reactor.Template.Input) ->
%Reactor.Argument{name: source.name, source: source}
{source, idx} when is_struct(source, Reactor.Template.Result) ->
%Reactor.Argument{name: String.to_atom("result_#{idx}"), source: source}
end)
{:ok, transaction_arguments}
end
defp build_sub_reactor_inputs(sub_reactor, arguments) do
arguments
|> reduce_while_ok(sub_reactor, fn
argument, sub_reactor when is_struct(argument.source, Reactor.Template.Input) ->
Builder.add_input(sub_reactor, argument.name)
argument, sub_reactor when is_struct(argument.source, Reactor.Template.Result) ->
with {:ok, sub_reactor} <- Builder.add_input(sub_reactor, argument.name) do
Builder.add_step(
sub_reactor,
argument.source.name,
{Reactor.Step.ReturnArgument, argument: argument.name},
[argument],
ref: :step_name,
async?: false
)
end
end)
end
defp maybe_add_return(sub_reactor, transaction) when is_nil(transaction.return) do
last_step_name =
transaction.steps
|> Enum.map(& &1.name)
|> List.last()
Builder.return(sub_reactor, last_step_name)
end
defp maybe_add_return(sub_reactor, transaction),
do: Builder.return(sub_reactor, transaction.return)
end

View file

@ -0,0 +1,74 @@
defimpl Reactor.Dsl.Build, for: Ash.Reactor.Dsl.Update do
@moduledoc false
alias Ash.Reactor.UpdateStep
alias Ash.Resource.Info
alias Reactor.{Argument, Builder}
alias Spark.{Dsl.Transformer, Error.DslError}
import Ash.Reactor.BuilderUtils
@doc false
@impl true
def build(update, reactor) do
with {:ok, reactor} <- ensure_hooked(reactor),
{:ok, reactor, arguments} <- build_input_arguments(reactor, update) do
arguments =
arguments
|> maybe_append(update.actor)
|> maybe_append(update.tenant)
|> Enum.concat(update.wait_for)
|> Enum.concat([%Argument{name: :initial, source: update.initial}])
action_options =
update
|> Map.take([:action, :api, :authorize?, :resource, :undo, :undo_action])
|> Enum.to_list()
step_options =
update
|> Map.take([:async?])
|> Map.put(:ref, :step_name)
|> Enum.to_list()
Builder.add_step(
reactor,
update.name,
{UpdateStep, action_options},
arguments,
step_options
)
end
end
@doc false
@impl true
def transform(_update, dsl_state), do: {:ok, dsl_state}
@doc false
@impl true
def verify(update, _dsl_state) when update.undo == :never, do: :ok
def verify(update, dsl_state) do
case Info.action(update.resource, update.undo_action) do
action when is_struct(action, Ash.Resource.Actions.Update) ->
:ok
nil ->
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:update, :undo_action],
message:
"No action found matching the name `#{update.undo_action}` on resource `#{inspect(update.resource)}`."
)}
_action ->
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:update, :undo_action],
message: "The undo action for an update step should also be an update."
)}
end
end
end

View file

@ -0,0 +1,72 @@
defmodule Ash.Reactor.BuilderUtils do
@moduledoc false
alias Ash.Reactor.MergeInputsStep
alias Reactor.{Argument, Builder, Step.ReturnAllArguments, Template}
@doc false
@spec build_input_arguments(Reactor.t(), Ash.Reactor.action()) ::
{:ok, Reactor.t(), Enum.t()} | {:error, any}
def build_input_arguments(reactor, action) when action.inputs == [],
do: {:ok, reactor, [%Argument{name: :input, source: %Template.Value{value: %{}}}]}
def build_input_arguments(reactor, action) do
Enum.reduce_while(action.inputs, {:ok, reactor, []}, fn input, {:ok, reactor, result_names} ->
arguments = Enum.map(input.template, &%Argument{name: elem(&1, 0), source: elem(&1, 1)})
name = {:__input__, action.name, Map.keys(input.template)}
case Builder.add_step(reactor, name, ReturnAllArguments, arguments,
transform: input.transform,
ref: :step_name
) do
{:ok, reactor} -> {:cont, {:ok, reactor, [name | result_names]}}
{:error, reason} -> {:halt, {:error, reason}}
end
end)
|> case do
{:error, reason} ->
{:error, reason}
{:ok, reactor, []} ->
{:ok, reactor, []}
{:ok, reactor, [result_name]} ->
{:ok, reactor, [input: {:result, result_name}]}
{:ok, reactor, result_names} ->
arguments =
result_names
|> Enum.with_index()
|> Enum.map(&{input_prepend_atom(elem(&1, 1)), {:result, elem(&1, 0)}})
name = {:__input__, action.name, :__merge__, result_names}
case Builder.add_step(reactor, name, MergeInputsStep, arguments, ref: :step_name) do
{:ok, reactor} -> {:ok, reactor, [input: {:result, name}]}
{:error, reason} -> {:error, reason}
end
end
end
# sobelow_skip ["DOS.StringToAtom"]
defp input_prepend_atom(value) do
"input_#{value}"
|> String.to_atom()
end
@doc false
@spec maybe_append(Enum.t(), nil | any) :: Enum.t()
def maybe_append(lhs, nil), do: lhs
def maybe_append(lhs, rhs), do: Enum.concat(lhs, [rhs])
@doc false
@spec ensure_hooked(Reactor.t()) :: {:ok, Reactor.t()} | {:error, any}
def ensure_hooked(reactor) when is_map_key(reactor.context, :__ash_hooked__),
do: {:ok, reactor}
def ensure_hooked(reactor) do
with {:ok, reactor} <- Reactor.Builder.ensure_middleware(reactor, Ash.Reactor.Notifications) do
{:ok, %{reactor | context: Map.put(reactor.context, :__ash_hooked__, true)}}
end
end
end

View file

@ -0,0 +1,131 @@
defmodule Ash.Reactor.Dsl.Action do
@moduledoc """
The `action` entity for the `Ash.Reactor` reactor extension.
"""
defstruct __identifier__: nil,
action: nil,
action_step?: true,
actor: [],
api: nil,
async?: true,
authorize?: nil,
description: nil,
inputs: [],
name: nil,
resource: nil,
tenant: [],
transform: nil,
type: :action,
undo_action: nil,
undo: :never,
wait_for: []
@type t :: %__MODULE__{
__identifier__: any,
action: atom,
action_step?: true,
actor: [Ash.Reactor.Dsl.Actor.t()],
api: Ash.Api.t(),
async?: boolean,
authorize?: boolean | nil,
description: String.t() | nil,
name: atom,
inputs: [Ash.Reactor.Dsl.Inputs.t()],
resource: module,
tenant: [Ash.Reactor.Dsl.Tenant.t()],
type: :action,
undo_action: atom,
undo: :always | :never | :outside_transaction,
wait_for: [Reactor.Dsl.WaitFor.t()]
}
@doc false
def __entity__,
do: %Spark.Dsl.Entity{
name: :action,
describe: "Declares a step that will call a generic action on a resource.",
no_depend_modules: [:api, :resource],
target: __MODULE__,
args: [:name, :resource, {:optional, :action}],
identifier: :name,
imports: [Reactor.Dsl.Argument],
entities: [
actor: [Ash.Reactor.Dsl.Actor.__entity__()],
inputs: [Ash.Reactor.Dsl.Inputs.__entity__()],
tenant: [Ash.Reactor.Dsl.Tenant.__entity__()],
wait_for: [Reactor.Dsl.WaitFor.__entity__()]
],
singleton_entity_keys: [:actor, :tenant],
recursive_as: :steps,
schema: [
action: [
type: :atom,
required: false,
doc: """
The name of the action to call on the resource.
"""
],
api: [
type: {:spark, Ash.Api},
required: false,
doc:
"The API to use when calling the action. Defaults to the API set in the `ash` section."
],
async?: [
type: :boolean,
required: false,
default: true,
doc:
"When set to true the step will be executed asynchronously via Reactor's `TaskSupervisor`."
],
authorize?: [
type: {:or, [:boolean, nil]},
required: false,
default: nil,
doc: "Explicitly enable or disable authorization for the action."
],
description: [
type: :string,
required: false,
doc: "A description for the step"
],
name: [
type: :atom,
required: true,
doc: """
A unique name for the step.
This is used when choosing the return value of the Reactor and for
arguments into other steps.
"""
],
resource: [
type: {:spark, Ash.Resource},
required: true,
doc: """
The resource to call the action on.
"""
],
undo_action: [
type: :atom,
required: false,
doc: """
The name of the action to call on the resource when the step is to be undone.
"""
],
undo: [
type: {:in, [:always, :never, :outside_transaction]},
required: false,
default: :never,
doc: """
What to do when the reactor is undoing it's work?
* `always` - The undo action will always be run.
* `never` - The action will never be undone.
* `outside_transaction` - The action will only be undone if not running inside a transaction.
"""
]
]
}
end

View file

@ -0,0 +1,412 @@
defmodule Ash.Reactor.Dsl.ActionTransformer do
@moduledoc """
Responsible for transforming actions.
"""
alias Reactor.Utils
alias Spark.{Dsl, Dsl.Transformer, Error.DslError}
use Transformer
@doc false
@impl true
@spec before?(module) :: boolean
def before?(Reactor.Dsl.Transformer), do: true
def before?(_), do: false
@doc false
@impl true
@spec transform(Dsl.t()) :: {:ok, Dsl.t()} | {:error, DslError.t()}
def transform(dsl_state) do
dsl_state
|> Transformer.get_entities([:reactor])
|> Utils.reduce_while_ok(dsl_state, fn entity, dsl_state ->
case transform_step(entity, dsl_state) do
{:ok, entity, dsl_state} ->
{:ok, Transformer.replace_entity(dsl_state, [:reactor], entity)}
{:error, reason} ->
{:error, reason}
:ignore ->
{:ok, dsl_state}
end
end)
end
defp transform_step(entity, dsl_state) when entity.action_step? do
with {:ok, entity} <- transform_entity_api(entity, dsl_state),
:ok <- validate_entity_api(entity, dsl_state),
:ok <- validate_entity_resource(entity, dsl_state),
{:ok, action} <- get_entity_resource_action(entity, dsl_state),
entity <- %{entity | action: action.name},
:ok <- validate_entity_input_names(entity, action, dsl_state),
:ok <- validate_entity_input_dupes(entity, dsl_state),
:ok <- validate_entity_input_empty(entity, dsl_state),
:ok <- maybe_validate_upsert_identity(entity, dsl_state) do
transform_nested_steps(entity, dsl_state)
end
end
defp transform_step(entity, dsl_state) when entity.type == :transaction do
with :ok <- validate_entity_resources(entity, dsl_state) do
transform_nested_steps(entity, dsl_state)
end
end
defp transform_step(_entity, _dsl_state), do: :ignore
defp transform_nested_steps(entity, dsl_state) when is_list(entity.steps) do
entity.steps
|> Enum.reduce_while({:ok, %{entity | steps: []}, dsl_state}, fn
step, {:ok, entity, dsl_state} ->
case transform_step(step, dsl_state) do
{:ok, step, dsl_state} ->
{:cont, {:ok, %{entity | steps: [step | entity.steps]}, dsl_state}}
:ignore ->
{:cont, {:ok, %{entity | steps: [step | entity.steps]}, dsl_state}}
{:error, reason} ->
{:halt, {:error, reason}}
end
end)
|> case do
{:ok, entity, dsl_state} -> {:ok, %{entity | steps: Enum.reverse(entity.steps)}, dsl_state}
other -> other
end
end
defp transform_nested_steps(entity, dsl_state), do: {:ok, entity, dsl_state}
defp transform_entity_api(entity, dsl_state) do
default_api = Transformer.get_option(dsl_state, [:ash], :default_api)
{:ok, %{entity | api: entity.api || default_api}}
end
defp validate_entity_api(entity, dsl_state) do
if entity.api.spark_is() == Ash.Api do
:ok
else
{:error,
DslError.exception(
module: Transformer.get_entities(dsl_state, :module),
path: [:ash, :default_api],
message:
"The #{entity.type} step `#{inspect(entity.name)}` has its API set to `#{inspect(entity.api)}` but it is not a valid Ash API."
)}
end
end
defp validate_entity_resource(entity, dsl_state) do
if entity.resource.spark_is() == Ash.Resource do
:ok
else
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:reactor, entity.type, entity.name],
message:
"The #{entity.type} step `#{inspect(entity.name)}` has its resource set to `#{inspect(entity.resource)}` but it is not a valid Ash resource."
)}
end
end
defp validate_entity_resources(entity, dsl_state) do
entity.resources
|> Enum.reject(&(&1.spark_is() == Ash.Resource))
|> case do
[] ->
:ok
[resource] ->
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:reactor, entity.type, entity.name],
message:
"The #{entity.type} step `#{inspect(entity.name)}` has its resources set to `#{inspect(resource)}` but it is not a valid Ash resource."
)}
resources ->
resources = Enum.map_join(resources, ", ", &"`#{inspect(&1)}`")
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:reactor, entity.type, entity.name],
message:
"The #{entity.type} step `#{inspect(entity.name)}` has its resources set to #{resources} but they are not valid Ash resources."
)}
end
end
defp validate_entity_input_dupes(%{inputs: [_]} = _entity, _dsl_state), do: :ok
defp validate_entity_input_dupes(%{inputs: [_ | _]} = entity, dsl_state) do
entity.inputs
|> Enum.map(&MapSet.new(Map.keys(&1.template)))
|> Enum.reduce(&MapSet.intersection/2)
|> Enum.to_list()
|> case do
[] ->
:ok
[key] ->
message = """
The #{entity.type} step `#{inspect(entity.name)}` defines multiple inputs for `#{key}`.
"""
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:reactor, entity.type, entity.name],
message: message
)}
keys ->
keys_sentence =
keys
|> Enum.map(&"`#{&1}`")
|> to_sentence(final_sep: " and ")
message = """
The #{entity.type} step `#{inspect(entity.name)}` defines multiple inputs for the keys #{keys_sentence}.
"""
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:reactor, entity.type, entity.name],
message: message
)}
end
end
defp validate_entity_input_dupes(_entity, _dsl_state), do: :ok
defp validate_entity_input_empty(entity, dsl_state) do
entity.inputs
|> Enum.filter(&Enum.empty?(&1.template))
|> case do
[] ->
:ok
[_] ->
message = """
The #{entity.type} step `#{inspect(entity.name)}` defines an empty input template.
"""
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:reactor, entity.type, entity.name],
message: message
)}
_ ->
message = """
The #{entity.type} step `#{inspect(entity.name)}` defines empty input templates.
"""
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:reactor, entity.type, entity.name],
message: message
)}
end
end
defp validate_entity_input_names(entity, action, dsl_state) do
argument_names = Enum.map(action.arguments, & &1.name)
allowed_input_names =
entity.resource
|> Ash.Resource.Info.attributes()
|> Enum.map(& &1.name)
|> maybe_accept_inputs(action)
|> maybe_reject_inputs(action)
|> Enum.concat(argument_names)
|> MapSet.new()
provided_input_names =
entity.inputs
|> Enum.flat_map(&Map.keys(&1.template))
|> MapSet.new()
provided_input_names
|> MapSet.difference(allowed_input_names)
|> Enum.to_list()
|> case do
[] ->
:ok
[extra] ->
suggestions =
allowed_input_names
|> Enum.map(&to_string/1)
|> sorted_suggestions(extra)
message = """
The #{entity.type} step `#{inspect(entity.name)} refers to an input named `#{extra}` which doesn't exist.
#{suggestions}
"""
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:reactor, entity.type, entity.name],
message: message
)}
extras ->
suggestions =
allowed_input_names
|> Enum.map(&to_string/1)
|> sorted_suggestions(hd(extras))
extras_sentence =
extras
|> Enum.map(&"`#{&1}`")
|> to_sentence(final_sep: " and ")
message = """
The #{entity.type} step `#{inspect(entity.name)} refers to an inputs named #{extras_sentence} which don't exist.
#{suggestions}
"""
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:reactor, entity.type, entity.name],
message: message
)}
end
end
defp maybe_accept_inputs(input_names, action) when length(action.accepts) > 0,
do: Enum.filter(input_names, &(&1 in action.accepts))
defp maybe_accept_inputs(input_names, _), do: input_names
defp maybe_reject_inputs(input_names, action) when length(action.rejects) > 0,
do: Enum.filter(input_names, &(&1 in action.rejects))
defp maybe_reject_inputs(input_names, _), do: input_names
defp get_entity_resource_action(entity, dsl_state) when is_nil(entity.action) do
entity.resource
|> Ash.Resource.Info.actions()
|> Enum.find(&(&1.type == entity.type && &1.primary?))
|> case do
nil ->
suggestions =
entity.resource
|> Ash.Resource.Info.actions()
|> Enum.filter(&(&1.type == entity.type))
|> Enum.map(&to_string(&1.name))
|> sorted_suggestions(entity.action,
prefix: "Available #{entity.type} actions are ",
suffix: ".",
final_sep: " and "
)
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:reactor, entity.type, entity.name],
message:
"The step `#{inspect(entity.name)}` has no action name specified and no primary #{entity.type} action." <>
suggestions
)}
action ->
{:ok, action}
end
end
defp get_entity_resource_action(entity, dsl_state) do
case Ash.Resource.Info.action(entity.resource, entity.action, entity.type) do
nil ->
suggestions =
entity.resource
|> Ash.Resource.Info.actions()
|> Enum.filter(&(&1.type == entity.type))
|> Enum.map(&to_string(&1.name))
|> sorted_suggestions(entity.action)
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:reactor, entity.type, entity.name],
message:
"The #{entity.type} step `#{inspect(entity.name)}` refers to an action named `#{entity.action}` which doesn't exist." <>
suggestions
)}
action ->
{:ok, action}
end
end
defp maybe_validate_upsert_identity(entity, dsl_state)
when entity.upsert? and entity.upsert_identity do
if Ash.Resource.Info.identity(entity.resource, entity.upsert_identity) do
:ok
else
suggestions =
entity.resource
|> Ash.Resource.Info.identities()
|> Enum.map(& &1.name)
|> sorted_suggestions(entity.upsert_identity)
{:error,
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: [:reactor, entity.type, entity.name],
message:
"The #{entity.type} step `#{inspect(entity.name)}` refers to an identity named `#{entity.upsert_identity}` but it does not exist." <>
suggestions
)}
end
end
defp maybe_validate_upsert_identity(_entity, _dsl_state), do: :ok
defp sorted_suggestions(suggestions, tried, options \\ [])
defp sorted_suggestions([], _, _), do: ""
defp sorted_suggestions(suggestions, tried, options) do
tried = to_string(tried)
prefix = Keyword.get(options, :prefix, "Did you mean ")
suffix = Keyword.get(options, :suffix, "?")
suggestions
|> Enum.map(&to_string/1)
|> Enum.sort_by(&String.jaro_distance(&1, tried))
|> Enum.map(&"`#{&1}`")
|> to_sentence(options)
|> then(&"\n\n#{prefix}#{&1}#{suffix}")
end
defp to_sentence([value], _opts), do: to_string(value)
defp to_sentence([_ | _] = values, opts) do
[last | rest] = Enum.reverse(values)
sep = Keyword.get(opts, :sep, ", ")
final_sep = Keyword.get(opts, :final_sep, " or ")
rest =
rest
|> Enum.reverse()
|> Enum.join(sep)
"#{rest}#{final_sep}#{last}"
end
end

View file

@ -0,0 +1,50 @@
defmodule Ash.Reactor.Dsl.Actor do
@moduledoc """
Specify the actor used to execute an action.
"""
defstruct __identifier__: nil, source: nil, transform: nil
alias Reactor.Template
@type t :: %__MODULE__{
__identifier__: any,
source: Template.Input.t() | Template.Result.t() | Template.Value.t(),
transform: nil | (any -> any) | {module, keyword} | mfa
}
@doc false
def __entity__ do
template_type = Template.type()
%Spark.Dsl.Entity{
name: :actor,
describe: "Specifies the action actor",
args: [:source],
imports: [Reactor.Dsl.Argument],
identifier: {:auto, :unique_integer},
target: __MODULE__,
schema: [
source: [
type: template_type,
required: true,
doc: """
What to use as the source of the actor.
See `Reactor.Dsl.Argument` for more information.
"""
],
transform: [
type:
{:or, [{:spark_function_behaviour, Reactor.Step, {Reactor.Step.Transform, 1}}, nil]},
required: false,
default: nil,
doc: """
An optional transformation function which can be used to modify the
actor before it is passed to the action.
"""
]
]
}
end
end

View file

@ -0,0 +1,158 @@
defmodule Ash.Reactor.Dsl.Create do
@moduledoc """
The `create` entity for the `Ash.Reactor` reactor extension.
"""
defstruct __identifier__: nil,
action: nil,
action_step?: true,
actor: [],
api: nil,
async?: true,
authorize?: nil,
description: nil,
inputs: [],
name: nil,
resource: nil,
tenant: [],
transform: nil,
type: :create,
wait_for: [],
undo_action: nil,
undo: :never,
upsert_identity: nil,
upsert?: false
@type t :: %__MODULE__{
__identifier__: any,
action: atom,
action_step?: true,
actor: [Ash.Reactor.Dsl.Actor.t()],
api: Ash.Api.t(),
async?: boolean,
authorize?: boolean | nil,
description: String.t() | nil,
name: atom,
inputs: [Ash.Reactor.Dsl.Inputs.t()],
resource: module,
tenant: [Ash.Reactor.Dsl.Tenant.t()],
type: :create,
wait_for: [Reactor.Dsl.WaitFor.t()],
undo_action: atom,
undo: :always | :never | :outside_transaction,
upsert_identity: nil | atom,
upsert?: boolean
}
@doc false
def __entity__,
do: %Spark.Dsl.Entity{
name: :create,
describe: "Declares a step that will call a create action on a resource.",
examples: [
"""
create :create_post, MyApp.Post, :create do
inputs %{
title: input(:post_title),
author_id: result(:get_user, [:id])
}
actor result(:get_user)
tenant result(:get_organisation, [:id])
end
"""
],
no_depend_modules: [:api, :resource],
target: __MODULE__,
args: [:name, :resource, {:optional, :action}],
identifier: :name,
imports: [Reactor.Dsl.Argument],
entities: [
actor: [Ash.Reactor.Dsl.Actor.__entity__()],
inputs: [Ash.Reactor.Dsl.Inputs.__entity__()],
tenant: [Ash.Reactor.Dsl.Tenant.__entity__()],
wait_for: [Reactor.Dsl.WaitFor.__entity__()]
],
singleton_entity_keys: [:actor, :tenant],
recursive_as: :steps,
schema: [
action: [
type: :atom,
required: false,
doc: """
The name of the action to call on the resource.
"""
],
api: [
type: {:spark, Ash.Api},
required: false,
doc:
"The API to use when calling the action. Defaults to the API set in the `ash` section."
],
async?: [
type: :boolean,
required: false,
default: true,
doc:
"When set to true the step will be executed asynchronously via Reactor's `TaskSupervisor`."
],
authorize?: [
type: {:or, [:boolean, nil]},
required: false,
default: nil,
doc: "Explicitly enable or disable authorization for the action."
],
description: [
type: :string,
required: false,
doc: "A description for the step"
],
name: [
type: :atom,
required: true,
doc: """
A unique name for the step.
This is used when choosing the return value of the Reactor and for
arguments into other steps.
"""
],
resource: [
type: {:spark, Ash.Resource},
required: true,
doc: """
The resource to call the action on.
"""
],
undo_action: [
type: :atom,
required: false,
doc: """
The name of the action to call on the resource when the step is to be undone.
"""
],
undo: [
type: {:in, [:always, :never, :outside_transaction]},
required: false,
default: :never,
doc: """
What to do when the reactor is undoing it's work?
* `always` - The undo action will always be run.
* `never` - The action will never be undone.
* `outside_transaction` - The action will only be undone if not running inside a transaction.
"""
],
upsert_identity: [
type: :atom,
required: false,
doc: "The identity to use for the upsert"
],
upsert?: [
type: :boolean,
required: false,
default: false,
doc: "Whether or not this action should be executed as an upsert."
]
]
}
end

View file

@ -0,0 +1,155 @@
defmodule Ash.Reactor.Dsl.Destroy do
@moduledoc """
The `destroy` entity for the `Ash.Reactor` reactor extension.
"""
defstruct __identifier__: nil,
action: nil,
action_step?: true,
actor: [],
api: nil,
async?: true,
authorize?: nil,
description: nil,
initial: nil,
inputs: [],
name: nil,
resource: nil,
return_destroyed?: false,
tenant: [],
transform: nil,
type: :destroy,
undo_action: nil,
undo: :never,
wait_for: []
@type t :: %__MODULE__{
__identifier__: any,
action: atom,
action_step?: true,
actor: [Ash.Reactor.Dsl.Actor.t()],
api: Ash.Api.t(),
async?: boolean,
authorize?: boolean | nil,
description: String.t() | nil,
name: atom,
initial: Reactor.Template.t(),
inputs: [Ash.Reactor.Dsl.Inputs.t()],
resource: module,
return_destroyed?: boolean,
tenant: [Ash.Reactor.Dsl.Tenant.t()],
type: :destroy,
undo_action: atom,
undo: :always | :never | :outside_transaction,
wait_for: [Reactor.Dsl.WaitFor.t()]
}
@doc false
def __entity__,
do: %Spark.Dsl.Entity{
name: :destroy,
describe: "Declares a step that will call a destroy action on a resource.",
examples: [
"""
destroy :delete_post, MyApp.Post, :destroy do
initial input(:post)
actor result(:get_user)
tenant result(:get_organisation, [:id])
end
"""
],
no_depend_modules: [:api, :resource],
target: __MODULE__,
args: [:name, :resource, {:optional, :action}],
identifier: :name,
imports: [Reactor.Dsl.Argument],
entities: [
actor: [Ash.Reactor.Dsl.Actor.__entity__()],
inputs: [Ash.Reactor.Dsl.Inputs.__entity__()],
tenant: [Ash.Reactor.Dsl.Tenant.__entity__()],
wait_for: [Reactor.Dsl.WaitFor.__entity__()]
],
singleton_entity_keys: [:actor, :tenant],
recursive_as: :steps,
schema: [
action: [
type: :atom,
required: false,
doc: """
The name of the action to call on the resource.
"""
],
api: [
type: {:spark, Ash.Api},
required: false,
doc:
"The API to use when calling the action. Defaults to the API set in the `ash` section."
],
async?: [
type: :boolean,
required: false,
default: true,
doc:
"When set to true the step will be executed asynchronously via Reactor's `TaskSupervisor`."
],
authorize?: [
type: {:or, [:boolean, nil]},
required: false,
default: nil,
doc: "Explicitly enable or disable authorization for the action."
],
description: [
type: :string,
required: false,
doc: "A description for the step"
],
initial: [
type: Reactor.Template.type(),
required: true,
doc: "The record to update."
],
name: [
type: :atom,
required: true,
doc: """
A unique name for the step.
This is used when choosing the return value of the Reactor and for
arguments into other steps.
"""
],
resource: [
type: {:spark, Ash.Resource},
required: true,
doc: """
The resource to call the action on.
"""
],
return_destroyed?: [
type: :boolean,
default: false,
required: false,
doc: "Whether or not the step should return the destroyed record upon completion."
],
undo_action: [
type: :atom,
required: false,
doc: """
The name of the action to call on the resource when the step is undone.
"""
],
undo: [
type: {:in, [:always, :never, :outside_transaction]},
required: false,
default: :never,
doc: """
What to do when the reactor is undoing it's work?
* `always` - The undo action will always be run.
* `never` - The action will never be undone.
* `outside_transaction` - The action will only be undone if not running inside a transaction.
"""
]
]
}
end

124
lib/ash/reactor/dsl/get.ex Normal file
View file

@ -0,0 +1,124 @@
defmodule Ash.Reactor.Dsl.Get do
@moduledoc """
The `get` entity for the `Ash.Reactor` reactor extension.
"""
defstruct __identifier__: nil,
action: nil,
action_step?: true,
actor: [],
api: nil,
async?: true,
authorize?: nil,
description: nil,
fail_on_not_found?: nil,
inputs: [],
name: nil,
resource: nil,
tenant: [],
transform: nil,
type: :read,
wait_for: []
@type t :: %__MODULE__{
__identifier__: any,
action: atom,
action_step?: true,
actor: [Ash.Reactor.Dsl.Actor.t()],
api: Ash.Api.t(),
async?: boolean,
authorize?: boolean | nil,
description: String.t() | nil,
fail_on_not_found?: boolean,
name: atom,
inputs: [Ash.Reactor.Dsl.Inputs.t()],
resource: module,
tenant: [Ash.Reactor.Dsl.Tenant.t()],
type: :create,
wait_for: [Reactor.Dsl.WaitFor.t()]
}
@doc false
def __entity__,
do: %Spark.Dsl.Entity{
name: :get,
describe:
"Declares a step that will call a read action on a resource retuning a single record.",
examples: [
"""
get :post_by_id, MyApp.Post, :read do
inputs %{id: input(:post_id)}
end
"""
],
no_depend_modules: [:api, :resource],
target: __MODULE__,
args: [:name, :resource, {:optional, :action}],
identifier: :name,
imports: [Reactor.Dsl.Argument],
entities: [
actor: [Ash.Reactor.Dsl.Actor.__entity__()],
inputs: [Ash.Reactor.Dsl.Inputs.__entity__()],
tenant: [Ash.Reactor.Dsl.Tenant.__entity__()],
wait_for: [Reactor.Dsl.WaitFor.__entity__()]
],
singleton_entity_keys: [:actor, :tenant],
recursive_as: :steps,
schema: [
action: [
type: :atom,
required: false,
doc: """
The name of the action to call on the resource.
"""
],
api: [
type: {:spark, Ash.Api},
required: false,
doc:
"The API to use when calling the action. Defaults to the API set in the `ash` section."
],
async?: [
type: :boolean,
required: false,
default: true,
doc:
"When set to true the step will be executed asynchronously via Reactor's `TaskSupervisor`."
],
authorize?: [
type: {:or, [:boolean, nil]},
required: false,
default: nil,
doc: "Explicitly enable or disable authorization for the action."
],
description: [
type: :string,
required: false,
doc: "A description for the step"
],
fail_on_not_found?: [
type: :boolean,
required: false,
default: false,
doc: "When set to true the step will fail if the resource is not found."
],
name: [
type: :atom,
required: true,
doc: """
A unique name for the step.
This is used when choosing the return value of the Reactor and for
arguments into other steps.
"""
],
resource: [
type: {:spark, Ash.Resource},
required: true,
doc: """
The resource to call the action on.
"""
]
]
}
end

View file

@ -0,0 +1,81 @@
defmodule Ash.Reactor.Dsl.Inputs do
@moduledoc """
The `inputs` entity for the `Ash.Reactor` reactor extension.
"""
defstruct __identifier__: nil,
template: %{},
transform: nil
@type t :: %__MODULE__{
__identifier__: any,
template:
%{optional(atom) => Reactor.Template.t()}
| Keyword.t(Reactor.Template.t()),
transform: nil | (any -> any) | {module, keyword} | mfa
}
@doc false
@spec __entity__ :: Spark.Dsl.Entity.t()
def __entity__ do
template_type = Reactor.Template.type()
input_template_type =
{:or,
[
{:map, :atom, template_type},
{:keyword_list, [*: [type: template_type]]}
]}
%Spark.Dsl.Entity{
name: :inputs,
describe: "Specify the inputs for an action",
docs: """
Used to build a map for input into an action.
You can provide the template value as either a map or keyword list and
multiple instances of `inputs` will be merged together.
""",
examples: [
"""
inputs %{
author: result(:get_user),
title: input(:title),
body: input(:body)
}
""",
"""
inputs(author: result(:get_user))
"""
],
target: __MODULE__,
args: [:template],
identifier: {:auto, :unique_integer},
imports: [Reactor.Dsl.Argument],
schema: [
template: [
type: input_template_type,
required: true
],
transform: [
type:
{:or,
[{:spark_function_behaviour, Reactor.Step, {Reactor.Step.TransformAll, 1}}, nil]},
required: false,
default: nil,
doc: """
An optional transformation function which will transform the inputs
before executing the action.
"""
]
],
transform: {__MODULE__, :__transform__, []}
}
end
@doc false
@spec __transform__(t) :: {:ok, t} | {:error, any}
def __transform__(entity) do
{:ok, %{entity | template: Map.new(entity.template)}}
end
end

View file

@ -0,0 +1,43 @@
defmodule Ash.Reactor.Dsl.MiddlewareTransformer do
@moduledoc """
Ensures that the required middlewares are added to the Reactor.
"""
alias Spark.Dsl.Transformer
use Transformer
@doc false
@impl true
@spec before?(module) :: boolean
def before?(Reactor.Dsl.Transformer), do: true
def before?(_), do: false
@doc false
@impl true
def transform(dsl_state) do
add_middleware(dsl_state, Ash.Reactor.Tracer)
end
defp add_middleware(dsl_state, middleware) do
middlewares =
dsl_state
|> Transformer.get_entities([:reactor, :middlewares])
|> Enum.filter(&is_struct(&1, Reactor.Dsl.Middleware))
|> Enum.map(& &1.module)
if middleware in middlewares do
{:ok, dsl_state}
else
{:ok, middleware} =
Reactor.Dsl
|> Transformer.build_entity([:reactor, :middlewares], :middleware, module: middleware)
dsl_state =
dsl_state
|> Transformer.add_entity([:reactor, :middlewares], middleware)
{:ok, dsl_state}
end
end
end

118
lib/ash/reactor/dsl/read.ex Normal file
View file

@ -0,0 +1,118 @@
defmodule Ash.Reactor.Dsl.Read do
@moduledoc """
The `read` entity for the `Ash.Reactor` reactor extension.
"""
defstruct __identifier__: nil,
action: nil,
action_step?: true,
actor: [],
api: nil,
async?: true,
authorize?: nil,
description: nil,
inputs: [],
name: nil,
resource: nil,
tenant: [],
transform: nil,
type: :read,
wait_for: []
@type t :: %__MODULE__{
__identifier__: any,
action: atom,
action_step?: true,
actor: [Ash.Reactor.Dsl.Actor.t()],
api: Ash.Api.t(),
async?: boolean,
authorize?: boolean | nil,
description: String.t() | nil,
name: atom,
inputs: [Ash.Reactor.Dsl.Inputs.t()],
resource: module,
tenant: [Ash.Reactor.Dsl.Tenant.t()],
type: :create,
wait_for: [Reactor.Dsl.WaitFor.t()]
}
@doc false
def __entity__,
do: %Spark.Dsl.Entity{
name: :read,
describe: "Declares a step that will call a read action on a resource.",
examples: [
"""
read :read_posts, MyApp.Post, :read
""",
"""
read :read_posts_in_range, MyApp.Post, :read_in_range do
inputs %{min_date: input(:min_date), max_date: input(:max_date)}
end
"""
],
no_depend_modules: [:api, :resource],
target: __MODULE__,
args: [:name, :resource, {:optional, :action}],
identifier: :name,
imports: [Reactor.Dsl.Argument],
entities: [
actor: [Ash.Reactor.Dsl.Actor.__entity__()],
inputs: [Ash.Reactor.Dsl.Inputs.__entity__()],
tenant: [Ash.Reactor.Dsl.Tenant.__entity__()],
wait_for: [Reactor.Dsl.WaitFor.__entity__()]
],
singleton_entity_keys: [:actor, :tenant],
recursive_as: :steps,
schema: [
action: [
type: :atom,
required: false,
doc: """
The name of the action to call on the resource.
"""
],
api: [
type: {:spark, Ash.Api},
required: false,
doc:
"The API to use when calling the action. Defaults to the API set in the `ash` section."
],
async?: [
type: :boolean,
required: false,
default: true,
doc:
"When set to true the step will be executed asynchronously via Reactor's `TaskSupervisor`."
],
authorize?: [
type: {:or, [:boolean, nil]},
required: false,
default: nil,
doc: "Explicitly enable or disable authorization for the action."
],
description: [
type: :string,
required: false,
doc: "A description for the step"
],
name: [
type: :atom,
required: true,
doc: """
A unique name for the step.
This is used when choosing the return value of the Reactor and for
arguments into other steps.
"""
],
resource: [
type: {:spark, Ash.Resource},
required: true,
doc: """
The resource to call the action on.
"""
]
]
}
end

View file

@ -0,0 +1,50 @@
defmodule Ash.Reactor.Dsl.Tenant do
@moduledoc """
Specify the actor used to execute an action.
"""
defstruct __identifier__: nil, source: nil, transform: nil
alias Reactor.Template
@type t :: %__MODULE__{
__identifier__: any,
source: Template.Input.t() | Template.Result.t() | Template.Value.t(),
transform: nil | (any -> any) | {module, keyword} | mfa
}
@doc false
def __entity__ do
template_type = Template.type()
%Spark.Dsl.Entity{
name: :tenant,
describe: "Specifies the action tenant",
args: [:source],
imports: [Reactor.Dsl.Argument],
identifier: {:auto, :unique_integer},
target: __MODULE__,
schema: [
source: [
type: template_type,
required: true,
doc: """
What to use as the source of the tenant.
See `Reactor.Dsl.Argument` for more information.
"""
],
transform: [
type:
{:or, [{:spark_function_behaviour, Reactor.Step, {Reactor.Step.Transform, 1}}, nil]},
required: false,
default: nil,
doc: """
An optional transformation function which can be used to modify the
tenant before it is passed to the action.
"""
]
]
}
end
end

View file

@ -0,0 +1,84 @@
defmodule Ash.Reactor.Dsl.Transaction do
@moduledoc """
The `transaction` entity for the `Ash.Reactor` reactor extension.
"""
# Comes from Ecto's default:
# https://hexdocs.pm/ecto/Ecto.Repo.html#module-shared-options
@default_timeout 15_000
defstruct __identifier__: nil,
arguments: [],
description: nil,
name: nil,
resources: [],
return: nil,
steps: [],
type: :transaction,
timeout: @default_timeout,
wait_for: []
@type t :: %__MODULE__{
__identifier__: any,
arguments: [],
description: nil | String.t(),
name: atom,
resources: [Ash.Resource.t()],
return: atom,
steps: [Reactor.Step.t()],
timeout: timeout(),
type: :transaction,
wait_for: [Reactor.Dsl.WaitFor.t()]
}
@doc false
def __entity__,
do: %Spark.Dsl.Entity{
name: :transaction,
describe:
"Creates a group of steps which will be executed inside a data layer transaction.",
target: __MODULE__,
args: [:name, :resources],
identifier: :name,
imports: [Reactor.Dsl.Argument],
entities: [
wait_for: [Reactor.Dsl.WaitFor.__entity__()],
steps: []
],
recursive_as: :steps,
schema: [
name: [
type: :atom,
required: true,
doc: """
A unique name for the step.
This is used when choosing the return value of the Reactor and for
arguments into other steps.
"""
],
resources: [
type: {:wrap_list, {:spark, Ash.Resource}},
required: true,
doc: """
A resource or list of resources to consider in the transaction.
"""
],
return: [
type: :atom,
required: false,
doc: """
The name of the step whose result will be returned as the return value of the transaction.
"""
],
timeout: [
type: {:or, [:pos_integer, {:in, [:infinity]}]},
required: false,
default: @default_timeout,
doc: """
How long to allow the transaction to run before timing out.
"""
]
]
}
end

View file

@ -0,0 +1,150 @@
defmodule Ash.Reactor.Dsl.Update do
@moduledoc """
The `update` entity for the `Ash.Reactor` reactor extension.
"""
defstruct __identifier__: nil,
action: nil,
action_step?: true,
actor: [],
api: nil,
async?: true,
authorize?: nil,
description: nil,
initial: nil,
inputs: [],
name: nil,
resource: nil,
tenant: [],
transform: nil,
type: :update,
undo_action: nil,
undo: :never,
wait_for: []
@type t :: %__MODULE__{
__identifier__: any,
action: atom,
action_step?: true,
actor: [Ash.Reactor.Dsl.Actor.t()],
api: Ash.Api.t(),
async?: boolean,
authorize?: boolean | nil,
description: String.t() | nil,
name: atom,
initial: Reactor.Template.t(),
inputs: [Ash.Reactor.Dsl.Inputs.t()],
resource: module,
tenant: [Ash.Reactor.Dsl.Tenant.t()],
type: :update,
undo_action: atom,
undo: :always | :never | :outside_transaction,
wait_for: [Reactor.Dsl.WaitFor.t()]
}
@doc false
def __entity__,
do: %Spark.Dsl.Entity{
name: :update,
describe: "Declares a step that will call an update action on a resource.",
examples: [
"""
update :publish_post, MyApp.Post, :update do
initial input(:post)
inputs %{
published: value(true)
}
actor result(:get_user)
tenant result(:get_organisation, [:id])
end
"""
],
no_depend_modules: [:api, :resource],
target: __MODULE__,
args: [:name, :resource, {:optional, :action}],
identifier: :name,
imports: [Reactor.Dsl.Argument],
entities: [
actor: [Ash.Reactor.Dsl.Actor.__entity__()],
inputs: [Ash.Reactor.Dsl.Inputs.__entity__()],
tenant: [Ash.Reactor.Dsl.Tenant.__entity__()],
wait_for: [Reactor.Dsl.WaitFor.__entity__()]
],
singleton_entity_keys: [:actor, :tenant],
recursive_as: :steps,
schema: [
action: [
type: :atom,
required: false,
doc: """
The name of the action to call on the resource.
"""
],
api: [
type: {:spark, Ash.Api},
required: false,
doc:
"The API to use when calling the action. Defaults to the API set in the `ash` section."
],
async?: [
type: :boolean,
required: false,
default: true,
doc:
"When set to true the step will be executed asynchronously via Reactor's `TaskSupervisor`."
],
authorize?: [
type: {:or, [:boolean, nil]},
required: false,
default: nil,
doc: "Explicitly enable or disable authorization for the action."
],
description: [
type: :string,
required: false,
doc: "A description for the step"
],
initial: [
type: Reactor.Template.type(),
required: true,
doc: "The record to update."
],
name: [
type: :atom,
required: true,
doc: """
A unique name for the step.
This is used when choosing the return value of the Reactor and for
arguments into other steps.
"""
],
resource: [
type: {:spark, Ash.Resource},
required: true,
doc: """
The resource to call the action on.
"""
],
undo_action: [
type: :atom,
required: false,
doc: """
The name of the action to call on the resource when the step is undone.
"""
],
undo: [
type: {:in, [:always, :never, :outside_transaction]},
required: false,
default: :never,
doc: """
What to do when the reactor is undoing it's work?
* `always` - The undo action will always be run.
* `never` - The action will never be undone.
* `outside_transaction` - The action will only be undone if not running inside a transaction.
"""
]
]
}
end

View file

@ -0,0 +1,163 @@
defmodule Ash.Reactor.Notifications do
@moduledoc """
Reactor middleware used to collect and emit notifications upon successful
completion of the Reactor.
"""
use Reactor.Middleware
require Logger
@context_agent_key :__ash_notification_agent__
@context_notification_key :__unpublished_ash_notifications__
defguardp has_agent?(context) when is_map_key(context, @context_agent_key)
defguardp has_notifications?(context)
when is_map_key(context, @context_notification_key) and
length(:erlang.map_get(@context_notification_key, context)) > 0
@doc """
When starting a reactor, start an agent to act as a temporary store of
notifications.
"""
@impl true
def init(context) when has_notifications?(context) do
with {:ok, notifications} <- Map.fetch(context, @context_notification_key),
{:ok, context} <- agent_start(context),
{:ok, context} <- agent_put(context, notifications) do
context = Map.delete(context, @context_notification_key)
{:ok, context}
end
end
def init(context), do: agent_start(context)
@doc """
When halting the reactor, store any queued notifications in the context for
eventual resumption.
"""
@impl true
def halt(context) when has_agent?(context) do
with {:ok, notifications} <- agent_get(context),
{:ok, context} <- agent_stop(context) do
if Enum.any?(notifications) do
{:ok,
Map.update(
context,
@context_notification_key,
notifications,
&Enum.concat(&1, notifications)
)}
else
{:ok, context}
end
end
end
@doc """
When the reactor completes successfully, publish any queued notifications.
"""
@impl true
def complete(result, context) when has_agent?(context) do
with {:ok, notifications} <- agent_get(context),
{:ok, _context} <- agent_stop(context),
[] <- __MODULE__.publish(notifications) do
{:ok, result}
else
{:error, reason} ->
{:error, reason}
notifications when is_list(notifications) ->
{:current_stacktrace, stacktrace} = Process.info(self(), :current_stacktrace)
Logger.warning("""
Missed #{Enum.count(notifications)} notifications in Reactor complete hook.
This happens when your steps return notifications but they are unable to be published
upon successful completion of the reactor.
#{Exception.format_stacktrace(stacktrace)}
""")
{:ok, result}
end
end
def complete(result, _context), do: {:ok, result}
@doc """
When the reactor fails, discard any queued notifications.
"""
@impl true
def error(_errors, context) do
agent_stop(context)
:ok
end
@doc """
Add notifications to the queue to be published on reactor success.
"""
@spec enqueue_notifications(Reactor.context(), Enumerable.t(Ash.Notifier.Notification.t())) ::
:ok | {:error, any}
def enqueue_notifications(context, notifications) do
with {:ok, _} <- agent_put(context, notifications) do
:ok
end
end
@doc """
Dispatch notifications.
"""
@spec publish(Ash.Notifier.Notification.t() | [Ash.Notifier.Notification.t()]) ::
[Ash.Notifier.Notification.t()]
def publish(notifications), do: Ash.Notifier.notify(notifications)
defp agent_start(context) when has_agent?(context) do
case agent_get(context) do
{:ok, _} -> {:ok, context}
_ -> agent_start(Map.delete(context, @context_agent_key))
end
end
defp agent_start(context) do
case Agent.start_link(fn -> [] end) do
{:ok, pid} -> {:ok, Map.put(context, :__ash_notification_agent__, pid)}
{:error, reason} -> {:error, reason}
end
end
defp agent_get(context) do
notifications =
context
|> Map.fetch!(@context_agent_key)
|> Agent.get(fn notifications -> notifications end, 100)
{:ok, notifications}
rescue
error -> {:error, error}
end
defp agent_stop(context) do
:ok =
context
|> Map.fetch!(@context_agent_key)
|> Agent.stop(:normal)
{:ok, Map.delete(context, @context_agent_key)}
rescue
error -> {:error, error}
end
defp agent_put(context, notifications) do
:ok =
context
|> Map.fetch!(@context_agent_key)
|> Agent.update(&Enum.concat(&1, notifications))
{:ok, context}
rescue
error -> {:error, error}
end
end

View file

@ -0,0 +1,41 @@
defmodule Ash.Reactor do
@moduledoc """
`Ash.Reactor` is a [`Reactor`](https://hex.pm/packages/reactor) extension
which provides steps for working with Ash resources and actions.
See the [Ash Reactor Guide](https://hexdocs.pm/ash/reactor.html) for more
information.
"""
@ash %Spark.Dsl.Section{
name: :ash,
describe: "Ash-related configuration for the `Ash.Reactor` extension",
schema: [
default_api: [
type: {:behaviour, Ash.Api},
doc: "An API to use by default when calling actions",
required: false
]
]
}
@type action :: __MODULE__.Dsl.Create.t() | __MODULE__.Dsl.Update.t()
use Spark.Dsl.Extension,
sections: [@ash],
transformers: [__MODULE__.Dsl.ActionTransformer, __MODULE__.Dsl.MiddlewareTransformer],
dsl_patches:
~w[Action Create Destroy Get Read Transaction Update]
|> Enum.map(&Module.concat(__MODULE__.Dsl, &1))
|> Enum.map(&%Spark.Dsl.Patch.AddEntity{section_path: [:reactor], entity: &1.__entity__()})
@doc false
@spec __using__(Keyword.t()) :: Macro.output()
defmacro __using__(opts) do
opts = Keyword.update(opts, :extensions, [Ash.Reactor], &[Ash.Reactor | &1])
quote do
use Reactor, unquote(opts)
end
end
end

View file

@ -0,0 +1,76 @@
defmodule Ash.Reactor.ActionStep do
@moduledoc """
The Reactor step which is used to execute generic actions.
"""
use Reactor.Step
import Ash.Reactor.StepUtils
alias Ash.{ActionInput, DataLayer}
@doc false
@impl true
def run(arguments, _context, options) do
action_input_options =
options
|> maybe_set_kw(:authorize?, options[:authorize?])
|> maybe_set_kw(:actor, arguments[:actor])
|> maybe_set_kw(:tenant, arguments[:tenant])
action_options =
[]
|> maybe_set_kw(:authorize?, options[:authorize?])
options[:resource]
|> ActionInput.for_action(options[:action], arguments[:input], action_input_options)
|> options[:api].run_action(action_options)
end
@doc false
@impl true
def undo(record, arguments, _context, options) do
action_input_options =
options
|> maybe_set_kw(:authorize?, options[:authorize?])
|> maybe_set_kw(:actor, arguments[:actor])
|> maybe_set_kw(:tenant, arguments[:tenant])
action_options =
[]
|> maybe_set_kw(:authorize?, options[:authorize?])
inputs =
arguments[:input]
|> Map.put(:result_to_undo, record)
options[:resource]
|> ActionInput.for_action(options[:action], inputs, action_input_options)
|> options[:api].run_action(action_options)
end
@doc false
@impl true
def can?(%{impl: {_, options}}, :undo) do
case options[:undo] do
:always -> true
:never -> false
:outside_transaction -> !DataLayer.in_transaction?(options[:resource])
end
end
def can?(_, :compensate), do: false
@doc false
@impl true
def async?(%{impl: {_, options}, async?: async}) do
cond do
DataLayer.in_transaction?(options[:resource]) ->
false
is_function(async, 1) ->
async.(options)
true ->
async
end
end
end

View file

@ -0,0 +1,92 @@
defmodule Ash.Reactor.CreateStep do
@moduledoc """
The Reactor step which is used to execute create actions.
"""
use Reactor.Step
import Ash.Reactor.StepUtils
alias Ash.{Changeset, DataLayer}
@doc false
@impl true
def run(arguments, context, options) do
changeset_options =
options
|> maybe_set_kw(:authorize?, options[:authorize?])
|> maybe_set_kw(:upsert_identity, options[:upsert_identity])
|> maybe_set_kw(:upsert?, options[:upsert?])
|> maybe_set_kw(:actor, arguments[:actor])
|> maybe_set_kw(:tenant, arguments[:tenant])
action_options =
[return_notifications?: true]
|> maybe_set_kw(:authorize?, options[:authorize?])
options[:resource]
|> Changeset.for_create(options[:action], arguments[:input], changeset_options)
|> options[:api].create(action_options)
|> case do
{:ok, record} ->
{:ok, record}
{:ok, record, notifications} ->
with :ok <- Ash.Reactor.Notifications.enqueue_notifications(context, notifications),
do: {:ok, record}
{:error, reason} ->
{:error, reason}
end
end
@doc false
@impl true
def undo(record, arguments, _context, options) do
changeset_options =
[]
|> maybe_set_kw(:authorize?, options[:authorize?])
|> maybe_set_kw(:actor, arguments[:actor])
|> maybe_set_kw(:tenant, arguments[:tenant])
action_options =
[return_notifications?: true, return_destroyed?: false]
|> maybe_set_kw(:authorize?, options[:authorize?])
record
|> Changeset.for_destroy(options[:undo_action], arguments[:input], changeset_options)
|> options[:api].destroy(action_options)
# We always want to discard the notifications.
|> case do
:ok -> :ok
{:ok, _} -> :ok
{:ok, _, _} -> :ok
{:error, reason} -> {:error, reason}
end
end
@doc false
@impl true
def can?(%{impl: {_, options}}, :undo) do
case options[:undo] do
:always -> true
:never -> false
:outside_transaction -> !DataLayer.in_transaction?(options[:resource])
end
end
def can?(_, :compensate), do: false
@doc false
@impl true
def async?(%{impl: {_, options}, async?: async}) do
cond do
DataLayer.in_transaction?(options[:resource]) ->
false
is_function(async, 1) ->
async.(options)
true ->
async
end
end
end

View file

@ -0,0 +1,103 @@
defmodule Ash.Reactor.DestroyStep do
@moduledoc """
The Reactor step which is used to execute update actions.
"""
use Reactor.Step
import Ash.Reactor.StepUtils
alias Ash.{Changeset, DataLayer}
@doc false
@impl true
def run(arguments, context, options) do
changeset_options =
options
|> maybe_set_kw(:authorize?, options[:authorize?])
|> maybe_set_kw(:actor, arguments[:actor])
|> maybe_set_kw(:tenant, arguments[:tenant])
|> maybe_set_kw(:return_destroyed?, options[:return_destroyed?])
action_options =
[return_notifications?: true]
|> maybe_set_kw(:authorize?, options[:authorize?])
|> maybe_set_kw(:return_destroyed?, options[:return_destroyed?])
arguments[:initial]
|> Changeset.for_destroy(options[:action], arguments[:input], changeset_options)
|> options[:api].destroy(action_options)
|> case do
:ok ->
{:ok, :ok}
{:ok, record} when is_struct(record) ->
{:ok, record}
{:ok, notifications} when is_list(notifications) ->
with :ok <- Ash.Reactor.Notifications.enqueue_notifications(context, notifications),
do: {:ok, :ok}
{:ok, record, notifications} ->
with :ok <- Ash.Reactor.Notifications.enqueue_notifications(context, notifications),
do: {:ok, record}
{:error, reason} ->
{:error, reason}
end
end
@doc false
@impl true
def undo(record, arguments, _context, options) do
changeset_options =
options
|> maybe_set_kw(:authorize?, options[:authorize?])
|> maybe_set_kw(:actor, arguments[:actor])
|> maybe_set_kw(:tenant, arguments[:tenant])
action_options =
[return_notifications?: false]
|> maybe_set_kw(:authorize?, options[:authorize?])
attributes =
options[:resource]
|> Ash.Resource.Info.attributes()
|> Map.new(&{&1.name, record[&1]})
|> then(&Map.merge(arguments[:input], &1))
options[:resource]
|> Changeset.for_create(options[:undo_action], attributes, changeset_options)
|> options[:api].create(action_options)
|> case do
{:ok, record} -> {:ok, record}
{:ok, record, _notifications} -> {:ok, record}
{:error, reason} -> {:error, reason}
end
end
@doc false
@impl true
def can?(%{impl: {_, options}}, :undo) do
case options[:undo] do
:always -> true
:never -> false
:outside_transaction -> !DataLayer.in_transaction?(options[:resource])
end
end
def can?(_, :compensate), do: false
@doc false
@impl true
def async?(%{impl: {_, options}, async?: async}) do
cond do
DataLayer.in_transaction?(options[:resource]) ->
false
is_function(async, 1) ->
async.(options)
true ->
async
end
end
end

View file

@ -0,0 +1,42 @@
defmodule Ash.Reactor.GetStep do
@moduledoc """
The Reactor step which is used to execute get actions.
"""
use Reactor.Step
import Ash.Reactor.StepUtils
alias Ash.Query
def run(arguments, _context, options) do
query_options =
options
|> maybe_set_kw(:authorize?, options[:authorize?])
|> maybe_set_kw(:actor, arguments[:actor])
|> maybe_set_kw(:tenant, arguments[:tenant])
action_options =
[]
|> maybe_set_kw(:authorize?, options[:authorize?])
options[:resource]
|> Query.for_read(options[:action], arguments[:input], query_options)
|> options[:api].read_one(action_options)
|> case do
{:ok, nil} ->
if options[:fail_on_not_found?] do
raise Ash.Error.Query.NotFound, resource: options[:resource]
else
{:ok, nil}
end
{:ok, record} ->
{:ok, record}
{:ok, records, _} ->
{:ok, records}
{:error, reason} ->
{:error, reason}
end
end
end

View file

@ -0,0 +1,17 @@
defmodule Ash.Reactor.MergeInputsStep do
@moduledoc """
A custom step which merges any number of `inputs` results into a single map.
"""
use Reactor.Step
@doc false
@impl true
def run(arguments, _context, _options) do
result =
arguments
|> Map.values()
|> Enum.reduce(&Map.merge/2)
{:ok, result}
end
end

View file

@ -0,0 +1,30 @@
defmodule Ash.Reactor.ReadStep do
@moduledoc """
The Reactor step which is used to execute read actions.
"""
use Reactor.Step
import Ash.Reactor.StepUtils
alias Ash.Query
def run(arguments, _context, options) do
query_options =
options
|> maybe_set_kw(:authorize?, options[:authorize?])
|> maybe_set_kw(:actor, arguments[:actor])
|> maybe_set_kw(:tenant, arguments[:tenant])
action_options =
[]
|> maybe_set_kw(:authorize?, options[:authorize?])
options[:resource]
|> Query.for_read(options[:action], arguments[:input], query_options)
|> options[:api].read(action_options)
|> case do
{:ok, records} -> {:ok, records}
{:ok, records, _} -> {:ok, records}
{:error, reason} -> {:error, reason}
end
end
end

View file

@ -0,0 +1,30 @@
defmodule Ash.Reactor.TransactionStep do
@moduledoc """
The Reactor step which is used to wrap other steps in an Ash data layer
transaction.
"""
use Reactor.Step
def run(arguments, context, options) do
sub_reactor = Keyword.fetch!(options, :sub_reactor)
resources = Keyword.fetch!(options, :resources)
timeout = Keyword.fetch!(options, :timeout)
Ash.DataLayer.transaction(
resources,
fn ->
case Reactor.run(sub_reactor, arguments, context, async?: false) do
{:ok, result} ->
result
{:error, [reason]} ->
Ash.DataLayer.rollback(resources, reason)
{:error, reasons} ->
Ash.DataLayer.rollback(resources, reasons)
end
end,
timeout
)
end
end

View file

@ -0,0 +1,94 @@
defmodule Ash.Reactor.UpdateStep do
@moduledoc """
The Reactor step which is used to execute update actions.
"""
use Reactor.Step
import Ash.Reactor.StepUtils
alias Ash.{Changeset, DataLayer}
@doc false
@impl true
def run(arguments, context, options) do
changeset_options =
options
|> maybe_set_kw(:authorize?, options[:authorize?])
|> maybe_set_kw(:actor, arguments[:actor])
|> maybe_set_kw(:tenant, arguments[:tenant])
action_options =
[return_notifications?: true]
|> maybe_set_kw(:authorize?, options[:authorize?])
arguments[:initial]
|> Changeset.for_update(options[:action], arguments[:input], changeset_options)
|> options[:api].update(action_options)
|> case do
{:ok, record} ->
{:ok, record}
{:ok, record, notifications} ->
with :ok <- Ash.Reactor.Notifications.enqueue_notifications(context, notifications),
do: {:ok, record}
{:error, reason} ->
{:error, reason}
end
end
@doc false
@impl true
def undo(record, arguments, _context, options) do
changeset_options =
options
|> maybe_set_kw(:authorize?, options[:authorize?])
|> maybe_set_kw(:actor, arguments[:actor])
|> maybe_set_kw(:tenant, arguments[:tenant])
action_options =
[return_notifications?: false]
|> maybe_set_kw(:authorize?, options[:authorize?])
attributes =
options[:resource]
|> Ash.Resource.Info.attributes()
|> Map.new(&{&1.name, record[&1]})
|> then(&Map.merge(arguments[:input], &1))
record
|> Changeset.for_update(options[:undo_action], attributes, changeset_options)
|> options[:api].update(action_options)
|> case do
{:ok, record} -> {:ok, record}
{:ok, record, _notifications} -> {:ok, record}
{:error, reason} -> {:error, reason}
end
end
@doc false
@impl true
def can?(%{impl: {_, options}}, :undo) do
case options[:undo] do
:always -> true
:never -> false
:outside_transaction -> !DataLayer.in_transaction?(options[:resource])
end
end
def can?(_, :compensate), do: false
@doc false
@impl true
def async?(%{impl: {_, options}, async?: async}) do
cond do
DataLayer.in_transaction?(options[:resource]) ->
false
is_function(async, 1) ->
async.(options)
true ->
async
end
end
end

View file

@ -0,0 +1,8 @@
defmodule Ash.Reactor.StepUtils do
@moduledoc false
@doc false
@spec maybe_set_kw(Keyword.t(), atom, any) :: Keyword.t()
def maybe_set_kw(keywords, _key, nil), do: keywords
def maybe_set_kw(keywords, key, value), do: Keyword.put(keywords, key, value)
end

25
lib/ash/reactor/tracer.ex Normal file
View file

@ -0,0 +1,25 @@
defmodule Ash.Reactor.Tracer do
@moduledoc """
Reactor middleware which threads Ash's tracing information through to new
processes spawned by Reactor.
"""
use Reactor.Middleware
alias Ash.ProcessHelpers
@doc false
@impl true
def get_process_context do
tracers = Application.get_env(:ash, :tracer, [])
ProcessHelpers.get_context_for_transfer(tracer: tracers)
end
@doc false
@impl true
def set_process_context(context) do
ProcessHelpers.transfer_context(context)
:ok
end
end

18
mix.exs
View file

@ -78,6 +78,7 @@ defmodule Ash.MixProject do
"documentation/topics/phoenix.md", "documentation/topics/phoenix.md",
"documentation/topics/policies.md", "documentation/topics/policies.md",
"documentation/topics/pub_sub.md", "documentation/topics/pub_sub.md",
"documentation/topics/reactor.md",
"documentation/topics/relationships.md", "documentation/topics/relationships.md",
"documentation/topics/security.md", "documentation/topics/security.md",
"documentation/topics/store-context-in-process.md", "documentation/topics/store-context-in-process.md",
@ -91,7 +92,8 @@ defmodule Ash.MixProject do
"documentation/dsls/DSL:-Ash.Flow.md", "documentation/dsls/DSL:-Ash.Flow.md",
"documentation/dsls/DSL:-Ash.DataLayer.Ets.md", "documentation/dsls/DSL:-Ash.DataLayer.Ets.md",
"documentation/dsls/DSL:-Ash.DataLayer.Mnesia.md", "documentation/dsls/DSL:-Ash.DataLayer.Mnesia.md",
"documentation/dsls/DSL:-Ash.Registry.md" "documentation/dsls/DSL:-Ash.Registry.md",
"documentation/dsls/DSL:-Ash.Reactor.md"
], ],
groups_for_extras: [ groups_for_extras: [
Tutorials: ~r'documentation/tutorials', Tutorials: ~r'documentation/tutorials',
@ -183,7 +185,8 @@ defmodule Ash.MixProject do
Ash.DataLayer.Simple, Ash.DataLayer.Simple,
Ash.Notifier.PubSub, Ash.Notifier.PubSub,
Ash.Policy.Authorizer, Ash.Policy.Authorizer,
Ash.Registry Ash.Registry,
Ash.Reactor
], ],
Introspection: [ Introspection: [
Ash.Api.Info, Ash.Api.Info,
@ -322,7 +325,7 @@ defmodule Ash.MixProject do
# Run "mix help deps" to learn about dependencies. # Run "mix help deps" to learn about dependencies.
defp deps do defp deps do
[ [
{:spark, "~> 1.1 and >= 1.1.50"}, {:spark, "~> 1.1 and >= 1.1.55"},
{:ecto, "~> 3.7"}, {:ecto, "~> 3.7"},
{:ets, "~> 0.8"}, {:ets, "~> 0.8"},
{:decimal, "~> 2.0"}, {:decimal, "~> 2.0"},
@ -333,12 +336,15 @@ defmodule Ash.MixProject do
{:telemetry, "~> 1.1"}, {:telemetry, "~> 1.1"},
{:plug, ">= 0.0.0", optional: true}, {:plug, ">= 0.0.0", optional: true},
{:earmark, "~> 1.4"}, {:earmark, "~> 1.4"},
{:reactor, "~> 0.6"},
# Dev/Test dependencies # Dev/Test dependencies
{:eflame, "~> 1.0", only: [:dev, :test]}, {:eflame, "~> 1.0", only: [:dev, :test]},
{:ex_doc, github: "elixir-lang/ex_doc", only: [:dev, :test], runtime: false}, {:ex_doc, github: "elixir-lang/ex_doc", only: [:dev, :test], runtime: false},
{:ex_check, "~> 0.12", only: [:dev, :test]}, {:ex_check, "~> 0.12", only: [:dev, :test]},
{:credo, ">= 0.0.0", only: [:dev, :test], runtime: false}, {:credo, ">= 0.0.0", only: [:dev, :test], runtime: false},
{:dialyxir, ">= 0.0.0", only: [:dev, :test], runtime: false}, {:dialyxir, ">= 0.0.0", only: [:dev, :test], runtime: false},
{:mimic, "~> 1.7", only: [:test]},
{:sobelow, ">= 0.0.0", only: [:dev, :test], runtime: false}, {:sobelow, ">= 0.0.0", only: [:dev, :test], runtime: false},
{:git_ops, "~> 2.5", only: [:dev, :test]}, {:git_ops, "~> 2.5", only: [:dev, :test]},
{:mix_test_watch, "~> 1.0", only: [:dev, :test], runtime: false}, {:mix_test_watch, "~> 1.0", only: [:dev, :test], runtime: false},
@ -358,11 +364,11 @@ defmodule Ash.MixProject do
"spark.cheat_sheets_in_search" "spark.cheat_sheets_in_search"
], ],
"spark.cheat_sheets_in_search": "spark.cheat_sheets_in_search":
"spark.cheat_sheets_in_search --extensions Ash.Resource.Dsl,Ash.Api.Dsl,Ash.Flow.Dsl,Ash.Registry.Dsl,Ash.DataLayer.Ets,Ash.DataLayer.Mnesia,Ash.Notifier.PubSub,Ash.Policy.Authorizer", "spark.cheat_sheets_in_search --extensions Ash.Resource.Dsl,Ash.Api.Dsl,Ash.Flow.Dsl,Ash.Registry.Dsl,Ash.DataLayer.Ets,Ash.DataLayer.Mnesia,Ash.Notifier.PubSub,Ash.Policy.Authorizer,Ash.Reactor",
"spark.formatter": "spark.formatter":
"spark.formatter --extensions Ash.Resource.Dsl,Ash.Api.Dsl,Ash.Flow.Dsl,Ash.Registry.Dsl,Ash.DataLayer.Ets,Ash.DataLayer.Mnesia,Ash.Notifier.PubSub,Ash.Policy.Authorizer", "spark.formatter --extensions Ash.Resource.Dsl,Ash.Api.Dsl,Ash.Flow.Dsl,Ash.Registry.Dsl,Ash.DataLayer.Ets,Ash.DataLayer.Mnesia,Ash.Notifier.PubSub,Ash.Policy.Authorizer,Ash.Reactor",
"spark.cheat_sheets": "spark.cheat_sheets":
"spark.cheat_sheets --extensions Ash.Resource.Dsl,Ash.Api.Dsl,Ash.Flow.Dsl,Ash.Registry.Dsl,Ash.DataLayer.Ets,Ash.DataLayer.Mnesia,Ash.Notifier.PubSub,Ash.Policy.Authorizer" "spark.cheat_sheets --extensions Ash.Resource.Dsl,Ash.Api.Dsl,Ash.Flow.Dsl,Ash.Registry.Dsl,Ash.DataLayer.Ets,Ash.DataLayer.Mnesia,Ash.Notifier.PubSub,Ash.Policy.Authorizer,Ash.Reactor"
] ]
end end
end end

View file

@ -20,19 +20,22 @@
"git_cli": {:hex, :git_cli, "0.3.0", "a5422f9b95c99483385b976f5d43f7e8233283a47cda13533d7c16131cb14df5", [:mix], [], "hexpm", "78cb952f4c86a41f4d3511f1d3ecb28edb268e3a7df278de2faa1bd4672eaf9b"}, "git_cli": {:hex, :git_cli, "0.3.0", "a5422f9b95c99483385b976f5d43f7e8233283a47cda13533d7c16131cb14df5", [:mix], [], "hexpm", "78cb952f4c86a41f4d3511f1d3ecb28edb268e3a7df278de2faa1bd4672eaf9b"},
"git_ops": {:hex, :git_ops, "2.6.0", "e0791ee1cf5db03f2c61b7ebd70e2e95cba2bb9b9793011f26609f22c0900087", [:mix], [{:git_cli, "~> 0.2", [hex: :git_cli, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "b98fca849b18aaf490f4ac7d1dd8c6c469b0cc3e6632562d366cab095e666ffe"}, "git_ops": {:hex, :git_ops, "2.6.0", "e0791ee1cf5db03f2c61b7ebd70e2e95cba2bb9b9793011f26609f22c0900087", [:mix], [{:git_cli, "~> 0.2", [hex: :git_cli, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "b98fca849b18aaf490f4ac7d1dd8c6c469b0cc3e6632562d366cab095e666ffe"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"libgraph": {:hex, :libgraph, "0.16.0", "3936f3eca6ef826e08880230f806bfea13193e49bf153f93edcf0239d4fd1d07", [:mix], [], "hexpm", "41ca92240e8a4138c30a7e06466acc709b0cbb795c643e9e17174a178982d6bf"},
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"}, "makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"}, "makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"},
"mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"}, "mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"},
"mimic": {:hex, :mimic, "1.7.4", "cd2772ffbc9edefe964bc668bfd4059487fa639a5b7f1cbdf4fd22946505aa4f", [:mix], [], "hexpm", "437c61041ecf8a7fae35763ce89859e4973bb0666e6ce76d75efc789204447c3"},
"mix_test_watch": {:hex, :mix_test_watch, "1.1.1", "eee6fc570d77ad6851c7bc08de420a47fd1e449ef5ccfa6a77ef68b72e7e51ad", [:mix], [{:file_system, "~> 0.2.1 or ~> 0.3", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "f82262b54dee533467021723892e15c3267349849f1f737526523ecba4e6baae"}, "mix_test_watch": {:hex, :mix_test_watch, "1.1.1", "eee6fc570d77ad6851c7bc08de420a47fd1e449ef5ccfa6a77ef68b72e7e51ad", [:mix], [{:file_system, "~> 0.2.1 or ~> 0.3", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "f82262b54dee533467021723892e15c3267349849f1f737526523ecba4e6baae"},
"nimble_options": {:hex, :nimble_options, "1.1.0", "3b31a57ede9cb1502071fade751ab0c7b8dbe75a9a4c2b5bbb0943a690b63172", [:mix], [], "hexpm", "8bbbb3941af3ca9acc7835f5655ea062111c9c27bcac53e004460dfd19008a99"}, "nimble_options": {:hex, :nimble_options, "1.1.0", "3b31a57ede9cb1502071fade751ab0c7b8dbe75a9a4c2b5bbb0943a690b63172", [:mix], [], "hexpm", "8bbbb3941af3ca9acc7835f5655ea062111c9c27bcac53e004460dfd19008a99"},
"nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"}, "nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"},
"picosat_elixir": {:hex, :picosat_elixir, "0.2.3", "bf326d0f179fbb3b706bb2c15fbc367dacfa2517157d090fdfc32edae004c597", [:make, :mix], [{:elixir_make, "~> 0.6", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "f76c9db2dec9d2561ffaa9be35f65403d53e984e8cd99c832383b7ab78c16c66"}, "picosat_elixir": {:hex, :picosat_elixir, "0.2.3", "bf326d0f179fbb3b706bb2c15fbc367dacfa2517157d090fdfc32edae004c597", [:make, :mix], [{:elixir_make, "~> 0.6", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "f76c9db2dec9d2561ffaa9be35f65403d53e984e8cd99c832383b7ab78c16c66"},
"plug": {:hex, :plug, "1.14.2", "cff7d4ec45b4ae176a227acd94a7ab536d9b37b942c8e8fa6dfc0fff98ff4d80", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "842fc50187e13cf4ac3b253d47d9474ed6c296a8732752835ce4a86acdf68d13"}, "plug": {:hex, :plug, "1.14.2", "cff7d4ec45b4ae176a227acd94a7ab536d9b37b942c8e8fa6dfc0fff98ff4d80", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "842fc50187e13cf4ac3b253d47d9474ed6c296a8732752835ce4a86acdf68d13"},
"plug_crypto": {:hex, :plug_crypto, "1.2.5", "918772575e48e81e455818229bf719d4ab4181fcbf7f85b68a35620f78d89ced", [:mix], [], "hexpm", "26549a1d6345e2172eb1c233866756ae44a9609bd33ee6f99147ab3fd87fd842"}, "plug_crypto": {:hex, :plug_crypto, "1.2.5", "918772575e48e81e455818229bf719d4ab4181fcbf7f85b68a35620f78d89ced", [:mix], [], "hexpm", "26549a1d6345e2172eb1c233866756ae44a9609bd33ee6f99147ab3fd87fd842"},
"reactor": {:hex, :reactor, "0.6.0", "981757389522440f99cc0a9368c9e7e95e23a3b2b2c2f59f72d5770a82766b14", [:mix], [{:libgraph, "~> 0.16", [hex: :libgraph, repo: "hexpm", optional: false]}, {:spark, "~> 1.0", [hex: :spark, repo: "hexpm", optional: false]}], "hexpm", "6f89a2231bcf8a0bfc3f14424f283906242a60bc3e2782ae93994b1a1482bfe6"},
"sobelow": {:hex, :sobelow, "0.13.0", "218afe9075904793f5c64b8837cc356e493d88fddde126a463839351870b8d1e", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "cd6e9026b85fc35d7529da14f95e85a078d9dd1907a9097b3ba6ac7ebbe34a0d"}, "sobelow": {:hex, :sobelow, "0.13.0", "218afe9075904793f5c64b8837cc356e493d88fddde126a463839351870b8d1e", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "cd6e9026b85fc35d7529da14f95e85a078d9dd1907a9097b3ba6ac7ebbe34a0d"},
"sourceror": {:hex, :sourceror, "0.14.1", "c6fb848d55bd34362880da671debc56e77fd722fa13b4dcbeac89a8998fc8b09", [:mix], [], "hexpm", "8b488a219e4c4d7d9ff29d16346fd4a5858085ccdd010e509101e226bbfd8efc"}, "sourceror": {:hex, :sourceror, "1.0.1", "ec2c41726d181adce888ac94b3f33b359a811b46e019c084509e02c70042e424", [:mix], [], "hexpm", "28225464ffd68bda1843c974f3ff7ccef35e29be09a65dfe8e3df3f7e3600c57"},
"spark": {:hex, :spark, "1.1.54", "54dac39403a2960f738ba5d60678d20b30de7381fb51b787b6bcb6aeabb73d9d", [:mix], [{:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.5 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:sourceror, "~> 0.1", [hex: :sourceror, repo: "hexpm", optional: false]}], "hexpm", "abc9a67cfb60a97d2f3c7e270fa968a2ace94f389e2741d406239d237ec6dbb1"}, "spark": {:hex, :spark, "1.1.55", "d20c3f899b23d841add29edc912ffab4463d3bb801bc73448738631389291d2e", [:mix], [{:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.5 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.0", [hex: :sourceror, repo: "hexpm", optional: false]}], "hexpm", "bbc15a4223d8e610c81ceca825d5d0bae3738d1c4ac4dbb1061749966776c3f1"},
"statistex": {:hex, :statistex, "1.0.0", "f3dc93f3c0c6c92e5f291704cf62b99b553253d7969e9a5fa713e5481cd858a5", [:mix], [], "hexpm", "ff9d8bee7035028ab4742ff52fc80a2aa35cece833cf5319009b52f1b5a86c27"}, "statistex": {:hex, :statistex, "1.0.0", "f3dc93f3c0c6c92e5f291704cf62b99b553253d7969e9a5fa713e5481cd858a5", [:mix], [], "hexpm", "ff9d8bee7035028ab4742ff52fc80a2aa35cece833cf5319009b52f1b5a86c27"},
"stream_data": {:hex, :stream_data, "0.6.0", "e87a9a79d7ec23d10ff83eb025141ef4915eeb09d4491f79e52f2562b73e5f47", [:mix], [], "hexpm", "b92b5031b650ca480ced047578f1d57ea6dd563f5b57464ad274718c9c29501c"}, "stream_data": {:hex, :stream_data, "0.6.0", "e87a9a79d7ec23d10ff83eb025141ef4915eeb09d4491f79e52f2562b73e5f47", [:mix], [], "hexpm", "b92b5031b650ca480ced047578f1d57ea6dd563f5b57464ad274718c9c29501c"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},

View file

@ -0,0 +1,53 @@
defmodule Ash.Test.ReactorActionTest do
@moduledoc false
use ExUnit.Case, async: true
defmodule Wooter do
@moduledoc false
use Ash.Resource, data_layer: Ash.DataLayer.Simple
attributes do
uuid_primary_key :id
end
actions do
action :celebrate, :string do
argument :excitement_level, :integer, default: 3, allow_nil?: false
run fn input, _context ->
level = Ash.ActionInput.get_argument(input, :excitement_level)
ooo = Enum.map(1..level, fn _ -> "o" end)
{:ok, Enum.join(["W"] ++ ooo ++ ["t"])}
end
end
end
end
defmodule Api do
@moduledoc false
use Ash.Api
resources do
resource Ash.Test.ReactorActionTest.Wooter
end
end
defmodule SimpleActionReactor do
@moduledoc false
use Reactor, extensions: [Ash.Reactor]
ash do
default_api Api
end
input :excitement_level
action :celebrate, Ash.Test.ReactorActionTest.Wooter, :celebrate do
inputs(%{excitement_level: input(:excitement_level)})
end
end
test "it runs the generic action" do
assert {:ok, "Wooooooot"} = Reactor.run(SimpleActionReactor, %{excitement_level: 7})
end
end

View file

@ -0,0 +1,277 @@
defmodule Ash.Test.ReactorCreateTest do
@moduledoc false
use ExUnit.Case, async: true
defmodule Author do
@moduledoc false
use Ash.Resource, data_layer: Ash.DataLayer.Ets
ets do
private? true
end
multitenancy do
strategy :attribute
attribute :organisation
global? true
end
attributes do
uuid_primary_key :id
attribute :name, :string, allow_nil?: false
attribute :organisation, :string, allow_nil?: true
end
actions do
defaults [:create, :read, :update, :destroy]
end
relationships do
has_many :posts, Ash.Test.ReactorCreateTest.Post
end
end
defmodule Post do
@moduledoc false
use Ash.Resource, data_layer: Ash.DataLayer.Ets
ets do
private? true
end
attributes do
uuid_primary_key :id
attribute :title, :string, allow_nil?: false
attribute :sub_title, :string
end
actions do
defaults [:create, :read, :update, :destroy]
create :with_actor_as_author do
change relate_actor(:author)
end
end
relationships do
belongs_to :author, Ash.Test.ReactorCreateTest.Author do
attribute_writable? true
allow_nil? true
end
end
end
defmodule Api do
@moduledoc false
use Ash.Api
resources do
resource Ash.Test.ReactorCreateTest.Author
resource Ash.Test.ReactorCreateTest.Post
end
end
test "it can create a post" do
defmodule SimpleCreatePostReactor do
@moduledoc false
use Reactor, extensions: [Ash.Reactor]
ash do
default_api Api
end
input :title
input :sub_title
create :create_post, Post, :create do
inputs(%{title: input(:title), sub_title: input(:sub_title)})
end
end
assert {:ok, post} =
Reactor.run(SimpleCreatePostReactor, %{title: "Title", sub_title: "Sub-title"})
assert post.title == "Title"
assert post.sub_title == "Sub-title"
assert post.__meta__.state == :loaded
end
test "it defaults to the primary action when the action is not supplied" do
defmodule InferredActionNameCreatePostReactor do
@moduledoc false
use Reactor, extensions: [Ash.Reactor]
ash do
default_api Api
end
input :title
input :sub_title
create :create_post, Post do
inputs(%{title: input(:title), sub_title: input(:sub_title)})
end
end
assert {:ok, _post} =
Reactor.run(InferredActionNameCreatePostReactor, %{
title: "Title",
sub_title: "Sub-title"
})
end
test "it merges multiple `inputs` entities together" do
defmodule MergedInputsCreatePostReactor do
@moduledoc false
use Reactor, extensions: [Ash.Reactor]
ash do
default_api Api
end
input :title
input :sub_title
create :create_post, Post, :create do
inputs(%{title: input(:title)})
inputs(%{sub_title: input(:sub_title)})
end
end
assert {:ok, post} =
Reactor.run(MergedInputsCreatePostReactor, %{
title: "Title",
sub_title: "Sub-title"
})
assert post.title == "Title"
assert post.sub_title == "Sub-title"
end
test "`inputs` entities can be transformed separately" do
defmodule TransformedInputsCreatePostReactor do
@moduledoc false
use Reactor, extensions: [Ash.Reactor]
ash do
default_api Api
end
input :title
create :create_post, Post, :create do
inputs %{title: input(:title)} do
transform &%{title: String.upcase(&1.title)}
end
inputs %{sub_title: input(:title)} do
transform &%{sub_title: String.downcase(&1.sub_title)}
end
end
end
assert {:ok, post} = Reactor.run(TransformedInputsCreatePostReactor, %{title: "Title"})
assert post.title == "TITLE"
assert post.sub_title == "title"
end
test "it can provide an actor" do
defmodule CreateWithActorCreatePostReactor do
@moduledoc false
use Reactor, extensions: [Ash.Reactor]
ash do
default_api Api
end
input :author_name
input :title
input :sub_title
create :create_author, Author, :create do
inputs(%{name: input(:author_name)})
end
create :create_post, Post, :with_actor_as_author do
inputs(%{title: input(:title), sub_title: input(:sub_title)})
actor(result(:create_author))
end
end
assert {:ok, post} =
Reactor.run(CreateWithActorCreatePostReactor, %{
author_name: "Marty McFly",
title: "Title",
sub_title: "Sub-title"
})
assert post.author.name == "Marty McFly"
assert post.title == "Title"
end
test "it can provide a tenant" do
defmodule TenantedCreateAuthorReactor do
@moduledoc false
use Reactor, extensions: [Ash.Reactor]
ash do
default_api Api
end
input :author_name
input :organisation_name
create :create_author, Author, :create do
inputs(%{name: input(:author_name)})
tenant input(:organisation_name)
end
end
assert {:ok, author} =
Reactor.run(TenantedCreateAuthorReactor, %{
author_name: "Marty McFly",
organisation_name: "Hill Valley High School"
})
assert author.name == "Marty McFly"
assert author.organisation == "Hill Valley High School"
end
test "it can undo the creation on error" do
defmodule UndoingCreateAuthorReactor do
@moduledoc false
use Ash.Reactor
ash do
default_api Api
end
input :author_name
create :create_author, Author, :create do
inputs(%{name: input(:author_name)})
undo :always
undo_action(:destroy)
end
step :fail do
argument :author, result(:create_author)
run fn _, _ ->
assert [_] = Api.read!(Author)
raise "hell"
end
end
end
assert {:error, _} =
Reactor.run(UndoingCreateAuthorReactor, %{author_name: "Marty McFly"}, %{},
async?: false
)
assert [] = Api.read!(Author)
end
end

View file

@ -0,0 +1,133 @@
defmodule Ash.Test.ReactorDestroyTest do
@moduledoc false
use ExUnit.Case, async: true
defmodule Post do
@moduledoc false
use Ash.Resource, data_layer: Ash.DataLayer.Ets
ets do
private? true
end
attributes do
uuid_primary_key :id
attribute :title, :string, allow_nil?: false
attribute :sub_title, :string
attribute :published, :boolean, default: false
end
actions do
defaults [:create, :read, :update, :destroy]
end
code_interface do
define_for Ash.Test.ReactorDestroyTest.Api
define :create
define :get, get_by: :id, action: :read
end
end
defmodule Api do
@moduledoc false
use Ash.Api
resources do
resource Post
end
end
test "it can destroy a post" do
defmodule SimpleDestroyPostReactor do
@moduledoc false
use Reactor, extensions: [Ash.Reactor]
ash do
default_api Api
end
input :post
destroy :delete_post, Post, :destroy do
initial(input(:post))
end
end
{:ok, original_post} =
Post.create(%{title: "Title", sub_title: "Sub-title"})
assert {:ok, :ok} =
Reactor.run(SimpleDestroyPostReactor, %{post: original_post}, %{}, async?: false)
end
test "it can destroy and return a post" do
defmodule ReturningDestroyPostReactor do
@moduledoc false
use Reactor, extensions: [Ash.Reactor]
ash do
default_api Api
end
input :post
destroy :delete_post, Post, :destroy do
initial(input(:post))
return_destroyed?(true)
end
end
{:ok, original_post} =
Post.create(%{title: "Title", sub_title: "Sub-title"})
assert {:ok, post} =
Reactor.run(ReturningDestroyPostReactor, %{post: original_post}, %{}, async?: false)
assert original_post.__struct__ == post.__struct__
assert original_post.id == post.id
assert post.__meta__.state == :deleted
end
test "it can undo the destruction on error" do
defmodule UndoingDestroyPostReactor do
@moduledoc false
use Ash.Reactor
ash do
default_api Api
end
input :post
destroy :delete_post, Post, :destroy do
initial(input(:post))
undo :always
undo_action(:create)
return_destroyed?(true)
end
step :fail do
wait_for :delete_post
run fn _, _ ->
assert [] = Api.read!(Post)
raise "hell"
end
end
end
{:ok, post} = Post.create(%{title: "Title"})
assert {:error, _} =
Reactor.run(
UndoingDestroyPostReactor,
%{post: post},
%{},
async?: false
)
assert Post.get(post.id)
end
end

84
test/reactor/get_test.exs Normal file
View file

@ -0,0 +1,84 @@
defmodule Ash.Test.ReactorGetTest do
@moduledoc false
use ExUnit.Case, async: true
defmodule Post do
@moduledoc false
use Ash.Resource, data_layer: Ash.DataLayer.Ets
ets do
private? true
end
attributes do
uuid_primary_key :id
attribute :title, :string, allow_nil?: false
end
actions do
defaults [:create, :read]
end
code_interface do
define_for Ash.Test.ReactorGetTest.Api
define :create
end
end
defmodule Api do
@moduledoc false
use Ash.Api
resources do
resource Ash.Test.ReactorGetTest.Post
end
end
defmodule SimpleGetReactor do
@moduledoc false
use Reactor, extensions: [Ash.Reactor]
ash do
default_api Api
end
get(:get_post, Ash.Test.ReactorGetTest.Post, :read)
end
test "when more than one record is returned it returns an error" do
~w[Marty Doc Einstein]
|> Enum.each(&Post.create!(%{title: &1}))
assert {:error, [error]} = Reactor.run(SimpleGetReactor, %{}, %{}, async?: false)
assert Exception.message(error) =~ "expected at most one result"
end
test "when no records are returned it returns nil" do
assert {:ok, nil} = Reactor.run(SimpleGetReactor, %{}, %{}, async?: false)
end
test "when no records are returned it can return an error" do
defmodule NotFoundReactor do
@moduledoc false
use Reactor, extensions: [Ash.Reactor]
ash do
default_api Api
end
get :get_post, Ash.Test.ReactorGetTest.Post, :read do
fail_on_not_found?(true)
end
end
assert {:error, [error]} = Reactor.run(NotFoundReactor, %{}, %{}, async?: false)
assert Exception.message(error) =~ "not found"
end
test "when exactly one record is returned it returns it" do
expected = Post.create!(%{title: "Marty"})
assert {:ok, actual} = Reactor.run(SimpleGetReactor, %{}, %{}, async?: false)
assert expected.id == actual.id
end
end

View file

@ -0,0 +1,116 @@
defmodule Ash.Test.Reactor.NotificationsTest do
@moduledoc false
use ExUnit.Case, async: true
use Mimic
import ExUnit.CaptureLog
alias Ash.Reactor.Notifications
describe "init/1" do
test "it starts an agent" do
{:ok, context} = Notifications.init(%{})
assert [] == agent_get(context.__ash_notification_agent__)
end
test "when there are already notifications in the context it stores them in the agent" do
notifications = build_notifications()
{:ok, context} =
Notifications.init(%{__unpublished_ash_notifications__: notifications})
enqueued = agent_get(context.__ash_notification_agent__)
assert enqueued == notifications
end
test "when there are already notifications in the context it removes them" do
notifications = build_notifications()
{:ok, context} =
Notifications.init(%{__unpublished_ash_notifications__: notifications})
refute is_map_key(context, :__unpublished_ash_notifications__)
end
end
describe "halt/1" do
setup do
{:ok, context} = Notifications.init(%{})
{:ok, context: context}
end
test "it stops the agent", %{context: context} do
agent = context.__ash_notification_agent__
{:ok, context} = Notifications.halt(context)
refute is_map_key(context, :__ash_notification_agent__)
refute Process.alive?(agent)
end
test "it moves any queued notifications into the context", %{context: context} do
notifications = build_notifications()
:ok = Notifications.enqueue_notifications(context, notifications)
{:ok, context} = Notifications.halt(context)
assert context.__unpublished_ash_notifications__ == notifications
end
end
describe "complete/2" do
setup do
{:ok, context} = Notifications.init(%{})
{:ok, context: context}
end
test "it publishes any queued notifications", %{context: context} do
notifications = build_notifications()
:ok = Notifications.enqueue_notifications(context, notifications)
expect(Notifications, :publish, fn actual ->
assert actual == notifications
[]
end)
assert {:ok, :result} = Notifications.complete(:result, context)
end
test "it logs a warning when there are unpublished notifications", %{context: context} do
notifications = build_notifications()
:ok = Notifications.enqueue_notifications(context, notifications)
expect(Notifications, :publish, & &1)
assert capture_log(fn ->
assert {:ok, :result} = Notifications.complete(:result, context)
end) =~ "Missed 3 notifications"
end
test "it stops the agent", %{context: context} do
agent = context.__ash_notification_agent__
{:ok, :result} = Notifications.complete(:result, context)
refute Process.alive?(agent)
end
end
describe "error/2" do
setup do
{:ok, context} = Notifications.init(%{})
{:ok, context: context}
end
test "it stops the agent", %{context: context} do
agent = context.__ash_notification_agent__
:ok = Notifications.error([:errors], context)
refute Process.alive?(agent)
end
end
defp build_notifications(how_many \\ 3) do
for i <- 1..how_many do
Ash.Notifier.Notification.new(__MODULE__.FakeResource, data: %{count: i})
end
end
defp agent_get(agent), do: Agent.get(agent, & &1)
end

View file

@ -0,0 +1,79 @@
defmodule Ash.Test.ReactorTest do
@moduledoc false
use ExUnit.Case, async: false
use Mimic
setup :set_mimic_global
test "it can be used directly" do
defmodule DirectReactor do
@moduledoc false
use Ash.Reactor
input :whom
step :greet do
argument :whom, input(:whom)
run fn %{whom: whom} -> {:ok, "Hello, #{whom}!"} end
end
end
assert {:ok, "Hello, Marty!"} = Reactor.run(DirectReactor, %{whom: "Marty"})
end
test "notifications are published when the reactor is successful" do
defmodule Post do
@moduledoc false
use Ash.Resource, data_layer: Ash.DataLayer.Ets
ets do
private? true
end
attributes do
uuid_primary_key :id
attribute :title, :string, allow_nil?: false
end
actions do
defaults [:create, :destroy]
end
end
defmodule Api do
@moduledoc false
use Ash.Api
resources do
resource Ash.Test.ReactorTest.Post
end
end
defmodule NotifyingReactor do
@moduledoc false
use Ash.Reactor
input :title
ash do
default_api Ash.Test.ReactorTest.Api
end
create :create_post, Ash.Test.ReactorTest.Post do
inputs(%{title: input(:title)})
end
end
expect(Ash.Reactor.Notifications, :publish, fn notifications ->
assert [
%Ash.Notifier.Notification{
resource: Ash.Test.ReactorTest.Post,
action: %{name: :create}
}
] = notifications
[]
end)
assert {:ok, _post} = Reactor.run(NotifyingReactor, %{title: "Title"})
end
end

View file

@ -0,0 +1,62 @@
defmodule Ash.Test.ReactorReadTest do
@moduledoc false
use ExUnit.Case, async: true
defmodule Post do
@moduledoc false
use Ash.Resource, data_layer: Ash.DataLayer.Ets
ets do
private? true
end
attributes do
uuid_primary_key :id
attribute :title, :string, allow_nil?: false
end
actions do
defaults [:create, :read]
end
code_interface do
define_for Ash.Test.ReactorReadTest.Api
define :create
end
end
defmodule Api do
@moduledoc false
use Ash.Api
resources do
resource Ash.Test.ReactorReadTest.Post
end
end
defmodule SimpleReadReactor do
@moduledoc false
use Reactor, extensions: [Ash.Reactor]
ash do
default_api Api
end
read :read_posts, Ash.Test.ReactorReadTest.Post
end
test "when a posts exist it returns them" do
expected =
~w[Marty Doc Einstein]
|> Enum.map(&Post.create!(%{title: &1}))
{:ok, actual} = Reactor.run(SimpleReadReactor, %{}, %{}, async?: false)
assert actual |> Enum.map(& &1.id) |> Enum.sort() ==
expected |> Enum.map(& &1.id) |> Enum.sort()
end
test "when posts don't exist it returns an empty list" do
assert {:ok, []} = Reactor.run(SimpleReadReactor, %{}, %{}, async?: false)
end
end

View file

@ -0,0 +1,98 @@
defmodule Ash.Test.ReactorTracingTest do
@moduledoc false
use ExUnit.Case, async: true
defmodule Post do
@moduledoc false
use Ash.Resource, data_layer: Ash.DataLayer.Ets
attributes do
uuid_primary_key :id
attribute :title, :string, allow_nil?: false
end
actions do
defaults [:create, :read, :update, :destroy]
end
end
defmodule Api do
@moduledoc false
use Ash.Api
resources do
resource Ash.Test.ReactorTracingTest.Post
end
end
defmodule TracingTestReactor do
@moduledoc false
use Ash.Reactor
ash do
default_api Api
end
create :create_post, Post, :create do
inputs(%{title: value("A wonderful post about Back To The Future")})
end
get :reload_post, Post, :read do
inputs(%{id: result(:create_post, [:id])})
end
update :update_post, Post, :update do
initial(result(:reload_post))
inputs(%{title: value("A wonderful post about a time travel movie")})
end
destroy :destroy_post, Post, :destroy do
initial(result(:update_post))
end
end
describe "tracing configured with application environment" do
setup do
original_config = Application.get_env(:ash, :tracer, nil)
Application.put_env(:ash, :tracer, [Ash.Tracer.Simple])
on_exit(fn ->
if original_config,
do: Application.put_env(:ash, :tracer, original_config),
else: Application.delete_env(:ash, :tracer)
end)
:ok
end
test "actions in synchronous reactors can be traced" do
assert {:ok, :ok} = Reactor.run(TracingTestReactor, %{}, %{}, async?: false)
assert [
"changeset:post:create",
"api:post.create",
"query:post:read",
"api:post.read",
"changeset:post:update",
"api:post.update",
"changeset:post:destroy",
"api:post.destroy"
] = Ash.Tracer.Simple.gather_spans() |> Enum.map(& &1.name)
end
test "actions in asynchronous reactors can be traced" do
assert {:ok, :ok} = Reactor.run(TracingTestReactor, %{}, %{}, async?: true)
assert [
"changeset:post:create",
"api:post.create",
"query:post:read",
"api:post.read",
"changeset:post:update",
"api:post.update",
"changeset:post:destroy",
"api:post.destroy"
] = Ash.Tracer.Simple.gather_spans() |> Enum.map(& &1.name)
end
end
end

View file

@ -0,0 +1,128 @@
defmodule Ash.Test.Reactor.TransactionTest do
@moduledoc false
use ExUnit.Case, async: false
use Mimic
defmodule Post do
@moduledoc false
use Ash.Resource, data_layer: Ash.DataLayer.Mnesia
attributes do
uuid_primary_key :id
attribute :title, :string, allow_nil?: false
end
actions do
defaults [:create, :destroy]
end
end
defmodule Api do
@moduledoc false
use Ash.Api
resources do
resource Ash.Test.Reactor.TransactionTest.Post
end
end
setup do
ExUnit.CaptureLog.capture_log(fn ->
Ash.DataLayer.Mnesia.start(__MODULE__.Api)
end)
on_exit(fn ->
ExUnit.CaptureLog.capture_log(fn ->
:mnesia.stop()
:mnesia.delete_schema([node()])
end)
end)
end
test "when the transaction completes successfully it returns the last result" do
defmodule SuccessfulNoReturnTransactionReactor do
@moduledoc false
use Ash.Reactor
ash do
default_api Api
end
transaction :create_posts, Post do
create :post_1, Post, :create do
inputs(%{title: value("About Marty McFly")})
end
create :post_2, Post, :create do
inputs(%{title: value("About Doc Brown")})
end
end
end
assert {:ok, %{title: "About Doc Brown"}} = Reactor.run(SuccessfulNoReturnTransactionReactor)
end
test "when the transaction completes successfully it returns the specified result" do
defmodule SuccessfulNamedReturnTransactionReactor do
@moduledoc false
use Ash.Reactor
ash do
default_api Api
end
transaction :create_posts, Post do
create :post_1, Post, :create do
inputs(%{title: value("About Marty McFly")})
end
create :post_2, Post, :create do
inputs(%{title: value("About Doc Brown")})
end
return :post_1
end
end
assert {:ok, %{title: "About Marty McFly"}} =
Reactor.run(SuccessfulNamedReturnTransactionReactor)
end
test "when the transaction fails it is rolled back" do
defmodule FailAndRollBackTransactionReactor do
@moduledoc false
use Ash.Reactor
ash do
default_api Api
end
transaction :create_posts, Post do
create :post_1, Post, :create do
inputs(%{title: value("About Marty McFly")})
end
step :fail do
run fn _, _ ->
raise "hell"
end
end
return :post_1
end
end
Ash.DataLayer
|> expect(:rollback, fn resources, reason ->
assert resources == [Post]
assert Exception.message(reason) == "hell"
raise reason
end)
assert {:error, [error]} =
Reactor.run(FailAndRollBackTransactionReactor, %{}, %{}, async?: false)
assert Exception.message(error) =~ "hell"
end
end

View file

@ -0,0 +1,179 @@
defmodule Ash.Test.ReactorUpdateTest do
@moduledoc false
use ExUnit.Case, async: true
defmodule Post do
@moduledoc false
use Ash.Resource, data_layer: Ash.DataLayer.Ets
ets do
private? true
end
attributes do
uuid_primary_key :id
attribute :title, :string, allow_nil?: false
attribute :sub_title, :string
attribute :published, :boolean, default: false
end
actions do
defaults [:create, :read, :update, :destroy]
end
code_interface do
define_for Ash.Test.ReactorUpdateTest.Api
define :create
define :get, action: :read, get_by: :id
end
end
defmodule Api do
@moduledoc false
use Ash.Api
resources do
resource Post
end
end
test "it can update a post" do
defmodule SimpleUpdatePostReactor do
@moduledoc false
use Reactor, extensions: [Ash.Reactor]
ash do
default_api Api
end
input :post
update :publish_post, Post, :update do
initial(input(:post))
inputs(%{published: value(true)})
end
end
{:ok, %{published: false} = original_post} =
Post.create(%{title: "Title", sub_title: "Sub-title"})
assert {:ok, post} =
Reactor.run(SimpleUpdatePostReactor, %{post: original_post}, %{}, async?: false)
assert post.published
end
test "it defaults to the primary action when the action is not supplied" do
defmodule InferredActionNameUpdatePostReactor do
@moduledoc false
use Reactor, extensions: [Ash.Reactor]
ash do
default_api Api
end
input :post
input :new_title
update :update_post, Post do
inputs(%{title: input(:new_title)})
initial(input(:post))
end
end
{:ok, original_post} =
Post.create(%{title: "Title", sub_title: "Sub-title"})
assert {:ok, post} =
Reactor.run(
InferredActionNameUpdatePostReactor,
%{post: original_post, new_title: "New Title"},
%{},
async?: false
)
assert post.title == "New Title"
end
test "it merges multiple `inputs` entities together" do
defmodule MergedInputsCreatePostReactor do
@moduledoc false
use Ash.Reactor, extensions: [Ash.Reactor]
ash do
default_api Api
end
input :post
input :new_title
input :new_sub_title
update :update_post, Post, :update do
initial(input(:post))
inputs(%{title: input(:new_title)})
inputs(%{sub_title: input(:new_sub_title)})
end
end
{:ok, original_post} =
Post.create(%{title: "Title", sub_title: "Sub-title"})
assert {:ok, post} =
Reactor.run(
MergedInputsCreatePostReactor,
%{post: original_post, new_title: "New Title", new_sub_title: "New Sub-title"},
%{},
async?: false
)
assert post.title == "New Title"
assert post.sub_title == "New Sub-title"
end
test "it can undo the update on error" do
defmodule UndoingUpdateReactor do
@moduledoc false
use Ash.Reactor
ash do
default_api Api
end
input :post
input :new_title
update :update_post, Post, :update do
initial(input(:post))
inputs(%{title: input(:new_title)})
undo :always
undo_action(:update)
end
step :fail do
wait_for :update_post
run fn _, _ ->
assert [] = Api.read!(Post)
raise "hell"
end
end
end
{:ok, post} = Post.create(%{title: "Title"})
assert {:error, _} =
Reactor.run(
UndoingUpdateReactor,
%{
post: post,
new_title: "New title"
},
%{},
async?: false
)
post_run_post = Post.get!(post.id)
assert post_run_post.title == "New title"
end
end

View file

@ -2,6 +2,9 @@ exclude = [
ash_three: System.get_env("FLAG_ASH_THREE", "false") != "true" ash_three: System.get_env("FLAG_ASH_THREE", "false") != "true"
] ]
Mimic.copy(Ash.Reactor.Notifications)
Mimic.copy(Ash.DataLayer)
ExUnit.start(stacktrace_depth: 100, exclude: exclude) ExUnit.start(stacktrace_depth: 100, exclude: exclude)
Logger.configure(level: :debug) Logger.configure(level: :debug)