From c62a4f1c173490ad64fdfbab0c005ca3523b6013 Mon Sep 17 00:00:00 2001 From: Tusooa Zhu Date: Fri, 19 Aug 2022 13:19:38 -0400 Subject: Disconnect streaming sessions when token is revoked --- test/pleroma/web/streamer_test.exs | 54 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) (limited to 'test') diff --git a/test/pleroma/web/streamer_test.exs b/test/pleroma/web/streamer_test.exs index b788a9138..5426467e5 100644 --- a/test/pleroma/web/streamer_test.exs +++ b/test/pleroma/web/streamer_test.exs @@ -813,4 +813,58 @@ defmodule Pleroma.Web.StreamerTest do assert last_status["id"] == to_string(create_activity.id) end end + + describe "stop streaming if token got revoked" do + test "do not revoke other tokens" do + %{user: user, token: token} = oauth_access(["read"]) + %{token: token2} = oauth_access(["read"], user: user) + %{user: user2, token: user2_token} = oauth_access(["read"]) + + post_user = insert(:user) + CommonAPI.follow(user, post_user) + CommonAPI.follow(user2, post_user) + + Streamer.get_topic_and_add_socket("user", user, token) + Streamer.get_topic_and_add_socket("user", user, token2) + Streamer.get_topic_and_add_socket("user", user2, user2_token) + + {:ok, _} = + CommonAPI.post(post_user, %{ + status: "hi" + }) + + assert_receive {:render_with_user, _, "update.json", _} + assert_receive {:render_with_user, _, "update.json", _} + assert_receive {:render_with_user, _, "update.json", _} + + Pleroma.Web.OAuth.Token.Strategy.Revoke.revoke(token) + + assert_receive :close + refute_receive :close + end + + test "revoke all streams for this token" do + %{user: user, token: token} = oauth_access(["read"]) + + post_user = insert(:user) + CommonAPI.follow(user, post_user) + + Streamer.get_topic_and_add_socket("user", user, token) + Streamer.get_topic_and_add_socket("user", user, token) + + {:ok, _} = + CommonAPI.post(post_user, %{ + status: "hi" + }) + + assert_receive {:render_with_user, _, "update.json", _} + assert_receive {:render_with_user, _, "update.json", _} + + Pleroma.Web.OAuth.Token.Strategy.Revoke.revoke(token) + + assert_receive :close + assert_receive :close + refute_receive :close + end + end end -- cgit v1.2.3 From eb42e90c4f9ca35a6dc0e84e6f87b6f4b680173c Mon Sep 17 00:00:00 2001 From: Tusooa Zhu Date: Fri, 19 Aug 2022 13:56:39 -0400 Subject: Use Websockex to replace websocket_client --- .../integration/mastodon_websocket_test.exs | 14 +++++------ test/support/websocket_client.ex | 28 ++++++++++------------ 2 files changed, 20 insertions(+), 22 deletions(-) (limited to 'test') diff --git a/test/pleroma/integration/mastodon_websocket_test.exs b/test/pleroma/integration/mastodon_websocket_test.exs index 43ec57893..1e0319144 100644 --- a/test/pleroma/integration/mastodon_websocket_test.exs +++ b/test/pleroma/integration/mastodon_websocket_test.exs @@ -33,16 +33,16 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do test "refuses invalid requests" do capture_log(fn -> - assert {:error, {404, _}} = start_socket() - assert {:error, {404, _}} = start_socket("?stream=ncjdk") + assert {:error, %WebSockex.RequestError{code: 404}} = start_socket() + assert {:error, %WebSockex.RequestError{code: 404}} = start_socket("?stream=ncjdk") Process.sleep(30) end) end test "requires authentication and a valid token for protected streams" do capture_log(fn -> - assert {:error, {401, _}} = start_socket("?stream=user&access_token=aaaaaaaaaaaa") - assert {:error, {401, _}} = start_socket("?stream=user") + assert {:error, %WebSockex.RequestError{code: 401}} = start_socket("?stream=user&access_token=aaaaaaaaaaaa") + assert {:error, %WebSockex.RequestError{code: 401}} = start_socket("?stream=user") Process.sleep(30) end) end @@ -102,7 +102,7 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}") capture_log(fn -> - assert {:error, {401, _}} = start_socket("?stream=user") + assert {:error, %WebSockex.RequestError{code: 401}} = start_socket("?stream=user") Process.sleep(30) end) end @@ -111,7 +111,7 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}") capture_log(fn -> - assert {:error, {401, _}} = start_socket("?stream=user:notification") + assert {:error, %WebSockex.RequestError{code: 401}} = start_socket("?stream=user:notification") Process.sleep(30) end) end @@ -120,7 +120,7 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do assert {:ok, _} = start_socket("?stream=user", [{"Sec-WebSocket-Protocol", token.token}]) capture_log(fn -> - assert {:error, {401, _}} = + assert {:error, %WebSockex.RequestError{code: 401}} = start_socket("?stream=user", [{"Sec-WebSocket-Protocol", "I am a friend"}]) Process.sleep(30) diff --git a/test/support/websocket_client.ex b/test/support/websocket_client.ex index 34b955474..2660f6151 100644 --- a/test/support/websocket_client.ex +++ b/test/support/websocket_client.ex @@ -5,18 +5,17 @@ defmodule Pleroma.Integration.WebsocketClient do # https://github.com/phoenixframework/phoenix/blob/master/test/support/websocket_client.exs + use WebSockex + @doc """ Starts the WebSocket server for given ws URL. Received Socket.Message's are forwarded to the sender pid """ def start_link(sender, url, headers \\ []) do - :crypto.start() - :ssl.start() - - :websocket_client.start_link( - String.to_charlist(url), + WebSockex.start_link( + url, __MODULE__, - [sender], + %{ sender: sender }, extra_headers: headers ) end @@ -36,27 +35,26 @@ defmodule Pleroma.Integration.WebsocketClient do end @doc false - def init([sender], _conn_state) do - {:ok, %{sender: sender}} - end - - @doc false - def websocket_handle(frame, _conn_state, state) do + @impl true + def handle_frame(frame, state) do send(state.sender, frame) {:ok, state} end @doc false - def websocket_info({:text, msg}, _conn_state, state) do + @impl true + def handle_info({:text, msg}, state) do {:reply, {:text, msg}, state} end - def websocket_info(:close, _conn_state, _state) do + @impl true + def handle_info(:close, _state) do {:close, <<>>, "done"} end @doc false - def websocket_terminate(_reason, _conn_state, _state) do + @impl true + def terminate(_reason, _state) do :ok end end -- cgit v1.2.3 From 3522852c6196cafa63804240f52dd593e09ba694 Mon Sep 17 00:00:00 2001 From: Tusooa Zhu Date: Fri, 19 Aug 2022 14:09:42 -0400 Subject: Test that server will disconnect websocket upon token revocation --- test/pleroma/integration/mastodon_websocket_test.exs | 18 +++++++++++++++++- test/support/websocket_client.ex | 6 ++++++ 2 files changed, 23 insertions(+), 1 deletion(-) (limited to 'test') diff --git a/test/pleroma/integration/mastodon_websocket_test.exs b/test/pleroma/integration/mastodon_websocket_test.exs index 1e0319144..adb2d7004 100644 --- a/test/pleroma/integration/mastodon_websocket_test.exs +++ b/test/pleroma/integration/mastodon_websocket_test.exs @@ -91,7 +91,7 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do {:ok, token} = OAuth.Token.exchange_token(app, auth) - %{user: user, token: token} + %{app: app, user: user, token: token} end test "accepts valid tokens", state do @@ -126,5 +126,21 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do Process.sleep(30) end) end + + test "disconnect when token is revoked", %{app: app, user: user, token: token} do + assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}") + assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}") + + {:ok, auth} = OAuth.Authorization.create_authorization(app, user) + + {:ok, token2} = OAuth.Token.exchange_token(app, auth) + assert {:ok, _} = start_socket("?stream=user&access_token=#{token2.token}") + + OAuth.Token.Strategy.Revoke.revoke(token) + + assert_receive {:close, _} + assert_receive {:close, _} + refute_receive {:close, _} + end end end diff --git a/test/support/websocket_client.ex b/test/support/websocket_client.ex index 2660f6151..abe7d5eda 100644 --- a/test/support/websocket_client.ex +++ b/test/support/websocket_client.ex @@ -41,6 +41,12 @@ defmodule Pleroma.Integration.WebsocketClient do {:ok, state} end + @impl true + def handle_disconnect(conn_status, state) do + send(state.sender, {:close, conn_status}) + {:ok, state} + end + @doc false @impl true def handle_info({:text, msg}, state) do -- cgit v1.2.3 From f459c1260b43396fb7173e97e29ccef441a615ec Mon Sep 17 00:00:00 2001 From: Tusooa Zhu Date: Fri, 19 Aug 2022 14:10:07 -0400 Subject: Lint --- test/pleroma/integration/mastodon_websocket_test.exs | 8 ++++++-- test/support/websocket_client.ex | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) (limited to 'test') diff --git a/test/pleroma/integration/mastodon_websocket_test.exs b/test/pleroma/integration/mastodon_websocket_test.exs index adb2d7004..d44033842 100644 --- a/test/pleroma/integration/mastodon_websocket_test.exs +++ b/test/pleroma/integration/mastodon_websocket_test.exs @@ -41,7 +41,9 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do test "requires authentication and a valid token for protected streams" do capture_log(fn -> - assert {:error, %WebSockex.RequestError{code: 401}} = start_socket("?stream=user&access_token=aaaaaaaaaaaa") + assert {:error, %WebSockex.RequestError{code: 401}} = + start_socket("?stream=user&access_token=aaaaaaaaaaaa") + assert {:error, %WebSockex.RequestError{code: 401}} = start_socket("?stream=user") Process.sleep(30) end) @@ -111,7 +113,9 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}") capture_log(fn -> - assert {:error, %WebSockex.RequestError{code: 401}} = start_socket("?stream=user:notification") + assert {:error, %WebSockex.RequestError{code: 401}} = + start_socket("?stream=user:notification") + Process.sleep(30) end) end diff --git a/test/support/websocket_client.ex b/test/support/websocket_client.ex index abe7d5eda..70d331999 100644 --- a/test/support/websocket_client.ex +++ b/test/support/websocket_client.ex @@ -15,7 +15,7 @@ defmodule Pleroma.Integration.WebsocketClient do WebSockex.start_link( url, __MODULE__, - %{ sender: sender }, + %{sender: sender}, extra_headers: headers ) end -- cgit v1.2.3 From 5a2c8ef4ccfbcc996fb812779730c78e2a3fbdcd Mon Sep 17 00:00:00 2001 From: Tusooa Zhu Date: Fri, 19 Aug 2022 19:58:16 -0400 Subject: Refactor streamer test --- test/pleroma/web/streamer_test.exs | 81 ++++++++++++++++++++++++++++++-------- 1 file changed, 64 insertions(+), 17 deletions(-) (limited to 'test') diff --git a/test/pleroma/web/streamer_test.exs b/test/pleroma/web/streamer_test.exs index 5426467e5..7c4b9e288 100644 --- a/test/pleroma/web/streamer_test.exs +++ b/test/pleroma/web/streamer_test.exs @@ -815,7 +815,47 @@ defmodule Pleroma.Web.StreamerTest do end describe "stop streaming if token got revoked" do - test "do not revoke other tokens" do + setup do + child_proc = fn start, finalize -> + fn -> + start.() + + receive do + {StreamerTest, :ready} -> + assert_receive {:render_with_user, _, "update.json", _} + + receive do + {StreamerTest, :revoked} -> finalize.() + end + end + end + end + + starter = fn user, token -> + fn -> Streamer.get_topic_and_add_socket("user", user, token) end + end + + hit = fn -> assert_receive :close end + miss = fn -> refute_receive :close end + + send_all = fn tasks, thing -> Enum.each(tasks, &send(&1.pid, thing)) end + + %{ + child_proc: child_proc, + starter: starter, + hit: hit, + miss: miss, + send_all: send_all + } + end + + test "do not revoke other tokens", %{ + child_proc: child_proc, + starter: starter, + hit: hit, + miss: miss, + send_all: send_all + } do %{user: user, token: token} = oauth_access(["read"]) %{token: token2} = oauth_access(["read"], user: user) %{user: user2, token: user2_token} = oauth_access(["read"]) @@ -824,47 +864,54 @@ defmodule Pleroma.Web.StreamerTest do CommonAPI.follow(user, post_user) CommonAPI.follow(user2, post_user) - Streamer.get_topic_and_add_socket("user", user, token) - Streamer.get_topic_and_add_socket("user", user, token2) - Streamer.get_topic_and_add_socket("user", user2, user2_token) + tasks = [ + Task.async(child_proc.(starter.(user, token), hit)), + Task.async(child_proc.(starter.(user, token2), miss)), + Task.async(child_proc.(starter.(user2, user2_token), miss)) + ] {:ok, _} = CommonAPI.post(post_user, %{ status: "hi" }) - assert_receive {:render_with_user, _, "update.json", _} - assert_receive {:render_with_user, _, "update.json", _} - assert_receive {:render_with_user, _, "update.json", _} + send_all.(tasks, {StreamerTest, :ready}) Pleroma.Web.OAuth.Token.Strategy.Revoke.revoke(token) - assert_receive :close - refute_receive :close + send_all.(tasks, {StreamerTest, :revoked}) + + Enum.each(tasks, &Task.await/1) end - test "revoke all streams for this token" do + test "revoke all streams for this token", %{ + child_proc: child_proc, + starter: starter, + hit: hit, + send_all: send_all + } do %{user: user, token: token} = oauth_access(["read"]) post_user = insert(:user) CommonAPI.follow(user, post_user) - Streamer.get_topic_and_add_socket("user", user, token) - Streamer.get_topic_and_add_socket("user", user, token) + tasks = [ + Task.async(child_proc.(starter.(user, token), hit)), + Task.async(child_proc.(starter.(user, token), hit)) + ] {:ok, _} = CommonAPI.post(post_user, %{ status: "hi" }) - assert_receive {:render_with_user, _, "update.json", _} - assert_receive {:render_with_user, _, "update.json", _} + send_all.(tasks, {StreamerTest, :ready}) Pleroma.Web.OAuth.Token.Strategy.Revoke.revoke(token) - assert_receive :close - assert_receive :close - refute_receive :close + send_all.(tasks, {StreamerTest, :revoked}) + + Enum.each(tasks, &Task.await/1) end end end -- cgit v1.2.3