Add postgres database triggers and use them to notify subscribers.

This commit is contained in:
James Harton 2018-04-08 16:18:48 +12:00
parent 39373e2147
commit c20d4b453e
4 changed files with 128 additions and 14 deletions

View file

@ -1,17 +1,26 @@
# Extra for experts
# More extra for experts
Our little React app is pretty neat. When it loads it queries the server for a
list of people and shows their faces and allows us to run a mutation to add a
new person to our list.
Our existing subscription only triggers if someone else uses the `importPerson`
mutation to trigger the subscription event - what if we want to let clients know
whenever a row is added to our `people` table? Maybe it's added by another
service, or something.
There's only one problem. What happens if someone else comes along and adds a
face to our gallery? Our client won't know that the data has changed and will
just keep showing us stale data. This is where [GraphQL
Subscriptions](http://graphql.org/blog/subscriptions-in-graphql-and-relay/) come
in.
In this branch we're using Postgres triggers and stored procedures to send
notifications whenever a row is added to the `people` table and we're using
`Postgrex.Notifications` to subscribe to them. A substantian portion of the
code for this example comes from [this hackernoon post](https://hackernoon.com/get-notified-of-user-signups-and-plan-changes-automatically-using-postgres-phoenix-pubsub-e67d061b04bc).
Absinthe has built-in support for subscriptions and since we're using Phoenix we
have a well-tested channel implementation to run it over (Absinthe can use it's
own socket protocol too, if you're not running in Phoenix).
Things to look at:
Next, take a look at `step-6`.
* `priv/repo/migrations/20180408032823_broadcast_people_table_changes.exs`
* `lib/faces/gallery/event_listener.ex`
[Demo](http://localhost:4000)
Let's try manually inserting a row (via `psql faces_dev`):
```sql
INSERT INTO "people" ("username", "name", "location", "avatar_url", "inserted_at", "updated_at")
VALUES ('pupper', 'pupper', 'the bath', 'https://media.giphy.com/media/3o6Zt9ved5rCuidNlK/giphy.gif', NOW(), NOW());
```
Next, move on to `step-7`.

View file

@ -14,7 +14,8 @@ defmodule Faces.Application do
supervisor(FacesWeb.Endpoint, []),
# Start your own worker by calling: Faces.Worker.start_link(arg1, arg2, arg3)
# worker(Faces.Worker, [arg1, arg2, arg3]),
supervisor(Absinthe.Subscription, [FacesWeb.Endpoint])
supervisor(Absinthe.Subscription, [FacesWeb.Endpoint]),
worker(Faces.Gallery.EventListener, [])
]
# See https://hexdocs.pm/elixir/Supervisor.html

View file

@ -0,0 +1,57 @@
defmodule Faces.Gallery.EventListener do
use GenServer
require Logger
alias Absinthe.Subscription
@notification_channel "people_changes"
def start_link, do: GenServer.start_link(__MODULE__, [])
def init(_opts) do
pg_config = Faces.Repo.config()
{:ok, pid} = Postgrex.Notifications.start_link(pg_config)
{:ok, ref} = Postgrex.Notifications.listen(pid, @notification_channel)
Logger.debug(fn -> "#{__MODULE__} subscribed to channel #{@notification_channel}" end)
{:ok, {pid, @notification_channel, ref}}
end
def handle_info({:notification, _pid, _ref, @notification_channel, payload}, _state) do
with {:ok, event} <- Poison.decode(payload),
{:ok, person} <- strip_person_from_event(event),
:ok <- publish_via_absinthe(person) do
{:noreply, :event_handled}
else
_error -> {:noreply, :event_received}
end
end
def handle_info(_, _state), do: {:noreply, :event_received}
defp strip_person_from_event(%{"new_row_data" => row_data}) do
person =
row_data
|> Map.take(["id", "username", "name", "location", "avatar_url"])
|> Map.put("inserted_at", cooerce_time(Map.get(row_data, "inserted_at")))
|> Map.put("updated_at", cooerce_time(Map.get(row_data, "updated_at")))
|> Enum.reduce(%{}, fn {key, value}, map -> Map.put(map, String.to_atom(key), value) end)
{:ok, person}
end
defp strip_person_from_event(_), do: {:error, "Invalid event data"}
defp publish_via_absinthe(person) do
Subscription.publish(FacesWeb.Endpoint, person, person_added: "person_added")
end
defp cooerce_time(nil), do: DateTime.utc_now()
defp cooerce_time(iso8601) do
with {:ok, ndt} <- NaiveDateTime.from_iso8601(iso8601),
{:ok, dt} <- DateTime.from_naive(ndt, "Etc/UTC") do
dt
else
{:error, reason} -> {:error, reason}
end
end
end

View file

@ -0,0 +1,47 @@
defmodule Faces.Repo.Migrations.BroadcastPeopleTableChanges do
use Ecto.Migration
def up do
# Create a function that broadcasts row changes
execute("
CREATE OR REPLACE FUNCTION broadcast_changes()
RETURNS trigger AS $$
DECLARE
current_row RECORD;
BEGIN
IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN
current_row := NEW;
ELSE
current_row := OLD;
END IF;
IF (TG_OP = 'INSERT') THEN
OLD := NEW;
END IF;
PERFORM pg_notify(
'people_changes',
json_build_object(
'table', TG_TABLE_NAME,
'type', TG_OP,
'id', current_row.id,
'new_row_data', row_to_json(NEW),
'old_row_data', row_to_json(OLD)
)::text
);
RETURN current_row;
END;
$$ LANGUAGE plpgsql;")
# Create a trigger links the people table to the broadcast function
execute("
CREATE TRIGGER notify_people_changes_trigger
AFTER INSERT OR UPDATE
ON people
FOR EACH ROW
EXECUTE PROCEDURE broadcast_changes();")
end
def down do
execute("DROP TRIGGER IF EXISTS notify_people_changes_trigger ON people;")
execute("DROP FUNCTION IF EXISTS broadcast_changes;")
end
end