From 7b79922606768a6ffde90e0aba3efc995d49b8e2 Mon Sep 17 00:00:00 2001 From: James Harton Date: Sat, 7 Dec 2019 23:58:34 +1300 Subject: [PATCH] Switch from `horde` to `syn`. Massively refactor the `Augie.Device` metaprogramming. --- augie/config/config.exs | 2 + augie/config/releases.exs | 2 + augie/lib/augie/application.ex | 5 +- augie/lib/augie/device_starter.ex | 2 +- augie/lib/augie/devices/device.ex | 404 ++++++++++------------ augie/lib/augie/devices/erlang_system.ex | 5 + augie/lib/augie/devices/horde_cluster.ex | 39 --- augie/lib/augie/devices/syn_cluster.ex | 32 ++ augie/lib/augie/distributed_registry.ex | 20 -- augie/lib/augie/distributed_supervisor.ex | 26 -- augie/lib/augie/node_monitor.ex | 39 --- augie/lib/augie/syn_event_handler.ex | 24 ++ augie/lib/augie/update_lock_manager.ex | 122 ++++--- augie/mix.exs | 4 +- augie/mix.lock | 9 +- 15 files changed, 306 insertions(+), 429 deletions(-) delete mode 100644 augie/lib/augie/devices/horde_cluster.ex create mode 100644 augie/lib/augie/devices/syn_cluster.ex delete mode 100644 augie/lib/augie/distributed_registry.ex delete mode 100644 augie/lib/augie/distributed_supervisor.ex delete mode 100644 augie/lib/augie/node_monitor.ex create mode 100644 augie/lib/augie/syn_event_handler.ex diff --git a/augie/config/config.exs b/augie/config/config.exs index 87c8c8a..124e05f 100644 --- a/augie/config/config.exs +++ b/augie/config/config.exs @@ -25,6 +25,8 @@ config :logger, :console, # Use Jason for JSON parsing in Phoenix config :phoenix, :json_library, Jason +config :syn, event_handler: Augie.SynEventHandler + # Import environment specific config. This must remain at the bottom # of this file so it overrides the configuration defined above. 
import_config "#{Mix.env()}.exs" diff --git a/augie/config/releases.exs b/augie/config/releases.exs index 8ebe4b8..6aded49 100644 --- a/augie/config/releases.exs +++ b/augie/config/releases.exs @@ -23,4 +23,6 @@ config :augie, Augie.Device.RaspberryPi, vcgencmd_path: "/opt/vc/bin/vcgencmd", meminfo_path: "/proc/meminfo" +config :augie, Augie.UpdateLockManager, lockfile_location: "/tmp/balena/updates.lock" + # config :logger, level: :debug diff --git a/augie/lib/augie/application.ex b/augie/lib/augie/application.ex index b92bd99..6734313 100644 --- a/augie/lib/augie/application.ex +++ b/augie/lib/augie/application.ex @@ -28,10 +28,7 @@ defmodule Augie.Application do [ AugieWeb.Endpoint, {DynamicSupervisor, name: Augie.LocalSupervisor, strategy: :one_for_one}, - {Augie.DistributedRegistry, keys: :unique, name: Augie.DistributedRegistry}, - {Augie.DistributedSupervisor, - strategy: :one_for_one, name: Augie.DistributedSupervisor}, - Augie.NodeMonitor, + {DynamicSupervisor, name: Augie.DistributedSupervisor, strategy: :one_for_one}, Augie.UpdateLockManager, Augie.DeviceStarter ] diff --git a/augie/lib/augie/device_starter.ex b/augie/lib/augie/device_starter.ex index 4dfb3f1..1ae41cd 100644 --- a/augie/lib/augie/device_starter.ex +++ b/augie/lib/augie/device_starter.ex @@ -13,7 +13,7 @@ defmodule Augie.DeviceStarter do [ Device.ErlangMemory, Device.ErlangSystem, - Device.HordeCluster, + Device.SynCluster, Device.RaspberryPi ] |> Enum.each(&Device.start_device(&1)) diff --git a/augie/lib/augie/devices/device.ex b/augie/lib/augie/devices/device.ex index 1958688..6359e7e 100644 --- a/augie/lib/augie/devices/device.ex +++ b/augie/lib/augie/devices/device.ex @@ -1,5 +1,7 @@ # credo:disable-for-this-file defmodule Augie.Device do + alias Augie.Device + @moduledoc """ This module provides helpful infrastructure needed for all attached devices. 
""" @@ -49,29 +51,30 @@ defmodule Augie.Device do @optional_callbacks handle_metadata: 2, handle_metric: 2, device_id: 0, distribute: 0 defmacro __using__(_env) do - Module.register_attribute(__CALLER__.module, :metadata, accumulate: true) Module.register_attribute(__CALLER__.module, :metrics, accumulate: true) + Module.register_attribute(__CALLER__.module, :metadata, accumulate: true) quote do - alias Augie.Device - import Device, only: [metric: 1, metric: 2, metadata: 2, metadata: 1] - require Logger - @behaviour Device - @before_compile Device + import Augie.Device, only: [metric: 1, metric: 2, metadata: 2, metadata: 1] + @behaviour Augie.Device + @behaviour GenServer + @before_compile Augie.Device + @impl true def device_id, do: __MODULE__ + + @impl true def distribute, do: :cluster - @impl GenServer - def init(_args) do - device = init_device() - {:ok, %{device: device}} + @impl true + def init(opts) do + {:ok, opts} end @doc false def child_spec(opts) do %{ - id: {Device, device_id()}, + id: {Augie.Device, device_id()}, start: {__MODULE__, :start_link, [opts]}, shutdown: 10_000, restart: :transient @@ -79,167 +82,193 @@ defmodule Augie.Device do end @doc false - def start_link(args), do: GenServer.start_link(__MODULE__, [args]) + def start_link(args), do: GenServer.start_link(__MODULE__.Server, [args]) defoverridable( device_id: 0, distribute: 0, init: 1, - child_spec: 1, - start_link: 1 + child_spec: 1 ) end end defmacro __before_compile__(_env) do - # Define `handle_info/2` for those metrics which require polling. 
- __CALLER__.module - |> Module.get_attribute(:metrics, []) - |> Enum.filter(fn {_, opts} -> Keyword.has_key?(opts, :poll) end) - |> Enum.each(fn {name, opts} -> - unit = Keyword.get(opts, :unit) + target_module = __CALLER__.module + impl_module = Module.concat(target_module, "Server") + metrics = Module.get_attribute(target_module, :metrics, []) + metadata = Module.get_attribute(target_module, :metadata, []) - Module.eval_quoted( - __CALLER__.module, - quote do - @impl GenServer - def handle_info({:poll, unquote(name)}, state) do - case apply(__MODULE__, :handle_metric, [unquote(name), state]) do - {:ok, value, state} -> - value = if unquote(unit), do: {value, unquote(unit)}, else: value - Augie.Device.publish_metric(device_id(), unquote(name), value) - {:noreply, state} + Module.create( + impl_module, + quote do + use GenServer + @target unquote(target_module) - {:error, _reason, state} -> - {:noreply, state} - end + @impl GenServer + def init(opts) do + device_id = apply(@target, :device_id, []) + + Augie.Device.register_device(device_id) + + timers = + unquote(metrics) + |> Enum.filter(fn {_name, opts} -> Keyword.has_key?(opts, :poll) end) + |> Enum.map(fn {name, opts} -> {name, Keyword.get(opts, :poll)} end) + |> Enum.reduce(%{}, fn {name, interval}, timers -> + {:ok, timer} = :timer.send_interval(interval, {:poll, name}) + Map.put(timers, name, timer) + end) + + case apply(@target, :init, [opts]) do + {:ok, device_state} -> {:ok, %{timers: timers, device: device_state}} + {:ok, device_state, opt} -> {:ok, %{timers: timers, device: device_state}, opt} + :ignore -> :ignore + {:stop, reason} -> {:stop, reason} + end + end + + unquote do + metrics + |> Enum.filter(fn {_, opts} -> Keyword.has_key?(opts, :poll) end) + |> Enum.map(fn {metric, opts} -> + unit = Keyword.get(opts, :unit) + + quote do + @impl GenServer + def handle_info({:poll, unquote(metric)}, state) do + case apply(@target, :handle_metric, [unquote(metric), state.device]) do + {:ok, value, 
device_state} -> + value = if unquote(unit), do: {value, unquote(unit)}, else: value + + Augie.Device.publish_metric( + apply(@target, :device_id, []), + unquote(metric), + value + ) + + {:noreply, %{state | device: device_state}} + + {:error, _reason, device_state} -> + {:noreply, %{state | device: device_state}} + end + end + end + end) + end + + def handle_info(message, state) do + case apply(@target, :handle_info, [message, state.device]) do + {:noreply, device_state} -> {:noreply, %{state | device: device_state}} + {:noreply, device_state, opt} -> {:noreply, %{state | device: device_state}, opt} + {:stop, reason, device_state} -> {:stop, reason, %{state | device: device_state}} end end - ) - end) - # Define `handle_call/3` for `:get_metrics`. - Module.eval_quoted( - __CALLER__.module, - quote do @impl GenServer def handle_call(:get_metrics, _from, state) do {metrics, state} = - @metrics + unquote(metrics) |> Enum.reduce({%{}, state}, fn {name, opts}, {metrics, state} -> - case apply(__MODULE__, :handle_metric, [name, state]) do - {:ok, value, state} -> + case apply(@target, :handle_metric, [name, state.device]) do + {:ok, value, device_state} -> unit = Keyword.get(opts, :unit) value = if unit, do: {value, unit}, else: value - Augie.Device.publish_metric(device_id(), name, value) - {Map.put(metrics, name, value), state} + Augie.Device.publish_metric(apply(@target, :device_id, []), name, value) + {Map.put(metrics, name, value), %{state | device: device_state}} - {:error, _reason, state} -> - {metrics, state} + {:error, _reason, device_state} -> + {metrics, %{state | device: device_state}} end end) {:reply, {:ok, metrics}, state} end - end - ) - # Define `handle_call/3` for `:get_metadata`. 
- Module.eval_quoted( - __CALLER__.module, - quote do - @impl GenServer + unquote do + Enum.map(metrics, fn {metric, opts} -> + unit = Keyword.get(opts, :unit) + + quote do + def handle_call({:get_metric, unquote(metric)}, _from, state) do + case apply(@target, :handle_metric, [unquote(metric), state.device]) do + {:ok, value, device_state} -> + value = if unquote(unit), do: {value, unquote(unit)}, else: value + + Augie.Device.publish_metric( + apply(@target, :device_id, []), + unquote(metric), + value + ) + + {:reply, value, %{state | device: device_state}} + + {:error, reason, device_state} -> + {:reply, {:error, reason}, %{state | device: device_state}} + end + end + end + end) + end + def handle_call(:get_metadata, _from, state) do {metadata, state} = - @metadata + unquote(metadata) |> Enum.reduce({%{}, state}, fn {name, opts}, {md, state} -> if Keyword.has_key?(opts, :value) do value = Keyword.get(opts, :value) {Map.put(md, name, value), state} else - case apply(__MODULE__, :handle_metadata, [name, state]) do - {:ok, value, state} -> + case apply(@target, :handle_metadata, [name, state.device]) do + {:ok, value, device_state} -> unit = Keyword.get(opts, :unit) value = if unit, do: {value, unit}, else: value - {Map.put(md, name, value), state} + {Map.put(md, name, value), %{state | device: device_state}} - {:error, _reason, state} -> - {md, state} + {:error, _reason, device_state} -> + {md, %{state | device: device_state}} end end end) {:reply, {:ok, metadata}, state} end - end - ) - # Define `handle_call/3` for `{:get_metric, name}` - __CALLER__.module - |> Module.get_attribute(:metrics, []) - |> Enum.each(fn {name, opts} -> - unit = Keyword.get(opts, :unit) + def handle_call(message, from, state) do + case apply(@target, :handle_call, [message, from, state.device]) do + {:reply, reply, device_state} -> + {:reply, reply, %{state | device: device_state}} - Module.eval_quoted( - __CALLER__.module, - quote do - @impl GenServer - def handle_call({:get_metric, 
unquote(name)}, _from, state) do - case apply(__MODULE__, :handle_metric, [unquote(name), state]) do - {:ok, value, state} -> - value = if unquote(unit), do: {value, unquote(unit)}, else: value - Augie.Device.publish_metric(device_id(), unquote(name), value) - {:reply, value, state} + {:reply, reply, device_state, opt} -> + {:reply, reply, %{state | device: device_state}, opt} - {:error, reason, state} -> - {:reply, {:error, reason}, state} - end + {:noreply, device_state} -> + {:noreply, %{state | device: device_state}} + + {:noreply, device_state, opt} -> + {:noreply, %{state | device: device_state}, opt} + + {:stop, reason, reply, device_state} -> + {:stop, reason, reply, %{state | device: device_state}} + + {:stop, reason, device_state} -> + {:stop, reason, %{state | device: device_state}} end end - ) - end) - # Define `subscribe/1` and `unsubscribe/1` for all metrics. - __CALLER__.module - |> Module.get_attribute(:metrics, []) - |> Enum.each(fn {name, _opts} -> - Module.eval_quoted( - __CALLER__.module, - quote do - def subscribe(unquote(name)), - do: Augie.Device.add_metric_subscription(device_id(), unquote(name)) - - def unsubscribe(unquote(name)), - do: Augie.Device.remove_metric_subscription(device_id(), unquote(name)) + @impl GenServer + def handle_cast(message, state) do + case apply(@target, :handle_cast, [message, state.device]) do + {:noreply, device_state} -> {:noreply, %{state | device: device_state}} + {:noreply, device_state, opt} -> {:noreply, %{state | device: device_state}, opt} + {:stop, reason, device_state} -> {:stop, reason, %{state | device: device_state}} + end end - ) - end) + end, + Macro.Env.location(__ENV__) + ) - quote do - use GenServer - - @doc """ - Initialises the timers for the device if any metrics require polling, and - registers the device with the Collector. 
- """ - @spec init_device() :: map - def init_device do - device_id = device_id() - - Augie.Device.register_device(device_id) - - timers = - @metrics - |> Enum.filter(fn {_name, opts} -> Keyword.has_key?(opts, :poll) end) - |> Enum.map(fn {name, opts} -> {name, Keyword.get(opts, :poll)} end) - |> Enum.reduce(%{}, fn {name, interval}, timers -> - {:ok, timer} = :timer.send_interval(interval, {:poll, name}) - Map.put(timers, name, timer) - end) - - %{timers: timers} - end - end + :ok end @doc """ @@ -294,11 +323,7 @@ defmodule Augie.Device do """ @spec add_device_subscription(device_id :: any) :: {:ok, pid} | {:error, reason :: any} def add_device_subscription(device_id) do - Horde.Registry.register( - Augie.DistributedRegistry, - {Device, :device_subscription, device_id, self()}, - self() - ) + :syn.join({Device, :device_subscription, device_id}, self()) end @doc """ @@ -307,10 +332,7 @@ defmodule Augie.Device do """ @spec remove_device_subscription(device_id :: any) :: :ok def remove_device_subscription(device_id) do - Horde.Registry.unregister( - Augie.DistributedRegistry, - {Device, :device_subscription, device_id, self()} - ) + :syn.leave({Device, :device_subscription, device_id}, self()) end @doc """ @@ -320,11 +342,7 @@ defmodule Augie.Device do @spec add_metric_subscription(device_id :: any, metric_name :: atom) :: {:ok, pid} | {:error, reason :: any} def add_metric_subscription(device_id, metric_name) do - Horde.Registry.register( - Augie.DistributedRegistry, - {Device, :metric_subscription, device_id, metric_name, self()}, - self() - ) + :syn.join({Device, :metric_subscription, device_id, metric_name}, self()) end @doc """ @@ -333,10 +351,7 @@ defmodule Augie.Device do """ @spec remove_metric_subscription(device_id :: any, metric_name :: atom) :: :ok def remove_metric_subscription(device_id, metric_name) do - Horde.Registry.unregister( - Augie.DistributedRegistry, - {Device, :metric_subscription, device_id, metric_name, self()} - ) + :syn.leave({Device, 
:metric_subscription, device_id, metric_name}, self()) end @doc """ @@ -345,11 +360,7 @@ defmodule Augie.Device do """ @spec add_device_registration_subscription() :: :ok def add_device_registration_subscription do - Horde.Registry.register( - Augie.DistributedRegistry, - {Device, :registration_subscription, self()}, - self() - ) + :syn.join({Device, :registration_subscription}, self()) end @doc """ @@ -357,10 +368,7 @@ defmodule Augie.Device do """ @spec remove_device_registration_subscription() :: :ok def remove_device_registration_subscription do - Horde.Registry.unregister( - Augie.DistributedRegistry, - {Device, :registration_subscription, self()} - ) + :syn.leave({Device, :registration_subscription}, self()) end @doc """ @@ -372,12 +380,9 @@ defmodule Augie.Device do """ @spec publish_metric(device_id :: any, metric_name :: atom, sample :: any) :: :ok def publish_metric(device_id, metric_name, sample) do - Augie.DistributedRegistry - |> Horde.Registry.select([ - {{{Device, :device_subscription, device_id, :"$1"}, :_, :_}, [], [:"$1"]}, - {{{Device, :metric_subscription, device_id, metric_name, :"$1"}, :_, :_}, [], [:"$1"]} - ]) - |> Enum.uniq() + :syn.get_members({Device, :device_subscription, device_id}) + |> Stream.concat(:syn.get_members({Device, :metric_subscription, device_id, metric_name})) + |> Stream.uniq() |> Enum.each(fn pid -> send(pid, {:metric, device_id, metric_name, sample}) end) @@ -388,16 +393,12 @@ defmodule Augie.Device do @doc """ Register a device as being present on the system. 
""" - @spec register_device(device_id :: any) :: {:ok, pid} | {:error, reason :: any} + @spec register_device(device_id :: any) :: :ok | {:error, reason :: any} def register_device(device_id) do - with {:ok, pid} <- - Horde.Registry.register( - Augie.DistributedRegistry, - {Device, :device, device_id}, - self() - ), + with :ok <- :syn.register({Device, :device, device_id}, self()), + :ok <- :syn.join({Device, :device}, self(), device_id), :ok <- publish_device_registation(device_id), - do: {:ok, pid} + do: :ok end @doc """ @@ -408,22 +409,12 @@ defmodule Augie.Device do register the device again on the new node and the old registration will be overwritten. """ - @spec unregister_device(device_id :: any) :: :ok + @spec unregister_device(device_id :: any) :: :ok | {:error, reason :: any} def unregister_device(device_id) do - Augie.DistributedRegistry - |> Horde.Registry.select([ - { - {:"$1", :_, :_}, - [ - {:==, {:element, 1, :"$1"}, Device}, - {:==, {:element, 3, :"$1"}, device_id} - ], - [:"$1"] - } - ]) - |> Enum.each(&Horde.Registry.unregister(Augie.DistributedRegistry, &1)) - - publish_device_deregistration(device_id) + with :ok <- :syn.leave({Device, :device}, self()), + :ok <- :syn.unregister({Device, :device, device_id}), + :ok <- publish_device_deregistration(device_id), + do: :ok end @doc """ @@ -431,12 +422,9 @@ defmodule Augie.Device do """ @spec registered_devices() :: [device_id :: any] def registered_devices do - Augie.DistributedRegistry - |> Horde.Registry.select([ - {{{Device, :device, :"$1"}, :"$2", :_}, [], [[:"$1", :"$2"]]} - ]) - |> Enum.filter(fn [_, pid] -> process_alive?(pid) end) - |> Enum.map(fn [device_id, _] -> device_id end) + {Device, :device} + |> :syn.get_members(:with_meta) + |> Enum.map(&elem(&1, 1)) end @doc """ @@ -454,7 +442,7 @@ defmodule Augie.Device do This will take into account the module's preferred distribution stragety and start it on the appropriate supervisor. 
- *Note:* these options will be passed to `child_spec/1` and `device_key/1`. + *Note:* these options will be passed to `child_spec/1`. """ @spec start_device({module :: atom, opts :: [Keyword.t()]}) :: {:ok, pid} | {:error, reason :: any} @@ -463,7 +451,7 @@ defmodule Augie.Device do case apply(module, :distribute, []) do :node -> DynamicSupervisor.start_child(Augie.LocalSupervisor, spec) - :cluster -> Horde.DynamicSupervisor.start_child(Augie.DistributedSupervisor, spec) + :cluster -> DynamicSupervisor.start_child(Augie.DistributedSupervisor, spec) end end @@ -472,10 +460,7 @@ defmodule Augie.Device do """ @spec get_metadata(device_id :: any) :: {:ok, Map.t()} | {:error, reason :: any} def get_metadata(device_id) do - GenServer.call( - {:via, Horde.Registry, {Augie.DistributedRegistry, {Device, :device, device_id}}}, - :get_metadata - ) + GenServer.call({:via, :syn, {Device, :device, device_id}}, :get_metadata) end @doc """ @@ -483,10 +468,7 @@ defmodule Augie.Device do """ @spec get_metrics(device_id :: any) :: {:ok, Map.t()} | {:error, reason :: any} def get_metrics(device_id) do - GenServer.call( - {:via, Horde.Registry, {Augie.DistributedRegistry, {Device, :device, device_id}}}, - :get_metrics - ) + GenServer.call({:via, :syn, {Device, :device, device_id}}, :get_metrics) end @doc """ @@ -495,46 +477,14 @@ defmodule Augie.Device do @spec get_metric(device_id :: any, metric_name :: atom) :: {:ok, value :: any} | {:error, reason :: any} def get_metric(device_id, metric_name) do - GenServer.call( - {:via, Horde.Registry, {Augie.DistributedRegistry, {Device, :device, device_id}}}, - {:get_metric, metric_name} - ) + GenServer.call({:via, :syn, {Device, :device, device_id}}, {:get_metric, metric_name}) end defp publish_device_registation(device_id) do - Task.start(fn -> - :timer.sleep(250) - - Augie.DistributedRegistry - |> Horde.Registry.select([ - {{{Device, :registration_subscription, :"$1"}, :_, :_}, [], [:"$1"]} - ]) - |> Enum.uniq() - |> Enum.each(&send(&1, 
{:device_registered, device_id})) - end) - - :ok + :syn.publish({Device, :registration_subscription}, {:device_registered, device_id}) end defp publish_device_deregistration(device_id) do - Task.start(fn -> - :timer.sleep(250) - - Augie.DistributedRegistry - |> Horde.Registry.select([ - {{{Device, :registration_subscription, :"$1"}, :_, :_}, [], [:"$1"]} - ]) - |> Enum.uniq() - |> Enum.each(&send(&1, {:device_unregistered, device_id})) - end) - - :ok - end - - defp process_alive?(pid) when node(pid) == node(), do: Process.alive?(pid) - - defp process_alive?(pid) do - n = node(pid) - Node.list() |> Enum.member?(n) && :rpc.call(n, Process, :alive?, [pid]) + :syn.publish({Device, :registration_subscription}, {:device_unregistered, device_id}) end end diff --git a/augie/lib/augie/devices/erlang_system.ex b/augie/lib/augie/devices/erlang_system.ex index a8dc7c5..fa43dd0 100644 --- a/augie/lib/augie/devices/erlang_system.ex +++ b/augie/lib/augie/devices/erlang_system.ex @@ -11,6 +11,7 @@ defmodule Augie.Device.ErlangSystem do metadata(:process_limit) metadata(:schedulers) metadata(:version) + metadata(:node) metric(:port_count, poll: 5_000) metric(:process_count, poll: 5_000) @@ -26,6 +27,10 @@ defmodule Augie.Device.ErlangSystem do def device_id, do: {__MODULE__, node()} @impl true + def handle_metadata(:node, state) do + {:ok, node(), state} + end + def handle_metadata(name, state) do {:ok, ExErlstats.system_info(name), state} end diff --git a/augie/lib/augie/devices/horde_cluster.ex b/augie/lib/augie/devices/horde_cluster.ex deleted file mode 100644 index 7ff1e8e..0000000 --- a/augie/lib/augie/devices/horde_cluster.ex +++ /dev/null @@ -1,39 +0,0 @@ -defmodule Augie.Device.HordeCluster do - use Augie.Device - - @moduledoc """ - Shows some simple information about the Horde cluster. 
- """ - - metadata(:name, value: "Horde Cluster") - - metric(:node_count, poll: 5_000) - metric(:registered_names, poll: 5_000) - metric(:distributed_processes, poll: 5_000) - - @impl true - def distribute, do: :cluster - - @impl true - def handle_metric(:node_count, state) do - count = - Augie.DistributedRegistry - |> Horde.Cluster.members() - |> length() - - {:ok, count, state} - end - - def handle_metric(:registered_names, state) do - {:ok, Horde.Registry.count(Augie.DistributedRegistry), state} - end - - def handle_metric(:distributed_processes, state) do - count = - Augie.DistributedSupervisor - |> Horde.DynamicSupervisor.count_children() - |> Map.get(:active, 0) - - {:ok, count, state} - end -end diff --git a/augie/lib/augie/devices/syn_cluster.ex b/augie/lib/augie/devices/syn_cluster.ex new file mode 100644 index 0000000..ceeb585 --- /dev/null +++ b/augie/lib/augie/devices/syn_cluster.ex @@ -0,0 +1,32 @@ +defmodule Augie.Device.SynCluster do + use Augie.Device + + @moduledoc """ + Shows some simple information about the Syn cluster. + """ + + metadata(:name, value: "Syn Cluster") + + metric(:registered_names, poll: 5_000) + metric(:distributed_processes, poll: 5_000) + + @impl true + def distribute, do: :cluster + + @impl true + def handle_metric(:registered_names, state) do + {:ok, :syn.registry_count(), state} + end + + def handle_metric(:distributed_processes, state) do + count = 1985 + + {:ok, count, state} + end + + # It doesn't matter which node runs this process, as long as it's running + # somewhere. 
+ def handle_call({:resolve_conflict, {_pid1, _meta1}, {pid2, _meta2}}, _from, state) do + {:stop, :normal, pid2, state} + end +end diff --git a/augie/lib/augie/distributed_registry.ex b/augie/lib/augie/distributed_registry.ex deleted file mode 100644 index 4bcd8a8..0000000 --- a/augie/lib/augie/distributed_registry.ex +++ /dev/null @@ -1,20 +0,0 @@ -defmodule Augie.DistributedRegistry do - use Horde.Registry - - @moduledoc """ - This is the Horde name registry which we share on the cluster. - """ - - def start_link(opts), do: Horde.Registry.start_link(opts) - - def init(options) do - [members: get_members()] - |> Keyword.merge(options) - |> Horde.Registry.init() - end - - defp get_members do - [Node.self() | Node.list()] - |> Enum.map(fn node -> {__MODULE__, node} end) - end -end diff --git a/augie/lib/augie/distributed_supervisor.ex b/augie/lib/augie/distributed_supervisor.ex deleted file mode 100644 index 1e4b31e..0000000 --- a/augie/lib/augie/distributed_supervisor.ex +++ /dev/null @@ -1,26 +0,0 @@ -defmodule Augie.DistributedSupervisor do - use Horde.DynamicSupervisor - alias Horde.DynamicSupervisor - - @moduledoc """ - This is the supervisor which supervises processes which must be running - somewhere on the cluster. 
- """ - - defdelegate start_link(opts), to: DynamicSupervisor - defdelegate child_spec(opts), to: DynamicSupervisor - - def init(options) do - {:ok, Keyword.put(options, :members, get_members())} - end - - def start_child(spec) do - spec = Supervisor.child_spec(spec, []) - DynamicSupervisor.start_child(__MODULE__, spec) - end - - defp get_members do - [Node.self() | Node.list()] - |> Enum.map(fn node -> {__MODULE__, node} end) - end -end diff --git a/augie/lib/augie/node_monitor.ex b/augie/lib/augie/node_monitor.ex deleted file mode 100644 index 0c818fa..0000000 --- a/augie/lib/augie/node_monitor.ex +++ /dev/null @@ -1,39 +0,0 @@ -defmodule Augie.NodeMonitor do - use GenServer - alias Augie.{DistributedRegistry, DistributedSupervisor} - require Logger - - @moduledoc """ - This process monitors for changes to the number of nodes connected and updates - the Horde processes as required. - """ - - def start_link(opts), do: GenServer.start_link(__MODULE__, [opts]) - - def init(_) do - :net_kernel.monitor_nodes(true, node_type: :visible) - {:ok, nil} - end - - def handle_info({:nodeup, node, _node_type}, state) do - Logger.info("Node #{node} up. Adding to cluster.") - set_members(DistributedRegistry) - set_members(DistributedSupervisor) - {:noreply, state} - end - - def handle_info({:nodedown, node, _node_type}, state) do - Logger.info("Node #{node} down. 
Removing from cluster.") - set_members(DistributedRegistry) - set_members(DistributedSupervisor) - {:noreply, state} - end - - defp set_members(name) do - members = - [Node.self() | Node.list()] - |> Enum.map(fn node -> {name, node} end) - - :ok = Horde.Cluster.set_members(name, members) - end -end diff --git a/augie/lib/augie/syn_event_handler.ex b/augie/lib/augie/syn_event_handler.ex new file mode 100644 index 0000000..23a9c85 --- /dev/null +++ b/augie/lib/augie/syn_event_handler.ex @@ -0,0 +1,24 @@ +defmodule Augie.SynEventHandler do + @behaviour :syn_event_handler + require Logger + + @moduledoc """ + This is the behaviour module for `:syn_event_handler`. Mostly it's used to + allow GenServers to specify their own conflict resolution scheme. + """ + + # Assume that normal shutdowns are fine. + @impl true + def on_process_exit(_name, _pid, _meta, :normal), do: :ok + def on_process_exit(_name, _pid, _meta, :shutdown), do: :ok + + def on_process_exit(name, _pid, _meta, reason) do + Logger.debug("Unexpected shutdown of #{inspect(name)}, reason: #{inspect(reason)}") + :ok + end + + @impl true + def resolve_registry_conflict(_name, {pid1, meta1}, {pid2, meta2}) do + GenServer.call(pid1, {:resolve_conflict, {pid1, meta1}, {pid2, meta2}}) + end +end diff --git a/augie/lib/augie/update_lock_manager.ex b/augie/lib/augie/update_lock_manager.ex index 3243e8d..cddd35d 100644 --- a/augie/lib/augie/update_lock_manager.ex +++ b/augie/lib/augie/update_lock_manager.ex @@ -1,6 +1,5 @@ defmodule Augie.UpdateLockManager do use GenServer - alias Horde.Registry require Logger @moduledoc """ @@ -10,33 +9,27 @@ defmodule Augie.UpdateLockManager do from updates so that we can do blue:green deploys and ensure that there's always one machine able to orchestrate the robot and hold the robot state. - We do this by registering this process with the `Horde.Registry` on both - devices and send messages between them to discover whom was the most recently - started process. 
That process will hold the lock. + We do this by trying to start this process on both nodes, and then provide an + escape hatch for how to deal with the eventual name conflict. """ - defmodule State do - @moduledoc false - defstruct lock: nil, started_at: nil, timer: nil, live_nodes: %{} - end - - # The location of the file to be locked on disk. - @lockfile_location "/tmp/balena/updates.lock" - - # How frequently to message the other peer(s) and decide whom should be the lock holder. - @update_interval 10_000 - @doc false def start_link(args), do: GenServer.start_link(__MODULE__, [args]) @impl true def init(_args) do - if File.exists?(@lockfile_location), do: File.rmdir(@lockfile_location) + if File.exists?(lockfile_location()), do: File.rmdir(lockfile_location()) Process.flag(:trap_exit, true) - {:ok, timer} = :timer.send_interval(@update_interval, :tick) - now = DateTime.utc_now() - Registry.register(Augie.DistributedRegistry, {__MODULE__, node(), self()}, self()) - {:ok, %State{timer: timer, started_at: now}} + :net_kernel.monitor_nodes(true) + + now = + DateTime.utc_now() + |> DateTime.to_unix(:microsecond) + + :syn.join(__MODULE__, self(), now) + + {:ok, lock} = acquire_lock() + {:ok, %{lock: lock}} end # If we're terminating and we're holding the lock, then release it. @@ -44,65 +37,68 @@ defmodule Augie.UpdateLockManager do def terminate(_reason, %{lock: nil}), do: :ok def terminate(_reason, %{lock: lock}), do: release_lock(lock) - # Send our node name, pid, and started at time to all the registered processes - # in this process group. 
@impl true - def handle_info(:tick, %{started_at: started_at} = state) do - Augie.DistributedRegistry - |> Registry.select([{{{__MODULE__, :_, :_}, :"$1", :_}, [], [:"$1"]}]) - |> Enum.each(&send(&1, {:update, node(), self(), started_at})) + def handle_info({:nodeup, _node}, state), do: reacquire_if_required(state) + def handle_info({:nodedown, _node}, state), do: reacquire_if_required(state) - {:noreply, state} + defp reacquire_if_required(%{lock: nil} = state) do + :timer.sleep(250) + + {pid, _started_at} = + __MODULE__ + |> :syn.get_members(:with_meta) + |> Enum.max_by(&elem(&1, 1)) + + if pid == self() do + {:ok, lock} = acquire_lock() + {:noreply, %{state | lock: lock}} + else + {:noreply, state} + end end - def handle_info({:update, n, p, s}, %{live_nodes: live_nodes, lock: lock} = state) do - live_nodes = - live_nodes - |> Map.put(n, %{pid: p, started_at: s, last_seen: DateTime.utc_now()}) + defp reacquire_if_required(%{lock: lock} = state) do + :timer.sleep(250) - live_nodes = - live_nodes - |> Enum.reject(fn {_, %{last_seen: d}} -> - now = DateTime.utc_now() |> DateTime.to_unix(:millisecond) - last_seen_at = d |> DateTime.to_unix(:millisecond) - now - last_seen_at > @update_interval * 3 - end) - |> Enum.into(%{}) + {pid, _started_at} = + __MODULE__ + |> :syn.get_members(:with_meta) + |> Enum.max_by(&elem(&1, 1)) - state = %{state | live_nodes: live_nodes} - - newest_node = - live_nodes - |> Enum.max_by(fn {_, %{started_at: d}} -> DateTime.to_unix(d, :microsecond) end) - |> elem(0) - - cond do - lock && newest_node != node() -> - release_lock(lock) - {:noreply, %{state | lock: nil}} - - is_nil(lock) && newest_node == node() -> - {:ok, lock} = acquire_lock() - {:noreply, %{state | lock: lock}} - - true -> - {:noreply, state} + if pid == self() do + {:noreply, state} + else + :ok = release_lock(lock) + {:noreply, %{state | lock: nil}} end end defp acquire_lock do - Logger.info("Acquiring update lock.") + lockfile_location = lockfile_location() + 
Logger.info("[#{__MODULE__}] Acquiring update lock #{inspect(lockfile_location)}.") - dir = Path.dirname(@lockfile_location) + dir = Path.dirname(lockfile_location) if File.exists?(dir), do: File.rm_rf(dir) File.mkdir_p(dir) - File.open(@lockfile_location, [:exclusive]) + File.open(lockfile_location, [:exclusive]) end defp release_lock(lock) do - :ok = File.close(lock) - :ok = File.rm(@lockfile_location) - Logger.info("Releasing update lock.") + with lockfile_location <- lockfile_location(), + :ok <- File.close(lock), + :ok <- File.rm(lockfile_location) do + Logger.info("[#{__MODULE__}] Released update lock #{inspect(lockfile_location)}.") + :ok + end + end + + def lockfile_location do + :augie + |> Application.get_env(__MODULE__, []) + |> Keyword.get( + :lockfile_location, + "/tmp/#{Base.encode64(to_string(node()), padding: false)}/updates.lock" + ) end end diff --git a/augie/mix.exs b/augie/mix.exs index 01e2a3f..52f984f 100644 --- a/augie/mix.exs +++ b/augie/mix.exs @@ -37,7 +37,6 @@ defmodule Augie.MixProject do {:ex_erlstats, "~> 0.1"}, {:floki, ">= 0.0.0", only: :test}, {:gettext, "~> 0.11"}, - {:horde, "~> 0.7"}, {:jason, "~> 1.0"}, {:libcluster, "~> 3.1"}, {:mdns, "~> 1.0"}, @@ -46,7 +45,8 @@ defmodule Augie.MixProject do {:phoenix_live_view, "~> 0.4.0"}, {:phoenix_pubsub, "~> 1.1"}, {:phoenix, "~> 1.4.11"}, - {:plug_cowboy, "~> 2.0"} + {:plug_cowboy, "~> 2.0"}, + {:syn, "~> 2.0"} ] end end diff --git a/augie/mix.lock b/augie/mix.lock index 9959980..2721c1f 100644 --- a/augie/mix.lock +++ b/augie/mix.lock @@ -6,23 +6,18 @@ "cowlib": {:hex, :cowlib, "2.8.0", "fd0ff1787db84ac415b8211573e9a30a3ebe71b5cbff7f720089972b2319c8a4", [:rebar3], [], "hexpm"}, "credo": {:hex, :credo, "1.1.5", "caec7a3cadd2e58609d7ee25b3931b129e739e070539ad1a0cd7efeeb47014f4", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"}, "dbus": {:hex, :dbus, "0.7.0", 
"71b7660523be6e222a8a4f9ddcbf54ad95638479bd17654975bb751aadbe81de", [:make, :rebar3], [], "hexpm"}, - "delta_crdt": {:hex, :delta_crdt, "0.5.10", "e866f8d1b89bee497a98b9793e9ba0ea514112a1c41a0c30dcde3463d4984d14", [:mix], [{:merkle_map, "~> 0.2.0", [hex: :merkle_map, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"}, "dns": {:hex, :dns, "2.1.2", "81c46d39f7934f0e73368355126e4266762cf227ba61d5889635d83b2d64a493", [:mix], [{:socket, "~> 0.3.13", [hex: :socket, repo: "hexpm", optional: false]}], "hexpm"}, "ex_erlstats": {:hex, :ex_erlstats, "0.1.6", "f06af6136e67c2ba65848d411f63de8d672f972907e2e357c14ab3d067a6269c", [:mix], [], "hexpm"}, "file_system": {:hex, :file_system, "0.2.7", "e6f7f155970975789f26e77b8b8d8ab084c59844d8ecfaf58cbda31c494d14aa", [:mix], [], "hexpm"}, "floki": {:hex, :floki, "0.23.1", "e100306ce7d8841d70a559748e5091542e2cfc67ffb3ade92b89a8435034dab1", [:mix], [{:html_entities, "~> 0.5.0", [hex: :html_entities, repo: "hexpm", optional: false]}], "hexpm"}, - "gen_state_machine": {:hex, :gen_state_machine, "2.0.5", "9ac15ec6e66acac994cc442dcc2c6f9796cf380ec4b08267223014be1c728a95", [:mix], [], "hexpm"}, "gettext": {:hex, :gettext, "0.17.1", "8baab33482df4907b3eae22f719da492cee3981a26e649b9c2be1c0192616962", [:mix], [], "hexpm"}, "hackney": {:hex, :hackney, "1.15.2", "07e33c794f8f8964ee86cebec1a8ed88db5070e52e904b8f12209773c1036085", [:rebar3], [{:certifi, "2.5.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "6.0.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.5", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"}, - "horde": {:hex, :horde, "0.7.1", "161e140e4e4fab5416b90a4e5b68fbe7fb78f62b265f87f01d661a58aa72be0c", [:mix], [{:delta_crdt, "~> 0.5.10", [hex: :delta_crdt, repo: "hexpm", 
optional: false]}, {:libring, "~> 1.4", [hex: :libring, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_poller, "~> 0.4.0", [hex: :telemetry_poller, repo: "hexpm", optional: false]}], "hexpm"}, "html_entities": {:hex, :html_entities, "0.5.0", "40f5c5b9cbe23073b48a4e69c67b6c11974f623a76165e2b92d098c0e88ccb1d", [:mix], [], "hexpm"}, "httpoison": {:hex, :httpoison, "1.6.2", "ace7c8d3a361cebccbed19c283c349b3d26991eff73a1eaaa8abae2e3c8089b6", [:mix], [{:hackney, "~> 1.15 and >= 1.15.2", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm"}, "idna": {:hex, :idna, "6.0.0", "689c46cbcdf3524c44d5f3dde8001f364cd7608a99556d8fbd8239a5798d4c10", [:rebar3], [{:unicode_util_compat, "0.4.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"}, "jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"}, "libcluster": {:hex, :libcluster, "3.1.1", "cbab97b96141f47f2fe5563183c444bbce9282b3991ef054d69b8805546f0122", [:mix], [{:jason, "~> 1.1.2", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"}, - "libring": {:hex, :libring, "1.4.0", "41246ba2f3fbc76b3971f6bce83119dfec1eee17e977a48d8a9cfaaf58c2a8d6", [:mix], [], "hexpm"}, "mdns": {:hex, :mdns, "1.0.3", "f08414daf5636bf5cd364611e838818e9250c91a3282a817ad9174b03e757401", [:mix], [{:dns, "~> 2.0", [hex: :dns, repo: "hexpm", optional: false]}], "hexpm"}, - "merkle_map": {:hex, :merkle_map, "0.2.0", "5391ac61e016ce4aeb66ce39f05206a382fd4b66ee4b63c08a261d5633eadd76", [:mix], [], "hexpm"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"}, "mime": {:hex, :mime, "1.3.1", "30ce04ab3175b6ad0bdce0035cba77bba68b813d523d1aac73d9781b4d193cf8", [:mix], [], "hexpm"}, "mimerl": {:hex, :mimerl, "1.2.0", 
"67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm"}, @@ -38,9 +33,7 @@ "ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm"}, "socket": {:hex, :socket, "0.3.13", "98a2ab20ce17f95fb512c5cadddba32b57273e0d2dba2d2e5f976c5969d0c632", [:mix], [], "hexpm"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.5", "6eaf7ad16cb568bb01753dbbd7a95ff8b91c7979482b95f38443fe2c8852a79b", [:make, :mix, :rebar3], [], "hexpm"}, - "swarm": {:hex, :swarm, "3.4.0", "64f8b30055d74640d2186c66354b33b999438692a91be275bb89cdc7e401f448", [:mix], [{:gen_state_machine, "~> 2.0", [hex: :gen_state_machine, repo: "hexpm", optional: false]}, {:libring, "~> 1.0", [hex: :libring, repo: "hexpm", optional: false]}], "hexpm"}, - "swarm_dynamic_supervisor": {:hex, :swarm_dynamic_supervisor, "0.1.0", "76bd351b6fbc9f5775d358f54736f13336d8ad882784cf350a0ad7ec53b60898", [:mix], [{:swarm, ">= 3.0.0", [hex: :swarm, repo: "hexpm", optional: false]}], "hexpm"}, + "syn": {:hex, :syn, "2.0.3", "fe7eefd78ae2498fe017556f4a8385cba73f8f0ba51d1b65e3c5e6cbad5fc04b", [:rebar3], [], "hexpm"}, "telemetry": {:hex, :telemetry, "0.4.1", "ae2718484892448a24470e6aa341bc847c3277bfb8d4e9289f7474d752c09c7f", [:rebar3], [], "hexpm"}, - "telemetry_poller": {:hex, :telemetry_poller, "0.4.1", "50d03d976a3b8ab4898d9e873852e688840df47685a13af90af40e1ba43a758b", [:rebar3], [{:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.4.1", "d869e4c68901dd9531385bb0c8c40444ebf624e60b6962d95952775cac5e90cd", [:rebar3], [], "hexpm"}, }