mirror of
https://github.com/ash-project/ash_graphql.git
synced 2024-09-19 12:53:40 +12:00
update test
This commit is contained in:
parent
a54db97ee9
commit
948472387f
12 changed files with 84 additions and 182 deletions
|
@ -7,7 +7,7 @@ config :ash, :validate_domain_config_inclusion?, false
|
|||
config :logger, level: :warning
|
||||
|
||||
config :ash, :pub_sub, debug?: true
|
||||
config :logger, level: :debug
|
||||
config :logger, level: :info
|
||||
|
||||
if Mix.env() == :dev do
|
||||
config :git_ops,
|
||||
|
|
|
@ -14,8 +14,6 @@ defmodule AshGraphql.Subscription.Config do
|
|||
read_action =
|
||||
@subscription.read_action || Ash.Resource.Info.primary_action!(@resource, :read).name
|
||||
|
||||
dbg(@subscription)
|
||||
|
||||
actor =
|
||||
case @subscription.actor do
|
||||
{module, opts} ->
|
||||
|
|
|
@ -3,65 +3,8 @@ defmodule AshGraphql.Subscription.Endpoint do
|
|||
quote do
|
||||
use Absinthe.Phoenix.Endpoint
|
||||
|
||||
alias Absinthe.Pipeline.BatchResolver
|
||||
|
||||
require Logger
|
||||
|
||||
def run_docset(pubsub, docs_and_topics, notification) do
|
||||
for {topic, key_strategy, doc} <- docs_and_topics do
|
||||
try do
|
||||
pipeline =
|
||||
Absinthe.Subscription.Local.pipeline(doc, notification)
|
||||
|
||||
{:ok, %{result: data}, _} = Absinthe.Pipeline.run(doc.source, pipeline)
|
||||
|
||||
Logger.debug("""
|
||||
Absinthe Subscription Publication
|
||||
Field Topic: #{inspect(key_strategy)}
|
||||
Subscription id: #{inspect(topic)}
|
||||
Data: #{inspect(data)}
|
||||
""")
|
||||
|
||||
case should_send?(data) do
|
||||
false ->
|
||||
:ok
|
||||
|
||||
true ->
|
||||
:ok = pubsub.publish_subscription(topic, data)
|
||||
end
|
||||
rescue
|
||||
e ->
|
||||
BatchResolver.pipeline_error(e, __STACKTRACE__)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
defp should_send?(%{errors: errors}) do
|
||||
# if the user is not allowed to see the data or the query didn't
|
||||
# return any data we do not send the error to the client
|
||||
# because it would just expose unnecessary information
|
||||
# and the user can not really do anything usefull with it
|
||||
not (errors
|
||||
|> List.wrap()
|
||||
|> Enum.any?(fn error -> Map.get(error, :code) in ["forbidden", "not_found"] end))
|
||||
end
|
||||
|
||||
defp should_send?(_), do: true
|
||||
|
||||
defp get_filter(topic) do
|
||||
[_, rest] = String.split(topic, "__absinthe__:doc:")
|
||||
[filter, _] = String.split(rest, ":")
|
||||
|
||||
case Base.decode64(filter) do
|
||||
{:ok, filter} ->
|
||||
:erlang.binary_to_term(filter)
|
||||
|
||||
_ ->
|
||||
nil
|
||||
end
|
||||
rescue
|
||||
_ -> nil
|
||||
end
|
||||
defdelegate run_docset(pubsub, docs_and_topics, notification),
|
||||
to: AshGraphql.Subscription.Runner
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -12,5 +12,7 @@ defmodule AshGraphql.Subscription.Notifier do
|
|||
Absinthe.Subscription.publish(pub_sub, notification, [{subscription.name, "*"}])
|
||||
end
|
||||
end
|
||||
|
||||
:ok
|
||||
end
|
||||
end
|
||||
|
|
46
lib/subscription/runner.ex
Normal file
46
lib/subscription/runner.ex
Normal file
|
@ -0,0 +1,46 @@
|
|||
defmodule AshGraphql.Subscription.Runner do
|
||||
alias Absinthe.Pipeline.BatchResolver
|
||||
|
||||
require Logger
|
||||
|
||||
def run_docset(pubsub, docs_and_topics, notification) do
|
||||
for {topic, key_strategy, doc} <- docs_and_topics do
|
||||
try do
|
||||
pipeline =
|
||||
Absinthe.Subscription.Local.pipeline(doc, notification)
|
||||
|
||||
{:ok, %{result: data}, _} = Absinthe.Pipeline.run(doc.source, pipeline)
|
||||
|
||||
Logger.debug("""
|
||||
Absinthe Subscription Publication
|
||||
Field Topic: #{inspect(key_strategy)}
|
||||
Subscription id: #{inspect(topic)}
|
||||
Data: #{inspect(data)}
|
||||
""")
|
||||
|
||||
case should_send?(data) do
|
||||
false ->
|
||||
:ok
|
||||
|
||||
true ->
|
||||
:ok = pubsub.publish_subscription(topic, data)
|
||||
end
|
||||
rescue
|
||||
e ->
|
||||
BatchResolver.pipeline_error(e, __STACKTRACE__)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
defp should_send?(%{errors: errors}) do
|
||||
# if the user is not allowed to see the data or the query didn't
|
||||
# return any data we do not send the error to the client
|
||||
# because it would just expose unnecessary information
|
||||
# and the user can not really do anything usefull with it
|
||||
not (errors
|
||||
|> List.wrap()
|
||||
|> Enum.any?(fn error -> Map.get(error, :code) in ["forbidden", "not_found"] end))
|
||||
end
|
||||
|
||||
defp should_send?(_), do: true
|
||||
end
|
|
@ -5,7 +5,7 @@ defmodule AshGraphql.SubscriptionTest do
|
|||
alias AshGraphql.Test.Schema
|
||||
|
||||
setup do
|
||||
Application.put_env(PubSub, :notifier_test_pid, self() |> IO.inspect(label: :test_process))
|
||||
Application.put_env(PubSub, :notifier_test_pid, self())
|
||||
{:ok, _} = PubSub.start_link()
|
||||
{:ok, _} = Absinthe.Subscription.start_link(PubSub)
|
||||
:ok
|
||||
|
@ -13,25 +13,30 @@ defmodule AshGraphql.SubscriptionTest do
|
|||
|
||||
@query """
|
||||
subscription {
|
||||
subscribableCreated { id }
|
||||
subscribableCreated {
|
||||
created {
|
||||
id
|
||||
}
|
||||
}
|
||||
}
|
||||
"""
|
||||
@tag :wip
|
||||
test "subscription triggers work" do
|
||||
test "can subscribe to a resource" do
|
||||
id = "1"
|
||||
|
||||
assert {:ok, %{"subscribed" => topic}} =
|
||||
run_subscription(
|
||||
Absinthe.run(
|
||||
@query,
|
||||
Schema,
|
||||
variables: %{"userId" => id},
|
||||
context: %{pubsub: PubSub, actor: %{id: id}}
|
||||
context: %{actor: %{id: id}, pubsub: PubSub}
|
||||
)
|
||||
|
||||
mutation = """
|
||||
mutation CreateSubscribable($input: CreateSubscribableInput) {
|
||||
createSubscribable(input: $input) {
|
||||
result{
|
||||
id
|
||||
text
|
||||
}
|
||||
errors{
|
||||
|
@ -41,28 +46,14 @@ defmodule AshGraphql.SubscriptionTest do
|
|||
}
|
||||
"""
|
||||
|
||||
IO.inspect(self())
|
||||
|
||||
assert {:ok, %{data: data}} =
|
||||
run_subscription(mutation, Schema,
|
||||
variables: %{"input" => %{"text" => "foo"}},
|
||||
context: %{pubsub: PubSub}
|
||||
)
|
||||
Absinthe.run(mutation, Schema, variables: %{"input" => %{"text" => "foo"}})
|
||||
|
||||
assert_receive({:broadcast, absinthe_proxy, data, fields})
|
||||
end
|
||||
assert Enum.empty?(data["createSubscribable"]["errors"])
|
||||
|
||||
defp run_subscription(query, schema, opts) do
|
||||
opts = Keyword.update(opts, :context, %{pubsub: PubSub}, &Map.put(&1, :pubsub, PubSub))
|
||||
assert_receive({^topic, data})
|
||||
|
||||
case Absinthe.run(query, schema, opts) do
|
||||
# |> IO.inspect(label: :absinthe_run) do
|
||||
{:ok, %{"subscribed" => topic}} = val ->
|
||||
PubSub.subscribe(topic)
|
||||
val
|
||||
|
||||
val ->
|
||||
val
|
||||
end
|
||||
assert data["createSubscribable"]["result"]["id"] ==
|
||||
data["subscribableCreated"]["created"]["id"]
|
||||
end
|
||||
end
|
||||
|
|
|
@ -45,5 +45,6 @@ defmodule AshGraphql.Test.Domain do
|
|||
resource(AshGraphql.Test.Message)
|
||||
resource(AshGraphql.Test.TextMessage)
|
||||
resource(AshGraphql.Test.ImageMessage)
|
||||
resource(AshGraphql.Test.Subscribable)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -6,55 +6,26 @@ defmodule AshGraphql.Test.PubSub do
|
|||
end
|
||||
|
||||
def node_name() do
|
||||
node()
|
||||
Atom.to_string(node())
|
||||
end
|
||||
|
||||
def subscribe(topic) do
|
||||
# IO.inspect([topic: topic], label: "subscribe")
|
||||
Registry.register(__MODULE__, topic, [self()])
|
||||
def subscribe(_topic) do
|
||||
:ok
|
||||
end
|
||||
|
||||
defdelegate run_docset(pubsub, docs_and_topics, mutation_result),
|
||||
to: AshGraphql.Subscription.Runner
|
||||
|
||||
def publish_subscription(topic, data) do
|
||||
message =
|
||||
%{
|
||||
topic: topic,
|
||||
event: "subscription:data",
|
||||
result: data
|
||||
}
|
||||
|
||||
# |> IO.inspect(label: :publish_subscription)
|
||||
|
||||
Registry.dispatch(__MODULE__, topic, fn entries ->
|
||||
for {pid, _} <- entries, do: send(pid, {:broadcast, message})
|
||||
end)
|
||||
end
|
||||
|
||||
def broadcast(topic, event, notification) do
|
||||
# IO.inspect([topic: topic, event: event, notification: notification], label: "broadcast")
|
||||
|
||||
message =
|
||||
%{
|
||||
topic: topic,
|
||||
event: event,
|
||||
result: notification
|
||||
}
|
||||
|
||||
Registry.dispatch(__MODULE__, topic, fn entries ->
|
||||
for {pid, _} <- entries, do: send(pid, {:broadcast, message})
|
||||
end)
|
||||
end
|
||||
|
||||
def publish_mutation(proxy_topic, mutation_result, subscribed_fields) do
|
||||
# this pubsub is local and doesn't support clusters
|
||||
IO.inspect("publish mutation")
|
||||
|
||||
send(
|
||||
Application.get_env(__MODULE__, :notifier_test_pid) |> IO.inspect(label: :send_to),
|
||||
{:broadcast, proxy_topic, mutation_result, subscribed_fields}
|
||||
Application.get_env(__MODULE__, :notifier_test_pid),
|
||||
{topic, data}
|
||||
)
|
||||
|> IO.inspect(label: :send)
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
def publish_mutation(_proxy_topic, _mutation_result, _subscribed_fields) do
|
||||
:ok
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,29 +0,0 @@
|
|||
defmodule AshGraphql.Test.Registry do
|
||||
@moduledoc false
|
||||
use Ash.Registry
|
||||
|
||||
entries do
|
||||
entry(AshGraphql.Test.Comment)
|
||||
entry(AshGraphql.Test.CompositePrimaryKey)
|
||||
entry(AshGraphql.Test.CompositePrimaryKeyNotEncoded)
|
||||
entry(AshGraphql.Test.DoubleRelRecursive)
|
||||
entry(AshGraphql.Test.DoubleRelToRecursiveParentOfEmbed)
|
||||
entry(AshGraphql.Test.MapTypes)
|
||||
entry(AshGraphql.Test.MultitenantPostTag)
|
||||
entry(AshGraphql.Test.MultitenantTag)
|
||||
entry(AshGraphql.Test.NoObject)
|
||||
entry(AshGraphql.Test.NonIdPrimaryKey)
|
||||
entry(AshGraphql.Test.Post)
|
||||
entry(AshGraphql.Test.PostTag)
|
||||
entry(AshGraphql.Test.RelayPostTag)
|
||||
entry(AshGraphql.Test.RelayTag)
|
||||
entry(AshGraphql.Test.SponsoredComment)
|
||||
entry(AshGraphql.Test.Subscribable)
|
||||
entry(AshGraphql.Test.Tag)
|
||||
entry(AshGraphql.Test.User)
|
||||
entry(AshGraphql.Test.Channel)
|
||||
entry(AshGraphql.Test.Message)
|
||||
entry(AshGraphql.Test.TextMessage)
|
||||
entry(AshGraphql.Test.ImageMessage)
|
||||
end
|
||||
end
|
|
@ -136,7 +136,6 @@ defmodule AshGraphql.Test.Post do
|
|||
domain: AshGraphql.Test.Domain,
|
||||
data_layer: Ash.DataLayer.Ets,
|
||||
authorizers: [Ash.Policy.Authorizer],
|
||||
simple_notifiers: [AshGraphql.Resource.Notifier],
|
||||
extensions: [AshGraphql.Resource]
|
||||
|
||||
require Ash.Query
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
defmodule AshGraphql.Test.Subscribable do
|
||||
@moduledoc false
|
||||
use Ash.Resource,
|
||||
domain: AshGraphql.Test.Domain,
|
||||
data_layer: Ash.DataLayer.Ets,
|
||||
extensions: [AshGraphql.Resource]
|
||||
|
||||
|
@ -18,21 +19,21 @@ defmodule AshGraphql.Test.Subscribable do
|
|||
end
|
||||
|
||||
subscriptions do
|
||||
subscribe(:subscribable_created, fn _, _ ->
|
||||
IO.inspect("bucket_created")
|
||||
{:ok, topic: "*"}
|
||||
end)
|
||||
pubsub(AshGraphql.Test.PubSub)
|
||||
|
||||
subscribe(:subscribable_created)
|
||||
end
|
||||
end
|
||||
|
||||
actions do
|
||||
default_accept(:*)
|
||||
defaults([:create, :read, :update, :destroy])
|
||||
end
|
||||
|
||||
attributes do
|
||||
uuid_primary_key(:id)
|
||||
|
||||
attribute(:text, :string)
|
||||
attribute(:text, :string, public?: true)
|
||||
create_timestamp(:created_at)
|
||||
update_timestamp(:updated_at)
|
||||
end
|
||||
|
|
|
@ -7,10 +7,6 @@ defmodule AshGraphql.Test.Schema do
|
|||
|
||||
use AshGraphql, domains: @domains, generate_sdl_file: "priv/schema.graphql"
|
||||
|
||||
alias AshGraphql.Test.Post
|
||||
|
||||
require Ash.Query
|
||||
|
||||
query do
|
||||
end
|
||||
|
||||
|
@ -33,22 +29,5 @@ defmodule AshGraphql.Test.Schema do
|
|||
end
|
||||
|
||||
subscription do
|
||||
field :subscribable_created, :subscribable do
|
||||
config(fn
|
||||
_args, _info ->
|
||||
{:ok, topic: "*"}
|
||||
end)
|
||||
|
||||
resolve(fn args, _, resolution ->
|
||||
# loads all the data you need
|
||||
AshGraphql.Subscription.query_for_subscription(
|
||||
Post,
|
||||
Api,
|
||||
resolution
|
||||
)
|
||||
|> Ash.Query.filter(id == ^args.id)
|
||||
|> Ash.read(actor: resolution.context.current_user)
|
||||
end)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue