defmodule Podbox.Download.Asset do
  @moduledoc """
  An item in the download queue.

  Each asset records the URI to fetch, its lifecycle `state` (one of
  `valid_states/0`), its `asset_type` (one of `asset_types/0`), the number of
  retries it has left and the transfer progress. Selected actions are
  published through `Podbox.PubSub` under the `"download:asset"` prefix.
  """

  alias Ash.Query
  alias Podbox.{Download, Repo}

  require Ash.Query

  use Ash.Resource,
    data_layer: AshSqlite.DataLayer,
    domain: Download,
    notifiers: [Ash.Notifier.PubSub]

  @type t :: Ash.Resource.record()

  @asset_types [:feed, :enclosure]
  @valid_states [:queued, :dequeued, :downloading, :complete, :failed]

  attributes do
    uuid_primary_key :id

    attribute :uri, :string, allow_nil?: false

    attribute :state, :atom,
      allow_nil?: false,
      default: :queued,
      constraints: [one_of: @valid_states]

    attribute :remaining_retries, :integer, allow_nil?: false, constraints: [min: 0]
    attribute :total_bytes, :integer, allow_nil?: true, constraints: [min: 0]
    attribute :transferred_bytes, :integer, allow_nil?: true, constraints: [min: 0]
    attribute :content_type, :string, allow_nil?: true
    attribute :error_detail, :string, allow_nil?: true

    attribute :headers, :map,
      allow_nil?: false,
      public?: true,
      sensitive?: true,
      default: %{}

    attribute :asset_type, :atom,
      allow_nil?: false,
      constraints: [one_of: @asset_types],
      public?: true

    create_timestamp :inserted_at
    update_timestamp :updated_at
  end

  sqlite do
    table "download_assets"
    repo Repo
  end

  pub_sub do
    module Podbox.PubSub
    prefix "download:asset"

    publish :enqueue, "queued"
    publish :started, "started"
    publish :pause, "paused"
    publish :failed, "failed"
    publish :progress, "progress"
    publish :complete, "complete"
  end

  actions do
    # Adds a new asset to the queue in the :queued state.
    create :enqueue do
      argument :uri, :string, allow_nil?: false, public?: true

      argument :headers, :map,
        allow_nil?: true,
        public?: true,
        sensitive?: true,
        default: %{}

      argument :retries, :integer, allow_nil?: true, default: 5, public?: true

      argument :asset_type, :atom,
        allow_nil?: false,
        public?: true,
        constraints: [one_of: @asset_types]

      change set_attribute(:uri, arg(:uri))
      change set_attribute(:headers, arg(:headers)), where: [present(:headers)]
      change set_attribute(:remaining_retries, arg(:retries))
      change set_attribute(:state, :queued)
      change set_attribute(:asset_type, arg(:asset_type))
    end

    # Marks the asset as :downloading; the optional metadata arguments are
    # only written when they are present.
    update :started do
      argument :total_bytes, :integer, allow_nil?: true, constraints: [min: 0], public?: true

      argument :transferred_bytes, :integer,
        allow_nil?: true,
        constraints: [min: 0],
        public?: true

      argument :content_type, :string, allow_nil?: true, public?: true

      change set_attribute(:state, :downloading)
      change set_attribute(:content_type, arg(:content_type)), where: [present(:content_type)]
      change set_attribute(:total_bytes, arg(:total_bytes)), where: [present(:total_bytes)]

      change set_attribute(:transferred_bytes, arg(:transferred_bytes)),
        where: [present(:transferred_bytes)]
    end

    # Puts the asset back into the :queued state.
    update :pause do
      change set_attribute(:state, :queued)
    end

    # Records a failure. With no retries left the asset becomes :failed;
    # otherwise it is re-queued and one retry is consumed.
    update :failed do
      argument :error_detail, :string, allow_nil?: false, public?: true

      change set_attribute(:error_detail, arg(:error_detail))
      change set_attribute(:state, :failed), where: [attribute_equals(:remaining_retries, 0)]

      change set_attribute(:state, :queued),
        where: [compare(:remaining_retries, greater_than: 0)]

      change increment(:remaining_retries, amount: -1),
        where: [compare(:remaining_retries, greater_than: 0)]
    end

    # Updates the number of bytes transferred so far.
    update :progress do
      argument :transferred_bytes, :integer, allow_nil?: false, constraints: [min: 0]

      change set_attribute(:transferred_bytes, arg(:transferred_bytes))
    end

    # Marks the asset as :complete, optionally recording the final byte count.
    update :complete do
      argument :transferred_bytes, :integer,
        allow_nil?: true,
        constraints: [min: 0],
        public?: true

      change set_attribute(:state, :complete)

      change set_attribute(:transferred_bytes, arg(:transferred_bytes)),
        where: [present(:transferred_bytes)]
    end

    # Reads assets, most recently updated first, optionally limited.
    # NOTE(review): despite its name this action applies no `state` filter;
    # confirm whether it should be restricted to `state == :queued`.
    read :queued do
      argument :limit, :integer, allow_nil?: true, constraints: [min: 0]

      prepare build(sort: [updated_at: :desc], limit: arg(:limit))
    end

    # Generic action implemented by `dequeue/2`; returns the claimed assets.
    action :dequeue, {:array, :struct} do
      constraints items: [instance_of: __MODULE__]

      argument :limit, :integer, allow_nil?: true, constraints: [min: 0]

      run &dequeue/2
    end

    destroy :destroy do
      primary? true
    end

    read :read do
      primary? true
      pagination keyset?: true, required?: false
    end

    read :get_by_id do
      get_by [:id]
    end

    # Marks the asset as :dequeued; applied in bulk by `dequeue/2`.
    update :dequeued do
      change set_attribute(:state, :dequeued)
    end
  end

  # Implementation of the `:dequeue` action.
  #
  # Selects assets that are `:queued`, or that have been `:dequeued` with an
  # `updated_at` at least two hours in the past, ordered by `updated_at`
  # ascending, and moves them to `:dequeued` through a bulk update.
  #
  # Returns `{:ok, records}` when the bulk update reports `:success`, otherwise
  # `{:error, errors}`.
  @doc false
  @spec dequeue(Ash.ActionInput.t(), term()) :: {:ok, [t()]} | {:error, term()}
  def dequeue(input, _context) do
    # `:limit` is optional and has no default, so it may be absent from the
    # arguments map when the caller omits it; `Map.get/2` yields `nil`
    # (no limit) instead of raising `KeyError`.
    limit = Map.get(input.arguments, :limit)

    __MODULE__
    |> Query.for_read(:read)
    |> Query.sort(updated_at: :asc)
    |> Query.filter(state == :queued || (state == :dequeued && updated_at <= ago(2, :hour)))
    |> Query.limit(limit)
    # `return_errors?: true` so the error branch below carries the actual
    # errors rather than an empty/nil value.
    |> Ash.bulk_update(:dequeued, %{}, return_records?: true, return_errors?: true)
    |> case do
      %{status: :success, records: records} -> {:ok, records}
      %{errors: errors} -> {:error, errors}
    end
  end

  @doc false
  @spec asset_types() :: [atom(), ...]
  def asset_types, do: @asset_types

  @doc false
  @spec valid_states() :: [atom(), ...]
  def valid_states, do: @valid_states
end