diff options
| -rw-r--r-- | config/config.exs | 7 | ||||
| -rw-r--r-- | lib/pleroma/web/activity_pub/utils.ex | 9 | ||||
| -rw-r--r-- | lib/pleroma/web/federator/federator.ex | 132 | ||||
| -rw-r--r-- | lib/pleroma/web/federator/publisher.ex | 12 | ||||
| -rw-r--r-- | lib/pleroma/workers/publisher.ex | 25 | ||||
| -rw-r--r-- | lib/pleroma/workers/receiver.ex | 61 | ||||
| -rw-r--r-- | lib/pleroma/workers/subscriber.ex | 44 | ||||
| -rw-r--r-- | test/activity_test.exs | 4 | ||||
| -rw-r--r-- | test/support/oban_helpers.ex | 36 | ||||
| -rw-r--r-- | test/user_test.exs | 11 | ||||
| -rw-r--r-- | test/web/activity_pub/activity_pub_controller_test.exs | 14 | ||||
| -rw-r--r-- | test/web/federator_test.exs | 57 | ||||
| -rw-r--r-- | test/web/websub/websub_test.exs | 4 | 
13 files changed, 279 insertions, 137 deletions
diff --git a/config/config.exs b/config/config.exs index 1bb325bf5..5fd64365c 100644 --- a/config/config.exs +++ b/config/config.exs @@ -458,6 +458,13 @@ config :pleroma, Oban,    prune: {:maxage, 60 * 60 * 24 * 7},    queues: job_queues +config :pleroma, :workers, +  retries: [ +    compile_time_default: 1, +    federator_incoming: 5, +    federator_outgoing: 5 +  ] +  config :pleroma, :fetch_initial_posts,    enabled: false,    pages: 5 diff --git a/lib/pleroma/web/activity_pub/utils.ex b/lib/pleroma/web/activity_pub/utils.ex index 39074888b..f0917f9d4 100644 --- a/lib/pleroma/web/activity_pub/utils.ex +++ b/lib/pleroma/web/activity_pub/utils.ex @@ -168,14 +168,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do    """    def maybe_federate(%Activity{local: true} = activity) do      if Pleroma.Config.get!([:instance, :federating]) do -      priority = -        case activity.data["type"] do -          "Delete" -> 10 -          "Create" -> 1 -          _ -> 5 -        end - -      Pleroma.Web.Federator.publish(activity, priority) +      Pleroma.Web.Federator.publish(activity)      end      :ok diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index 97ec9d549..bb9eadfee 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -3,22 +3,15 @@  # SPDX-License-Identifier: AGPL-3.0-only  defmodule Pleroma.Web.Federator do -  alias Pleroma.Activity -  alias Pleroma.Object.Containment -  alias Pleroma.User -  alias Pleroma.Web.ActivityPub.ActivityPub -  alias Pleroma.Web.ActivityPub.Transmogrifier -  alias Pleroma.Web.ActivityPub.Utils -  alias Pleroma.Web.Federator.Publisher -  alias Pleroma.Web.OStatus -  alias Pleroma.Web.Websub +  alias Pleroma.Workers.Publisher, as: PublisherWorker +  alias Pleroma.Workers.Receiver, as: ReceiverWorker +  alias Pleroma.Workers.Subscriber, as: SubscriberWorker    require Logger    def init do      # 1 minute -    Process.sleep(1000 * 60) -    refresh_subscriptions() +    refresh_subscriptions(schedule_in: 60)    end    @doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)" @@ -36,111 +29,50 @@ defmodule Pleroma.Web.Federator do    # Client API    def incoming_doc(doc) do -    PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc]) +    %{"op" => "incoming_doc", "body" => doc} +    |> ReceiverWorker.new(worker_args(:federator_incoming)) +    |> Pleroma.Repo.insert()    end    def incoming_ap_doc(params) do -    PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params]) +    %{"op" => "incoming_ap_doc", "params" => params} +    |> ReceiverWorker.new(worker_args(:federator_incoming)) +    |> Pleroma.Repo.insert()    end -  def publish(activity, priority \\ 1) do -    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority) +  def publish(%{id: "pleroma:fakeid"} = activity) do +    PublisherWorker.perform_publish(activity)    end -  def verify_websub(websub) do -    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub]) -  end - -  def request_subscription(sub) do -    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub]) -  end - -  def refresh_subscriptions do -    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions]) -  end - -  # Job Worker Callbacks - -  def perform(:refresh_subscriptions) do -    Logger.debug("Federator running refresh subscriptions") -    Websub.refresh_subscriptions() - -    spawn(fn -> -      # 6 hours -      Process.sleep(1000 * 60 * 60 * 6) -      refresh_subscriptions() -    end) -  end - -  def perform(:request_subscription, websub) do -    Logger.debug("Refreshing #{websub.topic}") - -    with {:ok, websub} <- Websub.request_subscription(websub) do -      Logger.debug("Successfully refreshed #{websub.topic}") -    else -      _e -> Logger.debug("Couldn't refresh #{websub.topic}") -    end +  def publish(activity) do +    %{"op" => "publish", "activity_id" => activity.id} +    |> PublisherWorker.new(worker_args(:federator_outgoing)) +    |> Pleroma.Repo.insert()    end -  def perform(:publish, activity) do -    Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end) - -    with %User{} = actor <- User.get_cached_by_ap_id(activity.data["actor"]), -         {:ok, actor} <- User.ensure_keys_present(actor) do -      Publisher.publish(actor, activity) -    end -  end - -  def perform(:verify_websub, websub) do -    Logger.debug(fn -> -      "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})" -    end) - -    Websub.verify(websub) -  end - -  def perform(:incoming_doc, doc) do -    Logger.info("Got document, trying to parse") -    OStatus.handle_incoming(doc) +  def verify_websub(websub) do +    %{"op" => "verify_websub", "websub_id" => websub.id} +    |> SubscriberWorker.new(worker_args(:federator_outgoing)) +    |> Pleroma.Repo.insert()    end -  def perform(:incoming_ap_doc, params) do -    Logger.info("Handling incoming AP activity") - -    params = Utils.normalize_params(params) - -    # NOTE: we use the actor ID to do the containment, this is fine because an -    # actor shouldn't be acting on objects outside their own AP server. -    with {:ok, _user} <- ap_enabled_actor(params["actor"]), -         nil <- Activity.normalize(params["id"]), -         :ok <- Containment.contain_origin_from_id(params["actor"], params), -         {:ok, activity} <- Transmogrifier.handle_incoming(params) do -      {:ok, activity} -    else -      %Activity{} -> -        Logger.info("Already had #{params["id"]}") -        :error - -      _e -> -        # Just drop those for now -        Logger.info("Unhandled activity") -        Logger.info(Jason.encode!(params, pretty: true)) -        :error -    end +  def request_subscription(websub) do +    %{"op" => "request_subscription", "websub_id" => websub.id} +    |> SubscriberWorker.new(worker_args(:federator_outgoing)) +    |> Pleroma.Repo.insert()    end -  def perform(type, _) do -    Logger.debug(fn -> "Unknown task: #{type}" end) -    {:error, "Don't know what to do with this"} +  def refresh_subscriptions(worker_args \\ []) do +    %{"op" => "refresh_subscriptions"} +    |> SubscriberWorker.new(worker_args ++ [max_attempts: 1] ++ worker_args(:federator_outgoing)) +    |> Pleroma.Repo.insert()    end -  def ap_enabled_actor(id) do -    user = User.get_cached_by_ap_id(id) - -    if User.ap_enabled?(user) do -      {:ok, user} +  defp worker_args(queue) do +    if max_attempts = Pleroma.Config.get([:workers, :retries, queue]) do +      [max_attempts: max_attempts]      else -      ActivityPub.make_user_from_ap_id(id) +      []      end    end  end diff --git a/lib/pleroma/web/federator/publisher.ex b/lib/pleroma/web/federator/publisher.ex index e8c1bf17f..05d2be615 100644 --- a/lib/pleroma/web/federator/publisher.ex +++ b/lib/pleroma/web/federator/publisher.ex @@ -6,6 +6,7 @@ defmodule Pleroma.Web.Federator.Publisher do    alias Pleroma.Activity    alias Pleroma.Config    alias Pleroma.User +  alias Pleroma.Workers.Publisher, as: PublisherWorker    require Logger @@ -30,8 +31,15 @@ defmodule Pleroma.Web.Federator.Publisher do    """    @spec enqueue_one(module(), Map.t()) :: :ok    def enqueue_one(module, %{} = params) do -    %{module: to_string(module), params: params} -    |> Pleroma.Workers.Publisher.new() +    worker_args = +      if max_attempts = Pleroma.Config.get([:workers, :retries, :federator_outgoing]) do +        [max_attempts: max_attempts] +      else +        [] +      end + +    %{"op" => "publish_one", "module" => to_string(module), "params" => params} +    |> PublisherWorker.new(worker_args)      |> Pleroma.Repo.insert()    end diff --git a/lib/pleroma/workers/publisher.ex b/lib/pleroma/workers/publisher.ex index 639794830..67871977a 100644 --- a/lib/pleroma/workers/publisher.ex +++ b/lib/pleroma/workers/publisher.ex @@ -3,12 +3,33 @@  # SPDX-License-Identifier: AGPL-3.0-only  defmodule Pleroma.Workers.Publisher do -  use Oban.Worker, queue: "federator_outgoing", max_attempts: 5 +  alias Pleroma.Activity +  alias Pleroma.User + +  # Note: `max_attempts` is intended to be overridden in `new/1` call +  use Oban.Worker, +    queue: "federator_outgoing", +    max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])    @impl Oban.Worker -  def perform(%Oban.Job{args: %{module: module_name, params: params}}) do +  def perform(%{"op" => "publish", "activity_id" => activity_id}) do +    with %Activity{} = activity <- Activity.get_by_id(activity_id) do +      perform_publish(activity) +    else +      _ -> raise "Non-existing activity: #{activity_id}" +    end +  end + +  def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}) do      module_name      |> String.to_atom()      |> apply(:publish_one, [params])    end + +  def perform_publish(%Activity{} = activity) do +    with %User{} = actor <- User.get_cached_by_ap_id(activity.data["actor"]), +         {:ok, actor} <- User.ensure_keys_present(actor) do +      Pleroma.Web.Federator.Publisher.publish(actor, activity) +    end +  end  end diff --git a/lib/pleroma/workers/receiver.ex b/lib/pleroma/workers/receiver.ex new file mode 100644 index 000000000..43558b4e6 --- /dev/null +++ b/lib/pleroma/workers/receiver.ex @@ -0,0 +1,61 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.Receiver do +  alias Pleroma.Activity +  alias Pleroma.Object.Containment +  alias Pleroma.User +  alias Pleroma.Web.ActivityPub.ActivityPub +  alias Pleroma.Web.ActivityPub.Transmogrifier +  alias Pleroma.Web.ActivityPub.Utils +  alias Pleroma.Web.OStatus + +  require Logger + +  # Note: `max_attempts` is intended to be overridden in `new/1` call +  use Oban.Worker, +    queue: "federator_incoming", +    max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default]) + +  @impl Oban.Worker +  def perform(%{"op" => "incoming_doc", "body" => doc}) do +    Logger.info("Got incoming document, trying to parse") +    OStatus.handle_incoming(doc) +  end + +  def perform(%{"op" => "incoming_ap_doc", "params" => params}) do +    Logger.info("Handling incoming AP activity") + +    params = Utils.normalize_params(params) + +    # NOTE: we use the actor ID to do the containment, this is fine because an +    # actor shouldn't be acting on objects outside their own AP server. +    with {:ok, _user} <- ap_enabled_actor(params["actor"]), +         nil <- Activity.normalize(params["id"]), +         :ok <- Containment.contain_origin_from_id(params["actor"], params), +         {:ok, activity} <- Transmogrifier.handle_incoming(params) do +      {:ok, activity} +    else +      %Activity{} -> +        Logger.info("Already had #{params["id"]}") +        :error + +      _e -> +        # Just drop those for now +        Logger.info("Unhandled activity") +        Logger.info(Jason.encode!(params, pretty: true)) +        :error +    end +  end + +  defp ap_enabled_actor(id) do +    user = User.get_cached_by_ap_id(id) + +    if User.ap_enabled?(user) do +      {:ok, user} +    else +      ActivityPub.make_user_from_ap_id(id) +    end +  end +end diff --git a/lib/pleroma/workers/subscriber.ex b/lib/pleroma/workers/subscriber.ex new file mode 100644 index 000000000..a8c01bb10 --- /dev/null +++ b/lib/pleroma/workers/subscriber.ex @@ -0,0 +1,44 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.Subscriber do +  alias Pleroma.Repo +  alias Pleroma.Web.Websub +  alias Pleroma.Web.Websub.WebsubClientSubscription + +  require Logger + +  # Note: `max_attempts` is intended to be overridden in `new/1` call +  use Oban.Worker, +    queue: "federator_outgoing", +    max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default]) + +  @impl Oban.Worker +  def perform(%{"op" => "refresh_subscriptions"}) do +    Websub.refresh_subscriptions() +    # Schedule the next run in 6 hours +    Pleroma.Web.Federator.refresh_subscriptions(schedule_in: 3600 * 6) +  end + +  def perform(%{"op" => "request_subscription", "websub_id" => websub_id}) do +    websub = Repo.get(WebsubClientSubscription, websub_id) +    Logger.debug("Refreshing #{websub.topic}") + +    with {:ok, websub} <- Websub.request_subscription(websub) do +      Logger.debug("Successfully refreshed #{websub.topic}") +    else +      _e -> Logger.debug("Couldn't refresh #{websub.topic}") +    end +  end + +  def perform(%{"op" => "verify_websub", "websub_id" => websub_id}) do +    websub = Repo.get(WebsubClientSubscription, websub_id) + +    Logger.debug(fn -> +      "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})" +    end) + +    Websub.verify(websub) +  end +end diff --git a/test/activity_test.exs b/test/activity_test.exs index b27f6fd36..b9c12adb2 100644 --- a/test/activity_test.exs +++ b/test/activity_test.exs @@ -6,6 +6,7 @@ defmodule Pleroma.ActivityTest do    use Pleroma.DataCase    alias Pleroma.Activity    alias Pleroma.Bookmark +  alias Pleroma.ObanHelpers    alias Pleroma.Object    alias Pleroma.ThreadMute    import Pleroma.Factory @@ -125,7 +126,8 @@ defmodule Pleroma.ActivityTest do        }        {:ok, local_activity} = Pleroma.Web.CommonAPI.post(user, %{"status" => "find me!"}) -      {:ok, remote_activity} = Pleroma.Web.Federator.incoming_ap_doc(params) +      {:ok, job} = Pleroma.Web.Federator.incoming_ap_doc(params) +      {:ok, remote_activity} = ObanHelpers.perform(job)        %{local_activity: local_activity, remote_activity: remote_activity, user: user}      end diff --git a/test/support/oban_helpers.ex b/test/support/oban_helpers.ex new file mode 100644 index 000000000..54b5a9566 --- /dev/null +++ b/test/support/oban_helpers.ex @@ -0,0 +1,36 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.ObanHelpers do +  @moduledoc """ +  Oban test helpers. +  """ + +  alias Pleroma.Repo + +  def perform(%Oban.Job{} = job) do +    res = apply(String.to_existing_atom("Elixir." <> job.worker), :perform, [job]) +    Repo.delete(job) +    res +  end + +  def perform(jobs) when is_list(jobs) do +    for job <- jobs, do: perform(job) +  end + +  def member?(%{} = job_args, jobs) when is_list(jobs) do +    Enum.any?(jobs, fn job -> +      member?(job_args, job.args) +    end) +  end + +  def member?(%{} = test_attrs, %{} = attrs) do +    Enum.all?( +      test_attrs, +      fn {k, _v} -> member?(test_attrs[k], attrs[k]) end +    ) +  end + +  def member?(x, y), do: x == y +end diff --git a/test/user_test.exs b/test/user_test.exs index 70c376384..ee6d8e8f3 100644 --- a/test/user_test.exs +++ b/test/user_test.exs @@ -5,6 +5,7 @@  defmodule Pleroma.UserTest do    alias Pleroma.Activity    alias Pleroma.Builders.UserBuilder +  alias Pleroma.ObanHelpers    alias Pleroma.Object    alias Pleroma.Repo    alias Pleroma.User @@ -1044,8 +1045,16 @@ defmodule Pleroma.UserTest do        {:ok, _user} = User.delete(user) -      assert [%{args: %{"params" => %{"inbox" => "http://mastodon.example.org/inbox"}}}] = +      assert ObanHelpers.member?( +               %{ +                 "op" => "publish_one", +                 "params" => %{ +                   "inbox" => "http://mastodon.example.org/inbox", +                   "id" => "pleroma:fakeid" +                 } +               },                 all_enqueued(worker: Pleroma.Workers.Publisher) +             )        Pleroma.Config.put(config_path, initial_setting)      end diff --git a/test/web/activity_pub/activity_pub_controller_test.exs b/test/web/activity_pub/activity_pub_controller_test.exs index 40344f17e..1d809164f 100644 --- a/test/web/activity_pub/activity_pub_controller_test.exs +++ b/test/web/activity_pub/activity_pub_controller_test.exs @@ -4,15 +4,19 @@  defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do    use Pleroma.Web.ConnCase +  use Oban.Testing, repo: Pleroma.Repo +    import Pleroma.Factory    alias Pleroma.Activity    alias Pleroma.Instances +  alias Pleroma.ObanHelpers    alias Pleroma.Object    alias Pleroma.User    alias Pleroma.Web.ActivityPub.ObjectView    alias Pleroma.Web.ActivityPub.UserView    alias Pleroma.Web.ActivityPub.Utils    alias Pleroma.Web.CommonAPI +  alias Pleroma.Workers.Receiver, as: ReceiverWorker    setup_all do      Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end) @@ -232,7 +236,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do          |> post("/inbox", data)        assert "ok" == json_response(conn, 200) -      :timer.sleep(500) + +      ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))        assert Activity.get_by_ap_id(data["id"])      end @@ -274,7 +279,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do          |> post("/users/#{user.nickname}/inbox", data)        assert "ok" == json_response(conn, 200) -      :timer.sleep(500) +      ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))        assert Activity.get_by_ap_id(data["id"])      end @@ -303,7 +308,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do          |> post("/users/#{recipient.nickname}/inbox", data)        assert "ok" == json_response(conn, 200) -      :timer.sleep(500) +      ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))        assert Activity.get_by_ap_id(data["id"])      end @@ -382,6 +387,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do        |> post("/users/#{recipient.nickname}/inbox", data)        |> json_response(200) +      ObanHelpers.perform(all_enqueued(worker: ReceiverWorker)) +        activity = Activity.get_by_ap_id(data["id"])        assert activity.id @@ -457,6 +464,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do          |> post("/users/#{user.nickname}/outbox", data)        result = json_response(conn, 201) +        assert Activity.get_by_ap_id(result["id"])      end diff --git a/test/web/federator_test.exs b/test/web/federator_test.exs index 5c1704548..ebe962da2 100644 --- a/test/web/federator_test.exs +++ b/test/web/federator_test.exs @@ -4,8 +4,10 @@  defmodule Pleroma.Web.FederatorTest do    alias Pleroma.Instances +  alias Pleroma.ObanHelpers    alias Pleroma.Web.CommonAPI    alias Pleroma.Web.Federator +  alias Pleroma.Workers.Publisher, as: PublisherWorker    use Pleroma.DataCase    use Oban.Testing, repo: Pleroma.Repo @@ -45,6 +47,7 @@ defmodule Pleroma.Web.FederatorTest do      } do        with_mocks([relay_mock]) do          Federator.publish(activity) +        ObanHelpers.perform(all_enqueued(worker: PublisherWorker))        end        assert_received :relay_publish @@ -58,6 +61,7 @@ defmodule Pleroma.Web.FederatorTest do        with_mocks([relay_mock]) do          Federator.publish(activity) +        ObanHelpers.perform(all_enqueued(worker: PublisherWorker))        end        refute_received :relay_publish @@ -97,8 +101,15 @@ defmodule Pleroma.Web.FederatorTest do        expected_dt = NaiveDateTime.to_iso8601(dt) -      assert [%{args: %{"params" => %{"inbox" => ^inbox1, "unreachable_since" => ^expected_dt}}}] = -               all_enqueued(worker: Pleroma.Workers.Publisher) +      ObanHelpers.perform(all_enqueued(worker: PublisherWorker)) + +      assert ObanHelpers.member?( +               %{ +                 "op" => "publish_one", +                 "params" => %{"inbox" => inbox1, "unreachable_since" => expected_dt} +               }, +               all_enqueued(worker: PublisherWorker) +             )      end      test "it federates only to reachable instances via Websub" do @@ -129,16 +140,18 @@ defmodule Pleroma.Web.FederatorTest do        expected_callback = sub2.callback        expected_dt = NaiveDateTime.to_iso8601(dt) -      assert [ +      ObanHelpers.perform(all_enqueued(worker: PublisherWorker)) + +      assert ObanHelpers.member?(                 %{ -                 args: %{ -                   "params" => %{ -                     "callback" => ^expected_callback, -                     "unreachable_since" => ^expected_dt -                   } +                 "op" => "publish_one", +                 "params" => %{ +                   "callback" => expected_callback, +                   "unreachable_since" => expected_dt                   } -               } -             ] = all_enqueued(worker: Pleroma.Workers.Publisher) +               }, +               all_enqueued(worker: PublisherWorker) +             )      end      test "it federates only to reachable instances via Salmon" do @@ -172,16 +185,18 @@ defmodule Pleroma.Web.FederatorTest do        expected_dt = NaiveDateTime.to_iso8601(dt) -      assert [ +      ObanHelpers.perform(all_enqueued(worker: PublisherWorker)) + +      assert ObanHelpers.member?(                 %{ -                 args: %{ -                   "params" => %{ -                     "recipient_id" => ^remote_user2_id, -                     "unreachable_since" => ^expected_dt -                   } +                 "op" => "publish_one", +                 "params" => %{ +                   "recipient_id" => remote_user2_id, +                   "unreachable_since" => expected_dt                   } -               } -             ] = all_enqueued(worker: Pleroma.Workers.Publisher) +               }, +               all_enqueued(worker: PublisherWorker) +             )      end    end @@ -201,7 +216,8 @@ defmodule Pleroma.Web.FederatorTest do          "to" => ["https://www.w3.org/ns/activitystreams#Public"]        } -      {:ok, _activity} = Federator.incoming_ap_doc(params) +      assert {:ok, job} = Federator.incoming_ap_doc(params) +      assert {:ok, _activity} = ObanHelpers.perform(job)      end      test "rejects incoming AP docs with incorrect origin" do @@ -219,7 +235,8 @@ defmodule Pleroma.Web.FederatorTest do          "to" => ["https://www.w3.org/ns/activitystreams#Public"]        } -      :error = Federator.incoming_ap_doc(params) +      assert {:ok, job} = Federator.incoming_ap_doc(params) +      assert :error = ObanHelpers.perform(job)      end    end  end diff --git a/test/web/websub/websub_test.exs b/test/web/websub/websub_test.exs index 74386d7db..b704a558a 100644 --- a/test/web/websub/websub_test.exs +++ b/test/web/websub/websub_test.exs @@ -4,11 +4,14 @@  defmodule Pleroma.Web.WebsubTest do    use Pleroma.DataCase +  use Oban.Testing, repo: Pleroma.Repo +  alias Pleroma.ObanHelpers    alias Pleroma.Web.Router.Helpers    alias Pleroma.Web.Websub    alias Pleroma.Web.Websub.WebsubClientSubscription    alias Pleroma.Web.Websub.WebsubServerSubscription +  alias Pleroma.Workers.Subscriber, as: SubscriberWorker    import Pleroma.Factory    import Tesla.Mock @@ -224,6 +227,7 @@ defmodule Pleroma.Web.WebsubTest do          })        _refresh = Websub.refresh_subscriptions() +      ObanHelpers.perform(all_enqueued(worker: SubscriberWorker))        assert still_good == Repo.get(WebsubClientSubscription, still_good.id)        refute needs_refresh == Repo.get(WebsubClientSubscription, needs_refresh.id)  | 
