summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/pleroma/application.ex5
-rw-r--r--lib/pleroma/web/federator/federator.ex43
-rw-r--r--lib/pleroma/web/federator/retry_queue.ex71
-rw-r--r--lib/pleroma/web/media_proxy/media_proxy.ex6
-rw-r--r--lib/pleroma/web/websub/websub.ex25
-rw-r--r--test/media_proxy_test.exs113
-rw-r--r--test/web/retry_queue_test.exs31
7 files changed, 267 insertions, 27 deletions
diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex
index 4c0e3ddb0..30f0b14c8 100644
--- a/lib/pleroma/application.ex
+++ b/lib/pleroma/application.ex
@@ -58,8 +58,9 @@ defmodule Pleroma.Application do
id: :cachex_idem
),
worker(Pleroma.Web.Federator, []),
- worker(Pleroma.Stats, []),
- worker(Pleroma.Gopher.Server, [])
+ worker(Pleroma.Web.Federator.RetryQueue, []),
+ worker(Pleroma.Gopher.Server, []),
+ worker(Pleroma.Stats, [])
] ++
if @env == :test,
do: [],
diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex
index f047708d3..ac3d7c132 100644
--- a/lib/pleroma/web/federator/federator.ex
+++ b/lib/pleroma/web/federator/federator.ex
@@ -3,6 +3,7 @@ defmodule Pleroma.Web.Federator do
alias Pleroma.User
alias Pleroma.Activity
alias Pleroma.Web.{WebFinger, Websub}
+ alias Pleroma.Web.Federator.RetryQueue
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Relay
alias Pleroma.Web.ActivityPub.Transmogrifier
@@ -122,29 +123,25 @@ defmodule Pleroma.Web.Federator do
end
def handle(:publish_single_ap, params) do
- ActivityPub.publish_one(params)
- end
-
- def handle(:publish_single_websub, %{xml: xml, topic: topic, callback: callback, secret: secret}) do
- signature = @websub.sign(secret || "", xml)
- Logger.debug(fn -> "Pushing #{topic} to #{callback}" end)
-
- with {:ok, %{status_code: code}} <-
- @httpoison.post(
- callback,
- xml,
- [
- {"Content-Type", "application/atom+xml"},
- {"X-Hub-Signature", "sha1=#{signature}"}
- ],
- timeout: 10000,
- recv_timeout: 20000,
- hackney: [pool: :default]
- ) do
- Logger.debug(fn -> "Pushed to #{callback}, code #{code}" end)
- else
- e ->
- Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(e)}" end)
+ case ActivityPub.publish_one(params) do
+ {:ok, _} ->
+ :ok
+
+ {:error, _} ->
+ RetryQueue.enqueue(params, ActivityPub)
+ end
+ end
+
+ def handle(
+ :publish_single_websub,
+ %{xml: xml, topic: topic, callback: callback, secret: secret} = params
+ ) do
+ case Websub.publish_one(params) do
+ {:ok, _} ->
+ :ok
+
+ {:error, _} ->
+ RetryQueue.enqueue(params, Websub)
end
end
diff --git a/lib/pleroma/web/federator/retry_queue.ex b/lib/pleroma/web/federator/retry_queue.ex
new file mode 100644
index 000000000..06c094f26
--- /dev/null
+++ b/lib/pleroma/web/federator/retry_queue.ex
@@ -0,0 +1,71 @@
+defmodule Pleroma.Web.Federator.RetryQueue do
+ use GenServer
+ alias Pleroma.Web.{WebFinger, Websub}
+ alias Pleroma.Web.ActivityPub.ActivityPub
+ require Logger
+
+ @websub Application.get_env(:pleroma, :websub)
+ @ostatus Application.get_env(:pleroma, :websub)
+ @httpoison Application.get_env(:pleroma, :websub)
+ @instance Application.get_env(:pleroma, :websub)
+ # initial timeout, 5 min
+ @initial_timeout 30_000
+ @max_retries 5
+
+ def init(args) do
+ {:ok, args}
+ end
+
+ def start_link() do
+ GenServer.start_link(__MODULE__, %{delivered: 0, dropped: 0}, name: __MODULE__)
+ end
+
+ def enqueue(data, transport, retries \\ 0) do
+ GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
+ end
+
+ def get_retry_params(retries) do
+ if retries > @max_retries do
+ {:drop, "Max retries reached"}
+ else
+ {:retry, growth_function(retries)}
+ end
+ end
+
+ def handle_cast({:maybe_enqueue, data, transport, retries}, %{dropped: drop_count} = state) do
+ case get_retry_params(retries) do
+ {:retry, timeout} ->
+ Process.send_after(
+ __MODULE__,
+ {:send, data, transport, retries},
+ growth_function(retries)
+ )
+
+ {:noreply, state}
+
+ {:drop, message} ->
+ Logger.debug(message)
+ {:noreply, %{state | dropped: drop_count + 1}}
+ end
+ end
+
+ def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do
+ case transport.publish_one(data) do
+ {:ok, _} ->
+ {:noreply, %{state | delivered: delivery_count + 1}}
+
+ {:error, reason} ->
+ enqueue(data, transport, retries)
+ {:noreply, state}
+ end
+ end
+
+ def handle_info(unknown, state) do
+ Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring")
+ {:noreply, state}
+ end
+
+ defp growth_function(retries) do
+ round(@initial_timeout * :math.pow(retries, 3))
+ end
+end
diff --git a/lib/pleroma/web/media_proxy/media_proxy.ex b/lib/pleroma/web/media_proxy/media_proxy.ex
index 93c36b4ed..0fc0a07b2 100644
--- a/lib/pleroma/web/media_proxy/media_proxy.ex
+++ b/lib/pleroma/web/media_proxy/media_proxy.ex
@@ -3,6 +3,8 @@ defmodule Pleroma.Web.MediaProxy do
def url(nil), do: nil
+ def url(""), do: nil
+
def url(url = "/" <> _), do: url
def url(url) do
@@ -15,10 +17,10 @@ defmodule Pleroma.Web.MediaProxy do
base64 = Base.url_encode64(url, @base64_opts)
sig = :crypto.hmac(:sha, secret, base64)
sig64 = sig |> Base.url_encode64(@base64_opts)
- filename = Path.basename(URI.parse(url).path)
+ filename = if path = URI.parse(url).path, do: "/" <> Path.basename(path), else: ""
Keyword.get(config, :base_url, Pleroma.Web.base_url()) <>
- "/proxy/#{sig64}/#{base64}/#{filename}"
+ "/proxy/#{sig64}/#{base64}#{filename}"
end
end
diff --git a/lib/pleroma/web/websub/websub.ex b/lib/pleroma/web/websub/websub.ex
index e494811f9..396dcf045 100644
--- a/lib/pleroma/web/websub/websub.ex
+++ b/lib/pleroma/web/websub/websub.ex
@@ -252,4 +252,29 @@ defmodule Pleroma.Web.Websub do
Pleroma.Web.Federator.enqueue(:request_subscription, sub)
end)
end
+
+ def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret}) do
+ signature = sign(secret || "", xml)
+ Logger.info(fn -> "Pushing #{topic} to #{callback}" end)
+
+ with {:ok, %{status_code: code}} <-
+ @httpoison.post(
+ callback,
+ xml,
+ [
+ {"Content-Type", "application/atom+xml"},
+ {"X-Hub-Signature", "sha1=#{signature}"}
+ ],
+ timeout: 10000,
+ recv_timeout: 20000,
+ hackney: [pool: :default]
+ ) do
+ Logger.info(fn -> "Pushed to #{callback}, code #{code}" end)
+ {:ok, code}
+ else
+ e ->
+ Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(e)}" end)
+ {:error, e}
+ end
+ end
end
diff --git a/test/media_proxy_test.exs b/test/media_proxy_test.exs
new file mode 100644
index 000000000..c69ed7ea4
--- /dev/null
+++ b/test/media_proxy_test.exs
@@ -0,0 +1,113 @@
+defmodule Pleroma.MediaProxyTest do
+ use ExUnit.Case
+ import Pleroma.Web.MediaProxy
+
+ describe "when enabled" do
+ setup do
+ enabled = Pleroma.Config.get([:media_proxy, :enabled])
+
+ unless enabled do
+ Pleroma.Config.put([:media_proxy, :enabled], true)
+ on_exit(fn -> Pleroma.Config.put([:media_proxy, :enabled], enabled) end)
+ end
+
+ :ok
+ end
+
+ test "ignores invalid url" do
+ assert url(nil) == nil
+ assert url("") == nil
+ end
+
+ test "ignores relative url" do
+ assert url("/local") == "/local"
+ assert url("/") == "/"
+ end
+
+ test "ignores local url" do
+ local_url = Pleroma.Web.Endpoint.url() <> "/hello"
+ local_root = Pleroma.Web.Endpoint.url()
+ assert url(local_url) == local_url
+ assert url(local_root) == local_root
+ end
+
+ test "encodes and decodes URL" do
+ url = "https://pleroma.soykaf.com/static/logo.png"
+ encoded = url(url)
+
+ assert String.starts_with?(
+ encoded,
+ Pleroma.Config.get([:media_proxy, :base_url], Pleroma.Web.base_url())
+ )
+
+ assert String.ends_with?(encoded, "/logo.png")
+
+ assert decode_result(encoded) == url
+ end
+
+ test "encodes and decodes URL without a path" do
+ url = "https://pleroma.soykaf.com"
+ encoded = url(url)
+ assert decode_result(encoded) == url
+ end
+
+ test "encodes and decodes URL without an extension" do
+ url = "https://pleroma.soykaf.com/path/"
+ encoded = url(url)
+ assert String.ends_with?(encoded, "/path")
+ assert decode_result(encoded) == url
+ end
+
+ test "encodes and decodes URL and ignores query params for the path" do
+ url = "https://pleroma.soykaf.com/static/logo.png?93939393939&bunny=true"
+ encoded = url(url)
+ assert String.ends_with?(encoded, "/logo.png")
+ assert decode_result(encoded) == url
+ end
+
+ test "validates signature" do
+ secret_key_base = Pleroma.Config.get([Pleroma.Web.Endpoint, :secret_key_base])
+
+ on_exit(fn ->
+ Pleroma.Config.put([Pleroma.Web.Endpoint, :secret_key_base], secret_key_base)
+ end)
+
+ encoded = url("https://pleroma.social")
+
+ Pleroma.Config.put(
+ [Pleroma.Web.Endpoint, :secret_key_base],
+ "00000000000000000000000000000000000000000000000"
+ )
+
+ [_, "proxy", sig, base64 | _] = URI.parse(encoded).path |> String.split("/")
+ assert decode_url(sig, base64) == {:error, :invalid_signature}
+ end
+ end
+
+ describe "when disabled" do
+ setup do
+ enabled = Pleroma.Config.get([:media_proxy, :enabled])
+
+ if enabled do
+ Pleroma.Config.put([:media_proxy, :enabled], false)
+
+ on_exit(fn ->
+ Pleroma.Config.put([:media_proxy, :enabled], enabled)
+ :ok
+ end)
+ end
+
+ :ok
+ end
+
+ test "does not encode remote urls" do
+ assert url("https://google.fr") == "https://google.fr"
+ end
+ end
+
+ defp decode_result(encoded) do
+ [_, "proxy", sig, base64 | _] = URI.parse(encoded).path |> String.split("/")
+ {:ok, decoded} = decode_url(sig, base64)
+ decoded
+ end
+end
diff --git a/test/web/retry_queue_test.exs b/test/web/retry_queue_test.exs
new file mode 100644
index 000000000..ce2964993
--- /dev/null
+++ b/test/web/retry_queue_test.exs
@@ -0,0 +1,31 @@
+defmodule MockActivityPub do
+ def publish_one(ret) do
+ {ret, "success"}
+ end
+end
+
+defmodule Pleroma.ActivityTest do
+ use Pleroma.DataCase
+ alias Pleroma.Web.Federator.RetryQueue
+
+ @small_retry_count 0
+ @hopeless_retry_count 10
+
+ test "failed posts are retried" do
+ {:retry, _timeout} = RetryQueue.get_retry_params(@small_retry_count)
+
+ assert {:noreply, %{delivered: 1}} ==
+ RetryQueue.handle_info({:send, :ok, MockActivityPub, @small_retry_count}, %{
+ delivered: 0
+ })
+ end
+
+ test "posts that have been tried too many times are dropped" do
+ {:drop, _timeout} = RetryQueue.get_retry_params(@hopeless_retry_count)
+
+ assert {:noreply, %{dropped: 1}} ==
+ RetryQueue.handle_cast({:maybe_enqueue, %{}, nil, @hopeless_retry_count}, %{
+ dropped: 0
+ })
+ end
+end