mirror of
https://github.com/ash-project/ash.git
synced 2024-09-20 05:23:03 +12:00
improvement: support depending on requests that will be added
docs: improve flow docs closes: #418 improvement: support dynamic action steps in `Ash.Flow`
This commit is contained in:
parent
652342c41d
commit
6751afc683
7 changed files with 482 additions and 289 deletions
|
@ -65,6 +65,16 @@ end
|
|||
- `%{step_name: :key}` will return a map of each key to the provided step name, i.e `%{key: <step_name_result>}`
|
||||
- `[:step_name]` - which is equivalent to `%{step_name: :step_name}`
|
||||
|
||||
A flow always returns an `%Ash.Flow.Result{}`, and the return value of a successful flow will be available in `%Ash.Flow.Result{result: result}` when the flow did not encounter an error.
|
||||
|
||||
If the flow resulted in an error, `error?` is set to `true`, and the result will be `nil`.
|
||||
|
||||
## Halting and Resuming Flows
|
||||
|
||||
A flow can be halted by using the `halt_if` option on a step, or by a custom step returning `{:error, Ash.Flow.Error.Halted.exception(reason: reason)}`
|
||||
|
||||
In this case, the flow will be marked as `complete?: false`. The result of each step up until this point is saved, and you can then rerun the flow with different inputs by passing the incomplete result into the `resume` option when running the flow again. Individual steps can be rerun by deleting them from the `data` field of the flow.
|
||||
|
||||
## Errors
|
||||
|
||||
Currently, any error anywhere in the flow will fail the flow and will return an error. Over time, error handling behavior will be added, as well as the ability to customize how transactions are rolled back, and to handle errors in a custom way.
|
||||
|
|
|
@ -61,6 +61,7 @@ defmodule Ash.Engine do
|
|||
opts: [],
|
||||
requests: [],
|
||||
data: %{},
|
||||
dependencies_waiting_on_request: MapSet.new(),
|
||||
unsent_dependencies: [],
|
||||
dependencies_seen: MapSet.new(),
|
||||
dependencies: %{},
|
||||
|
@ -466,7 +467,6 @@ defmodule Ash.Engine do
|
|||
end
|
||||
|
||||
defp do_run_iteration(state, request) do
|
||||
log(state, fn -> breakdown(state) end)
|
||||
{state, notifications, dependencies} = fully_advance_request(state, request)
|
||||
|
||||
state
|
||||
|
@ -516,13 +516,6 @@ defmodule Ash.Engine do
|
|||
end
|
||||
end
|
||||
|
||||
defp breakdown(state) do
|
||||
"""
|
||||
State breakdown:
|
||||
#{Enum.map_join(state.requests, "\n", &"#{&1.name}: #{&1.state}")}
|
||||
"""
|
||||
end
|
||||
|
||||
def long_breakdown(state) do
|
||||
"""
|
||||
#{errors(state)}
|
||||
|
@ -724,14 +717,11 @@ defmodule Ash.Engine do
|
|||
|
||||
depended_on_request = Enum.find(state.requests, &(&1.path == dep_path))
|
||||
|
||||
if !depended_on_request do
|
||||
raise "Engine Error in request #{inspect(request_path)}: No request found with path #{inspect(dep_path)}. Available paths:\n #{Enum.map_join(state.requests, "\n", &inspect(&1.path))}"
|
||||
end
|
||||
|
||||
if depended_on_request do
|
||||
# we want to send things from non async requests
|
||||
# after we've sent all info to async requests
|
||||
unsent_dependencies =
|
||||
if depended_on_request.async? do
|
||||
if depended_on_request && depended_on_request.async? do
|
||||
state.unsent_dependencies ++ [{request_path, dep}]
|
||||
else
|
||||
[{request_path, dep} | state.unsent_dependencies]
|
||||
|
@ -750,6 +740,13 @@ defmodule Ash.Engine do
|
|||
dependencies_seen: MapSet.put(state.dependencies_seen, seen_dep),
|
||||
unsent_dependencies: unsent_dependencies
|
||||
}
|
||||
else
|
||||
%{
|
||||
state
|
||||
| dependencies_waiting_on_request:
|
||||
MapSet.put(state.dependencies_waiting_on_request, {request_path, dep})
|
||||
}
|
||||
end
|
||||
end
|
||||
)
|
||||
end
|
||||
|
@ -830,6 +827,12 @@ defmodule Ash.Engine do
|
|||
{async, non_async} =
|
||||
requests
|
||||
|> Enum.map(fn request ->
|
||||
if Enum.find(state.requests, &(&1.path == request.path)) do
|
||||
raise """
|
||||
Attempted to add request #{inspect(request.path)} but it has already been added!
|
||||
"""
|
||||
end
|
||||
|
||||
authorize? = request.authorize? and state.authorize?
|
||||
|
||||
%{
|
||||
|
@ -847,6 +850,26 @@ defmodule Ash.Engine do
|
|||
|> Enum.split_with(& &1.async?)
|
||||
|
||||
%{state | requests: async ++ state.requests ++ non_async}
|
||||
|> add_dependencies_waiting_on_request()
|
||||
end
|
||||
|
||||
defp add_dependencies_waiting_on_request(state) do
|
||||
state.dependencies_waiting_on_request
|
||||
|> Enum.reduce(state, fn
|
||||
{request_path, dep}, state ->
|
||||
dep_path = :lists.droplast(dep)
|
||||
|
||||
if Enum.any?(state.requests, &(&1.path == dep_path)) do
|
||||
%{
|
||||
state
|
||||
| unsent_dependencies: [{request_path, dep} | state.unsent_dependencies],
|
||||
dependencies_waiting_on_request:
|
||||
MapSet.delete(state.dependencies_waiting_on_request, {request_path, dep})
|
||||
}
|
||||
else
|
||||
state
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
defp build_dependencies(request, dependencies) do
|
||||
|
|
|
@ -1018,6 +1018,15 @@ defmodule Ash.Engine.Request do
|
|||
end
|
||||
end
|
||||
|
||||
result =
|
||||
case result do
|
||||
{:requests, requests} ->
|
||||
{:requests, requests, []}
|
||||
|
||||
other ->
|
||||
other
|
||||
end
|
||||
|
||||
case result do
|
||||
{:new_deps, new_deps} ->
|
||||
log(request, fn -> "New dependencies for #{field}: #{inspect(new_deps)}" end)
|
||||
|
@ -1031,7 +1040,7 @@ defmodule Ash.Engine.Request do
|
|||
|
||||
{:skipped, new_request, notifications, new_deps}
|
||||
|
||||
{:requests, requests} ->
|
||||
{:requests, requests, new_deps} ->
|
||||
log(request, fn ->
|
||||
paths =
|
||||
Enum.map(requests, fn
|
||||
|
@ -1046,13 +1055,15 @@ defmodule Ash.Engine.Request do
|
|||
end)
|
||||
|
||||
new_deps =
|
||||
Enum.flat_map(requests, fn
|
||||
requests
|
||||
|> Enum.flat_map(fn
|
||||
{request, key} ->
|
||||
[request.path ++ [key]]
|
||||
|
||||
_request ->
|
||||
[]
|
||||
end)
|
||||
|> Enum.concat(new_deps)
|
||||
|
||||
new_unresolved =
|
||||
Map.update!(
|
||||
|
|
|
@ -37,6 +37,10 @@ defmodule Ash.Expr do
|
|||
soft_escape(%Ash.Query.Ref{relationship_path: [], attribute: op}, escape?)
|
||||
end
|
||||
|
||||
def do_expr({:__aliases__, _, _} = expr, _escape?) do
|
||||
expr
|
||||
end
|
||||
|
||||
def do_expr({:^, _, [value]}, _escape?) do
|
||||
value
|
||||
end
|
||||
|
|
|
@ -115,7 +115,18 @@ defmodule Ash.Flow.Executor.AshEngine do
|
|||
end)
|
||||
end
|
||||
|
||||
@deps_keys [:input, :over, :record, :wait_for, :halt_if, :tenant, :condition]
|
||||
@deps_keys [
|
||||
:input,
|
||||
:over,
|
||||
:record,
|
||||
:wait_for,
|
||||
:halt_if,
|
||||
:tenant,
|
||||
:condition,
|
||||
:resource,
|
||||
:api,
|
||||
:action
|
||||
]
|
||||
|
||||
defp handle_input_templates(run_flow_steps) do
|
||||
run_flow_steps
|
||||
|
@ -511,14 +522,7 @@ defmodule Ash.Flow.Executor.AshEngine do
|
|||
context: context,
|
||||
transaction_name: transaction_name
|
||||
)
|
||||
)
|
||||
|> Enum.map(fn request ->
|
||||
if request.path == :lists.droplast(output_path) do
|
||||
{request, :data}
|
||||
else
|
||||
request
|
||||
end
|
||||
end)}
|
||||
), [output_path]}
|
||||
else
|
||||
{:ok, nil}
|
||||
end
|
||||
|
@ -804,9 +808,21 @@ defmodule Ash.Flow.Executor.AshEngine do
|
|||
halt_reason: halt_reason
|
||||
} = read
|
||||
|
||||
List.wrap(
|
||||
maybe_dynamic(
|
||||
[resource, action],
|
||||
name,
|
||||
[name, :data],
|
||||
input,
|
||||
all_steps,
|
||||
transaction_name,
|
||||
additional_context,
|
||||
fn resource, action ->
|
||||
%{
|
||||
action: action,
|
||||
action_input: action_input,
|
||||
resource: resource,
|
||||
api: api,
|
||||
dep_paths: dep_paths,
|
||||
tenant: tenant,
|
||||
request_deps: request_deps
|
||||
|
@ -815,6 +831,7 @@ defmodule Ash.Flow.Executor.AshEngine do
|
|||
all_steps,
|
||||
resource,
|
||||
action,
|
||||
api,
|
||||
action_input,
|
||||
input,
|
||||
transaction_name,
|
||||
|
@ -859,6 +876,9 @@ defmodule Ash.Flow.Executor.AshEngine do
|
|||
|> Ash.Flow.handle_modifiers()
|
||||
end
|
||||
)
|
||||
end
|
||||
)
|
||||
)
|
||||
|
||||
%Step{step: %Ash.Flow.Step.Create{} = create, input: input} ->
|
||||
%{
|
||||
|
@ -872,8 +892,20 @@ defmodule Ash.Flow.Executor.AshEngine do
|
|||
halt_if: halt_if
|
||||
} = create
|
||||
|
||||
List.wrap(
|
||||
maybe_dynamic(
|
||||
[resource, action],
|
||||
name,
|
||||
[name, :commit],
|
||||
input,
|
||||
all_steps,
|
||||
transaction_name,
|
||||
additional_context,
|
||||
fn resource, action ->
|
||||
%{
|
||||
action: action,
|
||||
resource: resource,
|
||||
api: api,
|
||||
action_input: action_input,
|
||||
dep_paths: dep_paths,
|
||||
tenant: tenant,
|
||||
|
@ -883,6 +915,7 @@ defmodule Ash.Flow.Executor.AshEngine do
|
|||
all_steps,
|
||||
resource,
|
||||
action,
|
||||
api,
|
||||
action_input,
|
||||
input,
|
||||
transaction_name,
|
||||
|
@ -920,6 +953,9 @@ defmodule Ash.Flow.Executor.AshEngine do
|
|||
|> Ash.Flow.handle_modifiers()
|
||||
end
|
||||
)
|
||||
end
|
||||
)
|
||||
)
|
||||
|
||||
%Step{step: %Ash.Flow.Step.Validate{only_keys: keys} = validate, input: input} ->
|
||||
%{
|
||||
|
@ -936,6 +972,7 @@ defmodule Ash.Flow.Executor.AshEngine do
|
|||
|
||||
%{
|
||||
action: action,
|
||||
resource: resource,
|
||||
action_input: action_input,
|
||||
dep_paths: dep_paths,
|
||||
tenant: tenant,
|
||||
|
@ -945,6 +982,7 @@ defmodule Ash.Flow.Executor.AshEngine do
|
|||
all_steps,
|
||||
resource,
|
||||
action,
|
||||
nil,
|
||||
action_input,
|
||||
input,
|
||||
transaction_name,
|
||||
|
@ -1065,9 +1103,21 @@ defmodule Ash.Flow.Executor.AshEngine do
|
|||
halt_reason: halt_reason
|
||||
} = update
|
||||
|
||||
List.wrap(
|
||||
maybe_dynamic(
|
||||
[resource, action],
|
||||
name,
|
||||
[name, :commit],
|
||||
input,
|
||||
all_steps,
|
||||
transaction_name,
|
||||
additional_context,
|
||||
fn resource, action ->
|
||||
%{
|
||||
action: action,
|
||||
action_input: action_input,
|
||||
resource: resource,
|
||||
api: api,
|
||||
dep_paths: dep_paths,
|
||||
tenant: tenant,
|
||||
request_deps: request_deps
|
||||
|
@ -1076,6 +1126,7 @@ defmodule Ash.Flow.Executor.AshEngine do
|
|||
all_steps,
|
||||
resource,
|
||||
action,
|
||||
api,
|
||||
action_input,
|
||||
input,
|
||||
transaction_name,
|
||||
|
@ -1137,6 +1188,9 @@ defmodule Ash.Flow.Executor.AshEngine do
|
|||
end
|
||||
)
|
||||
]
|
||||
end
|
||||
)
|
||||
)
|
||||
|
||||
%Step{step: %Ash.Flow.Step.Destroy{} = destroy, input: input} ->
|
||||
%{
|
||||
|
@ -1152,9 +1206,21 @@ defmodule Ash.Flow.Executor.AshEngine do
|
|||
halt_reason: halt_reason
|
||||
} = destroy
|
||||
|
||||
List.wrap(
|
||||
maybe_dynamic(
|
||||
[resource, action],
|
||||
name,
|
||||
[name, :commit],
|
||||
input,
|
||||
all_steps,
|
||||
transaction_name,
|
||||
additional_context,
|
||||
fn resource, action ->
|
||||
%{
|
||||
action: action,
|
||||
action_input: action_input,
|
||||
resource: resource,
|
||||
api: api,
|
||||
dep_paths: dep_paths,
|
||||
tenant: tenant,
|
||||
request_deps: request_deps
|
||||
|
@ -1163,6 +1229,7 @@ defmodule Ash.Flow.Executor.AshEngine do
|
|||
all_steps,
|
||||
resource,
|
||||
action,
|
||||
api,
|
||||
action_input,
|
||||
input,
|
||||
transaction_name,
|
||||
|
@ -1225,6 +1292,63 @@ defmodule Ash.Flow.Executor.AshEngine do
|
|||
)
|
||||
]
|
||||
end
|
||||
)
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
defp maybe_dynamic(
|
||||
items,
|
||||
name,
|
||||
request_path,
|
||||
input,
|
||||
all_steps,
|
||||
transaction_name,
|
||||
additional_context,
|
||||
fun
|
||||
) do
|
||||
# TODO: this only really works for resource/action right now
|
||||
if Enum.all?(items, &is_atom/1) do
|
||||
apply(fun, items)
|
||||
else
|
||||
{items, deps} =
|
||||
Enum.reduce(items, {[], []}, fn item, {items, deps} ->
|
||||
{item, new_deps} = Ash.Flow.handle_input_template(item, input)
|
||||
{[item | items], deps ++ new_deps}
|
||||
end)
|
||||
|
||||
dep_paths = get_dep_paths(all_steps, deps, transaction_name, [])
|
||||
request_deps = dependable_request_paths(dep_paths)
|
||||
|
||||
Ash.Engine.Request.new(
|
||||
authorize?: false,
|
||||
async?: false,
|
||||
name: "Run dynamic step #{inspect(name)}",
|
||||
path: [:dynamic, request_path],
|
||||
error_path: [],
|
||||
data:
|
||||
Ash.Engine.Request.resolve(request_deps, fn context ->
|
||||
context = Ash.Helpers.deep_merge_maps(context, additional_context)
|
||||
|
||||
results = results(dep_paths, context)
|
||||
|
||||
items =
|
||||
items
|
||||
|> Ash.Flow.set_dependent_values(%{
|
||||
results: results,
|
||||
elements: Map.get(context, :_ash_engine_elements)
|
||||
})
|
||||
|> Ash.Flow.handle_modifiers()
|
||||
|
||||
requests =
|
||||
fun
|
||||
|> apply(Enum.reverse(items))
|
||||
|> List.wrap()
|
||||
|
||||
{:ok, nil, %{requests: requests}}
|
||||
end)
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
defp extract_changeset_or_query_errors(result, keys) do
|
||||
|
@ -1486,25 +1610,37 @@ defmodule Ash.Flow.Executor.AshEngine do
|
|||
all_steps,
|
||||
resource,
|
||||
action,
|
||||
api,
|
||||
action_input,
|
||||
input,
|
||||
transaction_name,
|
||||
tenant,
|
||||
additional
|
||||
) do
|
||||
{action_input, deps} = Ash.Flow.handle_input_template(action_input, input)
|
||||
{tenant, tenant_deps} = Ash.Flow.handle_input_template(tenant, input)
|
||||
{_, additional_deps} = Ash.Flow.handle_input_template(additional, input)
|
||||
{resource, resource_deps} = Ash.Flow.handle_input_template(resource, input)
|
||||
{api, api_deps} = Ash.Flow.handle_input_template(api, input)
|
||||
{action, action_deps} = Ash.Flow.handle_input_template(action, input)
|
||||
|
||||
action =
|
||||
Ash.Resource.Info.action(resource, action) ||
|
||||
raise "No such action #{action} for #{resource}"
|
||||
|
||||
{action_input, deps} = Ash.Flow.handle_input_template(action_input, input)
|
||||
{tenant, tenant_deps} = Ash.Flow.handle_input_template(tenant, input)
|
||||
{_, additional_deps} = Ash.Flow.handle_input_template(additional, input)
|
||||
|
||||
dep_paths = get_dep_paths(all_steps, deps ++ tenant_deps, transaction_name, additional_deps)
|
||||
dep_paths =
|
||||
get_dep_paths(
|
||||
all_steps,
|
||||
deps ++ tenant_deps ++ api_deps ++ resource_deps ++ action_deps,
|
||||
transaction_name,
|
||||
additional_deps
|
||||
)
|
||||
|
||||
request_deps = dependable_request_paths(dep_paths)
|
||||
|
||||
%{
|
||||
resource: resource,
|
||||
api: api,
|
||||
action: action,
|
||||
action_input: action_input,
|
||||
dep_paths: dep_paths,
|
||||
|
|
|
@ -2,7 +2,7 @@ defmodule Ash.Flow do
|
|||
@moduledoc """
|
||||
A flow is a static definition of a set of steps to be .
|
||||
|
||||
Seeuthe {{link:ash:guide:Flows}} guide for more.
|
||||
See the {{link:ash:guide:Flows}} guide for more.
|
||||
"""
|
||||
|
||||
@type t :: module
|
||||
|
@ -50,7 +50,6 @@ defmodule Ash.Flow do
|
|||
input: input,
|
||||
result: result,
|
||||
notifications: metadata[:notifications] || [],
|
||||
runner_metadata: metadata[:runner_metadata],
|
||||
valid?: true,
|
||||
complete?: true
|
||||
}
|
||||
|
@ -66,14 +65,16 @@ defmodule Ash.Flow do
|
|||
}
|
||||
|
||||
{:error, metadata, error} ->
|
||||
complete? = complete?(error)
|
||||
|
||||
%Ash.Flow.Result{
|
||||
flow: flow,
|
||||
params: params,
|
||||
input: input,
|
||||
notifications: metadata[:notifications] || [],
|
||||
runner_metadata: metadata[:runner_metadata],
|
||||
runner_metadata: if(not complete?, do: metadata[:runner_metadata]),
|
||||
valid?: false,
|
||||
complete?: false,
|
||||
complete?: complete?,
|
||||
errors: List.wrap(error)
|
||||
}
|
||||
|
||||
|
@ -83,7 +84,7 @@ defmodule Ash.Flow do
|
|||
params: params,
|
||||
input: input,
|
||||
valid?: false,
|
||||
complete?: false,
|
||||
complete?: complete?(error),
|
||||
errors: List.wrap(error)
|
||||
}
|
||||
end
|
||||
|
@ -94,7 +95,7 @@ defmodule Ash.Flow do
|
|||
params: input,
|
||||
input: new_input,
|
||||
valid?: false,
|
||||
complete?: false,
|
||||
complete?: complete?(error),
|
||||
errors: List.wrap(error)
|
||||
}
|
||||
|
||||
|
@ -104,7 +105,7 @@ defmodule Ash.Flow do
|
|||
params: input,
|
||||
input: input,
|
||||
valid?: false,
|
||||
complete?: false,
|
||||
complete?: complete?(error),
|
||||
errors: List.wrap(error)
|
||||
}
|
||||
end
|
||||
|
@ -112,6 +113,14 @@ defmodule Ash.Flow do
|
|||
end
|
||||
end
|
||||
|
||||
defp complete?(%Ash.Error.Flow.Halted{}) do
|
||||
false
|
||||
end
|
||||
|
||||
defp complete?(_) do
|
||||
true
|
||||
end
|
||||
|
||||
defp add_actor(opts) do
|
||||
if Keyword.has_key?(opts, :actor) do
|
||||
opts
|
||||
|
|
|
@ -75,19 +75,19 @@ defmodule Ash.Flow.Step do
|
|||
def shared_action_opts do
|
||||
[
|
||||
resource: [
|
||||
type: :atom,
|
||||
type: :any,
|
||||
required: true,
|
||||
doc: "The resource to call the action on.",
|
||||
links: []
|
||||
],
|
||||
action: [
|
||||
type: :atom,
|
||||
type: :any,
|
||||
required: true,
|
||||
doc: "The action to call on the resource.",
|
||||
links: []
|
||||
],
|
||||
api: [
|
||||
type: :atom,
|
||||
type: :any,
|
||||
doc:
|
||||
"The api to use when calling the action. Defaults to the api set in the `flow` section.",
|
||||
links: []
|
||||
|
|
Loading…
Reference in a new issue