diff options
| author | Maksim Pechnikov <parallel588@gmail.com> | 2020-02-11 08:35:26 +0300 | 
|---|---|---|
| committer | Maksim Pechnikov <parallel588@gmail.com> | 2020-02-11 08:35:26 +0300 | 
| commit | 58574ef15627dbae8f12ba59fd043c70d287f794 (patch) | |
| tree | e0cd96700cfe5b271336dec07570a58961b4f91d /lib | |
| parent | 3830cb538bd3aaee3fc48bc97b57230a558b98cf (diff) | |
| parent | 94e5ca11054567e0edc15ef3a350f02c386d3ead (diff) | |
| download | pleroma-58574ef15627dbae8f12ba59fd043c70d287f794.tar.gz pleroma-58574ef15627dbae8f12ba59fd043c70d287f794.zip | |
Merge branch 'develop' into issue/1276
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/pleroma/application.ex | 12 | ||||
| -rw-r--r-- | lib/pleroma/daemons/activity_expiration_daemon.ex | 66 | ||||
| -rw-r--r-- | lib/pleroma/daemons/digest_email_daemon.ex | 42 | ||||
| -rw-r--r-- | lib/pleroma/daemons/scheduled_activity_daemon.ex | 62 | ||||
| -rw-r--r-- | lib/pleroma/docs/generator.ex | 2 | ||||
| -rw-r--r-- | lib/pleroma/scheduled_activity.ex | 83 | ||||
| -rw-r--r-- | lib/pleroma/scheduler.ex | 7 | ||||
| -rw-r--r-- | lib/pleroma/stats.ex | 39 | ||||
| -rw-r--r-- | lib/pleroma/web/mastodon_api/controllers/status_controller.ex | 19 | ||||
| -rw-r--r-- | lib/pleroma/web/oauth/token/clean_worker.ex | 34 | ||||
| -rw-r--r-- | lib/pleroma/workers/activity_expiration_worker.ex | 18 | ||||
| -rw-r--r-- | lib/pleroma/workers/background_worker.ex | 5 | ||||
| -rw-r--r-- | lib/pleroma/workers/cron/clear_oauth_token_worker.ex | 21 | ||||
| -rw-r--r-- | lib/pleroma/workers/cron/digest_emails_worker.ex | 58 | ||||
| -rw-r--r-- | lib/pleroma/workers/cron/purge_expired_activities_worker.ex | 46 | ||||
| -rw-r--r-- | lib/pleroma/workers/cron/stats_worker.ex | 16 | ||||
| -rw-r--r-- | lib/pleroma/workers/digest_emails_worker.ex | 16 | ||||
| -rw-r--r-- | lib/pleroma/workers/scheduled_activity_worker.ex | 36 | 
18 files changed, 280 insertions, 302 deletions
| diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 2c8889ce5..27758cf94 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -42,12 +42,9 @@ defmodule Pleroma.Application do      children =        [          Pleroma.Repo, -        Pleroma.Scheduler,          Pleroma.Config.TransferTask,          Pleroma.Emoji,          Pleroma.Captcha, -        Pleroma.Daemons.ScheduledActivityDaemon, -        Pleroma.Daemons.ActivityExpirationDaemon,          Pleroma.Plugs.RateLimiter.Supervisor        ] ++          cachex_children() ++ @@ -58,7 +55,6 @@ defmodule Pleroma.Application do            {Oban, Pleroma.Config.get(Oban)}          ] ++          task_children(@env) ++ -        oauth_cleanup_child(oauth_cleanup_enabled?()) ++          streamer_child(@env) ++          chat_child(@env, chat_enabled?()) ++          [ @@ -160,20 +156,12 @@ defmodule Pleroma.Application do    defp chat_enabled?, do: Pleroma.Config.get([:chat, :enabled]) -  defp oauth_cleanup_enabled?, -    do: Pleroma.Config.get([:oauth2, :clean_expired_tokens], false) -    defp streamer_child(:test), do: []    defp streamer_child(_) do      [Pleroma.Web.Streamer.supervisor()]    end -  defp oauth_cleanup_child(true), -    do: [Pleroma.Web.OAuth.Token.CleanWorker] - -  defp oauth_cleanup_child(_), do: [] -    defp chat_child(_env, true) do      [Pleroma.Web.ChatChannel.ChatChannelState]    end diff --git a/lib/pleroma/daemons/activity_expiration_daemon.ex b/lib/pleroma/daemons/activity_expiration_daemon.ex deleted file mode 100644 index cab7628c4..000000000 --- a/lib/pleroma/daemons/activity_expiration_daemon.ex +++ /dev/null @@ -1,66 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2019 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Daemons.ActivityExpirationDaemon do -  alias Pleroma.Activity -  alias Pleroma.ActivityExpiration -  alias Pleroma.Config -  alias Pleroma.Repo -  alias Pleroma.User -  alias Pleroma.Web.CommonAPI - -  require Logger -  use GenServer -  import Ecto.Query - -  @schedule_interval :timer.minutes(1) - -  def start_link(_) do -    GenServer.start_link(__MODULE__, nil) -  end - -  @impl true -  def init(_) do -    if Config.get([ActivityExpiration, :enabled]) do -      schedule_next() -      {:ok, nil} -    else -      :ignore -    end -  end - -  def perform(:execute, expiration_id) do -    try do -      expiration = -        ActivityExpiration -        |> where([e], e.id == ^expiration_id) -        |> Repo.one!() - -      activity = Activity.get_by_id_with_object(expiration.activity_id) -      user = User.get_by_ap_id(activity.object.data["actor"]) -      CommonAPI.delete(activity.id, user) -    rescue -      error -> -        Logger.error("#{__MODULE__} Couldn't delete expired activity: #{inspect(error)}") -    end -  end - -  @impl true -  def handle_info(:perform, state) do -    ActivityExpiration.due_expirations(@schedule_interval) -    |> Enum.each(fn expiration -> -      Pleroma.Workers.ActivityExpirationWorker.enqueue( -        "activity_expiration", -        %{"activity_expiration_id" => expiration.id} -      ) -    end) - -    schedule_next() -    {:noreply, state} -  end - -  defp schedule_next do -    Process.send_after(self(), :perform, @schedule_interval) -  end -end diff --git a/lib/pleroma/daemons/digest_email_daemon.ex b/lib/pleroma/daemons/digest_email_daemon.ex deleted file mode 100644 index b4c8eaad9..000000000 --- a/lib/pleroma/daemons/digest_email_daemon.ex +++ /dev/null @@ -1,42 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Daemons.DigestEmailDaemon do -  alias Pleroma.Repo -  alias Pleroma.Workers.DigestEmailsWorker - -  import Ecto.Query - -  def perform do -    config = Pleroma.Config.get([:email_notifications, :digest]) -    negative_interval = -Map.fetch!(config, :interval) -    inactivity_threshold = Map.fetch!(config, :inactivity_threshold) -    inactive_users_query = Pleroma.User.list_inactive_users_query(inactivity_threshold) - -    now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second) - -    from(u in inactive_users_query, -      where: fragment(~s(? ->'digest' @> 'true'), u.email_notifications), -      where: u.last_digest_emailed_at < datetime_add(^now, ^negative_interval, "day"), -      select: u -    ) -    |> Repo.all() -    |> Enum.each(fn user -> -      DigestEmailsWorker.enqueue("digest_email", %{"user_id" => user.id}) -    end) -  end - -  @doc """ -  Send digest email to the given user. -  Updates `last_digest_emailed_at` field for the user and returns the updated user. -  """ -  @spec perform(Pleroma.User.t()) :: Pleroma.User.t() -  def perform(user) do -    with %Swoosh.Email{} = email <- Pleroma.Emails.UserEmail.digest_email(user) do -      Pleroma.Emails.Mailer.deliver_async(email) -    end - -    Pleroma.User.touch_last_digest_emailed_at(user) -  end -end diff --git a/lib/pleroma/daemons/scheduled_activity_daemon.ex b/lib/pleroma/daemons/scheduled_activity_daemon.ex deleted file mode 100644 index aee5f723a..000000000 --- a/lib/pleroma/daemons/scheduled_activity_daemon.ex +++ /dev/null @@ -1,62 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Daemons.ScheduledActivityDaemon do -  @moduledoc """ -  Sends scheduled activities to the job queue. -  """ - -  alias Pleroma.Config -  alias Pleroma.ScheduledActivity -  alias Pleroma.User -  alias Pleroma.Web.CommonAPI - -  use GenServer -  require Logger - -  @schedule_interval :timer.minutes(1) - -  def start_link(_) do -    GenServer.start_link(__MODULE__, nil) -  end - -  def init(_) do -    if Config.get([ScheduledActivity, :enabled]) do -      schedule_next() -      {:ok, nil} -    else -      :ignore -    end -  end - -  def perform(:execute, scheduled_activity_id) do -    try do -      {:ok, scheduled_activity} = ScheduledActivity.delete(scheduled_activity_id) -      %User{} = user = User.get_cached_by_id(scheduled_activity.user_id) -      {:ok, _result} = CommonAPI.post(user, scheduled_activity.params) -    rescue -      error -> -        Logger.error( -          "#{__MODULE__} Couldn't create a status from the scheduled activity: #{inspect(error)}" -        ) -    end -  end - -  def handle_info(:perform, state) do -    ScheduledActivity.due_activities(@schedule_interval) -    |> Enum.each(fn scheduled_activity -> -      Pleroma.Workers.ScheduledActivityWorker.enqueue( -        "execute", -        %{"activity_id" => scheduled_activity.id} -      ) -    end) - -    schedule_next() -    {:noreply, state} -  end - -  defp schedule_next do -    Process.send_after(self(), :perform, @schedule_interval) -  end -end diff --git a/lib/pleroma/docs/generator.ex b/lib/pleroma/docs/generator.ex index 6b12dcdd9..e0fc8cd02 100644 --- a/lib/pleroma/docs/generator.ex +++ b/lib/pleroma/docs/generator.ex @@ -13,7 +13,7 @@ defmodule Pleroma.Docs.Generator do        |> Enum.filter(&String.ends_with?(&1, ".ex"))        |> Enum.map(fn filename ->          module = filename |> String.trim_trailing(".ex") |> Macro.camelize() -        String.to_existing_atom(start <> module) +        String.to_atom(start <> module)        end)      end    end diff --git a/lib/pleroma/scheduled_activity.ex b/lib/pleroma/scheduled_activity.ex index fea2cf3ff..e81bfcd7d 100644 --- a/lib/pleroma/scheduled_activity.ex +++ b/lib/pleroma/scheduled_activity.ex @@ -5,15 +5,19 @@  defmodule Pleroma.ScheduledActivity do    use Ecto.Schema +  alias Ecto.Multi    alias Pleroma.Config    alias Pleroma.Repo    alias Pleroma.ScheduledActivity    alias Pleroma.User    alias Pleroma.Web.CommonAPI.Utils +  alias Pleroma.Workers.ScheduledActivityWorker    import Ecto.Query    import Ecto.Changeset +  @type t :: %__MODULE__{} +    @min_offset :timer.minutes(5)    schema "scheduled_activities" do @@ -105,16 +109,32 @@ defmodule Pleroma.ScheduledActivity do    end    def new(%User{} = user, attrs) do -    %ScheduledActivity{user_id: user.id} -    |> changeset(attrs) +    changeset(%ScheduledActivity{user_id: user.id}, attrs)    end +  @doc """ +  Creates ScheduledActivity and add to queue to perform at scheduled_at date +  """ +  @spec create(User.t(), map()) :: {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}    def create(%User{} = user, attrs) do -    user -    |> new(attrs) -    |> Repo.insert() +    Multi.new() +    |> Multi.insert(:scheduled_activity, new(user, attrs)) +    |> maybe_add_jobs(Config.get([ScheduledActivity, :enabled])) +    |> Repo.transaction() +    |> transaction_response +  end + +  defp maybe_add_jobs(multi, true) do +    multi +    |> Multi.run(:scheduled_activity_job, fn _repo, %{scheduled_activity: activity} -> +      %{activity_id: activity.id} +      |> ScheduledActivityWorker.new(scheduled_at: activity.scheduled_at) +      |> Oban.insert() +    end)    end +  defp maybe_add_jobs(multi, _), do: multi +    def get(%User{} = user, scheduled_activity_id) do      ScheduledActivity      |> where(user_id: ^user.id) @@ -122,25 +142,43 @@ defmodule Pleroma.ScheduledActivity do      |> Repo.one()    end -  def update(%ScheduledActivity{} = scheduled_activity, attrs) do -    scheduled_activity -    |> update_changeset(attrs) -    |> Repo.update() +  @spec update(ScheduledActivity.t(), map()) :: +          {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()} +  def update(%ScheduledActivity{id: id} = scheduled_activity, attrs) do +    with {:error, %Ecto.Changeset{valid?: true} = changeset} <- +           {:error, update_changeset(scheduled_activity, attrs)} do +      Multi.new() +      |> Multi.update(:scheduled_activity, changeset) +      |> Multi.update_all(:scheduled_job, job_query(id), +        set: [scheduled_at: get_field(changeset, :scheduled_at)] +      ) +      |> Repo.transaction() +      |> transaction_response +    end    end -  def delete(%ScheduledActivity{} = scheduled_activity) do -    scheduled_activity -    |> Repo.delete() +  @doc "Deletes a ScheduledActivity and linked jobs." +  @spec delete(ScheduledActivity.t() | binary() | integer) :: +          {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()} +  def delete(%ScheduledActivity{id: id} = scheduled_activity) do +    Multi.new() +    |> Multi.delete(:scheduled_activity, scheduled_activity, stale_error_field: :id) +    |> Multi.delete_all(:jobs, job_query(id)) +    |> Repo.transaction() +    |> transaction_response    end    def delete(id) when is_binary(id) or is_integer(id) do -    ScheduledActivity -    |> where(id: ^id) -    |> select([sa], sa) -    |> Repo.delete_all() -    |> case do -      {1, [scheduled_activity]} -> {:ok, scheduled_activity} -      _ -> :error +    delete(%__MODULE__{id: id}) +  end + +  defp transaction_response(result) do +    case result do +      {:ok, %{scheduled_activity: scheduled_activity}} -> +        {:ok, scheduled_activity} + +      {:error, _, changeset, _} -> +        {:error, changeset}      end    end @@ -158,4 +196,11 @@ defmodule Pleroma.ScheduledActivity do      |> where([sa], sa.scheduled_at < ^naive_datetime)      |> Repo.all()    end + +  def job_query(scheduled_activity_id) do +    from(j in Oban.Job, +      where: j.queue == "scheduled_activities", +      where: fragment("args ->> 'activity_id' = ?::text", ^to_string(scheduled_activity_id)) +    ) +  end  end diff --git a/lib/pleroma/scheduler.ex b/lib/pleroma/scheduler.ex deleted file mode 100644 index d84cd99ad..000000000 --- a/lib/pleroma/scheduler.ex +++ /dev/null @@ -1,7 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Scheduler do -  use Quantum.Scheduler, otp_app: :pleroma -end diff --git a/lib/pleroma/stats.ex b/lib/pleroma/stats.ex index 8154a09b7..cf590fb01 100644 --- a/lib/pleroma/stats.ex +++ b/lib/pleroma/stats.ex @@ -9,22 +9,43 @@ defmodule Pleroma.Stats do    use GenServer -  @interval 1000 * 60 * 60 +  @init_state %{ +    peers: [], +    stats: %{ +      domain_count: 0, +      status_count: 0, +      user_count: 0 +    } +  }    def start_link(_) do -    GenServer.start_link(__MODULE__, initial_data(), name: __MODULE__) +    GenServer.start_link( +      __MODULE__, +      @init_state, +      name: __MODULE__ +    )    end +  @doc "Performs update stats"    def force_update do      GenServer.call(__MODULE__, :force_update)    end +  @doc "Performs collect stats" +  def do_collect do +    GenServer.cast(__MODULE__, :run_update) +  end + +  @doc "Returns stats data" +  @spec get_stats() :: %{domain_count: integer(), status_count: integer(), user_count: integer()}    def get_stats do      %{stats: stats} = GenServer.call(__MODULE__, :get_state)      stats    end +  @doc "Returns list peers" +  @spec get_peers() :: list(String.t())    def get_peers do      %{peers: peers} = GenServer.call(__MODULE__, :get_state) @@ -32,7 +53,6 @@ defmodule Pleroma.Stats do    end    def init(args) do -    Process.send(self(), :run_update, [])      {:ok, args}    end @@ -45,17 +65,12 @@ defmodule Pleroma.Stats do      {:reply, state, state}    end -  def handle_info(:run_update, _state) do +  def handle_cast(:run_update, _state) do      new_stats = get_stat_data() -    Process.send_after(self(), :run_update, @interval)      {:noreply, new_stats}    end -  defp initial_data do -    %{peers: [], stats: %{}} -  end -    defp get_stat_data do      peers =        from( @@ -74,7 +89,11 @@ defmodule Pleroma.Stats do      %{        peers: peers, -      stats: %{domain_count: domain_count, status_count: status_count, user_count: user_count} +      stats: %{ +        domain_count: domain_count, +        status_count: status_count, +        user_count: user_count +      }      }    end  end diff --git a/lib/pleroma/web/mastodon_api/controllers/status_controller.ex b/lib/pleroma/web/mastodon_api/controllers/status_controller.ex index 1149fb469..287d1631c 100644 --- a/lib/pleroma/web/mastodon_api/controllers/status_controller.ex +++ b/lib/pleroma/web/mastodon_api/controllers/status_controller.ex @@ -124,15 +124,18 @@ defmodule Pleroma.Web.MastodonAPI.StatusController do        ) do      params = Map.put(params, "in_reply_to_status_id", params["in_reply_to_id"]) -    if ScheduledActivity.far_enough?(scheduled_at) do -      with {:ok, scheduled_activity} <- -             ScheduledActivity.create(user, %{"params" => params, "scheduled_at" => scheduled_at}) do -        conn -        |> put_view(ScheduledActivityView) -        |> render("show.json", scheduled_activity: scheduled_activity) -      end +    with {:far_enough, true} <- {:far_enough, ScheduledActivity.far_enough?(scheduled_at)}, +         attrs <- %{"params" => params, "scheduled_at" => scheduled_at}, +         {:ok, scheduled_activity} <- ScheduledActivity.create(user, attrs) do +      conn +      |> put_view(ScheduledActivityView) +      |> render("show.json", scheduled_activity: scheduled_activity)      else -      create(conn, Map.drop(params, ["scheduled_at"])) +      {:far_enough, _} -> +        create(conn, Map.drop(params, ["scheduled_at"])) + +      error -> +        error      end    end diff --git a/lib/pleroma/web/oauth/token/clean_worker.ex b/lib/pleroma/web/oauth/token/clean_worker.ex deleted file mode 100644 index 3c9c580d5..000000000 --- a/lib/pleroma/web/oauth/token/clean_worker.ex +++ /dev/null @@ -1,34 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.OAuth.Token.CleanWorker do -  @moduledoc """ -  The module represents functions to clean an expired oauth tokens. -  """ -  use GenServer - -  @ten_seconds 10_000 -  @one_day 86_400_000 - -  alias Pleroma.Web.OAuth.Token -  alias Pleroma.Workers.BackgroundWorker - -  def start_link(_), do: GenServer.start_link(__MODULE__, %{}) - -  def init(_) do -    Process.send_after(self(), :perform, @ten_seconds) -    {:ok, nil} -  end - -  @doc false -  def handle_info(:perform, state) do -    BackgroundWorker.enqueue("clean_expired_tokens", %{}) -    interval = Pleroma.Config.get([:oauth2, :clean_expired_tokens_interval], @one_day) - -    Process.send_after(self(), :perform, interval) -    {:noreply, state} -  end - -  def perform(:clean), do: Token.delete_expired_tokens() -end diff --git a/lib/pleroma/workers/activity_expiration_worker.ex b/lib/pleroma/workers/activity_expiration_worker.ex deleted file mode 100644 index 4e3e4195f..000000000 --- a/lib/pleroma/workers/activity_expiration_worker.ex +++ /dev/null @@ -1,18 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Workers.ActivityExpirationWorker do -  use Pleroma.Workers.WorkerHelper, queue: "activity_expiration" - -  @impl Oban.Worker -  def perform( -        %{ -          "op" => "activity_expiration", -          "activity_expiration_id" => activity_expiration_id -        }, -        _job -      ) do -    Pleroma.Daemons.ActivityExpirationDaemon.perform(:execute, activity_expiration_id) -  end -end diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex index 323a4da1e..ac2fe6946 100644 --- a/lib/pleroma/workers/background_worker.ex +++ b/lib/pleroma/workers/background_worker.ex @@ -6,7 +6,6 @@ defmodule Pleroma.Workers.BackgroundWorker do    alias Pleroma.Activity    alias Pleroma.User    alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy -  alias Pleroma.Web.OAuth.Token.CleanWorker    use Pleroma.Workers.WorkerHelper, queue: "background" @@ -55,10 +54,6 @@ defmodule Pleroma.Workers.BackgroundWorker do      User.perform(:follow_import, follower, followed_identifiers)    end -  def perform(%{"op" => "clean_expired_tokens"}, _job) do -    CleanWorker.perform(:clean) -  end -    def perform(%{"op" => "media_proxy_preload", "message" => message}, _job) do      MediaProxyWarmingPolicy.perform(:preload, message)    end diff --git a/lib/pleroma/workers/cron/clear_oauth_token_worker.ex b/lib/pleroma/workers/cron/clear_oauth_token_worker.ex new file mode 100644 index 000000000..a24407874 --- /dev/null +++ b/lib/pleroma/workers/cron/clear_oauth_token_worker.ex @@ -0,0 +1,21 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.Cron.ClearOauthTokenWorker do +  @moduledoc """ +  The worker to cleanup expired oAuth tokens. +  """ + +  use Oban.Worker, queue: "background" + +  alias Pleroma.Config +  alias Pleroma.Web.OAuth.Token + +  @impl Oban.Worker +  def perform(_opts, _job) do +    if Config.get([:oauth2, :clean_expired_tokens], false) do +      Token.delete_expired_tokens() +    end +  end +end diff --git a/lib/pleroma/workers/cron/digest_emails_worker.ex b/lib/pleroma/workers/cron/digest_emails_worker.ex new file mode 100644 index 000000000..0a00129df --- /dev/null +++ b/lib/pleroma/workers/cron/digest_emails_worker.ex @@ -0,0 +1,58 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.Cron.DigestEmailsWorker do +  @moduledoc """ +  The worker to send digest emails. +  """ + +  use Oban.Worker, queue: "digest_emails" + +  alias Pleroma.Config +  alias Pleroma.Emails +  alias Pleroma.Repo +  alias Pleroma.User + +  import Ecto.Query + +  require Logger + +  @impl Oban.Worker +  def perform(_opts, _job) do +    config = Config.get([:email_notifications, :digest]) + +    if config[:active] do +      negative_interval = -Map.fetch!(config, :interval) +      inactivity_threshold = Map.fetch!(config, :inactivity_threshold) +      inactive_users_query = User.list_inactive_users_query(inactivity_threshold) + +      now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second) + +      from(u in inactive_users_query, +        where: fragment(~s(? ->'digest' @> 'true'), u.email_notifications), +        where: u.last_digest_emailed_at < datetime_add(^now, ^negative_interval, "day"), +        select: u +      ) +      |> Repo.all() +      |> send_emails +    end +  end + +  def send_emails(users) do +    Enum.each(users, &send_email/1) +  end + +  @doc """ +  Send digest email to the given user. +  Updates `last_digest_emailed_at` field for the user and returns the updated user. +  """ +  @spec send_email(User.t()) :: User.t() +  def send_email(user) do +    with %Swoosh.Email{} = email <- Emails.UserEmail.digest_email(user) do +      Emails.Mailer.deliver_async(email) +    end + +    User.touch_last_digest_emailed_at(user) +  end +end diff --git a/lib/pleroma/workers/cron/purge_expired_activities_worker.ex b/lib/pleroma/workers/cron/purge_expired_activities_worker.ex new file mode 100644 index 000000000..7a52860a9 --- /dev/null +++ b/lib/pleroma/workers/cron/purge_expired_activities_worker.ex @@ -0,0 +1,46 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.Cron.PurgeExpiredActivitiesWorker do +  @moduledoc """ +  The worker to purge expired activities. +  """ + +  use Oban.Worker, queue: "activity_expiration" + +  alias Pleroma.Activity +  alias Pleroma.ActivityExpiration +  alias Pleroma.Config +  alias Pleroma.User +  alias Pleroma.Web.CommonAPI + +  require Logger + +  @interval :timer.minutes(1) + +  @impl Oban.Worker +  def perform(_opts, _job) do +    if Config.get([ActivityExpiration, :enabled]) do +      Enum.each(ActivityExpiration.due_expirations(@interval), &delete_activity/1) +    end +  end + +  def delete_activity(%ActivityExpiration{activity_id: activity_id}) do +    with {:activity, %Activity{} = activity} <- +           {:activity, Activity.get_by_id_with_object(activity_id)}, +         {:user, %User{} = user} <- {:user, User.get_by_ap_id(activity.object.data["actor"])} do +      CommonAPI.delete(activity.id, user) +    else +      {:activity, _} -> +        Logger.error( +          "#{__MODULE__} Couldn't delete expired activity: not found activity ##{activity_id}" +        ) + +      {:user, _} -> +        Logger.error( +          "#{__MODULE__} Couldn't delete expired activity: not found actorof ##{activity_id}" +        ) +    end +  end +end diff --git a/lib/pleroma/workers/cron/stats_worker.ex b/lib/pleroma/workers/cron/stats_worker.ex new file mode 100644 index 000000000..425ad41ca --- /dev/null +++ b/lib/pleroma/workers/cron/stats_worker.ex @@ -0,0 +1,16 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.Cron.StatsWorker do +  @moduledoc """ +  The worker to update peers statistics. +  """ + +  use Oban.Worker, queue: "background" + +  @impl Oban.Worker +  def perform(_opts, _job) do +    Pleroma.Stats.do_collect() +  end +end diff --git a/lib/pleroma/workers/digest_emails_worker.ex b/lib/pleroma/workers/digest_emails_worker.ex deleted file mode 100644 index 3e5a836d0..000000000 --- a/lib/pleroma/workers/digest_emails_worker.ex +++ /dev/null @@ -1,16 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Workers.DigestEmailsWorker do -  alias Pleroma.User - -  use Pleroma.Workers.WorkerHelper, queue: "digest_emails" - -  @impl Oban.Worker -  def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do -    user_id -    |> User.get_cached_by_id() -    |> Pleroma.Daemons.DigestEmailDaemon.perform() -  end -end diff --git a/lib/pleroma/workers/scheduled_activity_worker.ex b/lib/pleroma/workers/scheduled_activity_worker.ex index ca7d53af1..bd41ab4ce 100644 --- a/lib/pleroma/workers/scheduled_activity_worker.ex +++ b/lib/pleroma/workers/scheduled_activity_worker.ex @@ -3,10 +3,42 @@  # SPDX-License-Identifier: AGPL-3.0-only  defmodule Pleroma.Workers.ScheduledActivityWorker do +  @moduledoc """ +  The worker to post scheduled activity. +  """ +    use Pleroma.Workers.WorkerHelper, queue: "scheduled_activities" +  alias Pleroma.Config +  alias Pleroma.ScheduledActivity +  alias Pleroma.User +  alias Pleroma.Web.CommonAPI + +  require Logger +    @impl Oban.Worker -  def perform(%{"op" => "execute", "activity_id" => activity_id}, _job) do -    Pleroma.Daemons.ScheduledActivityDaemon.perform(:execute, activity_id) +  def perform(%{"activity_id" => activity_id}, _job) do +    if Config.get([ScheduledActivity, :enabled]) do +      case Pleroma.Repo.get(ScheduledActivity, activity_id) do +        %ScheduledActivity{} = scheduled_activity -> +          post_activity(scheduled_activity) + +        _ -> +          Logger.error("#{__MODULE__} Couldn't find scheduled activity: #{activity_id}") +      end +    end +  end + +  defp post_activity(%ScheduledActivity{user_id: user_id, params: params} = scheduled_activity) do +    with {:delete, {:ok, _}} <- {:delete, ScheduledActivity.delete(scheduled_activity)}, +         {:user, %User{} = user} <- {:user, User.get_cached_by_id(user_id)}, +         {:post, {:ok, _}} <- {:post, CommonAPI.post(user, params)} do +      :ok +    else +      error -> +        Logger.error( +          "#{__MODULE__} Couldn't create a status from the scheduled activity: #{inspect(error)}" +        ) +    end    end  end | 
