diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/pleroma/application.ex | 2 | ||||
-rw-r--r-- | lib/pleroma/bbs/authenticator.ex | 3 | ||||
-rw-r--r-- | lib/pleroma/mfa.ex | 3 | ||||
-rw-r--r-- | lib/pleroma/plugs/authentication_plug.ex | 7 | ||||
-rw-r--r-- | lib/pleroma/user.ex | 22 | ||||
-rw-r--r-- | lib/pleroma/web/activity_pub/activity_pub.ex | 4 | ||||
-rw-r--r-- | lib/pleroma/web/activity_pub/builder.ex | 10 | ||||
-rw-r--r-- | lib/pleroma/web/activity_pub/object_validators/delete_validator.ex | 1 | ||||
-rw-r--r-- | lib/pleroma/web/activity_pub/transmogrifier.ex | 21 | ||||
-rw-r--r-- | lib/pleroma/web/activity_pub/utils.ex | 8 | ||||
-rw-r--r-- | lib/pleroma/web/auth/totp_authenticator.ex | 3 | ||||
-rw-r--r-- | lib/pleroma/web/common_api/common_api.ex | 29 | ||||
-rw-r--r-- | lib/pleroma/web/mastodon_api/views/account_view.ex | 5 | ||||
-rw-r--r-- | lib/pleroma/web/mastodon_api/websocket_handler.ex | 95 | ||||
-rw-r--r-- | lib/pleroma/web/mongooseim/mongoose_im_controller.ex | 3 | ||||
-rw-r--r-- | lib/pleroma/web/streamer/streamer.ex | 73 |
16 files changed, 189 insertions, 100 deletions
diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index a00bc0624..9d3d92b38 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -56,7 +56,7 @@ defmodule Pleroma.Application do if (major == 22 and minor < 2) or major < 22 do raise " !!!OTP VERSION WARNING!!! - You are using gun adapter with OTP version #{version}, which doesn't support correct handling of unordered certificates chains. + You are using gun adapter with OTP version #{version}, which doesn't support correct handling of unordered certificates chains. Please update your Erlang/OTP to at least 22.2. " end else diff --git a/lib/pleroma/bbs/authenticator.ex b/lib/pleroma/bbs/authenticator.ex index e5b37f33e..d4494b003 100644 --- a/lib/pleroma/bbs/authenticator.ex +++ b/lib/pleroma/bbs/authenticator.ex @@ -4,7 +4,6 @@ defmodule Pleroma.BBS.Authenticator do use Sshd.PasswordAuthenticator - alias Comeonin.Pbkdf2 alias Pleroma.User def authenticate(username, password) do @@ -12,7 +11,7 @@ defmodule Pleroma.BBS.Authenticator do password = to_string(password) with %User{} = user <- User.get_by_nickname(username) do - Pbkdf2.checkpw(password, user.password_hash) + Pbkdf2.verify_pass(password, user.password_hash) else _e -> false end diff --git a/lib/pleroma/mfa.ex b/lib/pleroma/mfa.ex index d353a4dad..2b77f5426 100644 --- a/lib/pleroma/mfa.ex +++ b/lib/pleroma/mfa.ex @@ -7,7 +7,6 @@ defmodule Pleroma.MFA do The MFA context. """ - alias Comeonin.Pbkdf2 alias Pleroma.User alias Pleroma.MFA.BackupCodes @@ -72,7 +71,7 @@ defmodule Pleroma.MFA do @spec generate_backup_codes(User.t()) :: {:ok, list(binary)} | {:error, String.t()} def generate_backup_codes(%User{} = user) do with codes <- BackupCodes.generate(), - hashed_codes <- Enum.map(codes, &Pbkdf2.hashpwsalt/1), + hashed_codes <- Enum.map(codes, &Pbkdf2.hash_pwd_salt/1), changeset <- Changeset.cast_backup_codes(user, hashed_codes), {:ok, _} <- User.update_and_set_cache(changeset) do {:ok, codes} diff --git a/lib/pleroma/plugs/authentication_plug.ex b/lib/pleroma/plugs/authentication_plug.ex index 0061c69dc..ae4a235bd 100644 --- a/lib/pleroma/plugs/authentication_plug.ex +++ b/lib/pleroma/plugs/authentication_plug.ex @@ -3,7 +3,6 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Plugs.AuthenticationPlug do - alias Comeonin.Pbkdf2 alias Pleroma.Plugs.OAuthScopesPlug alias Pleroma.User @@ -18,7 +17,7 @@ defmodule Pleroma.Plugs.AuthenticationPlug do end def checkpw(password, "$pbkdf2" <> _ = password_hash) do - Pbkdf2.checkpw(password, password_hash) + Pbkdf2.verify_pass(password, password_hash) end def checkpw(_password, _password_hash) do @@ -37,7 +36,7 @@ defmodule Pleroma.Plugs.AuthenticationPlug do } = conn, _ ) do - if Pbkdf2.checkpw(password, password_hash) do + if Pbkdf2.verify_pass(password, password_hash) do conn |> assign(:user, auth_user) |> OAuthScopesPlug.skip_plug() @@ -47,7 +46,7 @@ defmodule Pleroma.Plugs.AuthenticationPlug do end def call(%{assigns: %{auth_credentials: %{password: _}}} = conn, _) do - Pbkdf2.dummy_checkpw() + Pbkdf2.no_user_verify() conn end diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index 2a6a23fec..cba391072 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -9,7 +9,6 @@ defmodule Pleroma.User do import Ecto.Query import Ecto, only: [assoc: 2] - alias Comeonin.Pbkdf2 alias Ecto.Multi alias Pleroma.Activity alias Pleroma.Config @@ -1554,10 +1553,23 @@ defmodule Pleroma.User do |> Stream.run() end - defp delete_activity(%{data: %{"type" => "Create", "object" => object}}, user) do - {:ok, delete_data, _} = Builder.delete(user, object) + defp delete_activity(%{data: %{"type" => "Create", "object" => object}} = activity, user) do + with {_, %Object{}} <- {:find_object, Object.get_by_ap_id(object)}, + {:ok, delete_data, _} <- Builder.delete(user, object) do + Pipeline.common_pipeline(delete_data, local: user.local) + else + {:find_object, nil} -> + # We have the create activity, but not the object, it was probably pruned. + # Insert a tombstone and try again + with {:ok, tombstone_data, _} <- Builder.tombstone(user.ap_id, object), + {:ok, _tombstone} <- Object.create(tombstone_data) do + delete_activity(activity, user) + end - Pipeline.common_pipeline(delete_data, local: user.local) + e -> + Logger.error("Could not delete #{object} created by #{activity.data["ap_id"]}") + Logger.error("Error: #{inspect(e)}") + end end defp delete_activity(%{data: %{"type" => type}} = activity, user) @@ -1913,7 +1925,7 @@ defmodule Pleroma.User do defp put_password_hash( %Ecto.Changeset{valid?: true, changes: %{password: password}} = changeset ) do - change(changeset, password_hash: Pbkdf2.hashpwsalt(password)) + change(changeset, password_hash: Pbkdf2.hash_pwd_salt(password)) end defp put_password_hash(changeset), do: changeset diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 4955243ab..d752f4f04 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -439,7 +439,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end defp do_block(blocker, blocked, activity_id, local) do - outgoing_blocks = Config.get([:activitypub, :outgoing_blocks]) unfollow_blocked = Config.get([:activitypub, :unfollow_blocked]) if unfollow_blocked do @@ -447,8 +446,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do if follow_activity, do: unfollow(blocker, blocked, nil, local) end - with true <- outgoing_blocks, - block_data <- make_block_data(blocker, blocked, activity_id), + with block_data <- make_block_data(blocker, blocked, activity_id), {:ok, activity} <- insert(block_data, local), _ <- notify_and_stream(activity), :ok <- maybe_federate(activity) do diff --git a/lib/pleroma/web/activity_pub/builder.ex b/lib/pleroma/web/activity_pub/builder.ex index 922a444a9..4a247ad0c 100644 --- a/lib/pleroma/web/activity_pub/builder.ex +++ b/lib/pleroma/web/activity_pub/builder.ex @@ -62,6 +62,16 @@ defmodule Pleroma.Web.ActivityPub.Builder do }, []} end + @spec tombstone(String.t(), String.t()) :: {:ok, map(), keyword()} + def tombstone(actor, id) do + {:ok, + %{ + "id" => id, + "actor" => actor, + "type" => "Tombstone" + }, []} + end + @spec like(User.t(), Object.t()) :: {:ok, map(), keyword()} def like(actor, object) do with {:ok, data, meta} <- object_action(actor, object) do diff --git a/lib/pleroma/web/activity_pub/object_validators/delete_validator.ex b/lib/pleroma/web/activity_pub/object_validators/delete_validator.ex index e06de3dff..f42c03510 100644 --- a/lib/pleroma/web/activity_pub/object_validators/delete_validator.ex +++ b/lib/pleroma/web/activity_pub/object_validators/delete_validator.ex @@ -51,6 +51,7 @@ defmodule Pleroma.Web.ActivityPub.ObjectValidators.DeleteValidator do Page Question Video + Tombstone } def validate_data(cng) do cng diff --git a/lib/pleroma/web/activity_pub/transmogrifier.ex b/lib/pleroma/web/activity_pub/transmogrifier.ex index be7b57f13..80701bb63 100644 --- a/lib/pleroma/web/activity_pub/transmogrifier.ex +++ b/lib/pleroma/web/activity_pub/transmogrifier.ex @@ -14,7 +14,9 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do alias Pleroma.Repo alias Pleroma.User alias Pleroma.Web.ActivityPub.ActivityPub + alias Pleroma.Web.ActivityPub.Builder alias Pleroma.Web.ActivityPub.ObjectValidator + alias Pleroma.Web.ActivityPub.ObjectValidators.Types alias Pleroma.Web.ActivityPub.Pipeline alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.ActivityPub.Visibility @@ -590,6 +592,9 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"), %User{local: true} = follower <- User.get_cached_by_ap_id(follow_activity.data["actor"]), {:ok, _relationship} <- FollowingRelationship.update(follower, followed, :follow_accept) do + User.update_follower_count(followed) + User.update_following_count(follower) + ActivityPub.accept(%{ to: follow_activity.data["to"], type: "Accept", @@ -599,7 +604,8 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do activity_id: id }) else - _e -> :error + _e -> + :error end end @@ -720,6 +726,19 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do ) do with {:ok, activity, _} <- Pipeline.common_pipeline(data, local: false) do {:ok, activity} + else + {:error, {:validate_object, _}} = e -> + # Check if we have a create activity for this + with {:ok, object_id} <- Types.ObjectID.cast(data["object"]), + %Activity{data: %{"actor" => actor}} <- + Activity.create_by_object_ap_id(object_id) |> Repo.one(), + # We have one, insert a tombstone and retry + {:ok, tombstone_data, _} <- Builder.tombstone(actor, object_id), + {:ok, _tombstone} <- Object.create(tombstone_data) do + handle_incoming(data) + else + _ -> e + end end end diff --git a/lib/pleroma/web/activity_pub/utils.ex b/lib/pleroma/web/activity_pub/utils.ex index 09b80fa57..f2375bcc4 100644 --- a/lib/pleroma/web/activity_pub/utils.ex +++ b/lib/pleroma/web/activity_pub/utils.ex @@ -6,6 +6,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do alias Ecto.Changeset alias Ecto.UUID alias Pleroma.Activity + alias Pleroma.Config alias Pleroma.Notification alias Pleroma.Object alias Pleroma.Repo @@ -169,8 +170,11 @@ defmodule Pleroma.Web.ActivityPub.Utils do Enqueues an activity for federation if it's local """ @spec maybe_federate(any()) :: :ok - def maybe_federate(%Activity{local: true} = activity) do - if Pleroma.Config.get!([:instance, :federating]) do + def maybe_federate(%Activity{local: true, data: %{"type" => type}} = activity) do + outgoing_blocks = Config.get([:activitypub, :outgoing_blocks]) + + with true <- Config.get!([:instance, :federating]), + true <- type != "Block" || outgoing_blocks do Pleroma.Web.Federator.publish(activity) end diff --git a/lib/pleroma/web/auth/totp_authenticator.ex b/lib/pleroma/web/auth/totp_authenticator.ex index 98aca9a51..04e489c83 100644 --- a/lib/pleroma/web/auth/totp_authenticator.ex +++ b/lib/pleroma/web/auth/totp_authenticator.ex @@ -3,7 +3,6 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.Auth.TOTPAuthenticator do - alias Comeonin.Pbkdf2 alias Pleroma.MFA alias Pleroma.MFA.TOTP alias Pleroma.User @@ -31,7 +30,7 @@ defmodule Pleroma.Web.Auth.TOTPAuthenticator do code ) when is_list(codes) and is_binary(code) do - hash_code = Enum.find(codes, fn hash -> Pbkdf2.checkpw(code, hash) end) + hash_code = Enum.find(codes, fn hash -> Pbkdf2.verify_pass(code, hash) end) if hash_code do MFA.invalidate_backup_code(user, hash_code) diff --git a/lib/pleroma/web/common_api/common_api.ex b/lib/pleroma/web/common_api/common_api.ex index c538a634f..fbef05e83 100644 --- a/lib/pleroma/web/common_api/common_api.ex +++ b/lib/pleroma/web/common_api/common_api.ex @@ -83,16 +83,35 @@ defmodule Pleroma.Web.CommonAPI do end def delete(activity_id, user) do - with {_, %Activity{data: %{"object" => _}} = activity} <- - {:find_activity, Activity.get_by_id_with_object(activity_id)}, - %Object{} = object <- Object.normalize(activity), + with {_, %Activity{data: %{"object" => _, "type" => "Create"}} = activity} <- + {:find_activity, Activity.get_by_id(activity_id)}, + {_, %Object{} = object, _} <- + {:find_object, Object.normalize(activity, false), activity}, true <- User.superuser?(user) || user.ap_id == object.data["actor"], {:ok, delete_data, _} <- Builder.delete(user, object.data["id"]), {:ok, delete, _} <- Pipeline.common_pipeline(delete_data, local: true) do {:ok, delete} else - {:find_activity, _} -> {:error, :not_found} - _ -> {:error, dgettext("errors", "Could not delete")} + {:find_activity, _} -> + {:error, :not_found} + + {:find_object, nil, %Activity{data: %{"actor" => actor, "object" => object}}} -> + # We have the create activity, but not the object, it was probably pruned. + # Insert a tombstone and try again + with {:ok, tombstone_data, _} <- Builder.tombstone(actor, object), + {:ok, _tombstone} <- Object.create(tombstone_data) do + delete(activity_id, user) + else + _ -> + Logger.error( + "Could not insert tombstone for missing object on deletion. Object is #{object}." + ) + + {:error, dgettext("errors", "Could not delete")} + end + + _ -> + {:error, dgettext("errors", "Could not delete")} end end diff --git a/lib/pleroma/web/mastodon_api/views/account_view.ex b/lib/pleroma/web/mastodon_api/views/account_view.ex index 420bd586f..b7cdb52b1 100644 --- a/lib/pleroma/web/mastodon_api/views/account_view.ex +++ b/lib/pleroma/web/mastodon_api/views/account_view.ex @@ -261,7 +261,10 @@ defmodule Pleroma.Web.MastodonAPI.AccountView do defp prepare_user_bio(%User{bio: ""}), do: "" defp prepare_user_bio(%User{bio: bio}) when is_binary(bio) do - bio |> String.replace(~r(<br */?>), "\n") |> Pleroma.HTML.strip_tags() + bio + |> String.replace(~r(<br */?>), "\n") + |> Pleroma.HTML.strip_tags() + |> HtmlEntities.decode() end defp prepare_user_bio(_), do: "" diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex index e2ffd02d0..94e4595d8 100644 --- a/lib/pleroma/web/mastodon_api/websocket_handler.ex +++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex @@ -12,31 +12,19 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do @behaviour :cowboy_websocket + # Client ping period. + @tick :timer.seconds(30) # Cowboy timeout period. - @timeout :timer.seconds(30) + @timeout :timer.seconds(60) # Hibernate every X messages @hibernate_every 100 - @streams [ - "public", - "public:local", - "public:media", - "public:local:media", - "user", - "user:notification", - "direct", - "list", - "hashtag" - ] - @anonymous_streams ["public", "public:local", "hashtag"] - def init(%{qs: qs} = req, state) do - with params <- :cow_qs.parse_qs(qs), + with params <- Enum.into(:cow_qs.parse_qs(qs), %{}), sec_websocket <- :cowboy_req.header("sec-websocket-protocol", req, nil), - access_token <- List.keyfind(params, "access_token", 0), - {_, stream} <- List.keyfind(params, "stream", 0), - {:ok, user} <- allow_request(stream, [access_token, sec_websocket]), - topic when is_binary(topic) <- expand_topic(stream, params) do + access_token <- Map.get(params, "access_token"), + {:ok, user} <- authenticate_request(access_token, sec_websocket), + {:ok, topic} <- Streamer.get_topic(Map.get(params, "stream"), user, params) do req = if sec_websocket do :cowboy_req.set_resp_header("sec-websocket-protocol", sec_websocket, req) @@ -44,16 +32,17 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do req end - {:cowboy_websocket, req, %{user: user, topic: topic, count: 0}, %{idle_timeout: @timeout}} + {:cowboy_websocket, req, %{user: user, topic: topic, count: 0, timer: nil}, + %{idle_timeout: @timeout}} else - {:error, code} -> - Logger.debug("#{__MODULE__} denied connection: #{inspect(code)} - #{inspect(req)}") - {:ok, req} = :cowboy_req.reply(code, req) + {:error, :bad_topic} -> + Logger.debug("#{__MODULE__} bad topic #{inspect(req)}") + {:ok, req} = :cowboy_req.reply(404, req) {:ok, req, state} - error -> - Logger.debug("#{__MODULE__} denied connection: #{inspect(error)} - #{inspect(req)}") - {:ok, req} = :cowboy_req.reply(400, req) + {:error, :unauthorized} -> + Logger.debug("#{__MODULE__} authentication error: #{inspect(req)}") + {:ok, req} = :cowboy_req.reply(401, req) {:ok, req, state} end end @@ -66,11 +55,18 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do ) Streamer.add_socket(state.topic, state.user) - {:ok, state} + {:ok, %{state | timer: timer()}} + end + + # Client's Pong frame. + def websocket_handle(:pong, state) do + if state.timer, do: Process.cancel_timer(state.timer) + {:ok, %{state | timer: timer()}} end # We never receive messages. - def websocket_handle(_frame, state) do + def websocket_handle(frame, state) do + Logger.error("#{__MODULE__} received frame: #{inspect(frame)}") {:ok, state} end @@ -94,6 +90,14 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do end end + # Ping tick. We don't re-queue a timer there, it is instead queued when :pong is received. + # As we hibernate there, reset the count to 0. + # If the client misses :pong, Cowboy will automatically timeout the connection after + # `@idle_timeout`. + def websocket_info(:tick, state) do + {:reply, :ping, %{state | timer: nil, count: 0}, :hibernate} + end + def terminate(reason, _req, state) do Logger.debug( "#{__MODULE__} terminating websocket connection for user #{ @@ -106,47 +110,24 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do end # Public streams without authentication. - defp allow_request(stream, [nil, nil]) when stream in @anonymous_streams do + defp authenticate_request(nil, nil) do {:ok, nil} end # Authenticated streams. - defp allow_request(stream, [access_token, sec_websocket]) when stream in @streams do - token = - with {"access_token", token} <- access_token do - token - else - _ -> sec_websocket - end + defp authenticate_request(access_token, sec_websocket) do + token = access_token || sec_websocket with true <- is_bitstring(token), %Token{user_id: user_id} <- Repo.get_by(Token, token: token), user = %User{} <- User.get_cached_by_id(user_id) do {:ok, user} else - _ -> {:error, 403} + _ -> {:error, :unauthorized} end end - # Not authenticated. - defp allow_request(stream, _) when stream in @streams, do: {:error, 403} - - # No matching stream. - defp allow_request(_, _), do: {:error, 404} - - defp expand_topic("hashtag", params) do - case List.keyfind(params, "tag", 0) do - {_, tag} -> "hashtag:#{tag}" - _ -> nil - end - end - - defp expand_topic("list", params) do - case List.keyfind(params, "list", 0) do - {_, list} -> "list:#{list}" - _ -> nil - end + defp timer do + Process.send_after(self(), :tick, @tick) end - - defp expand_topic(topic, _), do: topic end diff --git a/lib/pleroma/web/mongooseim/mongoose_im_controller.ex b/lib/pleroma/web/mongooseim/mongoose_im_controller.ex index 1ed6ee521..0814b3bc3 100644 --- a/lib/pleroma/web/mongooseim/mongoose_im_controller.ex +++ b/lib/pleroma/web/mongooseim/mongoose_im_controller.ex @@ -5,7 +5,6 @@ defmodule Pleroma.Web.MongooseIM.MongooseIMController do use Pleroma.Web, :controller - alias Comeonin.Pbkdf2 alias Pleroma.Plugs.RateLimiter alias Pleroma.Repo alias Pleroma.User @@ -28,7 +27,7 @@ defmodule Pleroma.Web.MongooseIM.MongooseIMController do def check_password(conn, %{"user" => username, "pass" => password}) do with %User{password_hash: password_hash, deactivated: false} <- Repo.get_by(User, nickname: username, local: true), - true <- Pbkdf2.checkpw(password, password_hash) do + true <- Pbkdf2.verify_pass(password, password_hash) do conn |> json(true) else diff --git a/lib/pleroma/web/streamer/streamer.ex b/lib/pleroma/web/streamer/streamer.ex index 5ad4aa936..49a400df7 100644 --- a/lib/pleroma/web/streamer/streamer.ex +++ b/lib/pleroma/web/streamer/streamer.ex @@ -21,12 +21,68 @@ defmodule Pleroma.Web.Streamer do def registry, do: @registry - def add_socket(topic, %User{} = user) do - if should_env_send?(), do: Registry.register(@registry, user_topic(topic, user), true) + @public_streams ["public", "public:local", "public:media", "public:local:media"] + @user_streams ["user", "user:notification", "direct"] + + @doc "Expands and authorizes a stream, and registers the process for streaming." + @spec get_topic_and_add_socket(stream :: String.t(), User.t() | nil, Map.t() | nil) :: + {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized} + def get_topic_and_add_socket(stream, user, params \\ %{}) do + case get_topic(stream, user, params) do + {:ok, topic} -> add_socket(topic, user) + error -> error + end + end + + @doc "Expand and authorizes a stream" + @spec get_topic(stream :: String.t(), User.t() | nil, Map.t()) :: + {:ok, topic :: String.t()} | {:error, :bad_topic} + def get_topic(stream, user, params \\ %{}) + + # Allow all public steams. + def get_topic(stream, _, _) when stream in @public_streams do + {:ok, stream} end - def add_socket(topic, _) do - if should_env_send?(), do: Registry.register(@registry, topic, false) + # Allow all hashtags streams. + def get_topic("hashtag", _, %{"tag" => tag}) do + {:ok, "hashtag:" <> tag} + end + + # Expand user streams. + def get_topic(stream, %User{} = user, _) when stream in @user_streams do + {:ok, stream <> ":" <> to_string(user.id)} + end + + def get_topic(stream, _, _) when stream in @user_streams do + {:error, :unauthorized} + end + + # List streams. + def get_topic("list", %User{} = user, %{"list" => id}) do + if Pleroma.List.get(id, user) do + {:ok, "list:" <> to_string(id)} + else + {:error, :bad_topic} + end + end + + def get_topic("list", _, _) do + {:error, :unauthorized} + end + + def get_topic(_, _, _) do + {:error, :bad_topic} + end + + @doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic." + def add_socket(topic, user) do + if should_env_send?() do + auth? = if user, do: true + Registry.register(@registry, topic, auth?) + end + + {:ok, topic} end def remove_socket(topic) do @@ -231,13 +287,4 @@ defmodule Pleroma.Web.Streamer do true -> def should_env_send?, do: true end - - defp user_topic(topic, user) - when topic in ~w[user user:notification direct] do - "#{topic}:#{user.id}" - end - - defp user_topic(topic, _) do - topic - end end |