diff options
26 files changed, 448 insertions, 889 deletions
diff --git a/.gitignore b/.gitignore index 3b0c7d361..4e71a7df0 100644 --- a/.gitignore +++ b/.gitignore @@ -43,7 +43,3 @@ docs/generated_config.md  # Code test coverage  /cover  /Elixir.*.coverdata - -.idea -pleroma.iml - diff --git a/config/config.exs b/config/config.exs index b1b98af93..ab6e00c98 100644 --- a/config/config.exs +++ b/config/config.exs @@ -331,10 +331,6 @@ config :pleroma, :activitypub,    follow_handshake_timeout: 500,    sign_object_fetches: true -config :pleroma, :streamer, -  workers: 3, -  overflow_workers: 2 -  config :pleroma, :user, deny_follow_blocked: true  config :pleroma, :mrf_normalize_markup, scrub_policy: Pleroma.HTML.Scrubber.Default diff --git a/lib/pleroma/activity/ir/topics.ex b/lib/pleroma/activity/ir/topics.ex deleted file mode 100644 index 010897abc..000000000 --- a/lib/pleroma/activity/ir/topics.ex +++ /dev/null @@ -1,63 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Activity.Ir.Topics do -  alias Pleroma.Object -  alias Pleroma.Web.ActivityPub.Visibility - -  def get_activity_topics(activity) do -    activity -    |> Object.normalize() -    |> generate_topics(activity) -    |> List.flatten() -  end - -  defp generate_topics(%{data: %{"type" => "Answer"}}, _) do -    [] -  end - -  defp generate_topics(object, activity) do -    ["user", "list"] ++ visibility_tags(object, activity) -  end - -  defp visibility_tags(object, activity) do -    case Visibility.get_visibility(activity) do -      "public" -> -        if activity.local do -          ["public", "public:local"] -        else -          ["public"] -        end -        |> item_creation_tags(object, activity) - -      "direct" -> -        ["direct"] - -      _ -> -        [] -    end -  end - -  defp item_creation_tags(tags, %{data: %{"type" => "Create"}} = object, activity) do -    tags ++ hashtags_to_topics(object) ++ attachment_topics(object, activity) -  end - -  defp item_creation_tags(tags, _, _) do -    tags -  end - -  defp hashtags_to_topics(%{data: %{"tag" => tags}}) do -    tags -    |> Enum.filter(&is_bitstring(&1)) -    |> Enum.map(fn tag -> "hashtag:" <> tag end) -  end - -  defp hashtags_to_topics(_), do: [] - -  defp attachment_topics(%{data: %{"attachment" => []}}, _act), do: [] - -  defp attachment_topics(_object, %{local: true}), do: ["public:media", "public:local:media"] - -  defp attachment_topics(_object, _act), do: ["public:media"] -end diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 3b37ce630..49094704b 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -141,7 +141,7 @@ defmodule Pleroma.Application do    defp streamer_child(:test), do: []    defp streamer_child(_) do -    [Pleroma.Web.Streamer.supervisor()] +    [Pleroma.Web.Streamer]    end    defp oauth_cleanup_child(true), diff --git a/lib/pleroma/notification.ex b/lib/pleroma/notification.ex index 8012389ac..b7c880c51 100644 --- a/lib/pleroma/notification.ex +++ b/lib/pleroma/notification.ex @@ -210,10 +210,8 @@ defmodule Pleroma.Notification do      unless skip?(activity, user) do        notification = %Notification{user_id: user.id, activity: activity}        {:ok, notification} = Repo.insert(notification) - -      ["user", "user:notification"] -      |> Streamer.stream(notification) - +      Streamer.stream("user", notification) +      Streamer.stream("user:notification", notification)        Push.send(notification)        notification      end diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index bc5ae7fbf..41f6a0f1f 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -4,7 +4,6 @@  defmodule Pleroma.Web.ActivityPub.ActivityPub do    alias Pleroma.Activity -  alias Pleroma.Activity.Ir.Topics    alias Pleroma.Config    alias Pleroma.Conversation    alias Pleroma.Notification @@ -17,7 +16,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do    alias Pleroma.User    alias Pleroma.Web.ActivityPub.MRF    alias Pleroma.Web.ActivityPub.Transmogrifier -  alias Pleroma.Web.Streamer    alias Pleroma.Web.WebFinger    alias Pleroma.Workers.BackgroundWorker @@ -189,7 +187,9 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do        participations        |> Repo.preload(:user) -    Streamer.stream("participation", participations) +    Enum.each(participations, fn participation -> +      Pleroma.Web.Streamer.stream("participation", participation) +    end)    end    def stream_out_participations(%Object{data: %{"context" => context}}, user) do @@ -208,15 +208,41 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do    def stream_out_participations(_, _), do: :noop -  def stream_out(%Activity{data: %{"type" => data_type}} = activity) -      when data_type in ["Create", "Announce", "Delete"] do -    activity -    |> Topics.get_activity_topics() -    |> Streamer.stream(activity) -  end - -  def stream_out(_activity) do -    :noop +  def stream_out(activity) do +    if activity.data["type"] in ["Create", "Announce", "Delete"] do +      object = Object.normalize(activity) +      # Do not stream out poll replies +      unless object.data["type"] == "Answer" do +        Pleroma.Web.Streamer.stream("user", activity) +        Pleroma.Web.Streamer.stream("list", activity) + +        if get_visibility(activity) == "public" do +          Pleroma.Web.Streamer.stream("public", activity) + +          if activity.local do +            Pleroma.Web.Streamer.stream("public:local", activity) +          end + +          if activity.data["type"] in ["Create"] do +            object.data +            |> Map.get("tag", []) +            |> Enum.filter(fn tag -> is_bitstring(tag) end) +            |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end) + +            if object.data["attachment"] != [] do +              Pleroma.Web.Streamer.stream("public:media", activity) + +              if activity.local do +                Pleroma.Web.Streamer.stream("public:local:media", activity) +              end +            end +          end +        else +          if get_visibility(activity) == "direct", +            do: Pleroma.Web.Streamer.stream("direct", activity) +        end +      end +    end    end    def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex index 3c26eb406..dbd3542ea 100644 --- a/lib/pleroma/web/mastodon_api/websocket_handler.ex +++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex @@ -8,7 +8,6 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do    alias Pleroma.Repo    alias Pleroma.User    alias Pleroma.Web.OAuth.Token -  alias Pleroma.Web.Streamer    @behaviour :cowboy_websocket @@ -25,7 +24,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do    ]    @anonymous_streams ["public", "public:local", "hashtag"] -  # Handled by periodic keepalive in Pleroma.Web.Streamer.Ping. +  # Handled by periodic keepalive in Pleroma.Web.Streamer.    @timeout :infinity    def init(%{qs: qs} = req, state) do @@ -66,7 +65,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do        }, topic #{state.topic}"      ) -    Streamer.add_socket(state.topic, streamer_socket(state)) +    Pleroma.Web.Streamer.add_socket(state.topic, streamer_socket(state))      {:ok, state}    end @@ -81,7 +80,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do        }, topic #{state.topic || "?"}: #{inspect(reason)}"      ) -    Streamer.remove_socket(state.topic, streamer_socket(state)) +    Pleroma.Web.Streamer.remove_socket(state.topic, streamer_socket(state))      :ok    end diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex new file mode 100644 index 000000000..587c43f40 --- /dev/null +++ b/lib/pleroma/web/streamer.ex @@ -0,0 +1,318 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Web.Streamer do +  use GenServer +  require Logger +  alias Pleroma.Activity +  alias Pleroma.Config +  alias Pleroma.Conversation.Participation +  alias Pleroma.Notification +  alias Pleroma.Object +  alias Pleroma.User +  alias Pleroma.Web.ActivityPub.ActivityPub +  alias Pleroma.Web.ActivityPub.Visibility +  alias Pleroma.Web.CommonAPI +  alias Pleroma.Web.MastodonAPI.NotificationView + +  @keepalive_interval :timer.seconds(30) + +  def start_link(_) do +    GenServer.start_link(__MODULE__, %{}, name: __MODULE__) +  end + +  def add_socket(topic, socket) do +    GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic}) +  end + +  def remove_socket(topic, socket) do +    GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic}) +  end + +  def stream(topic, item) do +    GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item}) +  end + +  def init(args) do +    Process.send_after(self(), %{action: :ping}, @keepalive_interval) + +    {:ok, args} +  end + +  def handle_info(%{action: :ping}, topics) do +    topics +    |> Map.values() +    |> List.flatten() +    |> Enum.each(fn socket -> +      Logger.debug("Sending keepalive ping") +      send(socket.transport_pid, {:text, ""}) +    end) + +    Process.send_after(self(), %{action: :ping}, @keepalive_interval) + +    {:noreply, topics} +  end + +  def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do +    recipient_topics = +      User.get_recipients_from_activity(item) +      |> Enum.map(fn %{id: id} -> "direct:#{id}" end) + +    Enum.each(recipient_topics || [], fn user_topic -> +      Logger.debug("Trying to push direct message to #{user_topic}\n\n") +      push_to_socket(topics, user_topic, item) +    end) + +    {:noreply, topics} +  end + +  def handle_cast(%{action: :stream, topic: "participation", item: participation}, topics) do +    user_topic = "direct:#{participation.user_id}" +    Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n") + +    push_to_socket(topics, user_topic, participation) + +    {:noreply, topics} +  end + +  def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do +    # filter the recipient list if the activity is not public, see #270. +    recipient_lists = +      case Visibility.is_public?(item) do +        true -> +          Pleroma.List.get_lists_from_activity(item) + +        _ -> +          Pleroma.List.get_lists_from_activity(item) +          |> Enum.filter(fn list -> +            owner = User.get_cached_by_id(list.user_id) + +            Visibility.visible_for_user?(item, owner) +          end) +      end + +    recipient_topics = +      recipient_lists +      |> Enum.map(fn %{id: id} -> "list:#{id}" end) + +    Enum.each(recipient_topics || [], fn list_topic -> +      Logger.debug("Trying to push message to #{list_topic}\n\n") +      push_to_socket(topics, list_topic, item) +    end) + +    {:noreply, topics} +  end + +  def handle_cast( +        %{action: :stream, topic: topic, item: %Notification{} = item}, +        topics +      ) +      when topic in ["user", "user:notification"] do +    topics +    |> Map.get("#{topic}:#{item.user_id}", []) +    |> Enum.each(fn socket -> +      with %User{} = user <- User.get_cached_by_ap_id(socket.assigns[:user].ap_id), +           true <- should_send?(user, item) do +        send( +          socket.transport_pid, +          {:text, represent_notification(socket.assigns[:user], item)} +        ) +      end +    end) + +    {:noreply, topics} +  end + +  def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do +    Logger.debug("Trying to push to users") + +    recipient_topics = +      User.get_recipients_from_activity(item) +      |> Enum.map(fn %{id: id} -> "user:#{id}" end) + +    Enum.each(recipient_topics, fn topic -> +      push_to_socket(topics, topic, item) +    end) + +    {:noreply, topics} +  end + +  def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do +    Logger.debug("Trying to push to #{topic}") +    Logger.debug("Pushing item to #{topic}") +    push_to_socket(topics, topic, item) +    {:noreply, topics} +  end + +  def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do +    topic = internal_topic(topic, socket) +    sockets_for_topic = sockets[topic] || [] +    sockets_for_topic = Enum.uniq([socket | sockets_for_topic]) +    sockets = Map.put(sockets, topic, sockets_for_topic) +    Logger.debug("Got new conn for #{topic}") +    {:noreply, sockets} +  end + +  def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do +    topic = internal_topic(topic, socket) +    sockets_for_topic = sockets[topic] || [] +    sockets_for_topic = List.delete(sockets_for_topic, socket) +    sockets = Map.put(sockets, topic, sockets_for_topic) +    Logger.debug("Removed conn for #{topic}") +    {:noreply, sockets} +  end + +  def handle_cast(m, state) do +    Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}") +    {:noreply, state} +  end + +  defp represent_update(%Activity{} = activity, %User{} = user) do +    %{ +      event: "update", +      payload: +        Pleroma.Web.MastodonAPI.StatusView.render( +          "status.json", +          activity: activity, +          for: user +        ) +        |> Jason.encode!() +    } +    |> Jason.encode!() +  end + +  defp represent_update(%Activity{} = activity) do +    %{ +      event: "update", +      payload: +        Pleroma.Web.MastodonAPI.StatusView.render( +          "status.json", +          activity: activity +        ) +        |> Jason.encode!() +    } +    |> Jason.encode!() +  end + +  def represent_conversation(%Participation{} = participation) do +    %{ +      event: "conversation", +      payload: +        Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{ +          participation: participation, +          for: participation.user +        }) +        |> Jason.encode!() +    } +    |> Jason.encode!() +  end + +  @spec represent_notification(User.t(), Notification.t()) :: binary() +  defp represent_notification(%User{} = user, %Notification{} = notify) do +    %{ +      event: "notification", +      payload: +        NotificationView.render( +          "show.json", +          %{notification: notify, for: user} +        ) +        |> Jason.encode!() +    } +    |> Jason.encode!() +  end + +  defp should_send?(%User{} = user, %Activity{} = item) do +    blocks = user.info.blocks || [] +    mutes = user.info.mutes || [] +    reblog_mutes = user.info.muted_reblogs || [] +    domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks) + +    with parent when not is_nil(parent) <- Object.normalize(item), +         true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)), +         true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)), +         %{host: item_host} <- URI.parse(item.actor), +         %{host: parent_host} <- URI.parse(parent.data["actor"]), +         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host), +         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host), +         true <- thread_containment(item, user), +         false <- CommonAPI.thread_muted?(user, item) do +      true +    else +      _ -> false +    end +  end + +  defp should_send?(%User{} = user, %Notification{activity: activity}) do +    should_send?(user, activity) +  end + +  def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do +    Enum.each(topics[topic] || [], fn socket -> +      # Get the current user so we have up-to-date blocks etc. +      if socket.assigns[:user] do +        user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id) + +        if should_send?(user, item) do +          send(socket.transport_pid, {:text, represent_update(item, user)}) +        end +      else +        send(socket.transport_pid, {:text, represent_update(item)}) +      end +    end) +  end + +  def push_to_socket(topics, topic, %Participation{} = participation) do +    Enum.each(topics[topic] || [], fn socket -> +      send(socket.transport_pid, {:text, represent_conversation(participation)}) +    end) +  end + +  def push_to_socket(topics, topic, %Activity{ +        data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id} +      }) do +    Enum.each(topics[topic] || [], fn socket -> +      send( +        socket.transport_pid, +        {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()} +      ) +    end) +  end + +  def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop + +  def push_to_socket(topics, topic, item) do +    Enum.each(topics[topic] || [], fn socket -> +      # Get the current user so we have up-to-date blocks etc. +      if socket.assigns[:user] do +        user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id) +        blocks = user.info.blocks || [] +        mutes = user.info.mutes || [] + +        with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)), +             true <- thread_containment(item, user) do +          send(socket.transport_pid, {:text, represent_update(item, user)}) +        end +      else +        send(socket.transport_pid, {:text, represent_update(item)}) +      end +    end) +  end + +  defp internal_topic(topic, socket) when topic in ~w[user user:notification direct] do +    "#{topic}:#{socket.assigns[:user].id}" +  end + +  defp internal_topic(topic, _), do: topic + +  @spec thread_containment(Activity.t(), User.t()) :: boolean() +  defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true + +  defp thread_containment(activity, user) do +    if Config.get([:instance, :skip_thread_containment]) do +      true +    else +      ActivityPub.contain_activity(activity, user) +    end +  end +end diff --git a/lib/pleroma/web/streamer/ping.ex b/lib/pleroma/web/streamer/ping.ex deleted file mode 100644 index f77cbb95c..000000000 --- a/lib/pleroma/web/streamer/ping.ex +++ /dev/null @@ -1,33 +0,0 @@ -defmodule Pleroma.Web.Streamer.Ping do -  use GenServer -  require Logger - -  alias Pleroma.Web.Streamer.State -  alias Pleroma.Web.Streamer.StreamerSocket - -  @keepalive_interval :timer.seconds(30) - -  def start_link(opts) do -    ping_interval = Keyword.get(opts, :ping_interval, @keepalive_interval) -    GenServer.start_link(__MODULE__, %{ping_interval: ping_interval}, name: __MODULE__) -  end - -  def init(%{ping_interval: ping_interval} = args) do -    Process.send_after(self(), :ping, ping_interval) -    {:ok, args} -  end - -  def handle_info(:ping, %{ping_interval: ping_interval} = state) do -    State.get_sockets() -    |> Map.values() -    |> List.flatten() -    |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid} -> -      Logger.debug("Sending keepalive ping") -      send(transport_pid, {:text, ""}) -    end) - -    Process.send_after(self(), :ping, ping_interval) - -    {:noreply, state} -  end -end diff --git a/lib/pleroma/web/streamer/state.ex b/lib/pleroma/web/streamer/state.ex deleted file mode 100644 index 7b5199068..000000000 --- a/lib/pleroma/web/streamer/state.ex +++ /dev/null @@ -1,68 +0,0 @@ -defmodule Pleroma.Web.Streamer.State do -  use GenServer -  require Logger - -  alias Pleroma.Web.Streamer.StreamerSocket - -  def start_link(_) do -    GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__) -  end - -  def add_socket(topic, socket) do -    GenServer.call(__MODULE__, {:add, socket, topic}) -  end - -  def remove_socket(topic, socket) do -    GenServer.call(__MODULE__, {:remove, socket, topic}) -  end - -  def get_sockets do -    %{sockets: stream_sockets} = GenServer.call(__MODULE__, :get_state) -    stream_sockets -  end - -  def init(init_arg) do -    {:ok, init_arg} -  end - -  def handle_call(:get_state, _from, state) do -    {:reply, state, state} -  end - -  def handle_call({:add, socket, topic}, _from, %{sockets: sockets} = state) do -    internal_topic = internal_topic(topic, socket) -    stream_socket = StreamerSocket.from_socket(socket) - -    sockets_for_topic = -      sockets -      |> Map.get(internal_topic, []) -      |> List.insert_at(0, stream_socket) -      |> Enum.uniq() - -    state = put_in(state, [:sockets, internal_topic], sockets_for_topic) -    Logger.debug("Got new conn for #{topic}") -    {:reply, state, state} -  end - -  def handle_call({:remove, socket, topic}, _from, %{sockets: sockets} = state) do -    internal_topic = internal_topic(topic, socket) -    stream_socket = StreamerSocket.from_socket(socket) - -    sockets_for_topic = -      sockets -      |> Map.get(internal_topic, []) -      |> List.delete(stream_socket) - -    state = Kernel.put_in(state, [:sockets, internal_topic], sockets_for_topic) -    {:reply, state, state} -  end - -  defp internal_topic(topic, socket) -       when topic in ~w[user user:notification direct] do -    "#{topic}:#{socket.assigns[:user].id}" -  end - -  defp internal_topic(topic, _) do -    topic -  end -end diff --git a/lib/pleroma/web/streamer/streamer.ex b/lib/pleroma/web/streamer/streamer.ex deleted file mode 100644 index 8cf719277..000000000 --- a/lib/pleroma/web/streamer/streamer.ex +++ /dev/null @@ -1,55 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.Streamer do -  alias Pleroma.Web.Streamer.State -  alias Pleroma.Web.Streamer.Worker - -  @timeout 60_000 -  @mix_env Mix.env() - -  def add_socket(topic, socket) do -    State.add_socket(topic, socket) -  end - -  def remove_socket(topic, socket) do -    State.remove_socket(topic, socket) -  end - -  def get_sockets do -    State.get_sockets() -  end - -  def stream(topics, items) do -    if should_send?() do -      Task.async(fn -> -        :poolboy.transaction( -          :streamer_worker, -          &Worker.stream(&1, topics, items), -          @timeout -        ) -      end) -    end -  end - -  def supervisor, do: Pleroma.Web.Streamer.Supervisor - -  defp should_send? do -    handle_should_send(@mix_env) -  end - -  defp handle_should_send(:test) do -    case Process.whereis(:streamer_worker) do -      nil -> -        false - -      pid -> -        Process.alive?(pid) -    end -  end - -  defp handle_should_send(_) do -    true -  end -end diff --git a/lib/pleroma/web/streamer/streamer_socket.ex b/lib/pleroma/web/streamer/streamer_socket.ex deleted file mode 100644 index f006c0306..000000000 --- a/lib/pleroma/web/streamer/streamer_socket.ex +++ /dev/null @@ -1,31 +0,0 @@ -defmodule Pleroma.Web.Streamer.StreamerSocket do -  defstruct transport_pid: nil, user: nil - -  alias Pleroma.User -  alias Pleroma.Web.Streamer.StreamerSocket - -  def from_socket(%{ -        transport_pid: transport_pid, -        assigns: %{user: nil} -      }) do -    %StreamerSocket{ -      transport_pid: transport_pid -    } -  end - -  def from_socket(%{ -        transport_pid: transport_pid, -        assigns: %{user: %User{} = user} -      }) do -    %StreamerSocket{ -      transport_pid: transport_pid, -      user: user -    } -  end - -  def from_socket(%{transport_pid: transport_pid}) do -    %StreamerSocket{ -      transport_pid: transport_pid -    } -  end -end diff --git a/lib/pleroma/web/streamer/supervisor.ex b/lib/pleroma/web/streamer/supervisor.ex deleted file mode 100644 index 6afe19323..000000000 --- a/lib/pleroma/web/streamer/supervisor.ex +++ /dev/null @@ -1,33 +0,0 @@ -defmodule Pleroma.Web.Streamer.Supervisor do -  use Supervisor - -  def start_link(opts) do -    Supervisor.start_link(__MODULE__, opts, name: __MODULE__) -  end - -  def init(args) do -    children = [ -      {Pleroma.Web.Streamer.State, args}, -      {Pleroma.Web.Streamer.Ping, args}, -      :poolboy.child_spec(:streamer_worker, poolboy_config()) -    ] - -    opts = [strategy: :one_for_one, name: Pleroma.Web.Streamer.Supervisor] -    Supervisor.init(children, opts) -  end - -  defp poolboy_config do -    opts = -      Pleroma.Config.get(:streamer, -        workers: 3, -        overflow_workers: 2 -      ) - -    [ -      {:name, {:local, :streamer_worker}}, -      {:worker_module, Pleroma.Web.Streamer.Worker}, -      {:size, opts[:workers]}, -      {:max_overflow, opts[:overflow_workers]} -    ] -  end -end diff --git a/lib/pleroma/web/streamer/worker.ex b/lib/pleroma/web/streamer/worker.ex deleted file mode 100644 index 5804508eb..000000000 --- a/lib/pleroma/web/streamer/worker.ex +++ /dev/null @@ -1,220 +0,0 @@ -defmodule Pleroma.Web.Streamer.Worker do -  use GenServer - -  require Logger - -  alias Pleroma.Activity -  alias Pleroma.Config -  alias Pleroma.Conversation.Participation -  alias Pleroma.Notification -  alias Pleroma.Object -  alias Pleroma.User -  alias Pleroma.Web.ActivityPub.ActivityPub -  alias Pleroma.Web.ActivityPub.Visibility -  alias Pleroma.Web.CommonAPI -  alias Pleroma.Web.Streamer.State -  alias Pleroma.Web.Streamer.StreamerSocket -  alias Pleroma.Web.StreamerView - -  def start_link(_) do -    GenServer.start_link(__MODULE__, %{}, []) -  end - -  def init(init_arg) do -    {:ok, init_arg} -  end - -  def stream(pid, topics, items) do -    GenServer.call(pid, {:stream, topics, items}) -  end - -  def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do -    Enum.each(topics, fn t -> -      do_stream(%{topic: t, item: item}) -    end) - -    {:reply, state, state} -  end - -  def handle_call({:stream, topic, items}, _from, state) when is_list(items) do -    Enum.each(items, fn i -> -      do_stream(%{topic: topic, item: i}) -    end) - -    {:reply, state, state} -  end - -  def handle_call({:stream, topic, item}, _from, state) do -    do_stream(%{topic: topic, item: item}) - -    {:reply, state, state} -  end - -  defp do_stream(%{topic: "direct", item: item}) do -    recipient_topics = -      User.get_recipients_from_activity(item) -      |> Enum.map(fn %{id: id} -> "direct:#{id}" end) - -    Enum.each(recipient_topics, fn user_topic -> -      Logger.debug("Trying to push direct message to #{user_topic}\n\n") -      push_to_socket(State.get_sockets(), user_topic, item) -    end) -  end - -  defp do_stream(%{topic: "participation", item: participation}) do -    user_topic = "direct:#{participation.user_id}" -    Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n") - -    push_to_socket(State.get_sockets(), user_topic, participation) -  end - -  defp do_stream(%{topic: "list", item: item}) do -    # filter the recipient list if the activity is not public, see #270. -    recipient_lists = -      case Visibility.is_public?(item) do -        true -> -          Pleroma.List.get_lists_from_activity(item) - -        _ -> -          Pleroma.List.get_lists_from_activity(item) -          |> Enum.filter(fn list -> -            owner = User.get_cached_by_id(list.user_id) - -            Visibility.visible_for_user?(item, owner) -          end) -      end - -    recipient_topics = -      recipient_lists -      |> Enum.map(fn %{id: id} -> "list:#{id}" end) - -    Enum.each(recipient_topics, fn list_topic -> -      Logger.debug("Trying to push message to #{list_topic}\n\n") -      push_to_socket(State.get_sockets(), list_topic, item) -    end) -  end - -  defp do_stream(%{topic: topic, item: %Notification{} = item}) -       when topic in ["user", "user:notification"] do -    State.get_sockets() -    |> Map.get("#{topic}:#{item.user_id}", []) -    |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} -> -      with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id), -           true <- should_send?(user, item) do -        send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)}) -      end -    end) -  end - -  defp do_stream(%{topic: "user", item: item}) do -    Logger.debug("Trying to push to users") - -    recipient_topics = -      User.get_recipients_from_activity(item) -      |> Enum.map(fn %{id: id} -> "user:#{id}" end) - -    Enum.each(recipient_topics, fn topic -> -      push_to_socket(State.get_sockets(), topic, item) -    end) -  end - -  defp do_stream(%{topic: topic, item: item}) do -    Logger.debug("Trying to push to #{topic}") -    Logger.debug("Pushing item to #{topic}") -    push_to_socket(State.get_sockets(), topic, item) -  end - -  defp should_send?(%User{} = user, %Activity{} = item) do -    blocks = user.info.blocks || [] -    mutes = user.info.mutes || [] -    reblog_mutes = user.info.muted_reblogs || [] -    domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks) - -    with parent when not is_nil(parent) <- Object.normalize(item), -         true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)), -         true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)), -         %{host: item_host} <- URI.parse(item.actor), -         %{host: parent_host} <- URI.parse(parent.data["actor"]), -         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host), -         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host), -         true <- thread_containment(item, user), -         false <- CommonAPI.thread_muted?(user, item) do -      true -    else -      _ -> false -    end -  end - -  defp should_send?(%User{} = user, %Notification{activity: activity}) do -    should_send?(user, activity) -  end - -  def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do -    Enum.each(topics[topic] || [], fn %StreamerSocket{ -                                        transport_pid: transport_pid, -                                        user: socket_user -                                      } -> -      # Get the current user so we have up-to-date blocks etc. -      if socket_user do -        user = User.get_cached_by_ap_id(socket_user.ap_id) - -        if should_send?(user, item) do -          send(transport_pid, {:text, StreamerView.render("update.json", item, user)}) -        end -      else -        send(transport_pid, {:text, StreamerView.render("update.json", item)}) -      end -    end) -  end - -  def push_to_socket(topics, topic, %Participation{} = participation) do -    Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} -> -      send(transport_pid, {:text, StreamerView.render("conversation.json", participation)}) -    end) -  end - -  def push_to_socket(topics, topic, %Activity{ -        data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id} -      }) do -    Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} -> -      send( -        transport_pid, -        {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()} -      ) -    end) -  end - -  def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop - -  def push_to_socket(topics, topic, item) do -    Enum.each(topics[topic] || [], fn %StreamerSocket{ -                                        transport_pid: transport_pid, -                                        user: socket_user -                                      } -> -      # Get the current user so we have up-to-date blocks etc. -      if socket_user do -        user = User.get_cached_by_ap_id(socket_user.ap_id) -        blocks = user.info.blocks || [] -        mutes = user.info.mutes || [] - -        with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)), -             true <- thread_containment(item, user) do -          send(transport_pid, {:text, StreamerView.render("update.json", item, user)}) -        end -      else -        send(transport_pid, {:text, StreamerView.render("update.json", item)}) -      end -    end) -  end - -  @spec thread_containment(Activity.t(), User.t()) :: boolean() -  defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true - -  defp thread_containment(activity, user) do -    if Config.get([:instance, :skip_thread_containment]) do -      true -    else -      ActivityPub.contain_activity(activity, user) -    end -  end -end diff --git a/lib/pleroma/web/views/streamer_view.ex b/lib/pleroma/web/views/streamer_view.ex deleted file mode 100644 index b13030fa0..000000000 --- a/lib/pleroma/web/views/streamer_view.ex +++ /dev/null @@ -1,66 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.StreamerView do -  use Pleroma.Web, :view - -  alias Pleroma.Activity -  alias Pleroma.Conversation.Participation -  alias Pleroma.Notification -  alias Pleroma.User -  alias Pleroma.Web.MastodonAPI.NotificationView - -  def render("update.json", %Activity{} = activity, %User{} = user) do -    %{ -      event: "update", -      payload: -        Pleroma.Web.MastodonAPI.StatusView.render( -          "status.json", -          activity: activity, -          for: user -        ) -        |> Jason.encode!() -    } -    |> Jason.encode!() -  end - -  def render("notification.json", %User{} = user, %Notification{} = notify) do -    %{ -      event: "notification", -      payload: -        NotificationView.render( -          "show.json", -          %{notification: notify, for: user} -        ) -        |> Jason.encode!() -    } -    |> Jason.encode!() -  end - -  def render("update.json", %Activity{} = activity) do -    %{ -      event: "update", -      payload: -        Pleroma.Web.MastodonAPI.StatusView.render( -          "status.json", -          activity: activity -        ) -        |> Jason.encode!() -    } -    |> Jason.encode!() -  end - -  def render("conversation.json", %Participation{} = participation) do -    %{ -      event: "conversation", -      payload: -        Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{ -          participation: participation, -          for: participation.user -        }) -        |> Jason.encode!() -    } -    |> Jason.encode!() -  end -end @@ -144,7 +144,6 @@ defmodule Pleroma.Mixfile do         git: "https://git.pleroma.social/pleroma/http_signatures.git",         ref: "293d77bb6f4a67ac8bde1428735c3b42f22cbb30"},        {:telemetry, "~> 0.3"}, -      {:poolboy, "~> 1.5"},        {:prometheus_ex, "~> 3.0"},        {:prometheus_plugs, "~> 1.1"},        {:prometheus_phoenix, "~> 1.3"}, @@ -73,7 +73,6 @@    "plug_crypto": {:hex, :plug_crypto, "1.0.0", "18e49317d3fa343f24620ed22795ec29d4a5e602d52d1513ccea0b07d8ea7d4d", [:mix], [], "hexpm"},    "plug_static_index_html": {:hex, :plug_static_index_html, "1.0.0", "840123d4d3975585133485ea86af73cb2600afd7f2a976f9f5fd8b3808e636a0", [:mix], [{:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"},    "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"}, -  "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm"},    "postgrex": {:hex, :postgrex, "0.14.3", "5754dee2fdf6e9e508cbf49ab138df964278700b764177e8f3871e658b345a1e", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm"},    "prometheus": {:hex, :prometheus, "4.4.1", "1e96073b3ed7788053768fea779cbc896ddc3bdd9ba60687f2ad50b252ac87d6", [:mix, :rebar3], [], "hexpm"},    "prometheus_ecto": {:hex, :prometheus_ecto, "1.4.1", "6c768ea9654de871e5b32fab2eac348467b3021604ebebbcbd8bcbe806a65ed5", [:mix], [{:ecto, "~> 2.0 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:prometheus_ex, "~> 1.1 or ~> 2.0 or ~> 3.0", [hex: :prometheus_ex, repo: "hexpm", optional: false]}], "hexpm"}, diff --git a/test/activity/ir/topics_test.exs b/test/activity/ir/topics_test.exs deleted file mode 100644 index e75f83586..000000000 --- a/test/activity/ir/topics_test.exs +++ /dev/null @@ -1,141 +0,0 @@ -defmodule Pleroma.Activity.Ir.TopicsTest do -  use Pleroma.DataCase - -  alias Pleroma.Activity -  alias Pleroma.Activity.Ir.Topics -  alias Pleroma.Object - -  require Pleroma.Constants - -  describe "poll answer" do -    test "produce no topics" do -      activity = %Activity{object: %Object{data: %{"type" => "Answer"}}} - -      assert [] == Topics.get_activity_topics(activity) -    end -  end - -  describe "non poll answer" do -    test "always add user and list topics" do -      activity = %Activity{object: %Object{data: %{"type" => "FooBar"}}} -      topics = Topics.get_activity_topics(activity) - -      assert Enum.member?(topics, "user") -      assert Enum.member?(topics, "list") -    end -  end - -  describe "public visibility" do -    setup do -      activity = %Activity{ -        object: %Object{data: %{"type" => "Note"}}, -        data: %{"to" => [Pleroma.Constants.as_public()]} -      } - -      {:ok, activity: activity} -    end - -    test "produces public topic", %{activity: activity} do -      topics = Topics.get_activity_topics(activity) - -      assert Enum.member?(topics, "public") -    end - -    test "local action produces public:local topic", %{activity: activity} do -      activity = %{activity | local: true} -      topics = Topics.get_activity_topics(activity) - -      assert Enum.member?(topics, "public:local") -    end - -    test "non-local action does not produce public:local topic", %{activity: activity} do -      activity = %{activity | local: false} -      topics = Topics.get_activity_topics(activity) - -      refute Enum.member?(topics, "public:local") -    end -  end - -  describe "public visibility create events" do -    setup do -      activity = %Activity{ -        object: %Object{data: %{"type" => "Create", "attachment" => []}}, -        data: %{"to" => [Pleroma.Constants.as_public()]} -      } - -      {:ok, activity: activity} -    end - -    test "with no attachments doesn't produce public:media topics", %{activity: activity} do -      topics = Topics.get_activity_topics(activity) - -      refute Enum.member?(topics, "public:media") -      refute Enum.member?(topics, "public:local:media") -    end - -    test "converts tags to hash tags", %{activity: %{object: %{data: data} = object} = activity} do -      tagged_data = Map.put(data, "tag", ["foo", "bar"]) -      activity = %{activity | object: %{object | data: tagged_data}} - -      topics = Topics.get_activity_topics(activity) - -      assert Enum.member?(topics, "hashtag:foo") -      assert Enum.member?(topics, "hashtag:bar") -    end - -    test "only converts strinngs to hash tags", %{ -      activity: %{object: %{data: data} = object} = activity -    } do -      tagged_data = Map.put(data, "tag", [2]) -      activity = %{activity | object: %{object | data: tagged_data}} - -      topics = Topics.get_activity_topics(activity) - -      refute Enum.member?(topics, "hashtag:2") -    end -  end - -  describe "public visibility create events with attachments" do -    setup do -      activity = %Activity{ -        object: %Object{data: %{"type" => "Create", "attachment" => ["foo"]}}, -        data: %{"to" => [Pleroma.Constants.as_public()]} -      } - -      {:ok, activity: activity} -    end - -    test "produce public:media topics", %{activity: activity} do -      topics = Topics.get_activity_topics(activity) - -      assert Enum.member?(topics, "public:media") -    end - -    test "local produces public:local:media topics", %{activity: activity} do -      topics = Topics.get_activity_topics(activity) - -      assert Enum.member?(topics, "public:local:media") -    end - -    test "non-local doesn't produce public:local:media topics", %{activity: activity} do -      activity = %{activity | local: false} - -      topics = Topics.get_activity_topics(activity) - -      refute Enum.member?(topics, "public:local:media") -    end -  end - -  describe "non-public visibility" do -    test "produces direct topic" do -      activity = %Activity{object: %Object{data: %{"type" => "Note"}}, data: %{"to" => []}} -      topics = Topics.get_activity_topics(activity) - -      assert Enum.member?(topics, "direct") -      refute Enum.member?(topics, "public") -      refute Enum.member?(topics, "public:local") -      refute Enum.member?(topics, "public:media") -      refute Enum.member?(topics, "public:local:media") -    end -  end -end diff --git a/test/integration/mastodon_websocket_test.exs b/test/integration/mastodon_websocket_test.exs index c04262808..63bf73412 100644 --- a/test/integration/mastodon_websocket_test.exs +++ b/test/integration/mastodon_websocket_test.exs @@ -11,6 +11,7 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do    alias Pleroma.Integration.WebsocketClient    alias Pleroma.Web.CommonAPI    alias Pleroma.Web.OAuth +  alias Pleroma.Web.Streamer    @path Pleroma.Web.Endpoint.url()          |> URI.parse() @@ -18,6 +19,16 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do          |> Map.put(:path, "/api/v1/streaming")          |> URI.to_string() +  setup do +    GenServer.start(Streamer, %{}, name: Streamer) + +    on_exit(fn -> +      if pid = Process.whereis(Streamer) do +        Process.exit(pid, :kill) +      end +    end) +  end +    def start_socket(qs \\ nil, headers \\ []) do      path =        case qs do @@ -42,14 +53,12 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do      end)    end -  @tag needs_streamer: true    test "allows public streams without authentication" do      assert {:ok, _} = start_socket("?stream=public")      assert {:ok, _} = start_socket("?stream=public:local")      assert {:ok, _} = start_socket("?stream=hashtag&tag=lain")    end -  @tag needs_streamer: true    test "receives well formatted events" do      user = insert(:user)      {:ok, _} = start_socket("?stream=public") @@ -94,7 +103,6 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do        assert {:ok, _} = start_socket("?stream=user&access_token=#{state.token.token}")      end -    @tag needs_streamer: true      test "accepts the 'user' stream", %{token: token} = _state do        assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}") @@ -103,7 +111,6 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do               end) =~ ":badarg"      end -    @tag needs_streamer: true      test "accepts the 'user:notification' stream", %{token: token} = _state do        assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}") @@ -112,7 +119,6 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do               end) =~ ":badarg"      end -    @tag needs_streamer: true      test "accepts valid token on Sec-WebSocket-Protocol header", %{token: token} do        assert {:ok, _} = start_socket("?stream=user", [{"Sec-WebSocket-Protocol", token.token}]) diff --git a/test/notification_test.exs b/test/notification_test.exs index 3d2f9a8fc..3be9db09b 100644 --- a/test/notification_test.exs +++ b/test/notification_test.exs @@ -69,7 +69,16 @@ defmodule Pleroma.NotificationTest do    end    describe "create_notification" do -    @tag needs_streamer: true +    setup do +      GenServer.start(Streamer, %{}, name: Streamer) + +      on_exit(fn -> +        if pid = Process.whereis(Streamer) do +          Process.exit(pid, :kill) +        end +      end) +    end +      test "it creates a notification for user and send to the 'user' and the 'user:notification' stream" do        user = insert(:user)        task = Task.async(fn -> assert_receive {:text, _}, 4_000 end) diff --git a/test/support/conn_case.ex b/test/support/conn_case.ex index b39c70677..ec5892ff5 100644 --- a/test/support/conn_case.ex +++ b/test/support/conn_case.ex @@ -40,10 +40,6 @@ defmodule Pleroma.Web.ConnCase do        Ecto.Adapters.SQL.Sandbox.mode(Pleroma.Repo, {:shared, self()})      end -    if tags[:needs_streamer] do -      start_supervised(Pleroma.Web.Streamer.supervisor()) -    end -      {:ok, conn: Phoenix.ConnTest.build_conn()}    end  end diff --git a/test/support/data_case.ex b/test/support/data_case.ex index 17fa15214..f3d98e7e3 100644 --- a/test/support/data_case.ex +++ b/test/support/data_case.ex @@ -39,10 +39,6 @@ defmodule Pleroma.DataCase do        Ecto.Adapters.SQL.Sandbox.mode(Pleroma.Repo, {:shared, self()})      end -    if tags[:needs_streamer] do -      start_supervised(Pleroma.Web.Streamer.supervisor()) -    end -      :ok    end diff --git a/test/web/activity_pub/activity_pub_test.exs b/test/web/activity_pub/activity_pub_test.exs index 4100108a5..d0118fefa 100644 --- a/test/web/activity_pub/activity_pub_test.exs +++ b/test/web/activity_pub/activity_pub_test.exs @@ -38,7 +38,9 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubTest do          stream: fn _, _ -> nil end do          ActivityPub.stream_out_participations(conversation.participations) -        assert called(Pleroma.Web.Streamer.stream("participation", participations)) +        Enum.each(participations, fn participation -> +          assert called(Pleroma.Web.Streamer.stream("participation", participation)) +        end)        end      end    end diff --git a/test/web/streamer/ping_test.exs b/test/web/streamer/ping_test.exs deleted file mode 100644 index 3d52c00e4..000000000 --- a/test/web/streamer/ping_test.exs +++ /dev/null @@ -1,36 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.PingTest do -  use Pleroma.DataCase - -  import Pleroma.Factory -  alias Pleroma.Web.Streamer - -  setup do -    start_supervised({Streamer.supervisor(), [ping_interval: 30]}) - -    :ok -  end - -  describe "sockets" do -    setup do -      user = insert(:user) -      {:ok, %{user: user}} -    end - -    test "it sends pings", %{user: user} do -      task = -        Task.async(fn -> -          assert_receive {:text, received_event}, 40 -          assert_receive {:text, received_event}, 40 -          assert_receive {:text, received_event}, 40 -        end) - -      Streamer.add_socket("public", %{transport_pid: task.pid, assigns: %{user: user}}) - -      Task.await(task) -    end -  end -end diff --git a/test/web/streamer/state_test.exs b/test/web/streamer/state_test.exs deleted file mode 100644 index d1aeac541..000000000 --- a/test/web/streamer/state_test.exs +++ /dev/null @@ -1,54 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.StateTest do -  use Pleroma.DataCase - -  import Pleroma.Factory -  alias Pleroma.Web.Streamer -  alias Pleroma.Web.Streamer.StreamerSocket - -  @moduletag needs_streamer: true - -  describe "sockets" do -    setup do -      user = insert(:user) -      user2 = insert(:user) -      {:ok, %{user: user, user2: user2}} -    end - -    test "it can add a socket", %{user: user} do -      Streamer.add_socket("public", %{transport_pid: 1, assigns: %{user: user}}) - -      assert(%{"public" => [%StreamerSocket{transport_pid: 1}]} = Streamer.get_sockets()) -    end - -    test "it can add multiple sockets per user", %{user: user} do -      Streamer.add_socket("public", %{transport_pid: 1, assigns: %{user: user}}) -      Streamer.add_socket("public", %{transport_pid: 2, assigns: %{user: user}}) - -      assert( -        %{ -          "public" => [ -            %StreamerSocket{transport_pid: 2}, -            %StreamerSocket{transport_pid: 1} -          ] -        } = Streamer.get_sockets() -      ) -    end - -    test "it will not add a duplicate socket", %{user: user} do -      Streamer.add_socket("activity", %{transport_pid: 1, assigns: %{user: user}}) -      Streamer.add_socket("activity", %{transport_pid: 1, assigns: %{user: user}}) - -      assert( -        %{ -          "activity" => [ -            %StreamerSocket{transport_pid: 1} -          ] -        } = Streamer.get_sockets() -      ) -    end -  end -end diff --git a/test/web/streamer/streamer_test.exs b/test/web/streamer_test.exs index 88847e20f..96fa7645f 100644 --- a/test/web/streamer/streamer_test.exs +++ b/test/web/streamer_test.exs @@ -5,20 +5,24 @@  defmodule Pleroma.Web.StreamerTest do    use Pleroma.DataCase -  import Pleroma.Factory -    alias Pleroma.List    alias Pleroma.User    alias Pleroma.Web.CommonAPI    alias Pleroma.Web.Streamer -  alias Pleroma.Web.Streamer.StreamerSocket -  alias Pleroma.Web.Streamer.Worker +  import Pleroma.Factory -  @moduletag needs_streamer: true    clear_config_all([:instance, :skip_thread_containment])    describe "user streams" do      setup do +      GenServer.start(Streamer, %{}, name: Streamer) + +      on_exit(fn -> +        if pid = Process.whereis(Streamer) do +          Process.exit(pid, :kill) +        end +      end) +        user = insert(:user)        notify = insert(:notification, user: user, activity: build(:note_activity))        {:ok, %{user: user, notify: notify}} @@ -121,9 +125,11 @@ defmodule Pleroma.Web.StreamerTest do          assert_receive {:text, _}, 4_000        end) -    fake_socket = %StreamerSocket{ +    fake_socket = %{        transport_pid: task.pid, -      user: user +      assigns: %{ +        user: user +      }      }      {:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"}) @@ -132,7 +138,7 @@ defmodule Pleroma.Web.StreamerTest do        "public" => [fake_socket]      } -    Worker.push_to_socket(topics, "public", activity) +    Streamer.push_to_socket(topics, "public", activity)      Task.await(task) @@ -149,9 +155,11 @@ defmodule Pleroma.Web.StreamerTest do          assert received_event == expected_event        end) -    fake_socket = %StreamerSocket{ +    fake_socket = %{        transport_pid: task.pid, -      user: user +      assigns: %{ +        user: user +      }      }      {:ok, activity} = CommonAPI.delete(activity.id, other_user) @@ -160,7 +168,7 @@ defmodule Pleroma.Web.StreamerTest do        "public" => [fake_socket]      } -    Worker.push_to_socket(topics, "public", activity) +    Streamer.push_to_socket(topics, "public", activity)      Task.await(task)    end @@ -181,9 +189,9 @@ defmodule Pleroma.Web.StreamerTest do          )        task = Task.async(fn -> refute_receive {:text, _}, 1_000 end) -      fake_socket = %StreamerSocket{transport_pid: task.pid, user: user} +      fake_socket = %{transport_pid: task.pid, assigns: %{user: user}}        topics = %{"public" => [fake_socket]} -      Worker.push_to_socket(topics, "public", activity) +      Streamer.push_to_socket(topics, "public", activity)        Task.await(task)      end @@ -203,9 +211,9 @@ defmodule Pleroma.Web.StreamerTest do          )        task = Task.async(fn -> assert_receive {:text, _}, 1_000 end) -      fake_socket = %StreamerSocket{transport_pid: task.pid, user: user} +      fake_socket = %{transport_pid: task.pid, assigns: %{user: user}}        topics = %{"public" => [fake_socket]} -      Worker.push_to_socket(topics, "public", activity) +      Streamer.push_to_socket(topics, "public", activity)        Task.await(task)      end @@ -225,9 +233,9 @@ defmodule Pleroma.Web.StreamerTest do          )        task = Task.async(fn -> assert_receive {:text, _}, 1_000 end) -      fake_socket = %StreamerSocket{transport_pid: task.pid, user: user} +      fake_socket = %{transport_pid: task.pid, assigns: %{user: user}}        topics = %{"public" => [fake_socket]} -      Worker.push_to_socket(topics, "public", activity) +      Streamer.push_to_socket(topics, "public", activity)        Task.await(task)      end @@ -243,9 +251,11 @@ defmodule Pleroma.Web.StreamerTest do          refute_receive {:text, _}, 1_000        end) -    fake_socket = %StreamerSocket{ +    fake_socket = %{        transport_pid: task.pid, -      user: user +      assigns: %{ +        user: user +      }      }      {:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"}) @@ -254,7 +264,7 @@ defmodule Pleroma.Web.StreamerTest do        "public" => [fake_socket]      } -    Worker.push_to_socket(topics, "public", activity) +    Streamer.push_to_socket(topics, "public", activity)      Task.await(task)    end @@ -274,9 +284,11 @@ defmodule Pleroma.Web.StreamerTest do          refute_receive {:text, _}, 1_000        end) -    fake_socket = %StreamerSocket{ +    fake_socket = %{        transport_pid: task.pid, -      user: user_a +      assigns: %{ +        user: user_a +      }      }      {:ok, activity} = @@ -289,7 +301,7 @@ defmodule Pleroma.Web.StreamerTest do        "list:#{list.id}" => [fake_socket]      } -    Worker.handle_call({:stream, "list", activity}, self(), topics) +    Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)      Task.await(task)    end @@ -306,9 +318,11 @@ defmodule Pleroma.Web.StreamerTest do          refute_receive {:text, _}, 1_000        end) -    fake_socket = %StreamerSocket{ +    fake_socket = %{        transport_pid: task.pid, -      user: user_a +      assigns: %{ +        user: user_a +      }      }      {:ok, activity} = @@ -321,12 +335,12 @@ defmodule Pleroma.Web.StreamerTest do        "list:#{list.id}" => [fake_socket]      } -    Worker.handle_call({:stream, "list", activity}, self(), topics) +    Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)      Task.await(task)    end -  test "it sends wanted private posts to list" do +  test "it send wanted private posts to list" do      user_a = insert(:user)      user_b = insert(:user) @@ -340,9 +354,11 @@ defmodule Pleroma.Web.StreamerTest do          assert_receive {:text, _}, 1_000        end) -    fake_socket = %StreamerSocket{ +    fake_socket = %{        transport_pid: task.pid, -      user: user_a +      assigns: %{ +        user: user_a +      }      }      {:ok, activity} = @@ -351,12 +367,11 @@ defmodule Pleroma.Web.StreamerTest do          "visibility" => "private"        }) -    Streamer.add_socket( -      "list:#{list.id}", -      fake_socket -    ) +    topics = %{ +      "list:#{list.id}" => [fake_socket] +    } -    Worker.handle_call({:stream, "list", activity}, self(), %{}) +    Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)      Task.await(task)    end @@ -372,9 +387,11 @@ defmodule Pleroma.Web.StreamerTest do          refute_receive {:text, _}, 1_000        end) -    fake_socket = %StreamerSocket{ +    fake_socket = %{        transport_pid: task.pid, -      user: user1 +      assigns: %{ +        user: user1 +      }      }      {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"}) @@ -384,7 +401,7 @@ defmodule Pleroma.Web.StreamerTest do        "public" => [fake_socket]      } -    Worker.push_to_socket(topics, "public", announce_activity) +    Streamer.push_to_socket(topics, "public", announce_activity)      Task.await(task)    end @@ -400,8 +417,6 @@ defmodule Pleroma.Web.StreamerTest do      task = Task.async(fn -> refute_receive {:text, _}, 4_000 end) -    Process.sleep(4000) -      Streamer.add_socket(        "user",        %{transport_pid: task.pid, assigns: %{user: user2}} @@ -413,6 +428,14 @@ defmodule Pleroma.Web.StreamerTest do    describe "direct streams" do      setup do +      GenServer.start(Streamer, %{}, name: Streamer) + +      on_exit(fn -> +        if pid = Process.whereis(Streamer) do +          Process.exit(pid, :kill) +        end +      end) +        :ok      end @@ -457,8 +480,6 @@ defmodule Pleroma.Web.StreamerTest do            refute_receive {:text, _}, 4_000          end) -      Process.sleep(1000) -        Streamer.add_socket(          "direct",          %{transport_pid: task.pid, assigns: %{user: user}} @@ -500,8 +521,6 @@ defmodule Pleroma.Web.StreamerTest do            assert last_status["id"] == to_string(create_activity.id)          end) -      Process.sleep(1000) -        Streamer.add_socket(          "direct",          %{transport_pid: task.pid, assigns: %{user: user}}  | 
