summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--config/config.exs24
-rw-r--r--config/test.exs4
-rw-r--r--docs/config.md7
-rw-r--r--lib/pleroma/application.ex6
-rw-r--r--lib/pleroma/digest_email_worker.ex11
-rw-r--r--lib/pleroma/emails/mailer.ex13
-rw-r--r--lib/pleroma/instances/instance.ex8
-rw-r--r--lib/pleroma/scheduled_activity_worker.ex8
-rw-r--r--lib/pleroma/user.ex57
-rw-r--r--lib/pleroma/web/activity_pub/activity_pub.ex7
-rw-r--r--lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex12
-rw-r--r--lib/pleroma/web/activity_pub/publisher.ex16
-rw-r--r--lib/pleroma/web/activity_pub/transmogrifier.ex7
-rw-r--r--lib/pleroma/web/activity_pub/utils.ex9
-rw-r--r--lib/pleroma/web/federator/federator.ex108
-rw-r--r--lib/pleroma/web/federator/publisher.ex28
-rw-r--r--lib/pleroma/web/federator/retry_queue.ex239
-rw-r--r--lib/pleroma/web/oauth/token/clean_worker.ex10
-rw-r--r--lib/pleroma/web/push/push.ex12
-rw-r--r--lib/pleroma/web/salmon/salmon.ex11
-rw-r--r--lib/pleroma/web/twitter_api/controllers/util_controller.ex14
-rw-r--r--lib/pleroma/workers/background_worker.ex66
-rw-r--r--lib/pleroma/workers/helper.ex13
-rw-r--r--lib/pleroma/workers/mailer.ex27
-rw-r--r--lib/pleroma/workers/publisher.ex23
-rw-r--r--lib/pleroma/workers/receiver.ex21
-rw-r--r--lib/pleroma/workers/scheduled_activity_worker.ex15
-rw-r--r--lib/pleroma/workers/subscriber.ex29
-rw-r--r--lib/pleroma/workers/transmogrifier.ex18
-rw-r--r--lib/pleroma/workers/web_pusher.ex19
-rw-r--r--mix.exs1
-rw-r--r--mix.lock1
-rw-r--r--priv/repo/migrations/20190730055101_add_oban_jobs_table.exs6
-rw-r--r--test/activity_test.exs4
-rw-r--r--test/conversation_test.exs2
-rw-r--r--test/notification_test.exs5
-rw-r--r--test/support/oban_helpers.ex42
-rw-r--r--test/tasks/digest_test.exs3
-rw-r--r--test/user_test.exs40
-rw-r--r--test/web/activity_pub/activity_pub_controller_test.exs14
-rw-r--r--test/web/activity_pub/mrf/mediaproxy_warming_policy_test.exs6
-rw-r--r--test/web/activity_pub/publisher_test.exs2
-rw-r--r--test/web/activity_pub/transmogrifier_test.exs4
-rw-r--r--test/web/digest_email_worker_test.exs4
-rw-r--r--test/web/federator_test.exs102
-rw-r--r--test/web/instances/instance_test.exs3
-rw-r--r--test/web/mastodon_api/mastodon_api_controller_test.exs4
-rw-r--r--test/web/retry_queue_test.exs48
-rw-r--r--test/web/salmon/salmon_test.exs2
-rw-r--r--test/web/twitter_api/twitter_api_controller_test.exs4
-rw-r--r--test/web/twitter_api/twitter_api_test.exs2
-rw-r--r--test/web/twitter_api/util_controller_test.exs43
-rw-r--r--test/web/websub/websub_test.exs4
53 files changed, 674 insertions, 514 deletions
diff --git a/config/config.exs b/config/config.exs
index f0b6b0363..9794997d9 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -449,13 +449,7 @@ config :pleroma, Pleroma.User,
"web"
]
-config :pleroma, Pleroma.Web.Federator.RetryQueue,
- enabled: false,
- max_jobs: 20,
- initial_timeout: 30,
- max_retries: 5
-
-config :pleroma_job_queue, :queues,
+job_queues = [
federator_incoming: 50,
federator_outgoing: 50,
web_push: 50,
@@ -463,6 +457,22 @@ config :pleroma_job_queue, :queues,
transmogrifier: 20,
scheduled_activities: 10,
background: 5
+]
+
+config :pleroma_job_queue, :queues, job_queues
+
+config :pleroma, Oban,
+ repo: Pleroma.Repo,
+ verbose: false,
+ prune: {:maxage, 60 * 60 * 24 * 7},
+ queues: job_queues
+
+config :pleroma, :workers,
+ retries: [
+ compile_time_default: 1,
+ federator_incoming: 5,
+ federator_outgoing: 5
+ ]
config :pleroma, :fetch_initial_posts,
enabled: false,
diff --git a/config/test.exs b/config/test.exs
index 30a51f734..a0fa67516 100644
--- a/config/test.exs
+++ b/config/test.exs
@@ -63,6 +63,10 @@ config :web_push_encryption, :http_client, Pleroma.Web.WebPushHttpClientMock
config :pleroma_job_queue, disabled: true
+config :pleroma, Oban,
+ queues: false,
+ prune: :disabled
+
config :pleroma, Pleroma.ScheduledActivity,
daily_user_limit: 2,
total_user_limit: 3,
diff --git a/docs/config.md b/docs/config.md
index ec3035c22..ae8afad89 100644
--- a/docs/config.md
+++ b/docs/config.md
@@ -423,13 +423,6 @@ config :pleroma_job_queue, :queues,
This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the `max_jobs` set to `50`.
-## Pleroma.Web.Federator.RetryQueue
-
-* `enabled`: If set to `true`, failed federation jobs will be retried
-* `max_jobs`: The maximum amount of parallel federation jobs running at the same time.
-* `initial_timeout`: The initial timeout in seconds
-* `max_retries`: The maximum number of times a federation job is retried
-
## Pleroma.Web.Metadata
* `providers`: a list of metadata providers to enable. Providers available:
* Pleroma.Web.Metadata.Providers.OpenGraph
diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex
index 25e56b9e2..2e2922d28 100644
--- a/lib/pleroma/application.ex
+++ b/lib/pleroma/application.ex
@@ -40,9 +40,12 @@ defmodule Pleroma.Application do
cachex_children() ++
hackney_pool_children() ++
[
- Pleroma.Web.Federator.RetryQueue,
Pleroma.Stats,
%{
+ id: Oban,
+ start: {Oban, :start_link, [Application.get_env(:pleroma, Oban)]}
+ },
+ %{
id: :web_push_init,
start: {Task, :start_link, [&Pleroma.Web.Push.init/0]},
restart: :temporary
@@ -166,6 +169,7 @@ defmodule Pleroma.Application do
defp after_supervisor_start do
with digest_config <- Application.get_env(:pleroma, :email_notifications)[:digest],
true <- digest_config[:active] do
+ # TODO: consider replacing with `quantum` scheduler
PleromaJobQueue.schedule(
digest_config[:schedule],
:digest_emails,
diff --git a/lib/pleroma/digest_email_worker.ex b/lib/pleroma/digest_email_worker.ex
index 5644d6a67..ffc48bfab 100644
--- a/lib/pleroma/digest_email_worker.ex
+++ b/lib/pleroma/digest_email_worker.ex
@@ -3,9 +3,12 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.DigestEmailWorker do
+ alias Pleroma.Repo
+ alias Pleroma.Workers.Mailer, as: MailerWorker
+
import Ecto.Query
- @queue_name :digest_emails
+ defdelegate worker_args(queue), to: Pleroma.Workers.Helper
def perform do
config = Pleroma.Config.get([:email_notifications, :digest])
@@ -21,7 +24,11 @@ defmodule Pleroma.DigestEmailWorker do
select: u
)
|> Pleroma.Repo.all()
- |> Enum.each(&PleromaJobQueue.enqueue(@queue_name, __MODULE__, [&1]))
+ |> Enum.each(fn user ->
+ %{"op" => "digest_email", "user_id" => user.id}
+ |> MailerWorker.new([queue: "digest_emails"] ++ worker_args(:digest_emails))
+ |> Repo.insert()
+ end)
end
@doc """
diff --git a/lib/pleroma/emails/mailer.ex b/lib/pleroma/emails/mailer.ex
index 2e4657b7c..bb534f602 100644
--- a/lib/pleroma/emails/mailer.ex
+++ b/lib/pleroma/emails/mailer.ex
@@ -9,6 +9,8 @@ defmodule Pleroma.Emails.Mailer do
The module contains functions to delivery email using Swoosh.Mailer.
"""
+ alias Pleroma.Repo
+ alias Pleroma.Workers.Mailer, as: MailerWorker
alias Swoosh.DeliveryError
@otp_app :pleroma
@@ -17,9 +19,18 @@ defmodule Pleroma.Emails.Mailer do
@spec enabled?() :: boolean()
def enabled?, do: Pleroma.Config.get([__MODULE__, :enabled])
+ defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+
@doc "add email to queue"
def deliver_async(email, config \\ []) do
- PleromaJobQueue.enqueue(:mailer, __MODULE__, [:deliver_async, email, config])
+ encoded_email =
+ email
+ |> :erlang.term_to_binary()
+ |> Base.encode64()
+
+ %{"op" => "email", "encoded_email" => encoded_email, "config" => config}
+ |> MailerWorker.new(worker_args(:mailer))
+ |> Repo.insert()
end
@doc "callback to perform send email from queue"
diff --git a/lib/pleroma/instances/instance.ex b/lib/pleroma/instances/instance.ex
index 4d7ed4ca1..544c4b687 100644
--- a/lib/pleroma/instances/instance.ex
+++ b/lib/pleroma/instances/instance.ex
@@ -90,7 +90,7 @@ defmodule Pleroma.Instances.Instance do
def set_unreachable(url_or_host, unreachable_since \\ nil)
def set_unreachable(url_or_host, unreachable_since) when is_binary(url_or_host) do
- unreachable_since = unreachable_since || DateTime.utc_now()
+ unreachable_since = parse_datetime(unreachable_since) || NaiveDateTime.utc_now()
host = host(url_or_host)
existing_record = Repo.get_by(Instance, %{host: host})
@@ -114,4 +114,10 @@ defmodule Pleroma.Instances.Instance do
end
def set_unreachable(_, _), do: {:error, nil}
+
+ defp parse_datetime(datetime) when is_binary(datetime) do
+ NaiveDateTime.from_iso8601(datetime)
+ end
+
+ defp parse_datetime(datetime), do: datetime
end
diff --git a/lib/pleroma/scheduled_activity_worker.ex b/lib/pleroma/scheduled_activity_worker.ex
index 8578cab5e..a01fb4fcb 100644
--- a/lib/pleroma/scheduled_activity_worker.ex
+++ b/lib/pleroma/scheduled_activity_worker.ex
@@ -8,14 +8,18 @@ defmodule Pleroma.ScheduledActivityWorker do
"""
alias Pleroma.Config
+ alias Pleroma.Repo
alias Pleroma.ScheduledActivity
alias Pleroma.User
alias Pleroma.Web.CommonAPI
+
use GenServer
require Logger
@schedule_interval :timer.minutes(1)
+ defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+
def start_link(_) do
GenServer.start_link(__MODULE__, nil)
end
@@ -45,7 +49,9 @@ defmodule Pleroma.ScheduledActivityWorker do
def handle_info(:perform, state) do
ScheduledActivity.due_activities(@schedule_interval)
|> Enum.each(fn scheduled_activity ->
- PleromaJobQueue.enqueue(:scheduled_activities, __MODULE__, [:execute, scheduled_activity.id])
+ %{"op" => "execute", "activity_id" => scheduled_activity.id}
+ |> Pleroma.Workers.ScheduledActivityWorker.new(worker_args(:scheduled_activities))
+ |> Repo.insert()
end)
schedule_next()
diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex
index 134b8bb6c..61a3d5911 100644
--- a/lib/pleroma/user.ex
+++ b/lib/pleroma/user.ex
@@ -27,6 +27,7 @@ defmodule Pleroma.User do
alias Pleroma.Web.OStatus
alias Pleroma.Web.RelMe
alias Pleroma.Web.Websub
+ alias Pleroma.Workers.BackgroundWorker
require Logger
@@ -40,6 +41,8 @@ defmodule Pleroma.User do
@strict_local_nickname_regex ~r/^[a-zA-Z\d]+$/
@extended_local_nickname_regex ~r/^[a-zA-Z\d_-]+$/
+ defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+
schema "users" do
field(:bio, :string)
field(:email, :string)
@@ -613,8 +616,11 @@ defmodule Pleroma.User do
end
@doc "Fetch some posts when the user has just been federated with"
- def fetch_initial_posts(user),
- do: PleromaJobQueue.enqueue(:background, __MODULE__, [:fetch_initial_posts, user])
+ def fetch_initial_posts(user) do
+ %{"op" => "fetch_initial_posts", "user_id" => user.id}
+ |> BackgroundWorker.new(worker_args(:background))
+ |> Repo.insert()
+ end
@spec get_followers_query(User.t(), pos_integer() | nil) :: Ecto.Query.t()
def get_followers_query(%User{} = user, nil) do
@@ -1044,7 +1050,9 @@ defmodule Pleroma.User do
end
def deactivate_async(user, status \\ true) do
- PleromaJobQueue.enqueue(:background, __MODULE__, [:deactivate_async, user, status])
+ %{"op" => "deactivate_user", "user_id" => user.id, "status" => status}
+ |> BackgroundWorker.new(worker_args(:background))
+ |> Repo.insert()
end
def deactivate(%User{} = user, status \\ true) do
@@ -1072,9 +1080,11 @@ defmodule Pleroma.User do
|> update_and_set_cache()
end
- @spec delete(User.t()) :: :ok
- def delete(%User{} = user),
- do: PleromaJobQueue.enqueue(:background, __MODULE__, [:delete, user])
+ def delete(%User{} = user) do
+ %{"op" => "delete_user", "user_id" => user.id}
+ |> BackgroundWorker.new(worker_args(:background))
+ |> Repo.insert()
+ end
@spec perform(atom(), User.t()) :: {:ok, User.t()}
def perform(:delete, %User{} = user) do
@@ -1181,21 +1191,26 @@ defmodule Pleroma.User do
Repo.all(query)
end
- def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers),
- do:
- PleromaJobQueue.enqueue(:background, __MODULE__, [
- :blocks_import,
- blocker,
- blocked_identifiers
- ])
-
- def follow_import(%User{} = follower, followed_identifiers) when is_list(followed_identifiers),
- do:
- PleromaJobQueue.enqueue(:background, __MODULE__, [
- :follow_import,
- follower,
- followed_identifiers
- ])
+ def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers) do
+ %{
+ "op" => "blocks_import",
+ "blocker_id" => blocker.id,
+ "blocked_identifiers" => blocked_identifiers
+ }
+ |> BackgroundWorker.new(worker_args(:background))
+ |> Repo.insert()
+ end
+
+ def follow_import(%User{} = follower, followed_identifiers)
+ when is_list(followed_identifiers) do
+ %{
+ "op" => "follow_import",
+ "follower_id" => follower.id,
+ "followed_identifiers" => followed_identifiers
+ }
+ |> BackgroundWorker.new(worker_args(:background))
+ |> Repo.insert()
+ end
def delete_user_activities(%User{ap_id: ap_id} = user) do
ap_id
diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex
index 172c952d4..07652fae4 100644
--- a/lib/pleroma/web/activity_pub/activity_pub.ex
+++ b/lib/pleroma/web/activity_pub/activity_pub.ex
@@ -17,6 +17,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.Web.ActivityPub.MRF
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.WebFinger
+ alias Pleroma.Workers.BackgroundWorker
import Ecto.Query
import Pleroma.Web.ActivityPub.Utils
@@ -25,6 +26,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
require Logger
require Pleroma.Constants
+ defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+
# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
@@ -145,7 +148,9 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
activity
end
- PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity])
+ %{"op" => "fetch_data_for_activity", "activity_id" => activity.id}
+ |> BackgroundWorker.new(worker_args(:background))
+ |> Repo.insert()
Notification.create_notifications(activity)
diff --git a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex
index a179dd54d..b188164ee 100644
--- a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex
+++ b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex
@@ -7,7 +7,9 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
@behaviour Pleroma.Web.ActivityPub.MRF
alias Pleroma.HTTP
+ alias Pleroma.Repo
alias Pleroma.Web.MediaProxy
+ alias Pleroma.Workers.BackgroundWorker
require Logger
@@ -16,6 +18,8 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
recv_timeout: 10_000
]
+ defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+
def perform(:prefetch, url) do
Logger.info("Prefetching #{inspect(url)}")
@@ -30,7 +34,9 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
url
|> Enum.each(fn
%{"href" => href} ->
- PleromaJobQueue.enqueue(:background, __MODULE__, [:prefetch, href])
+ %{"op" => "media_proxy_prefetch", "url" => href}
+ |> BackgroundWorker.new(worker_args(:background))
+ |> Repo.insert()
x ->
Logger.debug("Unhandled attachment URL object #{inspect(x)}")
@@ -46,7 +52,9 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
%{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message
)
when is_list(attachments) and length(attachments) > 0 do
- PleromaJobQueue.enqueue(:background, __MODULE__, [:preload, message])
+ %{"op" => "media_proxy_preload", "message" => message}
+ |> BackgroundWorker.new(worker_args(:background))
+ |> Repo.insert()
{:ok, message}
end
diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex
index 262529b84..03deec5f4 100644
--- a/lib/pleroma/web/activity_pub/publisher.ex
+++ b/lib/pleroma/web/activity_pub/publisher.ex
@@ -86,6 +86,15 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
end
end
+ def publish_one(%{actor_id: actor_id} = params) do
+ actor = User.get_by_id(actor_id)
+
+ params
+ |> Map.delete(:actor_id)
+ |> Map.put(:actor, actor)
+ |> publish_one()
+ end
+
defp should_federate?(inbox, public) do
if public do
true
@@ -161,7 +170,8 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
Publishes an activity with BCC to all relevant peers.
"""
- def publish(actor, %{data: %{"bcc" => bcc}} = activity) when is_list(bcc) and bcc != [] do
+ def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity)
+ when is_list(bcc) and bcc != [] do
public = is_public?(activity)
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
@@ -188,7 +198,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{
inbox: inbox,
json: json,
- actor: actor,
+ actor_id: actor.id,
id: activity.data["id"],
unreachable_since: unreachable_since
})
@@ -223,7 +233,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
%{
inbox: inbox,
json: json,
- actor: actor,
+ actor_id: actor.id,
id: activity.data["id"],
unreachable_since: unreachable_since
}
diff --git a/lib/pleroma/web/activity_pub/transmogrifier.ex b/lib/pleroma/web/activity_pub/transmogrifier.ex
index 36340a3a1..7a03914bb 100644
--- a/lib/pleroma/web/activity_pub/transmogrifier.ex
+++ b/lib/pleroma/web/activity_pub/transmogrifier.ex
@@ -15,12 +15,15 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.Federator
+ alias Pleroma.Workers.Transmogrifier, as: TransmogrifierWorker
import Ecto.Query
require Logger
require Pleroma.Constants
+ defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+
@doc """
Modifies an incoming AP object (mastodon format) to our internal format.
"""
@@ -1049,7 +1052,9 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
already_ap <- User.ap_enabled?(user),
{:ok, user} <- user |> User.upgrade_changeset(data) |> User.update_and_set_cache() do
unless already_ap do
- PleromaJobQueue.enqueue(:transmogrifier, __MODULE__, [:user_upgrade, user])
+ %{"op" => "user_upgrade", "user_id" => user.id}
+ |> TransmogrifierWorker.new(worker_args(:transmogrifier))
+ |> Repo.insert()
end
{:ok, user}
diff --git a/lib/pleroma/web/activity_pub/utils.ex b/lib/pleroma/web/activity_pub/utils.ex
index 1c3058658..8502ca3be 100644
--- a/lib/pleroma/web/activity_pub/utils.ex
+++ b/lib/pleroma/web/activity_pub/utils.ex
@@ -168,14 +168,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
"""
def maybe_federate(%Activity{local: true} = activity) do
if Pleroma.Config.get!([:instance, :federating]) do
- priority =
- case activity.data["type"] do
- "Delete" -> 10
- "Create" -> 1
- _ -> 5
- end
-
- Pleroma.Web.Federator.publish(activity, priority)
+ Pleroma.Web.Federator.publish(activity)
end
:ok
diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex
index f4f9e83e0..d85fe824f 100644
--- a/lib/pleroma/web/federator/federator.ex
+++ b/lib/pleroma/web/federator/federator.ex
@@ -10,16 +10,19 @@ defmodule Pleroma.Web.Federator do
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.Federator.Publisher
- alias Pleroma.Web.Federator.RetryQueue
alias Pleroma.Web.OStatus
alias Pleroma.Web.Websub
+ alias Pleroma.Workers.Publisher, as: PublisherWorker
+ alias Pleroma.Workers.Receiver, as: ReceiverWorker
+ alias Pleroma.Workers.Subscriber, as: SubscriberWorker
require Logger
+ defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+
def init do
# 1 minute
- Process.sleep(1000 * 60)
- refresh_subscriptions()
+ refresh_subscriptions(schedule_in: 60)
end
@doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)"
@@ -37,50 +40,50 @@ defmodule Pleroma.Web.Federator do
# Client API
def incoming_doc(doc) do
- PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc])
+ %{"op" => "incoming_doc", "body" => doc}
+ |> ReceiverWorker.new(worker_args(:federator_incoming))
+ |> Pleroma.Repo.insert()
end
def incoming_ap_doc(params) do
- PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params])
+ %{"op" => "incoming_ap_doc", "params" => params}
+ |> ReceiverWorker.new(worker_args(:federator_incoming))
+ |> Pleroma.Repo.insert()
end
- def publish(activity, priority \\ 1) do
- PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority)
+ def publish(%{id: "pleroma:fakeid"} = activity) do
+ perform(:publish, activity)
end
- def verify_websub(websub) do
- PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub])
+ def publish(activity) do
+ %{"op" => "publish", "activity_id" => activity.id}
+ |> PublisherWorker.new(worker_args(:federator_outgoing))
+ |> Pleroma.Repo.insert()
end
- def request_subscription(sub) do
- PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub])
+ def verify_websub(websub) do
+ %{"op" => "verify_websub", "websub_id" => websub.id}
+ |> SubscriberWorker.new(worker_args(:federator_outgoing))
+ |> Pleroma.Repo.insert()
end
- def refresh_subscriptions do
- PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions])
+ def request_subscription(websub) do
+ %{"op" => "request_subscription", "websub_id" => websub.id}
+ |> SubscriberWorker.new(worker_args(:federator_outgoing))
+ |> Pleroma.Repo.insert()
end
- # Job Worker Callbacks
-
- def perform(:refresh_subscriptions) do
- Logger.debug("Federator running refresh subscriptions")
- Websub.refresh_subscriptions()
-
- spawn(fn ->
- # 6 hours
- Process.sleep(1000 * 60 * 60 * 6)
- refresh_subscriptions()
- end)
+ def refresh_subscriptions(worker_args \\ []) do
+ %{"op" => "refresh_subscriptions"}
+ |> SubscriberWorker.new(worker_args ++ [max_attempts: 1] ++ worker_args(:federator_outgoing))
+ |> Pleroma.Repo.insert()
end
- def perform(:request_subscription, websub) do
- Logger.debug("Refreshing #{websub.topic}")
+ # Job Worker Callbacks
- with {:ok, websub} <- Websub.request_subscription(websub) do
- Logger.debug("Successfully refreshed #{websub.topic}")
- else
- _e -> Logger.debug("Couldn't refresh #{websub.topic}")
- end
+ @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()}
+ def perform(:publish_one, module, params) do
+ apply(module, :publish_one, [params])
end
def perform(:publish, activity) do
@@ -92,14 +95,6 @@ defmodule Pleroma.Web.Federator do
end
end
- def perform(:verify_websub, websub) do
- Logger.debug(fn ->
- "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
- end)
-
- Websub.verify(websub)
- end
-
def perform(:incoming_doc, doc) do
Logger.info("Got document, trying to parse")
OStatus.handle_incoming(doc)
@@ -130,22 +125,33 @@ defmodule Pleroma.Web.Federator do
end
end
- def perform(
- :publish_single_websub,
- %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params
- ) do
- case Websub.publish_one(params) do
- {:ok, _} ->
- :ok
+ def perform(:request_subscription, websub) do
+ Logger.debug("Refreshing #{websub.topic}")
- {:error, _} ->
- RetryQueue.enqueue(params, Websub)
+ with {:ok, websub} <- Websub.request_subscription(websub) do
+ Logger.debug("Successfully refreshed #{websub.topic}")
+ else
+ _e -> Logger.debug("Couldn't refresh #{websub.topic}")
end
end
- def perform(type, _) do
- Logger.debug(fn -> "Unknown task: #{type}" end)
- {:error, "Don't know what to do with this"}
+ def perform(:verify_websub, websub) do
+ Logger.debug(fn ->
+ "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
+ end)
+
+ Websub.verify(websub)
+ end
+
+ def perform(:refresh_subscriptions) do
+ Logger.debug("Federator running refresh subscriptions")
+ Websub.refresh_subscriptions()
+
+ spawn(fn ->
+ # 6 hours
+ Process.sleep(1000 * 60 * 60 * 6)
+ refresh_subscriptions()
+ end)
end
def ap_enabled_actor(id) do
diff --git a/lib/pleroma/web/federator/publisher.ex b/lib/pleroma/web/federator/publisher.ex
index 70f870244..05d2be615 100644
--- a/lib/pleroma/web/federator/publisher.ex
+++ b/lib/pleroma/web/federator/publisher.ex
@@ -6,7 +6,7 @@ defmodule Pleroma.Web.Federator.Publisher do
alias Pleroma.Activity
alias Pleroma.Config
alias Pleroma.User
- alias Pleroma.Web.Federator.RetryQueue
+ alias Pleroma.Workers.Publisher, as: PublisherWorker
require Logger
@@ -30,23 +30,17 @@ defmodule Pleroma.Web.Federator.Publisher do
Enqueue publishing a single activity.
"""
@spec enqueue_one(module(), Map.t()) :: :ok
- def enqueue_one(module, %{} = params),
- do: PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_one, module, params])
-
- @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()}
- def perform(:publish_one, module, params) do
- case apply(module, :publish_one, [params]) do
- {:ok, _} ->
- :ok
-
- {:error, _e} ->
- RetryQueue.enqueue(params, module)
- end
- end
+ def enqueue_one(module, %{} = params) do
+ worker_args =
+ if max_attempts = Pleroma.Config.get([:workers, :retries, :federator_outgoing]) do
+ [max_attempts: max_attempts]
+ else
+ []
+ end
- def perform(type, _, _) do
- Logger.debug("Unknown task: #{type}")
- {:error, "Don't know what to do with this"}
+ %{"op" => "publish_one", "module" => to_string(module), "params" => params}
+ |> PublisherWorker.new(worker_args)
+ |> Pleroma.Repo.insert()
end
@doc """
diff --git a/lib/pleroma/web/federator/retry_queue.ex b/lib/pleroma/web/federator/retry_queue.ex
deleted file mode 100644
index 9eab8c218..000000000
--- a/lib/pleroma/web/federator/retry_queue.ex
+++ /dev/null
@@ -1,239 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.Federator.RetryQueue do
- use GenServer
-
- require Logger
-
- def init(args) do
- queue_table = :ets.new(:pleroma_retry_queue, [:bag, :protected])
-
- {:ok, %{args | queue_table: queue_table, running_jobs: :sets.new()}}
- end
-
- def start_link(_) do
- enabled =
- if Pleroma.Config.get(:env) == :test,
- do: true,
- else: Pleroma.Config.get([__MODULE__, :enabled], false)
-
- if enabled do
- Logger.info("Starting retry queue")
-
- linkres =
- GenServer.start_link(
- __MODULE__,
- %{delivered: 0, dropped: 0, queue_table: nil, running_jobs: nil},
- name: __MODULE__
- )
-
- maybe_kickoff_timer()
- linkres
- else
- Logger.info("Retry queue disabled")
- :ignore
- end
- end
-
- def enqueue(data, transport, retries \\ 0) do
- GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
- end
-
- def get_stats do
- GenServer.call(__MODULE__, :get_stats)
- end
-
- def reset_stats do
- GenServer.call(__MODULE__, :reset_stats)
- end
-
- def get_retry_params(retries) do
- if retries > Pleroma.Config.get([__MODULE__, :max_retries]) do
- {:drop, "Max retries reached"}
- else
- {:retry, growth_function(retries)}
- end
- end
-
- def get_retry_timer_interval do
- Pleroma.Config.get([:retry_queue, :interval], 1000)
- end
-
- defp ets_count_expires(table, current_time) do
- :ets.select_count(
- table,
- [
- {
- {:"$1", :"$2"},
- [{:"=<", :"$1", {:const, current_time}}],
- [true]
- }
- ]
- )
- end
-
- defp ets_pop_n_expired(table, current_time, desired) do
- {popped, _continuation} =
- :ets.select(
- table,
- [
- {
- {:"$1", :"$2"},
- [{:"=<", :"$1", {:const, current_time}}],
- [:"$_"]
- }
- ],
- desired
- )
-
- popped
- |> Enum.each(fn e ->
- :ets.delete_object(table, e)
- end)
-
- popped
- end
-
- def maybe_start_job(running_jobs, queue_table) do
- # we don't want to hit the ets or the DateTime more times than we have to
- # could optimize slightly further by not using the count, and instead grabbing
- # up to N objects early...
- current_time = DateTime.to_unix(DateTime.utc_now())
- n_running_jobs = :sets.size(running_jobs)
-
- if n_running_jobs < Pleroma.Config.get([__MODULE__, :max_jobs]) do
- n_ready_jobs = ets_count_expires(queue_table, current_time)
-
- if n_ready_jobs > 0 do
- # figure out how many we could start
- available_job_slots = Pleroma.Config.get([__MODULE__, :max_jobs]) - n_running_jobs
- start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
- else
- running_jobs
- end
- else
- running_jobs
- end
- end
-
- defp start_n_jobs(running_jobs, _queue_table, _current_time, 0) do
- running_jobs
- end
-
- defp start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
- when available_job_slots > 0 do
- candidates = ets_pop_n_expired(queue_table, current_time, available_job_slots)
-
- candidates
- |> List.foldl(running_jobs, fn {_, e}, rj ->
- {:ok, pid} = Task.start(fn -> worker(e) end)
- mref = Process.monitor(pid)
- :sets.add_element(mref, rj)
- end)
- end
-
- def worker({:send, data, transport, retries}) do
- case transport.publish_one(data) do
- {:ok, _} ->
- GenServer.cast(__MODULE__, :inc_delivered)
- :delivered
-
- {:error, _reason} ->
- enqueue(data, transport, retries)
- :retry
- end
- end
-
- def handle_call(:get_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
- {:reply, %{delivered: delivery_count, dropped: drop_count}, state}
- end
-
- def handle_call(:reset_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
- {:reply, %{delivered: delivery_count, dropped: drop_count},
- %{state | delivered: 0, dropped: 0}}
- end
-
- def handle_cast(:reset_stats, state) do
- {:noreply, %{state | delivered: 0, dropped: 0}}
- end
-
- def handle_cast(
- {:maybe_enqueue, data, transport, retries},
- %{dropped: drop_count, queue_table: queue_table, running_jobs: running_jobs} = state
- ) do
- case get_retry_params(retries) do
- {:retry, timeout} ->
- :ets.insert(queue_table, {timeout, {:send, data, transport, retries}})
- running_jobs = maybe_start_job(running_jobs, queue_table)
- {:noreply, %{state | running_jobs: running_jobs}}
-
- {:drop, message} ->
- Logger.debug(message)
- {:noreply, %{state | dropped: drop_count + 1}}
- end
- end
-
- def handle_cast(:kickoff_timer, state) do
- retry_interval = get_retry_timer_interval()
- Process.send_after(__MODULE__, :retry_timer_run, retry_interval)
- {:noreply, state}
- end
-
- def handle_cast(:inc_delivered, %{delivered: delivery_count} = state) do
- {:noreply, %{state | delivered: delivery_count + 1}}
- end
-
- def handle_cast(:inc_dropped, %{dropped: drop_count} = state) do
- {:noreply, %{state | dropped: drop_count + 1}}
- end
-
- def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do
- case transport.publish_one(data) do
- {:ok, _} ->
- {:noreply, %{state | delivered: delivery_count + 1}}
-
- {:error, _reason} ->
- enqueue(data, transport, retries)
- {:noreply, state}
- end
- end
-
- def handle_info(
- :retry_timer_run,
- %{queue_table: queue_table, running_jobs: running_jobs} = state
- ) do
- maybe_kickoff_timer()
- running_jobs = maybe_start_job(running_jobs, queue_table)
- {:noreply, %{state | running_jobs: running_jobs}}
- end
-
- def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
- %{running_jobs: running_jobs, queue_table: queue_table} = state
- running_jobs = :sets.del_element(ref, running_jobs)
- running_jobs = maybe_start_job(running_jobs, queue_table)
- {:noreply, %{state | running_jobs: running_jobs}}
- end
-
- def handle_info(unknown, state) do
- Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring")
- {:noreply, state}
- end
-
- if Pleroma.Config.get(:env) == :test do
- defp growth_function(_retries) do
- _shutit = Pleroma.Config.get([__MODULE__, :initial_timeout])
- DateTime.to_unix(DateTime.utc_now()) - 1
- end
- else
- defp growth_function(retries) do
- round(Pleroma.Config.get([__MODULE__, :initial_timeout]) * :math.pow(retries, 3)) +
- DateTime.to_unix(DateTime.utc_now())
- end
- end
-
- defp maybe_kickoff_timer do
- GenServer.cast(__MODULE__, :kickoff_timer)
- end
-end
diff --git a/lib/pleroma/web/oauth/token/clean_worker.ex b/lib/pleroma/web/oauth/token/clean_worker.ex
index f50098302..943e73289 100644
--- a/lib/pleroma/web/oauth/token/clean_worker.ex
+++ b/lib/pleroma/web/oauth/token/clean_worker.ex
@@ -16,7 +16,11 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do
@one_day
)
+ alias Pleroma.Repo
alias Pleroma.Web.OAuth.Token
+ alias Pleroma.Workers.BackgroundWorker
+
+ defdelegate worker_args(queue), to: Pleroma.Workers.Helper
def start_link(_), do: GenServer.start_link(__MODULE__, %{})
@@ -27,9 +31,13 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do
@doc false
def handle_info(:perform, state) do
- Token.delete_expired_tokens()
+ %{"op" => "clean_expired_tokens"}
+ |> BackgroundWorker.new(worker_args(:background))
+ |> Repo.insert()
Process.send_after(self(), :perform, @interval)
{:noreply, state}
end
+
+ def perform(:clean), do: Token.delete_expired_tokens()
end
diff --git a/lib/pleroma/web/push/push.ex b/lib/pleroma/web/push/push.ex
index 729dad02a..b4f0e5127 100644
--- a/lib/pleroma/web/push/push.ex
+++ b/lib/pleroma/web/push/push.ex
@@ -3,10 +3,13 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Push do
- alias Pleroma.Web.Push.Impl
+ alias Pleroma.Repo
+ alias Pleroma.Workers.WebPusher
require Logger
+ defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+
def init do
unless enabled() do
Logger.warn("""
@@ -31,6 +34,9 @@ defmodule Pleroma.Web.Push do
end
end
- def send(notification),
- do: PleromaJobQueue.enqueue(:web_push, Impl, [notification])
+ def send(notification) do
+ %{"op" => "web_push", "notification_id" => notification.id}
+ |> WebPusher.new(worker_args(:web_push))
+ |> Repo.insert()
+ end
end
diff --git a/lib/pleroma/web/salmon/salmon.ex b/lib/pleroma/web/salmon/salmon.ex
index 9b01ebcc6..bbaa293fd 100644
--- a/lib/pleroma/web/salmon/salmon.ex
+++ b/lib/pleroma/web/salmon/salmon.ex
@@ -170,6 +170,15 @@ defmodule Pleroma.Web.Salmon do
end
end
+ def publish_one(%{recipient_id: recipient_id} = params) do
+ recipient = User.get_by_id(recipient_id)
+
+ params
+ |> Map.delete(:recipient_id)
+ |> Map.put(:recipient, recipient)
+ |> publish_one()
+ end
+
def publish_one(_), do: :noop
@supported_activities [
@@ -218,7 +227,7 @@ defmodule Pleroma.Web.Salmon do
Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)
Publisher.enqueue_one(__MODULE__, %{
- recipient: remote_user,
+ recipient_id: remote_user.id,
feed: feed,
unreachable_since: reachable_urls_metadata[remote_user.info.salmon]
})
diff --git a/lib/pleroma/web/twitter_api/controllers/util_controller.ex b/lib/pleroma/web/twitter_api/controllers/util_controller.ex
index 3405bd3b7..7ba4ad305 100644
--- a/lib/pleroma/web/twitter_api/controllers/util_controller.ex
+++ b/lib/pleroma/web/twitter_api/controllers/util_controller.ex
@@ -265,12 +265,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do
String.split(line, ",") |> List.first()
end)
|> List.delete("Account address") do
- PleromaJobQueue.enqueue(:background, User, [
- :follow_import,
- follower,
- followed_identifiers
- ])
-
+ User.follow_import(follower, followed_identifiers)
json(conn, "job started")
end
end
@@ -281,12 +276,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do
def blocks_import(%{assigns: %{user: blocker}} = conn, %{"list" => list}) do
with blocked_identifiers <- String.split(list) do
- PleromaJobQueue.enqueue(:background, User, [
- :blocks_import,
- blocker,
- blocked_identifiers
- ])
-
+ User.blocks_import(blocker, blocked_identifiers)
json(conn, "job started")
end
end
diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex
new file mode 100644
index 000000000..3ab2b6bcc
--- /dev/null
+++ b/lib/pleroma/workers/background_worker.ex
@@ -0,0 +1,66 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.BackgroundWorker do
+ alias Pleroma.Activity
+ alias Pleroma.User
+ alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy
+ alias Pleroma.Web.OAuth.Token.CleanWorker
+
+ # Note: `max_attempts` is intended to be overridden in `new/1` call
+ use Oban.Worker,
+ queue: "background",
+ max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
+
+ @impl Oban.Worker
+ def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}) do
+ user = User.get_by_id(user_id)
+ User.perform(:fetch_initial_posts, user)
+ end
+
+ def perform(%{"op" => "deactivate_user", "user_id" => user_id, "status" => status}) do
+ user = User.get_by_id(user_id)
+ User.perform(:deactivate_async, user, status)
+ end
+
+ def perform(%{"op" => "delete_user", "user_id" => user_id}) do
+ user = User.get_by_id(user_id)
+ User.perform(:delete, user)
+ end
+
+ def perform(%{
+ "op" => "blocks_import",
+ "blocker_id" => blocker_id,
+ "blocked_identifiers" => blocked_identifiers
+ }) do
+ blocker = User.get_by_id(blocker_id)
+ User.perform(:blocks_import, blocker, blocked_identifiers)
+ end
+
+ def perform(%{
+ "op" => "follow_import",
+ "follower_id" => follower_id,
+ "followed_identifiers" => followed_identifiers
+ }) do
+ follower = User.get_by_id(follower_id)
+ User.perform(:follow_import, follower, followed_identifiers)
+ end
+
+ def perform(%{"op" => "clean_expired_tokens"}) do
+ CleanWorker.perform(:clean)
+ end
+
+ def perform(%{"op" => "media_proxy_preload", "message" => message}) do
+ MediaProxyWarmingPolicy.perform(:preload, message)
+ end
+
+ def perform(%{"op" => "media_proxy_prefetch", "url" => url}) do
+ MediaProxyWarmingPolicy.perform(:prefetch, url)
+ end
+
+ def perform(%{"op" => "fetch_data_for_activity", "activity_id" => activity_id}) do
+ activity = Activity.get_by_id(activity_id)
+ Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity)
+ end
+end
diff --git a/lib/pleroma/workers/helper.ex b/lib/pleroma/workers/helper.ex
new file mode 100644
index 000000000..3286ce0e8
--- /dev/null
+++ b/lib/pleroma/workers/helper.ex
@@ -0,0 +1,13 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Helper do
+ def worker_args(queue) do
+ if max_attempts = Pleroma.Config.get([:workers, :retries, queue]) do
+ [max_attempts: max_attempts]
+ else
+ []
+ end
+ end
+end
diff --git a/lib/pleroma/workers/mailer.ex b/lib/pleroma/workers/mailer.ex
new file mode 100644
index 000000000..8bf9952bc
--- /dev/null
+++ b/lib/pleroma/workers/mailer.ex
@@ -0,0 +1,27 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Mailer do
+ alias Pleroma.User
+
+ # Note: `max_attempts` is intended to be overridden in `new/1` call
+ use Oban.Worker,
+ queue: "mailer",
+ max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
+
+ @impl Oban.Worker
+ def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}) do
+ email =
+ encoded_email
+ |> Base.decode64!()
+ |> :erlang.binary_to_term()
+
+ Pleroma.Emails.Mailer.deliver(email, config)
+ end
+
+ def perform(%{"op" => "digest_email", "user_id" => user_id}) do
+ user = User.get_by_id(user_id)
+ Pleroma.DigestEmailWorker.perform(user)
+ end
+end
diff --git a/lib/pleroma/workers/publisher.ex b/lib/pleroma/workers/publisher.ex
new file mode 100644
index 000000000..c890ffb79
--- /dev/null
+++ b/lib/pleroma/workers/publisher.ex
@@ -0,0 +1,23 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Publisher do
+ alias Pleroma.Activity
+ alias Pleroma.Web.Federator
+
+ # Note: `max_attempts` is intended to be overridden in `new/1` call
+ use Oban.Worker,
+ queue: "federator_outgoing",
+ max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
+
+ @impl Oban.Worker
+ def perform(%{"op" => "publish", "activity_id" => activity_id}) do
+ activity = Activity.get_by_id(activity_id)
+ Federator.perform(:publish, activity)
+ end
+
+ def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}) do
+ Federator.perform(:publish_one, String.to_atom(module_name), params)
+ end
+end
diff --git a/lib/pleroma/workers/receiver.ex b/lib/pleroma/workers/receiver.ex
new file mode 100644
index 000000000..d3de95716
--- /dev/null
+++ b/lib/pleroma/workers/receiver.ex
@@ -0,0 +1,21 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Receiver do
+ alias Pleroma.Web.Federator
+
+ # Note: `max_attempts` is intended to be overridden in `new/1` call
+ use Oban.Worker,
+ queue: "federator_incoming",
+ max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
+
+ @impl Oban.Worker
+ def perform(%{"op" => "incoming_doc", "body" => doc}) do
+ Federator.perform(:incoming_doc, doc)
+ end
+
+ def perform(%{"op" => "incoming_ap_doc", "params" => params}) do
+ Federator.perform(:incoming_ap_doc, params)
+ end
+end
diff --git a/lib/pleroma/workers/scheduled_activity_worker.ex b/lib/pleroma/workers/scheduled_activity_worker.ex
new file mode 100644
index 000000000..a49834fd8
--- /dev/null
+++ b/lib/pleroma/workers/scheduled_activity_worker.ex
@@ -0,0 +1,15 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.ScheduledActivityWorker do
+ # Note: `max_attempts` is intended to be overridden in `new/1` call
+ use Oban.Worker,
+ queue: "scheduled_activities",
+ max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
+
+ @impl Oban.Worker
+ def perform(%{"op" => "execute", "activity_id" => activity_id}) do
+ Pleroma.ScheduledActivityWorker.perform(:execute, activity_id)
+ end
+end
diff --git a/lib/pleroma/workers/subscriber.ex b/lib/pleroma/workers/subscriber.ex
new file mode 100644
index 000000000..6af3ad0a1
--- /dev/null
+++ b/lib/pleroma/workers/subscriber.ex
@@ -0,0 +1,29 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Subscriber do
+ alias Pleroma.Repo
+ alias Pleroma.Web.Federator
+ alias Pleroma.Web.Websub.WebsubClientSubscription
+
+ # Note: `max_attempts` is intended to be overridden in `new/1` call
+ use Oban.Worker,
+ queue: "federator_outgoing",
+ max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
+
+ @impl Oban.Worker
+ def perform(%{"op" => "refresh_subscriptions"}) do
+ Federator.perform(:refresh_subscriptions)
+ end
+
+ def perform(%{"op" => "request_subscription", "websub_id" => websub_id}) do
+ websub = Repo.get(WebsubClientSubscription, websub_id)
+ Federator.perform(:request_subscription, websub)
+ end
+
+ def perform(%{"op" => "verify_websub", "websub_id" => websub_id}) do
+ websub = Repo.get(WebsubClientSubscription, websub_id)
+ Federator.perform(:verify_websub, websub)
+ end
+end
diff --git a/lib/pleroma/workers/transmogrifier.ex b/lib/pleroma/workers/transmogrifier.ex
new file mode 100644
index 000000000..c6b4fab47
--- /dev/null
+++ b/lib/pleroma/workers/transmogrifier.ex
@@ -0,0 +1,18 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Transmogrifier do
+ alias Pleroma.User
+
+ # Note: `max_attempts` is intended to be overridden in `new/1` call
+ use Oban.Worker,
+ queue: "transmogrifier",
+ max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
+
+ @impl Oban.Worker
+ def perform(%{"op" => "user_upgrade", "user_id" => user_id}) do
+ user = User.get_by_id(user_id)
+ Pleroma.Web.ActivityPub.Transmogrifier.perform(:user_upgrade, user)
+ end
+end
diff --git a/lib/pleroma/workers/web_pusher.ex b/lib/pleroma/workers/web_pusher.ex
new file mode 100644
index 000000000..b99581eb0
--- /dev/null
+++ b/lib/pleroma/workers/web_pusher.ex
@@ -0,0 +1,19 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.WebPusher do
+ alias Pleroma.Notification
+ alias Pleroma.Repo
+
+ # Note: `max_attempts` is intended to be overridden in `new/1` call
+ use Oban.Worker,
+ queue: "web_push",
+ max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
+
+ @impl Oban.Worker
+ def perform(%{"op" => "web_push", "notification_id" => notification_id}) do
+ notification = Repo.get(Notification, notification_id)
+ Pleroma.Web.Push.Impl.perform(notification)
+ end
+end
diff --git a/mix.exs b/mix.exs
index 3170d6f2d..b651520ed 100644
--- a/mix.exs
+++ b/mix.exs
@@ -101,6 +101,7 @@ defmodule Pleroma.Mixfile do
{:phoenix_ecto, "~> 4.0"},
{:ecto_sql, "~> 3.1"},
{:postgrex, ">= 0.13.5"},
+ {:oban, "~> 0.6"},
{:gettext, "~> 0.15"},
{:comeonin, "~> 4.1.1"},
{:pbkdf2_elixir, "~> 0.12.3"},
diff --git a/mix.lock b/mix.lock
index 2639e96e9..52932c9ef 100644
--- a/mix.lock
+++ b/mix.lock
@@ -57,6 +57,7 @@
"mogrify": {:hex, :mogrify, "0.6.1", "de1b527514f2d95a7bbe9642eb556061afb337e220cf97adbf3a4e6438ed70af", [:mix], [], "hexpm"},
"mox": {:hex, :mox, "0.5.1", "f86bb36026aac1e6f924a4b6d024b05e9adbed5c63e8daa069bd66fb3292165b", [:mix], [], "hexpm"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm"},
+ "oban": {:hex, :oban, "0.6.0", "8b9b861355610e703e58a878bc29959f3f0e1b4cd1e90d785cf2bb2498d3b893", [:mix], [{:ecto_sql, "~> 3.1", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.14", [hex: :postgrex, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
"parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm"},
"pbkdf2_elixir": {:hex, :pbkdf2_elixir, "0.12.3", "6706a148809a29c306062862c803406e88f048277f6e85b68faf73291e820b84", [:mix], [], "hexpm"},
"phoenix": {:hex, :phoenix, "1.4.9", "746d098e10741c334d88143d3c94cab1756435f94387a63441792e66ec0ee974", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 1.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.8.1 or ~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
diff --git a/priv/repo/migrations/20190730055101_add_oban_jobs_table.exs b/priv/repo/migrations/20190730055101_add_oban_jobs_table.exs
new file mode 100644
index 000000000..2f201bd05
--- /dev/null
+++ b/priv/repo/migrations/20190730055101_add_oban_jobs_table.exs
@@ -0,0 +1,6 @@
+defmodule Pleroma.Repo.Migrations.AddObanJobsTable do
+ use Ecto.Migration
+
+ defdelegate up, to: Oban.Migrations
+ defdelegate down, to: Oban.Migrations
+end
diff --git a/test/activity_test.exs b/test/activity_test.exs
index b27f6fd36..658c47837 100644
--- a/test/activity_test.exs
+++ b/test/activity_test.exs
@@ -7,6 +7,7 @@ defmodule Pleroma.ActivityTest do
alias Pleroma.Activity
alias Pleroma.Bookmark
alias Pleroma.Object
+ alias Pleroma.Tests.ObanHelpers
alias Pleroma.ThreadMute
import Pleroma.Factory
@@ -125,7 +126,8 @@ defmodule Pleroma.ActivityTest do
}
{:ok, local_activity} = Pleroma.Web.CommonAPI.post(user, %{"status" => "find me!"})
- {:ok, remote_activity} = Pleroma.Web.Federator.incoming_ap_doc(params)
+ {:ok, job} = Pleroma.Web.Federator.incoming_ap_doc(params)
+ {:ok, remote_activity} = ObanHelpers.perform(job)
%{local_activity: local_activity, remote_activity: remote_activity, user: user}
end
diff --git a/test/conversation_test.exs b/test/conversation_test.exs
index 4e36494f8..693427d80 100644
--- a/test/conversation_test.exs
+++ b/test/conversation_test.exs
@@ -22,6 +22,8 @@ defmodule Pleroma.ConversationTest do
{:ok, _activity} =
CommonAPI.post(user, %{"visibility" => "direct", "status" => "hey @#{other_user.nickname}"})
+ Pleroma.Tests.ObanHelpers.perform_all()
+
Repo.delete_all(Conversation)
Repo.delete_all(Conversation.Participation)
diff --git a/test/notification_test.exs b/test/notification_test.exs
index 80ea2a085..e1c9f4f93 100644
--- a/test/notification_test.exs
+++ b/test/notification_test.exs
@@ -8,6 +8,7 @@ defmodule Pleroma.NotificationTest do
import Pleroma.Factory
alias Pleroma.Notification
+ alias Pleroma.Tests.ObanHelpers
alias Pleroma.User
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.CommonAPI
@@ -621,7 +622,8 @@ defmodule Pleroma.NotificationTest do
refute Enum.empty?(Notification.for_user(other_user))
- User.delete(user)
+ {:ok, job} = User.delete(user)
+ ObanHelpers.perform(job)
assert Enum.empty?(Notification.for_user(other_user))
end
@@ -666,6 +668,7 @@ defmodule Pleroma.NotificationTest do
}
{:ok, _delete_activity} = Transmogrifier.handle_incoming(delete_user_message)
+ ObanHelpers.perform_all()
assert Enum.empty?(Notification.for_user(local_user))
end
diff --git a/test/support/oban_helpers.ex b/test/support/oban_helpers.ex
new file mode 100644
index 000000000..d379c9ec7
--- /dev/null
+++ b/test/support/oban_helpers.ex
@@ -0,0 +1,42 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Tests.ObanHelpers do
+ @moduledoc """
+ Oban test helpers.
+ """
+
+ alias Pleroma.Repo
+
+ def perform_all do
+ Oban.Job
+ |> Repo.all()
+ |> perform()
+ end
+
+ def perform(%Oban.Job{} = job) do
+ res = apply(String.to_existing_atom("Elixir." <> job.worker), :perform, [job])
+ Repo.delete(job)
+ res
+ end
+
+ def perform(jobs) when is_list(jobs) do
+ for job <- jobs, do: perform(job)
+ end
+
+ def member?(%{} = job_args, jobs) when is_list(jobs) do
+ Enum.any?(jobs, fn job ->
+ member?(job_args, job.args)
+ end)
+ end
+
+ def member?(%{} = test_attrs, %{} = attrs) do
+ Enum.all?(
+ test_attrs,
+ fn {k, _v} -> member?(test_attrs[k], attrs[k]) end
+ )
+ end
+
+ def member?(x, y), do: x == y
+end
diff --git a/test/tasks/digest_test.exs b/test/tasks/digest_test.exs
index 4bfa1fb93..96d762685 100644
--- a/test/tasks/digest_test.exs
+++ b/test/tasks/digest_test.exs
@@ -4,6 +4,7 @@ defmodule Mix.Tasks.Pleroma.DigestTest do
import Pleroma.Factory
import Swoosh.TestAssertions
+ alias Pleroma.Tests.ObanHelpers
alias Pleroma.Web.CommonAPI
setup_all do
@@ -39,6 +40,8 @@ defmodule Mix.Tasks.Pleroma.DigestTest do
:ok = Mix.Tasks.Pleroma.Digest.run(["test", user2.nickname, yesterday_date])
+ ObanHelpers.perform_all()
+
assert_receive {:mix_shell, :info, [message]}
assert message =~ "Digest email have been sent"
diff --git a/test/user_test.exs b/test/user_test.exs
index 661ffc0b3..733467398 100644
--- a/test/user_test.exs
+++ b/test/user_test.exs
@@ -7,14 +7,16 @@ defmodule Pleroma.UserTest do
alias Pleroma.Builders.UserBuilder
alias Pleroma.Object
alias Pleroma.Repo
+ alias Pleroma.Tests.ObanHelpers
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.CommonAPI
use Pleroma.DataCase
+ use Oban.Testing, repo: Pleroma.Repo
- import Pleroma.Factory
import Mock
+ import Pleroma.Factory
setup_all do
Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end)
@@ -725,7 +727,9 @@ defmodule Pleroma.UserTest do
user3.nickname
]
- result = User.follow_import(user1, identifiers)
+ {:ok, job} = User.follow_import(user1, identifiers)
+ result = ObanHelpers.perform(job)
+
assert is_list(result)
assert result == [user2, user3]
end
@@ -936,7 +940,9 @@ defmodule Pleroma.UserTest do
user3.nickname
]
- result = User.blocks_import(user1, identifiers)
+ {:ok, job} = User.blocks_import(user1, identifiers)
+ result = ObanHelpers.perform(job)
+
assert is_list(result)
assert result == [user2, user3]
end
@@ -1053,7 +1059,9 @@ defmodule Pleroma.UserTest do
test "it deletes deactivated user" do
{:ok, user} = insert(:user, info: %{deactivated: true}) |> User.set_cache()
- assert {:ok, _} = User.delete(user)
+ {:ok, job} = User.delete(user)
+ {:ok, _user} = ObanHelpers.perform(job)
+
refute User.get_by_id(user.id)
end
@@ -1071,7 +1079,8 @@ defmodule Pleroma.UserTest do
{:ok, like_two, _} = CommonAPI.favorite(activity.id, follower)
{:ok, repeat, _} = CommonAPI.repeat(activity_two.id, user)
- {:ok, _} = User.delete(user)
+ {:ok, job} = User.delete(user)
+ {:ok, _user} = ObanHelpers.perform(job)
follower = User.get_cached_by_id(follower.id)
@@ -1103,12 +1112,18 @@ defmodule Pleroma.UserTest do
{:ok, follower} = User.get_or_fetch_by_ap_id("http://mastodon.example.org/users/admin")
{:ok, _} = User.follow(follower, user)
- {:ok, _user} = User.delete(user)
-
- assert called(
- Pleroma.Web.ActivityPub.Publisher.publish_one(%{
- inbox: "http://mastodon.example.org/inbox"
- })
+ {:ok, job} = User.delete(user)
+ {:ok, _user} = ObanHelpers.perform(job)
+
+ assert ObanHelpers.member?(
+ %{
+ "op" => "publish_one",
+ "params" => %{
+ "inbox" => "http://mastodon.example.org/inbox",
+ "id" => "pleroma:fakeid"
+ }
+ },
+ all_enqueued(worker: Pleroma.Workers.Publisher)
)
end
end
@@ -1153,7 +1168,8 @@ defmodule Pleroma.UserTest do
test "User.delete() plugs any possible zombie objects" do
user = insert(:user)
- {:ok, _} = User.delete(user)
+ {:ok, job} = User.delete(user)
+ {:ok, _} = ObanHelpers.perform(job)
{:ok, cached_user} = Cachex.get(:user_cache, "ap_id:#{user.ap_id}")
diff --git a/test/web/activity_pub/activity_pub_controller_test.exs b/test/web/activity_pub/activity_pub_controller_test.exs
index 77f5e39fa..a214f57a6 100644
--- a/test/web/activity_pub/activity_pub_controller_test.exs
+++ b/test/web/activity_pub/activity_pub_controller_test.exs
@@ -4,15 +4,19 @@
defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
use Pleroma.Web.ConnCase
+ use Oban.Testing, repo: Pleroma.Repo
+
import Pleroma.Factory
alias Pleroma.Activity
alias Pleroma.Instances
alias Pleroma.Object
+ alias Pleroma.Tests.ObanHelpers
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ObjectView
alias Pleroma.Web.ActivityPub.UserView
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.CommonAPI
+ alias Pleroma.Workers.Receiver, as: ReceiverWorker
setup_all do
Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end)
@@ -276,7 +280,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
|> post("/inbox", data)
assert "ok" == json_response(conn, 200)
- :timer.sleep(500)
+
+ ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))
assert Activity.get_by_ap_id(data["id"])
end
@@ -318,7 +323,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
|> post("/users/#{user.nickname}/inbox", data)
assert "ok" == json_response(conn, 200)
- :timer.sleep(500)
+ ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))
assert Activity.get_by_ap_id(data["id"])
end
@@ -347,7 +352,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
|> post("/users/#{recipient.nickname}/inbox", data)
assert "ok" == json_response(conn, 200)
- :timer.sleep(500)
+ ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))
assert Activity.get_by_ap_id(data["id"])
end
@@ -426,6 +431,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
|> post("/users/#{recipient.nickname}/inbox", data)
|> json_response(200)
+ ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))
+
activity = Activity.get_by_ap_id(data["id"])
assert activity.id
@@ -501,6 +508,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
|> post("/users/#{user.nickname}/outbox", data)
result = json_response(conn, 201)
+
assert Activity.get_by_ap_id(result["id"])
end
diff --git a/test/web/activity_pub/mrf/mediaproxy_warming_policy_test.exs b/test/web/activity_pub/mrf/mediaproxy_warming_policy_test.exs
index 372e789be..95a809d25 100644
--- a/test/web/activity_pub/mrf/mediaproxy_warming_policy_test.exs
+++ b/test/web/activity_pub/mrf/mediaproxy_warming_policy_test.exs
@@ -6,6 +6,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicyTest do
use Pleroma.DataCase
alias Pleroma.HTTP
+ alias Pleroma.Tests.ObanHelpers
alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy
import Mock
@@ -24,6 +25,11 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicyTest do
test "it prefetches media proxy URIs" do
with_mock HTTP, get: fn _, _, _ -> {:ok, []} end do
MediaProxyWarmingPolicy.filter(@message)
+
+ ObanHelpers.perform_all()
+ # Performing jobs which has been just enqueued
+ ObanHelpers.perform_all()
+
assert called(HTTP.get(:_, :_, :_))
end
end
diff --git a/test/web/activity_pub/publisher_test.exs b/test/web/activity_pub/publisher_test.exs
index 36a39c84c..26d019878 100644
--- a/test/web/activity_pub/publisher_test.exs
+++ b/test/web/activity_pub/publisher_test.exs
@@ -257,7 +257,7 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do
assert called(
Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{
inbox: "https://domain.com/users/nick1/inbox",
- actor: actor,
+ actor_id: actor.id,
id: note_activity.data["id"]
})
)
diff --git a/test/web/activity_pub/transmogrifier_test.exs b/test/web/activity_pub/transmogrifier_test.exs
index 629c76c97..42f9672d0 100644
--- a/test/web/activity_pub/transmogrifier_test.exs
+++ b/test/web/activity_pub/transmogrifier_test.exs
@@ -8,6 +8,7 @@ defmodule Pleroma.Web.ActivityPub.TransmogrifierTest do
alias Pleroma.Object
alias Pleroma.Object.Fetcher
alias Pleroma.Repo
+ alias Pleroma.Tests.ObanHelpers
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Transmogrifier
@@ -640,6 +641,7 @@ defmodule Pleroma.Web.ActivityPub.TransmogrifierTest do
|> Poison.decode!()
{:ok, _} = Transmogrifier.handle_incoming(data)
+ ObanHelpers.perform_all()
refute User.get_cached_by_ap_id(ap_id)
end
@@ -1202,6 +1204,8 @@ defmodule Pleroma.Web.ActivityPub.TransmogrifierTest do
assert user.info.note_count == 1
{:ok, user} = Transmogrifier.upgrade_user_from_ap_id("https://niu.moe/users/rye")
+ ObanHelpers.perform_all()
+
assert user.info.ap_enabled
assert user.info.note_count == 1
assert user.follower_address == "https://niu.moe/users/rye/followers"
diff --git a/test/web/digest_email_worker_test.exs b/test/web/digest_email_worker_test.exs
index 15002330f..5dfd920fa 100644
--- a/test/web/digest_email_worker_test.exs
+++ b/test/web/digest_email_worker_test.exs
@@ -7,6 +7,7 @@ defmodule Pleroma.DigestEmailWorkerTest do
import Pleroma.Factory
alias Pleroma.DigestEmailWorker
+ alias Pleroma.Tests.ObanHelpers
alias Pleroma.User
alias Pleroma.Web.CommonAPI
@@ -23,6 +24,9 @@ defmodule Pleroma.DigestEmailWorkerTest do
CommonAPI.post(user, %{"status" => "hey @#{user2.nickname}!"})
DigestEmailWorker.perform()
+ ObanHelpers.perform_all()
+ # Performing job(s) enqueued at previous step
+ ObanHelpers.perform_all()
assert_received {:email, email}
assert email.to == [{user2.name, user2.email}]
diff --git a/test/web/federator_test.exs b/test/web/federator_test.exs
index 09e54533f..5724672fd 100644
--- a/test/web/federator_test.exs
+++ b/test/web/federator_test.exs
@@ -4,9 +4,14 @@
defmodule Pleroma.Web.FederatorTest do
alias Pleroma.Instances
+ alias Pleroma.Tests.ObanHelpers
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.Federator
+ alias Pleroma.Workers.Publisher, as: PublisherWorker
+
use Pleroma.DataCase
+ use Oban.Testing, repo: Pleroma.Repo
+
import Pleroma.Factory
import Mock
@@ -24,15 +29,6 @@ defmodule Pleroma.Web.FederatorTest do
clear_config([:instance, :rewrite_policy])
clear_config([:mrf_keyword])
- describe "Publisher.perform" do
- test "call `perform` with unknown task" do
- assert {
- :error,
- "Don't know what to do with this"
- } = Pleroma.Web.Federator.Publisher.perform("test", :ok, :ok)
- end
- end
-
describe "Publish an activity" do
setup do
user = insert(:user)
@@ -53,6 +49,7 @@ defmodule Pleroma.Web.FederatorTest do
} do
with_mocks([relay_mock]) do
Federator.publish(activity)
+ ObanHelpers.perform(all_enqueued(worker: PublisherWorker))
end
assert_received :relay_publish
@@ -66,6 +63,7 @@ defmodule Pleroma.Web.FederatorTest do
with_mocks([relay_mock]) do
Federator.publish(activity)
+ ObanHelpers.perform(all_enqueued(worker: PublisherWorker))
end
refute_received :relay_publish
@@ -73,10 +71,7 @@ defmodule Pleroma.Web.FederatorTest do
end
describe "Targets reachability filtering in `publish`" do
- test_with_mock "it federates only to reachable instances via AP",
- Pleroma.Web.ActivityPub.Publisher,
- [:passthrough],
- [] do
+ test "it federates only to reachable instances via AP" do
user = insert(:user)
{inbox1, inbox2} =
@@ -104,20 +99,20 @@ defmodule Pleroma.Web.FederatorTest do
{:ok, _activity} =
CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"})
- assert called(
- Pleroma.Web.ActivityPub.Publisher.publish_one(%{
- inbox: inbox1,
- unreachable_since: dt
- })
- )
+ expected_dt = NaiveDateTime.to_iso8601(dt)
- refute called(Pleroma.Web.ActivityPub.Publisher.publish_one(%{inbox: inbox2}))
+ ObanHelpers.perform(all_enqueued(worker: PublisherWorker))
+
+ assert ObanHelpers.member?(
+ %{
+ "op" => "publish_one",
+ "params" => %{"inbox" => inbox1, "unreachable_since" => expected_dt}
+ },
+ all_enqueued(worker: PublisherWorker)
+ )
end
- test_with_mock "it federates only to reachable instances via Websub",
- Pleroma.Web.Websub,
- [:passthrough],
- [] do
+ test "it federates only to reachable instances via Websub" do
user = insert(:user)
websub_topic = Pleroma.Web.OStatus.feed_path(user)
@@ -142,23 +137,27 @@ defmodule Pleroma.Web.FederatorTest do
{:ok, _activity} = CommonAPI.post(user, %{"status" => "HI"})
- assert called(
- Pleroma.Web.Websub.publish_one(%{
- callback: sub2.callback,
- unreachable_since: dt
- })
- )
+ expected_callback = sub2.callback
+ expected_dt = NaiveDateTime.to_iso8601(dt)
+
+ ObanHelpers.perform(all_enqueued(worker: PublisherWorker))
- refute called(Pleroma.Web.Websub.publish_one(%{callback: sub1.callback}))
+ assert ObanHelpers.member?(
+ %{
+ "op" => "publish_one",
+ "params" => %{
+ "callback" => expected_callback,
+ "unreachable_since" => expected_dt
+ }
+ },
+ all_enqueued(worker: PublisherWorker)
+ )
end
- test_with_mock "it federates only to reachable instances via Salmon",
- Pleroma.Web.Salmon,
- [:passthrough],
- [] do
+ test "it federates only to reachable instances via Salmon" do
user = insert(:user)
- remote_user1 =
+ _remote_user1 =
insert(:user, %{
local: false,
nickname: "nick1@domain.com",
@@ -174,6 +173,8 @@ defmodule Pleroma.Web.FederatorTest do
info: %{salmon: "https://domain2.com/salmon"}
})
+ remote_user2_id = remote_user2.id
+
dt = NaiveDateTime.utc_now()
Instances.set_unreachable(remote_user2.ap_id, dt)
@@ -182,14 +183,20 @@ defmodule Pleroma.Web.FederatorTest do
{:ok, _activity} =
CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"})
- assert called(
- Pleroma.Web.Salmon.publish_one(%{
- recipient: remote_user2,
- unreachable_since: dt
- })
- )
+ expected_dt = NaiveDateTime.to_iso8601(dt)
+
+ ObanHelpers.perform(all_enqueued(worker: PublisherWorker))
- refute called(Pleroma.Web.Salmon.publish_one(%{recipient: remote_user1}))
+ assert ObanHelpers.member?(
+ %{
+ "op" => "publish_one",
+ "params" => %{
+ "recipient_id" => remote_user2_id,
+ "unreachable_since" => expected_dt
+ }
+ },
+ all_enqueued(worker: PublisherWorker)
+ )
end
end
@@ -209,7 +216,8 @@ defmodule Pleroma.Web.FederatorTest do
"to" => ["https://www.w3.org/ns/activitystreams#Public"]
}
- {:ok, _activity} = Federator.incoming_ap_doc(params)
+ assert {:ok, job} = Federator.incoming_ap_doc(params)
+ assert {:ok, _activity} = ObanHelpers.perform(job)
end
test "rejects incoming AP docs with incorrect origin" do
@@ -227,7 +235,8 @@ defmodule Pleroma.Web.FederatorTest do
"to" => ["https://www.w3.org/ns/activitystreams#Public"]
}
- :error = Federator.incoming_ap_doc(params)
+ assert {:ok, job} = Federator.incoming_ap_doc(params)
+ assert :error = ObanHelpers.perform(job)
end
test "it does not crash if MRF rejects the post" do
@@ -242,7 +251,8 @@ defmodule Pleroma.Web.FederatorTest do
File.read!("test/fixtures/mastodon-post-activity.json")
|> Poison.decode!()
- assert Federator.incoming_ap_doc(params) == :error
+ assert {:ok, job} = Federator.incoming_ap_doc(params)
+ assert :error = ObanHelpers.perform(job)
end
end
end
diff --git a/test/web/instances/instance_test.exs b/test/web/instances/instance_test.exs
index 3fd011fd3..0b53bc6cd 100644
--- a/test/web/instances/instance_test.exs
+++ b/test/web/instances/instance_test.exs
@@ -16,7 +16,8 @@ defmodule Pleroma.Instances.InstanceTest do
describe "set_reachable/1" do
test "clears `unreachable_since` of existing matching Instance record having non-nil `unreachable_since`" do
- instance = insert(:instance, unreachable_since: NaiveDateTime.utc_now())
+ unreachable_since = NaiveDateTime.to_iso8601(NaiveDateTime.utc_now())
+ instance = insert(:instance, unreachable_since: unreachable_since)
assert {:ok, instance} = Instance.set_reachable(instance.host)
refute instance.unreachable_since
diff --git a/test/web/mastodon_api/mastodon_api_controller_test.exs b/test/web/mastodon_api/mastodon_api_controller_test.exs
index 77430e9c9..0f40146fb 100644
--- a/test/web/mastodon_api/mastodon_api_controller_test.exs
+++ b/test/web/mastodon_api/mastodon_api_controller_test.exs
@@ -12,6 +12,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIControllerTest do
alias Pleroma.Object
alias Pleroma.Repo
alias Pleroma.ScheduledActivity
+ alias Pleroma.Tests.ObanHelpers
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.CommonAPI
@@ -3861,6 +3862,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIControllerTest do
end
test "it sends an email to user", %{user: user} do
+ ObanHelpers.perform_all()
token_record = Repo.get_by(Pleroma.PasswordResetToken, user_id: user.id)
email = Pleroma.Emails.UserEmail.password_reset_email(user, token_record.token)
@@ -3921,6 +3923,8 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIControllerTest do
|> post("/api/v1/pleroma/accounts/confirmation_resend?email=#{user.email}")
|> json_response(:no_content)
+ ObanHelpers.perform_all()
+
email = Pleroma.Emails.UserEmail.account_confirmation_email(user)
notify_email = Config.get([:instance, :notify_email])
instance_name = Config.get([:instance, :name])
diff --git a/test/web/retry_queue_test.exs b/test/web/retry_queue_test.exs
deleted file mode 100644
index ecb3ce5d0..000000000
--- a/test/web/retry_queue_test.exs
+++ /dev/null
@@ -1,48 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule MockActivityPub do
- def publish_one({ret, waiter}) do
- send(waiter, :complete)
- {ret, "success"}
- end
-end
-
-defmodule Pleroma.Web.Federator.RetryQueueTest do
- use Pleroma.DataCase
- alias Pleroma.Web.Federator.RetryQueue
-
- @small_retry_count 0
- @hopeless_retry_count 10
-
- setup do
- RetryQueue.reset_stats()
- end
-
- test "RetryQueue responds to stats request" do
- assert %{delivered: 0, dropped: 0} == RetryQueue.get_stats()
- end
-
- test "failed posts are retried" do
- {:retry, _timeout} = RetryQueue.get_retry_params(@small_retry_count)
-
- wait_task =
- Task.async(fn ->
- receive do
- :complete -> :ok
- end
- end)
-
- RetryQueue.enqueue({:ok, wait_task.pid}, MockActivityPub, @small_retry_count)
- Task.await(wait_task)
- assert %{delivered: 1, dropped: 0} == RetryQueue.get_stats()
- end
-
- test "posts that have been tried too many times are dropped" do
- {:drop, _timeout} = RetryQueue.get_retry_params(@hopeless_retry_count)
-
- RetryQueue.enqueue({:ok, nil}, MockActivityPub, @hopeless_retry_count)
- assert %{delivered: 0, dropped: 1} == RetryQueue.get_stats()
- end
-end
diff --git a/test/web/salmon/salmon_test.exs b/test/web/salmon/salmon_test.exs
index e86e76fe9..0186f3fef 100644
--- a/test/web/salmon/salmon_test.exs
+++ b/test/web/salmon/salmon_test.exs
@@ -96,6 +96,6 @@ defmodule Pleroma.Web.Salmon.SalmonTest do
Salmon.publish(user, activity)
- assert called(Publisher.enqueue_one(Salmon, %{recipient: mentioned_user}))
+ assert called(Publisher.enqueue_one(Salmon, %{recipient_id: mentioned_user.id}))
end
end
diff --git a/test/web/twitter_api/twitter_api_controller_test.exs b/test/web/twitter_api/twitter_api_controller_test.exs
index 8ef14b4c5..598833893 100644
--- a/test/web/twitter_api/twitter_api_controller_test.exs
+++ b/test/web/twitter_api/twitter_api_controller_test.exs
@@ -12,6 +12,7 @@ defmodule Pleroma.Web.TwitterAPI.ControllerTest do
alias Pleroma.Notification
alias Pleroma.Object
alias Pleroma.Repo
+ alias Pleroma.Tests.ObanHelpers
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.CommonAPI
@@ -1093,6 +1094,7 @@ defmodule Pleroma.Web.TwitterAPI.ControllerTest do
end
test "it sends an email to user", %{user: user} do
+ ObanHelpers.perform_all()
token_record = Repo.get_by(Pleroma.PasswordResetToken, user_id: user.id)
email = Pleroma.Emails.UserEmail.password_reset_email(user, token_record.token)
@@ -1200,6 +1202,8 @@ defmodule Pleroma.Web.TwitterAPI.ControllerTest do
|> assign(:user, user)
|> post("/api/account/resend_confirmation_email?email=#{user.email}")
+ ObanHelpers.perform_all()
+
email = Pleroma.Emails.UserEmail.account_confirmation_email(user)
notify_email = Pleroma.Config.get([:instance, :notify_email])
instance_name = Pleroma.Config.get([:instance, :name])
diff --git a/test/web/twitter_api/twitter_api_test.exs b/test/web/twitter_api/twitter_api_test.exs
index cbe83852e..bf063a0de 100644
--- a/test/web/twitter_api/twitter_api_test.exs
+++ b/test/web/twitter_api/twitter_api_test.exs
@@ -7,6 +7,7 @@ defmodule Pleroma.Web.TwitterAPI.TwitterAPITest do
alias Pleroma.Activity
alias Pleroma.Object
alias Pleroma.Repo
+ alias Pleroma.Tests.ObanHelpers
alias Pleroma.User
alias Pleroma.UserInviteToken
alias Pleroma.Web.ActivityPub.ActivityPub
@@ -321,6 +322,7 @@ defmodule Pleroma.Web.TwitterAPI.TwitterAPITest do
}
{:ok, user} = TwitterAPI.register_user(data)
+ ObanHelpers.perform_all()
assert user.info.confirmation_pending
diff --git a/test/web/twitter_api/util_controller_test.exs b/test/web/twitter_api/util_controller_test.exs
index fe4ffdb59..349122c05 100644
--- a/test/web/twitter_api/util_controller_test.exs
+++ b/test/web/twitter_api/util_controller_test.exs
@@ -4,9 +4,11 @@
defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do
use Pleroma.Web.ConnCase
+ use Oban.Testing, repo: Pleroma.Repo
alias Pleroma.Notification
alias Pleroma.Repo
+ alias Pleroma.Tests.ObanHelpers
alias Pleroma.User
alias Pleroma.Web.CommonAPI
import Pleroma.Factory
@@ -43,8 +45,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do
{File, [],
read!: fn "follow_list.txt" ->
"Account address,Show boosts\n#{user2.ap_id},true"
- end},
- {PleromaJobQueue, [:passthrough], []}
+ end}
]) do
response =
conn
@@ -52,15 +53,16 @@ defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do
|> post("/api/pleroma/follow_import", %{"list" => %Plug.Upload{path: "follow_list.txt"}})
|> json_response(:ok)
- assert called(
- PleromaJobQueue.enqueue(
- :background,
- User,
- [:follow_import, user1, [user2.ap_id]]
- )
- )
-
assert response == "job started"
+
+ assert ObanHelpers.member?(
+ %{
+ "op" => "follow_import",
+ "follower_id" => user1.id,
+ "followed_identifiers" => [user2.ap_id]
+ },
+ all_enqueued(worker: Pleroma.Workers.BackgroundWorker)
+ )
end
end
@@ -119,8 +121,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do
user3 = insert(:user)
with_mocks([
- {File, [], read!: fn "blocks_list.txt" -> "#{user2.ap_id} #{user3.ap_id}" end},
- {PleromaJobQueue, [:passthrough], []}
+ {File, [], read!: fn "blocks_list.txt" -> "#{user2.ap_id} #{user3.ap_id}" end}
]) do
response =
conn
@@ -128,15 +129,16 @@ defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do
|> post("/api/pleroma/blocks_import", %{"list" => %Plug.Upload{path: "blocks_list.txt"}})
|> json_response(:ok)
- assert called(
- PleromaJobQueue.enqueue(
- :background,
- User,
- [:blocks_import, user1, [user2.ap_id, user3.ap_id]]
- )
- )
-
assert response == "job started"
+
+ assert ObanHelpers.member?(
+ %{
+ "op" => "blocks_import",
+ "blocker_id" => user1.id,
+ "blocked_identifiers" => [user2.ap_id, user3.ap_id]
+ },
+ all_enqueued(worker: Pleroma.Workers.BackgroundWorker)
+ )
end
end
end
@@ -589,6 +591,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do
|> json_response(:ok)
assert response == %{"status" => "success"}
+ ObanHelpers.perform_all()
user = User.get_cached_by_id(user.id)
diff --git a/test/web/websub/websub_test.exs b/test/web/websub/websub_test.exs
index 74386d7db..414610879 100644
--- a/test/web/websub/websub_test.exs
+++ b/test/web/websub/websub_test.exs
@@ -4,11 +4,14 @@
defmodule Pleroma.Web.WebsubTest do
use Pleroma.DataCase
+ use Oban.Testing, repo: Pleroma.Repo
+ alias Pleroma.Tests.ObanHelpers
alias Pleroma.Web.Router.Helpers
alias Pleroma.Web.Websub
alias Pleroma.Web.Websub.WebsubClientSubscription
alias Pleroma.Web.Websub.WebsubServerSubscription
+ alias Pleroma.Workers.Subscriber, as: SubscriberWorker
import Pleroma.Factory
import Tesla.Mock
@@ -224,6 +227,7 @@ defmodule Pleroma.Web.WebsubTest do
})
_refresh = Websub.refresh_subscriptions()
+ ObanHelpers.perform(all_enqueued(worker: SubscriberWorker))
assert still_good == Repo.get(WebsubClientSubscription, still_good.id)
refute needs_refresh == Repo.get(WebsubClientSubscription, needs_refresh.id)