diff options
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/pleroma/application.ex | 18 | ||||
| -rw-r--r-- | lib/pleroma/config/transfer_task.ex | 42 | ||||
| -rw-r--r-- | lib/pleroma/gun/conn.ex | 31 | ||||
| -rw-r--r-- | lib/pleroma/http/adapter_helper.ex | 2 | ||||
| -rw-r--r-- | lib/pleroma/http/adapter_helper/gun.ex | 33 | ||||
| -rw-r--r-- | lib/pleroma/http/connection.ex | 8 | ||||
| -rw-r--r-- | lib/pleroma/http/http.ex | 5 | ||||
| -rw-r--r-- | lib/pleroma/pool/connections.ex | 94 | 
8 files changed, 88 insertions, 145 deletions
| diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index c8a0617a5..55b5be488 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -42,7 +42,9 @@ defmodule Pleroma.Application do      setup_instrumenters()      load_custom_modules() -    if adapter() == Tesla.Adapter.Gun do +    adapter = Application.get_env(:tesla, :adapter) + +    if adapter == Tesla.Adapter.Gun do        if version = Pleroma.OTPVersion.version() do          [major, minor] =            version @@ -74,7 +76,7 @@ defmodule Pleroma.Application do          Pleroma.Plugs.RateLimiter.Supervisor        ] ++          cachex_children() ++ -        http_pools_children(Config.get(:env)) ++ +        http_children(adapter, @env) ++          [            Pleroma.Stats,            Pleroma.JobQueueMonitor, @@ -206,15 +208,13 @@ defmodule Pleroma.Application do    end    # start hackney and gun pools in tests -  defp http_pools_children(:test) do +  defp http_children(_, :test) do      hackney_options = Config.get([:hackney_pools, :federation])      hackney_pool = :hackney_pool.child_spec(:federation, hackney_options)      [hackney_pool, Pleroma.Pool.Supervisor]    end -  defp http_pools_children(_), do: http_pools(adapter()) - -  defp http_pools(Tesla.Adapter.Hackney) do +  defp http_children(Tesla.Adapter.Hackney, _) do      pools = [:federation, :media]      pools = @@ -230,9 +230,7 @@ defmodule Pleroma.Application do      end    end -  defp http_pools(Tesla.Adapter.Gun), do: [Pleroma.Pool.Supervisor] - -  defp http_pools(_), do: [] +  defp http_children(Tesla.Adapter.Gun, _), do: [Pleroma.Pool.Supervisor] -  defp adapter, do: Application.get_env(:tesla, :adapter) +  defp http_children(_, _), do: []  end diff --git a/lib/pleroma/config/transfer_task.ex b/lib/pleroma/config/transfer_task.ex index 4a4c022f0..b6d80adb7 100644 --- a/lib/pleroma/config/transfer_task.ex +++ b/lib/pleroma/config/transfer_task.ex @@ -5,6 +5,7 @@  defmodule Pleroma.Config.TransferTask do    use Task +  alias Pleroma.Config    alias Pleroma.ConfigDB    alias Pleroma.Repo @@ -36,36 +37,31 @@ defmodule Pleroma.Config.TransferTask do    def start_link(_) do      load_and_update_env() -    if Pleroma.Config.get(:env) == :test, do: Ecto.Adapters.SQL.Sandbox.checkin(Repo) +    if Config.get(:env) == :test, do: Ecto.Adapters.SQL.Sandbox.checkin(Repo)      :ignore    end -  @spec load_and_update_env([ConfigDB.t()]) :: :ok | false -  def load_and_update_env(deleted \\ [], restart_pleroma? \\ true) do -    with {_, true} <- {:configurable, Pleroma.Config.get(:configurable_from_database)} do +  @spec load_and_update_env([ConfigDB.t()], boolean()) :: :ok +  def load_and_update_env(deleted_settings \\ [], restart_pleroma? \\ true) do +    with {_, true} <- {:configurable, Config.get(:configurable_from_database)} do        # We need to restart applications for loaded settings take effect -      in_db = Repo.all(ConfigDB) - -      with_deleted = in_db ++ deleted        # TODO: some problem with prometheus after restart! -      reject = [nil, :prometheus] - -      reject_for_restart = +      reject_restart =          if restart_pleroma? do -          reject +          [nil, :prometheus]          else -          [:pleroma | reject] +          [:pleroma, nil, :prometheus]          end        started_applications = Application.started_applications() -      with_deleted -      |> Enum.map(&merge_and_update(&1)) +      (Repo.all(ConfigDB) ++ deleted_settings) +      |> Enum.map(&merge_and_update/1)        |> Enum.uniq() -      |> Enum.reject(&(&1 in reject_for_restart)) +      |> Enum.reject(&(&1 in reject_restart))        |> maybe_set_pleroma_last() -      |> Enum.each(&restart(started_applications, &1, Pleroma.Config.get(:env))) +      |> Enum.each(&restart(started_applications, &1, Config.get(:env)))        :ok      else @@ -108,18 +104,14 @@ defmodule Pleroma.Config.TransferTask do        key = ConfigDB.from_string(setting.key)        group = ConfigDB.from_string(setting.group) -      default = Pleroma.Config.Holder.config(group, key) +      default = Config.Holder.config(group, key)        value = ConfigDB.from_binary(setting.value)        merged_value = -        if Ecto.get_meta(setting, :state) == :deleted do -          default -        else -          if can_be_merged?(default, value) do -            ConfigDB.merge_group(group, key, default, value) -          else -            value -          end +        cond do +          Ecto.get_meta(setting, :state) == :deleted -> default +          can_be_merged?(default, value) -> ConfigDB.merge_group(group, key, default, value) +          true -> value          end        :ok = update_env(group, key, merged_value) diff --git a/lib/pleroma/gun/conn.ex b/lib/pleroma/gun/conn.ex index 57a847c30..20823a765 100644 --- a/lib/pleroma/gun/conn.ex +++ b/lib/pleroma/gun/conn.ex @@ -49,8 +49,6 @@ defmodule Pleroma.Gun.Conn do      key = "#{uri.scheme}:#{uri.host}:#{uri.port}" -    Logger.debug("opening new connection #{Connections.compose_uri_log(uri)}") -      conn_pid =        if Connections.count(name) < opts[:max_connection] do          do_open(uri, opts) @@ -109,9 +107,9 @@ defmodule Pleroma.Gun.Conn do      else        error ->          Logger.warn( -          "Received error on opening connection with http proxy #{ -            Connections.compose_uri_log(uri) -          } #{inspect(error)}" +          "Opening proxied connection to #{compose_uri_log(uri)} failed with error #{ +            inspect(error) +          }"          )          error @@ -145,9 +143,9 @@ defmodule Pleroma.Gun.Conn do      else        error ->          Logger.warn( -          "Received error on opening connection with socks proxy #{ -            Connections.compose_uri_log(uri) -          } #{inspect(error)}" +          "Opening socks proxied connection to #{compose_uri_log(uri)} failed with error #{ +            inspect(error) +          }"          )          error @@ -163,9 +161,7 @@ defmodule Pleroma.Gun.Conn do      else        error ->          Logger.warn( -          "Received error on opening connection #{Connections.compose_uri_log(uri)} #{ -            inspect(error) -          }" +          "Opening connection to #{compose_uri_log(uri)} failed with error #{inspect(error)}"          )          error @@ -184,16 +180,17 @@ defmodule Pleroma.Gun.Conn do    defp add_http2_opts(opts, _, _), do: opts    defp close_least_used_and_do_open(name, uri, opts) do -    Logger.debug("try to open conn #{Connections.compose_uri_log(uri)}") - -    with [{close_key, least_used} | _conns] <- -           Connections.get_unused_conns(name), -         :ok <- Gun.close(least_used.conn) do -      Connections.remove_conn(name, close_key) +    with [{key, conn} | _conns] <- Connections.get_unused_conns(name), +         :ok <- Gun.close(conn.conn) do +      Connections.remove_conn(name, key)        do_open(uri, opts)      else        [] -> {:error, :pool_overflowed}      end    end + +  def compose_uri_log(%URI{scheme: scheme, host: host, path: path}) do +    "#{scheme}://#{host}#{path}" +  end  end diff --git a/lib/pleroma/http/adapter_helper.ex b/lib/pleroma/http/adapter_helper.ex index 2c13666ec..510722ff9 100644 --- a/lib/pleroma/http/adapter_helper.ex +++ b/lib/pleroma/http/adapter_helper.ex @@ -7,7 +7,7 @@ defmodule Pleroma.HTTP.AdapterHelper do    @type proxy ::            {Connection.host(), pos_integer()} -          | {Connection.proxy_type(), pos_integer()} +          | {Connection.proxy_type(), Connection.host(), pos_integer()}    @callback options(keyword(), URI.t()) :: keyword()    @callback after_request(keyword()) :: :ok diff --git a/lib/pleroma/http/adapter_helper/gun.ex b/lib/pleroma/http/adapter_helper/gun.ex index 55c2b192a..f14b95c19 100644 --- a/lib/pleroma/http/adapter_helper/gun.ex +++ b/lib/pleroma/http/adapter_helper/gun.ex @@ -20,8 +20,8 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do    ]    @spec options(keyword(), URI.t()) :: keyword() -  def options(connection_opts \\ [], %URI{} = uri) do -    formatted_proxy = +  def options(incoming_opts \\ [], %URI{} = uri) do +    proxy =        Pleroma.Config.get([:http, :proxy_url], nil)        |> AdapterHelper.format_proxy() @@ -30,8 +30,8 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do      @defaults      |> Keyword.merge(config_opts)      |> add_scheme_opts(uri) -    |> AdapterHelper.maybe_add_proxy(formatted_proxy) -    |> maybe_get_conn(uri, connection_opts) +    |> AdapterHelper.maybe_add_proxy(proxy) +    |> maybe_get_conn(uri, incoming_opts)    end    @spec after_request(keyword()) :: :ok @@ -43,44 +43,35 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do      :ok    end -  defp add_scheme_opts(opts, %URI{scheme: "http"}), do: opts +  defp add_scheme_opts(opts, %{scheme: "http"}), do: opts -  defp add_scheme_opts(opts, %URI{scheme: "https"}) do +  defp add_scheme_opts(opts, %{scheme: "https"}) do      opts      |> Keyword.put(:certificates_verification, true) -    |> Keyword.put(:transport, :tls)      |> Keyword.put(:tls_opts, log_level: :warning)    end -  defp maybe_get_conn(adapter_opts, uri, connection_opts) do +  defp maybe_get_conn(adapter_opts, uri, incoming_opts) do      {receive_conn?, opts} =        adapter_opts -      |> Keyword.merge(connection_opts) +      |> Keyword.merge(incoming_opts)        |> Keyword.pop(:receive_conn, true)      if Connections.alive?(:gun_connections) and receive_conn? do -      try_to_get_conn(uri, opts) +      checkin_conn(uri, opts)      else        opts      end    end -  defp try_to_get_conn(uri, opts) do +  defp checkin_conn(uri, opts) do      case Connections.checkin(uri, :gun_connections) do        nil -> -        Logger.debug( -          "Gun connections pool checkin was not successful. Trying to open conn for next request." -        ) - -        Task.start(fn -> Pleroma.Gun.Conn.open(uri, :gun_connections, opts) end) +        Task.start(Pleroma.Gun.Conn, :open, [uri, :gun_connections, opts])          opts        conn when is_pid(conn) -> -        Logger.debug("received conn #{inspect(conn)} #{Connections.compose_uri_log(uri)}") - -        opts -        |> Keyword.put(:conn, conn) -        |> Keyword.put(:close_conn, false) +        Keyword.merge(opts, conn: conn, close_conn: false)      end    end  end diff --git a/lib/pleroma/http/connection.ex b/lib/pleroma/http/connection.ex index 0fc88f708..76de3fcfe 100644 --- a/lib/pleroma/http/connection.ex +++ b/lib/pleroma/http/connection.ex @@ -71,15 +71,15 @@ defmodule Pleroma.HTTP.Connection do        {:ok, parse_host(host), port}      else        {_, _} -> -        Logger.warn("parsing port in proxy fail #{inspect(proxy)}") +        Logger.warn("Parsing port failed #{inspect(proxy)}")          {:error, :invalid_proxy_port}        :error -> -        Logger.warn("parsing port in proxy fail #{inspect(proxy)}") +        Logger.warn("Parsing port failed #{inspect(proxy)}")          {:error, :invalid_proxy_port}        _ -> -        Logger.warn("parsing proxy fail #{inspect(proxy)}") +        Logger.warn("Parsing proxy failed #{inspect(proxy)}")          {:error, :invalid_proxy}      end    end @@ -89,7 +89,7 @@ defmodule Pleroma.HTTP.Connection do        {:ok, type, parse_host(host), port}      else        _ -> -        Logger.warn("parsing proxy fail #{inspect(proxy)}") +        Logger.warn("Parsing proxy failed #{inspect(proxy)}")          {:error, :invalid_proxy}      end    end diff --git a/lib/pleroma/http/http.ex b/lib/pleroma/http/http.ex index 466a94adc..583b56484 100644 --- a/lib/pleroma/http/http.ex +++ b/lib/pleroma/http/http.ex @@ -56,10 +56,9 @@ defmodule Pleroma.HTTP do            {:ok, Env.t()} | {:error, any()}    def request(method, url, body, headers, options) when is_binary(url) do      uri = URI.parse(url) -    received_adapter_opts = Keyword.get(options, :adapter, []) -    adapter_opts = Connection.options(uri, received_adapter_opts) +    adapter_opts = Connection.options(uri, options[:adapter] || [])      options = put_in(options[:adapter], adapter_opts) -    params = Keyword.get(options, :params, []) +    params = options[:params] || []      request = build_request(method, headers, options, url, body, params)      adapter = Application.get_env(:tesla, :adapter) diff --git a/lib/pleroma/pool/connections.ex b/lib/pleroma/pool/connections.ex index 7529e9240..772833509 100644 --- a/lib/pleroma/pool/connections.ex +++ b/lib/pleroma/pool/connections.ex @@ -87,18 +87,11 @@ defmodule Pleroma.Pool.Connections do    @impl true    def handle_cast({:checkout, conn_pid, pid}, state) do -    Logger.debug("checkout #{inspect(conn_pid)}") -      state =        with true <- Process.alive?(conn_pid),             {key, conn} <- find_conn(state.conns, conn_pid),             used_by <- List.keydelete(conn.used_by, pid, 0) do -        conn_state = -          if used_by == [] do -            :idle -          else -            conn.conn_state -          end +        conn_state = if used_by == [], do: :idle, else: conn.conn_state          put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by})        else @@ -123,26 +116,23 @@ defmodule Pleroma.Pool.Connections do    @impl true    def handle_call({:checkin, uri}, from, state) do      key = "#{uri.scheme}:#{uri.host}:#{uri.port}" -    Logger.debug("checkin #{key}")      case state.conns[key] do -      %{conn: conn, gun_state: :up} = current_conn -> -        Logger.debug("reusing conn #{key}") - +      %{conn: pid, gun_state: :up} = conn ->          time = :os.system_time(:second) -        last_reference = time - current_conn.last_reference -        current_crf = crf(last_reference, 100, current_conn.crf) +        last_reference = time - conn.last_reference +        crf = crf(last_reference, 100, conn.crf)          state =            put_in(state.conns[key], %{ -            current_conn +            conn              | last_reference: time, -              crf: current_crf, +              crf: crf,                conn_state: :active, -              used_by: [from | current_conn.used_by] +              used_by: [from | conn.used_by]            }) -        {:reply, conn, state} +        {:reply, pid, state}        %{gun_state: :down} ->          {:reply, nil, state} @@ -164,50 +154,48 @@ defmodule Pleroma.Pool.Connections do    def handle_call(:unused_conns, _from, state) do      unused_conns =        state.conns -      |> Enum.filter(fn {_k, v} -> -        v.conn_state == :idle and v.used_by == [] -      end) -      |> Enum.sort(fn {_x_k, x}, {_y_k, y} -> -        x.crf <= y.crf and x.last_reference <= y.last_reference -      end) +      |> Enum.filter(&filter_conns/1) +      |> Enum.sort(&sort_conns/2)      {:reply, unused_conns, state}    end +  defp filter_conns({_, %{conn_state: :idle, used_by: []}}), do: true +  defp filter_conns(_), do: false + +  defp sort_conns({_, c1}, {_, c2}) do +    c1.crf <= c2.crf and c1.last_reference <= c2.last_reference +  end +    @impl true    def handle_info({:gun_up, conn_pid, _protocol}, state) do +    %{origin_host: host, origin_scheme: scheme, origin_port: port} = Gun.info(conn_pid) + +    host = +      case :inet.ntoa(host) do +        {:error, :einval} -> host +        ip -> ip +      end + +    key = "#{scheme}:#{host}:#{port}" +      state = -      with conn_key when is_binary(conn_key) <- compose_key_gun_info(conn_pid), -           {key, conn} <- find_conn(state.conns, conn_pid, conn_key), +      with {_key, conn} <- find_conn(state.conns, conn_pid, key),             {true, key} <- {Process.alive?(conn_pid), key} do -        time = :os.system_time(:second) -        last_reference = time - conn.last_reference -        current_crf = crf(last_reference, 100, conn.crf) -          put_in(state.conns[key], %{            conn            | gun_state: :up, -            last_reference: time, -            crf: current_crf,              conn_state: :active,              retries: 0          })        else -        :error_gun_info -> -          Logger.debug(":gun.info caused error") -          state -          {false, key} -> -          Logger.debug(":gun_up message for closed conn #{inspect(conn_pid)}") -            put_in(              state.conns,              Map.delete(state.conns, key)            )          nil -> -          Logger.debug(":gun_up message for conn which is not found in state") -            :ok = Gun.close(conn_pid)            state @@ -224,7 +212,6 @@ defmodule Pleroma.Pool.Connections do        with {key, conn} <- find_conn(state.conns, conn_pid),             {true, key} <- {Process.alive?(conn_pid), key} do          if conn.retries == retries do -          Logger.debug("closing conn if retries is eq  #{inspect(conn_pid)}")            :ok = Gun.close(conn.conn)            put_in( @@ -240,18 +227,13 @@ defmodule Pleroma.Pool.Connections do          end        else          {false, key} -> -          # gun can send gun_down for closed conn, maybe connection is not closed yet -          Logger.debug(":gun_down message for closed conn #{inspect(conn_pid)}") -            put_in(              state.conns,              Map.delete(state.conns, key)            )          nil -> -          Logger.debug(":gun_down message for conn which is not found in state") - -          :ok = Gun.close(conn_pid) +          Logger.debug(":gun_down for conn which isn't found in state")            state        end @@ -275,7 +257,7 @@ defmodule Pleroma.Pool.Connections do          )        else          nil -> -          Logger.debug(":DOWN message for conn which is not found in state") +          Logger.debug(":DOWN for conn which isn't found in state")            state        end @@ -283,18 +265,6 @@ defmodule Pleroma.Pool.Connections do      {:noreply, state}    end -  defp compose_key_gun_info(pid) do -    %{origin_host: origin_host, origin_scheme: scheme, origin_port: port} = Gun.info(pid) - -    host = -      case :inet.ntoa(origin_host) do -        {:error, :einval} -> origin_host -        ip -> ip -      end - -    "#{scheme}:#{host}:#{port}" -  end -    defp find_conn(conns, conn_pid) do      Enum.find(conns, fn {_key, conn} ->        conn.conn == conn_pid @@ -310,8 +280,4 @@ defmodule Pleroma.Pool.Connections do    def crf(current, steps, crf) do      1 + :math.pow(0.5, current / steps) * crf    end - -  def compose_uri_log(%URI{scheme: scheme, host: host, path: path}) do -    "#{scheme}://#{host}#{path}" -  end  end | 
