Switch from horde to syn. Massively refactor the Augie.Device metaprogramming.

This commit is contained in:
James Harton 2019-12-07 23:58:34 +13:00
parent 7986cbc166
commit 7b79922606
15 changed files with 306 additions and 429 deletions

View file

@ -25,6 +25,8 @@ config :logger, :console,
# Use Jason for JSON parsing in Phoenix
config :phoenix, :json_library, Jason
config :syn, event_handler: Augie.SynEventHandler
# Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above.
import_config "#{Mix.env()}.exs"

View file

@ -23,4 +23,6 @@ config :augie, Augie.Device.RaspberryPi,
vcgencmd_path: "/opt/vc/bin/vcgencmd",
meminfo_path: "/proc/meminfo"
config :augie, Augie.UpdateLockManager, lockfile_location: "/tmp/balena/updates.lock"
# config :logger, level: :debug

View file

@ -28,10 +28,7 @@ defmodule Augie.Application do
[
AugieWeb.Endpoint,
{DynamicSupervisor, name: Augie.LocalSupervisor, strategy: :one_for_one},
{Augie.DistributedRegistry, keys: :unique, name: Augie.DistributedRegistry},
{Augie.DistributedSupervisor,
strategy: :one_for_one, name: Augie.DistributedSupervisor},
Augie.NodeMonitor,
{DynamicSupervisor, name: Augie.DistributedSupervisor, strategy: :one_for_one},
Augie.UpdateLockManager,
Augie.DeviceStarter
]

View file

@ -13,7 +13,7 @@ defmodule Augie.DeviceStarter do
[
Device.ErlangMemory,
Device.ErlangSystem,
Device.HordeCluster,
Device.SynCluster,
Device.RaspberryPi
]
|> Enum.each(&Device.start_device(&1))

View file

@ -1,5 +1,7 @@
# credo:disable-for-this-file
defmodule Augie.Device do
alias Augie.Device
@moduledoc """
This module provides helpful infrastructure needed for all attached devices.
"""
@ -49,29 +51,30 @@ defmodule Augie.Device do
@optional_callbacks handle_metadata: 2, handle_metric: 2, device_id: 0, distribute: 0
defmacro __using__(_env) do
Module.register_attribute(__CALLER__.module, :metadata, accumulate: true)
Module.register_attribute(__CALLER__.module, :metrics, accumulate: true)
Module.register_attribute(__CALLER__.module, :metadata, accumulate: true)
quote do
alias Augie.Device
import Device, only: [metric: 1, metric: 2, metadata: 2, metadata: 1]
require Logger
@behaviour Device
@before_compile Device
import Augie.Device, only: [metric: 1, metric: 2, metadata: 2, metadata: 1]
@behaviour Augie.Device
@behaviour GenServer
@before_compile Augie.Device
@impl true
def device_id, do: __MODULE__
@impl true
def distribute, do: :cluster
@impl GenServer
def init(_args) do
device = init_device()
{:ok, %{device: device}}
@impl true
def init(opts) do
{:ok, opts}
end
@doc false
def child_spec(opts) do
%{
id: {Device, device_id()},
id: {Augie.Device, device_id()},
start: {__MODULE__, :start_link, [opts]},
shutdown: 10_000,
restart: :transient
@ -79,167 +82,193 @@ defmodule Augie.Device do
end
@doc false
def start_link(args), do: GenServer.start_link(__MODULE__, [args])
def start_link(args), do: GenServer.start_link(__MODULE__.Server, [args])
defoverridable(
device_id: 0,
distribute: 0,
init: 1,
child_spec: 1,
start_link: 1
child_spec: 1
)
end
end
defmacro __before_compile__(_env) do
# Define `handle_info/2` for those metrics which require polling.
__CALLER__.module
|> Module.get_attribute(:metrics, [])
|> Enum.filter(fn {_, opts} -> Keyword.has_key?(opts, :poll) end)
|> Enum.each(fn {name, opts} ->
unit = Keyword.get(opts, :unit)
target_module = __CALLER__.module
impl_module = Module.concat(target_module, "Server")
metrics = Module.get_attribute(target_module, :metrics, [])
metadata = Module.get_attribute(target_module, :metadata, [])
Module.eval_quoted(
__CALLER__.module,
quote do
@impl GenServer
def handle_info({:poll, unquote(name)}, state) do
case apply(__MODULE__, :handle_metric, [unquote(name), state]) do
{:ok, value, state} ->
value = if unquote(unit), do: {value, unquote(unit)}, else: value
Augie.Device.publish_metric(device_id(), unquote(name), value)
{:noreply, state}
Module.create(
impl_module,
quote do
use GenServer
@target unquote(target_module)
{:error, _reason, state} ->
{:noreply, state}
end
@impl GenServer
def init(opts) do
device_id = apply(@target, :device_id, [])
Augie.Device.register_device(device_id)
timers =
unquote(metrics)
|> Enum.filter(fn {_name, opts} -> Keyword.has_key?(opts, :poll) end)
|> Enum.map(fn {name, opts} -> {name, Keyword.get(opts, :poll)} end)
|> Enum.reduce(%{}, fn {name, interval}, timers ->
{:ok, timer} = :timer.send_interval(interval, {:poll, name})
Map.put(timers, name, timer)
end)
case apply(@target, :init, [opts]) do
{:ok, device_state} -> {:ok, %{timers: timers, device: device_state}}
{:ok, device_state, opt} -> {:ok, %{timers: timers, device: device_state}, opt}
:ignore -> :ignore
{:stop, reason} -> {:stop, reason}
end
end
unquote do
metrics
|> Enum.filter(fn {_, opts} -> Keyword.has_key?(opts, :poll) end)
|> Enum.map(fn {metric, opts} ->
unit = Keyword.get(opts, :unit)
quote do
@impl GenServer
def handle_info({:poll, unquote(metric)}, state) do
case apply(@target, :handle_metric, [unquote(metric), state.device]) do
{:ok, value, device_state} ->
value = if unquote(unit), do: {value, unquote(unit)}, else: value
Augie.Device.publish_metric(
apply(@target, :device_id, []),
unquote(metric),
value
)
{:noreply, %{state | device: device_state}}
{:error, _reason, device_state} ->
{:noreply, %{state | device: device_state}}
end
end
end
end)
end
def handle_info(message, state) do
case apply(@target, :handle_info, [message, state.device]) do
{:noreply, device_state} -> {:noreply, %{state | device: device_state}}
{:noreply, device_state, opt} -> {:noreply, %{state | device: device_state}, opt}
{:stop, reason, device_state} -> {:stop, reason, %{state | device: device_state}}
end
end
)
end)
# Define `handle_call/3` for `:get_metrics`.
Module.eval_quoted(
__CALLER__.module,
quote do
@impl GenServer
def handle_call(:get_metrics, _from, state) do
{metrics, state} =
@metrics
unquote(metrics)
|> Enum.reduce({%{}, state}, fn {name, opts}, {metrics, state} ->
case apply(__MODULE__, :handle_metric, [name, state]) do
{:ok, value, state} ->
case apply(@target, :handle_metric, [name, state.device]) do
{:ok, value, device_state} ->
unit = Keyword.get(opts, :unit)
value = if unit, do: {value, unit}, else: value
Augie.Device.publish_metric(device_id(), name, value)
{Map.put(metrics, name, value), state}
Augie.Device.publish_metric(apply(@target, :device_id, []), name, value)
{Map.put(metrics, name, value), %{state | device: device_state}}
{:error, _reason, state} ->
{metrics, state}
{:error, _reason, device_state} ->
{metrics, %{state | device: device_state}}
end
end)
{:reply, {:ok, metrics}, state}
end
end
)
# Define `handle_call/3` for `:get_metadata`.
Module.eval_quoted(
__CALLER__.module,
quote do
@impl GenServer
unquote do
Enum.map(metrics, fn {metric, opts} ->
unit = Keyword.get(opts, :unit)
quote do
def handle_call({:get_metric, unquote(metric)}, _from, state) do
case apply(@target, :handle_metric, [unquote(metric), state.device]) do
{:ok, value, device_state} ->
value = if unquote(unit), do: {value, unquote(unit)}, else: value
Augie.Device.publish_metric(
apply(@target, :device_id, []),
unquote(metric),
value
)
{:reply, value, %{state | device: device_state}}
{:error, reason, device_state} ->
{:reply, {:error, reason}, %{state | device: device_state}}
end
end
end
end)
end
def handle_call(:get_metadata, _from, state) do
{metadata, state} =
@metadata
unquote(metadata)
|> Enum.reduce({%{}, state}, fn {name, opts}, {md, state} ->
if Keyword.has_key?(opts, :value) do
value = Keyword.get(opts, :value)
{Map.put(md, name, value), state}
else
case apply(__MODULE__, :handle_metadata, [name, state]) do
{:ok, value, state} ->
case apply(@target, :handle_metadata, [name, state.device]) do
{:ok, value, device_state} ->
unit = Keyword.get(opts, :unit)
value = if unit, do: {value, unit}, else: value
{Map.put(md, name, value), state}
{Map.put(md, name, value), %{state | device: device_state}}
{:error, _reason, state} ->
{md, state}
{:error, _reason, device_state} ->
{md, %{state | device: device_state}}
end
end
end)
{:reply, {:ok, metadata}, state}
end
end
)
# Define `handle_call/3` for `{:get_metric, name}`
__CALLER__.module
|> Module.get_attribute(:metrics, [])
|> Enum.each(fn {name, opts} ->
unit = Keyword.get(opts, :unit)
def handle_call(message, from, state) do
case apply(@target, :handle_call, [message, from, state.device]) do
{:reply, reply, device_state} ->
{:reply, reply, %{state | device: device_state}}
Module.eval_quoted(
__CALLER__.module,
quote do
@impl GenServer
def handle_call({:get_metric, unquote(name)}, _from, state) do
case apply(__MODULE__, :handle_metric, [unquote(name), state]) do
{:ok, value, state} ->
value = if unquote(unit), do: {value, unquote(unit)}, else: value
Augie.Device.publish_metric(device_id(), unquote(name), value)
{:reply, value, state}
{:reply, reply, device_state, opt} ->
{:reply, reply, %{state | device: device_state}, opt}
{:error, reason, state} ->
{:reply, {:error, reason}, state}
end
{:noreply, device_state} ->
{:noreply, %{state | device: device_state}}
{:noreply, device_state, opt} ->
{:noreply, %{state | device: device_state}, opt}
{:stop, reason, reply, device_state} ->
{:stop, reason, reply, %{state | device: device_state}}
{:stop, reason, device_state} ->
{:stop, reason, %{state | device: device_state}}
end
end
)
end)
# Define `subscribe/1` and `unsubscribe/1` for all metrics.
__CALLER__.module
|> Module.get_attribute(:metrics, [])
|> Enum.each(fn {name, _opts} ->
Module.eval_quoted(
__CALLER__.module,
quote do
def subscribe(unquote(name)),
do: Augie.Device.add_metric_subscription(device_id(), unquote(name))
def unsubscribe(unquote(name)),
do: Augie.Device.remove_metric_subscription(device_id(), unquote(name))
@impl GenServer
def handle_cast(message, state) do
case apply(@target, :handle_cast, [message, state.device]) do
{:noreply, device_state} -> {:noreply, %{state | device: device_state}}
{:noreply, device_state, opt} -> {:noreply, %{state | device: device_state}, opt}
{:stop, reason, device_state} -> {:stop, reason, %{state | device: device_state}}
end
end
)
end)
end,
Macro.Env.location(__ENV__)
)
quote do
use GenServer
@doc """
Initialises the timers for the device if any metrics require polling, and
registers the device with the Collector.
"""
@spec init_device() :: map
def init_device do
device_id = device_id()
Augie.Device.register_device(device_id)
timers =
@metrics
|> Enum.filter(fn {_name, opts} -> Keyword.has_key?(opts, :poll) end)
|> Enum.map(fn {name, opts} -> {name, Keyword.get(opts, :poll)} end)
|> Enum.reduce(%{}, fn {name, interval}, timers ->
{:ok, timer} = :timer.send_interval(interval, {:poll, name})
Map.put(timers, name, timer)
end)
%{timers: timers}
end
end
:ok
end
@doc """
@ -294,11 +323,7 @@ defmodule Augie.Device do
"""
@spec add_device_subscription(device_id :: any) :: {:ok, pid} | {:error, reason :: any}
def add_device_subscription(device_id) do
Horde.Registry.register(
Augie.DistributedRegistry,
{Device, :device_subscription, device_id, self()},
self()
)
:syn.join({Device, :device_subscription, device_id}, self())
end
@doc """
@ -307,10 +332,7 @@ defmodule Augie.Device do
"""
@spec remove_device_subscription(device_id :: any) :: :ok
def remove_device_subscription(device_id) do
Horde.Registry.unregister(
Augie.DistributedRegistry,
{Device, :device_subscription, device_id, self()}
)
:syn.leave({Device, :device_subscription, device_id}, self())
end
@doc """
@ -320,11 +342,7 @@ defmodule Augie.Device do
@spec add_metric_subscription(device_id :: any, metric_name :: atom) ::
{:ok, pid} | {:error, reason :: any}
def add_metric_subscription(device_id, metric_name) do
Horde.Registry.register(
Augie.DistributedRegistry,
{Device, :metric_subscription, device_id, metric_name, self()},
self()
)
:syn.join({Device, :metric_subscription, device_id, metric_name}, self())
end
@doc """
@ -333,10 +351,7 @@ defmodule Augie.Device do
"""
@spec remove_metric_subscription(device_id :: any, metric_name :: atom) :: :ok
def remove_metric_subscription(device_id, metric_name) do
Horde.Registry.unregister(
Augie.DistributedRegistry,
{Device, :metric_subscription, device_id, metric_name, self()}
)
:syn.leave({Device, :metric_subscription, device_id, metric_name}, self())
end
@doc """
@ -345,11 +360,7 @@ defmodule Augie.Device do
"""
@spec add_device_registration_subscription() :: :ok
def add_device_registration_subscription do
Horde.Registry.register(
Augie.DistributedRegistry,
{Device, :registration_subscription, self()},
self()
)
:syn.join({Device, :registration_subscription}, self())
end
@doc """
@ -357,10 +368,7 @@ defmodule Augie.Device do
"""
@spec remove_device_registration_subscription() :: :ok
def remove_device_registration_subscription do
Horde.Registry.unregister(
Augie.DistributedRegistry,
{Device, :registration_subscription, self()}
)
:syn.leave({Device, :registration_subscription}, self())
end
@doc """
@ -372,12 +380,9 @@ defmodule Augie.Device do
"""
@spec publish_metric(device_id :: any, metric_name :: atom, sample :: any) :: :ok
def publish_metric(device_id, metric_name, sample) do
Augie.DistributedRegistry
|> Horde.Registry.select([
{{{Device, :device_subscription, device_id, :"$1"}, :_, :_}, [], [:"$1"]},
{{{Device, :metric_subscription, device_id, metric_name, :"$1"}, :_, :_}, [], [:"$1"]}
])
|> Enum.uniq()
:syn.get_members({Device, :device_subscription, device_id})
|> Stream.concat(:syn.get_members({Device, :metric_subscription, device_id, metric_name}))
|> Stream.uniq()
|> Enum.each(fn pid ->
send(pid, {:metric, device_id, metric_name, sample})
end)
@ -388,16 +393,12 @@ defmodule Augie.Device do
@doc """
Register a device as being present on the system.
"""
@spec register_device(device_id :: any) :: {:ok, pid} | {:error, reason :: any}
@spec register_device(device_id :: any) :: :ok | {:error, reason :: any}
def register_device(device_id) do
with {:ok, pid} <-
Horde.Registry.register(
Augie.DistributedRegistry,
{Device, :device, device_id},
self()
),
with :ok <- :syn.register({Device, :device, device_id}, self()),
:ok <- :syn.join({Device, :device}, self(), device_id),
:ok <- publish_device_registation(device_id),
do: {:ok, pid}
do: :ok
end
@doc """
@ -408,22 +409,12 @@ defmodule Augie.Device do
register the device again on the new node and the old registration will be
overwritten.
"""
@spec unregister_device(device_id :: any) :: :ok
@spec unregister_device(device_id :: any) :: :ok | {:error, reason :: any}
def unregister_device(device_id) do
Augie.DistributedRegistry
|> Horde.Registry.select([
{
{:"$1", :_, :_},
[
{:==, {:element, 1, :"$1"}, Device},
{:==, {:element, 3, :"$1"}, device_id}
],
[:"$1"]
}
])
|> Enum.each(&Horde.Registry.unregister(Augie.DistributedRegistry, &1))
publish_device_deregistration(device_id)
with :ok <- :syn.leave({Device, :device}, self()),
:ok <- :syn.unregister({Device, :device, device_id}),
:ok <- publish_device_deregistration(device_id),
do: :ok
end
@doc """
@ -431,12 +422,9 @@ defmodule Augie.Device do
"""
@spec registered_devices() :: [device_id :: any]
def registered_devices do
Augie.DistributedRegistry
|> Horde.Registry.select([
{{{Device, :device, :"$1"}, :"$2", :_}, [], [[:"$1", :"$2"]]}
])
|> Enum.filter(fn [_, pid] -> process_alive?(pid) end)
|> Enum.map(fn [device_id, _] -> device_id end)
{Device, :device}
|> :syn.get_members(:with_meta)
|> Enum.map(&elem(&1, 1))
end
@doc """
@ -454,7 +442,7 @@ defmodule Augie.Device do
This will take into account the module's preferred distribution strategy and
start it on the appropriate supervisor.
*Note:* these options will be passed to `child_spec/1` and `device_key/1`.
*Note:* these options will be passed to `child_spec/1`.
"""
@spec start_device({module :: atom, opts :: [Keyword.t()]}) ::
{:ok, pid} | {:error, reason :: any}
@ -463,7 +451,7 @@ defmodule Augie.Device do
case apply(module, :distribute, []) do
:node -> DynamicSupervisor.start_child(Augie.LocalSupervisor, spec)
:cluster -> Horde.DynamicSupervisor.start_child(Augie.DistributedSupervisor, spec)
:cluster -> DynamicSupervisor.start_child(Augie.DistributedSupervisor, spec)
end
end
@ -472,10 +460,7 @@ defmodule Augie.Device do
"""
@spec get_metadata(device_id :: any) :: {:ok, Map.t()} | {:error, reason :: any}
def get_metadata(device_id) do
GenServer.call(
{:via, Horde.Registry, {Augie.DistributedRegistry, {Device, :device, device_id}}},
:get_metadata
)
GenServer.call({:via, :syn, {Device, :device, device_id}}, :get_metadata)
end
@doc """
@ -483,10 +468,7 @@ defmodule Augie.Device do
"""
@spec get_metrics(device_id :: any) :: {:ok, Map.t()} | {:error, reason :: any}
def get_metrics(device_id) do
GenServer.call(
{:via, Horde.Registry, {Augie.DistributedRegistry, {Device, :device, device_id}}},
:get_metrics
)
GenServer.call({:via, :syn, {Device, :device, device_id}}, :get_metrics)
end
@doc """
@ -495,46 +477,14 @@ defmodule Augie.Device do
@spec get_metric(device_id :: any, metric_name :: atom) ::
{:ok, value :: any} | {:error, reason :: any}
def get_metric(device_id, metric_name) do
GenServer.call(
{:via, Horde.Registry, {Augie.DistributedRegistry, {Device, :device, device_id}}},
{:get_metric, metric_name}
)
GenServer.call({:via, :syn, {Device, :device, device_id}}, {:get_metric, metric_name})
end
defp publish_device_registation(device_id) do
Task.start(fn ->
:timer.sleep(250)
Augie.DistributedRegistry
|> Horde.Registry.select([
{{{Device, :registration_subscription, :"$1"}, :_, :_}, [], [:"$1"]}
])
|> Enum.uniq()
|> Enum.each(&send(&1, {:device_registered, device_id}))
end)
:ok
:syn.publish({Device, :registration_subscription}, {:device_registered, device_id})
end
defp publish_device_deregistration(device_id) do
Task.start(fn ->
:timer.sleep(250)
Augie.DistributedRegistry
|> Horde.Registry.select([
{{{Device, :registration_subscription, :"$1"}, :_, :_}, [], [:"$1"]}
])
|> Enum.uniq()
|> Enum.each(&send(&1, {:device_unregistered, device_id}))
end)
:ok
end
defp process_alive?(pid) when node(pid) == node(), do: Process.alive?(pid)
defp process_alive?(pid) do
n = node(pid)
Node.list() |> Enum.member?(n) && :rpc.call(n, Process, :alive?, [pid])
:syn.publish({Device, :registration_subscription}, {:device_unregistered, device_id})
end
end

View file

@ -11,6 +11,7 @@ defmodule Augie.Device.ErlangSystem do
metadata(:process_limit)
metadata(:schedulers)
metadata(:version)
metadata(:node)
metric(:port_count, poll: 5_000)
metric(:process_count, poll: 5_000)
@ -26,6 +27,10 @@ defmodule Augie.Device.ErlangSystem do
def device_id, do: {__MODULE__, node()}
@impl true
def handle_metadata(:node, state) do
{:ok, node(), state}
end
def handle_metadata(name, state) do
{:ok, ExErlstats.system_info(name), state}
end

View file

@ -1,39 +0,0 @@
defmodule Augie.Device.HordeCluster do
  @moduledoc """
  A device that exposes a few headline numbers about the Horde cluster.

  Every metric is declared with `poll: 5_000`:

    * `:node_count` - number of members `Horde.Cluster.members/1` reports for
      `Augie.DistributedRegistry`.
    * `:registered_names` - `Horde.Registry.count/1` for
      `Augie.DistributedRegistry`.
    * `:distributed_processes` - the `:active` entry of
      `Horde.DynamicSupervisor.count_children/1` for
      `Augie.DistributedSupervisor` (0 when that entry is absent).
  """

  use Augie.Device

  metadata(:name, value: "Horde Cluster")
  metric(:node_count, poll: 5_000)
  metric(:registered_names, poll: 5_000)
  metric(:distributed_processes, poll: 5_000)

  @impl true
  def distribute, do: :cluster

  @impl true
  def handle_metric(:node_count, state) do
    members = Horde.Cluster.members(Augie.DistributedRegistry)
    {:ok, length(members), state}
  end

  def handle_metric(:registered_names, state) do
    names = Horde.Registry.count(Augie.DistributedRegistry)
    {:ok, names, state}
  end

  def handle_metric(:distributed_processes, state) do
    children = Horde.DynamicSupervisor.count_children(Augie.DistributedSupervisor)
    {:ok, Map.get(children, :active, 0), state}
  end
end

View file

@ -0,0 +1,32 @@
defmodule Augie.Device.SynCluster do
  @moduledoc """
  Shows some simple information about the Syn cluster.

  Both metrics are declared with `poll: 5_000`:

    * `:registered_names` - the value returned by `:syn.registry_count/0`.
    * `:distributed_processes` - the number of active children of
      `Augie.DistributedSupervisor`.

  The module also answers the `{:resolve_conflict, {pid1, meta1}, {pid2, meta2}}`
  call which `Augie.SynEventHandler` sends when two processes are registered
  under the same name.
  """

  use Augie.Device

  metadata(:name, value: "Syn Cluster")
  metric(:registered_names, poll: 5_000)
  metric(:distributed_processes, poll: 5_000)

  @impl true
  def distribute, do: :cluster

  @impl true
  def handle_metric(:registered_names, state) do
    {:ok, :syn.registry_count(), state}
  end

  def handle_metric(:distributed_processes, state) do
    # Previously a hard-coded placeholder value. Count the children that are
    # actually running under the supervisor used for `:cluster` devices.
    # NOTE(review): that supervisor is a plain `DynamicSupervisor`, so this
    # presumably counts only the children on the local node - confirm that is
    # the intended meaning of "distributed processes".
    count =
      Augie.DistributedSupervisor
      |> DynamicSupervisor.count_children()
      |> Map.get(:active, 0)

    {:ok, count, state}
  end

  # It doesn't matter which node runs this process, as long as it's running
  # somewhere: reply with the second pid and stop this process normally.
  def handle_call({:resolve_conflict, {_pid1, _meta1}, {pid2, _meta2}}, _from, state) do
    {:stop, :normal, pid2, state}
  end
end

View file

@ -1,20 +0,0 @@
defmodule Augie.DistributedRegistry do
  @moduledoc """
  The Horde name registry which is shared across the cluster.

  When initialised, the member list defaults to this module paired with the
  local node and every node in `Node.list/0`; a `:members` entry in the
  supplied options takes precedence over that default.
  """

  use Horde.Registry

  def start_link(opts), do: Horde.Registry.start_link(opts)

  def init(options) do
    defaults = [members: cluster_members()]
    Horde.Registry.init(Keyword.merge(defaults, options))
  end

  # One `{module, node}` pair for the local node and each connected node.
  defp cluster_members do
    for member_node <- [Node.self() | Node.list()], do: {__MODULE__, member_node}
  end
end

View file

@ -1,26 +0,0 @@
defmodule Augie.DistributedSupervisor do
  @moduledoc """
  The supervisor for processes which must be running somewhere on the cluster.

  `start_link/1` and `child_spec/1` are delegated to `Horde.DynamicSupervisor`.
  `init/1` always sets the `:members` option to this module paired with the
  local node and every node in `Node.list/0`.
  """

  use Horde.DynamicSupervisor
  alias Horde.DynamicSupervisor

  defdelegate start_link(opts), to: DynamicSupervisor
  defdelegate child_spec(opts), to: DynamicSupervisor

  def init(options) do
    members = cluster_members()
    {:ok, Keyword.put(options, :members, members)}
  end

  def start_child(spec) do
    # Normalise whatever was passed in into a full child specification first.
    normalised = Supervisor.child_spec(spec, [])
    DynamicSupervisor.start_child(__MODULE__, normalised)
  end

  # One `{module, node}` pair for the local node and each connected node.
  defp cluster_members do
    for member_node <- [Node.self() | Node.list()], do: {__MODULE__, member_node}
  end
end

View file

@ -1,39 +0,0 @@
defmodule Augie.NodeMonitor do
  @moduledoc """
  Watches for nodes connecting to or disconnecting from this node and, on every
  such event, resets the Horde member lists of `Augie.DistributedRegistry` and
  `Augie.DistributedSupervisor` to the local node plus `Node.list/0`.
  """

  use GenServer
  alias Augie.{DistributedRegistry, DistributedSupervisor}
  require Logger

  def start_link(opts), do: GenServer.start_link(__MODULE__, [opts])

  def init(_) do
    # Ask the kernel to deliver `:nodeup` / `:nodedown` messages for visible nodes.
    :net_kernel.monitor_nodes(true, node_type: :visible)
    {:ok, nil}
  end

  def handle_info({:nodeup, node, _node_type}, state) do
    Logger.info("Node #{node} up. Adding to cluster.")
    refresh_members()
    {:noreply, state}
  end

  def handle_info({:nodedown, node, _node_type}, state) do
    Logger.info("Node #{node} down. Removing from cluster.")
    refresh_members()
    {:noreply, state}
  end

  # Registry first, then supervisor; the node list is re-read for each of them.
  defp refresh_members do
    Enum.each([DistributedRegistry, DistributedSupervisor], &set_members/1)
  end

  defp set_members(name) do
    members = for member_node <- [Node.self() | Node.list()], do: {name, member_node}
    :ok = Horde.Cluster.set_members(name, members)
  end
end

View file

@ -0,0 +1,24 @@
defmodule Augie.SynEventHandler do
  @moduledoc """
  Callback module implementing the `:syn_event_handler` behaviour.

  Mostly it is used to allow GenServers to specify their own conflict
  resolution scheme: a registry conflict is forwarded to the first of the two
  conflicting processes as a `{:resolve_conflict, {pid1, meta1}, {pid2, meta2}}`
  call, and whatever that call replies is returned.
  """

  @behaviour :syn_event_handler
  require Logger

  # `:normal` and `:shutdown` exits are assumed to be fine; any other exit
  # reason is logged at debug level.
  @impl true
  def on_process_exit(_name, _pid, _meta, reason) when reason in [:normal, :shutdown], do: :ok

  def on_process_exit(name, _pid, _meta, reason) do
    Logger.debug("Unexpected shutdown of #{inspect(name)}, reason: #{inspect(reason)}")
    :ok
  end

  @impl true
  def resolve_registry_conflict(_name, {pid1, _meta1} = first, {_pid2, _meta2} = second) do
    GenServer.call(pid1, {:resolve_conflict, first, second})
  end
end

View file

@ -1,6 +1,5 @@
defmodule Augie.UpdateLockManager do
use GenServer
alias Horde.Registry
require Logger
@moduledoc """
@ -10,33 +9,27 @@ defmodule Augie.UpdateLockManager do
from updates so that we can do blue:green deploys and ensure that there's
always one machine able to orchestrate the robot and hold the robot state.
We do this by registering this process with the `Horde.Registry` on both
devices and send messages between them to discover whom was the most recently
started process. That process will hold the lock.
We do this by trying to start this process on both nodes, and then provide an
escape hatch for how to deal with the eventual name conflict.
"""
defmodule State do
@moduledoc false
defstruct lock: nil, started_at: nil, timer: nil, live_nodes: %{}
end
# The location of the file to be locked on disk.
@lockfile_location "/tmp/balena/updates.lock"
# How frequently to message the other peer(s) and decide whom should be the lock holder.
@update_interval 10_000
@doc false
def start_link(args), do: GenServer.start_link(__MODULE__, [args])
@impl true
def init(_args) do
if File.exists?(@lockfile_location), do: File.rmdir(@lockfile_location)
if File.exists?(lockfile_location()), do: File.rmdir(lockfile_location())
Process.flag(:trap_exit, true)
{:ok, timer} = :timer.send_interval(@update_interval, :tick)
now = DateTime.utc_now()
Registry.register(Augie.DistributedRegistry, {__MODULE__, node(), self()}, self())
{:ok, %State{timer: timer, started_at: now}}
:net_kernel.monitor_nodes(true)
now =
DateTime.utc_now()
|> DateTime.to_unix(:microsecond)
:syn.join(__MODULE__, self(), now)
{:ok, lock} = acquire_lock()
{:ok, %{lock: lock}}
end
# If we're terminating and we're holding the lock, then release it.
@ -44,65 +37,68 @@ defmodule Augie.UpdateLockManager do
def terminate(_reason, %{lock: nil}), do: :ok
def terminate(_reason, %{lock: lock}), do: release_lock(lock)
# Send our node name, pid, and started at time to all the registered processes
# in this process group.
@impl true
def handle_info(:tick, %{started_at: started_at} = state) do
Augie.DistributedRegistry
|> Registry.select([{{{__MODULE__, :_, :_}, :"$1", :_}, [], [:"$1"]}])
|> Enum.each(&send(&1, {:update, node(), self(), started_at}))
def handle_info({:nodeup, _node}, state), do: reacquire_if_required(state)
def handle_info({:nodedown, _node}, state), do: reacquire_if_required(state)
{:noreply, state}
defp reacquire_if_required(%{lock: nil} = state) do
:timer.sleep(250)
{pid, _started_at} =
__MODULE__
|> :syn.get_members(:with_meta)
|> Enum.max_by(&elem(&1, 1))
if pid == self() do
{:ok, lock} = acquire_lock()
{:noreply, %{state | lock: lock}}
else
{:noreply, state}
end
end
def handle_info({:update, n, p, s}, %{live_nodes: live_nodes, lock: lock} = state) do
live_nodes =
live_nodes
|> Map.put(n, %{pid: p, started_at: s, last_seen: DateTime.utc_now()})
defp reacquire_if_required(%{lock: lock} = state) do
:timer.sleep(250)
live_nodes =
live_nodes
|> Enum.reject(fn {_, %{last_seen: d}} ->
now = DateTime.utc_now() |> DateTime.to_unix(:millisecond)
last_seen_at = d |> DateTime.to_unix(:millisecond)
now - last_seen_at > @update_interval * 3
end)
|> Enum.into(%{})
{pid, _started_at} =
__MODULE__
|> :syn.get_members(:with_meta)
|> Enum.max_by(&elem(&1, 1))
state = %{state | live_nodes: live_nodes}
newest_node =
live_nodes
|> Enum.max_by(fn {_, %{started_at: d}} -> DateTime.to_unix(d, :microsecond) end)
|> elem(0)
cond do
lock && newest_node != node() ->
release_lock(lock)
{:noreply, %{state | lock: nil}}
is_nil(lock) && newest_node == node() ->
{:ok, lock} = acquire_lock()
{:noreply, %{state | lock: lock}}
true ->
{:noreply, state}
if pid == self() do
{:noreply, state}
else
:ok = release_lock(lock)
{:noreply, %{state | lock: nil}}
end
end
defp acquire_lock do
Logger.info("Acquiring update lock.")
lockfile_location = lockfile_location()
Logger.info("[#{__MODULE__}] Acquiring update lock #{inspect(lockfile_location)}.")
dir = Path.dirname(@lockfile_location)
dir = Path.dirname(lockfile_location)
if File.exists?(dir), do: File.rm_rf(dir)
File.mkdir_p(dir)
File.open(@lockfile_location, [:exclusive])
File.open(lockfile_location, [:exclusive])
end
defp release_lock(lock) do
:ok = File.close(lock)
:ok = File.rm(@lockfile_location)
Logger.info("Releasing update lock.")
with lockfile_location <- lockfile_location(),
:ok <- File.close(lock),
:ok <- File.rm(lockfile_location) do
Logger.info("[#{__MODULE__}] Released update lock #{inspect(lockfile_location)}.")
:ok
end
end
def lockfile_location do
:augie
|> Application.get_env(__MODULE__, [])
|> Keyword.get(
:lockfile_location,
"/tmp/#{Base.encode64(to_string(node()), padding: false)}/updates.lock"
)
end
end

View file

@ -37,7 +37,6 @@ defmodule Augie.MixProject do
{:ex_erlstats, "~> 0.1"},
{:floki, ">= 0.0.0", only: :test},
{:gettext, "~> 0.11"},
{:horde, "~> 0.7"},
{:jason, "~> 1.0"},
{:libcluster, "~> 3.1"},
{:mdns, "~> 1.0"},
@ -46,7 +45,8 @@ defmodule Augie.MixProject do
{:phoenix_live_view, "~> 0.4.0"},
{:phoenix_pubsub, "~> 1.1"},
{:phoenix, "~> 1.4.11"},
{:plug_cowboy, "~> 2.0"}
{:plug_cowboy, "~> 2.0"},
{:syn, "~> 2.0"}
]
end
end

View file

@ -6,23 +6,18 @@
"cowlib": {:hex, :cowlib, "2.8.0", "fd0ff1787db84ac415b8211573e9a30a3ebe71b5cbff7f720089972b2319c8a4", [:rebar3], [], "hexpm"},
"credo": {:hex, :credo, "1.1.5", "caec7a3cadd2e58609d7ee25b3931b129e739e070539ad1a0cd7efeeb47014f4", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"},
"dbus": {:hex, :dbus, "0.7.0", "71b7660523be6e222a8a4f9ddcbf54ad95638479bd17654975bb751aadbe81de", [:make, :rebar3], [], "hexpm"},
"delta_crdt": {:hex, :delta_crdt, "0.5.10", "e866f8d1b89bee497a98b9793e9ba0ea514112a1c41a0c30dcde3463d4984d14", [:mix], [{:merkle_map, "~> 0.2.0", [hex: :merkle_map, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
"dns": {:hex, :dns, "2.1.2", "81c46d39f7934f0e73368355126e4266762cf227ba61d5889635d83b2d64a493", [:mix], [{:socket, "~> 0.3.13", [hex: :socket, repo: "hexpm", optional: false]}], "hexpm"},
"ex_erlstats": {:hex, :ex_erlstats, "0.1.6", "f06af6136e67c2ba65848d411f63de8d672f972907e2e357c14ab3d067a6269c", [:mix], [], "hexpm"},
"file_system": {:hex, :file_system, "0.2.7", "e6f7f155970975789f26e77b8b8d8ab084c59844d8ecfaf58cbda31c494d14aa", [:mix], [], "hexpm"},
"floki": {:hex, :floki, "0.23.1", "e100306ce7d8841d70a559748e5091542e2cfc67ffb3ade92b89a8435034dab1", [:mix], [{:html_entities, "~> 0.5.0", [hex: :html_entities, repo: "hexpm", optional: false]}], "hexpm"},
"gen_state_machine": {:hex, :gen_state_machine, "2.0.5", "9ac15ec6e66acac994cc442dcc2c6f9796cf380ec4b08267223014be1c728a95", [:mix], [], "hexpm"},
"gettext": {:hex, :gettext, "0.17.1", "8baab33482df4907b3eae22f719da492cee3981a26e649b9c2be1c0192616962", [:mix], [], "hexpm"},
"hackney": {:hex, :hackney, "1.15.2", "07e33c794f8f8964ee86cebec1a8ed88db5070e52e904b8f12209773c1036085", [:rebar3], [{:certifi, "2.5.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "6.0.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.5", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"},
"horde": {:hex, :horde, "0.7.1", "161e140e4e4fab5416b90a4e5b68fbe7fb78f62b265f87f01d661a58aa72be0c", [:mix], [{:delta_crdt, "~> 0.5.10", [hex: :delta_crdt, repo: "hexpm", optional: false]}, {:libring, "~> 1.4", [hex: :libring, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_poller, "~> 0.4.0", [hex: :telemetry_poller, repo: "hexpm", optional: false]}], "hexpm"},
"html_entities": {:hex, :html_entities, "0.5.0", "40f5c5b9cbe23073b48a4e69c67b6c11974f623a76165e2b92d098c0e88ccb1d", [:mix], [], "hexpm"},
"httpoison": {:hex, :httpoison, "1.6.2", "ace7c8d3a361cebccbed19c283c349b3d26991eff73a1eaaa8abae2e3c8089b6", [:mix], [{:hackney, "~> 1.15 and >= 1.15.2", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm"},
"idna": {:hex, :idna, "6.0.0", "689c46cbcdf3524c44d5f3dde8001f364cd7608a99556d8fbd8239a5798d4c10", [:rebar3], [{:unicode_util_compat, "0.4.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"},
"jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"},
"libcluster": {:hex, :libcluster, "3.1.1", "cbab97b96141f47f2fe5563183c444bbce9282b3991ef054d69b8805546f0122", [:mix], [{:jason, "~> 1.1.2", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"},
"libring": {:hex, :libring, "1.4.0", "41246ba2f3fbc76b3971f6bce83119dfec1eee17e977a48d8a9cfaaf58c2a8d6", [:mix], [], "hexpm"},
"mdns": {:hex, :mdns, "1.0.3", "f08414daf5636bf5cd364611e838818e9250c91a3282a817ad9174b03e757401", [:mix], [{:dns, "~> 2.0", [hex: :dns, repo: "hexpm", optional: false]}], "hexpm"},
"merkle_map": {:hex, :merkle_map, "0.2.0", "5391ac61e016ce4aeb66ce39f05206a382fd4b66ee4b63c08a261d5633eadd76", [:mix], [], "hexpm"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"},
"mime": {:hex, :mime, "1.3.1", "30ce04ab3175b6ad0bdce0035cba77bba68b813d523d1aac73d9781b4d193cf8", [:mix], [], "hexpm"},
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm"},
@ -38,9 +33,7 @@
"ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm"},
"socket": {:hex, :socket, "0.3.13", "98a2ab20ce17f95fb512c5cadddba32b57273e0d2dba2d2e5f976c5969d0c632", [:mix], [], "hexpm"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.5", "6eaf7ad16cb568bb01753dbbd7a95ff8b91c7979482b95f38443fe2c8852a79b", [:make, :mix, :rebar3], [], "hexpm"},
"swarm": {:hex, :swarm, "3.4.0", "64f8b30055d74640d2186c66354b33b999438692a91be275bb89cdc7e401f448", [:mix], [{:gen_state_machine, "~> 2.0", [hex: :gen_state_machine, repo: "hexpm", optional: false]}, {:libring, "~> 1.0", [hex: :libring, repo: "hexpm", optional: false]}], "hexpm"},
"swarm_dynamic_supervisor": {:hex, :swarm_dynamic_supervisor, "0.1.0", "76bd351b6fbc9f5775d358f54736f13336d8ad882784cf350a0ad7ec53b60898", [:mix], [{:swarm, ">= 3.0.0", [hex: :swarm, repo: "hexpm", optional: false]}], "hexpm"},
"syn": {:hex, :syn, "2.0.3", "fe7eefd78ae2498fe017556f4a8385cba73f8f0ba51d1b65e3c5e6cbad5fc04b", [:rebar3], [], "hexpm"},
"telemetry": {:hex, :telemetry, "0.4.1", "ae2718484892448a24470e6aa341bc847c3277bfb8d4e9289f7474d752c09c7f", [:rebar3], [], "hexpm"},
"telemetry_poller": {:hex, :telemetry_poller, "0.4.1", "50d03d976a3b8ab4898d9e873852e688840df47685a13af90af40e1ba43a758b", [:rebar3], [{:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.4.1", "d869e4c68901dd9531385bb0c8c40444ebf624e60b6962d95952775cac5e90cd", [:rebar3], [], "hexpm"},
}