summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/pleroma/constants.ex4
-rw-r--r--lib/pleroma/web/api_spec.ex10
-rw-r--r--lib/pleroma/web/api_spec/operations/streaming_operation.ex464
-rw-r--r--lib/pleroma/web/mastodon_api/websocket_handler.ex131
-rw-r--r--lib/pleroma/web/streamer.ex34
-rw-r--r--lib/pleroma/web/views/streamer_view.ex61
6 files changed, 676 insertions, 28 deletions
diff --git a/lib/pleroma/constants.ex b/lib/pleroma/constants.ex
index ed60bcc37..77bc4bfac 100644
--- a/lib/pleroma/constants.ex
+++ b/lib/pleroma/constants.ex
@@ -94,4 +94,8 @@ defmodule Pleroma.Constants do
"application/activity+json"
]
)
+
+ const(public_streams,
+ do: ["public", "public:local", "public:media", "public:local:media"]
+ )
end
diff --git a/lib/pleroma/web/api_spec.ex b/lib/pleroma/web/api_spec.ex
index 2d56dc643..163226ce5 100644
--- a/lib/pleroma/web/api_spec.ex
+++ b/lib/pleroma/web/api_spec.ex
@@ -10,6 +10,14 @@ defmodule Pleroma.Web.ApiSpec do
@behaviour OpenApi
+ defp streaming_paths do
+ %{
+ "/api/v1/streaming" => %OpenApiSpex.PathItem{
+ get: Pleroma.Web.ApiSpec.StreamingOperation.streaming_operation()
+ }
+ }
+ end
+
@impl OpenApi
def spec(opts \\ []) do
%OpenApi{
@@ -45,7 +53,7 @@ defmodule Pleroma.Web.ApiSpec do
}
},
# populate the paths from a phoenix router
- paths: OpenApiSpex.Paths.from_router(Router),
+ paths: Map.merge(streaming_paths(), OpenApiSpex.Paths.from_router(Router)),
components: %OpenApiSpex.Components{
parameters: %{
"accountIdOrNickname" =>
diff --git a/lib/pleroma/web/api_spec/operations/streaming_operation.ex b/lib/pleroma/web/api_spec/operations/streaming_operation.ex
new file mode 100644
index 000000000..b580bc2f0
--- /dev/null
+++ b/lib/pleroma/web/api_spec/operations/streaming_operation.ex
@@ -0,0 +1,464 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.ApiSpec.StreamingOperation do
+ alias OpenApiSpex.Operation
+ alias OpenApiSpex.Response
+ alias OpenApiSpex.Schema
+ alias Pleroma.Web.ApiSpec.NotificationOperation
+ alias Pleroma.Web.ApiSpec.Schemas.Chat
+ alias Pleroma.Web.ApiSpec.Schemas.Conversation
+ alias Pleroma.Web.ApiSpec.Schemas.FlakeID
+ alias Pleroma.Web.ApiSpec.Schemas.Status
+
+ require Pleroma.Constants
+
+ @spec open_api_operation(atom) :: Operation.t()
+ def open_api_operation(action) do
+ operation = String.to_existing_atom("#{action}_operation")
+ apply(__MODULE__, operation, [])
+ end
+
+ @spec streaming_operation() :: Operation.t()
+ def streaming_operation do
+ %Operation{
+ tags: ["Timelines"],
+ summary: "Establish streaming connection",
+ description: """
+ Receive statuses in real-time via WebSocket.
+
+ You can specify the access token on the query string or through the `sec-websocket-protocol` header. Using
+ the query string to authenticate is considered unsafe and should not be used unless you have to (e.g. to maintain
+ your client's compatibility with Mastodon).
+
+ You may specify a stream on the query string. If you do so and you are connecting to a stream that requires logged-in users,
+ you must specify the access token at the time of the connection (i.e. via query string or header).
+
+ Otherwise, you have the option to authenticate after you have established the connection through client-sent events.
+
+ The "Request body" section below describes what events clients can send through WebSocket, and the "Responses" section
+ describes what events server will send through WebSocket.
+ """,
+ security: [%{"oAuth" => ["read:statuses", "read:notifications"]}],
+ operationId: "WebsocketHandler.streaming",
+ parameters:
+ [
+ Operation.parameter(:connection, :header, %Schema{type: :string}, "connection header",
+ required: true
+ ),
+ Operation.parameter(:upgrade, :header, %Schema{type: :string}, "upgrade header",
+ required: true
+ ),
+ Operation.parameter(
+ :"sec-websocket-key",
+ :header,
+ %Schema{type: :string},
+ "sec-websocket-key header",
+ required: true
+ ),
+ Operation.parameter(
+ :"sec-websocket-version",
+ :header,
+ %Schema{type: :string},
+ "sec-websocket-version header",
+ required: true
+ )
+ ] ++ stream_params() ++ access_token_params(),
+ requestBody: request_body("Client-sent events", client_sent_events()),
+ responses: %{
+ 101 => switching_protocols_response(),
+ 200 =>
+ Operation.response(
+ "Server-sent events",
+ "application/json",
+ server_sent_events()
+ )
+ }
+ }
+ end
+
+ defp stream_params do
+ stream_specifier()
+ |> Enum.map(fn {name, schema} ->
+ Operation.parameter(name, :query, schema, get_schema(schema).description)
+ end)
+ end
+
+ defp access_token_params do
+ [
+ Operation.parameter(:access_token, :query, token(), token().description),
+ Operation.parameter(:"sec-websocket-protocol", :header, token(), token().description)
+ ]
+ end
+
+ defp switching_protocols_response do
+ %Response{
+ description: "Switching protocols",
+ headers: %{
+ "connection" => %OpenApiSpex.Header{required: true},
+ "upgrade" => %OpenApiSpex.Header{required: true},
+ "sec-websocket-accept" => %OpenApiSpex.Header{required: true}
+ }
+ }
+ end
+
+ defp server_sent_events do
+ %Schema{
+ oneOf: [
+ update_event(),
+ status_update_event(),
+ notification_event(),
+ chat_update_event(),
+ follow_relationships_update_event(),
+ conversation_event(),
+ delete_event(),
+ pleroma_respond_event()
+ ]
+ }
+ end
+
+ defp stream do
+ %Schema{
+ type: :array,
+ title: "Stream",
+ description: """
+ The stream identifier.
+ The first item is the name of the stream. If the stream needs a differentiator, the second item will be the corresponding identifier.
+ Currently, for the following stream types, there is a second element in the array:
+
+ - `list`: The second element is the id of the list, as a string.
+ - `hashtag`: The second element is the name of the hashtag.
+ - `public:remote:media` and `public:remote`: The second element is the domain of the corresponding instance.
+ """,
+ maxItems: 2,
+ minItems: 1,
+ items: %Schema{type: :string},
+ example: ["hashtag", "mew"]
+ }
+ end
+
+ defp get_schema(%Schema{} = schema), do: schema
+ defp get_schema(schema), do: schema.schema
+
+ defp server_sent_event_helper(name, description, type, payload, opts \\ []) do
+ payload_type = Keyword.get(opts, :payload_type, :json)
+ has_stream = Keyword.get(opts, :has_stream, true)
+
+ stream_properties =
+ if has_stream do
+ %{stream: stream()}
+ else
+ %{}
+ end
+
+ stream_example = if has_stream, do: %{"stream" => get_schema(stream()).example}, else: %{}
+
+ stream_required = if has_stream, do: [:stream], else: []
+
+ payload_schema =
+ if payload_type == :json do
+ %Schema{
+ title: "Event payload",
+ description: "JSON-encoded string of #{get_schema(payload).title}",
+ allOf: [payload]
+ }
+ else
+ payload
+ end
+
+ payload_example =
+ if payload_type == :json do
+ get_schema(payload).example |> Jason.encode!()
+ else
+ get_schema(payload).example
+ end
+
+ %Schema{
+ type: :object,
+ title: name,
+ description: description,
+ required: [:event, :payload] ++ stream_required,
+ properties:
+ %{
+ event: %Schema{
+ title: "Event type",
+ description: "Type of the event.",
+ type: :string,
+ required: true,
+ enum: [type]
+ },
+ payload: payload_schema
+ }
+ |> Map.merge(stream_properties),
+ example:
+ %{
+ "event" => type,
+ "payload" => payload_example
+ }
+ |> Map.merge(stream_example)
+ }
+ end
+
+ defp update_event do
+ server_sent_event_helper("New status", "A newly-posted status.", "update", Status)
+ end
+
+ defp status_update_event do
+ server_sent_event_helper("Edit", "A status that was just edited", "status.update", Status)
+ end
+
+ defp notification_event do
+ server_sent_event_helper(
+ "Notification",
+ "A new notification.",
+ "notification",
+ NotificationOperation.notification()
+ )
+ end
+
+ defp follow_relationships_update_event do
+ server_sent_event_helper(
+ "Follow relationships update",
+ "An update to follow relationships.",
+ "pleroma:follow_relationships_update",
+ %Schema{
+ type: :object,
+ title: "Follow relationships update",
+ required: [:state, :follower, :following],
+ properties: %{
+ state: %Schema{
+ type: :string,
+ description: "Follow state of the relationship.",
+ enum: ["follow_pending", "follow_accept", "follow_reject", "unfollow"]
+ },
+ follower: %Schema{
+ type: :object,
+ description: "Information about the follower.",
+ required: [:id, :follower_count, :following_count],
+ properties: %{
+ id: FlakeID,
+ follower_count: %Schema{type: :integer},
+ following_count: %Schema{type: :integer}
+ }
+ },
+ following: %Schema{
+ type: :object,
+ description: "Information about the following person.",
+ required: [:id, :follower_count, :following_count],
+ properties: %{
+ id: FlakeID,
+ follower_count: %Schema{type: :integer},
+ following_count: %Schema{type: :integer}
+ }
+ }
+ },
+ example: %{
+ "state" => "follow_pending",
+ "follower" => %{
+ "id" => "someUser1",
+ "follower_count" => 1,
+ "following_count" => 1
+ },
+ "following" => %{
+ "id" => "someUser2",
+ "follower_count" => 1,
+ "following_count" => 1
+ }
+ }
+ }
+ )
+ end
+
+ defp chat_update_event do
+ server_sent_event_helper(
+ "Chat update",
+ "A new chat message.",
+ "pleroma:chat_update",
+ Chat
+ )
+ end
+
+ defp conversation_event do
+ server_sent_event_helper(
+ "Conversation update",
+ "An update about a conversation",
+ "conversation",
+ Conversation
+ )
+ end
+
+ defp delete_event do
+ server_sent_event_helper(
+ "Delete",
+ "A status that was just deleted.",
+ "delete",
+ %Schema{
+ type: :string,
+ title: "Status id",
+ description: "Id of the deleted status",
+ allOf: [FlakeID],
+ example: "some-opaque-id"
+ },
+ payload_type: :string,
+ has_stream: false
+ )
+ end
+
+ defp pleroma_respond_event do
+ server_sent_event_helper(
+ "Server response",
+ "A response to a client-sent event.",
+ "pleroma:respond",
+ %Schema{
+ type: :object,
+ title: "Results",
+ required: [:result, :type],
+ properties: %{
+ result: %Schema{
+ type: :string,
+ title: "Result of the request",
+ enum: ["success", "error", "ignored"]
+ },
+ error: %Schema{
+ type: :string,
+ title: "Error code",
+ description: "An error identifier. Only appears if `result` is `error`."
+ },
+ type: %Schema{
+ type: :string,
+ description: "Type of the request."
+ }
+ },
+ example: %{"result" => "success", "type" => "pleroma:authenticate"}
+ },
+ has_stream: false
+ )
+ end
+
+ defp client_sent_events do
+ %Schema{
+ oneOf: [
+ subscribe_event(),
+ unsubscribe_event(),
+ authenticate_event()
+ ]
+ }
+ end
+
+ defp request_body(description, schema, opts \\ []) do
+ %OpenApiSpex.RequestBody{
+ description: description,
+ content: %{
+ "application/json" => %OpenApiSpex.MediaType{
+ schema: schema,
+ example: opts[:example],
+ examples: opts[:examples]
+ }
+ }
+ }
+ end
+
+ defp client_sent_event_helper(name, description, type, properties, opts) do
+ required = opts[:required] || []
+
+ %Schema{
+ type: :object,
+ title: name,
+ required: [:type] ++ required,
+ description: description,
+ properties:
+ %{
+ type: %Schema{type: :string, enum: [type], description: "Type of the event."}
+ }
+ |> Map.merge(properties),
+ example: opts[:example]
+ }
+ end
+
+ defp subscribe_event do
+ client_sent_event_helper(
+ "Subscribe",
+ "Subscribe to a stream.",
+ "subscribe",
+ stream_specifier(),
+ required: [:stream],
+ example: %{"type" => "subscribe", "stream" => "list", "list" => "1"}
+ )
+ end
+
+ defp unsubscribe_event do
+ client_sent_event_helper(
+ "Unsubscribe",
+ "Unsubscribe from a stream.",
+ "unsubscribe",
+ stream_specifier(),
+ required: [:stream],
+ example: %{
+ "type" => "unsubscribe",
+ "stream" => "public:remote:media",
+ "instance" => "example.org"
+ }
+ )
+ end
+
+ defp authenticate_event do
+ client_sent_event_helper(
+ "Authenticate",
+ "Authenticate via an access token.",
+ "pleroma:authenticate",
+ %{
+ token: token()
+ },
+ required: [:token]
+ )
+ end
+
+ defp token do
+ %Schema{
+ type: :string,
+ description: "An OAuth access token with corresponding permissions.",
+ example: "some token"
+ }
+ end
+
+ defp stream_specifier do
+ %{
+ stream: %Schema{
+ type: :string,
+ description: "The name of the stream.",
+ enum:
+ Pleroma.Constants.public_streams() ++
+ [
+ "public:remote",
+ "public:remote:media",
+ "user",
+ "user:pleroma_chat",
+ "user:notification",
+ "direct",
+ "list",
+ "hashtag"
+ ]
+ },
+ list: %Schema{
+ type: :string,
+ title: "List id",
+ description: "The id of the list. Required when `stream` is `list`.",
+ example: "some-id"
+ },
+ tag: %Schema{
+ type: :string,
+ title: "Hashtag name",
+ description: "The name of the hashtag. Required when `stream` is `hashtag`.",
+ example: "mew"
+ },
+ instance: %Schema{
+ type: :string,
+ title: "Domain name",
+ description:
+ "Domain name of the instance. Required when `stream` is `public:remote` or `public:remote:media`.",
+ example: "example.org"
+ }
+ }
+ end
+end
diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex
index 88444106d..07c2b62e3 100644
--- a/lib/pleroma/web/mastodon_api/websocket_handler.ex
+++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex
@@ -9,6 +9,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
alias Pleroma.User
alias Pleroma.Web.OAuth.Token
alias Pleroma.Web.Streamer
+ alias Pleroma.Web.StreamerView
@behaviour :cowboy_websocket
@@ -32,8 +33,15 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
req
end
+ topics =
+ if topic do
+ [topic]
+ else
+ []
+ end
+
{:cowboy_websocket, req,
- %{user: user, topic: topic, oauth_token: oauth_token, count: 0, timer: nil},
+ %{user: user, topics: topics, oauth_token: oauth_token, count: 0, timer: nil},
%{idle_timeout: @timeout}}
else
{:error, :bad_topic} ->
@@ -50,10 +58,10 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
def websocket_init(state) do
Logger.debug(
- "#{__MODULE__} accepted websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topic #{state.topic}"
+ "#{__MODULE__} accepted websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topics #{state.topics}"
)
- Streamer.add_socket(state.topic, state.oauth_token)
+ Enum.each(state.topics, fn topic -> Streamer.add_socket(topic, state.oauth_token) end)
{:ok, %{state | timer: timer()}}
end
@@ -66,16 +74,26 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
# We only receive pings for now
def websocket_handle(:ping, state), do: {:ok, state}
+ def websocket_handle({:text, text}, state) do
+ with {:ok, %{} = event} <- Jason.decode(text) do
+ handle_client_event(event, state)
+ else
+ _ ->
+ Logger.error("#{__MODULE__} received non-JSON event: #{inspect(text)}")
+ {:ok, state}
+ end
+ end
+
def websocket_handle(frame, state) do
Logger.error("#{__MODULE__} received frame: #{inspect(frame)}")
{:ok, state}
end
- def websocket_info({:render_with_user, view, template, item}, state) do
+ def websocket_info({:render_with_user, view, template, item, topic}, state) do
user = %User{} = User.get_cached_by_ap_id(state.user.ap_id)
unless Streamer.filtered_by_user?(user, item) do
- websocket_info({:text, view.render(template, item, user)}, %{state | user: user})
+ websocket_info({:text, view.render(template, item, user, topic)}, %{state | user: user})
else
{:ok, state}
end
@@ -109,10 +127,10 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
def terminate(reason, _req, state) do
Logger.debug(
- "#{__MODULE__} terminating websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topic #{state.topic || "?"}: #{inspect(reason)}"
+ "#{__MODULE__} terminating websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topics #{state.topics || "?"}: #{inspect(reason)}"
)
- Streamer.remove_socket(state.topic)
+ Enum.each(state.topics, fn topic -> Streamer.remove_socket(topic) end)
:ok
end
@@ -137,4 +155,103 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
defp timer do
Process.send_after(self(), :tick, @tick)
end
+
+ defp handle_client_event(%{"type" => "subscribe", "stream" => _topic} = params, state) do
+ with {_, {:ok, topic}} <-
+ {:topic, Streamer.get_topic(params["stream"], state.user, state.oauth_token, params)},
+ {_, false} <- {:subscribed, topic in state.topics} do
+ Streamer.add_socket(topic, state.oauth_token)
+
+ {[
+ {:text,
+ StreamerView.render("pleroma_respond.json", %{type: "subscribe", result: "success"})}
+ ], %{state | topics: [topic | state.topics]}}
+ else
+ {:subscribed, true} ->
+ {[
+ {:text,
+ StreamerView.render("pleroma_respond.json", %{type: "subscribe", result: "ignored"})}
+ ], state}
+
+ {:topic, {:error, error}} ->
+ {[
+ {:text,
+ StreamerView.render("pleroma_respond.json", %{
+ type: "subscribe",
+ result: "error",
+ error: error
+ })}
+ ], state}
+ end
+ end
+
+ defp handle_client_event(%{"type" => "unsubscribe", "stream" => _topic} = params, state) do
+ with {_, {:ok, topic}} <-
+ {:topic, Streamer.get_topic(params["stream"], state.user, state.oauth_token, params)},
+ {_, true} <- {:subscribed, topic in state.topics} do
+ Streamer.remove_socket(topic)
+
+ {[
+ {:text,
+ StreamerView.render("pleroma_respond.json", %{type: "unsubscribe", result: "success"})}
+ ], %{state | topics: List.delete(state.topics, topic)}}
+ else
+ {:subscribed, false} ->
+ {[
+ {:text,
+ StreamerView.render("pleroma_respond.json", %{type: "unsubscribe", result: "ignored"})}
+ ], state}
+
+ {:topic, {:error, error}} ->
+ {[
+ {:text,
+ StreamerView.render("pleroma_respond.json", %{
+ type: "unsubscribe",
+ result: "error",
+ error: error
+ })}
+ ], state}
+ end
+ end
+
+ defp handle_client_event(
+ %{"type" => "pleroma:authenticate", "token" => access_token} = _params,
+ state
+ ) do
+ with {:auth, nil, nil} <- {:auth, state.user, state.oauth_token},
+ {:ok, user, oauth_token} <- authenticate_request(access_token, nil) do
+ {[
+ {:text,
+ StreamerView.render("pleroma_respond.json", %{
+ type: "pleroma:authenticate",
+ result: "success"
+ })}
+ ], %{state | user: user, oauth_token: oauth_token}}
+ else
+ {:auth, _, _} ->
+ {[
+ {:text,
+ StreamerView.render("pleroma_respond.json", %{
+ type: "pleroma:authenticate",
+ result: "error",
+ error: :already_authenticated
+ })}
+ ], state}
+
+ _ ->
+ {[
+ {:text,
+ StreamerView.render("pleroma_respond.json", %{
+ type: "pleroma:authenticate",
+ result: "error",
+ error: :unauthorized
+ })}
+ ], state}
+ end
+ end
+
+ defp handle_client_event(params, state) do
+ Logger.error("#{__MODULE__} received unknown event: #{inspect(params)}")
+ {[], state}
+ end
end
diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex
index b9a04cc76..48ca82421 100644
--- a/lib/pleroma/web/streamer.ex
+++ b/lib/pleroma/web/streamer.ex
@@ -4,6 +4,7 @@
defmodule Pleroma.Web.Streamer do
require Logger
+ require Pleroma.Constants
alias Pleroma.Activity
alias Pleroma.Chat.MessageReference
@@ -24,7 +25,7 @@ defmodule Pleroma.Web.Streamer do
def registry, do: @registry
- @public_streams ["public", "public:local", "public:media", "public:local:media"]
+ @public_streams Pleroma.Constants.public_streams()
@local_streams ["public:local", "public:local:media"]
@user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
@@ -59,10 +60,14 @@ defmodule Pleroma.Web.Streamer do
end
@doc "Expand and authorizes a stream"
- @spec get_topic(stream :: String.t(), User.t() | nil, Token.t() | nil, Map.t()) ::
- {:ok, topic :: String.t()} | {:error, :bad_topic}
+ @spec get_topic(stream :: String.t() | nil, User.t() | nil, Token.t() | nil, Map.t()) ::
+ {:ok, topic :: String.t() | nil} | {:error, :bad_topic}
def get_topic(stream, user, oauth_token, params \\ %{})
+ def get_topic(nil = _stream, _user, _oauth_token, _params) do
+ {:ok, nil}
+ end
+
# Allow all public steams if the instance allows unauthenticated access.
# Otherwise, only allow users with valid oauth tokens.
def get_topic(stream, user, oauth_token, _params) when stream in @public_streams do
@@ -219,8 +224,8 @@ defmodule Pleroma.Web.Streamer do
end
defp do_stream("follow_relationship", item) do
- text = StreamerView.render("follow_relationships_update.json", item)
user_topic = "user:#{item.follower.id}"
+ text = StreamerView.render("follow_relationships_update.json", item, user_topic)
Logger.debug("Trying to push follow relationship update to #{user_topic}\n\n")
@@ -266,9 +271,11 @@ defmodule Pleroma.Web.Streamer do
defp do_stream(topic, %Notification{} = item)
when topic in ["user", "user:notification"] do
- Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list ->
+ user_topic = "#{topic}:#{item.user_id}"
+
+ Registry.dispatch(@registry, user_topic, fn list ->
Enum.each(list, fn {pid, _auth} ->
- send(pid, {:render_with_user, StreamerView, "notification.json", item})
+ send(pid, {:render_with_user, StreamerView, "notification.json", item, user_topic})
end)
end)
end
@@ -277,7 +284,7 @@ defmodule Pleroma.Web.Streamer do
when topic in ["user", "user:pleroma_chat"] do
topic = "#{topic}:#{user.id}"
- text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
+ text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}, topic)
Registry.dispatch(@registry, topic, fn list ->
Enum.each(list, fn {pid, _auth} ->
@@ -305,7 +312,7 @@ defmodule Pleroma.Web.Streamer do
end
defp push_to_socket(topic, %Participation{} = participation) do
- rendered = StreamerView.render("conversation.json", participation)
+ rendered = StreamerView.render("conversation.json", participation, topic)
Registry.dispatch(@registry, topic, fn list ->
Enum.each(list, fn {pid, _} ->
@@ -333,12 +340,15 @@ defmodule Pleroma.Web.Streamer do
Pleroma.Activity.get_create_by_object_ap_id(item.object.data["id"])
|> Map.put(:object, item.object)
- anon_render = StreamerView.render("status_update.json", create_activity)
+ anon_render = StreamerView.render("status_update.json", create_activity, topic)
Registry.dispatch(@registry, topic, fn list ->
Enum.each(list, fn {pid, auth?} ->
if auth? do
- send(pid, {:render_with_user, StreamerView, "status_update.json", create_activity})
+ send(
+ pid,
+ {:render_with_user, StreamerView, "status_update.json", create_activity, topic}
+ )
else
send(pid, {:text, anon_render})
end
@@ -347,12 +357,12 @@ defmodule Pleroma.Web.Streamer do
end
defp push_to_socket(topic, item) do
- anon_render = StreamerView.render("update.json", item)
+ anon_render = StreamerView.render("update.json", item, topic)
Registry.dispatch(@registry, topic, fn list ->
Enum.each(list, fn {pid, auth?} ->
if auth? do
- send(pid, {:render_with_user, StreamerView, "update.json", item})
+ send(pid, {:render_with_user, StreamerView, "update.json", item, topic})
else
send(pid, {:text, anon_render})
end
diff --git a/lib/pleroma/web/views/streamer_view.ex b/lib/pleroma/web/views/streamer_view.ex
index 6a55242b0..f97570b0a 100644
--- a/lib/pleroma/web/views/streamer_view.ex
+++ b/lib/pleroma/web/views/streamer_view.ex
@@ -11,8 +11,11 @@ defmodule Pleroma.Web.StreamerView do
alias Pleroma.User
alias Pleroma.Web.MastodonAPI.NotificationView
- def render("update.json", %Activity{} = activity, %User{} = user) do
+ require Pleroma.Constants
+
+ def render("update.json", %Activity{} = activity, %User{} = user, topic) do
%{
+ stream: render("stream.json", %{topic: topic}),
event: "update",
payload:
Pleroma.Web.MastodonAPI.StatusView.render(
@@ -25,8 +28,9 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!()
end
- def render("status_update.json", %Activity{} = activity, %User{} = user) do
+ def render("status_update.json", %Activity{} = activity, %User{} = user, topic) do
%{
+ stream: render("stream.json", %{topic: topic}),
event: "status.update",
payload:
Pleroma.Web.MastodonAPI.StatusView.render(
@@ -39,8 +43,9 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!()
end
- def render("notification.json", %Notification{} = notify, %User{} = user) do
+ def render("notification.json", %Notification{} = notify, %User{} = user, topic) do
%{
+ stream: render("stream.json", %{topic: topic}),
event: "notification",
payload:
NotificationView.render(
@@ -52,8 +57,9 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!()
end
- def render("update.json", %Activity{} = activity) do
+ def render("update.json", %Activity{} = activity, topic) do
%{
+ stream: render("stream.json", %{topic: topic}),
event: "update",
payload:
Pleroma.Web.MastodonAPI.StatusView.render(
@@ -65,8 +71,9 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!()
end
- def render("status_update.json", %Activity{} = activity) do
+ def render("status_update.json", %Activity{} = activity, topic) do
%{
+ stream: render("stream.json", %{topic: topic}),
event: "status.update",
payload:
Pleroma.Web.MastodonAPI.StatusView.render(
@@ -78,7 +85,7 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!()
end
- def render("chat_update.json", %{chat_message_reference: cm_ref}) do
+ def render("chat_update.json", %{chat_message_reference: cm_ref}, topic) do
# Explicitly giving the cmr for the object here, so we don't accidentally
# send a later 'last_message' that was inserted between inserting this and
# streaming it out
@@ -93,6 +100,7 @@ defmodule Pleroma.Web.StreamerView do
)
%{
+ stream: render("stream.json", %{topic: topic}),
event: "pleroma:chat_update",
payload:
representation
@@ -101,8 +109,9 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!()
end
- def render("follow_relationships_update.json", item) do
+ def render("follow_relationships_update.json", item, topic) do
%{
+ stream: render("stream.json", %{topic: topic}),
event: "pleroma:follow_relationships_update",
payload:
%{
@@ -123,8 +132,9 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!()
end
- def render("conversation.json", %Participation{} = participation) do
+ def render("conversation.json", %Participation{} = participation, topic) do
%{
+ stream: render("stream.json", %{topic: topic}),
event: "conversation",
payload:
Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
@@ -135,4 +145,39 @@ defmodule Pleroma.Web.StreamerView do
}
|> Jason.encode!()
end
+
+ def render("pleroma_respond.json", %{type: type, result: result} = params) do
+ %{
+ event: "pleroma:respond",
+ payload:
+ %{
+ result: result,
+ type: type
+ }
+ |> Map.merge(maybe_error(params))
+ |> Jason.encode!()
+ }
+ |> Jason.encode!()
+ end
+
+ def render("stream.json", %{topic: "user:pleroma_chat:" <> _}), do: ["user:pleroma_chat"]
+ def render("stream.json", %{topic: "user:notification:" <> _}), do: ["user:notification"]
+ def render("stream.json", %{topic: "user:" <> _}), do: ["user"]
+ def render("stream.json", %{topic: "direct:" <> _}), do: ["direct"]
+ def render("stream.json", %{topic: "list:" <> id}), do: ["list", id]
+ def render("stream.json", %{topic: "hashtag:" <> tag}), do: ["hashtag", tag]
+
+ def render("stream.json", %{topic: "public:remote:media:" <> instance}),
+ do: ["public:remote:media", instance]
+
+ def render("stream.json", %{topic: "public:remote:" <> instance}),
+ do: ["public:remote", instance]
+
+ def render("stream.json", %{topic: stream}) when stream in Pleroma.Constants.public_streams(),
+ do: [stream]
+
+ defp maybe_error(%{error: :bad_topic}), do: %{error: "bad_topic"}
+ defp maybe_error(%{error: :unauthorized}), do: %{error: "unauthorized"}
+ defp maybe_error(%{error: :already_authenticated}), do: %{error: "already_authenticated"}
+ defp maybe_error(_), do: %{}
end