diff options
| -rw-r--r-- | lib/pleroma/application.ex | 5 | ||||
| -rw-r--r-- | lib/pleroma/web/federator/federator.ex | 43 | ||||
| -rw-r--r-- | lib/pleroma/web/federator/retry_queue.ex | 76 | ||||
| -rw-r--r-- | lib/pleroma/web/websub/websub.ex | 25 | 
4 files changed, 124 insertions, 25 deletions
| diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index eedad7675..707200737 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -57,8 +57,9 @@ defmodule Pleroma.Application do            id: :cachex_idem          ),          worker(Pleroma.Web.Federator, []), -        worker(Pleroma.Stats, []), -        worker(Pleroma.Gopher.Server, []) +        worker(Pleroma.Web.Federator.RetryQueue, []), +        worker(Pleroma.Gopher.Server, []), +        worker(Pleroma.Stats, [])        ] ++          if Mix.env() == :test,            do: [], diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index 6554fd2ef..eefc9b483 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -3,6 +3,7 @@ defmodule Pleroma.Web.Federator do    alias Pleroma.User    alias Pleroma.Activity    alias Pleroma.Web.{WebFinger, Websub} +  alias Pleroma.Web.Federator.RetryQueue    alias Pleroma.Web.ActivityPub.ActivityPub    alias Pleroma.Web.ActivityPub.Relay    alias Pleroma.Web.ActivityPub.Transmogrifier @@ -122,29 +123,25 @@ defmodule Pleroma.Web.Federator do    end    def handle(:publish_single_ap, params) do -    ActivityPub.publish_one(params) -  end - -  def handle(:publish_single_websub, %{xml: xml, topic: topic, callback: callback, secret: secret}) do -    signature = @websub.sign(secret || "", xml) -    Logger.debug(fn -> "Pushing #{topic} to #{callback}" end) - -    with {:ok, %{status_code: code}} <- -           @httpoison.post( -             callback, -             xml, -             [ -               {"Content-Type", "application/atom+xml"}, -               {"X-Hub-Signature", "sha1=#{signature}"} -             ], -             timeout: 10000, -             recv_timeout: 20000, -             hackney: [pool: :default] -           ) do -      Logger.debug(fn -> "Pushed to #{callback}, code #{code}" end) -    else -      e -> -        Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(e)}" end) +    case ActivityPub.publish_one(params) do +      {:ok, _} -> +        :ok + +      {:error, _} -> +        RetryQueue.enqueue(params, :activitypub) +    end +  end + +  def handle( +        :publish_single_websub, +        %{xml: xml, topic: topic, callback: callback, secret: secret} = params +      ) do +    case Websub.publish_one(params) do +      {:ok, _} -> +        :ok + +      {:error, _} -> +        RetryQueue.enqueue(params, :websub)      end    end diff --git a/lib/pleroma/web/federator/retry_queue.ex b/lib/pleroma/web/federator/retry_queue.ex new file mode 100644 index 000000000..1d38cd5a3 --- /dev/null +++ b/lib/pleroma/web/federator/retry_queue.ex @@ -0,0 +1,76 @@ +defmodule Pleroma.Web.Federator.RetryQueue do +  use GenServer +  alias Pleroma.Web.{WebFinger, Websub} +  alias Pleroma.Web.ActivityPub.ActivityPub +  require Logger + +  @websub Application.get_env(:pleroma, :websub) +  @ostatus Application.get_env(:pleroma, :websub) +  @httpoison Application.get_env(:pleroma, :websub) +  @instance Application.get_env(:pleroma, :websub) +  # initial timeout, 5 min +  @initial_timeout 30_000 +  @max_retries 5 + +  def init(args) do +    {:ok, args} +  end + +  def start_link() do +    GenServer.start_link(__MODULE__, %{}, name: __MODULE__) +  end + +  def enqueue(data, transport, retries \\ 0) do +    GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1}) +  end + +  def handle_cast({:maybe_enqueue, data, transport, retries}, state) do +    if retries > @max_retries do +      Logger.debug("Maximum retries reached on #{inspect(data)}") +      {:noreply, state} +    else +      Process.send_after( +        __MODULE__, +        {:send, data, transport, retries}, +        growth_function(retries) +      ) + +      {:noreply, state} +    end +  end + +  def handle_info({:send, %{topic: topic} = data, :websub, retries}, state) do +    Logger.debug("RetryQueue: Retrying to send object #{topic}") + +    case Websub.publish_one(data) do +      {:ok, _} -> +        {:noreply, state} + +      {:error, reason} -> +        enqueue(data, :websub, retries) +        {:noreply, state} +    end +  end + +  def handle_info({:send, %{id: id} = data, :activitypub, retries}, state) do +    Logger.debug("RetryQueue: Retrying to send object #{id}") + +    case ActivityPub.publish_one(data) do +      {:ok, _} -> +        {:noreply, state} + +      {:error, reason} -> +        enqueue(data, :activitypub, retries) +        {:noreply, state} +    end +  end + +  def handle_info(unknown, state) do +    Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring") +    {:noreply, state} +  end + +  defp growth_function(retries) do +    round(@initial_timeout * :math.pow(retries, 3)) +  end +end diff --git a/lib/pleroma/web/websub/websub.ex b/lib/pleroma/web/websub/websub.ex index e494811f9..396dcf045 100644 --- a/lib/pleroma/web/websub/websub.ex +++ b/lib/pleroma/web/websub/websub.ex @@ -252,4 +252,29 @@ defmodule Pleroma.Web.Websub do        Pleroma.Web.Federator.enqueue(:request_subscription, sub)      end)    end + +  def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret}) do +    signature = sign(secret || "", xml) +    Logger.info(fn -> "Pushing #{topic} to #{callback}" end) + +    with {:ok, %{status_code: code}} <- +           @httpoison.post( +             callback, +             xml, +             [ +               {"Content-Type", "application/atom+xml"}, +               {"X-Hub-Signature", "sha1=#{signature}"} +             ], +             timeout: 10000, +             recv_timeout: 20000, +             hackney: [pool: :default] +           ) do +      Logger.info(fn -> "Pushed to #{callback}, code #{code}" end) +      {:ok, code} +    else +      e -> +        Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(e)}" end) +        {:error, e} +    end +  end  end | 
