diff options
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/pleroma/application.ex | 8 | ||||
| -rw-r--r-- | lib/pleroma/object.ex | 8 | ||||
| -rw-r--r-- | lib/pleroma/web/channels/user_socket.ex | 7 | ||||
| -rw-r--r-- | lib/pleroma/web/endpoint.ex | 6 | ||||
| -rw-r--r-- | lib/pleroma/web/federator/federator.ex | 55 | ||||
| -rw-r--r-- | lib/pleroma/web/federator/retry_queue.ex | 71 | ||||
| -rw-r--r-- | lib/pleroma/web/media_proxy/media_proxy.ex | 6 | ||||
| -rw-r--r-- | lib/pleroma/web/ostatus/ostatus_controller.ex | 2 | ||||
| -rw-r--r-- | lib/pleroma/web/router.ex | 8 | ||||
| -rw-r--r-- | lib/pleroma/web/websub/websub.ex | 25 | 
10 files changed, 147 insertions, 49 deletions
| diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 50adf68df..2d86efae5 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -9,6 +9,7 @@ defmodule Pleroma.Application do    # See http://elixir-lang.org/docs/stable/elixir/Application.html    # for more information on OTP Applications +  @env Mix.env()    def start(_type, _args) do      import Supervisor.Spec      import Cachex.Spec @@ -63,10 +64,11 @@ defmodule Pleroma.Application do            id: :cachex_idem          ),          worker(Pleroma.Web.Federator, []), -        worker(Pleroma.Stats, []), -        worker(Pleroma.Gopher.Server, []) +        worker(Pleroma.Web.Federator.RetryQueue, []), +        worker(Pleroma.Gopher.Server, []), +        worker(Pleroma.Stats, [])        ] ++ -        if Mix.env() == :test, +        if @env == :test,            do: [],            else:              [worker(Pleroma.Web.Streamer, [])] ++ diff --git a/lib/pleroma/object.ex b/lib/pleroma/object.ex index 067ecfaf4..03a75dfbd 100644 --- a/lib/pleroma/object.ex +++ b/lib/pleroma/object.ex @@ -31,10 +31,12 @@ defmodule Pleroma.Object do    def normalize(ap_id) when is_binary(ap_id), do: Object.get_by_ap_id(ap_id)    def normalize(_), do: nil -  def get_cached_by_ap_id(ap_id) do -    if Mix.env() == :test do +  if Mix.env() == :test do +    def get_cached_by_ap_id(ap_id) do        get_by_ap_id(ap_id) -    else +    end +  else +    def get_cached_by_ap_id(ap_id) do        key = "object:#{ap_id}"        Cachex.fetch!(:object_cache, key, fn _ -> diff --git a/lib/pleroma/web/channels/user_socket.ex b/lib/pleroma/web/channels/user_socket.ex index 21b22b409..07ddee169 100644 --- a/lib/pleroma/web/channels/user_socket.ex +++ b/lib/pleroma/web/channels/user_socket.ex @@ -4,9 +4,7 @@ defmodule Pleroma.Web.UserSocket do    ## Channels    # channel "room:*", Pleroma.Web.RoomChannel -  if Application.get_env(:pleroma, :chat) |> Keyword.get(:enabled) do -    channel("chat:*", Pleroma.Web.ChatChannel) -  end +  channel("chat:*", Pleroma.Web.ChatChannel)    ## Transports    transport(:websocket, Phoenix.Transports.WebSocket) @@ -24,7 +22,8 @@ defmodule Pleroma.Web.UserSocket do    # See `Phoenix.Token` documentation for examples in    # performing token verification on connect.    def connect(%{"token" => token}, socket) do -    with {:ok, user_id} <- Phoenix.Token.verify(socket, "user socket", token, max_age: 84600), +    with true <- Pleroma.Config.get([:chat, :enabled]), +         {:ok, user_id} <- Phoenix.Token.verify(socket, "user socket", token, max_age: 84600),           %User{} = user <- Pleroma.Repo.get(User, user_id) do        {:ok, assign(socket, :user_name, user.nickname)}      else diff --git a/lib/pleroma/web/endpoint.ex b/lib/pleroma/web/endpoint.ex index 85bb4ff5f..8728c908b 100644 --- a/lib/pleroma/web/endpoint.ex +++ b/lib/pleroma/web/endpoint.ex @@ -1,9 +1,7 @@  defmodule Pleroma.Web.Endpoint do    use Phoenix.Endpoint, otp_app: :pleroma -  if Application.get_env(:pleroma, :chat) |> Keyword.get(:enabled) do -    socket("/socket", Pleroma.Web.UserSocket) -  end +  socket("/socket", Pleroma.Web.UserSocket)    socket("/api/v1", Pleroma.Web.MastodonAPI.MastodonSocket) @@ -58,7 +56,7 @@ defmodule Pleroma.Web.Endpoint do      Plug.Session,      store: :cookie,      key: cookie_name, -    signing_salt: "CqaoopA2", +    signing_salt: {Pleroma.Config, :get, [[__MODULE__, :signing_salt], "CqaoopA2"]},      http_only: true,      secure:        Application.get_env(:pleroma, Pleroma.Web.Endpoint) |> Keyword.get(:secure_cookie_flag), diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index 6554fd2ef..ac3d7c132 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -3,6 +3,7 @@ defmodule Pleroma.Web.Federator do    alias Pleroma.User    alias Pleroma.Activity    alias Pleroma.Web.{WebFinger, Websub} +  alias Pleroma.Web.Federator.RetryQueue    alias Pleroma.Web.ActivityPub.ActivityPub    alias Pleroma.Web.ActivityPub.Relay    alias Pleroma.Web.ActivityPub.Transmogrifier @@ -122,29 +123,25 @@ defmodule Pleroma.Web.Federator do    end    def handle(:publish_single_ap, params) do -    ActivityPub.publish_one(params) -  end - -  def handle(:publish_single_websub, %{xml: xml, topic: topic, callback: callback, secret: secret}) do -    signature = @websub.sign(secret || "", xml) -    Logger.debug(fn -> "Pushing #{topic} to #{callback}" end) - -    with {:ok, %{status_code: code}} <- -           @httpoison.post( -             callback, -             xml, -             [ -               {"Content-Type", "application/atom+xml"}, -               {"X-Hub-Signature", "sha1=#{signature}"} -             ], -             timeout: 10000, -             recv_timeout: 20000, -             hackney: [pool: :default] -           ) do -      Logger.debug(fn -> "Pushed to #{callback}, code #{code}" end) -    else -      e -> -        Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(e)}" end) +    case ActivityPub.publish_one(params) do +      {:ok, _} -> +        :ok + +      {:error, _} -> +        RetryQueue.enqueue(params, ActivityPub) +    end +  end + +  def handle( +        :publish_single_websub, +        %{xml: xml, topic: topic, callback: callback, secret: secret} = params +      ) do +    case Websub.publish_one(params) do +      {:ok, _} -> +        :ok + +      {:error, _} -> +        RetryQueue.enqueue(params, Websub)      end    end @@ -153,11 +150,15 @@ defmodule Pleroma.Web.Federator do      {:error, "Don't know what to do with this"}    end -  def enqueue(type, payload, priority \\ 1) do -    if Pleroma.Config.get([:instance, :federating]) do -      if Mix.env() == :test do +  if Mix.env() == :test do +    def enqueue(type, payload, priority \\ 1) do +      if Pleroma.Config.get([:instance, :federating]) do          handle(type, payload) -      else +      end +    end +  else +    def enqueue(type, payload, priority \\ 1) do +      if Pleroma.Config.get([:instance, :federating]) do          GenServer.cast(__MODULE__, {:enqueue, type, payload, priority})        end      end diff --git a/lib/pleroma/web/federator/retry_queue.ex b/lib/pleroma/web/federator/retry_queue.ex new file mode 100644 index 000000000..06c094f26 --- /dev/null +++ b/lib/pleroma/web/federator/retry_queue.ex @@ -0,0 +1,71 @@ +defmodule Pleroma.Web.Federator.RetryQueue do +  use GenServer +  alias Pleroma.Web.{WebFinger, Websub} +  alias Pleroma.Web.ActivityPub.ActivityPub +  require Logger + +  @websub Application.get_env(:pleroma, :websub) +  @ostatus Application.get_env(:pleroma, :websub) +  @httpoison Application.get_env(:pleroma, :websub) +  @instance Application.get_env(:pleroma, :websub) +  # initial timeout, 5 min +  @initial_timeout 30_000 +  @max_retries 5 + +  def init(args) do +    {:ok, args} +  end + +  def start_link() do +    GenServer.start_link(__MODULE__, %{delivered: 0, dropped: 0}, name: __MODULE__) +  end + +  def enqueue(data, transport, retries \\ 0) do +    GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1}) +  end + +  def get_retry_params(retries) do +    if retries > @max_retries do +      {:drop, "Max retries reached"} +    else +      {:retry, growth_function(retries)} +    end +  end + +  def handle_cast({:maybe_enqueue, data, transport, retries}, %{dropped: drop_count} = state) do +    case get_retry_params(retries) do +      {:retry, timeout} -> +        Process.send_after( +          __MODULE__, +          {:send, data, transport, retries}, +          growth_function(retries) +        ) + +        {:noreply, state} + +      {:drop, message} -> +        Logger.debug(message) +        {:noreply, %{state | dropped: drop_count + 1}} +    end +  end + +  def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do +    case transport.publish_one(data) do +      {:ok, _} -> +        {:noreply, %{state | delivered: delivery_count + 1}} + +      {:error, reason} -> +        enqueue(data, transport, retries) +        {:noreply, state} +    end +  end + +  def handle_info(unknown, state) do +    Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring") +    {:noreply, state} +  end + +  defp growth_function(retries) do +    round(@initial_timeout * :math.pow(retries, 3)) +  end +end diff --git a/lib/pleroma/web/media_proxy/media_proxy.ex b/lib/pleroma/web/media_proxy/media_proxy.ex index 93c36b4ed..0fc0a07b2 100644 --- a/lib/pleroma/web/media_proxy/media_proxy.ex +++ b/lib/pleroma/web/media_proxy/media_proxy.ex @@ -3,6 +3,8 @@ defmodule Pleroma.Web.MediaProxy do    def url(nil), do: nil +  def url(""), do: nil +    def url(url = "/" <> _), do: url    def url(url) do @@ -15,10 +17,10 @@ defmodule Pleroma.Web.MediaProxy do        base64 = Base.url_encode64(url, @base64_opts)        sig = :crypto.hmac(:sha, secret, base64)        sig64 = sig |> Base.url_encode64(@base64_opts) -      filename = Path.basename(URI.parse(url).path) +      filename = if path = URI.parse(url).path, do: "/" <> Path.basename(path), else: ""        Keyword.get(config, :base_url, Pleroma.Web.base_url()) <> -        "/proxy/#{sig64}/#{base64}/#{filename}" +        "/proxy/#{sig64}/#{base64}#{filename}"      end    end diff --git a/lib/pleroma/web/ostatus/ostatus_controller.ex b/lib/pleroma/web/ostatus/ostatus_controller.ex index 34fdf9727..af6e22c2b 100644 --- a/lib/pleroma/web/ostatus/ostatus_controller.ex +++ b/lib/pleroma/web/ostatus/ostatus_controller.ex @@ -136,7 +136,7 @@ defmodule Pleroma.Web.OStatus.OStatusController do          "html" ->            conn            |> put_resp_content_type("text/html") -          |> send_file(200, "priv/static/index.html") +          |> send_file(200, Application.app_dir(:pleroma, "priv/static/index.html"))          _ ->            represent_activity(conn, format, activity, user) diff --git a/lib/pleroma/web/router.ex b/lib/pleroma/web/router.ex index 462369806..07f69fcd8 100644 --- a/lib/pleroma/web/router.ex +++ b/lib/pleroma/web/router.ex @@ -404,11 +404,9 @@ defmodule Fallback.RedirectController do    use Pleroma.Web, :controller    def redirector(conn, _params) do -    if Mix.env() != :test do -      conn -      |> put_resp_content_type("text/html") -      |> send_file(200, "priv/static/index.html") -    end +    conn +    |> put_resp_content_type("text/html") +    |> send_file(200, Application.app_dir(:pleroma, "priv/static/index.html"))    end    def registration_page(conn, params) do diff --git a/lib/pleroma/web/websub/websub.ex b/lib/pleroma/web/websub/websub.ex index e494811f9..396dcf045 100644 --- a/lib/pleroma/web/websub/websub.ex +++ b/lib/pleroma/web/websub/websub.ex @@ -252,4 +252,29 @@ defmodule Pleroma.Web.Websub do        Pleroma.Web.Federator.enqueue(:request_subscription, sub)      end)    end + +  def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret}) do +    signature = sign(secret || "", xml) +    Logger.info(fn -> "Pushing #{topic} to #{callback}" end) + +    with {:ok, %{status_code: code}} <- +           @httpoison.post( +             callback, +             xml, +             [ +               {"Content-Type", "application/atom+xml"}, +               {"X-Hub-Signature", "sha1=#{signature}"} +             ], +             timeout: 10000, +             recv_timeout: 20000, +             hackney: [pool: :default] +           ) do +      Logger.info(fn -> "Pushed to #{callback}, code #{code}" end) +      {:ok, code} +    else +      e -> +        Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(e)}" end) +        {:error, e} +    end +  end  end | 
