diff options
25 files changed, 360 insertions, 507 deletions
diff --git a/config/config.exs b/config/config.exs index d2325edbc..258e4d274 100644 --- a/config/config.exs +++ b/config/config.exs @@ -440,13 +440,7 @@ config :pleroma, Pleroma.User,      "web"    ] -config :pleroma, Pleroma.Web.Federator.RetryQueue, -  enabled: false, -  max_jobs: 20, -  initial_timeout: 30, -  max_retries: 5 - -config :pleroma_job_queue, :queues, +job_queues = [    federator_incoming: 50,    federator_outgoing: 50,    web_push: 50, @@ -454,6 +448,22 @@ config :pleroma_job_queue, :queues,    transmogrifier: 20,    scheduled_activities: 10,    background: 5 +] + +config :pleroma_job_queue, :queues, job_queues + +config :pleroma, Oban, +  repo: Pleroma.Repo, +  verbose: false, +  prune: {:maxage, 60 * 60 * 24 * 7}, +  queues: job_queues + +config :pleroma, :workers, +  retries: [ +    compile_time_default: 1, +    federator_incoming: 5, +    federator_outgoing: 5 +  ]  config :pleroma, :fetch_initial_posts,    enabled: false, diff --git a/config/test.exs b/config/test.exs index 6f75f39b5..f897b5d48 100644 --- a/config/test.exs +++ b/config/test.exs @@ -63,6 +63,10 @@ config :web_push_encryption, :http_client, Pleroma.Web.WebPushHttpClientMock  config :pleroma_job_queue, disabled: true +config :pleroma, Oban, +  queues: false, +  prune: :disabled +  config :pleroma, Pleroma.ScheduledActivity,    daily_user_limit: 2,    total_user_limit: 3, diff --git a/docs/config.md b/docs/config.md index 703ef67dd..8f8bd22f4 100644 --- a/docs/config.md +++ b/docs/config.md @@ -412,13 +412,6 @@ config :pleroma_job_queue, :queues,  This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the `max_jobs` set to `50`. -## Pleroma.Web.Federator.RetryQueue - -* `enabled`: If set to `true`, failed federation jobs will be retried -* `max_jobs`: The maximum amount of parallel federation jobs running at the same time. -* `initial_timeout`: The initial timeout in seconds -* `max_retries`: The maximum number of times a federation job is retried -  ## Pleroma.Web.Metadata  * `providers`: a list of metadata providers to enable. Providers available:    * Pleroma.Web.Metadata.Providers.OpenGraph diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 00b06f723..5550a4902 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -120,8 +120,8 @@ defmodule Pleroma.Application do          hackney_pool_children() ++          [            %{ -            id: Pleroma.Web.Federator.RetryQueue, -            start: {Pleroma.Web.Federator.RetryQueue, :start_link, []} +            id: Oban, +            start: {Oban, :start_link, [Application.get_env(:pleroma, Oban)]}            },            %{              id: Pleroma.Web.OAuth.Token.CleanWorker, diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex index 46edab0bd..29f3221d1 100644 --- a/lib/pleroma/web/activity_pub/publisher.ex +++ b/lib/pleroma/web/activity_pub/publisher.ex @@ -85,6 +85,15 @@ defmodule Pleroma.Web.ActivityPub.Publisher do      end    end +  def publish_one(%{actor_id: actor_id} = params) do +    actor = User.get_by_id(actor_id) + +    params +    |> Map.delete(:actor_id) +    |> Map.put(:actor, actor) +    |> publish_one() +  end +    defp should_federate?(inbox, public) do      if public do        true @@ -160,7 +169,8 @@ defmodule Pleroma.Web.ActivityPub.Publisher do    Publishes an activity with BCC to all relevant peers.    """ -  def publish(actor, %{data: %{"bcc" => bcc}} = activity) when is_list(bcc) and bcc != [] do +  def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity) +      when is_list(bcc) and bcc != [] do      public = is_public?(activity)      {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) @@ -187,7 +197,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do        Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{          inbox: inbox,          json: json, -        actor: actor, +        actor_id: actor.id,          id: activity.data["id"],          unreachable_since: unreachable_since        }) @@ -222,7 +232,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do          %{            inbox: inbox,            json: json, -          actor: actor, +          actor_id: actor.id,            id: activity.data["id"],            unreachable_since: unreachable_since          } diff --git a/lib/pleroma/web/activity_pub/utils.ex b/lib/pleroma/web/activity_pub/utils.ex index fc5305c58..4f68acc78 100644 --- a/lib/pleroma/web/activity_pub/utils.ex +++ b/lib/pleroma/web/activity_pub/utils.ex @@ -168,14 +168,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do    """    def maybe_federate(%Activity{local: true} = activity) do      if Pleroma.Config.get!([:instance, :federating]) do -      priority = -        case activity.data["type"] do -          "Delete" -> 10 -          "Create" -> 1 -          _ -> 5 -        end - -      Pleroma.Web.Federator.publish(activity, priority) +      Pleroma.Web.Federator.publish(activity)      end      :ok diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index f4f9e83e0..bb9eadfee 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -3,23 +3,15 @@  # SPDX-License-Identifier: AGPL-3.0-only  defmodule Pleroma.Web.Federator do -  alias Pleroma.Activity -  alias Pleroma.Object.Containment -  alias Pleroma.User -  alias Pleroma.Web.ActivityPub.ActivityPub -  alias Pleroma.Web.ActivityPub.Transmogrifier -  alias Pleroma.Web.ActivityPub.Utils -  alias Pleroma.Web.Federator.Publisher -  alias Pleroma.Web.Federator.RetryQueue -  alias Pleroma.Web.OStatus -  alias Pleroma.Web.Websub +  alias Pleroma.Workers.Publisher, as: PublisherWorker +  alias Pleroma.Workers.Receiver, as: ReceiverWorker +  alias Pleroma.Workers.Subscriber, as: SubscriberWorker    require Logger    def init do      # 1 minute -    Process.sleep(1000 * 60) -    refresh_subscriptions() +    refresh_subscriptions(schedule_in: 60)    end    @doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)" @@ -37,124 +29,50 @@ defmodule Pleroma.Web.Federator do    # Client API    def incoming_doc(doc) do -    PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc]) +    %{"op" => "incoming_doc", "body" => doc} +    |> ReceiverWorker.new(worker_args(:federator_incoming)) +    |> Pleroma.Repo.insert()    end    def incoming_ap_doc(params) do -    PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params]) +    %{"op" => "incoming_ap_doc", "params" => params} +    |> ReceiverWorker.new(worker_args(:federator_incoming)) +    |> Pleroma.Repo.insert()    end -  def publish(activity, priority \\ 1) do -    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority) +  def publish(%{id: "pleroma:fakeid"} = activity) do +    PublisherWorker.perform_publish(activity)    end -  def verify_websub(websub) do -    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub]) -  end - -  def request_subscription(sub) do -    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub]) -  end - -  def refresh_subscriptions do -    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions]) -  end - -  # Job Worker Callbacks - -  def perform(:refresh_subscriptions) do -    Logger.debug("Federator running refresh subscriptions") -    Websub.refresh_subscriptions() - -    spawn(fn -> -      # 6 hours -      Process.sleep(1000 * 60 * 60 * 6) -      refresh_subscriptions() -    end) -  end - -  def perform(:request_subscription, websub) do -    Logger.debug("Refreshing #{websub.topic}") - -    with {:ok, websub} <- Websub.request_subscription(websub) do -      Logger.debug("Successfully refreshed #{websub.topic}") -    else -      _e -> Logger.debug("Couldn't refresh #{websub.topic}") -    end -  end - -  def perform(:publish, activity) do -    Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end) - -    with %User{} = actor <- User.get_cached_by_ap_id(activity.data["actor"]), -         {:ok, actor} <- User.ensure_keys_present(actor) do -      Publisher.publish(actor, activity) -    end -  end - -  def perform(:verify_websub, websub) do -    Logger.debug(fn -> -      "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})" -    end) - -    Websub.verify(websub) -  end - -  def perform(:incoming_doc, doc) do -    Logger.info("Got document, trying to parse") -    OStatus.handle_incoming(doc) +  def publish(activity) do +    %{"op" => "publish", "activity_id" => activity.id} +    |> PublisherWorker.new(worker_args(:federator_outgoing)) +    |> Pleroma.Repo.insert()    end -  def perform(:incoming_ap_doc, params) do -    Logger.info("Handling incoming AP activity") - -    params = Utils.normalize_params(params) - -    # NOTE: we use the actor ID to do the containment, this is fine because an -    # actor shouldn't be acting on objects outside their own AP server. -    with {:ok, _user} <- ap_enabled_actor(params["actor"]), -         nil <- Activity.normalize(params["id"]), -         :ok <- Containment.contain_origin_from_id(params["actor"], params), -         {:ok, activity} <- Transmogrifier.handle_incoming(params) do -      {:ok, activity} -    else -      %Activity{} -> -        Logger.info("Already had #{params["id"]}") -        :error - -      _e -> -        # Just drop those for now -        Logger.info("Unhandled activity") -        Logger.info(Jason.encode!(params, pretty: true)) -        :error -    end +  def verify_websub(websub) do +    %{"op" => "verify_websub", "websub_id" => websub.id} +    |> SubscriberWorker.new(worker_args(:federator_outgoing)) +    |> Pleroma.Repo.insert()    end -  def perform( -        :publish_single_websub, -        %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params -      ) do -    case Websub.publish_one(params) do -      {:ok, _} -> -        :ok - -      {:error, _} -> -        RetryQueue.enqueue(params, Websub) -    end +  def request_subscription(websub) do +    %{"op" => "request_subscription", "websub_id" => websub.id} +    |> SubscriberWorker.new(worker_args(:federator_outgoing)) +    |> Pleroma.Repo.insert()    end -  def perform(type, _) do -    Logger.debug(fn -> "Unknown task: #{type}" end) -    {:error, "Don't know what to do with this"} +  def refresh_subscriptions(worker_args \\ []) do +    %{"op" => "refresh_subscriptions"} +    |> SubscriberWorker.new(worker_args ++ [max_attempts: 1] ++ worker_args(:federator_outgoing)) +    |> Pleroma.Repo.insert()    end -  def ap_enabled_actor(id) do -    user = User.get_cached_by_ap_id(id) - -    if User.ap_enabled?(user) do -      {:ok, user} +  defp worker_args(queue) do +    if max_attempts = Pleroma.Config.get([:workers, :retries, queue]) do +      [max_attempts: max_attempts]      else -      ActivityPub.make_user_from_ap_id(id) +      []      end    end  end diff --git a/lib/pleroma/web/federator/publisher.ex b/lib/pleroma/web/federator/publisher.ex index 70f870244..05d2be615 100644 --- a/lib/pleroma/web/federator/publisher.ex +++ b/lib/pleroma/web/federator/publisher.ex @@ -6,7 +6,7 @@ defmodule Pleroma.Web.Federator.Publisher do    alias Pleroma.Activity    alias Pleroma.Config    alias Pleroma.User -  alias Pleroma.Web.Federator.RetryQueue +  alias Pleroma.Workers.Publisher, as: PublisherWorker    require Logger @@ -30,23 +30,17 @@ defmodule Pleroma.Web.Federator.Publisher do    Enqueue publishing a single activity.    """    @spec enqueue_one(module(), Map.t()) :: :ok -  def enqueue_one(module, %{} = params), -    do: PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_one, module, params]) - -  @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()} -  def perform(:publish_one, module, params) do -    case apply(module, :publish_one, [params]) do -      {:ok, _} -> -        :ok - -      {:error, _e} -> -        RetryQueue.enqueue(params, module) -    end -  end +  def enqueue_one(module, %{} = params) do +    worker_args = +      if max_attempts = Pleroma.Config.get([:workers, :retries, :federator_outgoing]) do +        [max_attempts: max_attempts] +      else +        [] +      end -  def perform(type, _, _) do -    Logger.debug("Unknown task: #{type}") -    {:error, "Don't know what to do with this"} +    %{"op" => "publish_one", "module" => to_string(module), "params" => params} +    |> PublisherWorker.new(worker_args) +    |> Pleroma.Repo.insert()    end    @doc """ diff --git a/lib/pleroma/web/federator/retry_queue.ex b/lib/pleroma/web/federator/retry_queue.ex deleted file mode 100644 index 3db948c2e..000000000 --- a/lib/pleroma/web/federator/retry_queue.ex +++ /dev/null @@ -1,239 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.Federator.RetryQueue do -  use GenServer - -  require Logger - -  def init(args) do -    queue_table = :ets.new(:pleroma_retry_queue, [:bag, :protected]) - -    {:ok, %{args | queue_table: queue_table, running_jobs: :sets.new()}} -  end - -  def start_link do -    enabled = -      if Pleroma.Config.get(:env) == :test, -        do: true, -        else: Pleroma.Config.get([__MODULE__, :enabled], false) - -    if enabled do -      Logger.info("Starting retry queue") - -      linkres = -        GenServer.start_link( -          __MODULE__, -          %{delivered: 0, dropped: 0, queue_table: nil, running_jobs: nil}, -          name: __MODULE__ -        ) - -      maybe_kickoff_timer() -      linkres -    else -      Logger.info("Retry queue disabled") -      :ignore -    end -  end - -  def enqueue(data, transport, retries \\ 0) do -    GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1}) -  end - -  def get_stats do -    GenServer.call(__MODULE__, :get_stats) -  end - -  def reset_stats do -    GenServer.call(__MODULE__, :reset_stats) -  end - -  def get_retry_params(retries) do -    if retries > Pleroma.Config.get([__MODULE__, :max_retries]) do -      {:drop, "Max retries reached"} -    else -      {:retry, growth_function(retries)} -    end -  end - -  def get_retry_timer_interval do -    Pleroma.Config.get([:retry_queue, :interval], 1000) -  end - -  defp ets_count_expires(table, current_time) do -    :ets.select_count( -      table, -      [ -        { -          {:"$1", :"$2"}, -          [{:"=<", :"$1", {:const, current_time}}], -          [true] -        } -      ] -    ) -  end - -  defp ets_pop_n_expired(table, current_time, desired) do -    {popped, _continuation} = -      :ets.select( -        table, -        [ -          { -            {:"$1", :"$2"}, -            [{:"=<", :"$1", {:const, current_time}}], -            [:"$_"] -          } -        ], -        desired -      ) - -    popped -    |> Enum.each(fn e -> -      :ets.delete_object(table, e) -    end) - -    popped -  end - -  def maybe_start_job(running_jobs, queue_table) do -    # we don't want to hit the ets or the DateTime more times than we have to -    # could optimize slightly further by not using the count, and instead grabbing -    # up to N objects early... -    current_time = DateTime.to_unix(DateTime.utc_now()) -    n_running_jobs = :sets.size(running_jobs) - -    if n_running_jobs < Pleroma.Config.get([__MODULE__, :max_jobs]) do -      n_ready_jobs = ets_count_expires(queue_table, current_time) - -      if n_ready_jobs > 0 do -        # figure out how many we could start -        available_job_slots = Pleroma.Config.get([__MODULE__, :max_jobs]) - n_running_jobs -        start_n_jobs(running_jobs, queue_table, current_time, available_job_slots) -      else -        running_jobs -      end -    else -      running_jobs -    end -  end - -  defp start_n_jobs(running_jobs, _queue_table, _current_time, 0) do -    running_jobs -  end - -  defp start_n_jobs(running_jobs, queue_table, current_time, available_job_slots) -       when available_job_slots > 0 do -    candidates = ets_pop_n_expired(queue_table, current_time, available_job_slots) - -    candidates -    |> List.foldl(running_jobs, fn {_, e}, rj -> -      {:ok, pid} = Task.start(fn -> worker(e) end) -      mref = Process.monitor(pid) -      :sets.add_element(mref, rj) -    end) -  end - -  def worker({:send, data, transport, retries}) do -    case transport.publish_one(data) do -      {:ok, _} -> -        GenServer.cast(__MODULE__, :inc_delivered) -        :delivered - -      {:error, _reason} -> -        enqueue(data, transport, retries) -        :retry -    end -  end - -  def handle_call(:get_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do -    {:reply, %{delivered: delivery_count, dropped: drop_count}, state} -  end - -  def handle_call(:reset_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do -    {:reply, %{delivered: delivery_count, dropped: drop_count}, -     %{state | delivered: 0, dropped: 0}} -  end - -  def handle_cast(:reset_stats, state) do -    {:noreply, %{state | delivered: 0, dropped: 0}} -  end - -  def handle_cast( -        {:maybe_enqueue, data, transport, retries}, -        %{dropped: drop_count, queue_table: queue_table, running_jobs: running_jobs} = state -      ) do -    case get_retry_params(retries) do -      {:retry, timeout} -> -        :ets.insert(queue_table, {timeout, {:send, data, transport, retries}}) -        running_jobs = maybe_start_job(running_jobs, queue_table) -        {:noreply, %{state | running_jobs: running_jobs}} - -      {:drop, message} -> -        Logger.debug(message) -        {:noreply, %{state | dropped: drop_count + 1}} -    end -  end - -  def handle_cast(:kickoff_timer, state) do -    retry_interval = get_retry_timer_interval() -    Process.send_after(__MODULE__, :retry_timer_run, retry_interval) -    {:noreply, state} -  end - -  def handle_cast(:inc_delivered, %{delivered: delivery_count} = state) do -    {:noreply, %{state | delivered: delivery_count + 1}} -  end - -  def handle_cast(:inc_dropped, %{dropped: drop_count} = state) do -    {:noreply, %{state | dropped: drop_count + 1}} -  end - -  def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do -    case transport.publish_one(data) do -      {:ok, _} -> -        {:noreply, %{state | delivered: delivery_count + 1}} - -      {:error, _reason} -> -        enqueue(data, transport, retries) -        {:noreply, state} -    end -  end - -  def handle_info( -        :retry_timer_run, -        %{queue_table: queue_table, running_jobs: running_jobs} = state -      ) do -    maybe_kickoff_timer() -    running_jobs = maybe_start_job(running_jobs, queue_table) -    {:noreply, %{state | running_jobs: running_jobs}} -  end - -  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do -    %{running_jobs: running_jobs, queue_table: queue_table} = state -    running_jobs = :sets.del_element(ref, running_jobs) -    running_jobs = maybe_start_job(running_jobs, queue_table) -    {:noreply, %{state | running_jobs: running_jobs}} -  end - -  def handle_info(unknown, state) do -    Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring") -    {:noreply, state} -  end - -  if Pleroma.Config.get(:env) == :test do -    defp growth_function(_retries) do -      _shutit = Pleroma.Config.get([__MODULE__, :initial_timeout]) -      DateTime.to_unix(DateTime.utc_now()) - 1 -    end -  else -    defp growth_function(retries) do -      round(Pleroma.Config.get([__MODULE__, :initial_timeout]) * :math.pow(retries, 3)) + -        DateTime.to_unix(DateTime.utc_now()) -    end -  end - -  defp maybe_kickoff_timer do -    GenServer.cast(__MODULE__, :kickoff_timer) -  end -end diff --git a/lib/pleroma/web/salmon/salmon.ex b/lib/pleroma/web/salmon/salmon.ex index 9b01ebcc6..bbaa293fd 100644 --- a/lib/pleroma/web/salmon/salmon.ex +++ b/lib/pleroma/web/salmon/salmon.ex @@ -170,6 +170,15 @@ defmodule Pleroma.Web.Salmon do      end    end +  def publish_one(%{recipient_id: recipient_id} = params) do +    recipient = User.get_by_id(recipient_id) + +    params +    |> Map.delete(:recipient_id) +    |> Map.put(:recipient, recipient) +    |> publish_one() +  end +    def publish_one(_), do: :noop    @supported_activities [ @@ -218,7 +227,7 @@ defmodule Pleroma.Web.Salmon do          Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)          Publisher.enqueue_one(__MODULE__, %{ -          recipient: remote_user, +          recipient_id: remote_user.id,            feed: feed,            unreachable_since: reachable_urls_metadata[remote_user.info.salmon]          }) diff --git a/lib/pleroma/workers/publisher.ex b/lib/pleroma/workers/publisher.ex new file mode 100644 index 000000000..67871977a --- /dev/null +++ b/lib/pleroma/workers/publisher.ex @@ -0,0 +1,35 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.Publisher do +  alias Pleroma.Activity +  alias Pleroma.User + +  # Note: `max_attempts` is intended to be overridden in `new/1` call +  use Oban.Worker, +    queue: "federator_outgoing", +    max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default]) + +  @impl Oban.Worker +  def perform(%{"op" => "publish", "activity_id" => activity_id}) do +    with %Activity{} = activity <- Activity.get_by_id(activity_id) do +      perform_publish(activity) +    else +      _ -> raise "Non-existing activity: #{activity_id}" +    end +  end + +  def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}) do +    module_name +    |> String.to_atom() +    |> apply(:publish_one, [params]) +  end + +  def perform_publish(%Activity{} = activity) do +    with %User{} = actor <- User.get_cached_by_ap_id(activity.data["actor"]), +         {:ok, actor} <- User.ensure_keys_present(actor) do +      Pleroma.Web.Federator.Publisher.publish(actor, activity) +    end +  end +end diff --git a/lib/pleroma/workers/receiver.ex b/lib/pleroma/workers/receiver.ex new file mode 100644 index 000000000..43558b4e6 --- /dev/null +++ b/lib/pleroma/workers/receiver.ex @@ -0,0 +1,61 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.Receiver do +  alias Pleroma.Activity +  alias Pleroma.Object.Containment +  alias Pleroma.User +  alias Pleroma.Web.ActivityPub.ActivityPub +  alias Pleroma.Web.ActivityPub.Transmogrifier +  alias Pleroma.Web.ActivityPub.Utils +  alias Pleroma.Web.OStatus + +  require Logger + +  # Note: `max_attempts` is intended to be overridden in `new/1` call +  use Oban.Worker, +    queue: "federator_incoming", +    max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default]) + +  @impl Oban.Worker +  def perform(%{"op" => "incoming_doc", "body" => doc}) do +    Logger.info("Got incoming document, trying to parse") +    OStatus.handle_incoming(doc) +  end + +  def perform(%{"op" => "incoming_ap_doc", "params" => params}) do +    Logger.info("Handling incoming AP activity") + +    params = Utils.normalize_params(params) + +    # NOTE: we use the actor ID to do the containment, this is fine because an +    # actor shouldn't be acting on objects outside their own AP server. +    with {:ok, _user} <- ap_enabled_actor(params["actor"]), +         nil <- Activity.normalize(params["id"]), +         :ok <- Containment.contain_origin_from_id(params["actor"], params), +         {:ok, activity} <- Transmogrifier.handle_incoming(params) do +      {:ok, activity} +    else +      %Activity{} -> +        Logger.info("Already had #{params["id"]}") +        :error + +      _e -> +        # Just drop those for now +        Logger.info("Unhandled activity") +        Logger.info(Jason.encode!(params, pretty: true)) +        :error +    end +  end + +  defp ap_enabled_actor(id) do +    user = User.get_cached_by_ap_id(id) + +    if User.ap_enabled?(user) do +      {:ok, user} +    else +      ActivityPub.make_user_from_ap_id(id) +    end +  end +end diff --git a/lib/pleroma/workers/subscriber.ex b/lib/pleroma/workers/subscriber.ex new file mode 100644 index 000000000..a8c01bb10 --- /dev/null +++ b/lib/pleroma/workers/subscriber.ex @@ -0,0 +1,44 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.Subscriber do +  alias Pleroma.Repo +  alias Pleroma.Web.Websub +  alias Pleroma.Web.Websub.WebsubClientSubscription + +  require Logger + +  # Note: `max_attempts` is intended to be overridden in `new/1` call +  use Oban.Worker, +    queue: "federator_outgoing", +    max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default]) + +  @impl Oban.Worker +  def perform(%{"op" => "refresh_subscriptions"}) do +    Websub.refresh_subscriptions() +    # Schedule the next run in 6 hours +    Pleroma.Web.Federator.refresh_subscriptions(schedule_in: 3600 * 6) +  end + +  def perform(%{"op" => "request_subscription", "websub_id" => websub_id}) do +    websub = Repo.get(WebsubClientSubscription, websub_id) +    Logger.debug("Refreshing #{websub.topic}") + +    with {:ok, websub} <- Websub.request_subscription(websub) do +      Logger.debug("Successfully refreshed #{websub.topic}") +    else +      _e -> Logger.debug("Couldn't refresh #{websub.topic}") +    end +  end + +  def perform(%{"op" => "verify_websub", "websub_id" => websub_id}) do +    websub = Repo.get(WebsubClientSubscription, websub_id) + +    Logger.debug(fn -> +      "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})" +    end) + +    Websub.verify(websub) +  end +end @@ -101,6 +101,7 @@ defmodule Pleroma.Mixfile do        {:phoenix_ecto, "~> 4.0"},        {:ecto_sql, "~> 3.1"},        {:postgrex, ">= 0.13.5"}, +      {:oban, "~> 0.6"},        {:gettext, "~> 0.15"},        {:comeonin, "~> 4.1.1"},        {:pbkdf2_elixir, "~> 0.12.3"}, @@ -57,6 +57,7 @@    "mogrify": {:hex, :mogrify, "0.6.1", "de1b527514f2d95a7bbe9642eb556061afb337e220cf97adbf3a4e6438ed70af", [:mix], [], "hexpm"},    "mox": {:hex, :mox, "0.5.1", "f86bb36026aac1e6f924a4b6d024b05e9adbed5c63e8daa069bd66fb3292165b", [:mix], [], "hexpm"},    "nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm"}, +  "oban": {:hex, :oban, "0.6.0", "8b9b861355610e703e58a878bc29959f3f0e1b4cd1e90d785cf2bb2498d3b893", [:mix], [{:ecto_sql, "~> 3.1", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.14", [hex: :postgrex, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},    "parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm"},    "pbkdf2_elixir": {:hex, :pbkdf2_elixir, "0.12.3", "6706a148809a29c306062862c803406e88f048277f6e85b68faf73291e820b84", [:mix], [], "hexpm"},    "phoenix": {:hex, :phoenix, "1.4.9", "746d098e10741c334d88143d3c94cab1756435f94387a63441792e66ec0ee974", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 1.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.8.1 or ~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"}, diff --git a/priv/repo/migrations/20190730055101_add_oban_jobs_table.exs b/priv/repo/migrations/20190730055101_add_oban_jobs_table.exs new file mode 100644 index 000000000..2f201bd05 --- /dev/null +++ b/priv/repo/migrations/20190730055101_add_oban_jobs_table.exs @@ -0,0 +1,6 @@ +defmodule Pleroma.Repo.Migrations.AddObanJobsTable do +  use Ecto.Migration + +  defdelegate up, to: Oban.Migrations +  defdelegate down, to: Oban.Migrations +end diff --git a/test/activity_test.exs b/test/activity_test.exs index b27f6fd36..b9c12adb2 100644 --- a/test/activity_test.exs +++ b/test/activity_test.exs @@ -6,6 +6,7 @@ defmodule Pleroma.ActivityTest do    use Pleroma.DataCase    alias Pleroma.Activity    alias Pleroma.Bookmark +  alias Pleroma.ObanHelpers    alias Pleroma.Object    alias Pleroma.ThreadMute    import Pleroma.Factory @@ -125,7 +126,8 @@ defmodule Pleroma.ActivityTest do        }        {:ok, local_activity} = Pleroma.Web.CommonAPI.post(user, %{"status" => "find me!"}) -      {:ok, remote_activity} = Pleroma.Web.Federator.incoming_ap_doc(params) +      {:ok, job} = Pleroma.Web.Federator.incoming_ap_doc(params) +      {:ok, remote_activity} = ObanHelpers.perform(job)        %{local_activity: local_activity, remote_activity: remote_activity, user: user}      end diff --git a/test/support/oban_helpers.ex b/test/support/oban_helpers.ex new file mode 100644 index 000000000..54b5a9566 --- /dev/null +++ b/test/support/oban_helpers.ex @@ -0,0 +1,36 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.ObanHelpers do +  @moduledoc """ +  Oban test helpers. +  """ + +  alias Pleroma.Repo + +  def perform(%Oban.Job{} = job) do +    res = apply(String.to_existing_atom("Elixir." <> job.worker), :perform, [job]) +    Repo.delete(job) +    res +  end + +  def perform(jobs) when is_list(jobs) do +    for job <- jobs, do: perform(job) +  end + +  def member?(%{} = job_args, jobs) when is_list(jobs) do +    Enum.any?(jobs, fn job -> +      member?(job_args, job.args) +    end) +  end + +  def member?(%{} = test_attrs, %{} = attrs) do +    Enum.all?( +      test_attrs, +      fn {k, _v} -> member?(test_attrs[k], attrs[k]) end +    ) +  end + +  def member?(x, y), do: x == y +end diff --git a/test/user_test.exs b/test/user_test.exs index 8440d456d..8617752d7 100644 --- a/test/user_test.exs +++ b/test/user_test.exs @@ -5,6 +5,7 @@  defmodule Pleroma.UserTest do    alias Pleroma.Activity    alias Pleroma.Builders.UserBuilder +  alias Pleroma.ObanHelpers    alias Pleroma.Object    alias Pleroma.Repo    alias Pleroma.User @@ -12,9 +13,9 @@ defmodule Pleroma.UserTest do    alias Pleroma.Web.CommonAPI    use Pleroma.DataCase +  use Oban.Testing, repo: Pleroma.Repo    import Pleroma.Factory -  import Mock    setup_all do      Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end) @@ -1034,11 +1035,7 @@ defmodule Pleroma.UserTest do        refute Activity.get_by_id(repeat.id)      end -    test_with_mock "it sends out User Delete activity", -                   %{user: user}, -                   Pleroma.Web.ActivityPub.Publisher, -                   [:passthrough], -                   [] do +    test "it sends out User Delete activity", %{user: user} do        config_path = [:instance, :federating]        initial_setting = Pleroma.Config.get(config_path)        Pleroma.Config.put(config_path, true) @@ -1048,10 +1045,15 @@ defmodule Pleroma.UserTest do        {:ok, _user} = User.delete(user) -      assert called( -               Pleroma.Web.ActivityPub.Publisher.publish_one(%{ -                 inbox: "http://mastodon.example.org/inbox" -               }) +      assert ObanHelpers.member?( +               %{ +                 "op" => "publish_one", +                 "params" => %{ +                   "inbox" => "http://mastodon.example.org/inbox", +                   "id" => "pleroma:fakeid" +                 } +               }, +               all_enqueued(worker: Pleroma.Workers.Publisher)               )        Pleroma.Config.put(config_path, initial_setting) diff --git a/test/web/activity_pub/activity_pub_controller_test.exs b/test/web/activity_pub/activity_pub_controller_test.exs index 251055ee1..d7f0a8264 100644 --- a/test/web/activity_pub/activity_pub_controller_test.exs +++ b/test/web/activity_pub/activity_pub_controller_test.exs @@ -4,15 +4,19 @@  defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do    use Pleroma.Web.ConnCase +  use Oban.Testing, repo: Pleroma.Repo +    import Pleroma.Factory    alias Pleroma.Activity    alias Pleroma.Instances +  alias Pleroma.ObanHelpers    alias Pleroma.Object    alias Pleroma.User    alias Pleroma.Web.ActivityPub.ObjectView    alias Pleroma.Web.ActivityPub.UserView    alias Pleroma.Web.ActivityPub.Utils    alias Pleroma.Web.CommonAPI +  alias Pleroma.Workers.Receiver, as: ReceiverWorker    setup_all do      Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end) @@ -279,7 +283,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do          |> post("/inbox", data)        assert "ok" == json_response(conn, 200) -      :timer.sleep(500) + +      ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))        assert Activity.get_by_ap_id(data["id"])      end @@ -321,7 +326,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do          |> post("/users/#{user.nickname}/inbox", data)        assert "ok" == json_response(conn, 200) -      :timer.sleep(500) +      ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))        assert Activity.get_by_ap_id(data["id"])      end @@ -350,7 +355,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do          |> post("/users/#{recipient.nickname}/inbox", data)        assert "ok" == json_response(conn, 200) -      :timer.sleep(500) +      ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))        assert Activity.get_by_ap_id(data["id"])      end @@ -429,6 +434,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do        |> post("/users/#{recipient.nickname}/inbox", data)        |> json_response(200) +      ObanHelpers.perform(all_enqueued(worker: ReceiverWorker)) +        activity = Activity.get_by_ap_id(data["id"])        assert activity.id @@ -504,6 +511,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do          |> post("/users/#{user.nickname}/outbox", data)        result = json_response(conn, 201) +        assert Activity.get_by_ap_id(result["id"])      end diff --git a/test/web/activity_pub/publisher_test.exs b/test/web/activity_pub/publisher_test.exs index 36a39c84c..26d019878 100644 --- a/test/web/activity_pub/publisher_test.exs +++ b/test/web/activity_pub/publisher_test.exs @@ -257,7 +257,7 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do        assert called(                 Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{                   inbox: "https://domain.com/users/nick1/inbox", -                 actor: actor, +                 actor_id: actor.id,                   id: note_activity.data["id"]                 })               ) diff --git a/test/web/federator_test.exs b/test/web/federator_test.exs index 73cfaa8f1..d3a28d50e 100644 --- a/test/web/federator_test.exs +++ b/test/web/federator_test.exs @@ -4,9 +4,14 @@  defmodule Pleroma.Web.FederatorTest do    alias Pleroma.Instances +  alias Pleroma.ObanHelpers    alias Pleroma.Web.CommonAPI    alias Pleroma.Web.Federator +  alias Pleroma.Workers.Publisher, as: PublisherWorker +    use Pleroma.DataCase +  use Oban.Testing, repo: Pleroma.Repo +    import Pleroma.Factory    import Mock @@ -22,15 +27,6 @@ defmodule Pleroma.Web.FederatorTest do      :ok    end -  describe "Publisher.perform" do -    test "call `perform` with unknown task" do -      assert { -               :error, -               "Don't know what to do with this" -             } = Pleroma.Web.Federator.Publisher.perform("test", :ok, :ok) -    end -  end -    describe "Publish an activity" do      setup do        user = insert(:user) @@ -51,6 +47,7 @@ defmodule Pleroma.Web.FederatorTest do      } do        with_mocks([relay_mock]) do          Federator.publish(activity) +        ObanHelpers.perform(all_enqueued(worker: PublisherWorker))        end        assert_received :relay_publish @@ -64,6 +61,7 @@ defmodule Pleroma.Web.FederatorTest do        with_mocks([relay_mock]) do          Federator.publish(activity) +        ObanHelpers.perform(all_enqueued(worker: PublisherWorker))        end        refute_received :relay_publish @@ -73,10 +71,7 @@ defmodule Pleroma.Web.FederatorTest do    end    describe "Targets reachability filtering in `publish`" do -    test_with_mock "it federates only to reachable instances via AP", -                   Pleroma.Web.ActivityPub.Publisher, -                   [:passthrough], -                   [] do +    test "it federates only to reachable instances via AP" do        user = insert(:user)        {inbox1, inbox2} = @@ -104,20 +99,20 @@ defmodule Pleroma.Web.FederatorTest do        {:ok, _activity} =          CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"}) -      assert called( -               Pleroma.Web.ActivityPub.Publisher.publish_one(%{ -                 inbox: inbox1, -                 unreachable_since: dt -               }) -             ) +      expected_dt = NaiveDateTime.to_iso8601(dt) -      refute called(Pleroma.Web.ActivityPub.Publisher.publish_one(%{inbox: inbox2})) +      ObanHelpers.perform(all_enqueued(worker: PublisherWorker)) + +      assert ObanHelpers.member?( +               %{ +                 "op" => "publish_one", +                 "params" => %{"inbox" => inbox1, "unreachable_since" => expected_dt} +               }, +               all_enqueued(worker: PublisherWorker) +             )      end -    test_with_mock "it federates only to reachable instances via Websub", -                   Pleroma.Web.Websub, -                   [:passthrough], -                   [] do +    test "it federates only to reachable instances via Websub" do        user = insert(:user)        websub_topic = Pleroma.Web.OStatus.feed_path(user) @@ -142,23 +137,27 @@ defmodule Pleroma.Web.FederatorTest do        {:ok, _activity} = CommonAPI.post(user, %{"status" => "HI"}) -      assert called( -               Pleroma.Web.Websub.publish_one(%{ -                 callback: sub2.callback, -                 unreachable_since: dt -               }) -             ) +      expected_callback = sub2.callback +      expected_dt = NaiveDateTime.to_iso8601(dt) + +      ObanHelpers.perform(all_enqueued(worker: PublisherWorker)) -      refute called(Pleroma.Web.Websub.publish_one(%{callback: sub1.callback})) +      assert ObanHelpers.member?( +               %{ +                 "op" => "publish_one", +                 "params" => %{ +                   "callback" => expected_callback, +                   "unreachable_since" => expected_dt +                 } +               }, +               all_enqueued(worker: PublisherWorker) +             )      end -    test_with_mock "it federates only to reachable instances via Salmon", -                   Pleroma.Web.Salmon, -                   [:passthrough], -                   [] do +    test "it federates only to reachable instances via Salmon" do        user = insert(:user) -      remote_user1 = +      _remote_user1 =          insert(:user, %{            local: false,            nickname: "nick1@domain.com", @@ -174,6 +173,8 @@ defmodule Pleroma.Web.FederatorTest do            info: %{salmon: "https://domain2.com/salmon"}          }) +      remote_user2_id = remote_user2.id +        dt = NaiveDateTime.utc_now()        Instances.set_unreachable(remote_user2.ap_id, dt) @@ -182,14 +183,20 @@ defmodule Pleroma.Web.FederatorTest do        {:ok, _activity} =          CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"}) -      assert called( -               Pleroma.Web.Salmon.publish_one(%{ -                 recipient: remote_user2, -                 unreachable_since: dt -               }) -             ) +      expected_dt = NaiveDateTime.to_iso8601(dt) + +      ObanHelpers.perform(all_enqueued(worker: PublisherWorker)) -      refute called(Pleroma.Web.Salmon.publish_one(%{recipient: remote_user1})) +      assert ObanHelpers.member?( +               %{ +                 "op" => "publish_one", +                 "params" => %{ +                   "recipient_id" => remote_user2_id, +                   "unreachable_since" => expected_dt +                 } +               }, +               all_enqueued(worker: PublisherWorker) +             )      end    end @@ -209,7 +216,8 @@ defmodule Pleroma.Web.FederatorTest do          "to" => ["https://www.w3.org/ns/activitystreams#Public"]        } -      {:ok, _activity} = Federator.incoming_ap_doc(params) +      assert {:ok, job} = Federator.incoming_ap_doc(params) +      assert {:ok, _activity} = ObanHelpers.perform(job)      end      test "rejects incoming AP docs with incorrect origin" do @@ -227,7 +235,8 @@ defmodule Pleroma.Web.FederatorTest do          "to" => ["https://www.w3.org/ns/activitystreams#Public"]        } -      :error = Federator.incoming_ap_doc(params) +      assert {:ok, job} = Federator.incoming_ap_doc(params) +      assert :error = ObanHelpers.perform(job)      end      test "it does not crash if MRF rejects the post" do diff --git a/test/web/retry_queue_test.exs b/test/web/retry_queue_test.exs deleted file mode 100644 index ecb3ce5d0..000000000 --- a/test/web/retry_queue_test.exs +++ /dev/null @@ -1,48 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule MockActivityPub do -  def publish_one({ret, waiter}) do -    send(waiter, :complete) -    {ret, "success"} -  end -end - -defmodule Pleroma.Web.Federator.RetryQueueTest do -  use Pleroma.DataCase -  alias Pleroma.Web.Federator.RetryQueue - -  @small_retry_count 0 -  @hopeless_retry_count 10 - -  setup do -    RetryQueue.reset_stats() -  end - -  test "RetryQueue responds to stats request" do -    assert %{delivered: 0, dropped: 0} == RetryQueue.get_stats() -  end - -  test "failed posts are retried" do -    {:retry, _timeout} = RetryQueue.get_retry_params(@small_retry_count) - -    wait_task = -      Task.async(fn -> -        receive do -          :complete -> :ok -        end -      end) - -    RetryQueue.enqueue({:ok, wait_task.pid}, MockActivityPub, @small_retry_count) -    Task.await(wait_task) -    assert %{delivered: 1, dropped: 0} == RetryQueue.get_stats() -  end - -  test "posts that have been tried too many times are dropped" do -    {:drop, _timeout} = RetryQueue.get_retry_params(@hopeless_retry_count) - -    RetryQueue.enqueue({:ok, nil}, MockActivityPub, @hopeless_retry_count) -    assert %{delivered: 0, dropped: 1} == RetryQueue.get_stats() -  end -end diff --git a/test/web/salmon/salmon_test.exs b/test/web/salmon/salmon_test.exs index e86e76fe9..0186f3fef 100644 --- a/test/web/salmon/salmon_test.exs +++ b/test/web/salmon/salmon_test.exs @@ -96,6 +96,6 @@ defmodule Pleroma.Web.Salmon.SalmonTest do      Salmon.publish(user, activity) -    assert called(Publisher.enqueue_one(Salmon, %{recipient: mentioned_user})) +    assert called(Publisher.enqueue_one(Salmon, %{recipient_id: mentioned_user.id}))    end  end diff --git a/test/web/websub/websub_test.exs b/test/web/websub/websub_test.exs index 74386d7db..b704a558a 100644 --- a/test/web/websub/websub_test.exs +++ b/test/web/websub/websub_test.exs @@ -4,11 +4,14 @@  defmodule Pleroma.Web.WebsubTest do    use Pleroma.DataCase +  use Oban.Testing, repo: Pleroma.Repo +  alias Pleroma.ObanHelpers    alias Pleroma.Web.Router.Helpers    alias Pleroma.Web.Websub    alias Pleroma.Web.Websub.WebsubClientSubscription    alias Pleroma.Web.Websub.WebsubServerSubscription +  alias Pleroma.Workers.Subscriber, as: SubscriberWorker    import Pleroma.Factory    import Tesla.Mock @@ -224,6 +227,7 @@ defmodule Pleroma.Web.WebsubTest do          })        _refresh = Websub.refresh_subscriptions() +      ObanHelpers.perform(all_enqueued(worker: SubscriberWorker))        assert still_good == Repo.get(WebsubClientSubscription, still_good.id)        refute needs_refresh == Repo.get(WebsubClientSubscription, needs_refresh.id)  | 
