diff options
| -rw-r--r-- | config/config.exs | 1 | ||||
| -rw-r--r-- | config/description.exs | 1 | ||||
| -rw-r--r-- | lib/pleroma/hashtag.ex | 60 | ||||
| -rw-r--r-- | lib/pleroma/migrators/hashtags_table_migrator.ex | 70 | ||||
| -rw-r--r-- | lib/pleroma/object.ex | 27 | ||||
| -rw-r--r-- | lib/pleroma/workers/cron/hashtags_cleanup_worker.ex | 57 | 
6 files changed, 114 insertions, 102 deletions
diff --git a/config/config.exs b/config/config.exs index 36c609936..91888c512 100644 --- a/config/config.exs +++ b/config/config.exs @@ -560,7 +560,6 @@ config :pleroma, Oban,    ],    plugins: [Oban.Plugins.Pruner],    crontab: [ -    {"0 1 * * *", Pleroma.Workers.Cron.HashtagsCleanupWorker},      {"0 0 * * 0", Pleroma.Workers.Cron.DigestEmailsWorker},      {"0 0 * * *", Pleroma.Workers.Cron.NewUsersDigestWorker}    ] diff --git a/config/description.exs b/config/description.exs index 02cdf2ff3..b2f301e2d 100644 --- a/config/description.exs +++ b/config/description.exs @@ -1964,7 +1964,6 @@ config :pleroma, :config_description, [          type: {:list, :tuple},          description: "Settings for cron background jobs",          suggestions: [ -          {"0 1 * * *", Pleroma.Workers.Cron.HashtagsCleanupWorker},            {"0 0 * * 0", Pleroma.Workers.Cron.DigestEmailsWorker},            {"0 0 * * *", Pleroma.Workers.Cron.NewUsersDigestWorker}          ] diff --git a/lib/pleroma/hashtag.ex b/lib/pleroma/hashtag.ex index 9e4c6c894..de52c4dae 100644 --- a/lib/pleroma/hashtag.ex +++ b/lib/pleroma/hashtag.ex @@ -6,14 +6,17 @@ defmodule Pleroma.Hashtag do    use Ecto.Schema    import Ecto.Changeset +  import Ecto.Query +  alias Ecto.Multi    alias Pleroma.Hashtag +  alias Pleroma.Object    alias Pleroma.Repo    schema "hashtags" do      field(:name, :string) -    many_to_many(:objects, Pleroma.Object, join_through: "hashtags_objects", on_replace: :delete) +    many_to_many(:objects, Object, join_through: "hashtags_objects", on_replace: :delete)      timestamps()    end @@ -34,15 +37,27 @@ defmodule Pleroma.Hashtag do    end    def get_or_create_by_names(names) when is_list(names) do -    Enum.reduce_while(names, {:ok, []}, fn name, {:ok, list} -> -      case get_or_create_by_name(name) do -        {:ok, %Hashtag{} = hashtag} -> -          {:cont, {:ok, list ++ [hashtag]}} - -        error -> -          {:halt, error} -      end -    end) +    timestamp = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second) + +    structs = +      Enum.map(names, fn name -> +        %Hashtag{} +        |> changeset(%{name: name}) +        |> Map.get(:changes) +        |> Map.merge(%{inserted_at: timestamp, updated_at: timestamp}) +      end) + +    with {:ok, %{query_op: hashtags}} <- +           Multi.new() +           |> Multi.insert_all(:insert_all_op, Hashtag, structs, on_conflict: :nothing) +           |> Multi.run(:query_op, fn _repo, _changes -> +             {:ok, Repo.all(from(ht in Hashtag, where: ht.name in ^names))} +           end) +           |> Repo.transaction() do +      {:ok, hashtags} +    else +      {:error, _name, value, _changes_so_far} -> {:error, value} +    end    end    def changeset(%Hashtag{} = struct, params) do @@ -52,4 +67,29 @@ defmodule Pleroma.Hashtag do      |> validate_required([:name])      |> unique_constraint(:name)    end + +  def unlink(%Object{id: object_id}) do +    with {_, hashtag_ids} <- +           from(hto in "hashtags_objects", +             where: hto.object_id == ^object_id, +             select: hto.hashtag_id +           ) +           |> Repo.delete_all() do +      delete_unreferenced(hashtag_ids) +    end +  end + +  @delete_unreferenced_query """ +  DELETE FROM hashtags WHERE id IN +    (SELECT hashtags.id FROM hashtags +      LEFT OUTER JOIN hashtags_objects +        ON hashtags_objects.hashtag_id = hashtags.id +      WHERE hashtags_objects.hashtag_id IS NULL AND hashtags.id = ANY($1)); +  """ + +  def delete_unreferenced(ids) do +    with {:ok, %{num_rows: deleted_count}} <- Repo.query(@delete_unreferenced_query, [ids]) do +      {:ok, deleted_count} +    end +  end  end diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index 9a036e0b2..c53f6be12 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -74,16 +74,15 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do    def handle_info(:migrate_hashtags, state) do      State.clear() -    data_migration = data_migration() +    update_status(:running) +    put_stat(:started_at, NaiveDateTime.utc_now()) +    data_migration = data_migration()      persistent_data = Map.take(data_migration.data, ["max_processed_id"])      {:ok, data_migration} =        DataMigration.update(data_migration, %{state: :running, data: persistent_data}) -    update_status(:running) -    put_stat(:started_at, NaiveDateTime.utc_now()) -      Logger.info("Starting transferring object embedded hashtags to `hashtags` table...")      max_processed_id = data_migration.data["max_processed_id"] || 0 @@ -137,6 +136,8 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do      |> Stream.run()      with 0 <- failures_count(data_migration.id) do +      _ = delete_non_create_activities_hashtags() +        {:ok, data_migration} = DataMigration.update_state(data_migration, :complete)        handle_success(data_migration) @@ -150,9 +151,37 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do      {:noreply, state}    end +  @hashtags_objects_cleanup_query """ +  DELETE FROM hashtags_objects WHERE object_id IN +    (SELECT DISTINCT objects.id FROM objects +      JOIN hashtags_objects ON hashtags_objects.object_id = objects.id LEFT JOIN activities +        ON COALESCE(activities.data->'object'->>'id', activities.data->>'object') = +          (objects.data->>'id') +        AND activities.data->>'type' = 'Create' +      WHERE activities.id IS NULL); +  """ + +  @hashtags_cleanup_query """ +  DELETE FROM hashtags WHERE id IN +    (SELECT hashtags.id FROM hashtags +      LEFT OUTER JOIN hashtags_objects +        ON hashtags_objects.hashtag_id = hashtags.id +      WHERE hashtags_objects.hashtag_id IS NULL); +  """ + +  def delete_non_create_activities_hashtags do +    {:ok, %{num_rows: hashtags_objects_count}} = +      Repo.query(@hashtags_objects_cleanup_query, [], timeout: :infinity) + +    {:ok, %{num_rows: hashtags_count}} = +      Repo.query(@hashtags_cleanup_query, [], timeout: :infinity) + +    {:ok, hashtags_objects_count, hashtags_count} +  end +    defp query do      # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out) -    # Note: not checking activity type; HashtagsCleanupWorker should clean up unused records later +    # Note: not checking activity type, expecting remove_non_create_objects_hashtags/_ to clean up      from(        object in Object,        where: @@ -182,25 +211,20 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do    defp transfer_object_hashtags(object, hashtags) do      Repo.transaction(fn ->        with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do -        for hashtag_record <- hashtag_records do -          with {:ok, _} <- -                 Repo.query( -                   "insert into hashtags_objects(hashtag_id, object_id) values ($1, $2);", -                   [hashtag_record.id, object.id] -                 ) do -            nil -          else -            {:error, e} -> -              error = -                "ERROR: could not link object #{object.id} and hashtag " <> -                  "#{hashtag_record.id}: #{inspect(e)}" - -              Logger.error(error) -              Repo.rollback(object.id) -          end +        maps = Enum.map(hashtag_records, &%{hashtag_id: &1.id, object_id: object.id}) +        expected_rows = length(hashtag_records) + +        with {^expected_rows, _} <- Repo.insert_all("hashtags_objects", maps) do +          object.id +        else +          e -> +            error = +              "ERROR when inserting #{expected_rows} hashtags_objects " <> +                "for object #{object.id}: #{inspect(e)}" + +            Logger.error(error) +            Repo.rollback(object.id)          end - -        object.id        else          e ->            error = "ERROR: could not create hashtags for object #{object.id}: #{inspect(e)}" diff --git a/lib/pleroma/object.ex b/lib/pleroma/object.ex index 52b77e41c..3ba749d1a 100644 --- a/lib/pleroma/object.ex +++ b/lib/pleroma/object.ex @@ -62,27 +62,30 @@ defmodule Pleroma.Object do      |> cast(params, [:data])      |> validate_required([:data])      |> unique_constraint(:ap_id, name: :objects_unique_apid_index) +    # Expecting `maybe_handle_hashtags_change/1` to run last:      |> maybe_handle_hashtags_change(struct)    end -  # Note: not checking activity type; HashtagsCleanupWorker should clean up unused records later +  # Note: not checking activity type (assuming non-legacy objects are associated with Create act.)    defp maybe_handle_hashtags_change(changeset, struct) do -    with data_hashtags_change = get_change(changeset, :data), -         true <- hashtags_changed?(struct, data_hashtags_change), +    with %Ecto.Changeset{valid?: true} <- changeset, +         data_hashtags_change = get_change(changeset, :data), +         {_, true} <- {:changed, hashtags_changed?(struct, data_hashtags_change)},           {:ok, hashtag_records} <-             data_hashtags_change             |> object_data_hashtags()             |> Hashtag.get_or_create_by_names() do        put_assoc(changeset, :hashtags, hashtag_records)      else -      false -> +      %{valid?: false} ->          changeset -      {:error, hashtag_changeset} -> -        failed_hashtag = get_field(hashtag_changeset, :name) +      {:changed, false} -> +        changeset +      {:error, _} ->          validate_change(changeset, :data, fn _, _ -> -          [data: "error referencing hashtag: #{failed_hashtag}"] +          [data: "error referencing hashtags"]          end)      end    end @@ -221,9 +224,13 @@ defmodule Pleroma.Object do    def swap_object_with_tombstone(object) do      tombstone = make_tombstone(object) -    object -    |> Object.change(%{data: tombstone}) -    |> Repo.update() +    with {:ok, object} <- +           object +           |> Object.change(%{data: tombstone}) +           |> Repo.update() do +      Hashtag.unlink(object) +      {:ok, object} +    end    end    def delete(%Object{data: %{"id" => id}} = object) do diff --git a/lib/pleroma/workers/cron/hashtags_cleanup_worker.ex b/lib/pleroma/workers/cron/hashtags_cleanup_worker.ex deleted file mode 100644 index b319067ca..000000000 --- a/lib/pleroma/workers/cron/hashtags_cleanup_worker.ex +++ /dev/null @@ -1,57 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Workers.Cron.HashtagsCleanupWorker do -  @moduledoc """ -  The worker to clean up unused hashtags_objects and hashtags. -  """ - -  use Oban.Worker, queue: "hashtags_cleanup" - -  alias Pleroma.Repo - -  require Logger - -  @hashtags_objects_query """ -  DELETE FROM hashtags_objects WHERE object_id IN -    (SELECT DISTINCT objects.id FROM objects -      JOIN hashtags_objects ON hashtags_objects.object_id = objects.id LEFT JOIN activities -        ON COALESCE(activities.data->'object'->>'id', activities.data->>'object') = -          (objects.data->>'id') -        AND activities.data->>'type' = 'Create' -      WHERE activities.id IS NULL); -  """ - -  @hashtags_query """ -  DELETE FROM hashtags WHERE id IN -    (SELECT hashtags.id FROM hashtags -      LEFT OUTER JOIN hashtags_objects -        ON hashtags_objects.hashtag_id = hashtags.id -      WHERE hashtags_objects.hashtag_id IS NULL AND hashtags.inserted_at < $1); -  """ - -  @impl Oban.Worker -  def perform(_job) do -    Logger.info("Cleaning up unused `hashtags_objects` records...") - -    {:ok, %{num_rows: hashtags_objects_count}} = -      Repo.query(@hashtags_objects_query, [], timeout: :infinity) - -    Logger.info("Deleted #{hashtags_objects_count} unused `hashtags_objects` records.") - -    Logger.info("Cleaning up unused `hashtags` records...") - -    # Note: ignoring recently created hashtags since references are added after hashtag is created -    {:ok, %{num_rows: hashtags_count}} = -      Repo.query(@hashtags_query, [NaiveDateTime.add(NaiveDateTime.utc_now(), -3600 * 24)], -        timeout: :infinity -      ) - -    Logger.info("Deleted #{hashtags_count} unused `hashtags` records.") - -    Logger.info("HashtagsCleanupWorker complete.") - -    :ok -  end -end  | 
