diff options
52 files changed, 664 insertions, 520 deletions
| diff --git a/config/config.exs b/config/config.exs index 758661120..805176461 100644 --- a/config/config.exs +++ b/config/config.exs @@ -445,13 +445,7 @@ config :pleroma, Pleroma.User,      "web"    ] -config :pleroma, Pleroma.Web.Federator.RetryQueue, -  enabled: false, -  max_jobs: 20, -  initial_timeout: 30, -  max_retries: 5 - -config :pleroma_job_queue, :queues, +job_queues = [    federator_incoming: 50,    federator_outgoing: 50,    web_push: 50, @@ -459,6 +453,22 @@ config :pleroma_job_queue, :queues,    transmogrifier: 20,    scheduled_activities: 10,    background: 5 +] + +config :pleroma_job_queue, :queues, job_queues + +config :pleroma, Oban, +  repo: Pleroma.Repo, +  verbose: false, +  prune: {:maxage, 60 * 60 * 24 * 7}, +  queues: job_queues + +config :pleroma, :workers, +  retries: [ +    compile_time_default: 1, +    federator_incoming: 5, +    federator_outgoing: 5 +  ]  config :pleroma, :fetch_initial_posts,    enabled: false, diff --git a/config/test.exs b/config/test.exs index 6f75f39b5..f897b5d48 100644 --- a/config/test.exs +++ b/config/test.exs @@ -63,6 +63,10 @@ config :web_push_encryption, :http_client, Pleroma.Web.WebPushHttpClientMock  config :pleroma_job_queue, disabled: true +config :pleroma, Oban, +  queues: false, +  prune: :disabled +  config :pleroma, Pleroma.ScheduledActivity,    daily_user_limit: 2,    total_user_limit: 3, diff --git a/docs/config.md b/docs/config.md index 20311db54..a558a51d9 100644 --- a/docs/config.md +++ b/docs/config.md @@ -419,13 +419,6 @@ config :pleroma_job_queue, :queues,  This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the `max_jobs` set to `50`. -## Pleroma.Web.Federator.RetryQueue - -* `enabled`: If set to `true`, failed federation jobs will be retried -* `max_jobs`: The maximum amount of parallel federation jobs running at the same time. -* `initial_timeout`: The initial timeout in seconds -* `max_retries`: The maximum number of times a federation job is retried -  ## Pleroma.Web.Metadata  * `providers`: a list of metadata providers to enable. Providers available:    * Pleroma.Web.Metadata.Providers.OpenGraph diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 00b06f723..7cf60f44a 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -120,8 +120,8 @@ defmodule Pleroma.Application do          hackney_pool_children() ++          [            %{ -            id: Pleroma.Web.Federator.RetryQueue, -            start: {Pleroma.Web.Federator.RetryQueue, :start_link, []} +            id: Oban, +            start: {Oban, :start_link, [Application.get_env(:pleroma, Oban)]}            },            %{              id: Pleroma.Web.OAuth.Token.CleanWorker, @@ -233,6 +233,7 @@ defmodule Pleroma.Application do    defp after_supervisor_start do      with digest_config <- Application.get_env(:pleroma, :email_notifications)[:digest],           true <- digest_config[:active] do +      # TODO: consider replacing with `quantum` scheduler        PleromaJobQueue.schedule(          digest_config[:schedule],          :digest_emails, diff --git a/lib/pleroma/digest_email_worker.ex b/lib/pleroma/digest_email_worker.ex index 18e67d39b..6e44cc955 100644 --- a/lib/pleroma/digest_email_worker.ex +++ b/lib/pleroma/digest_email_worker.ex @@ -1,7 +1,10 @@  defmodule Pleroma.DigestEmailWorker do +  alias Pleroma.Repo +  alias Pleroma.Workers.Mailer, as: MailerWorker +    import Ecto.Query -  @queue_name :digest_emails +  defdelegate worker_args(queue), to: Pleroma.Workers.Helper    def perform do      config = Pleroma.Config.get([:email_notifications, :digest]) @@ -17,7 +20,11 @@ defmodule Pleroma.DigestEmailWorker do        select: u      )      |> Pleroma.Repo.all() -    |> Enum.each(&PleromaJobQueue.enqueue(@queue_name, __MODULE__, [&1])) +    |> Enum.each(fn user -> +      %{"op" => "digest_email", "user_id" => user.id} +      |> MailerWorker.new([queue: "digest_emails"] ++ worker_args(:digest_emails)) +      |> Repo.insert() +    end)    end    @doc """ diff --git a/lib/pleroma/emails/mailer.ex b/lib/pleroma/emails/mailer.ex index 2e4657b7c..bb534f602 100644 --- a/lib/pleroma/emails/mailer.ex +++ b/lib/pleroma/emails/mailer.ex @@ -9,6 +9,8 @@ defmodule Pleroma.Emails.Mailer do    The module contains functions to delivery email using Swoosh.Mailer.    """ +  alias Pleroma.Repo +  alias Pleroma.Workers.Mailer, as: MailerWorker    alias Swoosh.DeliveryError    @otp_app :pleroma @@ -17,9 +19,18 @@ defmodule Pleroma.Emails.Mailer do    @spec enabled?() :: boolean()    def enabled?, do: Pleroma.Config.get([__MODULE__, :enabled]) +  defdelegate worker_args(queue), to: Pleroma.Workers.Helper +    @doc "add email to queue"    def deliver_async(email, config \\ []) do -    PleromaJobQueue.enqueue(:mailer, __MODULE__, [:deliver_async, email, config]) +    encoded_email = +      email +      |> :erlang.term_to_binary() +      |> Base.encode64() + +    %{"op" => "email", "encoded_email" => encoded_email, "config" => config} +    |> MailerWorker.new(worker_args(:mailer)) +    |> Repo.insert()    end    @doc "callback to perform send email from queue" diff --git a/lib/pleroma/instances/instance.ex b/lib/pleroma/instances/instance.ex index 4d7ed4ca1..544c4b687 100644 --- a/lib/pleroma/instances/instance.ex +++ b/lib/pleroma/instances/instance.ex @@ -90,7 +90,7 @@ defmodule Pleroma.Instances.Instance do    def set_unreachable(url_or_host, unreachable_since \\ nil)    def set_unreachable(url_or_host, unreachable_since) when is_binary(url_or_host) do -    unreachable_since = unreachable_since || DateTime.utc_now() +    unreachable_since = parse_datetime(unreachable_since) || NaiveDateTime.utc_now()      host = host(url_or_host)      existing_record = Repo.get_by(Instance, %{host: host}) @@ -114,4 +114,10 @@ defmodule Pleroma.Instances.Instance do    end    def set_unreachable(_, _), do: {:error, nil} + +  defp parse_datetime(datetime) when is_binary(datetime) do +    NaiveDateTime.from_iso8601(datetime) +  end + +  defp parse_datetime(datetime), do: datetime  end diff --git a/lib/pleroma/scheduled_activity_worker.ex b/lib/pleroma/scheduled_activity_worker.ex index 65b38622f..cabea51ca 100644 --- a/lib/pleroma/scheduled_activity_worker.ex +++ b/lib/pleroma/scheduled_activity_worker.ex @@ -8,14 +8,18 @@ defmodule Pleroma.ScheduledActivityWorker do    """    alias Pleroma.Config +  alias Pleroma.Repo    alias Pleroma.ScheduledActivity    alias Pleroma.User    alias Pleroma.Web.CommonAPI +    use GenServer    require Logger    @schedule_interval :timer.minutes(1) +  defdelegate worker_args(queue), to: Pleroma.Workers.Helper +    def start_link do      GenServer.start_link(__MODULE__, nil)    end @@ -45,7 +49,9 @@ defmodule Pleroma.ScheduledActivityWorker do    def handle_info(:perform, state) do      ScheduledActivity.due_activities(@schedule_interval)      |> Enum.each(fn scheduled_activity -> -      PleromaJobQueue.enqueue(:scheduled_activities, __MODULE__, [:execute, scheduled_activity.id]) +      %{"op" => "execute", "activity_id" => scheduled_activity.id} +      |> Pleroma.Workers.ScheduledActivityWorker.new(worker_args(:scheduled_activities)) +      |> Repo.insert()      end)      schedule_next() diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index b67743846..32fde2b6b 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -26,6 +26,7 @@ defmodule Pleroma.User do    alias Pleroma.Web.OStatus    alias Pleroma.Web.RelMe    alias Pleroma.Web.Websub +  alias Pleroma.Workers.BackgroundWorker    require Logger @@ -39,6 +40,8 @@ defmodule Pleroma.User do    @strict_local_nickname_regex ~r/^[a-zA-Z\d]+$/    @extended_local_nickname_regex ~r/^[a-zA-Z\d_-]+$/ +  defdelegate worker_args(queue), to: Pleroma.Workers.Helper +    schema "users" do      field(:bio, :string)      field(:email, :string) @@ -583,8 +586,11 @@ defmodule Pleroma.User do    end    @doc "Fetch some posts when the user has just been federated with" -  def fetch_initial_posts(user), -    do: PleromaJobQueue.enqueue(:background, __MODULE__, [:fetch_initial_posts, user]) +  def fetch_initial_posts(user) do +    %{"op" => "fetch_initial_posts", "user_id" => user.id} +    |> BackgroundWorker.new(worker_args(:background)) +    |> Repo.insert() +  end    @spec get_followers_query(User.t(), pos_integer() | nil) :: Ecto.Query.t()    def get_followers_query(%User{} = user, nil) do @@ -1005,7 +1011,9 @@ defmodule Pleroma.User do    end    def deactivate_async(user, status \\ true) do -    PleromaJobQueue.enqueue(:background, __MODULE__, [:deactivate_async, user, status]) +    %{"op" => "deactivate_user", "user_id" => user.id, "status" => status} +    |> BackgroundWorker.new(worker_args(:background)) +    |> Repo.insert()    end    def deactivate(%User{} = user, status \\ true) do @@ -1033,9 +1041,11 @@ defmodule Pleroma.User do      |> update_and_set_cache()    end -  @spec delete(User.t()) :: :ok -  def delete(%User{} = user), -    do: PleromaJobQueue.enqueue(:background, __MODULE__, [:delete, user]) +  def delete(%User{} = user) do +    %{"op" => "delete_user", "user_id" => user.id} +    |> BackgroundWorker.new(worker_args(:background)) +    |> Repo.insert() +  end    @spec perform(atom(), User.t()) :: {:ok, User.t()}    def perform(:delete, %User{} = user) do @@ -1142,21 +1152,26 @@ defmodule Pleroma.User do      Repo.all(query)    end -  def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers), -    do: -      PleromaJobQueue.enqueue(:background, __MODULE__, [ -        :blocks_import, -        blocker, -        blocked_identifiers -      ]) - -  def follow_import(%User{} = follower, followed_identifiers) when is_list(followed_identifiers), -    do: -      PleromaJobQueue.enqueue(:background, __MODULE__, [ -        :follow_import, -        follower, -        followed_identifiers -      ]) +  def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers) do +    %{ +      "op" => "blocks_import", +      "blocker_id" => blocker.id, +      "blocked_identifiers" => blocked_identifiers +    } +    |> BackgroundWorker.new(worker_args(:background)) +    |> Repo.insert() +  end + +  def follow_import(%User{} = follower, followed_identifiers) +      when is_list(followed_identifiers) do +    %{ +      "op" => "follow_import", +      "follower_id" => follower.id, +      "followed_identifiers" => followed_identifiers +    } +    |> BackgroundWorker.new(worker_args(:background)) +    |> Repo.insert() +  end    def delete_user_activities(%User{ap_id: ap_id} = user) do      ap_id diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index cf55c9520..233746047 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -17,6 +17,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do    alias Pleroma.Web.ActivityPub.MRF    alias Pleroma.Web.ActivityPub.Transmogrifier    alias Pleroma.Web.WebFinger +  alias Pleroma.Workers.BackgroundWorker    import Ecto.Query    import Pleroma.Web.ActivityPub.Utils @@ -25,6 +26,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do    require Logger    require Pleroma.Constants +  defdelegate worker_args(queue), to: Pleroma.Workers.Helper +    # For Announce activities, we filter the recipients based on following status for any actors    # that match actual users.  See issue #164 for more information about why this is necessary.    defp get_recipients(%{"type" => "Announce"} = data) do @@ -145,7 +148,9 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do            activity          end -      PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity]) +      %{"op" => "fetch_data_for_activity", "activity_id" => activity.id} +      |> BackgroundWorker.new(worker_args(:background)) +      |> Repo.insert()        Notification.create_notifications(activity) diff --git a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex index a179dd54d..b188164ee 100644 --- a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex +++ b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex @@ -7,7 +7,9 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do    @behaviour Pleroma.Web.ActivityPub.MRF    alias Pleroma.HTTP +  alias Pleroma.Repo    alias Pleroma.Web.MediaProxy +  alias Pleroma.Workers.BackgroundWorker    require Logger @@ -16,6 +18,8 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do      recv_timeout: 10_000    ] +  defdelegate worker_args(queue), to: Pleroma.Workers.Helper +    def perform(:prefetch, url) do      Logger.info("Prefetching #{inspect(url)}") @@ -30,7 +34,9 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do          url          |> Enum.each(fn            %{"href" => href} -> -            PleromaJobQueue.enqueue(:background, __MODULE__, [:prefetch, href]) +            %{"op" => "media_proxy_prefetch", "url" => href} +            |> BackgroundWorker.new(worker_args(:background)) +            |> Repo.insert()            x ->              Logger.debug("Unhandled attachment URL object #{inspect(x)}") @@ -46,7 +52,9 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do          %{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message        )        when is_list(attachments) and length(attachments) > 0 do -    PleromaJobQueue.enqueue(:background, __MODULE__, [:preload, message]) +    %{"op" => "media_proxy_preload", "message" => message} +    |> BackgroundWorker.new(worker_args(:background)) +    |> Repo.insert()      {:ok, message}    end diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex index 46edab0bd..29f3221d1 100644 --- a/lib/pleroma/web/activity_pub/publisher.ex +++ b/lib/pleroma/web/activity_pub/publisher.ex @@ -85,6 +85,15 @@ defmodule Pleroma.Web.ActivityPub.Publisher do      end    end +  def publish_one(%{actor_id: actor_id} = params) do +    actor = User.get_by_id(actor_id) + +    params +    |> Map.delete(:actor_id) +    |> Map.put(:actor, actor) +    |> publish_one() +  end +    defp should_federate?(inbox, public) do      if public do        true @@ -160,7 +169,8 @@ defmodule Pleroma.Web.ActivityPub.Publisher do    Publishes an activity with BCC to all relevant peers.    """ -  def publish(actor, %{data: %{"bcc" => bcc}} = activity) when is_list(bcc) and bcc != [] do +  def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity) +      when is_list(bcc) and bcc != [] do      public = is_public?(activity)      {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) @@ -187,7 +197,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do        Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{          inbox: inbox,          json: json, -        actor: actor, +        actor_id: actor.id,          id: activity.data["id"],          unreachable_since: unreachable_since        }) @@ -222,7 +232,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do          %{            inbox: inbox,            json: json, -          actor: actor, +          actor_id: actor.id,            id: activity.data["id"],            unreachable_since: unreachable_since          } diff --git a/lib/pleroma/web/activity_pub/transmogrifier.ex b/lib/pleroma/web/activity_pub/transmogrifier.ex index 0fcc81bf3..2cb7ca8d1 100644 --- a/lib/pleroma/web/activity_pub/transmogrifier.ex +++ b/lib/pleroma/web/activity_pub/transmogrifier.ex @@ -15,12 +15,15 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do    alias Pleroma.Web.ActivityPub.Utils    alias Pleroma.Web.ActivityPub.Visibility    alias Pleroma.Web.Federator +  alias Pleroma.Workers.Transmogrifier, as: TransmogrifierWorker    import Ecto.Query    require Logger    require Pleroma.Constants +  defdelegate worker_args(queue), to: Pleroma.Workers.Helper +    @doc """    Modifies an incoming AP object (mastodon format) to our internal format.    """ @@ -1043,7 +1046,9 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do           already_ap <- User.ap_enabled?(user),           {:ok, user} <- user |> User.upgrade_changeset(data) |> User.update_and_set_cache() do        unless already_ap do -        PleromaJobQueue.enqueue(:transmogrifier, __MODULE__, [:user_upgrade, user]) +        %{"op" => "user_upgrade", "user_id" => user.id} +        |> TransmogrifierWorker.new(worker_args(:transmogrifier)) +        |> Repo.insert()        end        {:ok, user} diff --git a/lib/pleroma/web/activity_pub/utils.ex b/lib/pleroma/web/activity_pub/utils.ex index fc5305c58..4f68acc78 100644 --- a/lib/pleroma/web/activity_pub/utils.ex +++ b/lib/pleroma/web/activity_pub/utils.ex @@ -168,14 +168,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do    """    def maybe_federate(%Activity{local: true} = activity) do      if Pleroma.Config.get!([:instance, :federating]) do -      priority = -        case activity.data["type"] do -          "Delete" -> 10 -          "Create" -> 1 -          _ -> 5 -        end - -      Pleroma.Web.Federator.publish(activity, priority) +      Pleroma.Web.Federator.publish(activity)      end      :ok diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index f4f9e83e0..d85fe824f 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -10,16 +10,19 @@ defmodule Pleroma.Web.Federator do    alias Pleroma.Web.ActivityPub.Transmogrifier    alias Pleroma.Web.ActivityPub.Utils    alias Pleroma.Web.Federator.Publisher -  alias Pleroma.Web.Federator.RetryQueue    alias Pleroma.Web.OStatus    alias Pleroma.Web.Websub +  alias Pleroma.Workers.Publisher, as: PublisherWorker +  alias Pleroma.Workers.Receiver, as: ReceiverWorker +  alias Pleroma.Workers.Subscriber, as: SubscriberWorker    require Logger +  defdelegate worker_args(queue), to: Pleroma.Workers.Helper +    def init do      # 1 minute -    Process.sleep(1000 * 60) -    refresh_subscriptions() +    refresh_subscriptions(schedule_in: 60)    end    @doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)" @@ -37,50 +40,50 @@ defmodule Pleroma.Web.Federator do    # Client API    def incoming_doc(doc) do -    PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc]) +    %{"op" => "incoming_doc", "body" => doc} +    |> ReceiverWorker.new(worker_args(:federator_incoming)) +    |> Pleroma.Repo.insert()    end    def incoming_ap_doc(params) do -    PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params]) +    %{"op" => "incoming_ap_doc", "params" => params} +    |> ReceiverWorker.new(worker_args(:federator_incoming)) +    |> Pleroma.Repo.insert()    end -  def publish(activity, priority \\ 1) do -    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority) +  def publish(%{id: "pleroma:fakeid"} = activity) do +    perform(:publish, activity)    end -  def verify_websub(websub) do -    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub]) +  def publish(activity) do +    %{"op" => "publish", "activity_id" => activity.id} +    |> PublisherWorker.new(worker_args(:federator_outgoing)) +    |> Pleroma.Repo.insert()    end -  def request_subscription(sub) do -    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub]) +  def verify_websub(websub) do +    %{"op" => "verify_websub", "websub_id" => websub.id} +    |> SubscriberWorker.new(worker_args(:federator_outgoing)) +    |> Pleroma.Repo.insert()    end -  def refresh_subscriptions do -    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions]) +  def request_subscription(websub) do +    %{"op" => "request_subscription", "websub_id" => websub.id} +    |> SubscriberWorker.new(worker_args(:federator_outgoing)) +    |> Pleroma.Repo.insert()    end -  # Job Worker Callbacks - -  def perform(:refresh_subscriptions) do -    Logger.debug("Federator running refresh subscriptions") -    Websub.refresh_subscriptions() - -    spawn(fn -> -      # 6 hours -      Process.sleep(1000 * 60 * 60 * 6) -      refresh_subscriptions() -    end) +  def refresh_subscriptions(worker_args \\ []) do +    %{"op" => "refresh_subscriptions"} +    |> SubscriberWorker.new(worker_args ++ [max_attempts: 1] ++ worker_args(:federator_outgoing)) +    |> Pleroma.Repo.insert()    end -  def perform(:request_subscription, websub) do -    Logger.debug("Refreshing #{websub.topic}") +  # Job Worker Callbacks -    with {:ok, websub} <- Websub.request_subscription(websub) do -      Logger.debug("Successfully refreshed #{websub.topic}") -    else -      _e -> Logger.debug("Couldn't refresh #{websub.topic}") -    end +  @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()} +  def perform(:publish_one, module, params) do +    apply(module, :publish_one, [params])    end    def perform(:publish, activity) do @@ -92,14 +95,6 @@ defmodule Pleroma.Web.Federator do      end    end -  def perform(:verify_websub, websub) do -    Logger.debug(fn -> -      "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})" -    end) - -    Websub.verify(websub) -  end -    def perform(:incoming_doc, doc) do      Logger.info("Got document, trying to parse")      OStatus.handle_incoming(doc) @@ -130,22 +125,33 @@ defmodule Pleroma.Web.Federator do      end    end -  def perform( -        :publish_single_websub, -        %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params -      ) do -    case Websub.publish_one(params) do -      {:ok, _} -> -        :ok +  def perform(:request_subscription, websub) do +    Logger.debug("Refreshing #{websub.topic}") -      {:error, _} -> -        RetryQueue.enqueue(params, Websub) +    with {:ok, websub} <- Websub.request_subscription(websub) do +      Logger.debug("Successfully refreshed #{websub.topic}") +    else +      _e -> Logger.debug("Couldn't refresh #{websub.topic}")      end    end -  def perform(type, _) do -    Logger.debug(fn -> "Unknown task: #{type}" end) -    {:error, "Don't know what to do with this"} +  def perform(:verify_websub, websub) do +    Logger.debug(fn -> +      "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})" +    end) + +    Websub.verify(websub) +  end + +  def perform(:refresh_subscriptions) do +    Logger.debug("Federator running refresh subscriptions") +    Websub.refresh_subscriptions() + +    spawn(fn -> +      # 6 hours +      Process.sleep(1000 * 60 * 60 * 6) +      refresh_subscriptions() +    end)    end    def ap_enabled_actor(id) do diff --git a/lib/pleroma/web/federator/publisher.ex b/lib/pleroma/web/federator/publisher.ex index 70f870244..05d2be615 100644 --- a/lib/pleroma/web/federator/publisher.ex +++ b/lib/pleroma/web/federator/publisher.ex @@ -6,7 +6,7 @@ defmodule Pleroma.Web.Federator.Publisher do    alias Pleroma.Activity    alias Pleroma.Config    alias Pleroma.User -  alias Pleroma.Web.Federator.RetryQueue +  alias Pleroma.Workers.Publisher, as: PublisherWorker    require Logger @@ -30,23 +30,17 @@ defmodule Pleroma.Web.Federator.Publisher do    Enqueue publishing a single activity.    """    @spec enqueue_one(module(), Map.t()) :: :ok -  def enqueue_one(module, %{} = params), -    do: PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_one, module, params]) - -  @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()} -  def perform(:publish_one, module, params) do -    case apply(module, :publish_one, [params]) do -      {:ok, _} -> -        :ok - -      {:error, _e} -> -        RetryQueue.enqueue(params, module) -    end -  end +  def enqueue_one(module, %{} = params) do +    worker_args = +      if max_attempts = Pleroma.Config.get([:workers, :retries, :federator_outgoing]) do +        [max_attempts: max_attempts] +      else +        [] +      end -  def perform(type, _, _) do -    Logger.debug("Unknown task: #{type}") -    {:error, "Don't know what to do with this"} +    %{"op" => "publish_one", "module" => to_string(module), "params" => params} +    |> PublisherWorker.new(worker_args) +    |> Pleroma.Repo.insert()    end    @doc """ diff --git a/lib/pleroma/web/federator/retry_queue.ex b/lib/pleroma/web/federator/retry_queue.ex deleted file mode 100644 index 3db948c2e..000000000 --- a/lib/pleroma/web/federator/retry_queue.ex +++ /dev/null @@ -1,239 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.Federator.RetryQueue do -  use GenServer - -  require Logger - -  def init(args) do -    queue_table = :ets.new(:pleroma_retry_queue, [:bag, :protected]) - -    {:ok, %{args | queue_table: queue_table, running_jobs: :sets.new()}} -  end - -  def start_link do -    enabled = -      if Pleroma.Config.get(:env) == :test, -        do: true, -        else: Pleroma.Config.get([__MODULE__, :enabled], false) - -    if enabled do -      Logger.info("Starting retry queue") - -      linkres = -        GenServer.start_link( -          __MODULE__, -          %{delivered: 0, dropped: 0, queue_table: nil, running_jobs: nil}, -          name: __MODULE__ -        ) - -      maybe_kickoff_timer() -      linkres -    else -      Logger.info("Retry queue disabled") -      :ignore -    end -  end - -  def enqueue(data, transport, retries \\ 0) do -    GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1}) -  end - -  def get_stats do -    GenServer.call(__MODULE__, :get_stats) -  end - -  def reset_stats do -    GenServer.call(__MODULE__, :reset_stats) -  end - -  def get_retry_params(retries) do -    if retries > Pleroma.Config.get([__MODULE__, :max_retries]) do -      {:drop, "Max retries reached"} -    else -      {:retry, growth_function(retries)} -    end -  end - -  def get_retry_timer_interval do -    Pleroma.Config.get([:retry_queue, :interval], 1000) -  end - -  defp ets_count_expires(table, current_time) do -    :ets.select_count( -      table, -      [ -        { -          {:"$1", :"$2"}, -          [{:"=<", :"$1", {:const, current_time}}], -          [true] -        } -      ] -    ) -  end - -  defp ets_pop_n_expired(table, current_time, desired) do -    {popped, _continuation} = -      :ets.select( -        table, -        [ -          { -            {:"$1", :"$2"}, -            [{:"=<", :"$1", {:const, current_time}}], -            [:"$_"] -          } -        ], -        desired -      ) - -    popped -    |> Enum.each(fn e -> -      :ets.delete_object(table, e) -    end) - -    popped -  end - -  def maybe_start_job(running_jobs, queue_table) do -    # we don't want to hit the ets or the DateTime more times than we have to -    # could optimize slightly further by not using the count, and instead grabbing -    # up to N objects early... -    current_time = DateTime.to_unix(DateTime.utc_now()) -    n_running_jobs = :sets.size(running_jobs) - -    if n_running_jobs < Pleroma.Config.get([__MODULE__, :max_jobs]) do -      n_ready_jobs = ets_count_expires(queue_table, current_time) - -      if n_ready_jobs > 0 do -        # figure out how many we could start -        available_job_slots = Pleroma.Config.get([__MODULE__, :max_jobs]) - n_running_jobs -        start_n_jobs(running_jobs, queue_table, current_time, available_job_slots) -      else -        running_jobs -      end -    else -      running_jobs -    end -  end - -  defp start_n_jobs(running_jobs, _queue_table, _current_time, 0) do -    running_jobs -  end - -  defp start_n_jobs(running_jobs, queue_table, current_time, available_job_slots) -       when available_job_slots > 0 do -    candidates = ets_pop_n_expired(queue_table, current_time, available_job_slots) - -    candidates -    |> List.foldl(running_jobs, fn {_, e}, rj -> -      {:ok, pid} = Task.start(fn -> worker(e) end) -      mref = Process.monitor(pid) -      :sets.add_element(mref, rj) -    end) -  end - -  def worker({:send, data, transport, retries}) do -    case transport.publish_one(data) do -      {:ok, _} -> -        GenServer.cast(__MODULE__, :inc_delivered) -        :delivered - -      {:error, _reason} -> -        enqueue(data, transport, retries) -        :retry -    end -  end - -  def handle_call(:get_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do -    {:reply, %{delivered: delivery_count, dropped: drop_count}, state} -  end - -  def handle_call(:reset_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do -    {:reply, %{delivered: delivery_count, dropped: drop_count}, -     %{state | delivered: 0, dropped: 0}} -  end - -  def handle_cast(:reset_stats, state) do -    {:noreply, %{state | delivered: 0, dropped: 0}} -  end - -  def handle_cast( -        {:maybe_enqueue, data, transport, retries}, -        %{dropped: drop_count, queue_table: queue_table, running_jobs: running_jobs} = state -      ) do -    case get_retry_params(retries) do -      {:retry, timeout} -> -        :ets.insert(queue_table, {timeout, {:send, data, transport, retries}}) -        running_jobs = maybe_start_job(running_jobs, queue_table) -        {:noreply, %{state | running_jobs: running_jobs}} - -      {:drop, message} -> -        Logger.debug(message) -        {:noreply, %{state | dropped: drop_count + 1}} -    end -  end - -  def handle_cast(:kickoff_timer, state) do -    retry_interval = get_retry_timer_interval() -    Process.send_after(__MODULE__, :retry_timer_run, retry_interval) -    {:noreply, state} -  end - -  def handle_cast(:inc_delivered, %{delivered: delivery_count} = state) do -    {:noreply, %{state | delivered: delivery_count + 1}} -  end - -  def handle_cast(:inc_dropped, %{dropped: drop_count} = state) do -    {:noreply, %{state | dropped: drop_count + 1}} -  end - -  def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do -    case transport.publish_one(data) do -      {:ok, _} -> -        {:noreply, %{state | delivered: delivery_count + 1}} - -      {:error, _reason} -> -        enqueue(data, transport, retries) -        {:noreply, state} -    end -  end - -  def handle_info( -        :retry_timer_run, -        %{queue_table: queue_table, running_jobs: running_jobs} = state -      ) do -    maybe_kickoff_timer() -    running_jobs = maybe_start_job(running_jobs, queue_table) -    {:noreply, %{state | running_jobs: running_jobs}} -  end - -  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do -    %{running_jobs: running_jobs, queue_table: queue_table} = state -    running_jobs = :sets.del_element(ref, running_jobs) -    running_jobs = maybe_start_job(running_jobs, queue_table) -    {:noreply, %{state | running_jobs: running_jobs}} -  end - -  def handle_info(unknown, state) do -    Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring") -    {:noreply, state} -  end - -  if Pleroma.Config.get(:env) == :test do -    defp growth_function(_retries) do -      _shutit = Pleroma.Config.get([__MODULE__, :initial_timeout]) -      DateTime.to_unix(DateTime.utc_now()) - 1 -    end -  else -    defp growth_function(retries) do -      round(Pleroma.Config.get([__MODULE__, :initial_timeout]) * :math.pow(retries, 3)) + -        DateTime.to_unix(DateTime.utc_now()) -    end -  end - -  defp maybe_kickoff_timer do -    GenServer.cast(__MODULE__, :kickoff_timer) -  end -end diff --git a/lib/pleroma/web/oauth/token/clean_worker.ex b/lib/pleroma/web/oauth/token/clean_worker.ex index dca852449..c0c9c3653 100644 --- a/lib/pleroma/web/oauth/token/clean_worker.ex +++ b/lib/pleroma/web/oauth/token/clean_worker.ex @@ -14,9 +14,12 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do                [:oauth2, :clean_expired_tokens_interval],                86_400_000              ) -  @queue :background +  alias Pleroma.Repo    alias Pleroma.Web.OAuth.Token +  alias Pleroma.Workers.BackgroundWorker + +  defdelegate worker_args(queue), to: Pleroma.Workers.Helper    def start_link, do: GenServer.start_link(__MODULE__, nil) @@ -31,8 +34,11 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do    @doc false    def handle_info(:perform, state) do +    %{"op" => "clean_expired_tokens"} +    |> BackgroundWorker.new(worker_args(:background)) +    |> Repo.insert() +      Process.send_after(self(), :perform, @interval) -    PleromaJobQueue.enqueue(@queue, __MODULE__, [:clean])      {:noreply, state}    end diff --git a/lib/pleroma/web/push/push.ex b/lib/pleroma/web/push/push.ex index 729dad02a..b4f0e5127 100644 --- a/lib/pleroma/web/push/push.ex +++ b/lib/pleroma/web/push/push.ex @@ -3,10 +3,13 @@  # SPDX-License-Identifier: AGPL-3.0-only  defmodule Pleroma.Web.Push do -  alias Pleroma.Web.Push.Impl +  alias Pleroma.Repo +  alias Pleroma.Workers.WebPusher    require Logger +  defdelegate worker_args(queue), to: Pleroma.Workers.Helper +    def init do      unless enabled() do        Logger.warn(""" @@ -31,6 +34,9 @@ defmodule Pleroma.Web.Push do      end    end -  def send(notification), -    do: PleromaJobQueue.enqueue(:web_push, Impl, [notification]) +  def send(notification) do +    %{"op" => "web_push", "notification_id" => notification.id} +    |> WebPusher.new(worker_args(:web_push)) +    |> Repo.insert() +  end  end diff --git a/lib/pleroma/web/salmon/salmon.ex b/lib/pleroma/web/salmon/salmon.ex index 9b01ebcc6..bbaa293fd 100644 --- a/lib/pleroma/web/salmon/salmon.ex +++ b/lib/pleroma/web/salmon/salmon.ex @@ -170,6 +170,15 @@ defmodule Pleroma.Web.Salmon do      end    end +  def publish_one(%{recipient_id: recipient_id} = params) do +    recipient = User.get_by_id(recipient_id) + +    params +    |> Map.delete(:recipient_id) +    |> Map.put(:recipient, recipient) +    |> publish_one() +  end +    def publish_one(_), do: :noop    @supported_activities [ @@ -218,7 +227,7 @@ defmodule Pleroma.Web.Salmon do          Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)          Publisher.enqueue_one(__MODULE__, %{ -          recipient: remote_user, +          recipient_id: remote_user.id,            feed: feed,            unreachable_since: reachable_urls_metadata[remote_user.info.salmon]          }) diff --git a/lib/pleroma/web/twitter_api/controllers/util_controller.ex b/lib/pleroma/web/twitter_api/controllers/util_controller.ex index 3405bd3b7..7ba4ad305 100644 --- a/lib/pleroma/web/twitter_api/controllers/util_controller.ex +++ b/lib/pleroma/web/twitter_api/controllers/util_controller.ex @@ -265,12 +265,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do               String.split(line, ",") |> List.first()             end)             |> List.delete("Account address") do -      PleromaJobQueue.enqueue(:background, User, [ -        :follow_import, -        follower, -        followed_identifiers -      ]) - +      User.follow_import(follower, followed_identifiers)        json(conn, "job started")      end    end @@ -281,12 +276,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do    def blocks_import(%{assigns: %{user: blocker}} = conn, %{"list" => list}) do      with blocked_identifiers <- String.split(list) do -      PleromaJobQueue.enqueue(:background, User, [ -        :blocks_import, -        blocker, -        blocked_identifiers -      ]) - +      User.blocks_import(blocker, blocked_identifiers)        json(conn, "job started")      end    end diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex new file mode 100644 index 000000000..3ab2b6bcc --- /dev/null +++ b/lib/pleroma/workers/background_worker.ex @@ -0,0 +1,66 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.BackgroundWorker do +  alias Pleroma.Activity +  alias Pleroma.User +  alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy +  alias Pleroma.Web.OAuth.Token.CleanWorker + +  # Note: `max_attempts` is intended to be overridden in `new/1` call +  use Oban.Worker, +    queue: "background", +    max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default]) + +  @impl Oban.Worker +  def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}) do +    user = User.get_by_id(user_id) +    User.perform(:fetch_initial_posts, user) +  end + +  def perform(%{"op" => "deactivate_user", "user_id" => user_id, "status" => status}) do +    user = User.get_by_id(user_id) +    User.perform(:deactivate_async, user, status) +  end + +  def perform(%{"op" => "delete_user", "user_id" => user_id}) do +    user = User.get_by_id(user_id) +    User.perform(:delete, user) +  end + +  def perform(%{ +        "op" => "blocks_import", +        "blocker_id" => blocker_id, +        "blocked_identifiers" => blocked_identifiers +      }) do +    blocker = User.get_by_id(blocker_id) +    User.perform(:blocks_import, blocker, blocked_identifiers) +  end + +  def perform(%{ +        "op" => "follow_import", +        "follower_id" => follower_id, +        "followed_identifiers" => followed_identifiers +      }) do +    follower = User.get_by_id(follower_id) +    User.perform(:follow_import, follower, followed_identifiers) +  end + +  def perform(%{"op" => "clean_expired_tokens"}) do +    CleanWorker.perform(:clean) +  end + +  def perform(%{"op" => "media_proxy_preload", "message" => message}) do +    MediaProxyWarmingPolicy.perform(:preload, message) +  end + +  def perform(%{"op" => "media_proxy_prefetch", "url" => url}) do +    MediaProxyWarmingPolicy.perform(:prefetch, url) +  end + +  def perform(%{"op" => "fetch_data_for_activity", "activity_id" => activity_id}) do +    activity = Activity.get_by_id(activity_id) +    Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity) +  end +end diff --git a/lib/pleroma/workers/helper.ex b/lib/pleroma/workers/helper.ex new file mode 100644 index 000000000..3286ce0e8 --- /dev/null +++ b/lib/pleroma/workers/helper.ex @@ -0,0 +1,13 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.Helper do +  def worker_args(queue) do +    if max_attempts = Pleroma.Config.get([:workers, :retries, queue]) do +      [max_attempts: max_attempts] +    else +      [] +    end +  end +end diff --git a/lib/pleroma/workers/mailer.ex b/lib/pleroma/workers/mailer.ex new file mode 100644 index 000000000..8bf9952bc --- /dev/null +++ b/lib/pleroma/workers/mailer.ex @@ -0,0 +1,27 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.Mailer do +  alias Pleroma.User + +  # Note: `max_attempts` is intended to be overridden in `new/1` call +  use Oban.Worker, +    queue: "mailer", +    max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default]) + +  @impl Oban.Worker +  def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}) do +    email = +      encoded_email +      |> Base.decode64!() +      |> :erlang.binary_to_term() + +    Pleroma.Emails.Mailer.deliver(email, config) +  end + +  def perform(%{"op" => "digest_email", "user_id" => user_id}) do +    user = User.get_by_id(user_id) +    Pleroma.DigestEmailWorker.perform(user) +  end +end diff --git a/lib/pleroma/workers/publisher.ex b/lib/pleroma/workers/publisher.ex new file mode 100644 index 000000000..c890ffb79 --- /dev/null +++ b/lib/pleroma/workers/publisher.ex @@ -0,0 +1,23 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.Publisher do +  alias Pleroma.Activity +  alias Pleroma.Web.Federator + +  # Note: `max_attempts` is intended to be overridden in `new/1` call +  use Oban.Worker, +    queue: "federator_outgoing", +    max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default]) + +  @impl Oban.Worker +  def perform(%{"op" => "publish", "activity_id" => activity_id}) do +    activity = Activity.get_by_id(activity_id) +    Federator.perform(:publish, activity) +  end + +  def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}) do +    Federator.perform(:publish_one, String.to_atom(module_name), params) +  end +end diff --git a/lib/pleroma/workers/receiver.ex b/lib/pleroma/workers/receiver.ex new file mode 100644 index 000000000..d3de95716 --- /dev/null +++ b/lib/pleroma/workers/receiver.ex @@ -0,0 +1,21 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.Receiver do +  alias Pleroma.Web.Federator + +  # Note: `max_attempts` is intended to be overridden in `new/1` call +  use Oban.Worker, +    queue: "federator_incoming", +    max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default]) + +  @impl Oban.Worker +  def perform(%{"op" => "incoming_doc", "body" => doc}) do +    Federator.perform(:incoming_doc, doc) +  end + +  def perform(%{"op" => "incoming_ap_doc", "params" => params}) do +    Federator.perform(:incoming_ap_doc, params) +  end +end diff --git a/lib/pleroma/workers/scheduled_activity_worker.ex b/lib/pleroma/workers/scheduled_activity_worker.ex new file mode 100644 index 000000000..a49834fd8 --- /dev/null +++ b/lib/pleroma/workers/scheduled_activity_worker.ex @@ -0,0 +1,15 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.ScheduledActivityWorker do +  # Note: `max_attempts` is intended to be overridden in `new/1` call +  use Oban.Worker, +    queue: "scheduled_activities", +    max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default]) + +  @impl Oban.Worker +  def perform(%{"op" => "execute", "activity_id" => activity_id}) do +    Pleroma.ScheduledActivityWorker.perform(:execute, activity_id) +  end +end diff --git a/lib/pleroma/workers/subscriber.ex b/lib/pleroma/workers/subscriber.ex new file mode 100644 index 000000000..6af3ad0a1 --- /dev/null +++ b/lib/pleroma/workers/subscriber.ex @@ -0,0 +1,29 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.Subscriber do +  alias Pleroma.Repo +  alias Pleroma.Web.Federator +  alias Pleroma.Web.Websub.WebsubClientSubscription + +  # Note: `max_attempts` is intended to be overridden in `new/1` call +  use Oban.Worker, +    queue: "federator_outgoing", +    max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default]) + +  @impl Oban.Worker +  def perform(%{"op" => "refresh_subscriptions"}) do +    Federator.perform(:refresh_subscriptions) +  end + +  def perform(%{"op" => "request_subscription", "websub_id" => websub_id}) do +    websub = Repo.get(WebsubClientSubscription, websub_id) +    Federator.perform(:request_subscription, websub) +  end + +  def perform(%{"op" => "verify_websub", "websub_id" => websub_id}) do +    websub = Repo.get(WebsubClientSubscription, websub_id) +    Federator.perform(:verify_websub, websub) +  end +end diff --git a/lib/pleroma/workers/transmogrifier.ex b/lib/pleroma/workers/transmogrifier.ex new file mode 100644 index 000000000..c6b4fab47 --- /dev/null +++ b/lib/pleroma/workers/transmogrifier.ex @@ -0,0 +1,18 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.Transmogrifier do +  alias Pleroma.User + +  # Note: `max_attempts` is intended to be overridden in `new/1` call +  use Oban.Worker, +    queue: "transmogrifier", +    max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default]) + +  @impl Oban.Worker +  def perform(%{"op" => "user_upgrade", "user_id" => user_id}) do +    user = User.get_by_id(user_id) +    Pleroma.Web.ActivityPub.Transmogrifier.perform(:user_upgrade, user) +  end +end diff --git a/lib/pleroma/workers/web_pusher.ex b/lib/pleroma/workers/web_pusher.ex new file mode 100644 index 000000000..b99581eb0 --- /dev/null +++ b/lib/pleroma/workers/web_pusher.ex @@ -0,0 +1,19 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.WebPusher do +  alias Pleroma.Notification +  alias Pleroma.Repo + +  # Note: `max_attempts` is intended to be overridden in `new/1` call +  use Oban.Worker, +    queue: "web_push", +    max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default]) + +  @impl Oban.Worker +  def perform(%{"op" => "web_push", "notification_id" => notification_id}) do +    notification = Repo.get(Notification, notification_id) +    Pleroma.Web.Push.Impl.perform(notification) +  end +end @@ -101,6 +101,7 @@ defmodule Pleroma.Mixfile do        {:phoenix_ecto, "~> 4.0"},        {:ecto_sql, "~> 3.1"},        {:postgrex, ">= 0.13.5"}, +      {:oban, "~> 0.6"},        {:gettext, "~> 0.15"},        {:comeonin, "~> 4.1.1"},        {:pbkdf2_elixir, "~> 0.12.3"}, @@ -57,6 +57,7 @@    "mogrify": {:hex, :mogrify, "0.6.1", "de1b527514f2d95a7bbe9642eb556061afb337e220cf97adbf3a4e6438ed70af", [:mix], [], "hexpm"},    "mox": {:hex, :mox, "0.5.1", "f86bb36026aac1e6f924a4b6d024b05e9adbed5c63e8daa069bd66fb3292165b", [:mix], [], "hexpm"},    "nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm"}, +  "oban": {:hex, :oban, "0.6.0", "8b9b861355610e703e58a878bc29959f3f0e1b4cd1e90d785cf2bb2498d3b893", [:mix], [{:ecto_sql, "~> 3.1", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.14", [hex: :postgrex, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},    "parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm"},    "pbkdf2_elixir": {:hex, :pbkdf2_elixir, "0.12.3", "6706a148809a29c306062862c803406e88f048277f6e85b68faf73291e820b84", [:mix], [], "hexpm"},    "phoenix": {:hex, :phoenix, "1.4.9", "746d098e10741c334d88143d3c94cab1756435f94387a63441792e66ec0ee974", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 1.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.8.1 or ~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"}, diff --git a/priv/repo/migrations/20190730055101_add_oban_jobs_table.exs b/priv/repo/migrations/20190730055101_add_oban_jobs_table.exs new file mode 100644 index 000000000..2f201bd05 --- /dev/null +++ b/priv/repo/migrations/20190730055101_add_oban_jobs_table.exs @@ -0,0 +1,6 @@ +defmodule Pleroma.Repo.Migrations.AddObanJobsTable do +  use Ecto.Migration + +  defdelegate up, to: Oban.Migrations +  defdelegate down, to: Oban.Migrations +end diff --git a/test/activity_test.exs b/test/activity_test.exs index b27f6fd36..658c47837 100644 --- a/test/activity_test.exs +++ b/test/activity_test.exs @@ -7,6 +7,7 @@ defmodule Pleroma.ActivityTest do    alias Pleroma.Activity    alias Pleroma.Bookmark    alias Pleroma.Object +  alias Pleroma.Tests.ObanHelpers    alias Pleroma.ThreadMute    import Pleroma.Factory @@ -125,7 +126,8 @@ defmodule Pleroma.ActivityTest do        }        {:ok, local_activity} = Pleroma.Web.CommonAPI.post(user, %{"status" => "find me!"}) -      {:ok, remote_activity} = Pleroma.Web.Federator.incoming_ap_doc(params) +      {:ok, job} = Pleroma.Web.Federator.incoming_ap_doc(params) +      {:ok, remote_activity} = ObanHelpers.perform(job)        %{local_activity: local_activity, remote_activity: remote_activity, user: user}      end diff --git a/test/conversation_test.exs b/test/conversation_test.exs index aa193e0d4..f917aa691 100644 --- a/test/conversation_test.exs +++ b/test/conversation_test.exs @@ -28,6 +28,8 @@ defmodule Pleroma.ConversationTest do      {:ok, _activity} =        CommonAPI.post(user, %{"visibility" => "direct", "status" => "hey @#{other_user.nickname}"}) +    Pleroma.Tests.ObanHelpers.perform_all() +      Repo.delete_all(Conversation)      Repo.delete_all(Conversation.Participation) diff --git a/test/mix/tasks/pleroma.digest_test.exs b/test/mix/tasks/pleroma.digest_test.exs index 595f64ed7..5fbeac0d6 100644 --- a/test/mix/tasks/pleroma.digest_test.exs +++ b/test/mix/tasks/pleroma.digest_test.exs @@ -4,6 +4,7 @@ defmodule Mix.Tasks.Pleroma.DigestTest do    import Pleroma.Factory    import Swoosh.TestAssertions +  alias Pleroma.Tests.ObanHelpers    alias Pleroma.Web.CommonAPI    setup_all do @@ -39,6 +40,8 @@ defmodule Mix.Tasks.Pleroma.DigestTest do        :ok = Mix.Tasks.Pleroma.Digest.run(["test", user2.nickname, yesterday_date]) +      ObanHelpers.perform_all() +        assert_receive {:mix_shell, :info, [message]}        assert message =~ "Digest email have been sent" diff --git a/test/notification_test.exs b/test/notification_test.exs index 80ea2a085..e1c9f4f93 100644 --- a/test/notification_test.exs +++ b/test/notification_test.exs @@ -8,6 +8,7 @@ defmodule Pleroma.NotificationTest do    import Pleroma.Factory    alias Pleroma.Notification +  alias Pleroma.Tests.ObanHelpers    alias Pleroma.User    alias Pleroma.Web.ActivityPub.Transmogrifier    alias Pleroma.Web.CommonAPI @@ -621,7 +622,8 @@ defmodule Pleroma.NotificationTest do        refute Enum.empty?(Notification.for_user(other_user)) -      User.delete(user) +      {:ok, job} = User.delete(user) +      ObanHelpers.perform(job)        assert Enum.empty?(Notification.for_user(other_user))      end @@ -666,6 +668,7 @@ defmodule Pleroma.NotificationTest do        }        {:ok, _delete_activity} = Transmogrifier.handle_incoming(delete_user_message) +      ObanHelpers.perform_all()        assert Enum.empty?(Notification.for_user(local_user))      end diff --git a/test/support/oban_helpers.ex b/test/support/oban_helpers.ex new file mode 100644 index 000000000..d379c9ec7 --- /dev/null +++ b/test/support/oban_helpers.ex @@ -0,0 +1,42 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Tests.ObanHelpers do +  @moduledoc """ +  Oban test helpers. +  """ + +  alias Pleroma.Repo + +  def perform_all do +    Oban.Job +    |> Repo.all() +    |> perform() +  end + +  def perform(%Oban.Job{} = job) do +    res = apply(String.to_existing_atom("Elixir." <> job.worker), :perform, [job]) +    Repo.delete(job) +    res +  end + +  def perform(jobs) when is_list(jobs) do +    for job <- jobs, do: perform(job) +  end + +  def member?(%{} = job_args, jobs) when is_list(jobs) do +    Enum.any?(jobs, fn job -> +      member?(job_args, job.args) +    end) +  end + +  def member?(%{} = test_attrs, %{} = attrs) do +    Enum.all?( +      test_attrs, +      fn {k, _v} -> member?(test_attrs[k], attrs[k]) end +    ) +  end + +  def member?(x, y), do: x == y +end diff --git a/test/user_test.exs b/test/user_test.exs index b363b322c..2b955ced0 100644 --- a/test/user_test.exs +++ b/test/user_test.exs @@ -7,14 +7,15 @@ defmodule Pleroma.UserTest do    alias Pleroma.Builders.UserBuilder    alias Pleroma.Object    alias Pleroma.Repo +  alias Pleroma.Tests.ObanHelpers    alias Pleroma.User    alias Pleroma.Web.ActivityPub.ActivityPub    alias Pleroma.Web.CommonAPI    use Pleroma.DataCase +  use Oban.Testing, repo: Pleroma.Repo    import Pleroma.Factory -  import Mock    setup_all do      Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end) @@ -678,7 +679,9 @@ defmodule Pleroma.UserTest do          user3.nickname        ] -      result = User.follow_import(user1, identifiers) +      {:ok, job} = User.follow_import(user1, identifiers) +      result = ObanHelpers.perform(job) +        assert is_list(result)        assert result == [user2, user3]      end @@ -889,7 +892,9 @@ defmodule Pleroma.UserTest do          user3.nickname        ] -      result = User.blocks_import(user1, identifiers) +      {:ok, job} = User.blocks_import(user1, identifiers) +      result = ObanHelpers.perform(job) +        assert is_list(result)        assert result == [user2, user3]      end @@ -1015,7 +1020,8 @@ defmodule Pleroma.UserTest do        {:ok, like_two, _} = CommonAPI.favorite(activity.id, follower)        {:ok, repeat, _} = CommonAPI.repeat(activity_two.id, user) -      {:ok, _} = User.delete(user) +      {:ok, job} = User.delete(user) +      {:ok, _user} = ObanHelpers.perform(job)        follower = User.get_cached_by_id(follower.id) @@ -1037,11 +1043,7 @@ defmodule Pleroma.UserTest do        refute Activity.get_by_id(repeat.id)      end -    test_with_mock "it sends out User Delete activity", -                   %{user: user}, -                   Pleroma.Web.ActivityPub.Publisher, -                   [:passthrough], -                   [] do +    test "it sends out User Delete activity", %{user: user} do        config_path = [:instance, :federating]        initial_setting = Pleroma.Config.get(config_path)        Pleroma.Config.put(config_path, true) @@ -1049,12 +1051,18 @@ defmodule Pleroma.UserTest do        {:ok, follower} = User.get_or_fetch_by_ap_id("http://mastodon.example.org/users/admin")        {:ok, _} = User.follow(follower, user) -      {:ok, _user} = User.delete(user) - -      assert called( -               Pleroma.Web.ActivityPub.Publisher.publish_one(%{ -                 inbox: "http://mastodon.example.org/inbox" -               }) +      {:ok, job} = User.delete(user) +      {:ok, _user} = ObanHelpers.perform(job) + +      assert ObanHelpers.member?( +               %{ +                 "op" => "publish_one", +                 "params" => %{ +                   "inbox" => "http://mastodon.example.org/inbox", +                   "id" => "pleroma:fakeid" +                 } +               }, +               all_enqueued(worker: Pleroma.Workers.Publisher)               )        Pleroma.Config.put(config_path, initial_setting) @@ -1101,7 +1109,8 @@ defmodule Pleroma.UserTest do      test "User.delete() plugs any possible zombie objects" do        user = insert(:user) -      {:ok, _} = User.delete(user) +      {:ok, job} = User.delete(user) +      {:ok, _} = ObanHelpers.perform(job)        {:ok, cached_user} = Cachex.get(:user_cache, "ap_id:#{user.ap_id}") diff --git a/test/web/activity_pub/activity_pub_controller_test.exs b/test/web/activity_pub/activity_pub_controller_test.exs index 251055ee1..f46353fdd 100644 --- a/test/web/activity_pub/activity_pub_controller_test.exs +++ b/test/web/activity_pub/activity_pub_controller_test.exs @@ -4,15 +4,19 @@  defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do    use Pleroma.Web.ConnCase +  use Oban.Testing, repo: Pleroma.Repo +    import Pleroma.Factory    alias Pleroma.Activity    alias Pleroma.Instances    alias Pleroma.Object +  alias Pleroma.Tests.ObanHelpers    alias Pleroma.User    alias Pleroma.Web.ActivityPub.ObjectView    alias Pleroma.Web.ActivityPub.UserView    alias Pleroma.Web.ActivityPub.Utils    alias Pleroma.Web.CommonAPI +  alias Pleroma.Workers.Receiver, as: ReceiverWorker    setup_all do      Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end) @@ -279,7 +283,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do          |> post("/inbox", data)        assert "ok" == json_response(conn, 200) -      :timer.sleep(500) + +      ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))        assert Activity.get_by_ap_id(data["id"])      end @@ -321,7 +326,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do          |> post("/users/#{user.nickname}/inbox", data)        assert "ok" == json_response(conn, 200) -      :timer.sleep(500) +      ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))        assert Activity.get_by_ap_id(data["id"])      end @@ -350,7 +355,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do          |> post("/users/#{recipient.nickname}/inbox", data)        assert "ok" == json_response(conn, 200) -      :timer.sleep(500) +      ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))        assert Activity.get_by_ap_id(data["id"])      end @@ -429,6 +434,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do        |> post("/users/#{recipient.nickname}/inbox", data)        |> json_response(200) +      ObanHelpers.perform(all_enqueued(worker: ReceiverWorker)) +        activity = Activity.get_by_ap_id(data["id"])        assert activity.id @@ -504,6 +511,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do          |> post("/users/#{user.nickname}/outbox", data)        result = json_response(conn, 201) +        assert Activity.get_by_ap_id(result["id"])      end diff --git a/test/web/activity_pub/mrf/mediaproxy_warming_policy_test.exs b/test/web/activity_pub/mrf/mediaproxy_warming_policy_test.exs index 372e789be..95a809d25 100644 --- a/test/web/activity_pub/mrf/mediaproxy_warming_policy_test.exs +++ b/test/web/activity_pub/mrf/mediaproxy_warming_policy_test.exs @@ -6,6 +6,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicyTest do    use Pleroma.DataCase    alias Pleroma.HTTP +  alias Pleroma.Tests.ObanHelpers    alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy    import Mock @@ -24,6 +25,11 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicyTest do    test "it prefetches media proxy URIs" do      with_mock HTTP, get: fn _, _, _ -> {:ok, []} end do        MediaProxyWarmingPolicy.filter(@message) + +      ObanHelpers.perform_all() +      # Performing jobs which has been just enqueued +      ObanHelpers.perform_all() +        assert called(HTTP.get(:_, :_, :_))      end    end diff --git a/test/web/activity_pub/publisher_test.exs b/test/web/activity_pub/publisher_test.exs index 36a39c84c..26d019878 100644 --- a/test/web/activity_pub/publisher_test.exs +++ b/test/web/activity_pub/publisher_test.exs @@ -257,7 +257,7 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do        assert called(                 Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{                   inbox: "https://domain.com/users/nick1/inbox", -                 actor: actor, +                 actor_id: actor.id,                   id: note_activity.data["id"]                 })               ) diff --git a/test/web/activity_pub/transmogrifier_test.exs b/test/web/activity_pub/transmogrifier_test.exs index 060b91e29..e80263328 100644 --- a/test/web/activity_pub/transmogrifier_test.exs +++ b/test/web/activity_pub/transmogrifier_test.exs @@ -8,6 +8,7 @@ defmodule Pleroma.Web.ActivityPub.TransmogrifierTest do    alias Pleroma.Object    alias Pleroma.Object.Fetcher    alias Pleroma.Repo +  alias Pleroma.Tests.ObanHelpers    alias Pleroma.User    alias Pleroma.Web.ActivityPub.ActivityPub    alias Pleroma.Web.ActivityPub.Transmogrifier @@ -584,6 +585,7 @@ defmodule Pleroma.Web.ActivityPub.TransmogrifierTest do          |> Poison.decode!()        {:ok, _} = Transmogrifier.handle_incoming(data) +      ObanHelpers.perform_all()        refute User.get_cached_by_ap_id(ap_id)      end @@ -1146,6 +1148,8 @@ defmodule Pleroma.Web.ActivityPub.TransmogrifierTest do        assert user.info.note_count == 1        {:ok, user} = Transmogrifier.upgrade_user_from_ap_id("https://niu.moe/users/rye") +      ObanHelpers.perform_all() +        assert user.info.ap_enabled        assert user.info.note_count == 1        assert user.follower_address == "https://niu.moe/users/rye/followers" diff --git a/test/web/federator_test.exs b/test/web/federator_test.exs index 73cfaa8f1..9ca341b6d 100644 --- a/test/web/federator_test.exs +++ b/test/web/federator_test.exs @@ -4,9 +4,14 @@  defmodule Pleroma.Web.FederatorTest do    alias Pleroma.Instances +  alias Pleroma.Tests.ObanHelpers    alias Pleroma.Web.CommonAPI    alias Pleroma.Web.Federator +  alias Pleroma.Workers.Publisher, as: PublisherWorker +    use Pleroma.DataCase +  use Oban.Testing, repo: Pleroma.Repo +    import Pleroma.Factory    import Mock @@ -22,15 +27,6 @@ defmodule Pleroma.Web.FederatorTest do      :ok    end -  describe "Publisher.perform" do -    test "call `perform` with unknown task" do -      assert { -               :error, -               "Don't know what to do with this" -             } = Pleroma.Web.Federator.Publisher.perform("test", :ok, :ok) -    end -  end -    describe "Publish an activity" do      setup do        user = insert(:user) @@ -51,6 +47,7 @@ defmodule Pleroma.Web.FederatorTest do      } do        with_mocks([relay_mock]) do          Federator.publish(activity) +        ObanHelpers.perform(all_enqueued(worker: PublisherWorker))        end        assert_received :relay_publish @@ -64,6 +61,7 @@ defmodule Pleroma.Web.FederatorTest do        with_mocks([relay_mock]) do          Federator.publish(activity) +        ObanHelpers.perform(all_enqueued(worker: PublisherWorker))        end        refute_received :relay_publish @@ -73,10 +71,7 @@ defmodule Pleroma.Web.FederatorTest do    end    describe "Targets reachability filtering in `publish`" do -    test_with_mock "it federates only to reachable instances via AP", -                   Pleroma.Web.ActivityPub.Publisher, -                   [:passthrough], -                   [] do +    test "it federates only to reachable instances via AP" do        user = insert(:user)        {inbox1, inbox2} = @@ -104,20 +99,20 @@ defmodule Pleroma.Web.FederatorTest do        {:ok, _activity} =          CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"}) -      assert called( -               Pleroma.Web.ActivityPub.Publisher.publish_one(%{ -                 inbox: inbox1, -                 unreachable_since: dt -               }) -             ) +      expected_dt = NaiveDateTime.to_iso8601(dt) -      refute called(Pleroma.Web.ActivityPub.Publisher.publish_one(%{inbox: inbox2})) +      ObanHelpers.perform(all_enqueued(worker: PublisherWorker)) + +      assert ObanHelpers.member?( +               %{ +                 "op" => "publish_one", +                 "params" => %{"inbox" => inbox1, "unreachable_since" => expected_dt} +               }, +               all_enqueued(worker: PublisherWorker) +             )      end -    test_with_mock "it federates only to reachable instances via Websub", -                   Pleroma.Web.Websub, -                   [:passthrough], -                   [] do +    test "it federates only to reachable instances via Websub" do        user = insert(:user)        websub_topic = Pleroma.Web.OStatus.feed_path(user) @@ -142,23 +137,27 @@ defmodule Pleroma.Web.FederatorTest do        {:ok, _activity} = CommonAPI.post(user, %{"status" => "HI"}) -      assert called( -               Pleroma.Web.Websub.publish_one(%{ -                 callback: sub2.callback, -                 unreachable_since: dt -               }) -             ) +      expected_callback = sub2.callback +      expected_dt = NaiveDateTime.to_iso8601(dt) + +      ObanHelpers.perform(all_enqueued(worker: PublisherWorker)) -      refute called(Pleroma.Web.Websub.publish_one(%{callback: sub1.callback})) +      assert ObanHelpers.member?( +               %{ +                 "op" => "publish_one", +                 "params" => %{ +                   "callback" => expected_callback, +                   "unreachable_since" => expected_dt +                 } +               }, +               all_enqueued(worker: PublisherWorker) +             )      end -    test_with_mock "it federates only to reachable instances via Salmon", -                   Pleroma.Web.Salmon, -                   [:passthrough], -                   [] do +    test "it federates only to reachable instances via Salmon" do        user = insert(:user) -      remote_user1 = +      _remote_user1 =          insert(:user, %{            local: false,            nickname: "nick1@domain.com", @@ -174,6 +173,8 @@ defmodule Pleroma.Web.FederatorTest do            info: %{salmon: "https://domain2.com/salmon"}          }) +      remote_user2_id = remote_user2.id +        dt = NaiveDateTime.utc_now()        Instances.set_unreachable(remote_user2.ap_id, dt) @@ -182,14 +183,20 @@ defmodule Pleroma.Web.FederatorTest do        {:ok, _activity} =          CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"}) -      assert called( -               Pleroma.Web.Salmon.publish_one(%{ -                 recipient: remote_user2, -                 unreachable_since: dt -               }) -             ) +      expected_dt = NaiveDateTime.to_iso8601(dt) + +      ObanHelpers.perform(all_enqueued(worker: PublisherWorker)) -      refute called(Pleroma.Web.Salmon.publish_one(%{recipient: remote_user1})) +      assert ObanHelpers.member?( +               %{ +                 "op" => "publish_one", +                 "params" => %{ +                   "recipient_id" => remote_user2_id, +                   "unreachable_since" => expected_dt +                 } +               }, +               all_enqueued(worker: PublisherWorker) +             )      end    end @@ -209,7 +216,8 @@ defmodule Pleroma.Web.FederatorTest do          "to" => ["https://www.w3.org/ns/activitystreams#Public"]        } -      {:ok, _activity} = Federator.incoming_ap_doc(params) +      assert {:ok, job} = Federator.incoming_ap_doc(params) +      assert {:ok, _activity} = ObanHelpers.perform(job)      end      test "rejects incoming AP docs with incorrect origin" do @@ -227,7 +235,8 @@ defmodule Pleroma.Web.FederatorTest do          "to" => ["https://www.w3.org/ns/activitystreams#Public"]        } -      :error = Federator.incoming_ap_doc(params) +      assert {:ok, job} = Federator.incoming_ap_doc(params) +      assert :error = ObanHelpers.perform(job)      end      test "it does not crash if MRF rejects the post" do @@ -240,7 +249,8 @@ defmodule Pleroma.Web.FederatorTest do          File.read!("test/fixtures/mastodon-post-activity.json")          |> Poison.decode!() -      assert Federator.incoming_ap_doc(params) == :error +      assert {:ok, job} = Federator.incoming_ap_doc(params) +      assert :error = ObanHelpers.perform(job)        Pleroma.Config.put([:instance, :rewrite_policy], policies)        Pleroma.Config.put(:mrf_keyword, mrf_keyword_policy) diff --git a/test/web/instances/instance_test.exs b/test/web/instances/instance_test.exs index d28730994..a1bdd45d3 100644 --- a/test/web/instances/instance_test.exs +++ b/test/web/instances/instance_test.exs @@ -22,7 +22,8 @@ defmodule Pleroma.Instances.InstanceTest do    describe "set_reachable/1" do      test "clears `unreachable_since` of existing matching Instance record having non-nil `unreachable_since`" do -      instance = insert(:instance, unreachable_since: NaiveDateTime.utc_now()) +      unreachable_since = NaiveDateTime.to_iso8601(NaiveDateTime.utc_now()) +      instance = insert(:instance, unreachable_since: unreachable_since)        assert {:ok, instance} = Instance.set_reachable(instance.host)        refute instance.unreachable_since diff --git a/test/web/mastodon_api/mastodon_api_controller_test.exs b/test/web/mastodon_api/mastodon_api_controller_test.exs index 2febe8b3a..f18b58a1b 100644 --- a/test/web/mastodon_api/mastodon_api_controller_test.exs +++ b/test/web/mastodon_api/mastodon_api_controller_test.exs @@ -12,6 +12,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIControllerTest do    alias Pleroma.Object    alias Pleroma.Repo    alias Pleroma.ScheduledActivity +  alias Pleroma.Tests.ObanHelpers    alias Pleroma.User    alias Pleroma.Web.ActivityPub.ActivityPub    alias Pleroma.Web.CommonAPI @@ -3874,6 +3875,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIControllerTest do      end      test "it sends an email to user", %{user: user} do +      ObanHelpers.perform_all()        token_record = Repo.get_by(Pleroma.PasswordResetToken, user_id: user.id)        email = Pleroma.Emails.UserEmail.password_reset_email(user, token_record.token) @@ -3937,6 +3939,8 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIControllerTest do        |> post("/api/v1/pleroma/accounts/confirmation_resend?email=#{user.email}")        |> json_response(:no_content) +      ObanHelpers.perform_all() +        email = Pleroma.Emails.UserEmail.account_confirmation_email(user)        notify_email = Config.get([:instance, :notify_email])        instance_name = Config.get([:instance, :name]) diff --git a/test/web/retry_queue_test.exs b/test/web/retry_queue_test.exs deleted file mode 100644 index ecb3ce5d0..000000000 --- a/test/web/retry_queue_test.exs +++ /dev/null @@ -1,48 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule MockActivityPub do -  def publish_one({ret, waiter}) do -    send(waiter, :complete) -    {ret, "success"} -  end -end - -defmodule Pleroma.Web.Federator.RetryQueueTest do -  use Pleroma.DataCase -  alias Pleroma.Web.Federator.RetryQueue - -  @small_retry_count 0 -  @hopeless_retry_count 10 - -  setup do -    RetryQueue.reset_stats() -  end - -  test "RetryQueue responds to stats request" do -    assert %{delivered: 0, dropped: 0} == RetryQueue.get_stats() -  end - -  test "failed posts are retried" do -    {:retry, _timeout} = RetryQueue.get_retry_params(@small_retry_count) - -    wait_task = -      Task.async(fn -> -        receive do -          :complete -> :ok -        end -      end) - -    RetryQueue.enqueue({:ok, wait_task.pid}, MockActivityPub, @small_retry_count) -    Task.await(wait_task) -    assert %{delivered: 1, dropped: 0} == RetryQueue.get_stats() -  end - -  test "posts that have been tried too many times are dropped" do -    {:drop, _timeout} = RetryQueue.get_retry_params(@hopeless_retry_count) - -    RetryQueue.enqueue({:ok, nil}, MockActivityPub, @hopeless_retry_count) -    assert %{delivered: 0, dropped: 1} == RetryQueue.get_stats() -  end -end diff --git a/test/web/salmon/salmon_test.exs b/test/web/salmon/salmon_test.exs index e86e76fe9..0186f3fef 100644 --- a/test/web/salmon/salmon_test.exs +++ b/test/web/salmon/salmon_test.exs @@ -96,6 +96,6 @@ defmodule Pleroma.Web.Salmon.SalmonTest do      Salmon.publish(user, activity) -    assert called(Publisher.enqueue_one(Salmon, %{recipient: mentioned_user})) +    assert called(Publisher.enqueue_one(Salmon, %{recipient_id: mentioned_user.id}))    end  end diff --git a/test/web/twitter_api/twitter_api_controller_test.exs b/test/web/twitter_api/twitter_api_controller_test.exs index 8bb8aa36d..9ac4ff929 100644 --- a/test/web/twitter_api/twitter_api_controller_test.exs +++ b/test/web/twitter_api/twitter_api_controller_test.exs @@ -12,6 +12,7 @@ defmodule Pleroma.Web.TwitterAPI.ControllerTest do    alias Pleroma.Notification    alias Pleroma.Object    alias Pleroma.Repo +  alias Pleroma.Tests.ObanHelpers    alias Pleroma.User    alias Pleroma.Web.ActivityPub.ActivityPub    alias Pleroma.Web.CommonAPI @@ -1099,6 +1100,7 @@ defmodule Pleroma.Web.TwitterAPI.ControllerTest do      end      test "it sends an email to user", %{user: user} do +      ObanHelpers.perform_all()        token_record = Repo.get_by(Pleroma.PasswordResetToken, user_id: user.id)        email = Pleroma.Emails.UserEmail.password_reset_email(user, token_record.token) @@ -1209,6 +1211,8 @@ defmodule Pleroma.Web.TwitterAPI.ControllerTest do        |> assign(:user, user)        |> post("/api/account/resend_confirmation_email?email=#{user.email}") +      ObanHelpers.perform_all() +        email = Pleroma.Emails.UserEmail.account_confirmation_email(user)        notify_email = Pleroma.Config.get([:instance, :notify_email])        instance_name = Pleroma.Config.get([:instance, :name]) diff --git a/test/web/twitter_api/twitter_api_test.exs b/test/web/twitter_api/twitter_api_test.exs index cbe83852e..bf063a0de 100644 --- a/test/web/twitter_api/twitter_api_test.exs +++ b/test/web/twitter_api/twitter_api_test.exs @@ -7,6 +7,7 @@ defmodule Pleroma.Web.TwitterAPI.TwitterAPITest do    alias Pleroma.Activity    alias Pleroma.Object    alias Pleroma.Repo +  alias Pleroma.Tests.ObanHelpers    alias Pleroma.User    alias Pleroma.UserInviteToken    alias Pleroma.Web.ActivityPub.ActivityPub @@ -321,6 +322,7 @@ defmodule Pleroma.Web.TwitterAPI.TwitterAPITest do      }      {:ok, user} = TwitterAPI.register_user(data) +    ObanHelpers.perform_all()      assert user.info.confirmation_pending diff --git a/test/web/twitter_api/util_controller_test.exs b/test/web/twitter_api/util_controller_test.exs index 640579c09..e3f129f72 100644 --- a/test/web/twitter_api/util_controller_test.exs +++ b/test/web/twitter_api/util_controller_test.exs @@ -4,9 +4,11 @@  defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do    use Pleroma.Web.ConnCase +  use Oban.Testing, repo: Pleroma.Repo    alias Pleroma.Notification    alias Pleroma.Repo +  alias Pleroma.Tests.ObanHelpers    alias Pleroma.User    alias Pleroma.Web.CommonAPI    import Pleroma.Factory @@ -50,8 +52,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do          {File, [],           read!: fn "follow_list.txt" ->             "Account address,Show boosts\n#{user2.ap_id},true" -         end}, -        {PleromaJobQueue, [:passthrough], []} +         end}        ]) do          response =            conn @@ -59,15 +60,16 @@ defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do            |> post("/api/pleroma/follow_import", %{"list" => %Plug.Upload{path: "follow_list.txt"}})            |> json_response(:ok) -        assert called( -                 PleromaJobQueue.enqueue( -                   :background, -                   User, -                   [:follow_import, user1, [user2.ap_id]] -                 ) -               ) -          assert response == "job started" + +        assert ObanHelpers.member?( +                 %{ +                   "op" => "follow_import", +                   "follower_id" => user1.id, +                   "followed_identifiers" => [user2.ap_id] +                 }, +                 all_enqueued(worker: Pleroma.Workers.BackgroundWorker) +               )        end      end @@ -126,8 +128,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do        user3 = insert(:user)        with_mocks([ -        {File, [], read!: fn "blocks_list.txt" -> "#{user2.ap_id} #{user3.ap_id}" end}, -        {PleromaJobQueue, [:passthrough], []} +        {File, [], read!: fn "blocks_list.txt" -> "#{user2.ap_id} #{user3.ap_id}" end}        ]) do          response =            conn @@ -135,15 +136,16 @@ defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do            |> post("/api/pleroma/blocks_import", %{"list" => %Plug.Upload{path: "blocks_list.txt"}})            |> json_response(:ok) -        assert called( -                 PleromaJobQueue.enqueue( -                   :background, -                   User, -                   [:blocks_import, user1, [user2.ap_id, user3.ap_id]] -                 ) -               ) -          assert response == "job started" + +        assert ObanHelpers.member?( +                 %{ +                   "op" => "blocks_import", +                   "blocker_id" => user1.id, +                   "blocked_identifiers" => [user2.ap_id, user3.ap_id] +                 }, +                 all_enqueued(worker: Pleroma.Workers.BackgroundWorker) +               )        end      end    end @@ -607,6 +609,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do          |> json_response(:ok)        assert response == %{"status" => "success"} +      ObanHelpers.perform_all()        user = User.get_cached_by_id(user.id) diff --git a/test/web/websub/websub_test.exs b/test/web/websub/websub_test.exs index 74386d7db..414610879 100644 --- a/test/web/websub/websub_test.exs +++ b/test/web/websub/websub_test.exs @@ -4,11 +4,14 @@  defmodule Pleroma.Web.WebsubTest do    use Pleroma.DataCase +  use Oban.Testing, repo: Pleroma.Repo +  alias Pleroma.Tests.ObanHelpers    alias Pleroma.Web.Router.Helpers    alias Pleroma.Web.Websub    alias Pleroma.Web.Websub.WebsubClientSubscription    alias Pleroma.Web.Websub.WebsubServerSubscription +  alias Pleroma.Workers.Subscriber, as: SubscriberWorker    import Pleroma.Factory    import Tesla.Mock @@ -224,6 +227,7 @@ defmodule Pleroma.Web.WebsubTest do          })        _refresh = Websub.refresh_subscriptions() +      ObanHelpers.perform(all_enqueued(worker: SubscriberWorker))        assert still_good == Repo.get(WebsubClientSubscription, still_good.id)        refute needs_refresh == Repo.get(WebsubClientSubscription, needs_refresh.id) | 
