diff options
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/pleroma/application.ex | 9 | ||||
| -rw-r--r-- | lib/pleroma/web/activity_pub/activity_pub.ex | 32 | ||||
| -rw-r--r-- | lib/pleroma/web/mastodon_api/websocket_handler.ex | 47 | ||||
| -rw-r--r-- | lib/pleroma/web/streamer/ping.ex | 37 | ||||
| -rw-r--r-- | lib/pleroma/web/streamer/state.ex | 82 | ||||
| -rw-r--r-- | lib/pleroma/web/streamer/streamer.ex | 244 | ||||
| -rw-r--r-- | lib/pleroma/web/streamer/streamer_socket.ex | 35 | ||||
| -rw-r--r-- | lib/pleroma/web/streamer/supervisor.ex | 37 | ||||
| -rw-r--r-- | lib/pleroma/web/streamer/worker.ex | 208 | 
9 files changed, 277 insertions, 454 deletions
diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 308d8cffa..a00bc0624 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -173,7 +173,14 @@ defmodule Pleroma.Application do    defp streamer_child(env) when env in [:test, :benchmark], do: []    defp streamer_child(_) do -    [Pleroma.Web.Streamer.supervisor()] +    [ +      {Registry, +       [ +         name: Pleroma.Web.Streamer.registry(), +         keys: :duplicate, +         partitions: System.schedulers_online() +       ]} +    ]    end    defp chat_child(_env, true) do diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 099df5879..8baaf97ac 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -170,12 +170,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do        BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id}) -      Notification.create_notifications(activity) - -      conversation = create_or_bump_conversation(activity, map["actor"]) -      participations = get_participations(conversation) -      stream_out(activity) -      stream_out_participations(participations)        {:ok, activity}      else        %Activity{} = activity -> @@ -198,6 +192,15 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do      end    end +  def notify_and_stream(activity) do +    Notification.create_notifications(activity) + +    conversation = create_or_bump_conversation(activity, activity.actor) +    participations = get_participations(conversation) +    stream_out(activity) +    stream_out_participations(participations) +  end +    defp create_or_bump_conversation(activity, actor) do      with {:ok, conversation} <- Conversation.create_or_bump_for(activity),           %User{} = user <- User.get_cached_by_ap_id(actor), @@ -274,6 +277,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do           _ <- increase_poll_votes_if_vote(create_data),           {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},           {:ok, _actor} <- increase_note_count_if_public(actor, activity), +         _ <- notify_and_stream(activity),           :ok <- maybe_federate(activity) do        {:ok, activity}      else @@ -301,6 +305,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do               additional             ),           {:ok, activity} <- insert(listen_data, local), +         _ <- notify_and_stream(activity),           :ok <- maybe_federate(activity) do        {:ok, activity}      end @@ -325,6 +330,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do             %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}             |> Utils.maybe_put("id", activity_id),           {:ok, activity} <- insert(data, local), +         _ <- notify_and_stream(activity),           :ok <- maybe_federate(activity) do        {:ok, activity}      end @@ -344,6 +350,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do           },           data <- Utils.maybe_put(data, "id", activity_id),           {:ok, activity} <- insert(data, local), +         _ <- notify_and_stream(activity),           :ok <- maybe_federate(activity) do        {:ok, activity}      end @@ -365,6 +372,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do           reaction_data <- make_emoji_reaction_data(user, object, emoji, activity_id),           {:ok, activity} <- insert(reaction_data, local),           {:ok, object} <- add_emoji_reaction_to_object(activity, object), +         _ <- notify_and_stream(activity),           :ok <- maybe_federate(activity) do        {:ok, activity, object}      else @@ -391,6 +399,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do           unreact_data <- make_undo_data(user, reaction_activity, activity_id),           {:ok, activity} <- insert(unreact_data, local),           {:ok, object} <- remove_emoji_reaction_from_object(reaction_activity, object), +         _ <- notify_and_stream(activity),           :ok <- maybe_federate(activity) do        {:ok, activity, object}      else @@ -413,6 +422,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do           {:ok, unlike_activity} <- insert(unlike_data, local),           {:ok, _activity} <- Repo.delete(like_activity),           {:ok, object} <- remove_like_from_object(like_activity, object), +         _ <- notify_and_stream(unlike_activity),           :ok <- maybe_federate(unlike_activity) do        {:ok, unlike_activity, like_activity, object}      else @@ -442,6 +452,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do           announce_data <- make_announce_data(user, object, activity_id, public),           {:ok, activity} <- insert(announce_data, local),           {:ok, object} <- add_announce_to_object(activity, object), +         _ <- notify_and_stream(activity),           :ok <- maybe_federate(activity) do        {:ok, activity, object}      else @@ -468,6 +479,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do      with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),           unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),           {:ok, unannounce_activity} <- insert(unannounce_data, local), +         _ <- notify_and_stream(unannounce_activity),           :ok <- maybe_federate(unannounce_activity),           {:ok, _activity} <- Repo.delete(announce_activity),           {:ok, object} <- remove_announce_from_object(announce_activity, object) do @@ -490,6 +502,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do    defp do_follow(follower, followed, activity_id, local) do      with data <- make_follow_data(follower, followed, activity_id),           {:ok, activity} <- insert(data, local), +         _ <- notify_and_stream(activity),           :ok <- maybe_federate(activity) do        {:ok, activity}      else @@ -511,6 +524,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do           {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),           unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),           {:ok, activity} <- insert(unfollow_data, local), +         _ <- notify_and_stream(activity),           :ok <- maybe_federate(activity) do        {:ok, activity}      else @@ -540,6 +554,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do      with true <- outgoing_blocks,           block_data <- make_block_data(blocker, blocked, activity_id),           {:ok, activity} <- insert(block_data, local), +         _ <- notify_and_stream(activity),           :ok <- maybe_federate(activity) do        {:ok, activity}      else @@ -560,6 +575,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do      with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),           unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),           {:ok, activity} <- insert(unblock_data, local), +         _ <- notify_and_stream(activity),           :ok <- maybe_federate(activity) do        {:ok, activity}      else @@ -594,6 +610,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do      with flag_data <- make_flag_data(params, additional),           {:ok, activity} <- insert(flag_data, local),           {:ok, stripped_activity} <- strip_report_status_data(activity), +         _ <- notify_and_stream(activity),           :ok <- maybe_federate(stripped_activity) do        User.all_superusers()        |> Enum.filter(fn user -> not is_nil(user.email) end) @@ -617,7 +634,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do      }      with true <- origin.ap_id in target.also_known_as, -         {:ok, activity} <- insert(params, local) do +         {:ok, activity} <- insert(params, local), +         _ <- notify_and_stream(activity) do        maybe_federate(activity)        BackgroundWorker.enqueue("move_following", %{ diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex index 5652a37c1..6ef3fe2dd 100644 --- a/lib/pleroma/web/mastodon_api/websocket_handler.ex +++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex @@ -12,6 +12,11 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do    @behaviour :cowboy_websocket +  # Cowboy timeout period. +  @timeout :timer.seconds(30) +  # Hibernate every X messages +  @hibernate_every 100 +    @streams [      "public",      "public:local", @@ -25,9 +30,6 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do    ]    @anonymous_streams ["public", "public:local", "hashtag"] -  # Handled by periodic keepalive in Pleroma.Web.Streamer.Ping. -  @timeout :infinity -    def init(%{qs: qs} = req, state) do      with params <- :cow_qs.parse_qs(qs),           sec_websocket <- :cowboy_req.header("sec-websocket-protocol", req, nil), @@ -42,7 +44,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do            req          end -      {:cowboy_websocket, req, %{user: user, topic: topic}, %{idle_timeout: @timeout}} +      {:cowboy_websocket, req, %{user: user, topic: topic, count: 0}, %{idle_timeout: @timeout}}      else        {:error, code} ->          Logger.debug("#{__MODULE__} denied connection: #{inspect(code)} - #{inspect(req)}") @@ -57,7 +59,13 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do    end    def websocket_init(state) do -    send(self(), :subscribe) +    Logger.debug( +      "#{__MODULE__} accepted websocket connection for user #{ +        (state.user || %{id: "anonymous"}).id +      }, topic #{state.topic}" +    ) + +    Streamer.add_socket(state.topic, state.user)      {:ok, state}    end @@ -66,19 +74,24 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do      {:ok, state}    end -  def websocket_info(:subscribe, state) do -    Logger.debug( -      "#{__MODULE__} accepted websocket connection for user #{ -        (state.user || %{id: "anonymous"}).id -      }, topic #{state.topic}" -    ) +  def websocket_info({:render_with_user, view, template, item}, state) do +    user = %User{} = User.get_cached_by_ap_id(state.user.ap_id) -    Streamer.add_socket(state.topic, streamer_socket(state)) -    {:ok, state} +    unless Streamer.filtered_by_user?(user, item) do +      websocket_info({:text, view.render(template, user, item)}, %{state | user: user}) +    else +      {:ok, state} +    end    end    def websocket_info({:text, message}, state) do -    {:reply, {:text, message}, state} +    # If the websocket processed X messages, force an hibernate/GC. +    # We don't hibernate at every message to balance CPU usage/latency with RAM usage. +    if state.count > @hibernate_every do +      {:reply, {:text, message}, %{state | count: 0}, :hibernate} +    else +      {:reply, {:text, message}, %{state | count: state.count + 1}} +    end    end    def terminate(reason, _req, state) do @@ -88,7 +101,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do        }, topic #{state.topic || "?"}: #{inspect(reason)}"      ) -    Streamer.remove_socket(state.topic, streamer_socket(state)) +    Streamer.remove_socket(state.topic)      :ok    end @@ -136,8 +149,4 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do    end    defp expand_topic(topic, _), do: topic - -  defp streamer_socket(state) do -    %{transport_pid: self(), assigns: state} -  end  end diff --git a/lib/pleroma/web/streamer/ping.ex b/lib/pleroma/web/streamer/ping.ex deleted file mode 100644 index 7a08202a9..000000000 --- a/lib/pleroma/web/streamer/ping.ex +++ /dev/null @@ -1,37 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.Streamer.Ping do -  use GenServer -  require Logger - -  alias Pleroma.Web.Streamer.State -  alias Pleroma.Web.Streamer.StreamerSocket - -  @keepalive_interval :timer.seconds(30) - -  def start_link(opts) do -    ping_interval = Keyword.get(opts, :ping_interval, @keepalive_interval) -    GenServer.start_link(__MODULE__, %{ping_interval: ping_interval}, name: __MODULE__) -  end - -  def init(%{ping_interval: ping_interval} = args) do -    Process.send_after(self(), :ping, ping_interval) -    {:ok, args} -  end - -  def handle_info(:ping, %{ping_interval: ping_interval} = state) do -    State.get_sockets() -    |> Map.values() -    |> List.flatten() -    |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid} -> -      Logger.debug("Sending keepalive ping") -      send(transport_pid, {:text, ""}) -    end) - -    Process.send_after(self(), :ping, ping_interval) - -    {:noreply, state} -  end -end diff --git a/lib/pleroma/web/streamer/state.ex b/lib/pleroma/web/streamer/state.ex deleted file mode 100644 index 999550b88..000000000 --- a/lib/pleroma/web/streamer/state.ex +++ /dev/null @@ -1,82 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.Streamer.State do -  use GenServer -  require Logger - -  alias Pleroma.Web.Streamer.StreamerSocket - -  @env Mix.env() - -  def start_link(_) do -    GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__) -  end - -  def add_socket(topic, socket) do -    GenServer.call(__MODULE__, {:add, topic, socket}) -  end - -  def remove_socket(topic, socket) do -    do_remove_socket(@env, topic, socket) -  end - -  def get_sockets do -    %{sockets: stream_sockets} = GenServer.call(__MODULE__, :get_state) -    stream_sockets -  end - -  def init(init_arg) do -    {:ok, init_arg} -  end - -  def handle_call(:get_state, _from, state) do -    {:reply, state, state} -  end - -  def handle_call({:add, topic, socket}, _from, %{sockets: sockets} = state) do -    internal_topic = internal_topic(topic, socket) -    stream_socket = StreamerSocket.from_socket(socket) - -    sockets_for_topic = -      sockets -      |> Map.get(internal_topic, []) -      |> List.insert_at(0, stream_socket) -      |> Enum.uniq() - -    state = put_in(state, [:sockets, internal_topic], sockets_for_topic) -    Logger.debug("Got new conn for #{topic}") -    {:reply, state, state} -  end - -  def handle_call({:remove, topic, socket}, _from, %{sockets: sockets} = state) do -    internal_topic = internal_topic(topic, socket) -    stream_socket = StreamerSocket.from_socket(socket) - -    sockets_for_topic = -      sockets -      |> Map.get(internal_topic, []) -      |> List.delete(stream_socket) - -    state = Kernel.put_in(state, [:sockets, internal_topic], sockets_for_topic) -    {:reply, state, state} -  end - -  defp do_remove_socket(:test, _, _) do -    :ok -  end - -  defp do_remove_socket(_env, topic, socket) do -    GenServer.call(__MODULE__, {:remove, topic, socket}) -  end - -  defp internal_topic(topic, socket) -       when topic in ~w[user user:notification direct] do -    "#{topic}:#{socket.assigns[:user].id}" -  end - -  defp internal_topic(topic, _) do -    topic -  end -end diff --git a/lib/pleroma/web/streamer/streamer.ex b/lib/pleroma/web/streamer/streamer.ex index 814d5a729..5ad4aa936 100644 --- a/lib/pleroma/web/streamer/streamer.ex +++ b/lib/pleroma/web/streamer/streamer.ex @@ -3,53 +3,241 @@  # SPDX-License-Identifier: AGPL-3.0-only  defmodule Pleroma.Web.Streamer do -  alias Pleroma.Web.Streamer.State -  alias Pleroma.Web.Streamer.Worker +  require Logger + +  alias Pleroma.Activity +  alias Pleroma.Config +  alias Pleroma.Conversation.Participation +  alias Pleroma.Notification +  alias Pleroma.Object +  alias Pleroma.User +  alias Pleroma.Web.ActivityPub.ActivityPub +  alias Pleroma.Web.ActivityPub.Visibility +  alias Pleroma.Web.CommonAPI +  alias Pleroma.Web.StreamerView -  @timeout 60_000    @mix_env Mix.env() +  @registry Pleroma.Web.StreamerRegistry + +  def registry, do: @registry -  def add_socket(topic, socket) do -    State.add_socket(topic, socket) +  def add_socket(topic, %User{} = user) do +    if should_env_send?(), do: Registry.register(@registry, user_topic(topic, user), true)    end -  def remove_socket(topic, socket) do -    State.remove_socket(topic, socket) +  def add_socket(topic, _) do +    if should_env_send?(), do: Registry.register(@registry, topic, false)    end -  def get_sockets do -    State.get_sockets() +  def remove_socket(topic) do +    if should_env_send?(), do: Registry.unregister(@registry, topic)    end -  def stream(topics, items) do -    if should_send?() do -      Task.async(fn -> -        :poolboy.transaction( -          :streamer_worker, -          &Worker.stream(&1, topics, items), -          @timeout -        ) +  def stream(topics, item) when is_list(topics) do +    if should_env_send?() do +      Enum.each(topics, fn t -> +        spawn(fn -> do_stream(t, item) end)        end)      end + +    :ok    end -  def supervisor, do: Pleroma.Web.Streamer.Supervisor +  def stream(topic, items) when is_list(items) do +    if should_env_send?() do +      Enum.each(items, fn i -> +        spawn(fn -> do_stream(topic, i) end) +      end) -  defp should_send? do -    handle_should_send(@mix_env) +      :ok +    end    end -  defp handle_should_send(:test) do -    case Process.whereis(:streamer_worker) do -      nil -> -        false +  def stream(topic, item) do +    if should_env_send?() do +      spawn(fn -> do_stream(topic, item) end) +    end + +    :ok +  end -      pid -> -        Process.alive?(pid) +  def filtered_by_user?(%User{} = user, %Activity{} = item) do +    %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} = +      User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute]) + +    recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids) +    recipients = MapSet.new(item.recipients) +    domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks) + +    with parent <- Object.normalize(item) || item, +         true <- +           Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)), +         true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids, +         true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)), +         true <- MapSet.disjoint?(recipients, recipient_blocks), +         %{host: item_host} <- URI.parse(item.actor), +         %{host: parent_host} <- URI.parse(parent.data["actor"]), +         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host), +         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host), +         true <- thread_containment(item, user), +         false <- CommonAPI.thread_muted?(user, item) do +      false +    else +      _ -> true      end    end -  defp handle_should_send(:benchmark), do: false +  def filtered_by_user?(%User{} = user, %Notification{activity: activity}) do +    filtered_by_user?(user, activity) +  end + +  defp do_stream("direct", item) do +    recipient_topics = +      User.get_recipients_from_activity(item) +      |> Enum.map(fn %{id: id} -> "direct:#{id}" end) + +    Enum.each(recipient_topics, fn user_topic -> +      Logger.debug("Trying to push direct message to #{user_topic}\n\n") +      push_to_socket(user_topic, item) +    end) +  end + +  defp do_stream("participation", participation) do +    user_topic = "direct:#{participation.user_id}" +    Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n") -  defp handle_should_send(_), do: true +    push_to_socket(user_topic, participation) +  end + +  defp do_stream("list", item) do +    # filter the recipient list if the activity is not public, see #270. +    recipient_lists = +      case Visibility.is_public?(item) do +        true -> +          Pleroma.List.get_lists_from_activity(item) + +        _ -> +          Pleroma.List.get_lists_from_activity(item) +          |> Enum.filter(fn list -> +            owner = User.get_cached_by_id(list.user_id) + +            Visibility.visible_for_user?(item, owner) +          end) +      end + +    recipient_topics = +      recipient_lists +      |> Enum.map(fn %{id: id} -> "list:#{id}" end) + +    Enum.each(recipient_topics, fn list_topic -> +      Logger.debug("Trying to push message to #{list_topic}\n\n") +      push_to_socket(list_topic, item) +    end) +  end + +  defp do_stream(topic, %Notification{} = item) +       when topic in ["user", "user:notification"] do +    Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list -> +      Enum.each(list, fn {pid, _auth} -> +        send(pid, {:render_with_user, StreamerView, "notification.json", item}) +      end) +    end) +  end + +  defp do_stream("user", item) do +    Logger.debug("Trying to push to users") + +    recipient_topics = +      User.get_recipients_from_activity(item) +      |> Enum.map(fn %{id: id} -> "user:#{id}" end) + +    Enum.each(recipient_topics, fn topic -> +      push_to_socket(topic, item) +    end) +  end + +  defp do_stream(topic, item) do +    Logger.debug("Trying to push to #{topic}") +    Logger.debug("Pushing item to #{topic}") +    push_to_socket(topic, item) +  end + +  defp push_to_socket(topic, %Participation{} = participation) do +    rendered = StreamerView.render("conversation.json", participation) + +    Registry.dispatch(@registry, topic, fn list -> +      Enum.each(list, fn {pid, _} -> +        send(pid, {:text, rendered}) +      end) +    end) +  end + +  defp push_to_socket(topic, %Activity{ +         data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id} +       }) do +    rendered = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)}) + +    Registry.dispatch(@registry, topic, fn list -> +      Enum.each(list, fn {pid, _} -> +        send(pid, {:text, rendered}) +      end) +    end) +  end + +  defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop + +  defp push_to_socket(topic, item) do +    anon_render = StreamerView.render("update.json", item) + +    Registry.dispatch(@registry, topic, fn list -> +      Enum.each(list, fn {pid, auth?} -> +        if auth? do +          send(pid, {:render_with_user, StreamerView, "update.json", item}) +        else +          send(pid, {:text, anon_render}) +        end +      end) +    end) +  end + +  defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true + +  defp thread_containment(activity, user) do +    if Config.get([:instance, :skip_thread_containment]) do +      true +    else +      ActivityPub.contain_activity(activity, user) +    end +  end + +  # In test environement, only return true if the registry is started. +  # In benchmark environment, returns false. +  # In any other environment, always returns true. +  cond do +    @mix_env == :test -> +      def should_env_send? do +        case Process.whereis(@registry) do +          nil -> +            false + +          pid -> +            Process.alive?(pid) +        end +      end + +    @mix_env == :benchmark -> +      def should_env_send?, do: false + +    true -> +      def should_env_send?, do: true +  end + +  defp user_topic(topic, user) +       when topic in ~w[user user:notification direct] do +    "#{topic}:#{user.id}" +  end + +  defp user_topic(topic, _) do +    topic +  end  end diff --git a/lib/pleroma/web/streamer/streamer_socket.ex b/lib/pleroma/web/streamer/streamer_socket.ex deleted file mode 100644 index 7d5dcd34e..000000000 --- a/lib/pleroma/web/streamer/streamer_socket.ex +++ /dev/null @@ -1,35 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.Streamer.StreamerSocket do -  defstruct transport_pid: nil, user: nil - -  alias Pleroma.User -  alias Pleroma.Web.Streamer.StreamerSocket - -  def from_socket(%{ -        transport_pid: transport_pid, -        assigns: %{user: nil} -      }) do -    %StreamerSocket{ -      transport_pid: transport_pid -    } -  end - -  def from_socket(%{ -        transport_pid: transport_pid, -        assigns: %{user: %User{} = user} -      }) do -    %StreamerSocket{ -      transport_pid: transport_pid, -      user: user -    } -  end - -  def from_socket(%{transport_pid: transport_pid}) do -    %StreamerSocket{ -      transport_pid: transport_pid -    } -  end -end diff --git a/lib/pleroma/web/streamer/supervisor.ex b/lib/pleroma/web/streamer/supervisor.ex deleted file mode 100644 index bd9029bc0..000000000 --- a/lib/pleroma/web/streamer/supervisor.ex +++ /dev/null @@ -1,37 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.Streamer.Supervisor do -  use Supervisor - -  def start_link(opts) do -    Supervisor.start_link(__MODULE__, opts, name: __MODULE__) -  end - -  def init(args) do -    children = [ -      {Pleroma.Web.Streamer.State, args}, -      {Pleroma.Web.Streamer.Ping, args}, -      :poolboy.child_spec(:streamer_worker, poolboy_config()) -    ] - -    opts = [strategy: :one_for_one, name: Pleroma.Web.Streamer.Supervisor] -    Supervisor.init(children, opts) -  end - -  defp poolboy_config do -    opts = -      Pleroma.Config.get(:streamer, -        workers: 3, -        overflow_workers: 2 -      ) - -    [ -      {:name, {:local, :streamer_worker}}, -      {:worker_module, Pleroma.Web.Streamer.Worker}, -      {:size, opts[:workers]}, -      {:max_overflow, opts[:overflow_workers]} -    ] -  end -end diff --git a/lib/pleroma/web/streamer/worker.ex b/lib/pleroma/web/streamer/worker.ex deleted file mode 100644 index f6160fa4d..000000000 --- a/lib/pleroma/web/streamer/worker.ex +++ /dev/null @@ -1,208 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.Streamer.Worker do -  use GenServer - -  require Logger - -  alias Pleroma.Activity -  alias Pleroma.Config -  alias Pleroma.Conversation.Participation -  alias Pleroma.Notification -  alias Pleroma.Object -  alias Pleroma.User -  alias Pleroma.Web.ActivityPub.ActivityPub -  alias Pleroma.Web.ActivityPub.Visibility -  alias Pleroma.Web.CommonAPI -  alias Pleroma.Web.Streamer.State -  alias Pleroma.Web.Streamer.StreamerSocket -  alias Pleroma.Web.StreamerView - -  def start_link(_) do -    GenServer.start_link(__MODULE__, %{}, []) -  end - -  def init(init_arg) do -    {:ok, init_arg} -  end - -  def stream(pid, topics, items) do -    GenServer.call(pid, {:stream, topics, items}) -  end - -  def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do -    Enum.each(topics, fn t -> -      do_stream(%{topic: t, item: item}) -    end) - -    {:reply, state, state} -  end - -  def handle_call({:stream, topic, items}, _from, state) when is_list(items) do -    Enum.each(items, fn i -> -      do_stream(%{topic: topic, item: i}) -    end) - -    {:reply, state, state} -  end - -  def handle_call({:stream, topic, item}, _from, state) do -    do_stream(%{topic: topic, item: item}) - -    {:reply, state, state} -  end - -  defp do_stream(%{topic: "direct", item: item}) do -    recipient_topics = -      User.get_recipients_from_activity(item) -      |> Enum.map(fn %{id: id} -> "direct:#{id}" end) - -    Enum.each(recipient_topics, fn user_topic -> -      Logger.debug("Trying to push direct message to #{user_topic}\n\n") -      push_to_socket(State.get_sockets(), user_topic, item) -    end) -  end - -  defp do_stream(%{topic: "participation", item: participation}) do -    user_topic = "direct:#{participation.user_id}" -    Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n") - -    push_to_socket(State.get_sockets(), user_topic, participation) -  end - -  defp do_stream(%{topic: "list", item: item}) do -    # filter the recipient list if the activity is not public, see #270. -    recipient_lists = -      case Visibility.is_public?(item) do -        true -> -          Pleroma.List.get_lists_from_activity(item) - -        _ -> -          Pleroma.List.get_lists_from_activity(item) -          |> Enum.filter(fn list -> -            owner = User.get_cached_by_id(list.user_id) - -            Visibility.visible_for_user?(item, owner) -          end) -      end - -    recipient_topics = -      recipient_lists -      |> Enum.map(fn %{id: id} -> "list:#{id}" end) - -    Enum.each(recipient_topics, fn list_topic -> -      Logger.debug("Trying to push message to #{list_topic}\n\n") -      push_to_socket(State.get_sockets(), list_topic, item) -    end) -  end - -  defp do_stream(%{topic: topic, item: %Notification{} = item}) -       when topic in ["user", "user:notification"] do -    State.get_sockets() -    |> Map.get("#{topic}:#{item.user_id}", []) -    |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} -> -      with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id), -           true <- should_send?(user, item) do -        send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)}) -      end -    end) -  end - -  defp do_stream(%{topic: "user", item: item}) do -    Logger.debug("Trying to push to users") - -    recipient_topics = -      User.get_recipients_from_activity(item) -      |> Enum.map(fn %{id: id} -> "user:#{id}" end) - -    Enum.each(recipient_topics, fn topic -> -      push_to_socket(State.get_sockets(), topic, item) -    end) -  end - -  defp do_stream(%{topic: topic, item: item}) do -    Logger.debug("Trying to push to #{topic}") -    Logger.debug("Pushing item to #{topic}") -    push_to_socket(State.get_sockets(), topic, item) -  end - -  defp should_send?(%User{} = user, %Activity{} = item) do -    %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} = -      User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute]) - -    recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids) -    recipients = MapSet.new(item.recipients) -    domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks) - -    with parent <- Object.normalize(item) || item, -         true <- -           Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)), -         true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids, -         true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)), -         true <- MapSet.disjoint?(recipients, recipient_blocks), -         %{host: item_host} <- URI.parse(item.actor), -         %{host: parent_host} <- URI.parse(parent.data["actor"]), -         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host), -         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host), -         true <- thread_containment(item, user), -         false <- CommonAPI.thread_muted?(user, item) do -      true -    else -      _ -> false -    end -  end - -  defp should_send?(%User{} = user, %Notification{activity: activity}) do -    should_send?(user, activity) -  end - -  def push_to_socket(topics, topic, %Participation{} = participation) do -    Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} -> -      send(transport_pid, {:text, StreamerView.render("conversation.json", participation)}) -    end) -  end - -  def push_to_socket(topics, topic, %Activity{ -        data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id} -      }) do -    Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} -> -      send( -        transport_pid, -        {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()} -      ) -    end) -  end - -  def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop - -  def push_to_socket(topics, topic, item) do -    Enum.each(topics[topic] || [], fn %StreamerSocket{ -                                        transport_pid: transport_pid, -                                        user: socket_user -                                      } -> -      # Get the current user so we have up-to-date blocks etc. -      if socket_user do -        user = User.get_cached_by_ap_id(socket_user.ap_id) - -        if should_send?(user, item) do -          send(transport_pid, {:text, StreamerView.render("update.json", item, user)}) -        end -      else -        send(transport_pid, {:text, StreamerView.render("update.json", item)}) -      end -    end) -  end - -  @spec thread_containment(Activity.t(), User.t()) :: boolean() -  defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true - -  defp thread_containment(activity, user) do -    if Config.get([:instance, :skip_thread_containment]) do -      true -    else -      ActivityPub.contain_activity(activity, user) -    end -  end -end  | 
