diff options
Diffstat (limited to 'lib')
31 files changed, 1477 insertions, 269 deletions
| diff --git a/lib/mix/tasks/pleroma/benchmark.ex b/lib/mix/tasks/pleroma/benchmark.ex index a4885b70c..dd2b9c8f2 100644 --- a/lib/mix/tasks/pleroma/benchmark.ex +++ b/lib/mix/tasks/pleroma/benchmark.ex @@ -74,4 +74,43 @@ defmodule Mix.Tasks.Pleroma.Benchmark do        inputs: inputs      )    end + +  def run(["adapters"]) do +    start_pleroma() + +    :ok = +      Pleroma.Gun.Conn.open( +        "https://httpbin.org/stream-bytes/1500", +        :gun_connections +      ) + +    Process.sleep(1_500) + +    Benchee.run( +      %{ +        "Without conn and without pool" => fn -> +          {:ok, %Tesla.Env{}} = +            Pleroma.HTTP.get("https://httpbin.org/stream-bytes/1500", [], +              adapter: [pool: :no_pool, receive_conn: false] +            ) +        end, +        "Without conn and with pool" => fn -> +          {:ok, %Tesla.Env{}} = +            Pleroma.HTTP.get("https://httpbin.org/stream-bytes/1500", [], +              adapter: [receive_conn: false] +            ) +        end, +        "With reused conn and without pool" => fn -> +          {:ok, %Tesla.Env{}} = +            Pleroma.HTTP.get("https://httpbin.org/stream-bytes/1500", [], +              adapter: [pool: :no_pool] +            ) +        end, +        "With reused conn and with pool" => fn -> +          {:ok, %Tesla.Env{}} = Pleroma.HTTP.get("https://httpbin.org/stream-bytes/1500") +        end +      }, +      parallel: 10 +    ) +  end  end diff --git a/lib/mix/tasks/pleroma/emoji.ex b/lib/mix/tasks/pleroma/emoji.ex index 2b03a3009..429d763c7 100644 --- a/lib/mix/tasks/pleroma/emoji.ex +++ b/lib/mix/tasks/pleroma/emoji.ex @@ -4,13 +4,13 @@  defmodule Mix.Tasks.Pleroma.Emoji do    use Mix.Task +  import Mix.Pleroma    @shortdoc "Manages emoji packs"    @moduledoc File.read!("docs/administration/CLI_tasks/emoji.md")    def run(["ls-packs" | args]) do -    Mix.Pleroma.start_pleroma() -    Application.ensure_all_started(:hackney) +    start_pleroma()      {options, [], []} = parse_global_opts(args) @@ -36,8 +36,7 @@ defmodule Mix.Tasks.Pleroma.Emoji do    end    def run(["get-packs" | args]) do -    Mix.Pleroma.start_pleroma() -    Application.ensure_all_started(:hackney) +    start_pleroma()      {options, pack_names, []} = parse_global_opts(args) @@ -135,7 +134,7 @@ defmodule Mix.Tasks.Pleroma.Emoji do    end    def run(["gen-pack", src]) do -    Application.ensure_all_started(:hackney) +    start_pleroma()      proposed_name = Path.basename(src) |> Path.rootname()      name = String.trim(IO.gets("Pack name [#{proposed_name}]: ")) diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 18854b850..c8a0617a5 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -3,8 +3,12 @@  # SPDX-License-Identifier: AGPL-3.0-only  defmodule Pleroma.Application do -  import Cachex.Spec    use Application + +  import Cachex.Spec + +  alias Pleroma.Config +    require Logger    @name Mix.Project.config()[:name] @@ -18,9 +22,9 @@ defmodule Pleroma.Application do    def repository, do: @repository    def user_agent do -    case Pleroma.Config.get([:http, :user_agent], :default) do +    case Config.get([:http, :user_agent], :default) do        :default -> -        info = "#{Pleroma.Web.base_url()} <#{Pleroma.Config.get([:instance, :email], "")}>" +        info = "#{Pleroma.Web.base_url()} <#{Config.get([:instance, :email], "")}>"          named_version() <> "; " <> info        custom -> @@ -32,27 +36,49 @@ defmodule Pleroma.Application do    # for more information on OTP Applications    def start(_type, _args) do      Pleroma.HTML.compile_scrubbers() -    Pleroma.Config.DeprecationWarnings.warn() +    Config.DeprecationWarnings.warn()      Pleroma.Plugs.HTTPSecurityPlug.warn_if_disabled()      Pleroma.Repo.check_migrations_applied!()      setup_instrumenters()      load_custom_modules() +    if adapter() == Tesla.Adapter.Gun do +      if version = Pleroma.OTPVersion.version() do +        [major, minor] = +          version +          |> String.split(".") +          |> Enum.map(&String.to_integer/1) +          |> Enum.take(2) + +        if (major == 22 and minor < 2) or major < 22 do +          raise " +            !!!OTP VERSION WARNING!!! +            You are using gun adapter with OTP version #{version}, which doesn't support correct handling of unordered certificates chains. +            " +        end +      else +        raise " +          !!!OTP VERSION WARNING!!! +          To support correct handling of unordered certificates chains - OTP version must be > 22.2. +          " +      end +    end +      # Define workers and child supervisors to be supervised      children =        [          Pleroma.Repo, -        Pleroma.Config.TransferTask, +        Config.TransferTask,          Pleroma.Emoji,          Pleroma.Captcha,          Pleroma.Plugs.RateLimiter.Supervisor        ] ++          cachex_children() ++ -        hackney_pool_children() ++ +        http_pools_children(Config.get(:env)) ++          [            Pleroma.Stats,            Pleroma.JobQueueMonitor, -          {Oban, Pleroma.Config.get(Oban)} +          {Oban, Config.get(Oban)}          ] ++          task_children(@env) ++          streamer_child(@env) ++ @@ -69,7 +95,7 @@ defmodule Pleroma.Application do    end    def load_custom_modules do -    dir = Pleroma.Config.get([:modules, :runtime_dir]) +    dir = Config.get([:modules, :runtime_dir])      if dir && File.exists?(dir) do        dir @@ -110,20 +136,6 @@ defmodule Pleroma.Application do      Pleroma.Web.Endpoint.Instrumenter.setup()    end -  def enabled_hackney_pools do -    [:media] ++ -      if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Hackney do -        [:federation] -      else -        [] -      end ++ -      if Pleroma.Config.get([Pleroma.Upload, :proxy_remote]) do -        [:upload] -      else -        [] -      end -  end -    defp cachex_children do      [        build_cachex("used_captcha", ttl_interval: seconds_valid_interval()), @@ -145,7 +157,7 @@ defmodule Pleroma.Application do      do: expiration(default: :timer.seconds(6 * 60 * 60), interval: :timer.seconds(60))    defp seconds_valid_interval, -    do: :timer.seconds(Pleroma.Config.get!([Pleroma.Captcha, :seconds_valid])) +    do: :timer.seconds(Config.get!([Pleroma.Captcha, :seconds_valid]))    defp build_cachex(type, opts),      do: %{ @@ -154,7 +166,7 @@ defmodule Pleroma.Application do        type: :worker      } -  defp chat_enabled?, do: Pleroma.Config.get([:chat, :enabled]) +  defp chat_enabled?, do: Config.get([:chat, :enabled])    defp streamer_child(:test), do: [] @@ -168,13 +180,6 @@ defmodule Pleroma.Application do    defp chat_child(_, _), do: [] -  defp hackney_pool_children do -    for pool <- enabled_hackney_pools() do -      options = Pleroma.Config.get([:hackney_pools, pool]) -      :hackney_pool.child_spec(pool, options) -    end -  end -    defp task_children(:test) do      [        %{ @@ -199,4 +204,35 @@ defmodule Pleroma.Application do        }      ]    end + +  # start hackney and gun pools in tests +  defp http_pools_children(:test) do +    hackney_options = Config.get([:hackney_pools, :federation]) +    hackney_pool = :hackney_pool.child_spec(:federation, hackney_options) +    [hackney_pool, Pleroma.Pool.Supervisor] +  end + +  defp http_pools_children(_), do: http_pools(adapter()) + +  defp http_pools(Tesla.Adapter.Hackney) do +    pools = [:federation, :media] + +    pools = +      if Config.get([Pleroma.Upload, :proxy_remote]) do +        [:upload | pools] +      else +        pools +      end + +    for pool <- pools do +      options = Config.get([:hackney_pools, pool]) +      :hackney_pool.child_spec(pool, options) +    end +  end + +  defp http_pools(Tesla.Adapter.Gun), do: [Pleroma.Pool.Supervisor] + +  defp http_pools(_), do: [] + +  defp adapter, do: Application.get_env(:tesla, :adapter)  end diff --git a/lib/pleroma/config/config_db.ex b/lib/pleroma/config/config_db.ex index 2b43d4c36..4097ee5b7 100644 --- a/lib/pleroma/config/config_db.ex +++ b/lib/pleroma/config/config_db.ex @@ -278,8 +278,6 @@ defmodule Pleroma.ConfigDB do      }    end -  defp do_convert({:partial_chain, entity}), do: %{"tuple" => [":partial_chain", inspect(entity)]} -    defp do_convert(entity) when is_tuple(entity) do      value =        entity @@ -323,15 +321,6 @@ defmodule Pleroma.ConfigDB do      {:proxy_url, {do_transform_string(type), parse_host(host), port}}    end -  defp do_transform(%{"tuple" => [":partial_chain", entity]}) do -    {partial_chain, []} = -      entity -      |> String.replace(~r/[^\w|^{:,[|^,|^[|^\]^}|^\/|^\.|^"]^\s/, "") -      |> Code.eval_string() - -    {:partial_chain, partial_chain} -  end -    defp do_transform(%{"tuple" => entity}) do      Enum.reduce(entity, {}, fn val, acc -> Tuple.append(acc, do_transform(val)) end)    end diff --git a/lib/pleroma/config/transfer_task.ex b/lib/pleroma/config/transfer_task.ex index 435fc7450..bf1b943d8 100644 --- a/lib/pleroma/config/transfer_task.ex +++ b/lib/pleroma/config/transfer_task.ex @@ -18,7 +18,10 @@ defmodule Pleroma.Config.TransferTask do      {:pleroma, Oban},      {:pleroma, :rate_limit},      {:pleroma, :markup}, -    {:plerome, :streamer} +    {:pleroma, :streamer}, +    {:pleroma, :pools}, +    {:pleroma, :connections_pool}, +    {:tesla, :adapter}    ]    @reboot_time_subkeys [ @@ -78,6 +81,28 @@ defmodule Pleroma.Config.TransferTask do      end    end +  defp group_for_restart(:logger, key, _, merged_value) do +    # change logger configuration in runtime, without restart +    if Keyword.keyword?(merged_value) and +         key not in [:compile_time_application, :backends, :compile_time_purge_matching] do +      Logger.configure_backend(key, merged_value) +    else +      Logger.configure([{key, merged_value}]) +    end + +    nil +  end + +  defp group_for_restart(:tesla, _, _, _), do: :pleroma + +  defp group_for_restart(group, _, _, _) when group != :pleroma, do: group + +  defp group_for_restart(group, key, value, _) do +    if pleroma_need_restart?(group, key, value) do +      group +    end +  end +    defp merge_and_update(setting) do      try do        key = ConfigDB.from_string(setting.key) @@ -99,21 +124,7 @@ defmodule Pleroma.Config.TransferTask do        :ok = update_env(group, key, merged_value) -      if group != :logger do -        if group != :pleroma or pleroma_need_restart?(group, key, value) do -          group -        end -      else -        # change logger configuration in runtime, without restart -        if Keyword.keyword?(merged_value) and -             key not in [:compile_time_application, :backends, :compile_time_purge_matching] do -          Logger.configure_backend(key, merged_value) -        else -          Logger.configure([{key, merged_value}]) -        end - -        nil -      end +      group_for_restart(group, key, value, merged_value)      rescue        error ->          error_msg = diff --git a/lib/pleroma/gun/api.ex b/lib/pleroma/gun/api.ex new file mode 100644 index 000000000..f51cd7db8 --- /dev/null +++ b/lib/pleroma/gun/api.ex @@ -0,0 +1,45 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Gun.API do +  @behaviour Pleroma.Gun + +  alias Pleroma.Gun + +  @gun_keys [ +    :connect_timeout, +    :http_opts, +    :http2_opts, +    :protocols, +    :retry, +    :retry_timeout, +    :trace, +    :transport, +    :tls_opts, +    :tcp_opts, +    :socks_opts, +    :ws_opts +  ] + +  @impl Gun +  def open(host, port, opts \\ %{}), do: :gun.open(host, port, Map.take(opts, @gun_keys)) + +  @impl Gun +  defdelegate info(pid), to: :gun + +  @impl Gun +  defdelegate close(pid), to: :gun + +  @impl Gun +  defdelegate await_up(pid, timeout \\ 5_000), to: :gun + +  @impl Gun +  defdelegate connect(pid, opts), to: :gun + +  @impl Gun +  defdelegate await(pid, ref), to: :gun + +  @impl Gun +  defdelegate set_owner(pid, owner), to: :gun +end diff --git a/lib/pleroma/gun/conn.ex b/lib/pleroma/gun/conn.ex new file mode 100644 index 000000000..319718690 --- /dev/null +++ b/lib/pleroma/gun/conn.ex @@ -0,0 +1,175 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Gun.Conn do +  @moduledoc """ +  Struct for gun connection data +  """ +  alias Pleroma.Gun +  alias Pleroma.Pool.Connections + +  require Logger + +  @type gun_state :: :up | :down +  @type conn_state :: :active | :idle + +  @type t :: %__MODULE__{ +          conn: pid(), +          gun_state: gun_state(), +          conn_state: conn_state(), +          used_by: [pid()], +          last_reference: pos_integer(), +          crf: float(), +          retries: pos_integer() +        } + +  defstruct conn: nil, +            gun_state: :open, +            conn_state: :init, +            used_by: [], +            last_reference: 0, +            crf: 1, +            retries: 0 + +  @spec open(String.t() | URI.t(), atom(), keyword()) :: :ok | nil +  def open(url, name, opts \\ []) +  def open(url, name, opts) when is_binary(url), do: open(URI.parse(url), name, opts) + +  def open(%URI{} = uri, name, opts) do +    pool_opts = Pleroma.Config.get([:connections_pool], []) + +    opts = +      opts +      |> Enum.into(%{}) +      |> Map.put_new(:retry, pool_opts[:retry] || 1) +      |> Map.put_new(:retry_timeout, pool_opts[:retry_timeout] || 1000) +      |> Map.put_new(:await_up_timeout, pool_opts[:await_up_timeout] || 5_000) + +    key = "#{uri.scheme}:#{uri.host}:#{uri.port}" + +    Logger.debug("opening new connection #{Connections.compose_uri_log(uri)}") + +    conn_pid = +      if Connections.count(name) < opts[:max_connection] do +        do_open(uri, opts) +      else +        close_least_used_and_do_open(name, uri, opts) +      end + +    if is_pid(conn_pid) do +      conn = %Pleroma.Gun.Conn{ +        conn: conn_pid, +        gun_state: :up, +        conn_state: :active, +        last_reference: :os.system_time(:second) +      } + +      :ok = Gun.set_owner(conn_pid, Process.whereis(name)) +      Connections.add_conn(name, key, conn) +    end +  end + +  defp do_open(uri, %{proxy: {proxy_host, proxy_port}} = opts) do +    connect_opts = +      uri +      |> destination_opts() +      |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, [])) + +    with open_opts <- Map.delete(opts, :tls_opts), +         {:ok, conn} <- Gun.open(proxy_host, proxy_port, open_opts), +         {:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]), +         stream <- Gun.connect(conn, connect_opts), +         {:response, :fin, 200, _} <- Gun.await(conn, stream) do +      conn +    else +      error -> +        Logger.warn( +          "Received error on opening connection with http proxy #{ +            Connections.compose_uri_log(uri) +          } #{inspect(error)}" +        ) + +        error +    end +  end + +  defp do_open(uri, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do +    version = +      proxy_type +      |> to_string() +      |> String.last() +      |> case do +        "4" -> 4 +        _ -> 5 +      end + +    socks_opts = +      uri +      |> destination_opts() +      |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, [])) +      |> Map.put(:version, version) + +    opts = +      opts +      |> Map.put(:protocols, [:socks]) +      |> Map.put(:socks_opts, socks_opts) + +    with {:ok, conn} <- Gun.open(proxy_host, proxy_port, opts), +         {:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do +      conn +    else +      error -> +        Logger.warn( +          "Received error on opening connection with socks proxy #{ +            Connections.compose_uri_log(uri) +          } #{inspect(error)}" +        ) + +        error +    end +  end + +  defp do_open(%URI{host: host, port: port} = uri, opts) do +    host = Pleroma.HTTP.Connection.parse_host(host) + +    with {:ok, conn} <- Gun.open(host, port, opts), +         {:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do +      conn +    else +      error -> +        Logger.warn( +          "Received error on opening connection #{Connections.compose_uri_log(uri)} #{ +            inspect(error) +          }" +        ) + +        error +    end +  end + +  defp destination_opts(%URI{host: host, port: port}) do +    host = Pleroma.HTTP.Connection.parse_host(host) +    %{host: host, port: port} +  end + +  defp add_http2_opts(opts, "https", tls_opts) do +    Map.merge(opts, %{protocols: [:http2], transport: :tls, tls_opts: tls_opts}) +  end + +  defp add_http2_opts(opts, _, _), do: opts + +  defp close_least_used_and_do_open(name, uri, opts) do +    Logger.debug("try to open conn #{Connections.compose_uri_log(uri)}") + +    with [{close_key, least_used} | _conns] <- +           Connections.get_unused_conns(name), +         :ok <- Gun.close(least_used.conn) do +      Connections.remove_conn(name, close_key) + +      do_open(uri, opts) +    else +      [] -> {:error, :pool_overflowed} +    end +  end +end diff --git a/lib/pleroma/gun/gun.ex b/lib/pleroma/gun/gun.ex new file mode 100644 index 000000000..4043e4880 --- /dev/null +++ b/lib/pleroma/gun/gun.ex @@ -0,0 +1,31 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Gun do +  @callback open(charlist(), pos_integer(), map()) :: {:ok, pid()} +  @callback info(pid()) :: map() +  @callback close(pid()) :: :ok +  @callback await_up(pid, pos_integer()) :: {:ok, atom()} | {:error, atom()} +  @callback connect(pid(), map()) :: reference() +  @callback await(pid(), reference()) :: {:response, :fin, 200, []} +  @callback set_owner(pid(), pid()) :: :ok + +  @api Pleroma.Config.get([Pleroma.Gun], Pleroma.Gun.API) + +  defp api, do: @api + +  def open(host, port, opts), do: api().open(host, port, opts) + +  def info(pid), do: api().info(pid) + +  def close(pid), do: api().close(pid) + +  def await_up(pid, timeout \\ 5_000), do: api().await_up(pid, timeout) + +  def connect(pid, opts), do: api().connect(pid, opts) + +  def await(pid, ref), do: api().await(pid, ref) + +  def set_owner(pid, owner), do: api().set_owner(pid, owner) +end diff --git a/lib/pleroma/http/adapter_helper.ex b/lib/pleroma/http/adapter_helper.ex new file mode 100644 index 000000000..2c13666ec --- /dev/null +++ b/lib/pleroma/http/adapter_helper.ex @@ -0,0 +1,41 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.HTTP.AdapterHelper do +  alias Pleroma.HTTP.Connection + +  @type proxy :: +          {Connection.host(), pos_integer()} +          | {Connection.proxy_type(), pos_integer()} + +  @callback options(keyword(), URI.t()) :: keyword() +  @callback after_request(keyword()) :: :ok + +  @spec options(keyword(), URI.t()) :: keyword() +  def options(opts, _uri) do +    proxy = Pleroma.Config.get([:http, :proxy_url], nil) +    maybe_add_proxy(opts, format_proxy(proxy)) +  end + +  @spec maybe_get_conn(URI.t(), keyword()) :: keyword() +  def maybe_get_conn(_uri, opts), do: opts + +  @spec after_request(keyword()) :: :ok +  def after_request(_opts), do: :ok + +  @spec format_proxy(String.t() | tuple() | nil) :: proxy() | nil +  def format_proxy(nil), do: nil + +  def format_proxy(proxy_url) do +    case Connection.parse_proxy(proxy_url) do +      {:ok, host, port} -> {host, port} +      {:ok, type, host, port} -> {type, host, port} +      _ -> nil +    end +  end + +  @spec maybe_add_proxy(keyword(), proxy() | nil) :: keyword() +  def maybe_add_proxy(opts, nil), do: opts +  def maybe_add_proxy(opts, proxy), do: Keyword.put_new(opts, :proxy, proxy) +end diff --git a/lib/pleroma/http/adapter_helper/gun.ex b/lib/pleroma/http/adapter_helper/gun.ex new file mode 100644 index 000000000..862e851c0 --- /dev/null +++ b/lib/pleroma/http/adapter_helper/gun.ex @@ -0,0 +1,109 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.HTTP.AdapterHelper.Gun do +  @behaviour Pleroma.HTTP.AdapterHelper + +  alias Pleroma.HTTP.AdapterHelper +  alias Pleroma.Pool.Connections + +  require Logger + +  @defaults [ +    connect_timeout: 5_000, +    domain_lookup_timeout: 5_000, +    tls_handshake_timeout: 5_000, +    retry: 1, +    retry_timeout: 1000, +    await_up_timeout: 5_000 +  ] + +  @spec options(keyword(), URI.t()) :: keyword() +  def options(connection_opts \\ [], %URI{} = uri) do +    formatted_proxy = +      Pleroma.Config.get([:http, :proxy_url], nil) +      |> AdapterHelper.format_proxy() + +    config_opts = Pleroma.Config.get([:http, :adapter], []) + +    @defaults +    |> Keyword.merge(config_opts) +    |> add_scheme_opts(uri) +    |> AdapterHelper.maybe_add_proxy(formatted_proxy) +    |> maybe_get_conn(uri, connection_opts) +  end + +  @spec after_request(keyword()) :: :ok +  def after_request(opts) do +    if opts[:conn] && opts[:body_as] != :chunks do +      Connections.checkout(opts[:conn], self(), :gun_connections) +    end + +    :ok +  end + +  defp add_scheme_opts(opts, %URI{scheme: "http"}), do: opts + +  defp add_scheme_opts(opts, %URI{scheme: "https", host: host}) do +    adapter_opts = [ +      certificates_verification: true, +      transport: :tls, +      tls_opts: [ +        verify: :verify_peer, +        cacertfile: CAStore.file_path(), +        depth: 20, +        reuse_sessions: false, +        verify_fun: {&:ssl_verify_hostname.verify_fun/3, [check_hostname: format_host(host)]}, +        log_level: :warning +      ] +    ] + +    Keyword.merge(opts, adapter_opts) +  end + +  defp maybe_get_conn(adapter_opts, uri, connection_opts) do +    {receive_conn?, opts} = +      adapter_opts +      |> Keyword.merge(connection_opts) +      |> Keyword.pop(:receive_conn, true) + +    if Connections.alive?(:gun_connections) and receive_conn? do +      try_to_get_conn(uri, opts) +    else +      opts +    end +  end + +  defp try_to_get_conn(uri, opts) do +    case Connections.checkin(uri, :gun_connections) do +      nil -> +        Logger.debug( +          "Gun connections pool checkin was not successful. Trying to open conn for next request." +        ) + +        Task.start(fn -> Pleroma.Gun.Conn.open(uri, :gun_connections, opts) end) +        opts + +      conn when is_pid(conn) -> +        Logger.debug("received conn #{inspect(conn)} #{Connections.compose_uri_log(uri)}") + +        opts +        |> Keyword.put(:conn, conn) +        |> Keyword.put(:close_conn, false) +    end +  end + +  @spec format_host(String.t()) :: charlist() +  def format_host(host) do +    host_charlist = to_charlist(host) + +    case :inet.parse_address(host_charlist) do +      {:error, :einval} -> +        :idna.encode(host_charlist) + +      {:ok, _ip} -> +        host_charlist +    end +  end +end diff --git a/lib/pleroma/http/adapter_helper/hackney.ex b/lib/pleroma/http/adapter_helper/hackney.ex new file mode 100644 index 000000000..d08afae0c --- /dev/null +++ b/lib/pleroma/http/adapter_helper/hackney.ex @@ -0,0 +1,43 @@ +defmodule Pleroma.HTTP.AdapterHelper.Hackney do +  @behaviour Pleroma.HTTP.AdapterHelper + +  @defaults [ +    connect_timeout: 10_000, +    recv_timeout: 20_000, +    follow_redirect: true, +    force_redirect: true, +    pool: :federation +  ] + +  @spec options(keyword(), URI.t()) :: keyword() +  def options(connection_opts \\ [], %URI{} = uri) do +    proxy = Pleroma.Config.get([:http, :proxy_url], nil) + +    config_opts = Pleroma.Config.get([:http, :adapter], []) + +    @defaults +    |> Keyword.merge(config_opts) +    |> Keyword.merge(connection_opts) +    |> add_scheme_opts(uri) +    |> Pleroma.HTTP.AdapterHelper.maybe_add_proxy(proxy) +  end + +  defp add_scheme_opts(opts, %URI{scheme: "http"}), do: opts + +  defp add_scheme_opts(opts, %URI{scheme: "https", host: host}) do +    ssl_opts = [ +      ssl_options: [ +        # Workaround for remote server certificate chain issues +        partial_chain: &:hackney_connect.partial_chain/1, + +        # We don't support TLS v1.3 yet +        versions: [:tlsv1, :"tlsv1.1", :"tlsv1.2"], +        server_name_indication: to_charlist(host) +      ] +    ] + +    Keyword.merge(opts, ssl_opts) +  end + +  def after_request(_), do: :ok +end diff --git a/lib/pleroma/http/connection.ex b/lib/pleroma/http/connection.ex index 80e6c30d6..777e5d4c8 100644 --- a/lib/pleroma/http/connection.ex +++ b/lib/pleroma/http/connection.ex @@ -4,40 +4,106 @@  defmodule Pleroma.HTTP.Connection do    @moduledoc """ -  Connection for http-requests. +  Configure Tesla.Client with default and customized adapter options.    """ -  @hackney_options [ -    connect_timeout: 10_000, -    recv_timeout: 20_000, -    follow_redirect: true, -    force_redirect: true, -    pool: :federation -  ] -  @adapter Application.get_env(:tesla, :adapter) +  alias Pleroma.Config +  alias Pleroma.HTTP.AdapterHelper -  @doc """ -  Configure a client connection +  require Logger + +  @defaults [pool: :federation] -  # Returns +  @type ip_address :: ipv4_address() | ipv6_address() +  @type ipv4_address :: {0..255, 0..255, 0..255, 0..255} +  @type ipv6_address :: +          {0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535} +  @type proxy_type() :: :socks4 | :socks5 +  @type host() :: charlist() | ip_address() -  Tesla.Env.client +  @doc """ +  Merge default connection & adapter options with received ones.    """ -  @spec new(Keyword.t()) :: Tesla.Env.client() -  def new(opts \\ []) do -    Tesla.client([], {@adapter, hackney_options(opts)}) + +  @spec options(URI.t(), keyword()) :: keyword() +  def options(%URI{} = uri, opts \\ []) do +    @defaults +    |> pool_timeout() +    |> Keyword.merge(opts) +    |> adapter().options(uri) +  end + +  defp pool_timeout(opts) do +    {config_key, default} = +      if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Gun do +        {:pools, Config.get([:pools, :default, :timeout])} +      else +        {:hackney_pools, 10_000} +      end + +    timeout = Config.get([config_key, opts[:pool], :timeout], default) + +    Keyword.merge(opts, timeout: timeout) +  end + +  @spec after_request(keyword()) :: :ok +  def after_request(opts), do: adapter().after_request(opts) + +  defp adapter do +    case Application.get_env(:tesla, :adapter) do +      Tesla.Adapter.Gun -> AdapterHelper.Gun +      Tesla.Adapter.Hackney -> AdapterHelper.Hackney +      _ -> AdapterHelper +    end    end -  # fetch Hackney options -  # -  def hackney_options(opts) do -    options = Keyword.get(opts, :adapter, []) -    adapter_options = Pleroma.Config.get([:http, :adapter], []) -    proxy_url = Pleroma.Config.get([:http, :proxy_url], nil) - -    @hackney_options -    |> Keyword.merge(adapter_options) -    |> Keyword.merge(options) -    |> Keyword.merge(proxy: proxy_url) +  @spec parse_proxy(String.t() | tuple() | nil) :: +          {:ok, host(), pos_integer()} +          | {:ok, proxy_type(), host(), pos_integer()} +          | {:error, atom()} +          | nil + +  def parse_proxy(nil), do: nil + +  def parse_proxy(proxy) when is_binary(proxy) do +    with [host, port] <- String.split(proxy, ":"), +         {port, ""} <- Integer.parse(port) do +      {:ok, parse_host(host), port} +    else +      {_, _} -> +        Logger.warn("parsing port in proxy fail #{inspect(proxy)}") +        {:error, :invalid_proxy_port} + +      :error -> +        Logger.warn("parsing port in proxy fail #{inspect(proxy)}") +        {:error, :invalid_proxy_port} + +      _ -> +        Logger.warn("parsing proxy fail #{inspect(proxy)}") +        {:error, :invalid_proxy} +    end +  end + +  def parse_proxy(proxy) when is_tuple(proxy) do +    with {type, host, port} <- proxy do +      {:ok, type, parse_host(host), port} +    else +      _ -> +        Logger.warn("parsing proxy fail #{inspect(proxy)}") +        {:error, :invalid_proxy} +    end +  end + +  @spec parse_host(String.t() | atom() | charlist()) :: charlist() | ip_address() +  def parse_host(host) when is_list(host), do: host +  def parse_host(host) when is_atom(host), do: to_charlist(host) + +  def parse_host(host) when is_binary(host) do +    host = to_charlist(host) + +    case :inet.parse_address(host) do +      {:error, :einval} -> host +      {:ok, ip} -> ip +    end    end  end diff --git a/lib/pleroma/http/http.ex b/lib/pleroma/http/http.ex index ee5b5e127..466a94adc 100644 --- a/lib/pleroma/http/http.ex +++ b/lib/pleroma/http/http.ex @@ -4,21 +4,47 @@  defmodule Pleroma.HTTP do    @moduledoc """ - +    Wrapper for `Tesla.request/2`.    """    alias Pleroma.HTTP.Connection +  alias Pleroma.HTTP.Request    alias Pleroma.HTTP.RequestBuilder, as: Builder +  alias Tesla.Client +  alias Tesla.Env + +  require Logger    @type t :: __MODULE__    @doc """ -  Builds and perform http request. +  Performs GET request. + +  See `Pleroma.HTTP.request/5` +  """ +  @spec get(Request.url() | nil, Request.headers(), keyword()) :: +          nil | {:ok, Env.t()} | {:error, any()} +  def get(url, headers \\ [], options \\ []) +  def get(nil, _, _), do: nil +  def get(url, headers, options), do: request(:get, url, "", headers, options) + +  @doc """ +  Performs POST request. + +  See `Pleroma.HTTP.request/5` +  """ +  @spec post(Request.url(), String.t(), Request.headers(), keyword()) :: +          {:ok, Env.t()} | {:error, any()} +  def post(url, body, headers \\ [], options \\ []), +    do: request(:post, url, body, headers, options) + +  @doc """ +  Builds and performs http request.    # Arguments:    `method` - :get, :post, :put, :delete -  `url` -  `body` +  `url` - full url +  `body` - request body    `headers` - a keyworld list of headers, e.g. `[{"content-type", "text/plain"}]`    `options` - custom, per-request middleware or adapter options @@ -26,61 +52,67 @@ defmodule Pleroma.HTTP do    `{:ok, %Tesla.Env{}}` or `{:error, error}`    """ -  def request(method, url, body \\ "", headers \\ [], options \\ []) do -    try do -      options = -        process_request_options(options) -        |> process_sni_options(url) - -      params = Keyword.get(options, :params, []) - -      %{} -      |> Builder.method(method) -      |> Builder.headers(headers) -      |> Builder.opts(options) -      |> Builder.url(url) -      |> Builder.add_param(:body, :body, body) -      |> Builder.add_param(:query, :query, params) -      |> Enum.into([]) -      |> (&Tesla.request(Connection.new(options), &1)).() -    rescue -      e -> -        {:error, e} -    catch -      :exit, e -> -        {:error, e} -    end -  end +  @spec request(atom(), Request.url(), String.t(), Request.headers(), keyword()) :: +          {:ok, Env.t()} | {:error, any()} +  def request(method, url, body, headers, options) when is_binary(url) do +    uri = URI.parse(url) +    received_adapter_opts = Keyword.get(options, :adapter, []) +    adapter_opts = Connection.options(uri, received_adapter_opts) +    options = put_in(options[:adapter], adapter_opts) +    params = Keyword.get(options, :params, []) +    request = build_request(method, headers, options, url, body, params) -  defp process_sni_options(options, nil), do: options +    adapter = Application.get_env(:tesla, :adapter) +    client = Tesla.client([Tesla.Middleware.FollowRedirects], adapter) -  defp process_sni_options(options, url) do -    uri = URI.parse(url) -    host = uri.host |> to_charlist() +    pid = Process.whereis(adapter_opts[:pool]) -    case uri.scheme do -      "https" -> options ++ [ssl: [server_name_indication: host]] -      _ -> options -    end -  end +    pool_alive? = +      if adapter == Tesla.Adapter.Gun && pid do +        Process.alive?(pid) +      else +        false +      end + +    request_opts = +      adapter_opts +      |> Enum.into(%{}) +      |> Map.put(:env, Pleroma.Config.get([:env])) +      |> Map.put(:pool_alive?, pool_alive?) + +    response = request(client, request, request_opts) + +    Connection.after_request(adapter_opts) -  def process_request_options(options) do -    Keyword.merge(Pleroma.HTTP.Connection.hackney_options([]), options) +    response    end -  @doc """ -  Performs GET request. +  @spec request(Client.t(), keyword(), map()) :: {:ok, Env.t()} | {:error, any()} +  def request(%Client{} = client, request, %{env: :test}), do: request(client, request) -  See `Pleroma.HTTP.request/5` -  """ -  def get(url, headers \\ [], options \\ []), -    do: request(:get, url, "", headers, options) +  def request(%Client{} = client, request, %{body_as: :chunks}), do: request(client, request) -  @doc """ -  Performs POST request. +  def request(%Client{} = client, request, %{pool_alive?: false}), do: request(client, request) -  See `Pleroma.HTTP.request/5` -  """ -  def post(url, body, headers \\ [], options \\ []), -    do: request(:post, url, body, headers, options) +  def request(%Client{} = client, request, %{pool: pool, timeout: timeout}) do +    :poolboy.transaction( +      pool, +      &Pleroma.Pool.Request.execute(&1, client, request, timeout), +      timeout +    ) +  end + +  @spec request(Client.t(), keyword()) :: {:ok, Env.t()} | {:error, any()} +  def request(client, request), do: Tesla.request(client, request) + +  defp build_request(method, headers, options, url, body, params) do +    Builder.new() +    |> Builder.method(method) +    |> Builder.headers(headers) +    |> Builder.opts(options) +    |> Builder.url(url) +    |> Builder.add_param(:body, :body, body) +    |> Builder.add_param(:query, :query, params) +    |> Builder.convert_to_keyword() +  end  end diff --git a/lib/pleroma/http/request.ex b/lib/pleroma/http/request.ex new file mode 100644 index 000000000..761bd6ccf --- /dev/null +++ b/lib/pleroma/http/request.ex @@ -0,0 +1,23 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.HTTP.Request do +  @moduledoc """ +  Request struct. +  """ +  defstruct method: :get, url: "", query: [], headers: [], body: "", opts: [] + +  @type method :: :head | :get | :delete | :trace | :options | :post | :put | :patch +  @type url :: String.t() +  @type headers :: [{String.t(), String.t()}] + +  @type t :: %__MODULE__{ +          method: method(), +          url: url(), +          query: keyword(), +          headers: headers(), +          body: String.t(), +          opts: keyword() +        } +end diff --git a/lib/pleroma/http/request_builder.ex b/lib/pleroma/http/request_builder.ex index 77ef4bfd8..2fc876d92 100644 --- a/lib/pleroma/http/request_builder.ex +++ b/lib/pleroma/http/request_builder.ex @@ -7,136 +7,87 @@ defmodule Pleroma.HTTP.RequestBuilder do    Helper functions for building Tesla requests    """ -  @doc """ -  Specify the request method when building a request - -  ## Parameters - -  - request (Map) - Collected request options -  - m (atom) - Request method - -  ## Returns +  alias Pleroma.HTTP.Request +  alias Tesla.Multipart -  Map +  @doc """ +  Creates new request    """ -  @spec method(map(), atom) :: map() -  def method(request, m) do -    Map.put_new(request, :method, m) -  end +  @spec new(Request.t()) :: Request.t() +  def new(%Request{} = request \\ %Request{}), do: request    @doc """    Specify the request method when building a request +  """ +  @spec method(Request.t(), Request.method()) :: Request.t() +  def method(request, m), do: %{request | method: m} -  ## Parameters - -  - request (Map) - Collected request options -  - u (String) - Request URL - -  ## Returns - -  Map +  @doc """ +  Specify the request method when building a request    """ -  @spec url(map(), String.t()) :: map() -  def url(request, u) do -    Map.put_new(request, :url, u) -  end +  @spec url(Request.t(), Request.url()) :: Request.t() +  def url(request, u), do: %{request | url: u}    @doc """    Add headers to the request    """ -  @spec headers(map(), list(tuple)) :: map() -  def headers(request, header_list) do -    header_list = +  @spec headers(Request.t(), Request.headers()) :: Request.t() +  def headers(request, headers) do +    headers_list =        if Pleroma.Config.get([:http, :send_user_agent]) do -        header_list ++ [{"User-Agent", Pleroma.Application.user_agent()}] +        [{"user-agent", Pleroma.Application.user_agent()} | headers]        else -        header_list +        headers        end -    Map.put_new(request, :headers, header_list) +    %{request | headers: headers_list}    end    @doc """    Add custom, per-request middleware or adapter options to the request    """ -  @spec opts(map(), Keyword.t()) :: map() -  def opts(request, options) do -    Map.put_new(request, :opts, options) -  end - -  @doc """ -  Add optional parameters to the request - -  ## Parameters - -  - request (Map) - Collected request options -  - definitions (Map) - Map of parameter name to parameter location. -  - options (KeywordList) - The provided optional parameters - -  ## Returns - -  Map -  """ -  @spec add_optional_params(map(), %{optional(atom) => atom}, keyword()) :: map() -  def add_optional_params(request, _, []), do: request - -  def add_optional_params(request, definitions, [{key, value} | tail]) do -    case definitions do -      %{^key => location} -> -        request -        |> add_param(location, key, value) -        |> add_optional_params(definitions, tail) - -      _ -> -        add_optional_params(request, definitions, tail) -    end -  end +  @spec opts(Request.t(), keyword()) :: Request.t() +  def opts(request, options), do: %{request | opts: options}    @doc """    Add optional parameters to the request - -  ## Parameters - -  - request (Map) - Collected request options -  - location (atom) - Where to put the parameter -  - key (atom) - The name of the parameter -  - value (any) - The value of the parameter - -  ## Returns - -  Map    """ -  @spec add_param(map(), atom, atom, any()) :: map() -  def add_param(request, :query, :query, values), do: Map.put(request, :query, values) +  @spec add_param(Request.t(), atom(), atom(), any()) :: Request.t() +  def add_param(request, :query, :query, values), do: %{request | query: values} -  def add_param(request, :body, :body, value), do: Map.put(request, :body, value) +  def add_param(request, :body, :body, value), do: %{request | body: value}    def add_param(request, :body, key, value) do      request -    |> Map.put_new_lazy(:body, &Tesla.Multipart.new/0) +    |> Map.put(:body, Multipart.new())      |> Map.update!(        :body, -      &Tesla.Multipart.add_field( +      &Multipart.add_field(          &1,          key,          Jason.encode!(value), -        headers: [{:"Content-Type", "application/json"}] +        headers: [{"content-type", "application/json"}]        )      )    end    def add_param(request, :file, name, path) do      request -    |> Map.put_new_lazy(:body, &Tesla.Multipart.new/0) -    |> Map.update!(:body, &Tesla.Multipart.add_file(&1, path, name: name)) +    |> Map.put(:body, Multipart.new()) +    |> Map.update!(:body, &Multipart.add_file(&1, path, name: name))    end    def add_param(request, :form, name, value) do -    request -    |> Map.update(:body, %{name => value}, &Map.put(&1, name, value)) +    Map.update(request, :body, %{name => value}, &Map.put(&1, name, value))    end    def add_param(request, location, key, value) do      Map.update(request, location, [{key, value}], &(&1 ++ [{key, value}]))    end + +  def convert_to_keyword(request) do +    request +    |> Map.from_struct() +    |> Enum.into([]) +  end  end diff --git a/lib/pleroma/object/fetcher.ex b/lib/pleroma/object/fetcher.ex index eaa13d1e7..263ded5dd 100644 --- a/lib/pleroma/object/fetcher.ex +++ b/lib/pleroma/object/fetcher.ex @@ -141,7 +141,7 @@ defmodule Pleroma.Object.Fetcher do          date: date        }) -    [{:Signature, signature}] +    [{"signature", signature}]    end    defp sign_fetch(headers, id, date) do @@ -154,7 +154,7 @@ defmodule Pleroma.Object.Fetcher do    defp maybe_date_fetch(headers, date) do      if Pleroma.Config.get([:activitypub, :sign_object_fetches]) do -      headers ++ [{:Date, date}] +      headers ++ [{"date", date}]      else        headers      end @@ -166,7 +166,7 @@ defmodule Pleroma.Object.Fetcher do      date = Pleroma.Signature.signed_date()      headers = -      [{:Accept, "application/activity+json"}] +      [{"accept", "application/activity+json"}]        |> maybe_date_fetch(date)        |> sign_fetch(id, date) diff --git a/lib/pleroma/otp_version.ex b/lib/pleroma/otp_version.ex new file mode 100644 index 000000000..114d0054f --- /dev/null +++ b/lib/pleroma/otp_version.ex @@ -0,0 +1,28 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.OTPVersion do +  @spec version() :: String.t() | nil +  def version do +    # OTP Version https://erlang.org/doc/system_principles/versions.html#otp-version +    [ +      Path.join(:code.root_dir(), "OTP_VERSION"), +      Path.join([:code.root_dir(), "releases", :erlang.system_info(:otp_release), "OTP_VERSION"]) +    ] +    |> get_version_from_files() +  end + +  @spec get_version_from_files([Path.t()]) :: String.t() | nil +  def get_version_from_files([]), do: nil + +  def get_version_from_files([path | paths]) do +    if File.exists?(path) do +      path +      |> File.read!() +      |> String.replace(~r/\r|\n|\s/, "") +    else +      get_version_from_files(paths) +    end +  end +end diff --git a/lib/pleroma/pool/connections.ex b/lib/pleroma/pool/connections.ex new file mode 100644 index 000000000..7529e9240 --- /dev/null +++ b/lib/pleroma/pool/connections.ex @@ -0,0 +1,317 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Pool.Connections do +  use GenServer + +  alias Pleroma.Config +  alias Pleroma.Gun + +  require Logger + +  @type domain :: String.t() +  @type conn :: Pleroma.Gun.Conn.t() + +  @type t :: %__MODULE__{ +          conns: %{domain() => conn()}, +          opts: keyword() +        } + +  defstruct conns: %{}, opts: [] + +  @spec start_link({atom(), keyword()}) :: {:ok, pid()} +  def start_link({name, opts}) do +    GenServer.start_link(__MODULE__, opts, name: name) +  end + +  @impl true +  def init(opts), do: {:ok, %__MODULE__{conns: %{}, opts: opts}} + +  @spec checkin(String.t() | URI.t(), atom()) :: pid() | nil +  def checkin(url, name) +  def checkin(url, name) when is_binary(url), do: checkin(URI.parse(url), name) + +  def checkin(%URI{} = uri, name) do +    timeout = Config.get([:connections_pool, :checkin_timeout], 250) + +    GenServer.call(name, {:checkin, uri}, timeout) +  end + +  @spec alive?(atom()) :: boolean() +  def alive?(name) do +    if pid = Process.whereis(name) do +      Process.alive?(pid) +    else +      false +    end +  end + +  @spec get_state(atom()) :: t() +  def get_state(name) do +    GenServer.call(name, :state) +  end + +  @spec count(atom()) :: pos_integer() +  def count(name) do +    GenServer.call(name, :count) +  end + +  @spec get_unused_conns(atom()) :: [{domain(), conn()}] +  def get_unused_conns(name) do +    GenServer.call(name, :unused_conns) +  end + +  @spec checkout(pid(), pid(), atom()) :: :ok +  def checkout(conn, pid, name) do +    GenServer.cast(name, {:checkout, conn, pid}) +  end + +  @spec add_conn(atom(), String.t(), Pleroma.Gun.Conn.t()) :: :ok +  def add_conn(name, key, conn) do +    GenServer.cast(name, {:add_conn, key, conn}) +  end + +  @spec remove_conn(atom(), String.t()) :: :ok +  def remove_conn(name, key) do +    GenServer.cast(name, {:remove_conn, key}) +  end + +  @impl true +  def handle_cast({:add_conn, key, conn}, state) do +    state = put_in(state.conns[key], conn) + +    Process.monitor(conn.conn) +    {:noreply, state} +  end + +  @impl true +  def handle_cast({:checkout, conn_pid, pid}, state) do +    Logger.debug("checkout #{inspect(conn_pid)}") + +    state = +      with true <- Process.alive?(conn_pid), +           {key, conn} <- find_conn(state.conns, conn_pid), +           used_by <- List.keydelete(conn.used_by, pid, 0) do +        conn_state = +          if used_by == [] do +            :idle +          else +            conn.conn_state +          end + +        put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by}) +      else +        false -> +          Logger.debug("checkout for closed conn #{inspect(conn_pid)}") +          state + +        nil -> +          Logger.debug("checkout for alive conn #{inspect(conn_pid)}, but is not in state") +          state +      end + +    {:noreply, state} +  end + +  @impl true +  def handle_cast({:remove_conn, key}, state) do +    state = put_in(state.conns, Map.delete(state.conns, key)) +    {:noreply, state} +  end + +  @impl true +  def handle_call({:checkin, uri}, from, state) do +    key = "#{uri.scheme}:#{uri.host}:#{uri.port}" +    Logger.debug("checkin #{key}") + +    case state.conns[key] do +      %{conn: conn, gun_state: :up} = current_conn -> +        Logger.debug("reusing conn #{key}") + +        time = :os.system_time(:second) +        last_reference = time - current_conn.last_reference +        current_crf = crf(last_reference, 100, current_conn.crf) + +        state = +          put_in(state.conns[key], %{ +            current_conn +            | last_reference: time, +              crf: current_crf, +              conn_state: :active, +              used_by: [from | current_conn.used_by] +          }) + +        {:reply, conn, state} + +      %{gun_state: :down} -> +        {:reply, nil, state} + +      nil -> +        {:reply, nil, state} +    end +  end + +  @impl true +  def handle_call(:state, _from, state), do: {:reply, state, state} + +  @impl true +  def handle_call(:count, _from, state) do +    {:reply, Enum.count(state.conns), state} +  end + +  @impl true +  def handle_call(:unused_conns, _from, state) do +    unused_conns = +      state.conns +      |> Enum.filter(fn {_k, v} -> +        v.conn_state == :idle and v.used_by == [] +      end) +      |> Enum.sort(fn {_x_k, x}, {_y_k, y} -> +        x.crf <= y.crf and x.last_reference <= y.last_reference +      end) + +    {:reply, unused_conns, state} +  end + +  @impl true +  def handle_info({:gun_up, conn_pid, _protocol}, state) do +    state = +      with conn_key when is_binary(conn_key) <- compose_key_gun_info(conn_pid), +           {key, conn} <- find_conn(state.conns, conn_pid, conn_key), +           {true, key} <- {Process.alive?(conn_pid), key} do +        time = :os.system_time(:second) +        last_reference = time - conn.last_reference +        current_crf = crf(last_reference, 100, conn.crf) + +        put_in(state.conns[key], %{ +          conn +          | gun_state: :up, +            last_reference: time, +            crf: current_crf, +            conn_state: :active, +            retries: 0 +        }) +      else +        :error_gun_info -> +          Logger.debug(":gun.info caused error") +          state + +        {false, key} -> +          Logger.debug(":gun_up message for closed conn #{inspect(conn_pid)}") + +          put_in( +            state.conns, +            Map.delete(state.conns, key) +          ) + +        nil -> +          Logger.debug(":gun_up message for conn which is not found in state") + +          :ok = Gun.close(conn_pid) + +          state +      end + +    {:noreply, state} +  end + +  @impl true +  def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed}, state) do +    retries = Config.get([:connections_pool, :retry], 1) +    # we can't get info on this pid, because pid is dead +    state = +      with {key, conn} <- find_conn(state.conns, conn_pid), +           {true, key} <- {Process.alive?(conn_pid), key} do +        if conn.retries == retries do +          Logger.debug("closing conn if retries is eq  #{inspect(conn_pid)}") +          :ok = Gun.close(conn.conn) + +          put_in( +            state.conns, +            Map.delete(state.conns, key) +          ) +        else +          put_in(state.conns[key], %{ +            conn +            | gun_state: :down, +              retries: conn.retries + 1 +          }) +        end +      else +        {false, key} -> +          # gun can send gun_down for closed conn, maybe connection is not closed yet +          Logger.debug(":gun_down message for closed conn #{inspect(conn_pid)}") + +          put_in( +            state.conns, +            Map.delete(state.conns, key) +          ) + +        nil -> +          Logger.debug(":gun_down message for conn which is not found in state") + +          :ok = Gun.close(conn_pid) + +          state +      end + +    {:noreply, state} +  end + +  @impl true +  def handle_info({:DOWN, _ref, :process, conn_pid, reason}, state) do +    Logger.debug("received DOWM message for #{inspect(conn_pid)} reason -> #{inspect(reason)}") + +    state = +      with {key, conn} <- find_conn(state.conns, conn_pid) do +        Enum.each(conn.used_by, fn {pid, _ref} -> +          Process.exit(pid, reason) +        end) + +        put_in( +          state.conns, +          Map.delete(state.conns, key) +        ) +      else +        nil -> +          Logger.debug(":DOWN message for conn which is not found in state") + +          state +      end + +    {:noreply, state} +  end + +  defp compose_key_gun_info(pid) do +    %{origin_host: origin_host, origin_scheme: scheme, origin_port: port} = Gun.info(pid) + +    host = +      case :inet.ntoa(origin_host) do +        {:error, :einval} -> origin_host +        ip -> ip +      end + +    "#{scheme}:#{host}:#{port}" +  end + +  defp find_conn(conns, conn_pid) do +    Enum.find(conns, fn {_key, conn} -> +      conn.conn == conn_pid +    end) +  end + +  defp find_conn(conns, conn_pid, conn_key) do +    Enum.find(conns, fn {key, conn} -> +      key == conn_key and conn.conn == conn_pid +    end) +  end + +  def crf(current, steps, crf) do +    1 + :math.pow(0.5, current / steps) * crf +  end + +  def compose_uri_log(%URI{scheme: scheme, host: host, path: path}) do +    "#{scheme}://#{host}#{path}" +  end +end diff --git a/lib/pleroma/pool/pool.ex b/lib/pleroma/pool/pool.ex new file mode 100644 index 000000000..21a6fbbc5 --- /dev/null +++ b/lib/pleroma/pool/pool.ex @@ -0,0 +1,22 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Pool do +  def child_spec(opts) do +    poolboy_opts = +      opts +      |> Keyword.put(:worker_module, Pleroma.Pool.Request) +      |> Keyword.put(:name, {:local, opts[:name]}) +      |> Keyword.put(:size, opts[:size]) +      |> Keyword.put(:max_overflow, opts[:max_overflow]) + +    %{ +      id: opts[:id] || {__MODULE__, make_ref()}, +      start: {:poolboy, :start_link, [poolboy_opts, [name: opts[:name]]]}, +      restart: :permanent, +      shutdown: 5000, +      type: :worker +    } +  end +end diff --git a/lib/pleroma/pool/request.ex b/lib/pleroma/pool/request.ex new file mode 100644 index 000000000..db7c10c01 --- /dev/null +++ b/lib/pleroma/pool/request.ex @@ -0,0 +1,66 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Pool.Request do +  use GenServer + +  require Logger + +  def start_link(args) do +    GenServer.start_link(__MODULE__, args) +  end + +  @impl true +  def init(_), do: {:ok, []} + +  @spec execute(pid() | atom(), Tesla.Client.t(), keyword(), pos_integer()) :: +          {:ok, Tesla.Env.t()} | {:error, any()} +  def execute(pid, client, request, timeout) do +    GenServer.call(pid, {:execute, client, request}, timeout) +  end + +  @impl true +  def handle_call({:execute, client, request}, _from, state) do +    response = Pleroma.HTTP.request(client, request) + +    {:reply, response, state} +  end + +  @impl true +  def handle_info({:gun_data, _conn, _stream, _, _}, state) do +    {:noreply, state} +  end + +  @impl true +  def handle_info({:gun_up, _conn, _protocol}, state) do +    {:noreply, state} +  end + +  @impl true +  def handle_info({:gun_down, _conn, _protocol, _reason, _killed}, state) do +    # don't flush messages here, because gun can reconnect +    {:noreply, state} +  end + +  @impl true +  def handle_info({:gun_error, _conn, _stream, _error}, state) do +    {:noreply, state} +  end + +  @impl true +  def handle_info({:gun_push, _conn, _stream, _new_stream, _method, _uri, _headers}, state) do +    {:noreply, state} +  end + +  @impl true +  def handle_info({:gun_response, _conn, _stream, _, _status, _headers}, state) do +    {:noreply, state} +  end + +  @impl true +  def handle_info(msg, state) do +    Logger.warn("Received unexpected message #{inspect(__MODULE__)} #{inspect(msg)}") +    {:noreply, state} +  end +end diff --git a/lib/pleroma/pool/supervisor.ex b/lib/pleroma/pool/supervisor.ex new file mode 100644 index 000000000..8dc5b64b7 --- /dev/null +++ b/lib/pleroma/pool/supervisor.ex @@ -0,0 +1,45 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Pool.Supervisor do +  use Supervisor + +  alias Pleroma.Config +  alias Pleroma.Pool + +  def start_link(args) do +    Supervisor.start_link(__MODULE__, args, name: __MODULE__) +  end + +  def init(_) do +    children = +      [ +        %{ +          id: Pool.Connections, +          start: +            {Pool.Connections, :start_link, [{:gun_connections, Config.get([:connections_pool])}]} +        } +      ] ++ pools() + +    Supervisor.init(children, strategy: :one_for_one) +  end + +  defp pools do +    pools = Config.get(:pools) + +    pools = +      if Config.get([Pleroma.Upload, :proxy_remote]) == false do +        Keyword.delete(pools, :upload) +      else +        pools +      end + +    for {pool_name, pool_opts} <- pools do +      pool_opts +      |> Keyword.put(:id, {Pool, pool_name}) +      |> Keyword.put(:name, pool_name) +      |> Pool.child_spec() +    end +  end +end diff --git a/lib/pleroma/reverse_proxy/client.ex b/lib/pleroma/reverse_proxy/client.ex index 26d14fabd..0d13ff174 100644 --- a/lib/pleroma/reverse_proxy/client.ex +++ b/lib/pleroma/reverse_proxy/client.ex @@ -3,19 +3,23 @@  # SPDX-License-Identifier: AGPL-3.0-only  defmodule Pleroma.ReverseProxy.Client do -  @callback request(atom(), String.t(), [tuple()], String.t(), list()) :: -              {:ok, pos_integer(), [tuple()], reference() | map()} -              | {:ok, pos_integer(), [tuple()]} +  @type status :: pos_integer() +  @type header_name :: String.t() +  @type header_value :: String.t() +  @type headers :: [{header_name(), header_value()}] + +  @callback request(atom(), String.t(), headers(), String.t(), list()) :: +              {:ok, status(), headers(), reference() | map()} +              | {:ok, status(), headers()}                | {:ok, reference()}                | {:error, term()} -  @callback stream_body(reference() | pid() | map()) :: -              {:ok, binary()} | :done | {:error, String.t()} +  @callback stream_body(map()) :: {:ok, binary(), map()} | :done | {:error, atom() | String.t()}    @callback close(reference() | pid() | map()) :: :ok -  def request(method, url, headers, "", opts \\ []) do -    client().request(method, url, headers, "", opts) +  def request(method, url, headers, body \\ "", opts \\ []) do +    client().request(method, url, headers, body, opts)    end    def stream_body(ref), do: client().stream_body(ref) @@ -23,6 +27,12 @@ defmodule Pleroma.ReverseProxy.Client do    def close(ref), do: client().close(ref)    defp client do -    Pleroma.Config.get([Pleroma.ReverseProxy.Client], :hackney) +    :tesla +    |> Application.get_env(:adapter) +    |> client()    end + +  defp client(Tesla.Adapter.Hackney), do: Pleroma.ReverseProxy.Client.Hackney +  defp client(Tesla.Adapter.Gun), do: Pleroma.ReverseProxy.Client.Tesla +  defp client(_), do: Pleroma.Config.get!(Pleroma.ReverseProxy.Client)  end diff --git a/lib/pleroma/reverse_proxy/client/hackney.ex b/lib/pleroma/reverse_proxy/client/hackney.ex new file mode 100644 index 000000000..e84118a90 --- /dev/null +++ b/lib/pleroma/reverse_proxy/client/hackney.ex @@ -0,0 +1,24 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.ReverseProxy.Client.Hackney do +  @behaviour Pleroma.ReverseProxy.Client + +  @impl true +  def request(method, url, headers, body, opts \\ []) do +    :hackney.request(method, url, headers, body, opts) +  end + +  @impl true +  def stream_body(ref) do +    case :hackney.stream_body(ref) do +      :done -> :done +      {:ok, data} -> {:ok, data, ref} +      {:error, error} -> {:error, error} +    end +  end + +  @impl true +  def close(ref), do: :hackney.close(ref) +end diff --git a/lib/pleroma/reverse_proxy/client/tesla.ex b/lib/pleroma/reverse_proxy/client/tesla.ex new file mode 100644 index 000000000..dbc6b66a3 --- /dev/null +++ b/lib/pleroma/reverse_proxy/client/tesla.ex @@ -0,0 +1,89 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.ReverseProxy.Client.Tesla do +  @type headers() :: [{String.t(), String.t()}] +  @type status() :: pos_integer() + +  @behaviour Pleroma.ReverseProxy.Client + +  @spec request(atom(), String.t(), headers(), String.t(), keyword()) :: +          {:ok, status(), headers} +          | {:ok, status(), headers, map()} +          | {:error, atom() | String.t()} +          | no_return() + +  @impl true +  def request(method, url, headers, body, opts \\ []) do +    check_adapter() + +    opts = Keyword.merge(opts, body_as: :chunks) + +    with {:ok, response} <- +           Pleroma.HTTP.request( +             method, +             url, +             body, +             headers, +             Keyword.put(opts, :adapter, opts) +           ) do +      if is_map(response.body) and method != :head do +        {:ok, response.status, response.headers, response.body} +      else +        {:ok, response.status, response.headers} +      end +    else +      {:error, error} -> {:error, error} +    end +  end + +  @impl true +  @spec stream_body(map()) :: {:ok, binary(), map()} | {:error, atom() | String.t()} | :done +  def stream_body(%{pid: pid, opts: opts, fin: true}) do +    # if connection was reused, but in tesla were redirects, +    # tesla returns new opened connection, which must be closed manually +    if opts[:old_conn], do: Tesla.Adapter.Gun.close(pid) +    # if there were redirects we need to checkout old conn +    conn = opts[:old_conn] || opts[:conn] + +    if conn, do: :ok = Pleroma.Pool.Connections.checkout(conn, self(), :gun_connections) + +    :done +  end + +  def stream_body(client) do +    case read_chunk!(client) do +      {:fin, body} -> +        {:ok, body, Map.put(client, :fin, true)} + +      {:nofin, part} -> +        {:ok, part, client} + +      {:error, error} -> +        {:error, error} +    end +  end + +  defp read_chunk!(%{pid: pid, stream: stream, opts: opts}) do +    adapter = check_adapter() +    adapter.read_chunk(pid, stream, opts) +  end + +  @impl true +  @spec close(map) :: :ok | no_return() +  def close(%{pid: pid}) do +    adapter = check_adapter() +    adapter.close(pid) +  end + +  defp check_adapter do +    adapter = Application.get_env(:tesla, :adapter) + +    unless adapter == Tesla.Adapter.Gun do +      raise "#{adapter} doesn't support reading body in chunks" +    end + +    adapter +  end +end diff --git a/lib/pleroma/reverse_proxy/reverse_proxy.ex b/lib/pleroma/reverse_proxy/reverse_proxy.ex index a281a00dc..8f1aa3200 100644 --- a/lib/pleroma/reverse_proxy/reverse_proxy.ex +++ b/lib/pleroma/reverse_proxy/reverse_proxy.ex @@ -3,8 +3,6 @@  # SPDX-License-Identifier: AGPL-3.0-only  defmodule Pleroma.ReverseProxy do -  alias Pleroma.HTTP -    @keep_req_headers ~w(accept user-agent accept-encoding cache-control if-modified-since) ++                        ~w(if-unmodified-since if-none-match if-range range)    @resp_cache_headers ~w(etag date last-modified cache-control) @@ -61,10 +59,10 @@ defmodule Pleroma.ReverseProxy do    * `req_headers`, `resp_headers` additional headers. -  * `http`: options for [hackney](https://github.com/benoitc/hackney). +  * `http`: options for [gun](https://github.com/ninenines/gun).    """ -  @default_hackney_options [pool: :media] +  @default_options [pool: :media]    @inline_content_types [      "image/gif", @@ -97,11 +95,7 @@ defmodule Pleroma.ReverseProxy do    def call(_conn, _url, _opts \\ [])    def call(conn = %{method: method}, url, opts) when method in @methods do -    hackney_opts = -      Pleroma.HTTP.Connection.hackney_options([]) -      |> Keyword.merge(@default_hackney_options) -      |> Keyword.merge(Keyword.get(opts, :http, [])) -      |> HTTP.process_request_options() +    client_opts = Keyword.merge(@default_options, Keyword.get(opts, :http, []))      req_headers = build_req_headers(conn.req_headers, opts) @@ -113,7 +107,7 @@ defmodule Pleroma.ReverseProxy do        end      with {:ok, nil} <- Cachex.get(:failed_proxy_url_cache, url), -         {:ok, code, headers, client} <- request(method, url, req_headers, hackney_opts), +         {:ok, code, headers, client} <- request(method, url, req_headers, client_opts),           :ok <-             header_length_constraint(               headers, @@ -159,11 +153,11 @@ defmodule Pleroma.ReverseProxy do      |> halt()    end -  defp request(method, url, headers, hackney_opts) do +  defp request(method, url, headers, opts) do      Logger.debug("#{__MODULE__} #{method} #{url} #{inspect(headers)}")      method = method |> String.downcase() |> String.to_existing_atom() -    case client().request(method, url, headers, "", hackney_opts) do +    case client().request(method, url, headers, "", opts) do        {:ok, code, headers, client} when code in @valid_resp_codes ->          {:ok, code, downcase_headers(headers), client} @@ -213,7 +207,7 @@ defmodule Pleroma.ReverseProxy do               duration,               Keyword.get(opts, :max_read_duration, @max_read_duration)             ), -         {:ok, data} <- client().stream_body(client), +         {:ok, data, client} <- client().stream_body(client),           {:ok, duration} <- increase_read_duration(duration),           sent_so_far = sent_so_far + byte_size(data),           :ok <- diff --git a/lib/pleroma/web/activity_pub/mrf/anti_followbot_policy.ex b/lib/pleroma/web/activity_pub/mrf/anti_followbot_policy.ex index b3547ecd4..0270b96ae 100644 --- a/lib/pleroma/web/activity_pub/mrf/anti_followbot_policy.ex +++ b/lib/pleroma/web/activity_pub/mrf/anti_followbot_policy.ex @@ -1,5 +1,5 @@  # Pleroma: A lightweight social networking server -# Copyright © 2019 Pleroma Authors <https://pleroma.social/> +# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>  # SPDX-License-Identifier: AGPL-3.0-only  defmodule Pleroma.Web.ActivityPub.MRF.AntiFollowbotPolicy do diff --git a/lib/pleroma/web/activity_pub/mrf/media_proxy_warming_policy.ex b/lib/pleroma/web/activity_pub/mrf/media_proxy_warming_policy.ex index d9a0acfd3..dfab105a3 100644 --- a/lib/pleroma/web/activity_pub/mrf/media_proxy_warming_policy.ex +++ b/lib/pleroma/web/activity_pub/mrf/media_proxy_warming_policy.ex @@ -12,17 +12,23 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do    require Logger -  @hackney_options [ -    pool: :media, -    recv_timeout: 10_000 +  @options [ +    pool: :media    ]    def perform(:prefetch, url) do      Logger.debug("Prefetching #{inspect(url)}") +    opts = +      if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Hackney do +        Keyword.put(@options, :recv_timeout, 10_000) +      else +        @options +      end +      url      |> MediaProxy.url() -    |> HTTP.get([], adapter: @hackney_options) +    |> HTTP.get([], adapter: opts)    end    def perform(:preload, %{"object" => %{"attachment" => attachments}} = _message) do diff --git a/lib/pleroma/web/activity_pub/mrf/no_placeholder_text_policy.ex b/lib/pleroma/web/activity_pub/mrf/no_placeholder_text_policy.ex index f67f48ab6..fc3475048 100644 --- a/lib/pleroma/web/activity_pub/mrf/no_placeholder_text_policy.ex +++ b/lib/pleroma/web/activity_pub/mrf/no_placeholder_text_policy.ex @@ -1,5 +1,5 @@  # Pleroma: A lightweight social networking server -# Copyright © 2019 Pleroma Authors <https://pleroma.social/> +# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>  # SPDX-License-Identifier: AGPL-3.0-only  defmodule Pleroma.Web.ActivityPub.MRF.NoPlaceholderTextPolicy do diff --git a/lib/pleroma/web/rel_me.ex b/lib/pleroma/web/rel_me.ex index e97c398dc..8e2b51508 100644 --- a/lib/pleroma/web/rel_me.ex +++ b/lib/pleroma/web/rel_me.ex @@ -3,11 +3,9 @@  # SPDX-License-Identifier: AGPL-3.0-only  defmodule Pleroma.Web.RelMe do -  @hackney_options [ +  @options [      pool: :media, -    recv_timeout: 2_000, -    max_body: 2_000_000, -    with_body: true +    max_body: 2_000_000    ]    if Pleroma.Config.get(:env) == :test do @@ -25,8 +23,18 @@ defmodule Pleroma.Web.RelMe do    def parse(_), do: {:error, "No URL provided"}    defp parse_url(url) do +    opts = +      if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Hackney do +        Keyword.merge(@options, +          recv_timeout: 2_000, +          with_body: true +        ) +      else +        @options +      end +      with {:ok, %Tesla.Env{body: html, status: status}} when status in 200..299 <- -           Pleroma.HTTP.get(url, [], adapter: @hackney_options), +           Pleroma.HTTP.get(url, [], adapter: opts),           {:ok, html_tree} <- Floki.parse_document(html),           data <-             Floki.attribute(html_tree, "link[rel~=me]", "href") ++ diff --git a/lib/pleroma/web/rich_media/parser.ex b/lib/pleroma/web/rich_media/parser.ex index 0779065ee..40980def8 100644 --- a/lib/pleroma/web/rich_media/parser.ex +++ b/lib/pleroma/web/rich_media/parser.ex @@ -3,11 +3,9 @@  # SPDX-License-Identifier: AGPL-3.0-only  defmodule Pleroma.Web.RichMedia.Parser do -  @hackney_options [ +  @options [      pool: :media, -    recv_timeout: 2_000, -    max_body: 2_000_000, -    with_body: true +    max_body: 2_000_000    ]    defp parsers do @@ -77,8 +75,18 @@ defmodule Pleroma.Web.RichMedia.Parser do    end    defp parse_url(url) do +    opts = +      if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Hackney do +        Keyword.merge(@options, +          recv_timeout: 2_000, +          with_body: true +        ) +      else +        @options +      end +      try do -      {:ok, %Tesla.Env{body: html}} = Pleroma.HTTP.get(url, [], adapter: @hackney_options) +      {:ok, %Tesla.Env{body: html}} = Pleroma.HTTP.get(url, [], adapter: opts)        html        |> parse_html() diff --git a/lib/pleroma/web/web_finger/web_finger.ex b/lib/pleroma/web/web_finger/web_finger.ex index 43a81c75d..7ffd0e51b 100644 --- a/lib/pleroma/web/web_finger/web_finger.ex +++ b/lib/pleroma/web/web_finger/web_finger.ex @@ -173,7 +173,8 @@ defmodule Pleroma.Web.WebFinger do        get_template_from_xml(body)      else        _ -> -        with {:ok, %{body: body}} <- HTTP.get("https://#{domain}/.well-known/host-meta", []) do +        with {:ok, %{body: body, status: status}} when status in 200..299 <- +               HTTP.get("https://#{domain}/.well-known/host-meta", []) do            get_template_from_xml(body)          else            e -> {:error, "Can't find LRDD template: #{inspect(e)}"} @@ -205,7 +206,7 @@ defmodule Pleroma.Web.WebFinger do      with response <-             HTTP.get(               address, -             Accept: "application/xrd+xml,application/jrd+json" +             [{"accept", "application/xrd+xml,application/jrd+json"}]             ),           {:ok, %{status: status, body: body}} when status in 200..299 <- response do        doc = XML.parse_document(body) | 
