diff --git a/augie/lib/augie/collector.ex b/augie/lib/augie/collector.ex index daae535..567734f 100644 --- a/augie/lib/augie/collector.ex +++ b/augie/lib/augie/collector.ex @@ -196,7 +196,10 @@ defmodule Augie.Collector do def handle_cast({:metadata, device_id, metadata}, %{table: table} = state) do rows = metadata - |> Enum.map(fn {name, value} -> {device_id, :metadata, name, value} end) + |> Enum.map(fn {name, value} -> + Swarm.publish({__MODULE__, :subscribers, device_id}, {:metadata, device_id, name, value}) + {device_id, :metadata, name, value} + end) :ets.insert(table, rows) diff --git a/augie/lib/augie/collector/cluster.ex b/augie/lib/augie/collector/cluster.ex new file mode 100644 index 0000000..22e7272 --- /dev/null +++ b/augie/lib/augie/collector/cluster.ex @@ -0,0 +1,77 @@ +defmodule Augie.Collector.Cluster do + alias Augie.Collector + use GenServer + + @moduledoc """ + This module retrieves information about the connected nodes in the cluster and + sends them to the collector. + """ + + @update_interval 5_000 + + @doc false + def child_spec(_) do + %{ + id: __MODULE__, + start: {__MODULE__, :start_link, [[]]}, + restart: :permanent, + type: :worker + } + end + + @doc false + def start_link(args), do: GenServer.start_link(__MODULE__, [args]) + + @impl true + def init(_args) do + {:ok, timer} = :timer.send_interval(@update_interval, :update) + :net_kernel.monitor_nodes(true) + nodes = [Node.self() | Node.list()] + update_metadata(nodes) + + {:ok, %{timer: timer, nodes: nodes}} + end + + @impl true + def handle_info(:update, %{nodes: nodes} = state) do + nodes + |> Enum.each(fn node -> + before_time = DateTime.utc_now() |> DateTime.to_unix(:microsecond) + + if Node.ping(node) == :pong do + after_time = DateTime.utc_now() |> DateTime.to_unix(:microsecond) + Collector.record(__MODULE__, [{:"#{node}_ping_ms", (after_time - before_time) / 1_000}]) + end + end) + + {:noreply, state} + end + + def handle_info({:nodeup, node}, %{nodes: nodes} = state) do + if Enum.member?(nodes, node) do + {:noreply, state} + else + nodes = [node | nodes] + update_metadata(nodes) + {:noreply, %{state | nodes: nodes}} + end + end + + def handle_info({:nodedown, node}, %{nodes: nodes} = state) do + if Enum.member?(nodes, node) do + nodes = nodes |> List.delete(node) + update_metadata(nodes) + {:noreply, %{state | nodes: nodes}} + else + {:noreply, state} + end + end + + def handle_info({:swarm, :die}, state) do + {:stop, :shutdown, state} + end + + defp update_metadata(nodes) do + Collector.metadata(__MODULE__, nodes: nodes, collector: __MODULE__) + end +end diff --git a/augie/lib/augie/collector/raspberry_pi.ex b/augie/lib/augie/collector/raspberry_pi.ex index d00f747..6bf5253 100644 --- a/augie/lib/augie/collector/raspberry_pi.ex +++ b/augie/lib/augie/collector/raspberry_pi.ex @@ -91,7 +91,8 @@ defmodule Augie.Collector.RaspberryPi do hostname: hostname(), model: read_file("/sys/firmware/devicetree/base/model", "Unknown model"), serial_number: read_file("/sys/firmware/devicetree/base/serial-number"), - memory: total_memory() + memory: total_memory(), + collector: __MODULE__ ] end diff --git a/augie/lib/augie/distributed_supervisor.ex b/augie/lib/augie/distributed_supervisor.ex index fdd093b..6d9bcd5 100644 --- a/augie/lib/augie/distributed_supervisor.ex +++ b/augie/lib/augie/distributed_supervisor.ex @@ -14,7 +14,8 @@ defmodule Augie.DistributedSupervisor do Task.async(fn -> [ - Augie.Collector + Augie.Collector, + Augie.Collector.Cluster ] |> Enum.each(&start_child(&1)) end)