diff options
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/pleroma/application.ex | 5 | ||||
| -rw-r--r-- | lib/pleroma/web/federator/federator.ex | 43 | ||||
| -rw-r--r-- | lib/pleroma/web/federator/retry_queue.ex | 71 | ||||
| -rw-r--r-- | lib/pleroma/web/media_proxy/media_proxy.ex | 6 | ||||
| -rw-r--r-- | lib/pleroma/web/websub/websub.ex | 25 | 
5 files changed, 123 insertions, 27 deletions
| diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 4c0e3ddb0..30f0b14c8 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -58,8 +58,9 @@ defmodule Pleroma.Application do            id: :cachex_idem          ),          worker(Pleroma.Web.Federator, []), -        worker(Pleroma.Stats, []), -        worker(Pleroma.Gopher.Server, []) +        worker(Pleroma.Web.Federator.RetryQueue, []), +        worker(Pleroma.Gopher.Server, []), +        worker(Pleroma.Stats, [])        ] ++          if @env == :test,            do: [], diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index f047708d3..ac3d7c132 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -3,6 +3,7 @@ defmodule Pleroma.Web.Federator do    alias Pleroma.User    alias Pleroma.Activity    alias Pleroma.Web.{WebFinger, Websub} +  alias Pleroma.Web.Federator.RetryQueue    alias Pleroma.Web.ActivityPub.ActivityPub    alias Pleroma.Web.ActivityPub.Relay    alias Pleroma.Web.ActivityPub.Transmogrifier @@ -122,29 +123,25 @@ defmodule Pleroma.Web.Federator do    end    def handle(:publish_single_ap, params) do -    ActivityPub.publish_one(params) -  end - -  def handle(:publish_single_websub, %{xml: xml, topic: topic, callback: callback, secret: secret}) do -    signature = @websub.sign(secret || "", xml) -    Logger.debug(fn -> "Pushing #{topic} to #{callback}" end) - -    with {:ok, %{status_code: code}} <- -           @httpoison.post( -             callback, -             xml, -             [ -               {"Content-Type", "application/atom+xml"}, -               {"X-Hub-Signature", "sha1=#{signature}"} -             ], -             timeout: 10000, -             recv_timeout: 20000, -             hackney: [pool: :default] -           ) do -      Logger.debug(fn -> "Pushed to #{callback}, code #{code}" end) -    else -      e -> -        Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(e)}" end) +    case ActivityPub.publish_one(params) do +      {:ok, _} -> +        :ok + +      {:error, _} -> +        RetryQueue.enqueue(params, ActivityPub) +    end +  end + +  def handle( +        :publish_single_websub, +        %{xml: xml, topic: topic, callback: callback, secret: secret} = params +      ) do +    case Websub.publish_one(params) do +      {:ok, _} -> +        :ok + +      {:error, _} -> +        RetryQueue.enqueue(params, Websub)      end    end diff --git a/lib/pleroma/web/federator/retry_queue.ex b/lib/pleroma/web/federator/retry_queue.ex new file mode 100644 index 000000000..06c094f26 --- /dev/null +++ b/lib/pleroma/web/federator/retry_queue.ex @@ -0,0 +1,71 @@ +defmodule Pleroma.Web.Federator.RetryQueue do +  use GenServer +  alias Pleroma.Web.{WebFinger, Websub} +  alias Pleroma.Web.ActivityPub.ActivityPub +  require Logger + +  @websub Application.get_env(:pleroma, :websub) +  @ostatus Application.get_env(:pleroma, :websub) +  @httpoison Application.get_env(:pleroma, :websub) +  @instance Application.get_env(:pleroma, :websub) +  # initial timeout, 5 min +  @initial_timeout 30_000 +  @max_retries 5 + +  def init(args) do +    {:ok, args} +  end + +  def start_link() do +    GenServer.start_link(__MODULE__, %{delivered: 0, dropped: 0}, name: __MODULE__) +  end + +  def enqueue(data, transport, retries \\ 0) do +    GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1}) +  end + +  def get_retry_params(retries) do +    if retries > @max_retries do +      {:drop, "Max retries reached"} +    else +      {:retry, growth_function(retries)} +    end +  end + +  def handle_cast({:maybe_enqueue, data, transport, retries}, %{dropped: drop_count} = state) do +    case get_retry_params(retries) do +      {:retry, timeout} -> +        Process.send_after( +          __MODULE__, +          {:send, data, transport, retries}, +          growth_function(retries) +        ) + +        {:noreply, state} + +      {:drop, message} -> +        Logger.debug(message) +        {:noreply, %{state | dropped: drop_count + 1}} +    end +  end + +  def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do +    case transport.publish_one(data) do +      {:ok, _} -> +        {:noreply, %{state | delivered: delivery_count + 1}} + +      {:error, reason} -> +        enqueue(data, transport, retries) +        {:noreply, state} +    end +  end + +  def handle_info(unknown, state) do +    Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring") +    {:noreply, state} +  end + +  defp growth_function(retries) do +    round(@initial_timeout * :math.pow(retries, 3)) +  end +end diff --git a/lib/pleroma/web/media_proxy/media_proxy.ex b/lib/pleroma/web/media_proxy/media_proxy.ex index 93c36b4ed..0fc0a07b2 100644 --- a/lib/pleroma/web/media_proxy/media_proxy.ex +++ b/lib/pleroma/web/media_proxy/media_proxy.ex @@ -3,6 +3,8 @@ defmodule Pleroma.Web.MediaProxy do    def url(nil), do: nil +  def url(""), do: nil +    def url(url = "/" <> _), do: url    def url(url) do @@ -15,10 +17,10 @@ defmodule Pleroma.Web.MediaProxy do        base64 = Base.url_encode64(url, @base64_opts)        sig = :crypto.hmac(:sha, secret, base64)        sig64 = sig |> Base.url_encode64(@base64_opts) -      filename = Path.basename(URI.parse(url).path) +      filename = if path = URI.parse(url).path, do: "/" <> Path.basename(path), else: ""        Keyword.get(config, :base_url, Pleroma.Web.base_url()) <> -        "/proxy/#{sig64}/#{base64}/#{filename}" +        "/proxy/#{sig64}/#{base64}#{filename}"      end    end diff --git a/lib/pleroma/web/websub/websub.ex b/lib/pleroma/web/websub/websub.ex index e494811f9..396dcf045 100644 --- a/lib/pleroma/web/websub/websub.ex +++ b/lib/pleroma/web/websub/websub.ex @@ -252,4 +252,29 @@ defmodule Pleroma.Web.Websub do        Pleroma.Web.Federator.enqueue(:request_subscription, sub)      end)    end + +  def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret}) do +    signature = sign(secret || "", xml) +    Logger.info(fn -> "Pushing #{topic} to #{callback}" end) + +    with {:ok, %{status_code: code}} <- +           @httpoison.post( +             callback, +             xml, +             [ +               {"Content-Type", "application/atom+xml"}, +               {"X-Hub-Signature", "sha1=#{signature}"} +             ], +             timeout: 10000, +             recv_timeout: 20000, +             hackney: [pool: :default] +           ) do +      Logger.info(fn -> "Pushed to #{callback}, code #{code}" end) +      {:ok, code} +    else +      e -> +        Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(e)}" end) +        {:error, e} +    end +  end  end | 
