diff options
Diffstat (limited to 'test/web/streamer')
| -rw-r--r-- | test/web/streamer/ping_test.exs | 36 | ||||
| -rw-r--r-- | test/web/streamer/state_test.exs | 54 | ||||
| -rw-r--r-- | test/web/streamer/streamer_test.exs | 699 | 
3 files changed, 325 insertions, 464 deletions
| diff --git a/test/web/streamer/ping_test.exs b/test/web/streamer/ping_test.exs deleted file mode 100644 index 3d52c00e4..000000000 --- a/test/web/streamer/ping_test.exs +++ /dev/null @@ -1,36 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.PingTest do -  use Pleroma.DataCase - -  import Pleroma.Factory -  alias Pleroma.Web.Streamer - -  setup do -    start_supervised({Streamer.supervisor(), [ping_interval: 30]}) - -    :ok -  end - -  describe "sockets" do -    setup do -      user = insert(:user) -      {:ok, %{user: user}} -    end - -    test "it sends pings", %{user: user} do -      task = -        Task.async(fn -> -          assert_receive {:text, received_event}, 40 -          assert_receive {:text, received_event}, 40 -          assert_receive {:text, received_event}, 40 -        end) - -      Streamer.add_socket("public", %{transport_pid: task.pid, assigns: %{user: user}}) - -      Task.await(task) -    end -  end -end diff --git a/test/web/streamer/state_test.exs b/test/web/streamer/state_test.exs deleted file mode 100644 index d1aeac541..000000000 --- a/test/web/streamer/state_test.exs +++ /dev/null @@ -1,54 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.StateTest do -  use Pleroma.DataCase - -  import Pleroma.Factory -  alias Pleroma.Web.Streamer -  alias Pleroma.Web.Streamer.StreamerSocket - -  @moduletag needs_streamer: true - -  describe "sockets" do -    setup do -      user = insert(:user) -      user2 = insert(:user) -      {:ok, %{user: user, user2: user2}} -    end - -    test "it can add a socket", %{user: user} do -      Streamer.add_socket("public", %{transport_pid: 1, assigns: %{user: user}}) - -      assert(%{"public" => [%StreamerSocket{transport_pid: 1}]} = Streamer.get_sockets()) -    end - -    test "it can add multiple sockets per user", %{user: user} do -      Streamer.add_socket("public", %{transport_pid: 1, assigns: %{user: user}}) -      Streamer.add_socket("public", %{transport_pid: 2, assigns: %{user: user}}) - -      assert( -        %{ -          "public" => [ -            %StreamerSocket{transport_pid: 2}, -            %StreamerSocket{transport_pid: 1} -          ] -        } = Streamer.get_sockets() -      ) -    end - -    test "it will not add a duplicate socket", %{user: user} do -      Streamer.add_socket("activity", %{transport_pid: 1, assigns: %{user: user}}) -      Streamer.add_socket("activity", %{transport_pid: 1, assigns: %{user: user}}) - -      assert( -        %{ -          "activity" => [ -            %StreamerSocket{transport_pid: 1} -          ] -        } = Streamer.get_sockets() -      ) -    end -  end -end diff --git a/test/web/streamer/streamer_test.exs b/test/web/streamer/streamer_test.exs index 1cf20f1c2..95b7d1420 100644 --- a/test/web/streamer/streamer_test.exs +++ b/test/web/streamer/streamer_test.exs @@ -1,5 +1,5 @@  # Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>  # SPDX-License-Identifier: AGPL-3.0-only  defmodule Pleroma.Web.StreamerTest do @@ -12,15 +12,80 @@ defmodule Pleroma.Web.StreamerTest do    alias Pleroma.User    alias Pleroma.Web.CommonAPI    alias Pleroma.Web.Streamer -  alias Pleroma.Web.Streamer.StreamerSocket -  alias Pleroma.Web.Streamer.Worker    @moduletag needs_streamer: true, capture_log: true -  @streamer_timeout 150 -  @streamer_start_wait 10 +  setup do: clear_config([:instance, :skip_thread_containment]) -  clear_config([:instance, :skip_thread_containment]) +  describe "get_topic without an user" do +    test "allows public" do +      assert {:ok, "public"} = Streamer.get_topic("public", nil) +      assert {:ok, "public:local"} = Streamer.get_topic("public:local", nil) +      assert {:ok, "public:media"} = Streamer.get_topic("public:media", nil) +      assert {:ok, "public:local:media"} = Streamer.get_topic("public:local:media", nil) +    end + +    test "allows hashtag streams" do +      assert {:ok, "hashtag:cofe"} = Streamer.get_topic("hashtag", nil, %{"tag" => "cofe"}) +    end + +    test "disallows user streams" do +      assert {:error, _} = Streamer.get_topic("user", nil) +      assert {:error, _} = Streamer.get_topic("user:notification", nil) +      assert {:error, _} = Streamer.get_topic("direct", nil) +    end + +    test "disallows list streams" do +      assert {:error, _} = Streamer.get_topic("list", nil, %{"list" => 42}) +    end +  end + +  describe "get_topic with an user" do +    setup do +      user = insert(:user) +      {:ok, %{user: user}} +    end + +    test "allows public streams", %{user: user} do +      assert {:ok, "public"} = Streamer.get_topic("public", user) +      assert {:ok, "public:local"} = Streamer.get_topic("public:local", user) +      assert {:ok, "public:media"} = Streamer.get_topic("public:media", user) +      assert {:ok, "public:local:media"} = Streamer.get_topic("public:local:media", user) +    end + +    test "allows user streams", %{user: user} do +      expected_user_topic = "user:#{user.id}" +      expected_notif_topic = "user:notification:#{user.id}" +      expected_direct_topic = "direct:#{user.id}" +      assert {:ok, ^expected_user_topic} = Streamer.get_topic("user", user) +      assert {:ok, ^expected_notif_topic} = Streamer.get_topic("user:notification", user) +      assert {:ok, ^expected_direct_topic} = Streamer.get_topic("direct", user) +    end + +    test "allows hashtag streams", %{user: user} do +      assert {:ok, "hashtag:cofe"} = Streamer.get_topic("hashtag", user, %{"tag" => "cofe"}) +    end + +    test "disallows registering to an user stream", %{user: user} do +      another_user = insert(:user) +      assert {:error, _} = Streamer.get_topic("user:#{another_user.id}", user) +      assert {:error, _} = Streamer.get_topic("user:notification:#{another_user.id}", user) +      assert {:error, _} = Streamer.get_topic("direct:#{another_user.id}", user) +    end + +    test "allows list stream that are owned by the user", %{user: user} do +      {:ok, list} = List.create("Test", user) +      assert {:error, _} = Streamer.get_topic("list:#{list.id}", user) +      assert {:ok, _} = Streamer.get_topic("list", user, %{"list" => list.id}) +    end + +    test "disallows list stream that are not owned by the user", %{user: user} do +      another_user = insert(:user) +      {:ok, list} = List.create("Test", another_user) +      assert {:error, _} = Streamer.get_topic("list:#{list.id}", user) +      assert {:error, _} = Streamer.get_topic("list", user, %{"list" => list.id}) +    end +  end    describe "user streams" do      setup do @@ -29,34 +94,36 @@ defmodule Pleroma.Web.StreamerTest do        {:ok, %{user: user, notify: notify}}      end -    test "it sends notify to in the 'user' stream", %{user: user, notify: notify} do -      task = -        Task.async(fn -> -          assert_receive {:text, _}, @streamer_timeout -        end) +    test "it streams the user's post in the 'user' stream", %{user: user} do +      Streamer.get_topic_and_add_socket("user", user) +      {:ok, activity} = CommonAPI.post(user, %{status: "hey"}) +      assert_receive {:render_with_user, _, _, ^activity} +      refute Streamer.filtered_by_user?(user, activity) +    end + +    test "it streams boosts of the user in the 'user' stream", %{user: user} do +      Streamer.get_topic_and_add_socket("user", user) -      Streamer.add_socket( -        "user", -        %{transport_pid: task.pid, assigns: %{user: user}} -      ) +      other_user = insert(:user) +      {:ok, activity} = CommonAPI.post(other_user, %{status: "hey"}) +      {:ok, announce, _} = CommonAPI.repeat(activity.id, user) +      assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce} +      refute Streamer.filtered_by_user?(user, announce) +    end + +    test "it sends notify to in the 'user' stream", %{user: user, notify: notify} do +      Streamer.get_topic_and_add_socket("user", user)        Streamer.stream("user", notify) -      Task.await(task) +      assert_receive {:render_with_user, _, _, ^notify} +      refute Streamer.filtered_by_user?(user, notify)      end      test "it sends notify to in the 'user:notification' stream", %{user: user, notify: notify} do -      task = -        Task.async(fn -> -          assert_receive {:text, _}, @streamer_timeout -        end) - -      Streamer.add_socket( -        "user:notification", -        %{transport_pid: task.pid, assigns: %{user: user}} -      ) - +      Streamer.get_topic_and_add_socket("user:notification", user)        Streamer.stream("user:notification", notify) -      Task.await(task) +      assert_receive {:render_with_user, _, _, ^notify} +      refute Streamer.filtered_by_user?(user, notify)      end      test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{ @@ -65,18 +132,12 @@ defmodule Pleroma.Web.StreamerTest do        blocked = insert(:user)        {:ok, _user_relationship} = User.block(user, blocked) -      {:ok, activity} = CommonAPI.post(user, %{"status" => ":("}) -      {:ok, notif, _} = CommonAPI.favorite(activity.id, blocked) - -      task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end) +      Streamer.get_topic_and_add_socket("user:notification", user) -      Streamer.add_socket( -        "user:notification", -        %{transport_pid: task.pid, assigns: %{user: user}} -      ) +      {:ok, activity} = CommonAPI.post(user, %{status: ":("}) +      {:ok, _} = CommonAPI.favorite(blocked, activity.id) -      Streamer.stream("user:notification", notif) -      Task.await(task) +      refute_receive _      end      test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{ @@ -84,121 +145,116 @@ defmodule Pleroma.Web.StreamerTest do      } do        user2 = insert(:user) -      {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"}) -      {:ok, activity} = CommonAPI.add_mute(user, activity) -      {:ok, notif, _} = CommonAPI.favorite(activity.id, user2) +      {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"}) +      {:ok, _} = CommonAPI.add_mute(user, activity) -      task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end) +      Streamer.get_topic_and_add_socket("user:notification", user) -      Streamer.add_socket( -        "user:notification", -        %{transport_pid: task.pid, assigns: %{user: user}} -      ) +      {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id) -      Streamer.stream("user:notification", notif) -      Task.await(task) +      refute_receive _ +      assert Streamer.filtered_by_user?(user, favorite_activity)      end -    test "it doesn't send notify to the 'user:notification' stream' when a domain is blocked", %{ +    test "it sends favorite to 'user:notification' stream'", %{        user: user      } do        user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"}) -      {:ok, user} = User.block_domain(user, "hecking-lewd-place.com") -      {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"}) -      {:ok, notif, _} = CommonAPI.favorite(activity.id, user2) +      {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"}) +      Streamer.get_topic_and_add_socket("user:notification", user) +      {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id) -      task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end) +      assert_receive {:render_with_user, _, "notification.json", notif} +      assert notif.activity.id == favorite_activity.id +      refute Streamer.filtered_by_user?(user, notif) +    end -      Streamer.add_socket( -        "user:notification", -        %{transport_pid: task.pid, assigns: %{user: user}} -      ) +    test "it doesn't send the 'user:notification' stream' when a domain is blocked", %{ +      user: user +    } do +      user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"}) -      Streamer.stream("user:notification", notif) -      Task.await(task) +      {:ok, user} = User.block_domain(user, "hecking-lewd-place.com") +      {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"}) +      Streamer.get_topic_and_add_socket("user:notification", user) +      {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id) + +      refute_receive _ +      assert Streamer.filtered_by_user?(user, favorite_activity)      end      test "it sends follow activities to the 'user:notification' stream", %{        user: user      } do +      user_url = user.ap_id        user2 = insert(:user) -      task = Task.async(fn -> assert_receive {:text, _}, @streamer_timeout end) -      Process.sleep(@streamer_start_wait) +      body = +        File.read!("test/fixtures/users_mock/localhost.json") +        |> String.replace("{{nickname}}", user.nickname) +        |> Jason.encode!() -      Streamer.add_socket( -        "user:notification", -        %{transport_pid: task.pid, assigns: %{user: user}} -      ) +      Tesla.Mock.mock_global(fn +        %{method: :get, url: ^user_url} -> +          %Tesla.Env{status: 200, body: body} +      end) -      {:ok, _follower, _followed, _activity} = CommonAPI.follow(user2, user) +      Streamer.get_topic_and_add_socket("user:notification", user) +      {:ok, _follower, _followed, follow_activity} = CommonAPI.follow(user2, user) -      # We don't directly pipe the notification to the streamer as it's already -      # generated as a side effect of CommonAPI.follow(). -      Task.await(task) +      assert_receive {:render_with_user, _, "notification.json", notif} +      assert notif.activity.id == follow_activity.id +      refute Streamer.filtered_by_user?(user, notif)      end    end -  test "it sends to public" do +  test "it sends to public authenticated" do      user = insert(:user)      other_user = insert(:user) -    task = -      Task.async(fn -> -        assert_receive {:text, _}, @streamer_timeout -      end) +    Streamer.get_topic_and_add_socket("public", other_user) -    fake_socket = %StreamerSocket{ -      transport_pid: task.pid, -      user: user -    } - -    {:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"}) - -    topics = %{ -      "public" => [fake_socket] -    } - -    Worker.push_to_socket(topics, "public", activity) - -    Task.await(task) +    {:ok, activity} = CommonAPI.post(user, %{status: "Test"}) +    assert_receive {:render_with_user, _, _, ^activity} +    refute Streamer.filtered_by_user?(user, activity) +  end -    task = -      Task.async(fn -> -        expected_event = -          %{ -            "event" => "delete", -            "payload" => activity.id -          } -          |> Jason.encode!() +  test "works for deletions" do +    user = insert(:user) +    other_user = insert(:user) +    {:ok, activity} = CommonAPI.post(other_user, %{status: "Test"}) -        assert_receive {:text, received_event}, @streamer_timeout -        assert received_event == expected_event -      end) +    Streamer.get_topic_and_add_socket("public", user) -    fake_socket = %StreamerSocket{ -      transport_pid: task.pid, -      user: user -    } +    {:ok, _} = CommonAPI.delete(activity.id, other_user) +    activity_id = activity.id +    assert_receive {:text, event} +    assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event) +  end -    {:ok, activity} = CommonAPI.delete(activity.id, other_user) +  test "it sends to public unauthenticated" do +    user = insert(:user) -    topics = %{ -      "public" => [fake_socket] -    } +    Streamer.get_topic_and_add_socket("public", nil) -    Worker.push_to_socket(topics, "public", activity) +    {:ok, activity} = CommonAPI.post(user, %{status: "Test"}) +    activity_id = activity.id +    assert_receive {:text, event} +    assert %{"event" => "update", "payload" => payload} = Jason.decode!(event) +    assert %{"id" => ^activity_id} = Jason.decode!(payload) -    Task.await(task) +    {:ok, _} = CommonAPI.delete(activity.id, user) +    assert_receive {:text, event} +    assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)    end    describe "thread_containment" do -    test "it doesn't send to user if recipients invalid and thread containment is enabled" do +    test "it filters to user if recipients invalid and thread containment is enabled" do        Pleroma.Config.put([:instance, :skip_thread_containment], false)        author = insert(:user)        user = insert(:user) -      User.follow(user, author, "accept") +      User.follow(user, author, :follow_accept)        activity =          insert(:note_activity, @@ -209,19 +265,17 @@ defmodule Pleroma.Web.StreamerTest do              )          ) -      task = Task.async(fn -> refute_receive {:text, _}, 1_000 end) -      fake_socket = %StreamerSocket{transport_pid: task.pid, user: user} -      topics = %{"public" => [fake_socket]} -      Worker.push_to_socket(topics, "public", activity) - -      Task.await(task) +      Streamer.get_topic_and_add_socket("public", user) +      Streamer.stream("public", activity) +      assert_receive {:render_with_user, _, _, ^activity} +      assert Streamer.filtered_by_user?(user, activity)      end      test "it sends message if recipients invalid and thread containment is disabled" do        Pleroma.Config.put([:instance, :skip_thread_containment], true)        author = insert(:user)        user = insert(:user) -      User.follow(user, author, "accept") +      User.follow(user, author, :follow_accept)        activity =          insert(:note_activity, @@ -232,19 +286,18 @@ defmodule Pleroma.Web.StreamerTest do              )          ) -      task = Task.async(fn -> assert_receive {:text, _}, 1_000 end) -      fake_socket = %StreamerSocket{transport_pid: task.pid, user: user} -      topics = %{"public" => [fake_socket]} -      Worker.push_to_socket(topics, "public", activity) +      Streamer.get_topic_and_add_socket("public", user) +      Streamer.stream("public", activity) -      Task.await(task) +      assert_receive {:render_with_user, _, _, ^activity} +      refute Streamer.filtered_by_user?(user, activity)      end      test "it sends message if recipients invalid and thread containment is enabled but user's thread containment is disabled" do        Pleroma.Config.put([:instance, :skip_thread_containment], false)        author = insert(:user)        user = insert(:user, skip_thread_containment: true) -      User.follow(user, author, "accept") +      User.follow(user, author, :follow_accept)        activity =          insert(:note_activity, @@ -255,255 +308,168 @@ defmodule Pleroma.Web.StreamerTest do              )          ) -      task = Task.async(fn -> assert_receive {:text, _}, 1_000 end) -      fake_socket = %StreamerSocket{transport_pid: task.pid, user: user} -      topics = %{"public" => [fake_socket]} -      Worker.push_to_socket(topics, "public", activity) +      Streamer.get_topic_and_add_socket("public", user) +      Streamer.stream("public", activity) -      Task.await(task) +      assert_receive {:render_with_user, _, _, ^activity} +      refute Streamer.filtered_by_user?(user, activity)      end    end    describe "blocks" do -    test "it doesn't send messages involving blocked users" do +    test "it filters messages involving blocked users" do        user = insert(:user)        blocked_user = insert(:user)        {:ok, _user_relationship} = User.block(user, blocked_user) -      {:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"}) - -      task = -        Task.async(fn -> -          refute_receive {:text, _}, 1_000 -        end) - -      fake_socket = %StreamerSocket{ -        transport_pid: task.pid, -        user: user -      } - -      topics = %{ -        "public" => [fake_socket] -      } - -      Worker.push_to_socket(topics, "public", activity) - -      Task.await(task) +      Streamer.get_topic_and_add_socket("public", user) +      {:ok, activity} = CommonAPI.post(blocked_user, %{status: "Test"}) +      assert_receive {:render_with_user, _, _, ^activity} +      assert Streamer.filtered_by_user?(user, activity)      end -    test "it doesn't send messages transitively involving blocked users" do +    test "it filters messages transitively involving blocked users" do        blocker = insert(:user)        blockee = insert(:user)        friend = insert(:user) -      task = -        Task.async(fn -> -          refute_receive {:text, _}, 1_000 -        end) - -      fake_socket = %StreamerSocket{ -        transport_pid: task.pid, -        user: blocker -      } - -      topics = %{ -        "public" => [fake_socket] -      } +      Streamer.get_topic_and_add_socket("public", blocker)        {:ok, _user_relationship} = User.block(blocker, blockee) -      {:ok, activity_one} = CommonAPI.post(friend, %{"status" => "hey! @#{blockee.nickname}"}) +      {:ok, activity_one} = CommonAPI.post(friend, %{status: "hey! @#{blockee.nickname}"}) -      Worker.push_to_socket(topics, "public", activity_one) +      assert_receive {:render_with_user, _, _, ^activity_one} +      assert Streamer.filtered_by_user?(blocker, activity_one) -      {:ok, activity_two} = CommonAPI.post(blockee, %{"status" => "hey! @#{friend.nickname}"}) +      {:ok, activity_two} = CommonAPI.post(blockee, %{status: "hey! @#{friend.nickname}"}) -      Worker.push_to_socket(topics, "public", activity_two) +      assert_receive {:render_with_user, _, _, ^activity_two} +      assert Streamer.filtered_by_user?(blocker, activity_two) -      {:ok, activity_three} = CommonAPI.post(blockee, %{"status" => "hey! @#{blocker.nickname}"}) +      {:ok, activity_three} = CommonAPI.post(blockee, %{status: "hey! @#{blocker.nickname}"}) -      Worker.push_to_socket(topics, "public", activity_three) - -      Task.await(task) +      assert_receive {:render_with_user, _, _, ^activity_three} +      assert Streamer.filtered_by_user?(blocker, activity_three)      end    end -  test "it doesn't send unwanted DMs to list" do -    user_a = insert(:user) -    user_b = insert(:user) -    user_c = insert(:user) - -    {:ok, user_a} = User.follow(user_a, user_b) - -    {:ok, list} = List.create("Test", user_a) -    {:ok, list} = List.follow(list, user_b) - -    {:ok, activity} = -      CommonAPI.post(user_b, %{ -        "status" => "@#{user_c.nickname} Test", -        "visibility" => "direct" -      }) - -    task = -      Task.async(fn -> -        refute_receive {:text, _}, 1_000 -      end) - -    fake_socket = %StreamerSocket{ -      transport_pid: task.pid, -      user: user_a -    } - -    topics = %{ -      "list:#{list.id}" => [fake_socket] -    } - -    Worker.handle_call({:stream, "list", activity}, self(), topics) - -    Task.await(task) -  end - -  test "it doesn't send unwanted private posts to list" do -    user_a = insert(:user) -    user_b = insert(:user) +  describe "lists" do +    test "it doesn't send unwanted DMs to list" do +      user_a = insert(:user) +      user_b = insert(:user) +      user_c = insert(:user) -    {:ok, list} = List.create("Test", user_a) -    {:ok, list} = List.follow(list, user_b) +      {:ok, user_a} = User.follow(user_a, user_b) -    {:ok, activity} = -      CommonAPI.post(user_b, %{ -        "status" => "Test", -        "visibility" => "private" -      }) +      {:ok, list} = List.create("Test", user_a) +      {:ok, list} = List.follow(list, user_b) -    task = -      Task.async(fn -> -        refute_receive {:text, _}, 1_000 -      end) +      Streamer.get_topic_and_add_socket("list", user_a, %{"list" => list.id}) -    fake_socket = %StreamerSocket{ -      transport_pid: task.pid, -      user: user_a -    } +      {:ok, _activity} = +        CommonAPI.post(user_b, %{ +          status: "@#{user_c.nickname} Test", +          visibility: "direct" +        }) -    topics = %{ -      "list:#{list.id}" => [fake_socket] -    } +      refute_receive _ +    end -    Worker.handle_call({:stream, "list", activity}, self(), topics) +    test "it doesn't send unwanted private posts to list" do +      user_a = insert(:user) +      user_b = insert(:user) -    Task.await(task) -  end +      {:ok, list} = List.create("Test", user_a) +      {:ok, list} = List.follow(list, user_b) -  test "it sends wanted private posts to list" do -    user_a = insert(:user) -    user_b = insert(:user) +      Streamer.get_topic_and_add_socket("list", user_a, %{"list" => list.id}) -    {:ok, user_a} = User.follow(user_a, user_b) +      {:ok, _activity} = +        CommonAPI.post(user_b, %{ +          status: "Test", +          visibility: "private" +        }) -    {:ok, list} = List.create("Test", user_a) -    {:ok, list} = List.follow(list, user_b) +      refute_receive _ +    end -    {:ok, activity} = -      CommonAPI.post(user_b, %{ -        "status" => "Test", -        "visibility" => "private" -      }) +    test "it sends wanted private posts to list" do +      user_a = insert(:user) +      user_b = insert(:user) -    task = -      Task.async(fn -> -        assert_receive {:text, _}, 1_000 -      end) +      {:ok, user_a} = User.follow(user_a, user_b) -    fake_socket = %StreamerSocket{ -      transport_pid: task.pid, -      user: user_a -    } +      {:ok, list} = List.create("Test", user_a) +      {:ok, list} = List.follow(list, user_b) -    Streamer.add_socket( -      "list:#{list.id}", -      fake_socket -    ) +      Streamer.get_topic_and_add_socket("list", user_a, %{"list" => list.id}) -    Worker.handle_call({:stream, "list", activity}, self(), %{}) +      {:ok, activity} = +        CommonAPI.post(user_b, %{ +          status: "Test", +          visibility: "private" +        }) -    Task.await(task) +      assert_receive {:render_with_user, _, _, ^activity} +      refute Streamer.filtered_by_user?(user_a, activity) +    end    end -  test "it doesn't send muted reblogs" do -    user1 = insert(:user) -    user2 = insert(:user) -    user3 = insert(:user) -    CommonAPI.hide_reblogs(user1, user2) - -    {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"}) -    {:ok, announce_activity, _} = CommonAPI.repeat(create_activity.id, user2) - -    task = -      Task.async(fn -> -        refute_receive {:text, _}, 1_000 -      end) - -    fake_socket = %StreamerSocket{ -      transport_pid: task.pid, -      user: user1 -    } - -    topics = %{ -      "public" => [fake_socket] -    } - -    Worker.push_to_socket(topics, "public", announce_activity) +  describe "muted reblogs" do +    test "it filters muted reblogs" do +      user1 = insert(:user) +      user2 = insert(:user) +      user3 = insert(:user) +      CommonAPI.follow(user1, user2) +      CommonAPI.hide_reblogs(user1, user2) -    Task.await(task) -  end +      {:ok, create_activity} = CommonAPI.post(user3, %{status: "I'm kawen"}) -  test "it does send non-reblog notification for reblog-muted actors" do -    user1 = insert(:user) -    user2 = insert(:user) -    user3 = insert(:user) -    CommonAPI.hide_reblogs(user1, user2) +      Streamer.get_topic_and_add_socket("user", user1) +      {:ok, announce_activity, _} = CommonAPI.repeat(create_activity.id, user2) +      assert_receive {:render_with_user, _, _, ^announce_activity} +      assert Streamer.filtered_by_user?(user1, announce_activity) +    end -    {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"}) -    {:ok, favorite_activity, _} = CommonAPI.favorite(create_activity.id, user2) +    test "it filters reblog notification for reblog-muted actors" do +      user1 = insert(:user) +      user2 = insert(:user) +      CommonAPI.follow(user1, user2) +      CommonAPI.hide_reblogs(user1, user2) -    task = -      Task.async(fn -> -        assert_receive {:text, _}, 1_000 -      end) +      {:ok, create_activity} = CommonAPI.post(user1, %{status: "I'm kawen"}) +      Streamer.get_topic_and_add_socket("user", user1) +      {:ok, _favorite_activity, _} = CommonAPI.repeat(create_activity.id, user2) -    fake_socket = %StreamerSocket{ -      transport_pid: task.pid, -      user: user1 -    } +      assert_receive {:render_with_user, _, "notification.json", notif} +      assert Streamer.filtered_by_user?(user1, notif) +    end -    topics = %{ -      "public" => [fake_socket] -    } +    test "it send non-reblog notification for reblog-muted actors" do +      user1 = insert(:user) +      user2 = insert(:user) +      CommonAPI.follow(user1, user2) +      CommonAPI.hide_reblogs(user1, user2) -    Worker.push_to_socket(topics, "public", favorite_activity) +      {:ok, create_activity} = CommonAPI.post(user1, %{status: "I'm kawen"}) +      Streamer.get_topic_and_add_socket("user", user1) +      {:ok, _favorite_activity} = CommonAPI.favorite(user2, create_activity.id) -    Task.await(task) +      assert_receive {:render_with_user, _, "notification.json", notif} +      refute Streamer.filtered_by_user?(user1, notif) +    end    end -  test "it doesn't send posts from muted threads" do +  test "it filters posts from muted threads" do      user = insert(:user)      user2 = insert(:user) +    Streamer.get_topic_and_add_socket("user", user2)      {:ok, user2, user, _activity} = CommonAPI.follow(user2, user) - -    {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"}) - -    {:ok, activity} = CommonAPI.add_mute(user2, activity) - -    task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end) - -    Streamer.add_socket( -      "user", -      %{transport_pid: task.pid, assigns: %{user: user2}} -    ) - -    Streamer.stream("user", activity) -    Task.await(task) +    {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"}) +    {:ok, _} = CommonAPI.add_mute(user2, activity) +    assert_receive {:render_with_user, _, _, ^activity} +    assert Streamer.filtered_by_user?(user2, activity)    end    describe "direct streams" do @@ -515,103 +481,88 @@ defmodule Pleroma.Web.StreamerTest do        user = insert(:user)        another_user = insert(:user) -      task = -        Task.async(fn -> -          assert_receive {:text, received_event}, @streamer_timeout - -          assert %{"event" => "conversation", "payload" => received_payload} = -                   Jason.decode!(received_event) - -          assert %{"last_status" => last_status} = Jason.decode!(received_payload) -          [participation] = Participation.for_user(user) -          assert last_status["pleroma"]["direct_conversation_id"] == participation.id -        end) - -      Streamer.add_socket( -        "direct", -        %{transport_pid: task.pid, assigns: %{user: user}} -      ) +      Streamer.get_topic_and_add_socket("direct", user)        {:ok, _create_activity} =          CommonAPI.post(another_user, %{ -          "status" => "hey @#{user.nickname}", -          "visibility" => "direct" +          status: "hey @#{user.nickname}", +          visibility: "direct"          }) -      Task.await(task) +      assert_receive {:text, received_event} + +      assert %{"event" => "conversation", "payload" => received_payload} = +               Jason.decode!(received_event) + +      assert %{"last_status" => last_status} = Jason.decode!(received_payload) +      [participation] = Participation.for_user(user) +      assert last_status["pleroma"]["direct_conversation_id"] == participation.id      end      test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted" do        user = insert(:user)        another_user = insert(:user) +      Streamer.get_topic_and_add_socket("direct", user) +        {:ok, create_activity} =          CommonAPI.post(another_user, %{ -          "status" => "hi @#{user.nickname}", -          "visibility" => "direct" +          status: "hi @#{user.nickname}", +          visibility: "direct"          }) -      task = -        Task.async(fn -> -          assert_receive {:text, received_event}, @streamer_timeout -          assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event) +      create_activity_id = create_activity.id +      assert_receive {:render_with_user, _, _, ^create_activity} +      assert_receive {:text, received_conversation1} +      assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1) -          refute_receive {:text, _}, @streamer_timeout -        end) +      {:ok, _} = CommonAPI.delete(create_activity_id, another_user) -      Process.sleep(@streamer_start_wait) +      assert_receive {:text, received_event} -      Streamer.add_socket( -        "direct", -        %{transport_pid: task.pid, assigns: %{user: user}} -      ) +      assert %{"event" => "delete", "payload" => ^create_activity_id} = +               Jason.decode!(received_event) -      {:ok, _} = CommonAPI.delete(create_activity.id, another_user) - -      Task.await(task) +      refute_receive _      end      test "it sends conversation update to the 'direct' stream when a message is deleted" do        user = insert(:user)        another_user = insert(:user) +      Streamer.get_topic_and_add_socket("direct", user)        {:ok, create_activity} =          CommonAPI.post(another_user, %{ -          "status" => "hi @#{user.nickname}", -          "visibility" => "direct" +          status: "hi @#{user.nickname}", +          visibility: "direct"          })        {:ok, create_activity2} =          CommonAPI.post(another_user, %{ -          "status" => "hi @#{user.nickname}", -          "in_reply_to_status_id" => create_activity.id, -          "visibility" => "direct" +          status: "hi @#{user.nickname} 2", +          in_reply_to_status_id: create_activity.id, +          visibility: "direct"          }) -      task = -        Task.async(fn -> -          assert_receive {:text, received_event}, @streamer_timeout -          assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event) - -          assert_receive {:text, received_event}, @streamer_timeout +      assert_receive {:render_with_user, _, _, ^create_activity} +      assert_receive {:render_with_user, _, _, ^create_activity2} +      assert_receive {:text, received_conversation1} +      assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1) +      assert_receive {:text, received_conversation1} +      assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1) -          assert %{"event" => "conversation", "payload" => received_payload} = -                   Jason.decode!(received_event) - -          assert %{"last_status" => last_status} = Jason.decode!(received_payload) -          assert last_status["id"] == to_string(create_activity.id) -        end) +      {:ok, _} = CommonAPI.delete(create_activity2.id, another_user) -      Process.sleep(@streamer_start_wait) +      assert_receive {:text, received_event} +      assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event) -      Streamer.add_socket( -        "direct", -        %{transport_pid: task.pid, assigns: %{user: user}} -      ) +      assert_receive {:text, received_event} -      {:ok, _} = CommonAPI.delete(create_activity2.id, another_user) +      assert %{"event" => "conversation", "payload" => received_payload} = +               Jason.decode!(received_event) -      Task.await(task) +      assert %{"last_status" => last_status} = Jason.decode!(received_payload) +      assert last_status["id"] == to_string(create_activity.id)      end    end  end | 
