Add a cluster data collector and make metadata publishable too.

This commit is contained in:
James Harton 2019-12-04 16:22:52 +13:00
parent 69c8121a89
commit 7c9cc581d7
4 changed files with 85 additions and 3 deletions

View file

@ -196,7 +196,10 @@ defmodule Augie.Collector do
def handle_cast({:metadata, device_id, metadata}, %{table: table} = state) do
rows =
metadata
|> Enum.map(fn {name, value} -> {device_id, :metadata, name, value} end)
|> Enum.map(fn {name, value} ->
Swarm.publish({__MODULE__, :subscribers, device_id}, {:metadata, device_id, name, value})
{device_id, :metadata, name, value}
end)
:ets.insert(table, rows)

View file

@ -0,0 +1,77 @@
defmodule Augie.Collector.Cluster do
alias Augie.Collector
use GenServer
@moduledoc """
This module retrieves information about the connected nodes in the cluster and
sends them to the collector.
"""
@update_interval 5_000
@doc false
def child_spec(_) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [[]]},
restart: :permanent,
type: :worker
}
end
@doc false
def start_link(args), do: GenServer.start_link(__MODULE__, [args])
@impl true
def init(_args) do
{:ok, timer} = :timer.send_interval(@update_interval, :update)
:net_kernel.monitor_nodes(true)
nodes = [Node.self() | Node.list()]
update_metadata(nodes)
{:ok, %{timer: timer, nodes: nodes}}
end
@impl true
def handle_info(:update, %{nodes: nodes} = state) do
nodes
|> Enum.each(fn node ->
before_time = DateTime.utc_now() |> DateTime.to_unix(:microsecond)
if Node.ping(node) == :pong do
after_time = DateTime.utc_now() |> DateTime.to_unix(:microsecond)
Collector.record(__MODULE__, [{:"#{node}_ping_ms", (after_time - before_time) / 1_000}])
end
end)
{:noreply, state}
end
def handle_info({:nodeup, node}, %{nodes: nodes} = state) do
if Enum.member?(nodes, node) do
{:noreply, state}
else
nodes = [node | nodes]
update_metadata(nodes)
{:noreply, %{state | nodes: nodes}}
end
end
def handle_info({:nodedown, node}, %{nodes: nodes} = state) do
if Enum.member?(nodes, node) do
nodes = nodes |> List.delete(node)
update_metadata(nodes)
{:noreply, %{state | nodes: nodes}}
else
{:noreply, state}
end
end
def handle_info({:swarm, :die}, state) do
{:stop, :shutdown, state}
end
defp update_metadata(nodes) do
Collector.metadata(__MODULE__, nodes: nodes, collector: __MODULE__)
end
end

View file

@ -91,7 +91,8 @@ defmodule Augie.Collector.RaspberryPi do
hostname: hostname(),
model: read_file("/sys/firmware/devicetree/base/model", "Unknown model"),
serial_number: read_file("/sys/firmware/devicetree/base/serial-number"),
memory: total_memory()
memory: total_memory(),
collector: __MODULE__
]
end

View file

@ -14,7 +14,8 @@ defmodule Augie.DistributedSupervisor do
Task.async(fn ->
[
Augie.Collector
Augie.Collector,
Augie.Collector.Cluster
]
|> Enum.each(&start_child(&1))
end)