From c20d4b453e46e5befbb392e1e0855236c4e5f7ab Mon Sep 17 00:00:00 2001 From: James Harton Date: Sun, 8 Apr 2018 16:18:48 +1200 Subject: [PATCH] Add postgres database triggers and use them to notify subscribers. --- README.md | 35 +++++++----- lib/faces/application.ex | 3 +- lib/faces/gallery/event_listener.ex | 57 +++++++++++++++++++ ...8032823_broadcast_people_table_changes.exs | 47 +++++++++++++++ 4 files changed, 128 insertions(+), 14 deletions(-) create mode 100644 lib/faces/gallery/event_listener.ex create mode 100644 priv/repo/migrations/20180408032823_broadcast_people_table_changes.exs diff --git a/README.md b/README.md index 427aec4..a633ce3 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,26 @@ -# Extra for experts +# More extra for experts -Our little React app is pretty neat. When it loads it queries the server for a -list of people and shows their faces and allows us to run a mutation to add a -new person to our list. +Our existing subscription only triggers if someone else uses the `importPerson` +mutation to trigger the subscription event - what if we want to let clients know +whenever a row is added to our `people` table? Maybe it's added by another +service, or something. -There's only one problem. What happens if someone else comes along and adds a -face to our gallery? Our client won't know that the data has changed and will -just keep showing us stale data. This is where [GraphQL -Subscriptions](http://graphql.org/blog/subscriptions-in-graphql-and-relay/) come -in. +In this branch we're using Postgres triggers and stored procedures to send +notifications whenever a row is added to the `people` table and we're using +`Postgrex.Notifications` to subscribe to them. A substantian portion of the +code for this example comes from [this hackernoon post](https://hackernoon.com/get-notified-of-user-signups-and-plan-changes-automatically-using-postgres-phoenix-pubsub-e67d061b04bc). -Absinthe has built-in support for subscriptions and since we're using Phoenix we -have a well-tested channel implementation to run it over (Absinthe can use it's -own socket protocol too, if you're not running in Phoenix). +Things to look at: -Next, take a look at `step-6`. + * `priv/repo/migrations/20180408032823_broadcast_people_table_changes.exs` + * `lib/faces/gallery/event_listener.ex` + +[Demo](http://localhost:4000) + +Let's try manually inserting a row (via `psql faces_dev`): +```sql +INSERT INTO "people" ("username", "name", "location", "avatar_url", "inserted_at", "updated_at") +VALUES ('pupper', 'pupper', 'the bath', 'https://media.giphy.com/media/3o6Zt9ved5rCuidNlK/giphy.gif', NOW(), NOW()); +``` + +Next, move on to `step-7`. diff --git a/lib/faces/application.ex b/lib/faces/application.ex index f58c382..e445be7 100644 --- a/lib/faces/application.ex +++ b/lib/faces/application.ex @@ -14,7 +14,8 @@ defmodule Faces.Application do supervisor(FacesWeb.Endpoint, []), # Start your own worker by calling: Faces.Worker.start_link(arg1, arg2, arg3) # worker(Faces.Worker, [arg1, arg2, arg3]), - supervisor(Absinthe.Subscription, [FacesWeb.Endpoint]) + supervisor(Absinthe.Subscription, [FacesWeb.Endpoint]), + worker(Faces.Gallery.EventListener, []) ] # See https://hexdocs.pm/elixir/Supervisor.html diff --git a/lib/faces/gallery/event_listener.ex b/lib/faces/gallery/event_listener.ex new file mode 100644 index 0000000..2a4cb13 --- /dev/null +++ b/lib/faces/gallery/event_listener.ex @@ -0,0 +1,57 @@ +defmodule Faces.Gallery.EventListener do + use GenServer + require Logger + alias Absinthe.Subscription + + @notification_channel "people_changes" + + def start_link, do: GenServer.start_link(__MODULE__, []) + + def init(_opts) do + pg_config = Faces.Repo.config() + {:ok, pid} = Postgrex.Notifications.start_link(pg_config) + {:ok, ref} = Postgrex.Notifications.listen(pid, @notification_channel) + Logger.debug(fn -> "#{__MODULE__} subscribed to channel #{@notification_channel}" end) + {:ok, {pid, @notification_channel, ref}} + end + + def handle_info({:notification, _pid, _ref, @notification_channel, payload}, _state) do + with {:ok, event} <- Poison.decode(payload), + {:ok, person} <- strip_person_from_event(event), + :ok <- publish_via_absinthe(person) do + {:noreply, :event_handled} + else + _error -> {:noreply, :event_received} + end + end + + def handle_info(_, _state), do: {:noreply, :event_received} + + defp strip_person_from_event(%{"new_row_data" => row_data}) do + person = + row_data + |> Map.take(["id", "username", "name", "location", "avatar_url"]) + |> Map.put("inserted_at", cooerce_time(Map.get(row_data, "inserted_at"))) + |> Map.put("updated_at", cooerce_time(Map.get(row_data, "updated_at"))) + |> Enum.reduce(%{}, fn {key, value}, map -> Map.put(map, String.to_atom(key), value) end) + + {:ok, person} + end + + defp strip_person_from_event(_), do: {:error, "Invalid event data"} + + defp publish_via_absinthe(person) do + Subscription.publish(FacesWeb.Endpoint, person, person_added: "person_added") + end + + defp cooerce_time(nil), do: DateTime.utc_now() + + defp cooerce_time(iso8601) do + with {:ok, ndt} <- NaiveDateTime.from_iso8601(iso8601), + {:ok, dt} <- DateTime.from_naive(ndt, "Etc/UTC") do + dt + else + {:error, reason} -> {:error, reason} + end + end +end diff --git a/priv/repo/migrations/20180408032823_broadcast_people_table_changes.exs b/priv/repo/migrations/20180408032823_broadcast_people_table_changes.exs new file mode 100644 index 0000000..dc2a136 --- /dev/null +++ b/priv/repo/migrations/20180408032823_broadcast_people_table_changes.exs @@ -0,0 +1,47 @@ +defmodule Faces.Repo.Migrations.BroadcastPeopleTableChanges do + use Ecto.Migration + + def up do + # Create a function that broadcasts row changes + execute(" + CREATE OR REPLACE FUNCTION broadcast_changes() + RETURNS trigger AS $$ + DECLARE + current_row RECORD; + BEGIN + IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN + current_row := NEW; + ELSE + current_row := OLD; + END IF; + IF (TG_OP = 'INSERT') THEN + OLD := NEW; + END IF; + PERFORM pg_notify( + 'people_changes', + json_build_object( + 'table', TG_TABLE_NAME, + 'type', TG_OP, + 'id', current_row.id, + 'new_row_data', row_to_json(NEW), + 'old_row_data', row_to_json(OLD) + )::text + ); + RETURN current_row; + END; + $$ LANGUAGE plpgsql;") + + # Create a trigger links the people table to the broadcast function + execute(" + CREATE TRIGGER notify_people_changes_trigger + AFTER INSERT OR UPDATE + ON people + FOR EACH ROW + EXECUTE PROCEDURE broadcast_changes();") + end + + def down do + execute("DROP TRIGGER IF EXISTS notify_people_changes_trigger ON people;") + execute("DROP FUNCTION IF EXISTS broadcast_changes;") + end +end