diff options
| -rw-r--r-- | lib/pleroma/web/mastodon_api/websocket_handler.ex | 69 | ||||
| -rw-r--r-- | lib/pleroma/web/streamer/state.ex | 19 | ||||
| -rw-r--r-- | lib/pleroma/web/streamer/streamer.ex | 65 | ||||
| -rw-r--r-- | test/integration/mastodon_websocket_test.exs | 12 | ||||
| -rw-r--r-- | test/notification_test.exs | 11 | ||||
| -rw-r--r-- | test/web/streamer/streamer_test.exs | 104 | 
6 files changed, 180 insertions, 100 deletions
diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex index 5652a37c1..b1aebe014 100644 --- a/lib/pleroma/web/mastodon_api/websocket_handler.ex +++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex @@ -12,29 +12,15 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do    @behaviour :cowboy_websocket -  @streams [ -    "public", -    "public:local", -    "public:media", -    "public:local:media", -    "user", -    "user:notification", -    "direct", -    "list", -    "hashtag" -  ] -  @anonymous_streams ["public", "public:local", "hashtag"] -    # Handled by periodic keepalive in Pleroma.Web.Streamer.Ping.    @timeout :infinity    def init(%{qs: qs} = req, state) do -    with params <- :cow_qs.parse_qs(qs), +    with params <- Enum.into(:cow_qs.parse_qs(qs), %{}),           sec_websocket <- :cowboy_req.header("sec-websocket-protocol", req, nil), -         access_token <- List.keyfind(params, "access_token", 0), -         {_, stream} <- List.keyfind(params, "stream", 0), -         {:ok, user} <- allow_request(stream, [access_token, sec_websocket]), -         topic when is_binary(topic) <- expand_topic(stream, params) do +         access_token <- Map.get(params, "access_token"), +         {:ok, user} <- authenticate_request(access_token, sec_websocket), +         {:ok, topic} <- Streamer.get_topic(Map.get(params, "stream"), user, params) do        req =          if sec_websocket do            :cowboy_req.set_resp_header("sec-websocket-protocol", sec_websocket, req) @@ -44,14 +30,14 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do        {:cowboy_websocket, req, %{user: user, topic: topic}, %{idle_timeout: @timeout}}      else -      {:error, code} -> -        Logger.debug("#{__MODULE__} denied connection: #{inspect(code)} - #{inspect(req)}") -        {:ok, req} = :cowboy_req.reply(code, req) +      {:error, :bad_topic} -> +        Logger.debug("#{__MODULE__} bad topic #{inspect(req)}") +        {:ok, req} = :cowboy_req.reply(404, req)          {:ok, req, state} -      error -> -        Logger.debug("#{__MODULE__} denied connection: #{inspect(error)} - #{inspect(req)}") -        {:ok, req} = :cowboy_req.reply(400, req) +      {:error, :unauthorized} -> +        Logger.debug("#{__MODULE__} authentication error: #{inspect(req)}") +        {:ok, req} = :cowboy_req.reply(401, req)          {:ok, req, state}      end    end @@ -93,50 +79,23 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do    end    # Public streams without authentication. -  defp allow_request(stream, [nil, nil]) when stream in @anonymous_streams do +  defp authenticate_request(nil, nil) do      {:ok, nil}    end    # Authenticated streams. -  defp allow_request(stream, [access_token, sec_websocket]) when stream in @streams do -    token = -      with {"access_token", token} <- access_token do -        token -      else -        _ -> sec_websocket -      end +  defp authenticate_request(access_token, sec_websocket) do +    token = access_token || sec_websocket      with true <- is_bitstring(token),           %Token{user_id: user_id} <- Repo.get_by(Token, token: token),           user = %User{} <- User.get_cached_by_id(user_id) do        {:ok, user}      else -      _ -> {:error, 403} -    end -  end - -  # Not authenticated. -  defp allow_request(stream, _) when stream in @streams, do: {:error, 403} - -  # No matching stream. -  defp allow_request(_, _), do: {:error, 404} - -  defp expand_topic("hashtag", params) do -    case List.keyfind(params, "tag", 0) do -      {_, tag} -> "hashtag:#{tag}" -      _ -> nil +      _ -> {:error, :unauthorized}      end    end -  defp expand_topic("list", params) do -    case List.keyfind(params, "list", 0) do -      {_, list} -> "list:#{list}" -      _ -> nil -    end -  end - -  defp expand_topic(topic, _), do: topic -    defp streamer_socket(state) do      %{transport_pid: self(), assigns: state}    end diff --git a/lib/pleroma/web/streamer/state.ex b/lib/pleroma/web/streamer/state.ex index 999550b88..4eb462a1a 100644 --- a/lib/pleroma/web/streamer/state.ex +++ b/lib/pleroma/web/streamer/state.ex @@ -36,30 +36,28 @@ defmodule Pleroma.Web.Streamer.State do    end    def handle_call({:add, topic, socket}, _from, %{sockets: sockets} = state) do -    internal_topic = internal_topic(topic, socket)      stream_socket = StreamerSocket.from_socket(socket)      sockets_for_topic =        sockets -      |> Map.get(internal_topic, []) +      |> Map.get(topic, [])        |> List.insert_at(0, stream_socket)        |> Enum.uniq() -    state = put_in(state, [:sockets, internal_topic], sockets_for_topic) +    state = put_in(state, [:sockets, topic], sockets_for_topic)      Logger.debug("Got new conn for #{topic}")      {:reply, state, state}    end    def handle_call({:remove, topic, socket}, _from, %{sockets: sockets} = state) do -    internal_topic = internal_topic(topic, socket)      stream_socket = StreamerSocket.from_socket(socket)      sockets_for_topic =        sockets -      |> Map.get(internal_topic, []) +      |> Map.get(topic, [])        |> List.delete(stream_socket) -    state = Kernel.put_in(state, [:sockets, internal_topic], sockets_for_topic) +    state = Kernel.put_in(state, [:sockets, topic], sockets_for_topic)      {:reply, state, state}    end @@ -70,13 +68,4 @@ defmodule Pleroma.Web.Streamer.State do    defp do_remove_socket(_env, topic, socket) do      GenServer.call(__MODULE__, {:remove, topic, socket})    end - -  defp internal_topic(topic, socket) -       when topic in ~w[user user:notification direct] do -    "#{topic}:#{socket.assigns[:user].id}" -  end - -  defp internal_topic(topic, _) do -    topic -  end  end diff --git a/lib/pleroma/web/streamer/streamer.ex b/lib/pleroma/web/streamer/streamer.ex index 814d5a729..1e5700b6a 100644 --- a/lib/pleroma/web/streamer/streamer.ex +++ b/lib/pleroma/web/streamer/streamer.ex @@ -5,10 +5,75 @@  defmodule Pleroma.Web.Streamer do    alias Pleroma.Web.Streamer.State    alias Pleroma.Web.Streamer.Worker +  alias Pleroma.User    @timeout 60_000    @mix_env Mix.env() +  @public_streams ["public", "public:local", "public:media", "public:local:media"] +  @user_streams ["user", "user:notification", "direct"] + +  @doc "Expands and authorizes a stream, and registers the process for streaming." +  @spec get_topic_and_add_socket(stream :: String.t(), State.t(), Map.t() | nil) :: +          {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized} +  def get_topic_and_add_socket(stream, socket, params \\ %{}) do +    user = +      case socket do +        %{assigns: %{user: user}} -> user +        _ -> nil +      end + +    case get_topic(stream, user, params) do +      {:ok, topic} -> +        add_socket(topic, socket) +        {:ok, topic} + +      error -> +        error +    end +  end + +  @doc "Expand and authorizes a stream" +  @spec get_topic(stream :: String.t(), User.t() | nil, Map.t()) :: +          {:ok, topic :: String.t()} | {:error, :bad_topic} +  def get_topic(stream, user, params \\ %{}) + +  # Allow all public steams. +  def get_topic(stream, _, _) when stream in @public_streams do +    {:ok, stream} +  end + +  # Allow all hashtags streams. +  def get_topic("hashtag", _, %{"tag" => tag}) do +    {:ok, "hashtag:" <> tag} +  end + +  # Expand user streams. +  def get_topic(stream, %User{} = user, _) when stream in @user_streams do +    {:ok, stream <> ":" <> to_string(user.id)} +  end + +  def get_topic(stream, _, _) when stream in @user_streams do +    {:error, :unauthorized} +  end + +  # List streams. +  def get_topic("list", %User{} = user, %{"list" => id}) do +    if Pleroma.List.get(id, user) do +      {:ok, "list:" <> to_string(id)} +    else +      {:error, :bad_topic} +    end +  end + +  def get_topic("list", _, _) do +    {:error, :unauthorized} +  end + +  def get_topic(_, _, _) do +    {:error, :bad_topic} +  end +    def add_socket(topic, socket) do      State.add_socket(topic, socket)    end diff --git a/test/integration/mastodon_websocket_test.exs b/test/integration/mastodon_websocket_test.exs index bd229c55f..39be5ad08 100644 --- a/test/integration/mastodon_websocket_test.exs +++ b/test/integration/mastodon_websocket_test.exs @@ -35,7 +35,7 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do    test "refuses invalid requests" do      capture_log(fn -> -      assert {:error, {400, _}} = start_socket() +      assert {:error, {404, _}} = start_socket()        assert {:error, {404, _}} = start_socket("?stream=ncjdk")        Process.sleep(30)      end) @@ -43,8 +43,8 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do    test "requires authentication and a valid token for protected streams" do      capture_log(fn -> -      assert {:error, {403, _}} = start_socket("?stream=user&access_token=aaaaaaaaaaaa") -      assert {:error, {403, _}} = start_socket("?stream=user") +      assert {:error, {401, _}} = start_socket("?stream=user&access_token=aaaaaaaaaaaa") +      assert {:error, {401, _}} = start_socket("?stream=user")        Process.sleep(30)      end)    end @@ -103,7 +103,7 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do        assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}")        assert capture_log(fn -> -               assert {:error, {403, "Forbidden"}} = start_socket("?stream=user") +               assert {:error, {401, _}} = start_socket("?stream=user")                 Process.sleep(30)               end) =~ ":badarg"      end @@ -112,7 +112,7 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do        assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}")        assert capture_log(fn -> -               assert {:error, {403, "Forbidden"}} = start_socket("?stream=user:notification") +               assert {:error, {401, _}} = start_socket("?stream=user:notification")                 Process.sleep(30)               end) =~ ":badarg"      end @@ -121,7 +121,7 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do        assert {:ok, _} = start_socket("?stream=user", [{"Sec-WebSocket-Protocol", token.token}])        assert capture_log(fn -> -               assert {:error, {403, "Forbidden"}} = +               assert {:error, {401, _}} =                          start_socket("?stream=user", [{"Sec-WebSocket-Protocol", "I am a friend"}])                 Process.sleep(30) diff --git a/test/notification_test.exs b/test/notification_test.exs index e12418db3..d04754a9d 100644 --- a/test/notification_test.exs +++ b/test/notification_test.exs @@ -164,12 +164,13 @@ defmodule Pleroma.NotificationTest do        user = insert(:user)        task = Task.async(fn -> assert_receive {:text, _}, 4_000 end)        task_user_notification = Task.async(fn -> assert_receive {:text, _}, 4_000 end) -      Streamer.add_socket("user", %{transport_pid: task.pid, assigns: %{user: user}}) -      Streamer.add_socket( -        "user:notification", -        %{transport_pid: task_user_notification.pid, assigns: %{user: user}} -      ) +      Streamer.get_topic_and_add_socket("user", %{transport_pid: task.pid, assigns: %{user: user}}) + +      Streamer.get_topic_and_add_socket("user:notification", %{ +        transport_pid: task_user_notification.pid, +        assigns: %{user: user} +      })        activity = insert(:note_activity) diff --git a/test/web/streamer/streamer_test.exs b/test/web/streamer/streamer_test.exs index 720f8fa44..cbd5ec462 100644 --- a/test/web/streamer/streamer_test.exs +++ b/test/web/streamer/streamer_test.exs @@ -17,11 +17,81 @@ defmodule Pleroma.Web.StreamerTest do    @moduletag needs_streamer: true, capture_log: true -  @streamer_timeout 150 +  @streamer_timeout 300    @streamer_start_wait 10    clear_config([:instance, :skip_thread_containment]) +  describe "get_topic without an user" do +    test "allows public" do +      assert {:ok, "public"} = Streamer.get_topic("public", nil) +      assert {:ok, "public:local"} = Streamer.get_topic("public:local", nil) +      assert {:ok, "public:media"} = Streamer.get_topic("public:media", nil) +      assert {:ok, "public:local:media"} = Streamer.get_topic("public:local:media", nil) +    end + +    test "allows hashtag streams" do +      assert {:ok, "hashtag:cofe"} = Streamer.get_topic("hashtag", nil, %{"tag" => "cofe"}) +    end + +    test "disallows user streams" do +      assert {:error, _} = Streamer.get_topic("user", nil) +      assert {:error, _} = Streamer.get_topic("user:notification", nil) +      assert {:error, _} = Streamer.get_topic("direct", nil) +    end + +    test "disallows list streams" do +      assert {:error, _} = Streamer.get_topic("list", nil, %{"list" => 42}) +    end +  end + +  describe "get_topic with an user" do +    setup do +      user = insert(:user) +      {:ok, %{user: user}} +    end + +    test "allows public streams", %{user: user} do +      assert {:ok, "public"} = Streamer.get_topic("public", user) +      assert {:ok, "public:local"} = Streamer.get_topic("public:local", user) +      assert {:ok, "public:media"} = Streamer.get_topic("public:media", user) +      assert {:ok, "public:local:media"} = Streamer.get_topic("public:local:media", user) +    end + +    test "allows user streams", %{user: user} do +      expected_user_topic = "user:#{user.id}" +      expected_notif_topic = "user:notification:#{user.id}" +      expected_direct_topic = "direct:#{user.id}" +      assert {:ok, ^expected_user_topic} = Streamer.get_topic("user", user) +      assert {:ok, ^expected_notif_topic} = Streamer.get_topic("user:notification", user) +      assert {:ok, ^expected_direct_topic} = Streamer.get_topic("direct", user) +    end + +    test "allows hashtag streams", %{user: user} do +      assert {:ok, "hashtag:cofe"} = Streamer.get_topic("hashtag", user, %{"tag" => "cofe"}) +    end + +    test "disallows registering to an user stream", %{user: user} do +      another_user = insert(:user) +      assert {:error, _} = Streamer.get_topic("user:#{another_user.id}", user) +      assert {:error, _} = Streamer.get_topic("user:notification:#{another_user.id}", user) +      assert {:error, _} = Streamer.get_topic("direct:#{another_user.id}", user) +    end + +    test "allows list stream that are owned by the user", %{user: user} do +      {:ok, list} = List.create("Test", user) +      assert {:error, _} = Streamer.get_topic("list:#{list.id}", user) +      assert {:ok, _} = Streamer.get_topic("list", user, %{"list" => list.id}) +    end + +    test "disallows list stream that are not owned by the user", %{user: user} do +      another_user = insert(:user) +      {:ok, list} = List.create("Test", another_user) +      assert {:error, _} = Streamer.get_topic("list:#{list.id}", user) +      assert {:error, _} = Streamer.get_topic("list", user, %{"list" => list.id}) +    end +  end +    describe "user streams" do      setup do        user = insert(:user) @@ -35,7 +105,7 @@ defmodule Pleroma.Web.StreamerTest do            assert_receive {:text, _}, @streamer_timeout          end) -      Streamer.add_socket( +      Streamer.get_topic_and_add_socket(          "user",          %{transport_pid: task.pid, assigns: %{user: user}}        ) @@ -50,7 +120,7 @@ defmodule Pleroma.Web.StreamerTest do            assert_receive {:text, _}, @streamer_timeout          end) -      Streamer.add_socket( +      Streamer.get_topic_and_add_socket(          "user:notification",          %{transport_pid: task.pid, assigns: %{user: user}}        ) @@ -70,7 +140,7 @@ defmodule Pleroma.Web.StreamerTest do        task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end) -      Streamer.add_socket( +      Streamer.get_topic_and_add_socket(          "user:notification",          %{transport_pid: task.pid, assigns: %{user: user}}        ) @@ -90,7 +160,7 @@ defmodule Pleroma.Web.StreamerTest do        task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end) -      Streamer.add_socket( +      Streamer.get_topic_and_add_socket(          "user:notification",          %{transport_pid: task.pid, assigns: %{user: user}}        ) @@ -110,7 +180,7 @@ defmodule Pleroma.Web.StreamerTest do        task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end) -      Streamer.add_socket( +      Streamer.get_topic_and_add_socket(          "user:notification",          %{transport_pid: task.pid, assigns: %{user: user}}        ) @@ -127,7 +197,7 @@ defmodule Pleroma.Web.StreamerTest do        Process.sleep(@streamer_start_wait) -      Streamer.add_socket( +      Streamer.get_topic_and_add_socket(          "user:notification",          %{transport_pid: task.pid, assigns: %{user: user}}        ) @@ -415,14 +485,10 @@ defmodule Pleroma.Web.StreamerTest do          assert_receive {:text, _}, 1_000        end) -    fake_socket = %StreamerSocket{ -      transport_pid: task.pid, -      user: user_a -    } - -    Streamer.add_socket( -      "list:#{list.id}", -      fake_socket +    Streamer.get_topic_and_add_socket( +      "list", +      %{transport_pid: task.pid, assigns: %{user: user_a}}, +      %{"list" => list.id}      )      Worker.handle_call({:stream, "list", activity}, self(), %{}) @@ -497,7 +563,7 @@ defmodule Pleroma.Web.StreamerTest do      task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end) -    Streamer.add_socket( +    Streamer.get_topic_and_add_socket(        "user",        %{transport_pid: task.pid, assigns: %{user: user2}}      ) @@ -527,7 +593,7 @@ defmodule Pleroma.Web.StreamerTest do            assert last_status["pleroma"]["direct_conversation_id"] == participation.id          end) -      Streamer.add_socket( +      Streamer.get_topic_and_add_socket(          "direct",          %{transport_pid: task.pid, assigns: %{user: user}}        ) @@ -561,7 +627,7 @@ defmodule Pleroma.Web.StreamerTest do        Process.sleep(@streamer_start_wait) -      Streamer.add_socket( +      Streamer.get_topic_and_add_socket(          "direct",          %{transport_pid: task.pid, assigns: %{user: user}}        ) @@ -604,7 +670,7 @@ defmodule Pleroma.Web.StreamerTest do        Process.sleep(@streamer_start_wait) -      Streamer.add_socket( +      Streamer.get_topic_and_add_socket(          "direct",          %{transport_pid: task.pid, assigns: %{user: user}}        )  | 
