diff options
23 files changed, 32 insertions, 1443 deletions
diff --git a/config/config.exs b/config/config.exs index 0b8a75aad..1ac140ed0 100644 --- a/config/config.exs +++ b/config/config.exs @@ -129,7 +129,6 @@ config :pleroma, Pleroma.Web.Endpoint,      dispatch: [        {:_,         [ -         {"/api/fedsocket/v1", Pleroma.Web.FedSockets.IncomingHandler, []},           {"/api/v1/streaming", Pleroma.Web.MastodonAPI.WebsocketHandler, []},           {"/websocket", Phoenix.Endpoint.CowboyWebSocket,            {Phoenix.Transports.WebSocket, diff --git a/config/description.exs b/config/description.exs index 0552b37e0..a663d8127 100644 --- a/config/description.exs +++ b/config/description.exs @@ -274,19 +274,6 @@ config :pleroma, :config_description, [    },    %{      group: :pleroma, -    key: :fed_sockets, -    type: :group, -    description: "Websocket based federation", -    children: [ -      %{ -        key: :enabled, -        type: :boolean, -        description: "Enable FedSockets" -      } -    ] -  }, -  %{ -    group: :pleroma,      key: Pleroma.Emails.Mailer,      type: :group,      description: "Mailer-related settings", diff --git a/docs/configuration/cheatsheet.md b/docs/configuration/cheatsheet.md index ebf95ebc9..4d18ac30a 100644 --- a/docs/configuration/cheatsheet.md +++ b/docs/configuration/cheatsheet.md @@ -220,18 +220,6 @@ config :pleroma, :mrf_user_allowlist, %{  * `total_user_limit`: the number of scheduled activities a user is allowed to create in total (Default: `300`)  * `enabled`: whether scheduled activities are sent to the job queue to be executed -## FedSockets -FedSockets is an experimental feature allowing for Pleroma backends to federate using a persistant websocket connection as opposed to making each federation a seperate http connection. This feature is currently off by default. It is configurable throught he following options. - -### :fedsockets -* `enabled`: Enables FedSockets for this instance. `false` by default. -* `connection_duration`: Time an idle websocket is kept open. -* `rejection_duration`: Failures to connect via FedSockets will not be retried for this period of time. -* `fed_socket_fetches` and `fed_socket_rejections`: Settings passed to `cachex` for the fetch registry, and rejection stacks. See `Pleroma.Web.FedSockets` for more details. - - -## Frontends -  ### :frontend_configurations  This can be used to configure a keyword list that keeps the configuration data for any kind of frontend. By default, settings for `pleroma_fe` and `masto_fe` are configured. You can find the documentation for `pleroma_fe` configuration into [Pleroma-FE configuration and customization for instance administrators](/frontend/CONFIGURATION/#options). diff --git a/installation/pleroma.nginx b/installation/pleroma.nginx index d613befd2..9890cb2b1 100644 --- a/installation/pleroma.nginx +++ b/installation/pleroma.nginx @@ -93,9 +93,4 @@ server {          chunked_transfer_encoding on;          proxy_pass         http://phoenix;      } - -    location /api/fedsocket/v1 { -        proxy_request_buffering off; -        proxy_pass http://phoenix/api/fedsocket/v1; -    }  } diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 7c4cd9626..8f08a6222 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -207,8 +207,7 @@ defmodule Pleroma.Application do           name: Pleroma.Web.Streamer.registry(),           keys: :duplicate,           partitions: System.schedulers_online() -       ]}, -      Pleroma.Web.FedSockets.Supervisor +       ]}      ]    end diff --git a/lib/pleroma/object/fetcher.ex b/lib/pleroma/object/fetcher.ex index ae4301738..20d8f687d 100644 --- a/lib/pleroma/object/fetcher.ex +++ b/lib/pleroma/object/fetcher.ex @@ -12,7 +12,6 @@ defmodule Pleroma.Object.Fetcher do    alias Pleroma.Web.ActivityPub.ObjectValidator    alias Pleroma.Web.ActivityPub.Transmogrifier    alias Pleroma.Web.Federator -  alias Pleroma.Web.FedSockets    require Logger    require Pleroma.Constants @@ -183,16 +182,16 @@ defmodule Pleroma.Object.Fetcher do      end    end -  def fetch_and_contain_remote_object_from_id(prm, opts \\ []) +  def fetch_and_contain_remote_object_from_id(id) -  def fetch_and_contain_remote_object_from_id(%{"id" => id}, opts), -    do: fetch_and_contain_remote_object_from_id(id, opts) +  def fetch_and_contain_remote_object_from_id(%{"id" => id}), +    do: fetch_and_contain_remote_object_from_id(id) -  def fetch_and_contain_remote_object_from_id(id, opts) when is_binary(id) do +  def fetch_and_contain_remote_object_from_id(id) when is_binary(id) do      Logger.debug("Fetching object #{id} via AP")      with {:scheme, true} <- {:scheme, String.starts_with?(id, "http")}, -         {:ok, body} <- get_object(id, opts), +         {:ok, body} <- get_object(id),           {:ok, data} <- safe_json_decode(body),           :ok <- Containment.contain_origin_from_id(id, data) do        {:ok, data} @@ -208,22 +207,10 @@ defmodule Pleroma.Object.Fetcher do      end    end -  def fetch_and_contain_remote_object_from_id(_id, _opts), +  def fetch_and_contain_remote_object_from_id(_id),      do: {:error, "id must be a string"} -  defp get_object(id, opts) do -    with false <- Keyword.get(opts, :force_http, false), -         {:ok, fedsocket} <- FedSockets.get_or_create_fed_socket(id) do -      Logger.debug("fetching via fedsocket - #{inspect(id)}") -      FedSockets.fetch(fedsocket, id) -    else -      _other -> -        Logger.debug("fetching via http - #{inspect(id)}") -        get_object_http(id) -    end -  end - -  defp get_object_http(id) do +  defp get_object(id) do      date = Pleroma.Signature.signed_date()      headers = diff --git a/lib/pleroma/signature.ex b/lib/pleroma/signature.ex index e388993b7..3aa6909d2 100644 --- a/lib/pleroma/signature.ex +++ b/lib/pleroma/signature.ex @@ -39,7 +39,7 @@ defmodule Pleroma.Signature do    def fetch_public_key(conn) do      with %{"keyId" => kid} <- HTTPSignatures.signature_for_conn(conn),           {:ok, actor_id} <- key_id_to_actor_id(kid), -         {:ok, public_key} <- User.get_public_key_for_ap_id(actor_id, force_http: true) do +         {:ok, public_key} <- User.get_public_key_for_ap_id(actor_id) do        {:ok, public_key}      else        e -> @@ -50,8 +50,8 @@ defmodule Pleroma.Signature do    def refetch_public_key(conn) do      with %{"keyId" => kid} <- HTTPSignatures.signature_for_conn(conn),           {:ok, actor_id} <- key_id_to_actor_id(kid), -         {:ok, _user} <- ActivityPub.make_user_from_ap_id(actor_id, force_http: true), -         {:ok, public_key} <- User.get_public_key_for_ap_id(actor_id, force_http: true) do +         {:ok, _user} <- ActivityPub.make_user_from_ap_id(actor_id), +         {:ok, public_key} <- User.get_public_key_for_ap_id(actor_id) do        {:ok, public_key}      else        e -> diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index 8e4ec8064..a240579f3 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -1772,12 +1772,12 @@ defmodule Pleroma.User do    def html_filter_policy(_), do: Config.get([:markup, :scrub_policy]) -  def fetch_by_ap_id(ap_id, opts \\ []), do: ActivityPub.make_user_from_ap_id(ap_id, opts) +  def fetch_by_ap_id(ap_id), do: ActivityPub.make_user_from_ap_id(ap_id) -  def get_or_fetch_by_ap_id(ap_id, opts \\ []) do +  def get_or_fetch_by_ap_id(ap_id) do      cached_user = get_cached_by_ap_id(ap_id) -    maybe_fetched_user = needs_update?(cached_user) && fetch_by_ap_id(ap_id, opts) +    maybe_fetched_user = needs_update?(cached_user) && fetch_by_ap_id(ap_id)      case {cached_user, maybe_fetched_user} do        {_, {:ok, %User{} = user}} -> @@ -1850,8 +1850,8 @@ defmodule Pleroma.User do    def public_key(_), do: {:error, "key not found"} -  def get_public_key_for_ap_id(ap_id, opts \\ []) do -    with {:ok, %User{} = user} <- get_or_fetch_by_ap_id(ap_id, opts), +  def get_public_key_for_ap_id(ap_id) do +    with {:ok, %User{} = user} <- get_or_fetch_by_ap_id(ap_id),           {:ok, public_key} <- public_key(user) do        {:ok, public_key}      else diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index d8f685d38..35f71b7ae 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -1289,12 +1289,10 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do    def fetch_follow_information_for_user(user) do      with {:ok, following_data} <- -           Fetcher.fetch_and_contain_remote_object_from_id(user.following_address, -             force_http: true -           ), +           Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),           {:ok, hide_follows} <- collection_private(following_data),           {:ok, followers_data} <- -           Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address, force_http: true), +           Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),           {:ok, hide_followers} <- collection_private(followers_data) do        {:ok,         %{ @@ -1368,8 +1366,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do      end    end -  def fetch_and_prepare_user_from_ap_id(ap_id, opts \\ []) do -    with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id, opts), +  def fetch_and_prepare_user_from_ap_id(ap_id) do +    with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),           {:ok, data} <- user_data_from_user_object(data) do        {:ok, maybe_update_follow_information(data)}      else @@ -1412,13 +1410,13 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do      end    end -  def make_user_from_ap_id(ap_id, opts \\ []) do +  def make_user_from_ap_id(ap_id) do      user = User.get_cached_by_ap_id(ap_id)      if user && !User.ap_enabled?(user) do        Transmogrifier.upgrade_user_from_ap_id(ap_id)      else -      with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, opts) do +      with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do          if user do            user            |> User.remote_user_changeset(data) diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex index a2930c1cd..5ab3562bf 100644 --- a/lib/pleroma/web/activity_pub/publisher.ex +++ b/lib/pleroma/web/activity_pub/publisher.ex @@ -13,7 +13,6 @@ defmodule Pleroma.Web.ActivityPub.Publisher do    alias Pleroma.User    alias Pleroma.Web.ActivityPub.Relay    alias Pleroma.Web.ActivityPub.Transmogrifier -  alias Pleroma.Web.FedSockets    require Pleroma.Constants @@ -50,28 +49,6 @@ defmodule Pleroma.Web.ActivityPub.Publisher do    """    def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = params) do      Logger.debug("Federating #{id} to #{inbox}") - -    case FedSockets.get_or_create_fed_socket(inbox) do -      {:ok, fedsocket} -> -        Logger.debug("publishing via fedsockets - #{inspect(inbox)}") -        FedSockets.publish(fedsocket, json) - -      _ -> -        Logger.debug("publishing via http - #{inspect(inbox)}") -        http_publish(inbox, actor, json, params) -    end -  end - -  def publish_one(%{actor_id: actor_id} = params) do -    actor = User.get_cached_by_id(actor_id) - -    params -    |> Map.delete(:actor_id) -    |> Map.put(:actor, actor) -    |> publish_one() -  end - -  defp http_publish(inbox, actor, json, params) do      uri = %{path: path} = URI.parse(inbox)      digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64()) @@ -110,6 +87,15 @@ defmodule Pleroma.Web.ActivityPub.Publisher do      end    end +  def publish_one(%{actor_id: actor_id} = params) do +    actor = User.get_cached_by_id(actor_id) + +    params +    |> Map.delete(:actor_id) +    |> Map.put(:actor, actor) +    |> publish_one() +  end +    defp signature_host(%URI{port: port, scheme: scheme, host: host}) do      if port == URI.default_port(scheme) do        host diff --git a/lib/pleroma/web/activity_pub/transmogrifier.ex b/lib/pleroma/web/activity_pub/transmogrifier.ex index 0bcd1db22..565d32433 100644 --- a/lib/pleroma/web/activity_pub/transmogrifier.ex +++ b/lib/pleroma/web/activity_pub/transmogrifier.ex @@ -1008,7 +1008,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do    def upgrade_user_from_ap_id(ap_id) do      with %User{local: false} = user <- User.get_cached_by_ap_id(ap_id), -         {:ok, data} <- ActivityPub.fetch_and_prepare_user_from_ap_id(ap_id, force_http: true), +         {:ok, data} <- ActivityPub.fetch_and_prepare_user_from_ap_id(ap_id),           {:ok, user} <- update_user(user, data) do        TransmogrifierWorker.enqueue("user_upgrade", %{"user_id" => user.id})        {:ok, user} diff --git a/lib/pleroma/web/fed_sockets.ex b/lib/pleroma/web/fed_sockets.ex deleted file mode 100644 index 1fd5899c8..000000000 --- a/lib/pleroma/web/fed_sockets.ex +++ /dev/null @@ -1,185 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.FedSockets do -  @moduledoc """ -  This documents the FedSockets framework. A framework for federating -  ActivityPub objects between servers via persistant WebSocket connections. - -  FedSockets allow servers to authenticate on first contact and maintain that -  connection, eliminating the need to authenticate every time data needs to be shared. - -  ## Protocol -  FedSockets currently support 2 types of data transfer: -    * `publish` method which doesn't require a response -    * `fetch` method requires a response be sent - -    ### Publish -    The publish operation sends a json encoded map of the shape: -      %{action: :publish, data: json} -    and accepts (but does not require) a reply of form: -      %{"action" => "publish_reply"} - -    The outgoing params represent -      * data: ActivityPub object encoded into json - - -    ### Fetch -    The fetch operation sends a json encoded map of the shape: -      %{action: :fetch, data: id, uuid: fetch_uuid} -    and requires a reply of form: -      %{"action" => "fetch_reply", "uuid" => uuid, "data" => data} - -    The outgoing params represent -      * id: an ActivityPub object URI -      * uuid: a unique uuid generated by the sender - -    The reply params represent -      * data: an ActivityPub object encoded into json -      * uuid: the uuid sent along with the fetch request - -  ## Examples -  Clients of FedSocket transfers shouldn't need to use any of the functions outside of this module. - -  A typical publish operation can be performed through the following code, and a fetch operation in a similar manner. - -    case FedSockets.get_or_create_fed_socket(inbox) do -      {:ok, fedsocket} -> -        FedSockets.publish(fedsocket, json) - -      _ -> -        alternative_publish(inbox, actor, json, params) -    end - -  ## Configuration -  FedSockets have the following config settings - -  config :pleroma, :fed_sockets, -  enabled: true, -  ping_interval: :timer.seconds(15), -  connection_duration: :timer.hours(1), -  rejection_duration: :timer.hours(1), -  fed_socket_fetches: [ -    default: 12_000, -    interval: 3_000, -    lazy: false -  ] -    * enabled - turn FedSockets on or off with this flag. Can be toggled at runtime. -    * connection_duration - How long a FedSocket can sit idle before it's culled. -    * rejection_duration - After failing to make a FedSocket connection a host will be excluded -    from further connections for this amount of time -    * fed_socket_fetches - Use these parameters to pass options to the Cachex queue backing the FetchRegistry -    * fed_socket_rejections - Use these parameters to pass options to the Cachex queue backing the FedRegistry - -    Cachex options are -      * default: the minimum amount of time a fetch can wait before it times out. -      * interval: the interval between checks for timed out entries. This plus the default represent the maximum time allowed -      * lazy: leave at false for consistant and fast lookups, set to true for stricter timeout enforcement - -  """ -  require Logger - -  alias Pleroma.Web.FedSockets.FedRegistry -  alias Pleroma.Web.FedSockets.FedSocket -  alias Pleroma.Web.FedSockets.SocketInfo - -  @doc """ -  returns a FedSocket for the given origin. Will reuse an existing one or create a new one. - -  address is expected to be a fully formed URL such as: -  "http://www.example.com" or "http://www.example.com:8080" - -  It can and usually does include additional path parameters, -  but these are ignored as the FedSockets are organized by host and port info alone. -  """ -  def get_or_create_fed_socket(address) do -    with {:cache, {:error, :missing}} <- {:cache, get_fed_socket(address)}, -         {:connect, {:ok, _pid}} <- {:connect, FedSocket.connect_to_host(address)}, -         {:cache, {:ok, fed_socket}} <- {:cache, get_fed_socket(address)} do -      Logger.debug("fedsocket created for - #{inspect(address)}") -      {:ok, fed_socket} -    else -      {:cache, {:ok, socket}} -> -        Logger.debug("fedsocket found in cache - #{inspect(address)}") -        {:ok, socket} - -      {:cache, {:error, :rejected} = e} -> -        e - -      {:connect, {:error, _host}} -> -        Logger.debug("set host rejected for - #{inspect(address)}") -        FedRegistry.set_host_rejected(address) -        {:error, :rejected} - -      {_, {:error, :disabled}} -> -        {:error, :disabled} - -      {_, {:error, reason}} -> -        Logger.warn("get_or_create_fed_socket error - #{inspect(reason)}") -        {:error, reason} -    end -  end - -  @doc """ -  returns a FedSocket for the given origin. Will not create a new FedSocket if one does not exist. - -  address is expected to be a fully formed URL such as: -    "http://www.example.com" or "http://www.example.com:8080" -  """ -  def get_fed_socket(address) do -    origin = SocketInfo.origin(address) - -    with {:config, true} <- {:config, Pleroma.Config.get([:fed_sockets, :enabled], false)}, -         {:ok, socket} <- FedRegistry.get_fed_socket(origin) do -      {:ok, socket} -    else -      {:config, _} -> -        {:error, :disabled} - -      {:error, :rejected} -> -        Logger.debug("FedSocket previously rejected - #{inspect(origin)}") -        {:error, :rejected} - -      {:error, reason} -> -        {:error, reason} -    end -  end - -  @doc """ -  Sends the supplied data via the publish protocol. -  It will not block waiting for a reply. -  Returns :ok but this is not an indication of a successful transfer. - -  the data is expected to be JSON encoded binary data. -  """ -  def publish(%SocketInfo{} = fed_socket, json) do -    FedSocket.publish(fed_socket, json) -  end - -  @doc """ -  Sends the supplied data via the fetch protocol. -  It will block waiting for a reply or timeout. - -  Returns {:ok, object} where object is the requested object (or nil) -          {:error, :timeout} in the event the message was not responded to - -  the id is expected to be the URI of an ActivityPub object. -  """ -  def fetch(%SocketInfo{} = fed_socket, id) do -    FedSocket.fetch(fed_socket, id) -  end - -  @doc """ -  Disconnect all and restart FedSockets. -  This is mainly used in development and testing but could be useful in production. -  """ -  def reset do -    FedRegistry -    |> Process.whereis() -    |> Process.exit(:testing) -  end - -  def uri_for_origin(origin), -    do: "ws://#{origin}/api/fedsocket/v1" -end diff --git a/lib/pleroma/web/fed_sockets/fed_registry.ex b/lib/pleroma/web/fed_sockets/fed_registry.ex deleted file mode 100644 index e00ea69c0..000000000 --- a/lib/pleroma/web/fed_sockets/fed_registry.ex +++ /dev/null @@ -1,185 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.FedSockets.FedRegistry do -  @moduledoc """ -  The FedRegistry stores the active FedSockets for quick retrieval. - -  The storage and retrieval portion of the FedRegistry is done in process through -  elixir's `Registry` module for speed and its ability to monitor for terminated processes. - -  Dropped connections will be caught by `Registry` and deleted. Since the next -  message will initiate a new connection there is no reason to try and reconnect at that point. - -  Normally outside modules should have no need to call or use the FedRegistry themselves. -  """ - -  alias Pleroma.Web.FedSockets.FedSocket -  alias Pleroma.Web.FedSockets.SocketInfo - -  require Logger - -  @default_rejection_duration 15 * 60 * 1000 -  @rejections :fed_socket_rejections - -  @doc """ -  Retrieves a FedSocket from the Registry given it's origin. - -  The origin is expected to be a string identifying the endpoint "example.com" or "example2.com:8080" - -  Will return: -    * {:ok, fed_socket} for working FedSockets -    * {:error, :rejected} for origins that have been tried and refused within the rejection duration interval -    * {:error, some_reason} usually :missing for unknown origins -  """ -  def get_fed_socket(origin) do -    case get_registry_data(origin) do -      {:error, reason} -> -        {:error, reason} - -      {:ok, %{state: :connected} = socket_info} -> -        {:ok, socket_info} -    end -  end - -  @doc """ -  Adds a connected FedSocket to the Registry. - -  Always returns {:ok, fed_socket} -  """ -  def add_fed_socket(origin, pid \\ nil) do -    origin -    |> SocketInfo.build(pid) -    |> SocketInfo.connect() -    |> add_socket_info -  end - -  defp add_socket_info(%{origin: origin, state: :connected} = socket_info) do -    case Registry.register(FedSockets.Registry, origin, socket_info) do -      {:ok, _owner} -> -        clear_prior_rejection(origin) -        Logger.debug("fedsocket added: #{inspect(origin)}") - -        {:ok, socket_info} - -      {:error, {:already_registered, _pid}} -> -        FedSocket.close(socket_info) -        existing_socket_info = Registry.lookup(FedSockets.Registry, origin) - -        {:ok, existing_socket_info} - -      _ -> -        {:error, :error_adding_socket} -    end -  end - -  @doc """ -  Mark this origin as having rejected a connection attempt. -  This will keep it from getting additional connection attempts -  for a period of time specified in the config. - -  Always returns {:ok, new_reg_data} -  """ -  def set_host_rejected(uri) do -    new_reg_data = -      uri -      |> SocketInfo.origin() -      |> get_or_create_registry_data() -      |> set_to_rejected() -      |> save_registry_data() - -    {:ok, new_reg_data} -  end - -  @doc """ -  Retrieves the FedRegistryData from the Registry given it's origin. - -  The origin is expected to be a string identifying the endpoint "example.com" or "example2.com:8080" - -  Will return: -    * {:ok, fed_registry_data} for known origins -    * {:error, :missing} for uniknown origins -    * {:error, :cache_error} indicating some low level runtime issues -  """ -  def get_registry_data(origin) do -    case Registry.lookup(FedSockets.Registry, origin) do -      [] -> -        if is_rejected?(origin) do -          Logger.debug("previously rejected fedsocket requested") -          {:error, :rejected} -        else -          {:error, :missing} -        end - -      [{_pid, %{state: :connected} = socket_info}] -> -        {:ok, socket_info} - -      _ -> -        {:error, :cache_error} -    end -  end - -  @doc """ -  Retrieves a map of all sockets from the Registry. The keys are the origins and the values are the corresponding SocketInfo -  """ -  def list_all do -    (list_all_connected() ++ list_all_rejected()) -    |> Enum.into(%{}) -  end - -  defp list_all_connected do -    FedSockets.Registry -    |> Registry.select([{{:"$1", :_, :"$3"}, [], [{{:"$1", :"$3"}}]}]) -  end - -  defp list_all_rejected do -    {:ok, keys} = Cachex.keys(@rejections) - -    {:ok, registry_data} = -      Cachex.execute(@rejections, fn worker -> -        Enum.map(keys, fn k -> {k, Cachex.get!(worker, k)} end) -      end) - -    registry_data -  end - -  defp clear_prior_rejection(origin), -    do: Cachex.del(@rejections, origin) - -  defp is_rejected?(origin) do -    case Cachex.get(@rejections, origin) do -      {:ok, nil} -> -        false - -      {:ok, _} -> -        true -    end -  end - -  defp get_or_create_registry_data(origin) do -    case get_registry_data(origin) do -      {:error, :missing} -> -        %SocketInfo{origin: origin} - -      {:ok, socket_info} -> -        socket_info -    end -  end - -  defp save_registry_data(%SocketInfo{origin: origin, state: :connected} = socket_info) do -    {:ok, true} = Registry.update_value(FedSockets.Registry, origin, fn _ -> socket_info end) -    socket_info -  end - -  defp save_registry_data(%SocketInfo{origin: origin, state: :rejected} = socket_info) do -    rejection_expiration = -      Pleroma.Config.get([:fed_sockets, :rejection_duration], @default_rejection_duration) - -    {:ok, true} = Cachex.put(@rejections, origin, socket_info, ttl: rejection_expiration) -    socket_info -  end - -  defp set_to_rejected(%SocketInfo{} = socket_info), -    do: %SocketInfo{socket_info | state: :rejected} -end diff --git a/lib/pleroma/web/fed_sockets/fed_socket.ex b/lib/pleroma/web/fed_sockets/fed_socket.ex deleted file mode 100644 index 98d64e65a..000000000 --- a/lib/pleroma/web/fed_sockets/fed_socket.ex +++ /dev/null @@ -1,137 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.FedSockets.FedSocket do -  @moduledoc """ -  The FedSocket module abstracts the actions to be taken taken on connections regardless of -  whether the connection started as inbound or outbound. - - -  Normally outside modules will have no need to call the FedSocket module directly. -  """ - -  alias Pleroma.Object -  alias Pleroma.Object.Containment -  alias Pleroma.User -  alias Pleroma.Web.ActivityPub.ObjectView -  alias Pleroma.Web.ActivityPub.UserView -  alias Pleroma.Web.ActivityPub.Visibility -  alias Pleroma.Web.FedSockets.FetchRegistry -  alias Pleroma.Web.FedSockets.IngesterWorker -  alias Pleroma.Web.FedSockets.OutgoingHandler -  alias Pleroma.Web.FedSockets.SocketInfo - -  require Logger - -  @shake "61dd18f7-f1e6-49a4-939a-a749fcdc1103" - -  def connect_to_host(uri) do -    case OutgoingHandler.start_link(uri) do -      {:ok, pid} -> -        {:ok, pid} - -      error -> -        {:error, error} -    end -  end - -  def close(%SocketInfo{pid: socket_pid}), -    do: Process.send(socket_pid, :close, []) - -  def publish(%SocketInfo{pid: socket_pid}, json) do -    %{action: :publish, data: json} -    |> Jason.encode!() -    |> send_packet(socket_pid) -  end - -  def fetch(%SocketInfo{pid: socket_pid}, id) do -    fetch_uuid = FetchRegistry.register_fetch(id) - -    %{action: :fetch, data: id, uuid: fetch_uuid} -    |> Jason.encode!() -    |> send_packet(socket_pid) - -    wait_for_fetch_to_return(fetch_uuid, 0) -  end - -  def receive_package(%SocketInfo{} = fed_socket, json) do -    json -    |> Jason.decode!() -    |> process_package(fed_socket) -  end - -  defp wait_for_fetch_to_return(uuid, cntr) do -    case FetchRegistry.check_fetch(uuid) do -      {:error, :waiting} -> -        Process.sleep(:math.pow(cntr, 3) |> Kernel.trunc()) -        wait_for_fetch_to_return(uuid, cntr + 1) - -      {:error, :missing} -> -        Logger.error("FedSocket fetch timed out - #{inspect(uuid)}") -        {:error, :timeout} - -      {:ok, _fr} -> -        FetchRegistry.pop_fetch(uuid) -    end -  end - -  defp process_package(%{"action" => "publish", "data" => data}, %{origin: origin} = _fed_socket) do -    if Containment.contain_origin(origin, data) do -      IngesterWorker.enqueue("ingest", %{"object" => data}) -    end - -    {:reply, %{"action" => "publish_reply", "status" => "processed"}} -  end - -  defp process_package(%{"action" => "fetch_reply", "uuid" => uuid, "data" => data}, _fed_socket) do -    FetchRegistry.register_fetch_received(uuid, data) -    {:noreply, nil} -  end - -  defp process_package(%{"action" => "fetch", "uuid" => uuid, "data" => ap_id}, _fed_socket) do -    {:ok, data} = render_fetched_data(ap_id, uuid) -    {:reply, data} -  end - -  defp process_package(%{"action" => "publish_reply"}, _fed_socket) do -    {:noreply, nil} -  end - -  defp process_package(other, _fed_socket) do -    Logger.warn("unknown json packages received #{inspect(other)}") -    {:noreply, nil} -  end - -  defp render_fetched_data(ap_id, uuid) do -    {:ok, -     %{ -       "action" => "fetch_reply", -       "status" => "processed", -       "uuid" => uuid, -       "data" => represent_item(ap_id) -     }} -  end - -  defp represent_item(ap_id) do -    case User.get_by_ap_id(ap_id) do -      nil -> -        object = Object.get_cached_by_ap_id(ap_id) - -        if Visibility.is_public?(object) do -          Phoenix.View.render_to_string(ObjectView, "object.json", object: object) -        else -          nil -        end - -      user -> -        Phoenix.View.render_to_string(UserView, "user.json", user: user) -    end -  end - -  defp send_packet(data, socket_pid) do -    Process.send(socket_pid, {:send, data}, []) -  end - -  def shake, do: @shake -end diff --git a/lib/pleroma/web/fed_sockets/fetch_registry.ex b/lib/pleroma/web/fed_sockets/fetch_registry.ex deleted file mode 100644 index 7897f0fc6..000000000 --- a/lib/pleroma/web/fed_sockets/fetch_registry.ex +++ /dev/null @@ -1,151 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.FedSockets.FetchRegistry do -  @moduledoc """ -  The FetchRegistry acts as a broker for fetch requests and return values. -  This allows calling processes to block while waiting for a reply. -  It doesn't impose it's own process instead using `Cachex` to handle fetches in process, allowing -  multi threaded processes to avoid bottlenecking. - -  Normally outside modules will have no need to call or use the FetchRegistry themselves. - -  The `Cachex` parameters can be controlled from the config. Since exact timeout intervals -  aren't necessary the following settings are used by default: - -  config :pleroma, :fed_sockets, -    fed_socket_fetches: [ -      default: 12_000, -      interval: 3_000, -      lazy: false -    ] - -  """ - -  defmodule FetchRegistryData do -    defstruct uuid: nil, -              sent_json: nil, -              received_json: nil, -              sent_at: nil, -              received_at: nil -  end - -  alias Ecto.UUID - -  require Logger - -  @fetches :fed_socket_fetches - -  @doc """ -  Registers a json request wth the FetchRegistry and returns the identifying UUID. -  """ -  def register_fetch(json) do -    %FetchRegistryData{uuid: uuid} = -      json -      |> new_registry_data -      |> save_registry_data - -    uuid -  end - -  @doc """ -  Reports on the status of a Fetch given the identifying UUID. - -  Will return -    * {:ok, fetched_object} if a fetch has completed -    * {:error, :waiting} if a fetch is still pending -    * {:error, other_error} usually :missing to indicate a fetch that has timed out -  """ -  def check_fetch(uuid) do -    case get_registry_data(uuid) do -      {:ok, %FetchRegistryData{received_at: nil}} -> -        {:error, :waiting} - -      {:ok, %FetchRegistryData{} = reg_data} -> -        {:ok, reg_data} - -      e -> -        e -    end -  end - -  @doc """ -  Retrieves the response to a fetch given the identifying UUID. -  The completed fetch will be deleted from the FetchRegistry - -  Will return -    * {:ok, fetched_object} if a fetch has completed -    * {:error, :waiting} if a fetch is still pending -    * {:error, other_error} usually :missing to indicate a fetch that has timed out -  """ -  def pop_fetch(uuid) do -    case check_fetch(uuid) do -      {:ok, %FetchRegistryData{received_json: received_json}} -> -        delete_registry_data(uuid) -        {:ok, received_json} - -      e -> -        e -    end -  end - -  @doc """ -  This is called to register a fetch has returned. -  It expects the result data along with the UUID that was sent in the request - -  Will return the fetched object or :error -  """ -  def register_fetch_received(uuid, data) do -    case get_registry_data(uuid) do -      {:ok, %FetchRegistryData{received_at: nil} = reg_data} -> -        reg_data -        |> set_fetch_received(data) -        |> save_registry_data() - -      {:ok, %FetchRegistryData{} = reg_data} -> -        Logger.warn("tried to add fetched data twice - #{uuid}") -        reg_data - -      {:error, _} -> -        Logger.warn("Error adding fetch to registry - #{uuid}") -        :error -    end -  end - -  defp new_registry_data(json) do -    %FetchRegistryData{ -      uuid: UUID.generate(), -      sent_json: json, -      sent_at: :erlang.monotonic_time(:millisecond) -    } -  end - -  defp get_registry_data(origin) do -    case Cachex.get(@fetches, origin) do -      {:ok, nil} -> -        {:error, :missing} - -      {:ok, reg_data} -> -        {:ok, reg_data} - -      _ -> -        {:error, :cache_error} -    end -  end - -  defp set_fetch_received(%FetchRegistryData{} = reg_data, data), -    do: %FetchRegistryData{ -      reg_data -      | received_at: :erlang.monotonic_time(:millisecond), -        received_json: data -    } - -  defp save_registry_data(%FetchRegistryData{uuid: uuid} = reg_data) do -    {:ok, true} = Cachex.put(@fetches, uuid, reg_data) -    reg_data -  end - -  defp delete_registry_data(origin), -    do: {:ok, true} = Cachex.del(@fetches, origin) -end diff --git a/lib/pleroma/web/fed_sockets/incoming_handler.ex b/lib/pleroma/web/fed_sockets/incoming_handler.ex deleted file mode 100644 index 49d0d9d84..000000000 --- a/lib/pleroma/web/fed_sockets/incoming_handler.ex +++ /dev/null @@ -1,88 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.FedSockets.IncomingHandler do -  require Logger - -  alias Pleroma.Web.FedSockets.FedRegistry -  alias Pleroma.Web.FedSockets.FedSocket -  alias Pleroma.Web.FedSockets.SocketInfo - -  import HTTPSignatures, only: [validate_conn: 1, split_signature: 1] - -  @behaviour :cowboy_websocket - -  def init(req, state) do -    shake = FedSocket.shake() - -    with true <- Pleroma.Config.get([:fed_sockets, :enabled]), -         sec_protocol <- :cowboy_req.header("sec-websocket-protocol", req, nil), -         headers = %{"(request-target)" => ^shake} <- :cowboy_req.headers(req), -         true <- validate_conn(%{req_headers: headers}), -         %{"keyId" => origin} <- split_signature(headers["signature"]) do -      req = -        if is_nil(sec_protocol) do -          req -        else -          :cowboy_req.set_resp_header("sec-websocket-protocol", sec_protocol, req) -        end - -      {:cowboy_websocket, req, %{origin: origin}, %{}} -    else -      _ -> -        {:ok, req, state} -    end -  end - -  def websocket_init(%{origin: origin}) do -    case FedRegistry.add_fed_socket(origin) do -      {:ok, socket_info} -> -        {:ok, socket_info} - -      e -> -        Logger.error("FedSocket websocket_init failed - #{inspect(e)}") -        {:error, inspect(e)} -    end -  end - -  # Use the ping to  check if the connection should be expired -  def websocket_handle(:ping, socket_info) do -    if SocketInfo.expired?(socket_info) do -      {:stop, socket_info} -    else -      {:ok, socket_info, :hibernate} -    end -  end - -  def websocket_handle({:text, data}, socket_info) do -    socket_info = SocketInfo.touch(socket_info) - -    case FedSocket.receive_package(socket_info, data) do -      {:noreply, _} -> -        {:ok, socket_info} - -      {:reply, reply} -> -        {:reply, {:text, Jason.encode!(reply)}, socket_info} - -      {:error, reason} -> -        Logger.error("incoming error - receive_package: #{inspect(reason)}") -        {:ok, socket_info} -    end -  end - -  def websocket_info({:send, message}, socket_info) do -    socket_info = SocketInfo.touch(socket_info) - -    {:reply, {:text, message}, socket_info} -  end - -  def websocket_info(:close, state) do -    {:stop, state} -  end - -  def websocket_info(message, state) do -    Logger.debug("#{__MODULE__} unknown message #{inspect(message)}") -    {:ok, state} -  end -end diff --git a/lib/pleroma/web/fed_sockets/ingester_worker.ex b/lib/pleroma/web/fed_sockets/ingester_worker.ex deleted file mode 100644 index 325f2a4ab..000000000 --- a/lib/pleroma/web/fed_sockets/ingester_worker.ex +++ /dev/null @@ -1,33 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.FedSockets.IngesterWorker do -  use Pleroma.Workers.WorkerHelper, queue: "ingestion_queue" -  require Logger - -  alias Pleroma.Web.Federator - -  @impl Oban.Worker -  def perform(%Job{args: %{"op" => "ingest", "object" => ingestee}}) do -    try do -      ingestee -      |> Jason.decode!() -      |> do_ingestion() -    rescue -      e -> -        Logger.error("IngesterWorker error - #{inspect(e)}") -        e -    end -  end - -  defp do_ingestion(params) do -    case Federator.incoming_ap_doc(params) do -      {:error, reason} -> -        {:error, reason} - -      {:ok, object} -> -        {:ok, object} -    end -  end -end diff --git a/lib/pleroma/web/fed_sockets/outgoing_handler.ex b/lib/pleroma/web/fed_sockets/outgoing_handler.ex deleted file mode 100644 index e235a7c43..000000000 --- a/lib/pleroma/web/fed_sockets/outgoing_handler.ex +++ /dev/null @@ -1,151 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.FedSockets.OutgoingHandler do -  use GenServer - -  require Logger - -  alias Pleroma.Application -  alias Pleroma.Web.ActivityPub.InternalFetchActor -  alias Pleroma.Web.FedSockets -  alias Pleroma.Web.FedSockets.FedRegistry -  alias Pleroma.Web.FedSockets.FedSocket -  alias Pleroma.Web.FedSockets.SocketInfo - -  def start_link(uri) do -    GenServer.start_link(__MODULE__, %{uri: uri}) -  end - -  def init(%{uri: uri}) do -    case initiate_connection(uri) do -      {:ok, ws_origin, conn_pid} -> -        FedRegistry.add_fed_socket(ws_origin, conn_pid) - -      {:error, reason} -> -        Logger.debug("Outgoing connection failed - #{inspect(reason)}") -        :ignore -    end -  end - -  def handle_info({:gun_ws, conn_pid, _ref, {:text, data}}, socket_info) do -    socket_info = SocketInfo.touch(socket_info) - -    case FedSocket.receive_package(socket_info, data) do -      {:noreply, _} -> -        {:noreply, socket_info} - -      {:reply, reply} -> -        :gun.ws_send(conn_pid, {:text, Jason.encode!(reply)}) -        {:noreply, socket_info} - -      {:error, reason} -> -        Logger.error("incoming error - receive_package: #{inspect(reason)}") -        {:noreply, socket_info} -    end -  end - -  def handle_info(:close, state) do -    Logger.debug("Sending close frame !!!!!!!") -    {:close, state} -  end - -  def handle_info({:gun_down, _pid, _prot, :closed, _}, state) do -    {:stop, :normal, state} -  end - -  def handle_info({:send, data}, %{conn_pid: conn_pid} = socket_info) do -    socket_info = SocketInfo.touch(socket_info) -    :gun.ws_send(conn_pid, {:text, data}) -    {:noreply, socket_info} -  end - -  def handle_info({:gun_ws, _, _, :pong}, state) do -    {:noreply, state, :hibernate} -  end - -  def handle_info(msg, state) do -    Logger.debug("#{__MODULE__} unhandled event #{inspect(msg)}") -    {:noreply, state} -  end - -  def terminate(reason, state) do -    Logger.debug( -      "#{__MODULE__} terminating outgoing connection for #{inspect(state)} for #{inspect(reason)}" -    ) - -    {:ok, state} -  end - -  def initiate_connection(uri) do -    ws_uri = -      uri -      |> SocketInfo.origin() -      |> FedSockets.uri_for_origin() - -    %{host: host, port: port, path: path} = URI.parse(ws_uri) - -    with {:ok, conn_pid} <- :gun.open(to_charlist(host), port, %{protocols: [:http]}), -         {:ok, _} <- :gun.await_up(conn_pid), -         reference <- -           :gun.get(conn_pid, to_charlist(path), [ -             {'user-agent', to_charlist(Application.user_agent())} -           ]), -         {:response, :fin, 204, _} <- :gun.await(conn_pid, reference), -         headers <- build_headers(uri), -         ref <- :gun.ws_upgrade(conn_pid, to_charlist(path), headers, %{silence_pings: false}) do -      receive do -        {:gun_upgrade, ^conn_pid, ^ref, [<<"websocket">>], _} -> -          {:ok, ws_uri, conn_pid} -      after -        15_000 -> -          Logger.debug("Fedsocket timeout connecting to #{inspect(uri)}") -          {:error, :timeout} -      end -    else -      {:response, :nofin, 404, _} -> -        {:error, :fedsockets_not_supported} - -      e -> -        Logger.debug("Fedsocket error connecting to #{inspect(uri)}") -        {:error, e} -    end -  end - -  defp build_headers(uri) do -    host_for_sig = uri |> URI.parse() |> host_signature() - -    shake = FedSocket.shake() -    digest = "SHA-256=" <> (:crypto.hash(:sha256, shake) |> Base.encode64()) -    date = Pleroma.Signature.signed_date() -    shake_size = byte_size(shake) - -    signature_opts = %{ -      "(request-target)": shake, -      "content-length": to_charlist("#{shake_size}"), -      date: date, -      digest: digest, -      host: host_for_sig -    } - -    signature = Pleroma.Signature.sign(InternalFetchActor.get_actor(), signature_opts) - -    [ -      {'signature', to_charlist(signature)}, -      {'date', date}, -      {'digest', to_charlist(digest)}, -      {'content-length', to_charlist("#{shake_size}")}, -      {to_charlist("(request-target)"), to_charlist(shake)}, -      {'user-agent', to_charlist(Application.user_agent())} -    ] -  end - -  defp host_signature(%{host: host, scheme: scheme, port: port}) do -    if port == URI.default_port(scheme) do -      host -    else -      "#{host}:#{port}" -    end -  end -end diff --git a/lib/pleroma/web/fed_sockets/socket_info.ex b/lib/pleroma/web/fed_sockets/socket_info.ex deleted file mode 100644 index d6fdffe1a..000000000 --- a/lib/pleroma/web/fed_sockets/socket_info.ex +++ /dev/null @@ -1,52 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.FedSockets.SocketInfo do -  defstruct origin: nil, -            pid: nil, -            conn_pid: nil, -            state: :default, -            connected_until: nil - -  alias Pleroma.Web.FedSockets.SocketInfo -  @default_connection_duration 15 * 60 * 1000 - -  def build(uri, conn_pid \\ nil) do -    uri -    |> build_origin() -    |> build_pids(conn_pid) -    |> touch() -  end - -  def touch(%SocketInfo{} = socket_info), -    do: %{socket_info | connected_until: new_ttl()} - -  def connect(%SocketInfo{} = socket_info), -    do: %{socket_info | state: :connected} - -  def expired?(%{connected_until: connected_until}), -    do: connected_until < :erlang.monotonic_time(:millisecond) - -  def origin(uri), -    do: build_origin(uri).origin - -  defp build_pids(socket_info, conn_pid), -    do: struct(socket_info, pid: self(), conn_pid: conn_pid) - -  defp build_origin(uri) when is_binary(uri), -    do: uri |> URI.parse() |> build_origin - -  defp build_origin(%{host: host, port: nil, scheme: scheme}), -    do: build_origin(%{host: host, port: URI.default_port(scheme)}) - -  defp build_origin(%{host: host, port: port}), -    do: %SocketInfo{origin: "#{host}:#{port}"} - -  defp new_ttl do -    connection_duration = -      Pleroma.Config.get([:fed_sockets, :connection_duration], @default_connection_duration) - -    :erlang.monotonic_time(:millisecond) + connection_duration -  end -end diff --git a/lib/pleroma/web/fed_sockets/supervisor.ex b/lib/pleroma/web/fed_sockets/supervisor.ex deleted file mode 100644 index a5f4bebfb..000000000 --- a/lib/pleroma/web/fed_sockets/supervisor.ex +++ /dev/null @@ -1,59 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.FedSockets.Supervisor do -  use Supervisor -  import Cachex.Spec - -  def start_link(opts) do -    Supervisor.start_link(__MODULE__, opts, name: __MODULE__) -  end - -  def init(args) do -    children = [ -      build_cache(:fed_socket_fetches, args), -      build_cache(:fed_socket_rejections, args), -      {Registry, keys: :unique, name: FedSockets.Registry, meta: [rejected: %{}]} -    ] - -    opts = [strategy: :one_for_all, name: Pleroma.Web.Streamer.Supervisor] -    Supervisor.init(children, opts) -  end - -  defp build_cache(name, args) do -    opts = get_opts(name, args) - -    %{ -      id: String.to_atom("#{name}_cache"), -      start: {Cachex, :start_link, [name, opts]}, -      type: :worker -    } -  end - -  defp get_opts(cache_name, args) -       when cache_name in [:fed_socket_fetches, :fed_socket_rejections] do -    default = get_opts_or_config(args, cache_name, :default, 15_000) -    interval = get_opts_or_config(args, cache_name, :interval, 3_000) -    lazy = get_opts_or_config(args, cache_name, :lazy, false) - -    [expiration: expiration(default: default, interval: interval, lazy: lazy)] -  end - -  defp get_opts(name, args) do -    Keyword.get(args, name, []) -  end - -  defp get_opts_or_config(args, name, key, default) do -    args -    |> Keyword.get(name, []) -    |> Keyword.get(key) -    |> case do -      nil -> -        Pleroma.Config.get([:fed_sockets, name, key], default) - -      value -> -        value -    end -  end -end diff --git a/test/pleroma/web/fed_sockets/fed_registry_test.exs b/test/pleroma/web/fed_sockets/fed_registry_test.exs deleted file mode 100644 index 73aaced46..000000000 --- a/test/pleroma/web/fed_sockets/fed_registry_test.exs +++ /dev/null @@ -1,124 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.FedSockets.FedRegistryTest do -  use ExUnit.Case - -  alias Pleroma.Web.FedSockets -  alias Pleroma.Web.FedSockets.FedRegistry -  alias Pleroma.Web.FedSockets.SocketInfo - -  @good_domain "http://good.domain" -  @good_domain_origin "good.domain:80" - -  setup do -    start_supervised({Pleroma.Web.FedSockets.Supervisor, []}) -    build_test_socket(@good_domain) -    Process.sleep(10) - -    :ok -  end - -  describe "add_fed_socket/1 without conflicting sockets" do -    test "can be added" do -      Process.sleep(10) -      assert {:ok, %SocketInfo{origin: origin}} = FedRegistry.get_fed_socket(@good_domain_origin) -      assert origin == "good.domain:80" -    end - -    test "multiple origins can be added" do -      build_test_socket("http://anothergood.domain") -      Process.sleep(10) - -      assert {:ok, %SocketInfo{origin: origin_1}} = -               FedRegistry.get_fed_socket(@good_domain_origin) - -      assert {:ok, %SocketInfo{origin: origin_2}} = -               FedRegistry.get_fed_socket("anothergood.domain:80") - -      assert origin_1 == "good.domain:80" -      assert origin_2 == "anothergood.domain:80" -      assert FedRegistry.list_all() |> Enum.count() == 2 -    end -  end - -  describe "add_fed_socket/1 when duplicate sockets conflict" do -    setup do -      build_test_socket(@good_domain) -      build_test_socket(@good_domain) -      Process.sleep(10) -      :ok -    end - -    test "will be ignored" do -      assert {:ok, %SocketInfo{origin: origin, pid: _pid_one}} = -               FedRegistry.get_fed_socket(@good_domain_origin) - -      assert origin == "good.domain:80" - -      assert FedRegistry.list_all() |> Enum.count() == 1 -    end - -    test "the newer process will be closed" do -      pid_two = build_test_socket(@good_domain) - -      assert {:ok, %SocketInfo{origin: origin, pid: _pid_one}} = -               FedRegistry.get_fed_socket(@good_domain_origin) - -      assert origin == "good.domain:80" -      Process.sleep(10) - -      refute Process.alive?(pid_two) - -      assert FedRegistry.list_all() |> Enum.count() == 1 -    end -  end - -  describe "get_fed_socket/1" do -    test "returns missing for unknown hosts" do -      assert {:error, :missing} = FedRegistry.get_fed_socket("not_a_dmoain") -    end - -    test "returns rejected for hosts previously rejected" do -      "rejected.domain:80" -      |> FedSockets.uri_for_origin() -      |> FedRegistry.set_host_rejected() - -      assert {:error, :rejected} = FedRegistry.get_fed_socket("rejected.domain:80") -    end - -    test "can retrieve a previously added SocketInfo" do -      build_test_socket(@good_domain) -      Process.sleep(10) -      assert {:ok, %SocketInfo{origin: origin}} = FedRegistry.get_fed_socket(@good_domain_origin) -      assert origin == "good.domain:80" -    end - -    test "removes references to SocketInfos when the process crashes" do -      assert {:ok, %SocketInfo{origin: origin, pid: pid}} = -               FedRegistry.get_fed_socket(@good_domain_origin) - -      assert origin == "good.domain:80" - -      Process.exit(pid, :testing) -      Process.sleep(100) -      assert {:error, :missing} = FedRegistry.get_fed_socket(@good_domain_origin) -    end -  end - -  def build_test_socket(uri) do -    Kernel.spawn(fn -> fed_socket_almost(uri) end) -  end - -  def fed_socket_almost(origin) do -    FedRegistry.add_fed_socket(origin) - -    receive do -      :close -> -        :ok -    after -      5_000 -> :timeout -    end -  end -end diff --git a/test/pleroma/web/fed_sockets/fetch_registry_test.exs b/test/pleroma/web/fed_sockets/fetch_registry_test.exs deleted file mode 100644 index 7bd2d995a..000000000 --- a/test/pleroma/web/fed_sockets/fetch_registry_test.exs +++ /dev/null @@ -1,67 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.FedSockets.FetchRegistryTest do -  use ExUnit.Case - -  alias Pleroma.Web.FedSockets.FetchRegistry -  alias Pleroma.Web.FedSockets.FetchRegistry.FetchRegistryData - -  @json_message "hello" -  @json_reply "hello back" - -  setup do -    start_supervised( -      {Pleroma.Web.FedSockets.Supervisor, -       [ -         ping_interval: 8, -         connection_duration: 15, -         rejection_duration: 5, -         fed_socket_fetches: [default: 10, interval: 10] -       ]} -    ) - -    :ok -  end - -  test "fetches can be stored" do -    uuid = FetchRegistry.register_fetch(@json_message) - -    assert {:error, :waiting} = FetchRegistry.check_fetch(uuid) -  end - -  test "fetches can return" do -    uuid = FetchRegistry.register_fetch(@json_message) -    task = Task.async(fn -> FetchRegistry.register_fetch_received(uuid, @json_reply) end) - -    assert {:error, :waiting} = FetchRegistry.check_fetch(uuid) -    Task.await(task) - -    assert {:ok, %FetchRegistryData{received_json: received_json}} = -             FetchRegistry.check_fetch(uuid) - -    assert received_json == @json_reply -  end - -  test "fetches are deleted once popped from stack" do -    uuid = FetchRegistry.register_fetch(@json_message) -    task = Task.async(fn -> FetchRegistry.register_fetch_received(uuid, @json_reply) end) -    Task.await(task) - -    assert {:ok, %FetchRegistryData{received_json: received_json}} = -             FetchRegistry.check_fetch(uuid) - -    assert received_json == @json_reply -    assert {:ok, @json_reply} = FetchRegistry.pop_fetch(uuid) - -    assert {:error, :missing} = FetchRegistry.check_fetch(uuid) -  end - -  test "fetches can time out" do -    uuid = FetchRegistry.register_fetch(@json_message) -    assert {:error, :waiting} = FetchRegistry.check_fetch(uuid) -    Process.sleep(500) -    assert {:error, :missing} = FetchRegistry.check_fetch(uuid) -  end -end diff --git a/test/pleroma/web/fed_sockets/socket_info_test.exs b/test/pleroma/web/fed_sockets/socket_info_test.exs deleted file mode 100644 index db3d6edcd..000000000 --- a/test/pleroma/web/fed_sockets/socket_info_test.exs +++ /dev/null @@ -1,118 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.FedSockets.SocketInfoTest do -  use ExUnit.Case - -  alias Pleroma.Web.FedSockets -  alias Pleroma.Web.FedSockets.SocketInfo - -  describe "uri_for_origin" do -    test "provides the fed_socket URL given the origin information" do -      endpoint = "example.com:4000" -      assert FedSockets.uri_for_origin(endpoint) =~ "ws://" -      assert FedSockets.uri_for_origin(endpoint) =~ endpoint -    end -  end - -  describe "origin" do -    test "will provide the origin field given a url" do -      endpoint = "example.com:4000" -      assert SocketInfo.origin("ws://#{endpoint}") == endpoint -      assert SocketInfo.origin("http://#{endpoint}") == endpoint -      assert SocketInfo.origin("https://#{endpoint}") == endpoint -    end - -    test "will proide the origin field given a uri" do -      endpoint = "example.com:4000" -      uri = URI.parse("http://#{endpoint}") - -      assert SocketInfo.origin(uri) == endpoint -    end -  end - -  describe "touch" do -    test "will update the TTL" do -      endpoint = "example.com:4000" -      socket = SocketInfo.build("ws://#{endpoint}") -      Process.sleep(2) -      touched_socket = SocketInfo.touch(socket) - -      assert socket.connected_until < touched_socket.connected_until -    end -  end - -  describe "expired?" do -    setup do -      start_supervised( -        {Pleroma.Web.FedSockets.Supervisor, -         [ -           ping_interval: 8, -           connection_duration: 5, -           rejection_duration: 5, -           fed_socket_rejections: [lazy: true] -         ]} -      ) - -      :ok -    end - -    test "tests if the TTL is exceeded" do -      endpoint = "example.com:4000" -      socket = SocketInfo.build("ws://#{endpoint}") -      refute SocketInfo.expired?(socket) -      Process.sleep(10) - -      assert SocketInfo.expired?(socket) -    end -  end - -  describe "creating outgoing connection records" do -    test "can be passed a string" do -      assert %{conn_pid: :pid, origin: _origin} = SocketInfo.build("example.com:4000", :pid) -    end - -    test "can be passed a URI" do -      uri = URI.parse("http://example.com:4000") -      assert %{conn_pid: :pid, origin: origin} = SocketInfo.build(uri, :pid) -      assert origin =~ "example.com:4000" -    end - -    test "will include the port number" do -      assert %{conn_pid: :pid, origin: origin} = SocketInfo.build("http://example.com:4000", :pid) - -      assert origin =~ ":4000" -    end - -    test "will provide the port if missing" do -      assert %{conn_pid: :pid, origin: "example.com:80"} = -               SocketInfo.build("http://example.com", :pid) - -      assert %{conn_pid: :pid, origin: "example.com:443"} = -               SocketInfo.build("https://example.com", :pid) -    end -  end - -  describe "creating incoming connection records" do -    test "can be passed a string" do -      assert %{pid: _, origin: _origin} = SocketInfo.build("example.com:4000") -    end - -    test "can be passed a URI" do -      uri = URI.parse("example.com:4000") -      assert %{pid: _, origin: _origin} = SocketInfo.build(uri) -    end - -    test "will include the port number" do -      assert %{pid: _, origin: origin} = SocketInfo.build("http://example.com:4000") - -      assert origin =~ ":4000" -    end - -    test "will provide the port if missing" do -      assert %{pid: _, origin: "example.com:80"} = SocketInfo.build("http://example.com") -      assert %{pid: _, origin: "example.com:443"} = SocketInfo.build("https://example.com") -    end -  end -end  | 
