diff options
author | feld <feld@feld.me> | 2024-07-15 16:50:55 +0000 |
---|---|---|
committer | feld <feld@feld.me> | 2024-07-15 16:50:55 +0000 |
commit | cd535861e244c74aebe30b2f285e909429c3c079 (patch) | |
tree | bb0ff96de23889b8ea19f38ce7a66e2b47236f48 /lib | |
parent | 0ea63d824e6381723ad99e544fd8b08ea9d721fd (diff) | |
parent | 80e16de3bdf447aebd58037bb0a2c7a605e3c38e (diff) | |
download | pleroma-cd535861e244c74aebe30b2f285e909429c3c079.tar.gz pleroma-cd535861e244c74aebe30b2f285e909429c3c079.zip |
Merge branch 'oban/improvements' into 'develop'
Oban improvements
See merge request pleroma/pleroma!4176
Diffstat (limited to 'lib')
-rw-r--r-- | lib/pleroma/instances/instance.ex | 4 | ||||
-rw-r--r-- | lib/pleroma/user.ex | 3 | ||||
-rw-r--r-- | lib/pleroma/workers/background_worker.ex | 12 | ||||
-rw-r--r-- | lib/pleroma/workers/cron/digest_emails_worker.ex | 3 | ||||
-rw-r--r-- | lib/pleroma/workers/cron/new_users_digest_worker.ex | 3 | ||||
-rw-r--r-- | lib/pleroma/workers/delete_worker.ex | 24 | ||||
-rw-r--r-- | lib/pleroma/workers/purge_expired_activity.ex | 12 | ||||
-rw-r--r-- | lib/pleroma/workers/rich_media_worker.ex | 3 | ||||
-rw-r--r-- | lib/pleroma/workers/search_indexing_worker.ex | 3 | ||||
-rw-r--r-- | lib/pleroma/workers/user_refresh_worker.ex | 3 |
10 files changed, 51 insertions, 19 deletions
diff --git a/lib/pleroma/instances/instance.ex b/lib/pleroma/instances/instance.ex index c497a4fb7..288555146 100644 --- a/lib/pleroma/instances/instance.ex +++ b/lib/pleroma/instances/instance.ex @@ -10,7 +10,7 @@ defmodule Pleroma.Instances.Instance do alias Pleroma.Maps alias Pleroma.Repo alias Pleroma.User - alias Pleroma.Workers.BackgroundWorker + alias Pleroma.Workers.DeleteWorker use Ecto.Schema @@ -297,7 +297,7 @@ defmodule Pleroma.Instances.Instance do all of those users' activities and notifications. """ def delete_users_and_activities(host) when is_binary(host) do - BackgroundWorker.enqueue("delete_instance", %{"host" => host}) + DeleteWorker.enqueue("delete_instance", %{"host" => host}) end def perform(:delete_instance, host) when is_binary(host) do diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index 7a8a68931..e28d76a7c 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -38,6 +38,7 @@ defmodule Pleroma.User do alias Pleroma.Web.OAuth alias Pleroma.Web.RelMe alias Pleroma.Workers.BackgroundWorker + alias Pleroma.Workers.DeleteWorker alias Pleroma.Workers.UserRefreshWorker require Logger @@ -1982,7 +1983,7 @@ defmodule Pleroma.User do def delete(%User{} = user) do # Purge the user immediately purge(user) - BackgroundWorker.enqueue("delete_user", %{"user_id" => user.id}) + DeleteWorker.enqueue("delete_user", %{"user_id" => user.id}) end # *Actually* delete the user from the DB diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex index dbf40ee1b..d74cc08f1 100644 --- a/lib/pleroma/workers/background_worker.ex +++ b/lib/pleroma/workers/background_worker.ex @@ -3,7 +3,6 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Workers.BackgroundWorker do - alias Pleroma.Instances.Instance alias Pleroma.User use Pleroma.Workers.WorkerHelper, queue: "background" @@ -15,11 +14,6 @@ defmodule Pleroma.Workers.BackgroundWorker do User.perform(:set_activation_async, user, status) end - def perform(%Job{args: %{"op" => "delete_user", "user_id" => user_id}}) do - user = User.get_cached_by_id(user_id) - User.perform(:delete, user) - end - def perform(%Job{args: %{"op" => "force_password_reset", "user_id" => user_id}}) do user = User.get_cached_by_id(user_id) User.perform(:force_password_reset, user) @@ -45,10 +39,6 @@ defmodule Pleroma.Workers.BackgroundWorker do User.perform(:verify_fields_links, user) end - def perform(%Job{args: %{"op" => "delete_instance", "host" => host}}) do - Instance.perform(:delete_instance, host) - end - @impl Oban.Worker - def timeout(_job), do: :timer.seconds(900) + def timeout(_job), do: :timer.seconds(5) end diff --git a/lib/pleroma/workers/cron/digest_emails_worker.ex b/lib/pleroma/workers/cron/digest_emails_worker.ex index 0292bbb3b..17e92d10b 100644 --- a/lib/pleroma/workers/cron/digest_emails_worker.ex +++ b/lib/pleroma/workers/cron/digest_emails_worker.ex @@ -58,4 +58,7 @@ defmodule Pleroma.Workers.Cron.DigestEmailsWorker do User.touch_last_digest_emailed_at(user) end + + @impl Oban.Worker + def timeout(_job), do: :timer.seconds(5) end diff --git a/lib/pleroma/workers/cron/new_users_digest_worker.ex b/lib/pleroma/workers/cron/new_users_digest_worker.ex index d2abb2d3b..1f57aad4a 100644 --- a/lib/pleroma/workers/cron/new_users_digest_worker.ex +++ b/lib/pleroma/workers/cron/new_users_digest_worker.ex @@ -60,4 +60,7 @@ defmodule Pleroma.Workers.Cron.NewUsersDigestWorker do :ok end + + @impl Oban.Worker + def timeout(_job), do: :timer.seconds(5) end diff --git a/lib/pleroma/workers/delete_worker.ex b/lib/pleroma/workers/delete_worker.ex new file mode 100644 index 000000000..97003fb69 --- /dev/null +++ b/lib/pleroma/workers/delete_worker.ex @@ -0,0 +1,24 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.DeleteWorker do + alias Pleroma.Instances.Instance + alias Pleroma.User + + use Pleroma.Workers.WorkerHelper, queue: "slow" + + @impl Oban.Worker + + def perform(%Job{args: %{"op" => "delete_user", "user_id" => user_id}}) do + user = User.get_cached_by_id(user_id) + User.perform(:delete, user) + end + + def perform(%Job{args: %{"op" => "delete_instance", "host" => host}}) do + Instance.perform(:delete_instance, host) + end + + @impl Oban.Worker + def timeout(_job), do: :timer.seconds(900) +end diff --git a/lib/pleroma/workers/purge_expired_activity.ex b/lib/pleroma/workers/purge_expired_activity.ex index a65593b6e..f48e34042 100644 --- a/lib/pleroma/workers/purge_expired_activity.ex +++ b/lib/pleroma/workers/purge_expired_activity.ex @@ -6,8 +6,8 @@ defmodule Pleroma.Workers.PurgeExpiredActivity do @moduledoc """ Worker which purges expired activity. """ - - use Oban.Worker, queue: :slow, max_attempts: 1, unique: [period: :infinity] + @queue :background + use Oban.Worker, queue: @queue, max_attempts: 1, unique: [period: :infinity] import Ecto.Query @@ -46,20 +46,22 @@ defmodule Pleroma.Workers.PurgeExpiredActivity do defp find_activity(id) do with nil <- Activity.get_by_id_with_object(id) do - {:error, :activity_not_found} + {:cancel, :activity_not_found} end end defp find_user(ap_id) do with nil <- Pleroma.User.get_by_ap_id(ap_id) do - {:error, :user_not_found} + {:cancel, :user_not_found} end end def get_expiration(id) do + queue = Atom.to_string(@queue) + from(j in Oban.Job, where: j.state == "scheduled", - where: j.queue == "slow", + where: j.queue == ^queue, where: fragment("?->>'activity_id' = ?", j.args, ^id) ) |> Pleroma.Repo.one() diff --git a/lib/pleroma/workers/rich_media_worker.ex b/lib/pleroma/workers/rich_media_worker.ex index f18ac658a..ecc980a28 100644 --- a/lib/pleroma/workers/rich_media_worker.ex +++ b/lib/pleroma/workers/rich_media_worker.ex @@ -16,4 +16,7 @@ defmodule Pleroma.Workers.RichMediaWorker do def perform(%Job{args: %{"op" => "backfill", "url" => _url} = args}) do Backfill.run(args) end + + @impl Oban.Worker + def timeout(_job), do: :timer.seconds(5) end diff --git a/lib/pleroma/workers/search_indexing_worker.ex b/lib/pleroma/workers/search_indexing_worker.ex index 8476a2be5..8969ae378 100644 --- a/lib/pleroma/workers/search_indexing_worker.ex +++ b/lib/pleroma/workers/search_indexing_worker.ex @@ -20,4 +20,7 @@ defmodule Pleroma.Workers.SearchIndexingWorker do search_module.remove_from_index(object) end + + @impl Oban.Worker + def timeout(_job), do: :timer.seconds(5) end diff --git a/lib/pleroma/workers/user_refresh_worker.ex b/lib/pleroma/workers/user_refresh_worker.ex index f43170c8f..0c04fb237 100644 --- a/lib/pleroma/workers/user_refresh_worker.ex +++ b/lib/pleroma/workers/user_refresh_worker.ex @@ -11,4 +11,7 @@ defmodule Pleroma.Workers.UserRefreshWorker do def perform(%Job{args: %{"ap_id" => ap_id}}) do User.fetch_by_ap_id(ap_id) end + + @impl Oban.Worker + def timeout(_job), do: :timer.seconds(5) end |