diff options
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/pleroma/web/mastodon_api/websocket_handler.ex | 69 | ||||
| -rw-r--r-- | lib/pleroma/web/streamer/streamer.ex | 73 | 
2 files changed, 74 insertions, 68 deletions
| diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex index 393d093e5..94e4595d8 100644 --- a/lib/pleroma/web/mastodon_api/websocket_handler.ex +++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex @@ -19,26 +19,12 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do    # Hibernate every X messages    @hibernate_every 100 -  @streams [ -    "public", -    "public:local", -    "public:media", -    "public:local:media", -    "user", -    "user:notification", -    "direct", -    "list", -    "hashtag" -  ] -  @anonymous_streams ["public", "public:local", "hashtag"] -    def init(%{qs: qs} = req, state) do -    with params <- :cow_qs.parse_qs(qs), +    with params <- Enum.into(:cow_qs.parse_qs(qs), %{}),           sec_websocket <- :cowboy_req.header("sec-websocket-protocol", req, nil), -         access_token <- List.keyfind(params, "access_token", 0), -         {_, stream} <- List.keyfind(params, "stream", 0), -         {:ok, user} <- allow_request(stream, [access_token, sec_websocket]), -         topic when is_binary(topic) <- expand_topic(stream, params) do +         access_token <- Map.get(params, "access_token"), +         {:ok, user} <- authenticate_request(access_token, sec_websocket), +         {:ok, topic} <- Streamer.get_topic(Map.get(params, "stream"), user, params) do        req =          if sec_websocket do            :cowboy_req.set_resp_header("sec-websocket-protocol", sec_websocket, req) @@ -49,14 +35,14 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do        {:cowboy_websocket, req, %{user: user, topic: topic, count: 0, timer: nil},         %{idle_timeout: @timeout}}      else -      {:error, code} -> -        Logger.debug("#{__MODULE__} denied connection: #{inspect(code)} - #{inspect(req)}") -        {:ok, req} = :cowboy_req.reply(code, req) +      {:error, :bad_topic} -> +        Logger.debug("#{__MODULE__} bad topic #{inspect(req)}") +        {:ok, req} = :cowboy_req.reply(404, req)          {:ok, req, state} -      error -> -        Logger.debug("#{__MODULE__} denied connection: #{inspect(error)} - #{inspect(req)}") -        {:ok, req} = :cowboy_req.reply(400, req) +      {:error, :unauthorized} -> +        Logger.debug("#{__MODULE__} authentication error: #{inspect(req)}") +        {:ok, req} = :cowboy_req.reply(401, req)          {:ok, req, state}      end    end @@ -124,50 +110,23 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do    end    # Public streams without authentication. -  defp allow_request(stream, [nil, nil]) when stream in @anonymous_streams do +  defp authenticate_request(nil, nil) do      {:ok, nil}    end    # Authenticated streams. -  defp allow_request(stream, [access_token, sec_websocket]) when stream in @streams do -    token = -      with {"access_token", token} <- access_token do -        token -      else -        _ -> sec_websocket -      end +  defp authenticate_request(access_token, sec_websocket) do +    token = access_token || sec_websocket      with true <- is_bitstring(token),           %Token{user_id: user_id} <- Repo.get_by(Token, token: token),           user = %User{} <- User.get_cached_by_id(user_id) do        {:ok, user}      else -      _ -> {:error, 403} -    end -  end - -  # Not authenticated. -  defp allow_request(stream, _) when stream in @streams, do: {:error, 403} - -  # No matching stream. -  defp allow_request(_, _), do: {:error, 404} - -  defp expand_topic("hashtag", params) do -    case List.keyfind(params, "tag", 0) do -      {_, tag} -> "hashtag:#{tag}" -      _ -> nil +      _ -> {:error, :unauthorized}      end    end -  defp expand_topic("list", params) do -    case List.keyfind(params, "list", 0) do -      {_, list} -> "list:#{list}" -      _ -> nil -    end -  end - -  defp expand_topic(topic, _), do: topic -    defp timer do      Process.send_after(self(), :tick, @tick)    end diff --git a/lib/pleroma/web/streamer/streamer.ex b/lib/pleroma/web/streamer/streamer.ex index 5ad4aa936..49a400df7 100644 --- a/lib/pleroma/web/streamer/streamer.ex +++ b/lib/pleroma/web/streamer/streamer.ex @@ -21,12 +21,68 @@ defmodule Pleroma.Web.Streamer do    def registry, do: @registry -  def add_socket(topic, %User{} = user) do -    if should_env_send?(), do: Registry.register(@registry, user_topic(topic, user), true) +  @public_streams ["public", "public:local", "public:media", "public:local:media"] +  @user_streams ["user", "user:notification", "direct"] + +  @doc "Expands and authorizes a stream, and registers the process for streaming." +  @spec get_topic_and_add_socket(stream :: String.t(), User.t() | nil, Map.t() | nil) :: +          {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized} +  def get_topic_and_add_socket(stream, user, params \\ %{}) do +    case get_topic(stream, user, params) do +      {:ok, topic} -> add_socket(topic, user) +      error -> error +    end +  end + +  @doc "Expand and authorizes a stream" +  @spec get_topic(stream :: String.t(), User.t() | nil, Map.t()) :: +          {:ok, topic :: String.t()} | {:error, :bad_topic} +  def get_topic(stream, user, params \\ %{}) + +  # Allow all public steams. +  def get_topic(stream, _, _) when stream in @public_streams do +    {:ok, stream}    end -  def add_socket(topic, _) do -    if should_env_send?(), do: Registry.register(@registry, topic, false) +  # Allow all hashtags streams. +  def get_topic("hashtag", _, %{"tag" => tag}) do +    {:ok, "hashtag:" <> tag} +  end + +  # Expand user streams. +  def get_topic(stream, %User{} = user, _) when stream in @user_streams do +    {:ok, stream <> ":" <> to_string(user.id)} +  end + +  def get_topic(stream, _, _) when stream in @user_streams do +    {:error, :unauthorized} +  end + +  # List streams. +  def get_topic("list", %User{} = user, %{"list" => id}) do +    if Pleroma.List.get(id, user) do +      {:ok, "list:" <> to_string(id)} +    else +      {:error, :bad_topic} +    end +  end + +  def get_topic("list", _, _) do +    {:error, :unauthorized} +  end + +  def get_topic(_, _, _) do +    {:error, :bad_topic} +  end + +  @doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic." +  def add_socket(topic, user) do +    if should_env_send?() do +      auth? = if user, do: true +      Registry.register(@registry, topic, auth?) +    end + +    {:ok, topic}    end    def remove_socket(topic) do @@ -231,13 +287,4 @@ defmodule Pleroma.Web.Streamer do      true ->        def should_env_send?, do: true    end - -  defp user_topic(topic, user) -       when topic in ~w[user user:notification direct] do -    "#{topic}:#{user.id}" -  end - -  defp user_topic(topic, _) do -    topic -  end  end | 
