diff --git a/config/config.exs b/config/config.exs index 79c376b..0c6831b 100644 --- a/config/config.exs +++ b/config/config.exs @@ -7,7 +7,7 @@ config :ash, :validate_domain_config_inclusion?, false config :logger, level: :warning config :ash, :pub_sub, debug?: true -config :logger, level: :debug +config :logger, level: :info if Mix.env() == :dev do config :git_ops, diff --git a/lib/subscription/config.ex b/lib/subscription/config.ex index a732bdf..d45b174 100644 --- a/lib/subscription/config.ex +++ b/lib/subscription/config.ex @@ -14,8 +14,6 @@ defmodule AshGraphql.Subscription.Config do read_action = @subscription.read_action || Ash.Resource.Info.primary_action!(@resource, :read).name - dbg(@subscription) - actor = case @subscription.actor do {module, opts} -> diff --git a/lib/subscription/endpoint.ex b/lib/subscription/endpoint.ex index cda2e1a..14fc393 100644 --- a/lib/subscription/endpoint.ex +++ b/lib/subscription/endpoint.ex @@ -3,65 +3,8 @@ defmodule AshGraphql.Subscription.Endpoint do quote do use Absinthe.Phoenix.Endpoint - alias Absinthe.Pipeline.BatchResolver - - require Logger - - def run_docset(pubsub, docs_and_topics, notification) do - for {topic, key_strategy, doc} <- docs_and_topics do - try do - pipeline = - Absinthe.Subscription.Local.pipeline(doc, notification) - - {:ok, %{result: data}, _} = Absinthe.Pipeline.run(doc.source, pipeline) - - Logger.debug(""" - Absinthe Subscription Publication - Field Topic: #{inspect(key_strategy)} - Subscription id: #{inspect(topic)} - Data: #{inspect(data)} - """) - - case should_send?(data) do - false -> - :ok - - true -> - :ok = pubsub.publish_subscription(topic, data) - end - rescue - e -> - BatchResolver.pipeline_error(e, __STACKTRACE__) - end - end - end - - defp should_send?(%{errors: errors}) do - # if the user is not allowed to see the data or the query didn't - # return any data we do not send the error to the client - # because it would just expose unnecessary information - # and the user can not really do anything usefull with it - not (errors - |> List.wrap() - |> Enum.any?(fn error -> Map.get(error, :code) in ["forbidden", "not_found"] end)) - end - - defp should_send?(_), do: true - - defp get_filter(topic) do - [_, rest] = String.split(topic, "__absinthe__:doc:") - [filter, _] = String.split(rest, ":") - - case Base.decode64(filter) do - {:ok, filter} -> - :erlang.binary_to_term(filter) - - _ -> - nil - end - rescue - _ -> nil - end + defdelegate run_docset(pubsub, docs_and_topics, notification), + to: AshGraphql.Subscription.Runner end end end diff --git a/lib/subscription/notifier.ex b/lib/subscription/notifier.ex index 0644f3f..3c91da1 100644 --- a/lib/subscription/notifier.ex +++ b/lib/subscription/notifier.ex @@ -12,5 +12,7 @@ defmodule AshGraphql.Subscription.Notifier do Absinthe.Subscription.publish(pub_sub, notification, [{subscription.name, "*"}]) end end + + :ok end end diff --git a/lib/subscription/runner.ex b/lib/subscription/runner.ex new file mode 100644 index 0000000..fc39934 --- /dev/null +++ b/lib/subscription/runner.ex @@ -0,0 +1,46 @@ +defmodule AshGraphql.Subscription.Runner do + alias Absinthe.Pipeline.BatchResolver + + require Logger + + def run_docset(pubsub, docs_and_topics, notification) do + for {topic, key_strategy, doc} <- docs_and_topics do + try do + pipeline = + Absinthe.Subscription.Local.pipeline(doc, notification) + + {:ok, %{result: data}, _} = Absinthe.Pipeline.run(doc.source, pipeline) + + Logger.debug(""" + Absinthe Subscription Publication + Field Topic: #{inspect(key_strategy)} + Subscription id: #{inspect(topic)} + Data: #{inspect(data)} + """) + + case should_send?(data) do + false -> + :ok + + true -> + :ok = pubsub.publish_subscription(topic, data) + end + rescue + e -> + BatchResolver.pipeline_error(e, __STACKTRACE__) + end + end + end + + defp should_send?(%{errors: errors}) do + # if the user is not allowed to see the data or the query didn't + # return any data we do not send the error to the client + # because it would just expose unnecessary information + # and the user can not really do anything usefull with it + not (errors + |> List.wrap() + |> Enum.any?(fn error -> Map.get(error, :code) in ["forbidden", "not_found"] end)) + end + + defp should_send?(_), do: true +end diff --git a/test/subscription_test.exs b/test/subscription_test.exs index 4c473a3..288b8c1 100644 --- a/test/subscription_test.exs +++ b/test/subscription_test.exs @@ -5,7 +5,7 @@ defmodule AshGraphql.SubscriptionTest do alias AshGraphql.Test.Schema setup do - Application.put_env(PubSub, :notifier_test_pid, self() |> IO.inspect(label: :test_process)) + Application.put_env(PubSub, :notifier_test_pid, self()) {:ok, _} = PubSub.start_link() {:ok, _} = Absinthe.Subscription.start_link(PubSub) :ok @@ -13,25 +13,30 @@ defmodule AshGraphql.SubscriptionTest do @query """ subscription { - subscribableCreated { id } + subscribableCreated { + created { + id + } + } } """ @tag :wip - test "subscription triggers work" do + test "can subscribe to a resource" do id = "1" assert {:ok, %{"subscribed" => topic}} = - run_subscription( + Absinthe.run( @query, Schema, variables: %{"userId" => id}, - context: %{pubsub: PubSub, actor: %{id: id}} + context: %{actor: %{id: id}, pubsub: PubSub} ) mutation = """ mutation CreateSubscribable($input: CreateSubscribableInput) { createSubscribable(input: $input) { result{ + id text } errors{ @@ -41,28 +46,14 @@ defmodule AshGraphql.SubscriptionTest do } """ - IO.inspect(self()) - assert {:ok, %{data: data}} = - run_subscription(mutation, Schema, - variables: %{"input" => %{"text" => "foo"}}, - context: %{pubsub: PubSub} - ) + Absinthe.run(mutation, Schema, variables: %{"input" => %{"text" => "foo"}}) - assert_receive({:broadcast, absinthe_proxy, data, fields}) - end + assert Enum.empty?(data["createSubscribable"]["errors"]) - defp run_subscription(query, schema, opts) do - opts = Keyword.update(opts, :context, %{pubsub: PubSub}, &Map.put(&1, :pubsub, PubSub)) + assert_receive({^topic, data}) - case Absinthe.run(query, schema, opts) do - # |> IO.inspect(label: :absinthe_run) do - {:ok, %{"subscribed" => topic}} = val -> - PubSub.subscribe(topic) - val - - val -> - val - end + assert data["createSubscribable"]["result"]["id"] == + data["subscribableCreated"]["created"]["id"] end end diff --git a/test/support/domain.ex b/test/support/domain.ex index 916b682..74c08ba 100644 --- a/test/support/domain.ex +++ b/test/support/domain.ex @@ -45,5 +45,6 @@ defmodule AshGraphql.Test.Domain do resource(AshGraphql.Test.Message) resource(AshGraphql.Test.TextMessage) resource(AshGraphql.Test.ImageMessage) + resource(AshGraphql.Test.Subscribable) end end diff --git a/test/support/pub_sub.ex b/test/support/pub_sub.ex index 6d02d0f..ce02ef1 100644 --- a/test/support/pub_sub.ex +++ b/test/support/pub_sub.ex @@ -6,55 +6,26 @@ defmodule AshGraphql.Test.PubSub do end def node_name() do - node() + Atom.to_string(node()) end - def subscribe(topic) do - # IO.inspect([topic: topic], label: "subscribe") - Registry.register(__MODULE__, topic, [self()]) + def subscribe(_topic) do :ok end + defdelegate run_docset(pubsub, docs_and_topics, mutation_result), + to: AshGraphql.Subscription.Runner + def publish_subscription(topic, data) do - message = - %{ - topic: topic, - event: "subscription:data", - result: data - } - - # |> IO.inspect(label: :publish_subscription) - - Registry.dispatch(__MODULE__, topic, fn entries -> - for {pid, _} <- entries, do: send(pid, {:broadcast, message}) - end) - end - - def broadcast(topic, event, notification) do - # IO.inspect([topic: topic, event: event, notification: notification], label: "broadcast") - - message = - %{ - topic: topic, - event: event, - result: notification - } - - Registry.dispatch(__MODULE__, topic, fn entries -> - for {pid, _} <- entries, do: send(pid, {:broadcast, message}) - end) - end - - def publish_mutation(proxy_topic, mutation_result, subscribed_fields) do - # this pubsub is local and doesn't support clusters - IO.inspect("publish mutation") - send( - Application.get_env(__MODULE__, :notifier_test_pid) |> IO.inspect(label: :send_to), - {:broadcast, proxy_topic, mutation_result, subscribed_fields} + Application.get_env(__MODULE__, :notifier_test_pid), + {topic, data} ) - |> IO.inspect(label: :send) :ok end + + def publish_mutation(_proxy_topic, _mutation_result, _subscribed_fields) do + :ok + end end diff --git a/test/support/registry.ex b/test/support/registry.ex deleted file mode 100644 index 530d1fc..0000000 --- a/test/support/registry.ex +++ /dev/null @@ -1,29 +0,0 @@ -defmodule AshGraphql.Test.Registry do - @moduledoc false - use Ash.Registry - - entries do - entry(AshGraphql.Test.Comment) - entry(AshGraphql.Test.CompositePrimaryKey) - entry(AshGraphql.Test.CompositePrimaryKeyNotEncoded) - entry(AshGraphql.Test.DoubleRelRecursive) - entry(AshGraphql.Test.DoubleRelToRecursiveParentOfEmbed) - entry(AshGraphql.Test.MapTypes) - entry(AshGraphql.Test.MultitenantPostTag) - entry(AshGraphql.Test.MultitenantTag) - entry(AshGraphql.Test.NoObject) - entry(AshGraphql.Test.NonIdPrimaryKey) - entry(AshGraphql.Test.Post) - entry(AshGraphql.Test.PostTag) - entry(AshGraphql.Test.RelayPostTag) - entry(AshGraphql.Test.RelayTag) - entry(AshGraphql.Test.SponsoredComment) - entry(AshGraphql.Test.Subscribable) - entry(AshGraphql.Test.Tag) - entry(AshGraphql.Test.User) - entry(AshGraphql.Test.Channel) - entry(AshGraphql.Test.Message) - entry(AshGraphql.Test.TextMessage) - entry(AshGraphql.Test.ImageMessage) - end -end diff --git a/test/support/resources/post.ex b/test/support/resources/post.ex index 0240deb..edccef5 100644 --- a/test/support/resources/post.ex +++ b/test/support/resources/post.ex @@ -136,7 +136,6 @@ defmodule AshGraphql.Test.Post do domain: AshGraphql.Test.Domain, data_layer: Ash.DataLayer.Ets, authorizers: [Ash.Policy.Authorizer], - simple_notifiers: [AshGraphql.Resource.Notifier], extensions: [AshGraphql.Resource] require Ash.Query diff --git a/test/support/resources/subscribable.ex b/test/support/resources/subscribable.ex index cd41ac1..bf25b9f 100644 --- a/test/support/resources/subscribable.ex +++ b/test/support/resources/subscribable.ex @@ -1,6 +1,7 @@ defmodule AshGraphql.Test.Subscribable do @moduledoc false use Ash.Resource, + domain: AshGraphql.Test.Domain, data_layer: Ash.DataLayer.Ets, extensions: [AshGraphql.Resource] @@ -18,21 +19,21 @@ defmodule AshGraphql.Test.Subscribable do end subscriptions do - subscribe(:subscribable_created, fn _, _ -> - IO.inspect("bucket_created") - {:ok, topic: "*"} - end) + pubsub(AshGraphql.Test.PubSub) + + subscribe(:subscribable_created) end end actions do + default_accept(:*) defaults([:create, :read, :update, :destroy]) end attributes do uuid_primary_key(:id) - attribute(:text, :string) + attribute(:text, :string, public?: true) create_timestamp(:created_at) update_timestamp(:updated_at) end diff --git a/test/support/schema.ex b/test/support/schema.ex index afc26f5..64b2ca7 100644 --- a/test/support/schema.ex +++ b/test/support/schema.ex @@ -7,10 +7,6 @@ defmodule AshGraphql.Test.Schema do use AshGraphql, domains: @domains, generate_sdl_file: "priv/schema.graphql" - alias AshGraphql.Test.Post - - require Ash.Query - query do end @@ -33,22 +29,5 @@ defmodule AshGraphql.Test.Schema do end subscription do - field :subscribable_created, :subscribable do - config(fn - _args, _info -> - {:ok, topic: "*"} - end) - - resolve(fn args, _, resolution -> - # loads all the data you need - AshGraphql.Subscription.query_for_subscription( - Post, - Api, - resolution - ) - |> Ash.Query.filter(id == ^args.id) - |> Ash.read(actor: resolution.context.current_user) - end) - end end end