podbox_ash/lib/podbox/download/broadway.ex

84 lines
2.1 KiB
Elixir
Raw Normal View History

2024-05-22 13:17:21 +12:00
defmodule Podbox.Download.Broadway do
  @moduledoc """
  A Broadway pipeline which does as many simultaneous downloads as possible.

  Assets emitted by the producer are wrapped into `Broadway.Message` structs by
  `transform/2`, requested in `handle_message/3`, and finally marked as
  completed or failed in `ack/3`, for which this module acts as its own
  acknowledger.
  """
  use Broadway

  @behaviour Broadway.Acknowledger

  alias Broadway.Message
  alias Podbox.{Download, Download.Asset, Download.Producer, Download.Request}

  # Starts the pipeline registered under this module's name.
  #
  # Options:
  #   * `:producer` - the producer module to use (defaults to
  #     `Podbox.Download.Producer`); handy for swapping in a test producer.
  @doc false
  @spec start_link(Keyword.t()) :: GenServer.on_start()
  def start_link(opts) do
    producer = Keyword.get(opts, :producer, Producer)

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {producer, []},
        transformer: {__MODULE__, :transform, []},
        concurrency: 1
      ],
      processors: [
        # Processor count is deliberately a multiple of the scheduler count.
        default: [concurrency: System.schedulers_online() * 4]
      ]
    )
  end

  # Performs the request for a single asset. An `{:error, reason}` result marks
  # the message as failed so that `ack/3` can record the reason.
  @doc false
  @impl true
  def handle_message(:default, %{data: asset} = message, _context)
      when is_struct(asset, Asset) do
    case Request.request(asset) do
      :ok -> message
      {:error, reason} -> Message.failed(message, reason)
    end
  end

  # Wraps an asset emitted by the producer in a message acknowledged by this
  # module (see `ack/3`), using `:asset` as the ack ref and the asset id as the
  # ack data.
  @doc false
  @spec transform(Asset.t(), Keyword.t()) :: Message.t()
  def transform(asset, _opts) do
    %Message{
      data: asset,
      acknowledger: {__MODULE__, :asset, asset.id}
    }
  end

  # Acknowledges a batch of processed messages: successful assets are run
  # through the `:complete` action, failed ones through the `:failed` action
  # together with a description of why they failed.
  @doc false
  @impl Broadway.Acknowledger
  @spec ack(:asset, [Message.t()], [Message.t()]) :: :ok
  def ack(:asset, successful, failed) do
    successful
    |> Stream.map(& &1.data)
    |> Ash.bulk_update!(:complete, %{}, return_errors?: true, domain: Download, resource: Asset)

    failed
    # Group the *assets* (not the messages) by failure description so that each
    # distinct failure needs only a single bulk update.
    |> Enum.group_by(&failure_description/1, & &1.data)
    |> Enum.each(fn {failure, assets} ->
      # NOTE(review): `failure` is already a string, so `inspect/1` stores it
      # wrapped in quotes. Kept as-is to preserve the stored format - confirm
      # whether that is intended.
      Ash.bulk_update!(assets, :failed, %{error_detail: inspect(failure)},
        domain: Download,
        resource: Asset
      )
    end)
  end

  # Extracts a printable failure description from a failed message. The status
  # is either `{:failed, reason}` or `{kind, reason, stacktrace}` for a message
  # whose processing raised, threw or exited.
  defp failure_description(%{status: {:failed, error}}), do: describe_error(error)

  defp failure_description(%{status: {type, error, _stacktrace}})
       when type in [:throw, :error, :exit],
       do: describe_error(error)

  # Converts an arbitrary error term into a printable string.
  defp describe_error(error) when is_exception(error), do: Exception.message(error)

  defp describe_error(error) when is_binary(error) do
    if String.printable?(error) do
      error
    else
      inspect(error)
    end
  end

  defp describe_error(error), do: inspect(error)
end