diff options
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/pleroma/application.ex | 5 | ||||
| -rw-r--r-- | lib/pleroma/jobs.ex | 152 | ||||
| -rw-r--r-- | lib/pleroma/user.ex | 1 | ||||
| -rw-r--r-- | lib/pleroma/web/activity_pub/activity_pub.ex | 29 | ||||
| -rw-r--r-- | lib/pleroma/web/activity_pub/activity_pub_controller.ex | 4 | ||||
| -rw-r--r-- | lib/pleroma/web/activity_pub/utils.ex | 2 | ||||
| -rw-r--r-- | lib/pleroma/web/federator/federator.ex | 145 | ||||
| -rw-r--r-- | lib/pleroma/web/ostatus/ostatus_controller.ex | 2 | ||||
| -rw-r--r-- | lib/pleroma/web/salmon/salmon.ex | 2 | ||||
| -rw-r--r-- | lib/pleroma/web/websub/websub.ex | 7 | ||||
| -rw-r--r-- | lib/pleroma/web/websub/websub_controller.ex | 2 | 
11 files changed, 235 insertions, 116 deletions
| diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index d67e2cdc8..d2523c045 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -108,9 +108,10 @@ defmodule Pleroma.Application do          hackney_pool_children() ++          [            worker(Pleroma.Web.Federator.RetryQueue, []), -          worker(Pleroma.Web.Federator, []),            worker(Pleroma.Stats, []), -          worker(Pleroma.Web.Push, []) +          worker(Pleroma.Web.Push, []), +          worker(Pleroma.Jobs, []), +          worker(Task, [&Pleroma.Web.Federator.init/0], restart: :temporary)          ] ++          streamer_child() ++          chat_child() ++ diff --git a/lib/pleroma/jobs.ex b/lib/pleroma/jobs.ex new file mode 100644 index 000000000..24b7e5e46 --- /dev/null +++ b/lib/pleroma/jobs.ex @@ -0,0 +1,152 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Jobs do +  @moduledoc """ +  A basic job queue +  """ +  use GenServer + +  require Logger + +  def init(args) do +    {:ok, args} +  end + +  def start_link do +    queues = +      Pleroma.Config.get(Pleroma.Jobs) +      |> Enum.map(fn {name, _} -> create_queue(name) end) +      |> Enum.into(%{}) + +    state = %{ +      queues: queues, +      refs: %{} +    } + +    GenServer.start_link(__MODULE__, state, name: __MODULE__) +  end + +  def create_queue(name) do +    {name, {:sets.new(), []}} +  end + +  @doc """ +  Enqueues a job. + +  Returns `:ok`. + +  ## Arguments + +  - `queue_name` - a queue name(must be specified in the config). +  - `mod` - a worker module (must have `perform` function). +  - `args` - a list of arguments for the `perform` function of the worker module. +  - `priority` - a job priority (`0` by default). + +  ## Examples + +  Enqueue `Module.perform/0` with `priority=1`: + +      iex> Pleroma.Jobs.enqueue(:example_queue, Module, []) +      :ok + +  Enqueue `Module.perform(:job_name)` with `priority=5`: + +      iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:job_name], 5) +      :ok + +  Enqueue `Module.perform(:another_job, data)` with `priority=1`: + +      iex> data = "foobar" +      iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:another_job, data]) +      :ok + +  Enqueue `Module.perform(:foobar_job, :foo, :bar, 42)` with `priority=1`: + +      iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:foobar_job, :foo, :bar, 42]) +      :ok + +  """ + +  def enqueue(queue_name, mod, args, priority \\ 1) + +  if Mix.env() == :test do +    def enqueue(_queue_name, mod, args, _priority) do +      apply(mod, :perform, args) +    end +  else +    @spec enqueue(atom(), atom(), [any()], integer()) :: :ok +    def enqueue(queue_name, mod, args, priority) do +      GenServer.cast(__MODULE__, {:enqueue, queue_name, mod, args, priority}) +    end +  end + +  def handle_cast({:enqueue, queue_name, mod, args, priority}, state) do +    {running_jobs, queue} = state[:queues][queue_name] + +    queue = enqueue_sorted(queue, {mod, args}, priority) + +    state = +      state +      |> update_queue(queue_name, {running_jobs, queue}) +      |> maybe_start_job(queue_name, running_jobs, queue) + +    {:noreply, state} +  end + +  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do +    queue_name = state.refs[ref] + +    {running_jobs, queue} = state[:queues][queue_name] + +    running_jobs = :sets.del_element(ref, running_jobs) + +    state = +      state +      |> remove_ref(ref) +      |> update_queue(queue_name, {running_jobs, queue}) +      |> maybe_start_job(queue_name, running_jobs, queue) + +    {:noreply, state} +  end + +  def maybe_start_job(state, queue_name, running_jobs, queue) do +    if :sets.size(running_jobs) < Pleroma.Config.get([__MODULE__, queue_name, :max_jobs]) && +         queue != [] do +      {{mod, args}, queue} = queue_pop(queue) +      {:ok, pid} = Task.start(fn -> apply(mod, :perform, args) end) +      mref = Process.monitor(pid) + +      state +      |> add_ref(queue_name, mref) +      |> update_queue(queue_name, {:sets.add_element(mref, running_jobs), queue}) +    else +      state +    end +  end + +  def enqueue_sorted(queue, element, priority) do +    [%{item: element, priority: priority} | queue] +    |> Enum.sort_by(fn %{priority: priority} -> priority end) +  end + +  def queue_pop([%{item: element} | queue]) do +    {element, queue} +  end + +  defp add_ref(state, queue_name, ref) do +    refs = Map.put(state[:refs], ref, queue_name) +    Map.put(state, :refs, refs) +  end + +  defp remove_ref(state, ref) do +    refs = Map.delete(state[:refs], ref) +    Map.put(state, :refs, refs) +  end + +  defp update_queue(state, queue_name, data) do +    queues = Map.put(state[:queues], queue_name, data) +    Map.put(state, :queues, queues) +  end +end diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index 322c338cd..18bb56667 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -237,6 +237,7 @@ defmodule Pleroma.User do        changeset        |> put_change(:password_hash, hashed)        |> put_change(:ap_id, ap_id) +      |> unique_constraint(:ap_id)        |> put_change(:following, [followers])        |> put_change(:follower_address, followers)      else diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index a4ef47b40..8d3116839 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -757,21 +757,19 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do      public = is_public?(activity) -    reachable_inboxes_metadata = -      (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers) -      |> Enum.filter(fn user -> User.ap_enabled?(user) end) -      |> Enum.map(fn %{info: %{source_data: data}} -> -        (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"] -      end) -      |> Enum.uniq() -      |> Enum.filter(fn inbox -> should_federate?(inbox, public) end) -      |> Instances.filter_reachable() -      {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)      json = Jason.encode!(data) -    Enum.each(reachable_inboxes_metadata, fn {inbox, unreachable_since} -> -      Federator.enqueue(:publish_single_ap, %{ +    (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers) +    |> Enum.filter(fn user -> User.ap_enabled?(user) end) +    |> Enum.map(fn %{info: %{source_data: data}} -> +      (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"] +    end) +    |> Enum.uniq() +    |> Enum.filter(fn inbox -> should_federate?(inbox, public) end) +    |> Instances.filter_reachable() +    |> Enum.each(fn {inbox, unreachable_since} -> +      Federator.publish_single_ap(%{          inbox: inbox,          json: json,          actor: actor, @@ -880,7 +878,12 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do    end    def is_private?(activity) do -    !is_public?(activity) && Enum.any?(activity.data["to"], &String.contains?(&1, "/followers")) +    unless is_public?(activity) do +      follower_address = User.get_cached_by_ap_id(activity.data["actor"]).follower_address +      Enum.any?(activity.data["to"], &(&1 == follower_address)) +    else +      false +    end    end    def is_direct?(%Activity{data: %{"directMessage" => true}}), do: true diff --git a/lib/pleroma/web/activity_pub/activity_pub_controller.ex b/lib/pleroma/web/activity_pub/activity_pub_controller.ex index 69879476e..2bea51311 100644 --- a/lib/pleroma/web/activity_pub/activity_pub_controller.ex +++ b/lib/pleroma/web/activity_pub/activity_pub_controller.ex @@ -155,13 +155,13 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do      with %User{} = user <- User.get_cached_by_nickname(nickname),           true <- Utils.recipient_in_message(user.ap_id, params),           params <- Utils.maybe_splice_recipient(user.ap_id, params) do -      Federator.enqueue(:incoming_ap_doc, params) +      Federator.incoming_ap_doc(params)        json(conn, "ok")      end    end    def inbox(%{assigns: %{valid_signature: true}} = conn, params) do -    Federator.enqueue(:incoming_ap_doc, params) +    Federator.incoming_ap_doc(params)      json(conn, "ok")    end diff --git a/lib/pleroma/web/activity_pub/utils.ex b/lib/pleroma/web/activity_pub/utils.ex index 964e11c9d..6a89374d0 100644 --- a/lib/pleroma/web/activity_pub/utils.ex +++ b/lib/pleroma/web/activity_pub/utils.ex @@ -164,7 +164,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do          _ -> 5        end -    Pleroma.Web.Federator.enqueue(:publish, activity, priority) +    Pleroma.Web.Federator.publish(activity, priority)      :ok    end diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index 468959a65..d4e2a9742 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -3,8 +3,6 @@  # SPDX-License-Identifier: AGPL-3.0-only  defmodule Pleroma.Web.Federator do -  use GenServer -    alias Pleroma.Activity    alias Pleroma.User    alias Pleroma.Web.WebFinger @@ -16,45 +14,71 @@ defmodule Pleroma.Web.Federator do    alias Pleroma.Web.ActivityPub.Utils    alias Pleroma.Web.Federator.RetryQueue    alias Pleroma.Web.OStatus +  alias Pleroma.Jobs    require Logger    @websub Application.get_env(:pleroma, :websub)    @ostatus Application.get_env(:pleroma, :ostatus) -  def init(args) do -    {:ok, args} +  def init() do +    # 1 minute +    Process.sleep(1000 * 60 * 1) +    refresh_subscriptions()    end -  def start_link do -    spawn(fn -> -      # 1 minute -      Process.sleep(1000 * 60) -      enqueue(:refresh_subscriptions, nil) -    end) +  # Client API + +  def incoming_doc(doc) do +    Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc]) +  end + +  def incoming_ap_doc(params) do +    Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params]) +  end + +  def publish(activity, priority \\ 1) do +    Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority) +  end + +  def publish_single_ap(params) do +    Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish_single_ap, params]) +  end -    GenServer.start_link( -      __MODULE__, -      %{ -        in: {:sets.new(), []}, -        out: {:sets.new(), []} -      }, -      name: __MODULE__ -    ) +  def publish_single_websub(websub) do +    Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish_single_websub, websub])    end -  def handle(:refresh_subscriptions, _) do +  def verify_websub(websub) do +    Jobs.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub]) +  end + +  def request_subscription(sub) do +    Jobs.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub]) +  end + +  def refresh_subscriptions() do +    Jobs.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions]) +  end + +  def publish_single_salmon(params) do +    Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish_single_salmon, params]) +  end + +  # Job Worker Callbacks + +  def perform(:refresh_subscriptions) do      Logger.debug("Federator running refresh subscriptions")      Websub.refresh_subscriptions()      spawn(fn ->        # 6 hours        Process.sleep(1000 * 60 * 60 * 6) -      enqueue(:refresh_subscriptions, nil) +      refresh_subscriptions()      end)    end -  def handle(:request_subscription, websub) do +  def perform(:request_subscription, websub) do      Logger.debug("Refreshing #{websub.topic}")      with {:ok, websub} <- Websub.request_subscription(websub) do @@ -64,7 +88,7 @@ defmodule Pleroma.Web.Federator do      end    end -  def handle(:publish, activity) do +  def perform(:publish, activity) do      Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)      with actor when not is_nil(actor) <- User.get_cached_by_ap_id(activity.data["actor"]) do @@ -90,7 +114,7 @@ defmodule Pleroma.Web.Federator do      end    end -  def handle(:verify_websub, websub) do +  def perform(:verify_websub, websub) do      Logger.debug(fn ->        "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"      end) @@ -98,12 +122,12 @@ defmodule Pleroma.Web.Federator do      @websub.verify(websub)    end -  def handle(:incoming_doc, doc) do +  def perform(:incoming_doc, doc) do      Logger.info("Got document, trying to parse")      @ostatus.handle_incoming(doc)    end -  def handle(:incoming_ap_doc, params) do +  def perform(:incoming_ap_doc, params) do      Logger.info("Handling incoming AP activity")      params = Utils.normalize_params(params) @@ -128,11 +152,11 @@ defmodule Pleroma.Web.Federator do      end    end -  def handle(:publish_single_salmon, params) do +  def perform(:publish_single_salmon, params) do      Salmon.send_to_user(params)    end -  def handle(:publish_single_ap, params) do +  def perform(:publish_single_ap, params) do      case ActivityPub.publish_one(params) do        {:ok, _} ->          :ok @@ -142,7 +166,7 @@ defmodule Pleroma.Web.Federator do      end    end -  def handle( +  def perform(          :publish_single_websub,          %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params        ) do @@ -155,74 +179,11 @@ defmodule Pleroma.Web.Federator do      end    end -  def handle(type, _) do +  def perform(type, _) do      Logger.debug(fn -> "Unknown task: #{type}" end)      {:error, "Don't know what to do with this"}    end -  if Mix.env() == :test do -    def enqueue(type, payload, _priority \\ 1) do -      if Pleroma.Config.get([:instance, :federating]) do -        handle(type, payload) -      end -    end -  else -    def enqueue(type, payload, priority \\ 1) do -      if Pleroma.Config.get([:instance, :federating]) do -        GenServer.cast(__MODULE__, {:enqueue, type, payload, priority}) -      end -    end -  end - -  def maybe_start_job(running_jobs, queue) do -    if :sets.size(running_jobs) < Pleroma.Config.get([__MODULE__, :max_jobs]) && queue != [] do -      {{type, payload}, queue} = queue_pop(queue) -      {:ok, pid} = Task.start(fn -> handle(type, payload) end) -      mref = Process.monitor(pid) -      {:sets.add_element(mref, running_jobs), queue} -    else -      {running_jobs, queue} -    end -  end - -  def handle_cast({:enqueue, type, payload, _priority}, state) -      when type in [:incoming_doc, :incoming_ap_doc] do -    %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state -    i_queue = enqueue_sorted(i_queue, {type, payload}, 1) -    {i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue) -    {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}} -  end - -  def handle_cast({:enqueue, type, payload, _priority}, state) do -    %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state -    o_queue = enqueue_sorted(o_queue, {type, payload}, 1) -    {o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue) -    {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}} -  end - -  def handle_cast(_, state) do -    {:noreply, state} -  end - -  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do -    %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state -    i_running_jobs = :sets.del_element(ref, i_running_jobs) -    o_running_jobs = :sets.del_element(ref, o_running_jobs) -    {i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue) -    {o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue) - -    {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}} -  end - -  def enqueue_sorted(queue, element, priority) do -    [%{item: element, priority: priority} | queue] -    |> Enum.sort_by(fn %{priority: priority} -> priority end) -  end - -  def queue_pop([%{item: element} | queue]) do -    {element, queue} -  end -    def ap_enabled_actor(id) do      user = User.get_by_ap_id(id) diff --git a/lib/pleroma/web/ostatus/ostatus_controller.ex b/lib/pleroma/web/ostatus/ostatus_controller.ex index db4c8f4da..bab3da2b0 100644 --- a/lib/pleroma/web/ostatus/ostatus_controller.ex +++ b/lib/pleroma/web/ostatus/ostatus_controller.ex @@ -87,7 +87,7 @@ defmodule Pleroma.Web.OStatus.OStatusController do      {:ok, body, _conn} = read_body(conn)      {:ok, doc} = decode_or_retry(body) -    Federator.enqueue(:incoming_doc, doc) +    Federator.incoming_doc(doc)      conn      |> send_resp(200, "") diff --git a/lib/pleroma/web/salmon/salmon.ex b/lib/pleroma/web/salmon/salmon.ex index a5a9e16c6..0a69aa1ec 100644 --- a/lib/pleroma/web/salmon/salmon.ex +++ b/lib/pleroma/web/salmon/salmon.ex @@ -229,7 +229,7 @@ defmodule Pleroma.Web.Salmon do        |> Enum.each(fn remote_user ->          Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end) -        Pleroma.Web.Federator.enqueue(:publish_single_salmon, %{ +        Pleroma.Web.Federator.publish_single_salmon(%{            recipient: remote_user,            feed: feed,            poster: poster, diff --git a/lib/pleroma/web/websub/websub.ex b/lib/pleroma/web/websub/websub.ex index a08d7993d..c00ec0858 100644 --- a/lib/pleroma/web/websub/websub.ex +++ b/lib/pleroma/web/websub/websub.ex @@ -13,6 +13,7 @@ defmodule Pleroma.Web.Websub do    alias Pleroma.Web.Endpoint    alias Pleroma.Web.OStatus    alias Pleroma.Web.Router.Helpers +  alias Pleroma.Web.Federator    require Logger    import Ecto.Query @@ -87,7 +88,7 @@ defmodule Pleroma.Web.Websub do          unreachable_since: reachable_callbacks_metadata[sub.callback]        } -      Pleroma.Web.Federator.enqueue(:publish_single_websub, data) +      Federator.publish_single_websub(data)      end)    end @@ -119,7 +120,7 @@ defmodule Pleroma.Web.Websub do        websub = Repo.update!(change) -      Pleroma.Web.Federator.enqueue(:verify_websub, websub) +      Federator.verify_websub(websub)        {:ok, websub}      else @@ -269,7 +270,7 @@ defmodule Pleroma.Web.Websub do      subs = Repo.all(query)      Enum.each(subs, fn sub -> -      Pleroma.Web.Federator.enqueue(:request_subscription, sub) +      Federator.request_subscription(sub)      end)    end diff --git a/lib/pleroma/web/websub/websub_controller.ex b/lib/pleroma/web/websub/websub_controller.ex index 1ad18a8a4..ad40f1b94 100644 --- a/lib/pleroma/web/websub/websub_controller.ex +++ b/lib/pleroma/web/websub/websub_controller.ex @@ -84,7 +84,7 @@ defmodule Pleroma.Web.Websub.WebsubController do           %WebsubClientSubscription{} = websub <- Repo.get(WebsubClientSubscription, id),           {:ok, body, _conn} = read_body(conn),           ^signature <- Websub.sign(websub.secret, body) do -      Federator.enqueue(:incoming_doc, body) +      Federator.incoming_doc(body)        conn        |> send_resp(200, "OK") | 
