diff options
| author | Maksim Pechnikov <parallel588@gmail.com> | 2020-06-23 15:09:01 +0300 | 
|---|---|---|
| committer | Maksim Pechnikov <parallel588@gmail.com> | 2020-06-23 15:09:01 +0300 | 
| commit | a8d967762ec5436ca9b478fbbedfec39b5d9e35e (patch) | |
| tree | ecda2fabd4200b1e2a676c56f87636f4cc0f6a10 /lib | |
| parent | b7f297f0bd214416f3faded4a6221899b68edabd (diff) | |
| download | pleroma-a8d967762ec5436ca9b478fbbedfec39b5d9e35e.tar.gz pleroma-a8d967762ec5436ca9b478fbbedfec39b5d9e35e.zip | |
migrate to oban 2.0-rc1
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/pleroma/application.ex | 14 | ||||
| -rw-r--r-- | lib/pleroma/workers/attachments_cleanup_worker.ex | 11 | ||||
| -rw-r--r-- | lib/pleroma/workers/background_worker.ex | 34 | ||||
| -rw-r--r-- | lib/pleroma/workers/cron/clear_oauth_token_worker.ex | 2 | ||||
| -rw-r--r-- | lib/pleroma/workers/cron/digest_emails_worker.ex | 2 | ||||
| -rw-r--r-- | lib/pleroma/workers/cron/new_users_digest_worker.ex | 2 | ||||
| -rw-r--r-- | lib/pleroma/workers/cron/purge_expired_activities_worker.ex | 2 | ||||
| -rw-r--r-- | lib/pleroma/workers/cron/stats_worker.ex | 2 | ||||
| -rw-r--r-- | lib/pleroma/workers/mailer_worker.ex | 2 | ||||
| -rw-r--r-- | lib/pleroma/workers/publisher_worker.ex | 6 | ||||
| -rw-r--r-- | lib/pleroma/workers/receiver_worker.ex | 2 | ||||
| -rw-r--r-- | lib/pleroma/workers/remote_fetcher_worker.ex | 8 | ||||
| -rw-r--r-- | lib/pleroma/workers/scheduled_activity_worker.ex | 2 | ||||
| -rw-r--r-- | lib/pleroma/workers/transmogrifier_worker.ex | 2 | ||||
| -rw-r--r-- | lib/pleroma/workers/web_pusher_worker.ex | 2 | ||||
| -rw-r--r-- | lib/pleroma/workers/worker_helper.ex | 4 | 
16 files changed, 52 insertions, 45 deletions
| diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 9615af122..fb2731f97 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -80,7 +80,7 @@ defmodule Pleroma.Application do          [            Pleroma.Stats,            Pleroma.JobQueueMonitor, -          {Oban, Config.get(Oban)} +          {Oban, oban_config()}          ] ++          task_children(@env) ++          streamer_child(@env) ++ @@ -138,6 +138,18 @@ defmodule Pleroma.Application do      Pleroma.Web.Endpoint.Instrumenter.setup()    end +  defp oban_config do +    config = Config.get(Oban) + +    if Code.ensure_loaded?(IEx) and IEx.started?() do +      config +      |> Keyword.put(:crontab, false) +      |> Keyword.put(:queues, false) +    else +      config +    end +  end +    defp cachex_children do      [        build_cachex("used_captcha", ttl_interval: seconds_valid_interval()), diff --git a/lib/pleroma/workers/attachments_cleanup_worker.ex b/lib/pleroma/workers/attachments_cleanup_worker.ex index 8deeabda0..58226b395 100644 --- a/lib/pleroma/workers/attachments_cleanup_worker.ex +++ b/lib/pleroma/workers/attachments_cleanup_worker.ex @@ -11,13 +11,12 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do    use Pleroma.Workers.WorkerHelper, queue: "attachments_cleanup"    @impl Oban.Worker -  def perform( -        %{ +  def perform(%Job{ +        args: %{            "op" => "cleanup_attachments",            "object" => %{"data" => %{"attachment" => [_ | _] = attachments, "actor" => actor}} -        }, -        _job -      ) do +        } +      }) do      attachments      |> Enum.flat_map(fn item -> Enum.map(item["url"], & &1["href"]) end)      |> fetch_objects @@ -28,7 +27,7 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do      {:ok, :success}    end -  def perform(%{"op" => "cleanup_attachments", "object" => _object}, _job), do: {:ok, :skip} +  def perform(%Job{args: %{"op" => "cleanup_attachments", "object" => _object}}), do: {:ok, :skip}    defp do_clean({object_ids, attachment_urls}) do      uploader = Pleroma.Config.get([Pleroma.Upload, :uploader]) diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex index 57c3a9c3a..cec5a7462 100644 --- a/lib/pleroma/workers/background_worker.ex +++ b/lib/pleroma/workers/background_worker.ex @@ -11,59 +11,59 @@ defmodule Pleroma.Workers.BackgroundWorker do    @impl Oban.Worker -  def perform(%{"op" => "deactivate_user", "user_id" => user_id, "status" => status}, _job) do +  def perform(%Job{args: %{"op" => "deactivate_user", "user_id" => user_id, "status" => status}}) do      user = User.get_cached_by_id(user_id)      User.perform(:deactivate_async, user, status)    end -  def perform(%{"op" => "delete_user", "user_id" => user_id}, _job) do +  def perform(%Job{args: %{"op" => "delete_user", "user_id" => user_id}}) do      user = User.get_cached_by_id(user_id)      User.perform(:delete, user)    end -  def perform(%{"op" => "force_password_reset", "user_id" => user_id}, _job) do +  def perform(%Job{args: %{"op" => "force_password_reset", "user_id" => user_id}}) do      user = User.get_cached_by_id(user_id)      User.perform(:force_password_reset, user)    end -  def perform( -        %{ +  def perform(%Job{ +        args: %{            "op" => "blocks_import",            "blocker_id" => blocker_id,            "blocked_identifiers" => blocked_identifiers -        }, -        _job -      ) do +        } +      }) do      blocker = User.get_cached_by_id(blocker_id)      {:ok, User.perform(:blocks_import, blocker, blocked_identifiers)}    end -  def perform( -        %{ +  def perform(%Job{ +        args: %{            "op" => "follow_import",            "follower_id" => follower_id,            "followed_identifiers" => followed_identifiers -        }, -        _job -      ) do +        } +      }) do      follower = User.get_cached_by_id(follower_id)      {:ok, User.perform(:follow_import, follower, followed_identifiers)}    end -  def perform(%{"op" => "media_proxy_preload", "message" => message}, _job) do +  def perform(%Job{args: %{"op" => "media_proxy_preload", "message" => message}}) do      MediaProxyWarmingPolicy.perform(:preload, message)    end -  def perform(%{"op" => "media_proxy_prefetch", "url" => url}, _job) do +  def perform(%Job{args: %{"op" => "media_proxy_prefetch", "url" => url}}) do      MediaProxyWarmingPolicy.perform(:prefetch, url)    end -  def perform(%{"op" => "fetch_data_for_activity", "activity_id" => activity_id}, _job) do +  def perform(%Job{args: %{"op" => "fetch_data_for_activity", "activity_id" => activity_id}}) do      activity = Activity.get_by_id(activity_id)      Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity)    end -  def perform(%{"op" => "move_following", "origin_id" => origin_id, "target_id" => target_id}, _) do +  def perform(%Job{ +        args: %{"op" => "move_following", "origin_id" => origin_id, "target_id" => target_id} +      }) do      origin = User.get_cached_by_id(origin_id)      target = User.get_cached_by_id(target_id) diff --git a/lib/pleroma/workers/cron/clear_oauth_token_worker.ex b/lib/pleroma/workers/cron/clear_oauth_token_worker.ex index a4c3b9516..d41be4e87 100644 --- a/lib/pleroma/workers/cron/clear_oauth_token_worker.ex +++ b/lib/pleroma/workers/cron/clear_oauth_token_worker.ex @@ -13,7 +13,7 @@ defmodule Pleroma.Workers.Cron.ClearOauthTokenWorker do    alias Pleroma.Web.OAuth.Token    @impl Oban.Worker -  def perform(_opts, _job) do +  def perform(_job) do      if Config.get([:oauth2, :clean_expired_tokens], false) do        Token.delete_expired_tokens()      else diff --git a/lib/pleroma/workers/cron/digest_emails_worker.ex b/lib/pleroma/workers/cron/digest_emails_worker.ex index 7f09ff3cf..ee646229f 100644 --- a/lib/pleroma/workers/cron/digest_emails_worker.ex +++ b/lib/pleroma/workers/cron/digest_emails_worker.ex @@ -19,7 +19,7 @@ defmodule Pleroma.Workers.Cron.DigestEmailsWorker do    require Logger    @impl Oban.Worker -  def perform(_opts, _job) do +  def perform(_job) do      config = Config.get([:email_notifications, :digest])      if config[:active] do diff --git a/lib/pleroma/workers/cron/new_users_digest_worker.ex b/lib/pleroma/workers/cron/new_users_digest_worker.ex index 5c816b3fe..abc8a5e95 100644 --- a/lib/pleroma/workers/cron/new_users_digest_worker.ex +++ b/lib/pleroma/workers/cron/new_users_digest_worker.ex @@ -12,7 +12,7 @@ defmodule Pleroma.Workers.Cron.NewUsersDigestWorker do    use Pleroma.Workers.WorkerHelper, queue: "new_users_digest"    @impl Oban.Worker -  def perform(_args, _job) do +  def perform(_job) do      if Pleroma.Config.get([Pleroma.Emails.NewUsersDigestEmail, :enabled]) do        today = NaiveDateTime.utc_now() |> Timex.beginning_of_day() diff --git a/lib/pleroma/workers/cron/purge_expired_activities_worker.ex b/lib/pleroma/workers/cron/purge_expired_activities_worker.ex index 84b3b84de..e926c5dc8 100644 --- a/lib/pleroma/workers/cron/purge_expired_activities_worker.ex +++ b/lib/pleroma/workers/cron/purge_expired_activities_worker.ex @@ -20,7 +20,7 @@ defmodule Pleroma.Workers.Cron.PurgeExpiredActivitiesWorker do    @interval :timer.minutes(1)    @impl Oban.Worker -  def perform(_opts, _job) do +  def perform(_job) do      if Config.get([ActivityExpiration, :enabled]) do        Enum.each(ActivityExpiration.due_expirations(@interval), &delete_activity/1)      else diff --git a/lib/pleroma/workers/cron/stats_worker.ex b/lib/pleroma/workers/cron/stats_worker.ex index e9b8d59c4..e54bd9a7f 100644 --- a/lib/pleroma/workers/cron/stats_worker.ex +++ b/lib/pleroma/workers/cron/stats_worker.ex @@ -10,7 +10,7 @@ defmodule Pleroma.Workers.Cron.StatsWorker do    use Oban.Worker, queue: "background"    @impl Oban.Worker -  def perform(_opts, _job) do +  def perform(_job) do      Pleroma.Stats.do_collect()    end  end diff --git a/lib/pleroma/workers/mailer_worker.ex b/lib/pleroma/workers/mailer_worker.ex index 6955338a5..32273cfa5 100644 --- a/lib/pleroma/workers/mailer_worker.ex +++ b/lib/pleroma/workers/mailer_worker.ex @@ -6,7 +6,7 @@ defmodule Pleroma.Workers.MailerWorker do    use Pleroma.Workers.WorkerHelper, queue: "mailer"    @impl Oban.Worker -  def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}, _job) do +  def perform(%Job{args: %{"op" => "email", "encoded_email" => encoded_email, "config" => config}}) do      encoded_email      |> Base.decode64!()      |> :erlang.binary_to_term() diff --git a/lib/pleroma/workers/publisher_worker.ex b/lib/pleroma/workers/publisher_worker.ex index daf79efc0..e739c3cd0 100644 --- a/lib/pleroma/workers/publisher_worker.ex +++ b/lib/pleroma/workers/publisher_worker.ex @@ -8,17 +8,17 @@ defmodule Pleroma.Workers.PublisherWorker do    use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing" -  def backoff(attempt) when is_integer(attempt) do +  def backoff(%Job{attempt: attempt}) when is_integer(attempt) do      Pleroma.Workers.WorkerHelper.sidekiq_backoff(attempt, 5)    end    @impl Oban.Worker -  def perform(%{"op" => "publish", "activity_id" => activity_id}, _job) do +  def perform(%Job{args: %{"op" => "publish", "activity_id" => activity_id}}) do      activity = Activity.get_by_id(activity_id)      Federator.perform(:publish, activity)    end -  def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}, _job) do +  def perform(%Job{args: %{"op" => "publish_one", "module" => module_name, "params" => params}}) do      params = Map.new(params, fn {k, v} -> {String.to_atom(k), v} end)      Federator.perform(:publish_one, String.to_atom(module_name), params)    end diff --git a/lib/pleroma/workers/receiver_worker.ex b/lib/pleroma/workers/receiver_worker.ex index f7a7124f3..1b97af1a8 100644 --- a/lib/pleroma/workers/receiver_worker.ex +++ b/lib/pleroma/workers/receiver_worker.ex @@ -8,7 +8,7 @@ defmodule Pleroma.Workers.ReceiverWorker do    use Pleroma.Workers.WorkerHelper, queue: "federator_incoming"    @impl Oban.Worker -  def perform(%{"op" => "incoming_ap_doc", "params" => params}, _job) do +  def perform(%Job{args: %{"op" => "incoming_ap_doc", "params" => params}}) do      Federator.perform(:incoming_ap_doc, params)    end  end diff --git a/lib/pleroma/workers/remote_fetcher_worker.ex b/lib/pleroma/workers/remote_fetcher_worker.ex index ec6534f21..27e2e3386 100644 --- a/lib/pleroma/workers/remote_fetcher_worker.ex +++ b/lib/pleroma/workers/remote_fetcher_worker.ex @@ -8,13 +8,7 @@ defmodule Pleroma.Workers.RemoteFetcherWorker do    use Pleroma.Workers.WorkerHelper, queue: "remote_fetcher"    @impl Oban.Worker -  def perform( -        %{ -          "op" => "fetch_remote", -          "id" => id -        } = args, -        _job -      ) do +  def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do      {:ok, _object} = Fetcher.fetch_object_from_id(id, depth: args["depth"])    end  end diff --git a/lib/pleroma/workers/scheduled_activity_worker.ex b/lib/pleroma/workers/scheduled_activity_worker.ex index 97d1efbfb..dd9986fe4 100644 --- a/lib/pleroma/workers/scheduled_activity_worker.ex +++ b/lib/pleroma/workers/scheduled_activity_worker.ex @@ -17,7 +17,7 @@ defmodule Pleroma.Workers.ScheduledActivityWorker do    require Logger    @impl Oban.Worker -  def perform(%{"activity_id" => activity_id}, _job) do +  def perform(%Job{args: %{"activity_id" => activity_id}}) do      if Config.get([ScheduledActivity, :enabled]) do        case Pleroma.Repo.get(ScheduledActivity, activity_id) do          %ScheduledActivity{} = scheduled_activity -> diff --git a/lib/pleroma/workers/transmogrifier_worker.ex b/lib/pleroma/workers/transmogrifier_worker.ex index 11239ca5e..15f36375c 100644 --- a/lib/pleroma/workers/transmogrifier_worker.ex +++ b/lib/pleroma/workers/transmogrifier_worker.ex @@ -8,7 +8,7 @@ defmodule Pleroma.Workers.TransmogrifierWorker do    use Pleroma.Workers.WorkerHelper, queue: "transmogrifier"    @impl Oban.Worker -  def perform(%{"op" => "user_upgrade", "user_id" => user_id}, _job) do +  def perform(%Job{args: %{"op" => "user_upgrade", "user_id" => user_id}}) do      user = User.get_cached_by_id(user_id)      Pleroma.Web.ActivityPub.Transmogrifier.perform(:user_upgrade, user)    end diff --git a/lib/pleroma/workers/web_pusher_worker.ex b/lib/pleroma/workers/web_pusher_worker.ex index 58ad25e39..0cfdc6a6f 100644 --- a/lib/pleroma/workers/web_pusher_worker.ex +++ b/lib/pleroma/workers/web_pusher_worker.ex @@ -9,7 +9,7 @@ defmodule Pleroma.Workers.WebPusherWorker do    use Pleroma.Workers.WorkerHelper, queue: "web_push"    @impl Oban.Worker -  def perform(%{"op" => "web_push", "notification_id" => notification_id}, _job) do +  def perform(%Job{args: %{"op" => "web_push", "notification_id" => notification_id}}) do      notification =        Notification        |> Repo.get(notification_id) diff --git a/lib/pleroma/workers/worker_helper.ex b/lib/pleroma/workers/worker_helper.ex index d1f90c35b..7d1289be2 100644 --- a/lib/pleroma/workers/worker_helper.ex +++ b/lib/pleroma/workers/worker_helper.ex @@ -32,6 +32,8 @@ defmodule Pleroma.Workers.WorkerHelper do          queue: unquote(queue),          max_attempts: 1 +      alias Oban.Job +        def enqueue(op, params, worker_args \\ []) do          params = Map.merge(%{"op" => op}, params)          queue_atom = String.to_atom(unquote(queue)) @@ -39,7 +41,7 @@ defmodule Pleroma.Workers.WorkerHelper do          unquote(caller_module)          |> apply(:new, [params, worker_args]) -        |> Pleroma.Repo.insert() +        |> Oban.insert()        end      end    end | 
