83 lines
2.1 KiB
Elixir
83 lines
2.1 KiB
Elixir
defmodule Podbox.Download.Broadway do
  @moduledoc """
  A Broadway pipeline which runs as many simultaneous downloads as possible.

  Each asset emitted by the producer is wrapped in a `Broadway.Message` by
  `transform/2`, handed to `Podbox.Download.Request.request/1` by
  `handle_message/3`, and finally acknowledged by `ack/3`, which runs the
  `:complete` or `:failed` update action on the asset.
  """

  @behaviour Broadway.Acknowledger

  use Broadway

  alias Broadway.Message
  alias Podbox.{Download, Download.Asset, Download.Producer, Download.Request}

  # Starts the pipeline, registered under this module's name.
  #
  # Options:
  #   * `:producer` - the producer module to start; defaults to
  #     `Podbox.Download.Producer`.
  @doc false
  @spec start_link(Keyword.t()) :: GenServer.on_start()
  def start_link(opts) do
    producer = Keyword.get(opts, :producer, Producer)

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {producer, []},
        # The producer emits bare assets; `transform/2` wraps them in messages.
        transformer: {__MODULE__, :transform, []},
        concurrency: 1
      ],
      processors: [
        # Four processor stages per online scheduler.
        default: [concurrency: System.schedulers_online() * 4]
      ]
    )
  end

  # Requests the download of the asset carried by `message`. The message is
  # returned untouched on `:ok` and marked as failed with the returned reason
  # on `{:error, reason}`.
  @doc false
  @impl true
  def handle_message(:default, %{data: asset} = message, _context)
      when is_struct(asset, Asset) do
    case Request.request(asset) do
      :ok -> message
      {:error, reason} -> Message.failed(message, reason)
    end
  end

  # Wraps an asset emitted by the producer in a `Broadway.Message` that is
  # acknowledged by this module (see `ack/3`), using `:asset` as the ack
  # reference and the asset id as the ack data.
  @doc false
  @spec transform(Asset.t(), Keyword.t()) :: Message.t()
  def transform(asset, _opts) do
    %Message{
      data: asset,
      acknowledger: {__MODULE__, :asset, asset.id}
    }
  end

  # Acknowledges a batch of processed messages.
  #
  # The assets of `successful` messages are bulk-updated with the `:complete`
  # action. The assets of `failed` messages are grouped by their failure
  # description and each group is bulk-updated with the `:failed` action, with
  # that description stored as `error_detail`.
  @doc false
  @impl Broadway.Acknowledger
  @spec ack(:asset, [Message.t()], [Message.t()]) :: :ok
  def ack(:asset, successful, failed) do
    successful
    |> Stream.map(& &1.data)
    |> Ash.bulk_update!(:complete, %{}, return_errors?: true, domain: Download, resource: Asset)

    failed
    # The value function keeps only the assets in each group, so the bulk
    # update receives records rather than `{reason, asset}` tuples.
    |> Enum.group_by(&failure_reason/1, & &1.data)
    |> Enum.each(fn {reason, assets} ->
      # NOTE(review): `reason` is already a string here, so `inspect/1` stores
      # it wrapped in quotes - confirm that this is the intended format.
      Ash.bulk_update!(assets, :failed, %{error_detail: inspect(reason)},
        domain: Download,
        resource: Asset
      )
    end)
  end

  # Describes why a failed message failed, covering both messages explicitly
  # marked as failed and messages whose status is a raise, throw or exit.
  defp failure_reason(%{status: {:failed, error}}), do: describe_error(error)

  defp failure_reason(%{status: {kind, error, _stacktrace}})
       when kind in [:throw, :error, :exit],
       do: describe_error(error)

  # Turns an arbitrary error term into a string: exceptions yield their
  # message, printable binaries are used as-is, anything else is inspected.
  defp describe_error(error) when is_exception(error), do: Exception.message(error)

  defp describe_error(error) when is_binary(error) do
    if String.printable?(error) do
      error
    else
      inspect(error)
    end
  end

  defp describe_error(error), do: inspect(error)
end
|