mirror of
https://github.com/ash-project/ash.git
synced 2024-09-20 05:23:03 +12:00
improvement: support non-endpoint pubsub adapters
This commit is contained in:
parent
50f09a4826
commit
4e23e63428
2 changed files with 74 additions and 18 deletions
|
@ -106,6 +106,19 @@ defmodule Ash.Notifier.PubSub do
|
|||
type: :string,
|
||||
doc:
|
||||
"A prefix for all pubsub messages, e.g `users`. A message with `created` would be published as `users:created`"
|
||||
],
|
||||
name: [
|
||||
type: :atom,
|
||||
doc: """
|
||||
A named pub sub to pass as the first argument to broadcast.
|
||||
|
||||
If you are simply using your `Endpoint` module for pubsub then this is unnecessary. If you want to use
|
||||
a custom pub started with something like `{Phoenix.PubSub, name: MyName}`, then you can provide `MyName` to
|
||||
here.
|
||||
|
||||
If this option is provided, we assume we are working with a `Phoenix.PubSub` and not a `Phoenix.Endpoint`, so
|
||||
the payload is sent as a `%Phoenix.Socket.Broadcast{}` if that module is available.
|
||||
"""
|
||||
]
|
||||
]
|
||||
}
|
||||
|
@ -135,6 +148,10 @@ defmodule Ash.Notifier.PubSub do
|
|||
Ash.Dsl.Extension.get_opt(resource, [:pub_sub], :prefix, nil)
|
||||
end
|
||||
|
||||
def name(resource) do
|
||||
Ash.Dsl.Extension.get_opt(resource, [:pub_sub], :name, nil)
|
||||
end
|
||||
|
||||
def notify(%Ash.Notifier.Notification{resource: resource} = notification) do
|
||||
resource
|
||||
|> publications()
|
||||
|
@ -150,14 +167,48 @@ defmodule Ash.Notifier.PubSub do
|
|||
prefix = prefix(notification.resource) || ""
|
||||
prefixed_topic = prefix <> ":" <> topic
|
||||
|
||||
module(notification.resource).broadcast(
|
||||
prefixed_topic,
|
||||
event,
|
||||
notification
|
||||
)
|
||||
args =
|
||||
case name(notification.resource) do
|
||||
nil ->
|
||||
[prefixed_topic, event, notification]
|
||||
|
||||
pub_sub ->
|
||||
payload = to_payload(topic, event, notification)
|
||||
|
||||
[pub_sub, prefixed_topic, payload]
|
||||
end
|
||||
|
||||
args =
|
||||
case publish.dispatcher do
|
||||
nil ->
|
||||
args
|
||||
|
||||
dispatcher ->
|
||||
args ++ dispatcher
|
||||
end
|
||||
|
||||
apply(module(notification.resource), :broadcast, args)
|
||||
end)
|
||||
end
|
||||
|
||||
if Code.ensure_loaded?(Phoenix.Socket.Broadcast) do
|
||||
def to_payload(topic, event, notification) do
|
||||
%Phoenix.Socket.Broadcast{
|
||||
topic: topic,
|
||||
event: event,
|
||||
payload: notification
|
||||
}
|
||||
end
|
||||
else
|
||||
def to_payload(topic, event, notification) do
|
||||
%{
|
||||
topic: topic,
|
||||
event: event,
|
||||
payload: notification
|
||||
}
|
||||
end
|
||||
end
|
||||
|
||||
defp fill_template(topic, _) when is_binary(topic), do: [topic]
|
||||
|
||||
defp fill_template(topic, notification) do
|
||||
|
@ -176,6 +227,16 @@ defmodule Ash.Notifier.PubSub do
|
|||
all_combinations_of_values(rest, notification, action_type, [item | trail])
|
||||
end
|
||||
|
||||
defp all_combinations_of_values([:_tenant | rest], notification, action_type, trail) do
|
||||
if notification.changeset.tenant do
|
||||
all_combinations_of_values(rest, notification, action_type, [
|
||||
notification.changeset.tenant | trail
|
||||
])
|
||||
else
|
||||
[]
|
||||
end
|
||||
end
|
||||
|
||||
defp all_combinations_of_values([item | rest], notification, :update, trail)
|
||||
when is_atom(item) do
|
||||
value_before_change = Map.get(notification.changeset.data, item)
|
||||
|
@ -189,16 +250,6 @@ defmodule Ash.Notifier.PubSub do
|
|||
end)
|
||||
end
|
||||
|
||||
defp all_combinations_of_values([:_tenant | rest], notification, action_type, trail) do
|
||||
if notification.changeset.tenant do
|
||||
all_combinations_of_values(rest, notification, action_type, [
|
||||
notification.changeset.tenant | trail
|
||||
])
|
||||
else
|
||||
[]
|
||||
end
|
||||
end
|
||||
|
||||
defp all_combinations_of_values([item | rest], notification, action_type, trail)
|
||||
when is_atom(item) do
|
||||
all_combinations_of_values(rest, notification, action_type, [
|
||||
|
|
|
@ -5,7 +5,8 @@ defmodule Ash.Notifier.PubSub.Publication do
|
|||
:action,
|
||||
:topic,
|
||||
:event,
|
||||
:type
|
||||
:type,
|
||||
:dispatcher
|
||||
]
|
||||
|
||||
@schema [
|
||||
|
@ -22,6 +23,11 @@ defmodule Ash.Notifier.PubSub.Publication do
|
|||
event: [
|
||||
type: :string,
|
||||
doc: "The name of the event to publish. Defaults to the action name"
|
||||
],
|
||||
dispatcher: [
|
||||
type: :atom,
|
||||
doc:
|
||||
"The module to use as a dispatcher. If none is set, the pubsub module provided is used."
|
||||
]
|
||||
]
|
||||
|
||||
|
@ -29,8 +35,7 @@ defmodule Ash.Notifier.PubSub.Publication do
|
|||
|> Keyword.update!(:action, &Keyword.delete(&1, :required))
|
||||
|> Keyword.put(:type,
|
||||
type: {:in, [:create, :update, :destroy]},
|
||||
doc:
|
||||
"In the case of multiple actions with the same name, you may need to provide the action type as well."
|
||||
doc: "Publish on all actions of a given type"
|
||||
)
|
||||
|
||||
def schema, do: @schema
|
||||
|
|
Loading…
Reference in a new issue