From ee221277b05d2f682c340c1e1b81fbce4931735a Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Mon, 21 Dec 2020 22:54:26 +0300 Subject: Encapsulation of tags / hashtags fetching from objects. --- lib/pleroma/activity/ir/topics.ex | 10 ++--- lib/pleroma/object.ex | 45 +++++++++++++++++++--- lib/pleroma/web/activity_pub/mrf/simple_policy.ex | 8 ++-- lib/pleroma/web/activity_pub/transmogrifier.ex | 29 +++++++------- lib/pleroma/web/feed/feed_view.ex | 1 + lib/pleroma/web/mastodon_api/views/status_view.ex | 6 ++- .../web/templates/feed/feed/_activity.atom.eex | 2 +- .../web/templates/feed/feed/_activity.rss.eex | 2 +- .../web/templates/feed/feed/_tag_activity.atom.eex | 2 +- 9 files changed, 71 insertions(+), 34 deletions(-) (limited to 'lib') diff --git a/lib/pleroma/activity/ir/topics.ex b/lib/pleroma/activity/ir/topics.ex index fe2e8cb5c..2c74ac2bf 100644 --- a/lib/pleroma/activity/ir/topics.ex +++ b/lib/pleroma/activity/ir/topics.ex @@ -48,14 +48,12 @@ defmodule Pleroma.Activity.Ir.Topics do tags end - defp hashtags_to_topics(%{data: %{"tag" => tags}}) do - tags - |> Enum.filter(&is_bitstring(&1)) - |> Enum.map(fn tag -> "hashtag:" <> tag end) + defp hashtags_to_topics(object) do + object + |> Object.hashtags() + |> Enum.map(fn hashtag -> "hashtag:" <> hashtag end) end - defp hashtags_to_topics(_), do: [] - defp remote_topics(%{local: true}), do: [] defp remote_topics(%{actor: actor}) when is_binary(actor), diff --git a/lib/pleroma/object.ex b/lib/pleroma/object.ex index 052ad413b..2088c7656 100644 --- a/lib/pleroma/object.ex +++ b/lib/pleroma/object.ex @@ -47,17 +47,33 @@ defmodule Pleroma.Object do end def create(data) do - Object.change(%Object{}, %{data: data}) + %Object{} + |> Object.change(%{data: data}) |> Repo.insert() end def change(struct, params \\ %{}) do - struct - |> cast(params, [:data]) - |> validate_required([:data]) - |> unique_constraint(:ap_id, name: :objects_unique_apid_index) + changeset = + struct + |> cast(params, [:data]) + |> validate_required([:data]) + |> unique_constraint(:ap_id, name: :objects_unique_apid_index) + + if hashtags_changed?(struct, get_change(changeset, :data)) do + # TODO: modify assoc once it's introduced + changeset + else + changeset + end + end + + defp hashtags_changed?(%Object{} = struct, %{"tag" => _} = data) do + Enum.sort(embedded_hashtags(struct)) != + Enum.sort(object_data_hashtags(data)) end + defp hashtags_changed?(_, _), do: false + def get_by_id(nil), do: nil def get_by_id(id), do: Repo.get(Object, id) @@ -344,4 +360,23 @@ defmodule Pleroma.Object do def self_replies(object, opts \\ []), do: replies(object, Keyword.put(opts, :self_only, true)) + + def tags(%Object{data: %{"tag" => tags}}) when is_list(tags), do: tags + + def tags(_), do: [] + + def hashtags(object), do: embedded_hashtags(object) + + defp embedded_hashtags(%Object{data: data}) do + object_data_hashtags(data) + end + + defp embedded_hashtags(_), do: [] + + defp object_data_hashtags(%{"tag" => tags}) when is_list(tags) do + # Note: AS2 map-type elements are ignored + Enum.filter(tags, &is_bitstring(&1)) + end + + defp object_data_hashtags(_), do: [] end diff --git a/lib/pleroma/web/activity_pub/mrf/simple_policy.ex b/lib/pleroma/web/activity_pub/mrf/simple_policy.ex index 6cd91826d..e92091d66 100644 --- a/lib/pleroma/web/activity_pub/mrf/simple_policy.ex +++ b/lib/pleroma/web/activity_pub/mrf/simple_policy.ex @@ -74,9 +74,11 @@ defmodule Pleroma.Web.ActivityPub.MRF.SimplePolicy do object = if MRF.subdomain_match?(media_nsfw, actor_host) do - tags = (child_object["tag"] || []) ++ ["nsfw"] - child_object = Map.put(child_object, "tag", tags) - child_object = Map.put(child_object, "sensitive", true) + child_object = + child_object + |> Map.put("tag", (child_object["tag"] || []) ++ ["nsfw"]) + |> Map.put("sensitive", true) + Map.put(object, "object", child_object) else object diff --git a/lib/pleroma/web/activity_pub/transmogrifier.ex b/lib/pleroma/web/activity_pub/transmogrifier.ex index 565d32433..fd17793d0 100644 --- a/lib/pleroma/web/activity_pub/transmogrifier.ex +++ b/lib/pleroma/web/activity_pub/transmogrifier.ex @@ -32,18 +32,18 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do """ def fix_object(object, options \\ []) do object - |> strip_internal_fields - |> fix_actor - |> fix_url - |> fix_attachments - |> fix_context + |> strip_internal_fields() + |> fix_actor() + |> fix_url() + |> fix_attachments() + |> fix_context() |> fix_in_reply_to(options) - |> fix_emoji - |> fix_tag - |> set_sensitive - |> fix_content_map - |> fix_addressing - |> fix_summary + |> fix_emoji() + |> fix_tag() + |> set_sensitive() + |> fix_content_map() + |> fix_addressing() + |> fix_summary() |> fix_type(options) end @@ -315,10 +315,9 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do tags = tag |> Enum.filter(fn data -> data["type"] == "Hashtag" and data["name"] end) - |> Enum.map(fn %{"name" => name} -> - name - |> String.slice(1..-1) - |> String.downcase() + |> Enum.map(fn + %{"name" => "#" <> hashtag} -> String.downcase(hashtag) + %{"name" => hashtag} -> String.downcase(hashtag) end) Map.put(object, "tag", tag ++ tags) diff --git a/lib/pleroma/web/feed/feed_view.ex b/lib/pleroma/web/feed/feed_view.ex index 30e0a2a55..1155c6a39 100644 --- a/lib/pleroma/web/feed/feed_view.ex +++ b/lib/pleroma/web/feed/feed_view.ex @@ -32,6 +32,7 @@ defmodule Pleroma.Web.Feed.FeedView do %{ activity: activity, + object: object, data: Map.get(object, :data), actor: actor } diff --git a/lib/pleroma/web/mastodon_api/views/status_view.ex b/lib/pleroma/web/mastodon_api/views/status_view.ex index 2301e21cf..bd08aa203 100644 --- a/lib/pleroma/web/mastodon_api/views/status_view.ex +++ b/lib/pleroma/web/mastodon_api/views/status_view.ex @@ -201,8 +201,10 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do like_count = object.data["like_count"] || 0 announcement_count = object.data["announcement_count"] || 0 - tags = object.data["tag"] || [] - sensitive = object.data["sensitive"] || Enum.member?(tags, "nsfw") + hashtags = Object.hashtags(object) + sensitive = object.data["sensitive"] || Enum.member?(hashtags, "nsfw") + + tags = Object.tags(object) tag_mentions = tags diff --git a/lib/pleroma/web/templates/feed/feed/_activity.atom.eex b/lib/pleroma/web/templates/feed/feed/_activity.atom.eex index 3fd150c4e..6688830ba 100644 --- a/lib/pleroma/web/templates/feed/feed/_activity.atom.eex +++ b/lib/pleroma/web/templates/feed/feed/_activity.atom.eex @@ -22,7 +22,7 @@ <% end %> - <%= for tag <- @data["tag"] || [] do %> + <%= for tag <- Pleroma.Object.hashtags(@object) do %> <% end %> diff --git a/lib/pleroma/web/templates/feed/feed/_activity.rss.eex b/lib/pleroma/web/templates/feed/feed/_activity.rss.eex index 42960de7d..fc6d74b42 100644 --- a/lib/pleroma/web/templates/feed/feed/_activity.rss.eex +++ b/lib/pleroma/web/templates/feed/feed/_activity.rss.eex @@ -21,7 +21,7 @@ <%= @data["external_url"] %> <% end %> - <%= for tag <- @data["tag"] || [] do %> + <%= for tag <- Pleroma.Object.hashtags(@object) do %> <% end %> diff --git a/lib/pleroma/web/templates/feed/feed/_tag_activity.atom.eex b/lib/pleroma/web/templates/feed/feed/_tag_activity.atom.eex index cf5874a91..c2de28fe4 100644 --- a/lib/pleroma/web/templates/feed/feed/_tag_activity.atom.eex +++ b/lib/pleroma/web/templates/feed/feed/_tag_activity.atom.eex @@ -41,7 +41,7 @@ <% end %> <% end %> - <%= for tag <- @data["tag"] || [] do %> + <%= for tag <- Pleroma.Object.hashtags(@object) do %> <% end %> -- cgit v1.2.3 From e369b1306b2f8b9732c21333b9957f7e4e408f90 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Tue, 22 Dec 2020 22:04:33 +0300 Subject: Added Hashtag entity and objects-hashtags association with auto-sync with `data.tag` on Object update. --- lib/pleroma/hashtag.ex | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++ lib/pleroma/object.ex | 37 +++++++++++++++++++++++--------- 2 files changed, 85 insertions(+), 10 deletions(-) create mode 100644 lib/pleroma/hashtag.ex (limited to 'lib') diff --git a/lib/pleroma/hashtag.ex b/lib/pleroma/hashtag.ex new file mode 100644 index 000000000..b05927563 --- /dev/null +++ b/lib/pleroma/hashtag.ex @@ -0,0 +1,58 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2020 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Hashtag do + use Ecto.Schema + + import Ecto.Changeset + + alias Pleroma.Hashtag + alias Pleroma.Repo + + @derive {Jason.Encoder, only: [:data]} + + schema "hashtags" do + field(:name, :string) + field(:data, :map, default: %{}) + + many_to_many(:objects, Pleroma.Object, join_through: "hashtags_objects", on_replace: :delete) + + timestamps() + end + + def get_by_name(name) do + Repo.get_by(Hashtag, name: name) + end + + def get_or_create_by_name(name) when is_bitstring(name) do + with %Hashtag{} = hashtag <- get_by_name(name) do + {:ok, hashtag} + else + _ -> + %Hashtag{} + |> changeset(%{name: name}) + |> Repo.insert() + end + end + + def get_or_create_by_names(names) when is_list(names) do + Enum.reduce_while(names, {:ok, []}, fn name, {:ok, list} -> + case get_or_create_by_name(name) do + {:ok, %Hashtag{} = hashtag} -> + {:cont, {:ok, list ++ [hashtag]}} + + error -> + {:halt, error} + end + end) + end + + def changeset(%Hashtag{} = struct, params) do + struct + |> cast(params, [:name, :data]) + |> update_change(:name, &String.downcase/1) + |> validate_required([:name]) + |> unique_constraint(:name) + end +end diff --git a/lib/pleroma/object.ex b/lib/pleroma/object.ex index 2088c7656..357a3b504 100644 --- a/lib/pleroma/object.ex +++ b/lib/pleroma/object.ex @@ -10,6 +10,7 @@ defmodule Pleroma.Object do alias Pleroma.Activity alias Pleroma.Config + alias Pleroma.Hashtag alias Pleroma.Object alias Pleroma.Object.Fetcher alias Pleroma.ObjectTombstone @@ -26,6 +27,8 @@ defmodule Pleroma.Object do schema "objects" do field(:data, :map) + many_to_many(:hashtags, Hashtag, join_through: "hashtags_objects", on_replace: :delete) + timestamps() end @@ -53,17 +56,31 @@ defmodule Pleroma.Object do end def change(struct, params \\ %{}) do - changeset = - struct - |> cast(params, [:data]) - |> validate_required([:data]) - |> unique_constraint(:ap_id, name: :objects_unique_apid_index) - - if hashtags_changed?(struct, get_change(changeset, :data)) do - # TODO: modify assoc once it's introduced - changeset + struct + |> cast(params, [:data]) + |> validate_required([:data]) + |> unique_constraint(:ap_id, name: :objects_unique_apid_index) + |> maybe_handle_hashtags_change(struct) + end + + defp maybe_handle_hashtags_change(changeset, struct) do + with data_hashtags_change = get_change(changeset, :data), + true <- hashtags_changed?(struct, data_hashtags_change), + {:ok, hashtag_records} <- + data_hashtags_change + |> object_data_hashtags() + |> Hashtag.get_or_create_by_names() do + put_assoc(changeset, :hashtags, hashtag_records) else - changeset + false -> + changeset + + {:error, hashtag_changeset} -> + failed_hashtag = get_field(hashtag_changeset, :name) + + validate_change(changeset, :data, fn _, _ -> + [data: "error referencing hashtag: #{failed_hashtag}"] + end) end end -- cgit v1.2.3 From cbb19d0e1882f5ce641f30b51d7156336f81aba9 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Sat, 26 Dec 2020 22:20:55 +0300 Subject: [#3213] Hashtag-filtering functions in ActivityPub. Mix task for migrating hashtags to `hashtags` table. --- lib/mix/tasks/pleroma/database.ex | 64 ++++++++++ lib/pleroma/web/activity_pub/activity_pub.ex | 167 ++++++++++++++++++++------- 2 files changed, 190 insertions(+), 41 deletions(-) (limited to 'lib') diff --git a/lib/mix/tasks/pleroma/database.ex b/lib/mix/tasks/pleroma/database.ex index 22151ce08..093c7dd30 100644 --- a/lib/mix/tasks/pleroma/database.ex +++ b/lib/mix/tasks/pleroma/database.ex @@ -4,14 +4,18 @@ defmodule Mix.Tasks.Pleroma.Database do alias Pleroma.Conversation + alias Pleroma.Hashtag alias Pleroma.Maintenance alias Pleroma.Object alias Pleroma.Repo alias Pleroma.User + require Logger require Pleroma.Constants + import Ecto.Query import Mix.Pleroma + use Mix.Task @shortdoc "A collection of database related tasks" @@ -128,6 +132,66 @@ defmodule Mix.Tasks.Pleroma.Database do |> Stream.run() end + def run(["transfer_hashtags"]) do + import Ecto.Query + + start_pleroma() + + from( + object in Object, + left_join: hashtag in assoc(object, :hashtags), + where: is_nil(hashtag.id), + where: fragment("(?)->>'tag' != '[]'", object.data), + select: %{ + id: object.id, + inserted_at: object.inserted_at, + tag: fragment("(?)->>'tag'", object.data) + }, + order_by: [desc: object.id] + ) + |> Pleroma.Repo.chunk_stream(100, :batches) + |> Stream.each(fn objects -> + chunk_start = List.first(objects) + chunk_end = List.last(objects) + + Logger.info( + "transfer_hashtags: " <> + "#{chunk_start.id} (#{chunk_start.inserted_at}) -- " <> + "#{chunk_end.id} (#{chunk_end.inserted_at})" + ) + + Enum.map( + objects, + fn object -> + hashtags = + object.tag + |> Jason.decode!() + |> Enum.filter(&is_bitstring(&1)) + + with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do + Repo.transaction(fn -> + for hashtag_record <- hashtag_records do + with {:error, _} <- + Ecto.Adapters.SQL.query( + Repo, + "insert into hashtags_objects(hashtag_id, object_id) values " <> + "(#{hashtag_record.id}, #{object.id});" + ) do + Logger.warn( + "ERROR: could not link object #{object.id} and hashtag #{hashtag_record.id}" + ) + end + end + end) + else + e -> Logger.warn("ERROR: could not process object #{object.id}: #{inspect(e)}") + end + end + ) + end) + |> Stream.run() + end + def run(["vacuum", args]) do start_pleroma() diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 1c91bc074..2e25412c6 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -660,33 +660,41 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do defp restrict_since(query, _), do: query defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do - raise "Can't use the child object without preloading!" + raise_on_missing_preload() end - defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do + defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_list(tag_reject) do from( [_activity, object] in query, where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject) ) end + defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do + restrict_tag_reject(query, %{tag_reject: [tag_reject]}) + end + defp restrict_tag_reject(query, _), do: query defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do - raise "Can't use the child object without preloading!" + raise_on_missing_preload() end - defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do + defp restrict_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do from( [_activity, object] in query, where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all) ) end + defp restrict_tag_all(query, %{tag_all: tag}) when is_binary(tag) do + restrict_tag(query, %{tag: tag}) + end + defp restrict_tag_all(query, _), do: query defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do - raise "Can't use the child object without preloading!" + raise_on_missing_preload() end defp restrict_tag(query, %{tag: tag}) when is_list(tag) do @@ -697,13 +705,79 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do + restrict_tag(query, %{tag: [tag]}) + end + + defp restrict_tag(query, _), do: query + + defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do + raise_on_missing_preload() + end + + defp restrict_hashtag_reject_any(query, %{tag_reject: tags_reject}) when is_list(tags_reject) do + if has_named_binding?(query, :thread_mute) do + from( + [activity, object, thread_mute] in query, + group_by: [activity.id, object.id, thread_mute.id] + ) + else + from( + [activity, object] in query, + group_by: [activity.id, object.id] + ) + end + |> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag) + |> having( + [hashtag: hashtag], + fragment("not(array_agg(?) && (?))", hashtag.name, ^tags_reject) + ) + end + + defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do + restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]}) + end + + defp restrict_hashtag_reject_any(query, _), do: query + + defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do + raise_on_missing_preload() + end + + defp restrict_hashtag_all(query, %{tag_all: tags}) when is_list(tags) do + Enum.reduce( + tags, + query, + fn tag, acc -> restrict_hashtag_any(acc, %{tag: tag}) end + ) + end + + defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do + restrict_hashtag_any(query, %{tag: tag}) + end + + defp restrict_hashtag_all(query, _), do: query + + defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do + raise_on_missing_preload() + end + + defp restrict_hashtag_any(query, %{tag: tags}) when is_list(tags) do from( [_activity, object] in query, - where: fragment("(?)->'tag' \\? (?)", object.data, ^tag) + join: hashtag in assoc(object, :hashtags), + where: hashtag.name in ^tags ) end - defp restrict_tag(query, _), do: query + defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do + restrict_hashtag_any(query, %{tag: [tag]}) + end + + defp restrict_hashtag_any(query, _), do: query + + defp raise_on_missing_preload do + raise "Can't use the child object without preloading!" + end defp restrict_recipients(query, [], _user), do: query @@ -1088,40 +1162,51 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do skip_thread_containment: Config.get([:instance, :skip_thread_containment]) } - Activity - |> maybe_preload_objects(opts) - |> maybe_preload_bookmarks(opts) - |> maybe_preload_report_notes(opts) - |> maybe_set_thread_muted_field(opts) - |> maybe_order(opts) - |> restrict_recipients(recipients, opts[:user]) - |> restrict_replies(opts) - |> restrict_tag(opts) - |> restrict_tag_reject(opts) - |> restrict_tag_all(opts) - |> restrict_since(opts) - |> restrict_local(opts) - |> restrict_actor(opts) - |> restrict_type(opts) - |> restrict_state(opts) - |> restrict_favorited_by(opts) - |> restrict_blocked(restrict_blocked_opts) - |> restrict_muted(restrict_muted_opts) - |> restrict_filtered(opts) - |> restrict_media(opts) - |> restrict_visibility(opts) - |> restrict_thread_visibility(opts, config) - |> restrict_reblogs(opts) - |> restrict_pinned(opts) - |> restrict_muted_reblogs(restrict_muted_reblogs_opts) - |> restrict_instance(opts) - |> restrict_announce_object_actor(opts) - |> restrict_filtered(opts) - |> Activity.restrict_deactivated_users() - |> exclude_poll_votes(opts) - |> exclude_chat_messages(opts) - |> exclude_invisible_actors(opts) - |> exclude_visibility(opts) + query = + Activity + |> distinct([a], true) + |> maybe_preload_objects(opts) + |> maybe_preload_bookmarks(opts) + |> maybe_preload_report_notes(opts) + |> maybe_set_thread_muted_field(opts) + |> maybe_order(opts) + |> restrict_recipients(recipients, opts[:user]) + |> restrict_replies(opts) + |> restrict_since(opts) + |> restrict_local(opts) + |> restrict_actor(opts) + |> restrict_type(opts) + |> restrict_state(opts) + |> restrict_favorited_by(opts) + |> restrict_blocked(restrict_blocked_opts) + |> restrict_muted(restrict_muted_opts) + |> restrict_filtered(opts) + |> restrict_media(opts) + |> restrict_visibility(opts) + |> restrict_thread_visibility(opts, config) + |> restrict_reblogs(opts) + |> restrict_pinned(opts) + |> restrict_muted_reblogs(restrict_muted_reblogs_opts) + |> restrict_instance(opts) + |> restrict_announce_object_actor(opts) + |> restrict_filtered(opts) + |> Activity.restrict_deactivated_users() + |> exclude_poll_votes(opts) + |> exclude_chat_messages(opts) + |> exclude_invisible_actors(opts) + |> exclude_visibility(opts) + + if Config.get([:instance, :improved_hashtag_timeline]) do + query + |> restrict_hashtag_any(opts) + |> restrict_hashtag_all(opts) + |> restrict_hashtag_reject_any(opts) + else + query + |> restrict_tag(opts) + |> restrict_tag_reject(opts) + |> restrict_tag_all(opts) + end end def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do -- cgit v1.2.3 From 14fae94c0e4b04123c7af148260d0a4a51042570 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Mon, 28 Dec 2020 00:08:09 +0300 Subject: [#3213] Made Object.hashtags/1 work with :hashtags assoc. Adjusted tests. --- lib/pleroma/config.ex | 2 ++ lib/pleroma/object.ex | 14 +++++++++++++- lib/pleroma/web/activity_pub/activity_pub.ex | 12 ++++++------ 3 files changed, 21 insertions(+), 7 deletions(-) (limited to 'lib') diff --git a/lib/pleroma/config.ex b/lib/pleroma/config.ex index 86d4f6b72..ee0167f4e 100644 --- a/lib/pleroma/config.ex +++ b/lib/pleroma/config.ex @@ -96,6 +96,8 @@ defmodule Pleroma.Config do end end + def object_embedded_hashtags?, do: !get([:instance, :improved_hashtag_timeline]) + def oauth_consumer_strategies, do: get([:auth, :oauth_consumer_strategies], []) def oauth_consumer_enabled?, do: oauth_consumer_strategies() != [] diff --git a/lib/pleroma/object.ex b/lib/pleroma/object.ex index 1d756bcd1..08114d4f2 100644 --- a/lib/pleroma/object.ex +++ b/lib/pleroma/object.ex @@ -384,7 +384,19 @@ defmodule Pleroma.Object do def tags(_), do: [] - def hashtags(object), do: embedded_hashtags(object) + def hashtags(%Object{} = object) do + cond do + Config.object_embedded_hashtags?() -> + embedded_hashtags(object) + + object.id == "pleroma:fake_object_id" -> + [] + + true -> + hashtag_records = Repo.preload(object, :hashtags).hashtags + Enum.map(hashtag_records, & &1.name) + end + end defp embedded_hashtags(%Object{data: data}) do object_data_hashtags(data) diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 54d1a2350..626cad336 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -1199,16 +1199,16 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do |> exclude_invisible_actors(opts) |> exclude_visibility(opts) - if Config.get([:instance, :improved_hashtag_timeline]) do - query - |> restrict_hashtag_any(opts) - |> restrict_hashtag_all(opts) - |> restrict_hashtag_reject_any(opts) - else + if Config.object_embedded_hashtags?() do query |> restrict_tag(opts) |> restrict_tag_reject(opts) |> restrict_tag_all(opts) + else + query + |> restrict_hashtag_any(opts) + |> restrict_hashtag_all(opts) + |> restrict_hashtag_reject_any(opts) end end -- cgit v1.2.3 From a25c1e8ec0b6f4ef2e9f68c4ad5e48e18f5f01a7 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Wed, 30 Dec 2020 14:35:19 +0300 Subject: [#3213] Improved `database.transfer_hashtags` mix task: proper rollback, speedup. --- lib/mix/tasks/pleroma/database.ex | 46 +++++++++++++++++++++------------------ 1 file changed, 25 insertions(+), 21 deletions(-) (limited to 'lib') diff --git a/lib/mix/tasks/pleroma/database.ex b/lib/mix/tasks/pleroma/database.ex index 093c7dd30..d44bd3478 100644 --- a/lib/mix/tasks/pleroma/database.ex +++ b/lib/mix/tasks/pleroma/database.ex @@ -137,6 +137,8 @@ defmodule Mix.Tasks.Pleroma.Database do start_pleroma() + Logger.info("Starting transferring object embedded hashtags to `hashtags` table...") + from( object in Object, left_join: hashtag in assoc(object, :hashtags), @@ -144,21 +146,12 @@ defmodule Mix.Tasks.Pleroma.Database do where: fragment("(?)->>'tag' != '[]'", object.data), select: %{ id: object.id, - inserted_at: object.inserted_at, tag: fragment("(?)->>'tag'", object.data) - }, - order_by: [desc: object.id] + } ) |> Pleroma.Repo.chunk_stream(100, :batches) |> Stream.each(fn objects -> - chunk_start = List.first(objects) - chunk_end = List.last(objects) - - Logger.info( - "transfer_hashtags: " <> - "#{chunk_start.id} (#{chunk_start.inserted_at}) -- " <> - "#{chunk_end.id} (#{chunk_end.inserted_at})" - ) + Logger.info("Processing #{length(objects)} objects...") Enum.map( objects, @@ -168,28 +161,39 @@ defmodule Mix.Tasks.Pleroma.Database do |> Jason.decode!() |> Enum.filter(&is_bitstring(&1)) - with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do - Repo.transaction(fn -> + Repo.transaction(fn -> + with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do for hashtag_record <- hashtag_records do - with {:error, _} <- + with {:ok, _} <- Ecto.Adapters.SQL.query( Repo, "insert into hashtags_objects(hashtag_id, object_id) values " <> "(#{hashtag_record.id}, #{object.id});" ) do - Logger.warn( - "ERROR: could not link object #{object.id} and hashtag #{hashtag_record.id}" - ) + :noop + else + {:error, e} -> + error = + "ERROR: could not link object #{object.id} and hashtag " <> + "#{hashtag_record.id}: #{inspect(e)}" + + Logger.error(error) + Repo.rollback(error) end end - end) - else - e -> Logger.warn("ERROR: could not process object #{object.id}: #{inspect(e)}") - end + else + e -> + error = "ERROR: could not create hashtags for object #{object.id}: #{inspect(e)}" + Logger.error(error) + Repo.rollback(error) + end + end) end ) end) |> Stream.run() + + Logger.info("Done transferring hashtags. Please check logs to ensure no errors.") end def run(["vacuum", args]) do -- cgit v1.2.3 From e0b5edb6d5a423bfd247e0774d2f5bc642b2fb80 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Wed, 30 Dec 2020 14:42:35 +0300 Subject: [#3213] Fixed Object.object_data_hashtags/1 to process only AS2 elements of `data.tag` (basing on #2984). --- lib/pleroma/object.ex | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) (limited to 'lib') diff --git a/lib/pleroma/object.ex b/lib/pleroma/object.ex index 08114d4f2..dad572f2b 100644 --- a/lib/pleroma/object.ex +++ b/lib/pleroma/object.ex @@ -405,8 +405,16 @@ defmodule Pleroma.Object do defp embedded_hashtags(_), do: [] defp object_data_hashtags(%{"tag" => tags}) when is_list(tags) do - # Note: AS2 map-type elements are ignored - Enum.filter(tags, &is_bitstring(&1)) + # Note: Old format with copy of hashtags as strings is ignored, using AS2 + tags + |> Enum.filter(fn + %{"type" => "Hashtag"} = data -> Map.has_key?(data, "name") + _ -> false + end) + |> Enum.map(fn + %{"name" => "#" <> hashtag} -> String.downcase(hashtag) + %{"name" => hashtag} -> String.downcase(hashtag) + end) end defp object_data_hashtags(_), do: [] -- cgit v1.2.3 From 8d1a0c1afd46f8683e9022523cecffb9b60c9f8c Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Wed, 30 Dec 2020 15:22:49 +0300 Subject: [#3213] Made Object.object_data_hashtags/1 handle both AS2 and plain text hashtags. --- lib/pleroma/object.ex | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'lib') diff --git a/lib/pleroma/object.ex b/lib/pleroma/object.ex index dad572f2b..7e79e15ee 100644 --- a/lib/pleroma/object.ex +++ b/lib/pleroma/object.ex @@ -405,16 +405,18 @@ defmodule Pleroma.Object do defp embedded_hashtags(_), do: [] defp object_data_hashtags(%{"tag" => tags}) when is_list(tags) do - # Note: Old format with copy of hashtags as strings is ignored, using AS2 tags |> Enum.filter(fn %{"type" => "Hashtag"} = data -> Map.has_key?(data, "name") + plain_text when is_bitstring(plain_text) -> true _ -> false end) |> Enum.map(fn %{"name" => "#" <> hashtag} -> String.downcase(hashtag) %{"name" => hashtag} -> String.downcase(hashtag) + hashtag when is_bitstring(hashtag) -> String.downcase(hashtag) end) + |> Enum.uniq() end defp object_data_hashtags(_), do: [] -- cgit v1.2.3 From 367f0c31c3c15f75aed1d3ba66914e4197c19596 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Thu, 31 Dec 2020 09:36:26 +0300 Subject: [#3213] Added query options support for Repo.chunk_stream/4. Used infinite timeout in transfer_hashtags select query. --- lib/mix/tasks/pleroma/database.ex | 11 +++++------ lib/pleroma/repo.ex | 6 +++--- 2 files changed, 8 insertions(+), 9 deletions(-) (limited to 'lib') diff --git a/lib/mix/tasks/pleroma/database.ex b/lib/mix/tasks/pleroma/database.ex index d44bd3478..f903cf75b 100644 --- a/lib/mix/tasks/pleroma/database.ex +++ b/lib/mix/tasks/pleroma/database.ex @@ -149,9 +149,9 @@ defmodule Mix.Tasks.Pleroma.Database do tag: fragment("(?)->>'tag'", object.data) } ) - |> Pleroma.Repo.chunk_stream(100, :batches) + |> Repo.chunk_stream(100, :batches, timeout: :infinity) |> Stream.each(fn objects -> - Logger.info("Processing #{length(objects)} objects...") + Logger.info("Processing #{length(objects)} objects starting from id #{hd(objects).id}...") Enum.map( objects, @@ -165,10 +165,9 @@ defmodule Mix.Tasks.Pleroma.Database do with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do for hashtag_record <- hashtag_records do with {:ok, _} <- - Ecto.Adapters.SQL.query( - Repo, - "insert into hashtags_objects(hashtag_id, object_id) values " <> - "(#{hashtag_record.id}, #{object.id});" + Repo.query( + "insert into hashtags_objects(hashtag_id, object_id) values ($1, $2);", + [hashtag_record.id, object.id] ) do :noop else diff --git a/lib/pleroma/repo.ex b/lib/pleroma/repo.ex index 4524bd5e2..78711e6ac 100644 --- a/lib/pleroma/repo.ex +++ b/lib/pleroma/repo.ex @@ -63,8 +63,8 @@ defmodule Pleroma.Repo do iex> Pleroma.Repo.chunk_stream(Pleroma.Activity.Queries.by_actor(ap_id), 500, :batches) """ @spec chunk_stream(Ecto.Query.t(), integer(), atom()) :: Enumerable.t() - def chunk_stream(query, chunk_size, returns_as \\ :one) do - # We don't actually need start and end funcitons of resource streaming, + def chunk_stream(query, chunk_size, returns_as \\ :one, query_options \\ []) do + # We don't actually need start and end functions of resource streaming, # but it seems to be the only way to not fetch records one-by-one and # have individual records be the elements of the stream, instead of # lists of records @@ -76,7 +76,7 @@ defmodule Pleroma.Repo do |> order_by(asc: :id) |> where([r], r.id > ^last_id) |> limit(^chunk_size) - |> all() + |> all(query_options) |> case do [] -> {:halt, last_id} -- cgit v1.2.3 From 303055456f19152821ec5ec1df88d60c03f60905 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Thu, 31 Dec 2020 12:45:23 +0300 Subject: Alternative implementation of hashtag-filtering queries in ActivityPub. Fixed GROUP BY clause for aggregation on hashtags. --- lib/pleroma/activity.ex | 2 + lib/pleroma/web/activity_pub/activity_pub.ex | 120 ++++++++++++++++++++++----- 2 files changed, 100 insertions(+), 22 deletions(-) (limited to 'lib') diff --git a/lib/pleroma/activity.ex b/lib/pleroma/activity.ex index 9d970a808..df216e4de 100644 --- a/lib/pleroma/activity.ex +++ b/lib/pleroma/activity.ex @@ -113,6 +113,7 @@ defmodule Pleroma.Activity do from([a] in query, left_join: b in Bookmark, on: b.user_id == ^user.id and b.activity_id == a.id, + as: :bookmark, preload: [bookmark: b] ) end @@ -123,6 +124,7 @@ defmodule Pleroma.Activity do from([a] in query, left_join: r in ReportNote, on: a.id == r.activity_id, + as: :report_note, preload: [report_notes: r] ) end diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 626cad336..339843330 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -713,22 +713,92 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do defp restrict_tag(query, _), do: query + defp restrict_hashtag(query, opts) do + [tag_any, tag_all, tag_reject] = + [:tag, :tag_all, :tag_reject] + |> Enum.map(&opts[&1]) + |> Enum.map(&List.wrap(&1)) + + has_conditions = Enum.any?([tag_any, tag_all, tag_reject], &Enum.any?(&1)) + + cond do + !has_conditions -> + query + + opts[:skip_preload] -> + raise_on_missing_preload() + + true -> + query + |> group_by_all_bindings() + |> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag) + |> maybe_restrict_hashtag_any(tag_any) + |> maybe_restrict_hashtag_all(tag_all) + |> maybe_restrict_hashtag_reject_any(tag_reject) + end + end + + # Groups by all bindings to allow aggregation on hashtags + defp group_by_all_bindings(query) do + # Expecting named bindings: :object, :bookmark, :thread_mute, :report_note + cond do + Enum.count(query.aliases) == 4 -> + from([a, o, b3, b4, b5] in query, group_by: [a.id, o.id, b3.id, b4.id, b5.id]) + + Enum.count(query.aliases) == 3 -> + from([a, o, b3, b4] in query, group_by: [a.id, o.id, b3.id, b4.id]) + + Enum.count(query.aliases) == 2 -> + from([a, o, b3] in query, group_by: [a.id, o.id, b3.id]) + + true -> + from([a, o] in query, group_by: [a.id, o.id]) + end + end + + defp maybe_restrict_hashtag_any(query, []) do + query + end + + defp maybe_restrict_hashtag_any(query, tags) do + having( + query, + [hashtag: hashtag], + fragment("array_agg(?) && (?)", hashtag.name, ^tags) + ) + end + + defp maybe_restrict_hashtag_all(query, []) do + query + end + + defp maybe_restrict_hashtag_all(query, tags) do + having( + query, + [hashtag: hashtag], + fragment("array_agg(?) @> (?)", hashtag.name, ^tags) + ) + end + + defp maybe_restrict_hashtag_reject_any(query, []) do + query + end + + defp maybe_restrict_hashtag_reject_any(query, tags) do + having( + query, + [hashtag: hashtag], + fragment("not(array_agg(?) && (?))", hashtag.name, ^tags) + ) + end + defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do raise_on_missing_preload() end defp restrict_hashtag_reject_any(query, %{tag_reject: tags_reject}) when is_list(tags_reject) do - if has_named_binding?(query, :thread_mute) do - from( - [activity, object, thread_mute] in query, - group_by: [activity.id, object.id, thread_mute.id] - ) - else - from( - [activity, object] in query, - group_by: [activity.id, object.id] - ) - end + query + |> group_by_all_bindings() |> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag) |> having( [hashtag: hashtag], @@ -1167,7 +1237,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do query = Activity - |> distinct([a], true) |> maybe_preload_objects(opts) |> maybe_preload_bookmarks(opts) |> maybe_preload_report_notes(opts) @@ -1199,16 +1268,23 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do |> exclude_invisible_actors(opts) |> exclude_visibility(opts) - if Config.object_embedded_hashtags?() do - query - |> restrict_tag(opts) - |> restrict_tag_reject(opts) - |> restrict_tag_all(opts) - else - query - |> restrict_hashtag_any(opts) - |> restrict_hashtag_all(opts) - |> restrict_hashtag_reject_any(opts) + cond do + Config.object_embedded_hashtags?() -> + query + |> restrict_tag(opts) + |> restrict_tag_reject(opts) + |> restrict_tag_all(opts) + + # TODO: benchmark (initial approach preferring non-aggregate ops when possible) + Config.get([:instance, :improved_hashtag_timeline]) == :join -> + query + |> distinct([activity], true) + |> restrict_hashtag_any(opts) + |> restrict_hashtag_all(opts) + |> restrict_hashtag_reject_any(opts) + + true -> + restrict_hashtag(query, opts) end end -- cgit v1.2.3 From 0d521022fe6157ce9a346c6915ce38292e653bb3 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Thu, 7 Jan 2021 12:20:29 +0300 Subject: [#3213] Removed PK from hashtags_objects table. Improved hashtags_transfer mix task (logging of failed ids). --- lib/mix/tasks/pleroma/database.ex | 29 +++++++++++++++++------------ lib/pleroma/object.ex | 4 ++-- 2 files changed, 19 insertions(+), 14 deletions(-) (limited to 'lib') diff --git a/lib/mix/tasks/pleroma/database.ex b/lib/mix/tasks/pleroma/database.ex index f903cf75b..918752dc2 100644 --- a/lib/mix/tasks/pleroma/database.ex +++ b/lib/mix/tasks/pleroma/database.ex @@ -139,6 +139,7 @@ defmodule Mix.Tasks.Pleroma.Database do Logger.info("Starting transferring object embedded hashtags to `hashtags` table...") + # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out) from( object in Object, left_join: hashtag in assoc(object, :hashtags), @@ -153,13 +154,10 @@ defmodule Mix.Tasks.Pleroma.Database do |> Stream.each(fn objects -> Logger.info("Processing #{length(objects)} objects starting from id #{hd(objects).id}...") - Enum.map( - objects, - fn object -> - hashtags = - object.tag - |> Jason.decode!() - |> Enum.filter(&is_bitstring(&1)) + failed_ids = + objects + |> Enum.map(fn object -> + hashtags = Object.object_data_hashtags(%{"tag" => Jason.decode!(object.tag)}) Repo.transaction(fn -> with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do @@ -169,7 +167,7 @@ defmodule Mix.Tasks.Pleroma.Database do "insert into hashtags_objects(hashtag_id, object_id) values ($1, $2);", [hashtag_record.id, object.id] ) do - :noop + nil else {:error, e} -> error = @@ -177,18 +175,25 @@ defmodule Mix.Tasks.Pleroma.Database do "#{hashtag_record.id}: #{inspect(e)}" Logger.error(error) - Repo.rollback(error) + Repo.rollback(object.id) end end + + object.id else e -> error = "ERROR: could not create hashtags for object #{object.id}: #{inspect(e)}" Logger.error(error) - Repo.rollback(error) + Repo.rollback(object.id) end end) - end - ) + end) + |> Enum.filter(&(elem(&1, 0) == :error)) + |> Enum.map(&elem(&1, 1)) + + if Enum.any?(failed_ids) do + Logger.error("ERROR: transfer_hashtags iteration failed for ids: #{inspect(failed_ids)}") + end end) |> Stream.run() diff --git a/lib/pleroma/object.ex b/lib/pleroma/object.ex index 7e79e15ee..61f2ffa19 100644 --- a/lib/pleroma/object.ex +++ b/lib/pleroma/object.ex @@ -404,7 +404,7 @@ defmodule Pleroma.Object do defp embedded_hashtags(_), do: [] - defp object_data_hashtags(%{"tag" => tags}) when is_list(tags) do + def object_data_hashtags(%{"tag" => tags}) when is_list(tags) do tags |> Enum.filter(fn %{"type" => "Hashtag"} = data -> Map.has_key?(data, "name") @@ -419,5 +419,5 @@ defmodule Pleroma.Object do |> Enum.uniq() end - defp object_data_hashtags(_), do: [] + def object_data_hashtags(_), do: [] end -- cgit v1.2.3 From 8c972de0457199098c5f3378313d08a9dd2d64ce Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Sun, 10 Jan 2021 11:44:39 +0300 Subject: [#3213] transfer_hashtags mix task refactoring. --- lib/mix/tasks/pleroma/database.ex | 127 ++++++++++++++++++-------------------- 1 file changed, 59 insertions(+), 68 deletions(-) (limited to 'lib') diff --git a/lib/mix/tasks/pleroma/database.ex b/lib/mix/tasks/pleroma/database.ex index 918752dc2..e9686fc1b 100644 --- a/lib/mix/tasks/pleroma/database.ex +++ b/lib/mix/tasks/pleroma/database.ex @@ -132,74 +132,6 @@ defmodule Mix.Tasks.Pleroma.Database do |> Stream.run() end - def run(["transfer_hashtags"]) do - import Ecto.Query - - start_pleroma() - - Logger.info("Starting transferring object embedded hashtags to `hashtags` table...") - - # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out) - from( - object in Object, - left_join: hashtag in assoc(object, :hashtags), - where: is_nil(hashtag.id), - where: fragment("(?)->>'tag' != '[]'", object.data), - select: %{ - id: object.id, - tag: fragment("(?)->>'tag'", object.data) - } - ) - |> Repo.chunk_stream(100, :batches, timeout: :infinity) - |> Stream.each(fn objects -> - Logger.info("Processing #{length(objects)} objects starting from id #{hd(objects).id}...") - - failed_ids = - objects - |> Enum.map(fn object -> - hashtags = Object.object_data_hashtags(%{"tag" => Jason.decode!(object.tag)}) - - Repo.transaction(fn -> - with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do - for hashtag_record <- hashtag_records do - with {:ok, _} <- - Repo.query( - "insert into hashtags_objects(hashtag_id, object_id) values ($1, $2);", - [hashtag_record.id, object.id] - ) do - nil - else - {:error, e} -> - error = - "ERROR: could not link object #{object.id} and hashtag " <> - "#{hashtag_record.id}: #{inspect(e)}" - - Logger.error(error) - Repo.rollback(object.id) - end - end - - object.id - else - e -> - error = "ERROR: could not create hashtags for object #{object.id}: #{inspect(e)}" - Logger.error(error) - Repo.rollback(object.id) - end - end) - end) - |> Enum.filter(&(elem(&1, 0) == :error)) - |> Enum.map(&elem(&1, 1)) - - if Enum.any?(failed_ids) do - Logger.error("ERROR: transfer_hashtags iteration failed for ids: #{inspect(failed_ids)}") - end - end) - |> Stream.run() - - Logger.info("Done transferring hashtags. Please check logs to ensure no errors.") - end - def run(["vacuum", args]) do start_pleroma() @@ -239,4 +171,63 @@ defmodule Mix.Tasks.Pleroma.Database do end) |> Stream.run() end + + def run(["transfer_hashtags"]) do + import Ecto.Query + + start_pleroma() + + Logger.info("Starting transferring object embedded hashtags to `hashtags` table...") + + # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out) + from( + object in Object, + left_join: hashtag in assoc(object, :hashtags), + where: is_nil(hashtag.id), + where: + fragment("(?)->'tag' IS NOT NULL AND (?)->'tag' != '[]'::jsonb", object.data, object.data), + select: %{ + id: object.id, + tag: fragment("(?)->'tag'", object.data) + } + ) + |> Repo.chunk_stream(100, :one, timeout: :infinity) + |> Stream.each(&transfer_object_hashtags(&1)) + |> Stream.run() + + Logger.info("Done transferring hashtags. Please check logs to ensure no errors.") + end + + defp transfer_object_hashtags(object) do + hashtags = Object.object_data_hashtags(%{"tag" => object.tag}) + + Repo.transaction(fn -> + with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do + for hashtag_record <- hashtag_records do + with {:ok, _} <- + Repo.query( + "insert into hashtags_objects(hashtag_id, object_id) values ($1, $2);", + [hashtag_record.id, object.id] + ) do + nil + else + {:error, e} -> + error = + "ERROR: could not link object #{object.id} and hashtag " <> + "#{hashtag_record.id}: #{inspect(e)}" + + Logger.error(error) + Repo.rollback(object.id) + end + end + + object.id + else + e -> + error = "ERROR: could not create hashtags for object #{object.id}: #{inspect(e)}" + Logger.error(error) + Repo.rollback(object.id) + end + end) + end end -- cgit v1.2.3 From 3e4d84729a4ca8d9779d439a9aa2c8c23b3acd1d Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Wed, 13 Jan 2021 22:07:38 +0300 Subject: [#3213] Prototype of data migrations functionality / HashtagsTableMigrator. --- lib/mix/tasks/pleroma/database.ex | 60 ------- lib/pleroma/application.ex | 3 +- lib/pleroma/config.ex | 4 +- lib/pleroma/data_migration.ex | 46 +++++ lib/pleroma/delivery.ex | 1 - lib/pleroma/ecto_enums.ex | 8 + lib/pleroma/migrators/hashtags_table_migrator.ex | 211 +++++++++++++++++++++++ lib/pleroma/web/activity_pub/activity_pub.ex | 2 +- 8 files changed, 271 insertions(+), 64 deletions(-) create mode 100644 lib/pleroma/data_migration.ex create mode 100644 lib/pleroma/migrators/hashtags_table_migrator.ex (limited to 'lib') diff --git a/lib/mix/tasks/pleroma/database.ex b/lib/mix/tasks/pleroma/database.ex index e9686fc1b..08ede9eef 100644 --- a/lib/mix/tasks/pleroma/database.ex +++ b/lib/mix/tasks/pleroma/database.ex @@ -4,7 +4,6 @@ defmodule Mix.Tasks.Pleroma.Database do alias Pleroma.Conversation - alias Pleroma.Hashtag alias Pleroma.Maintenance alias Pleroma.Object alias Pleroma.Repo @@ -171,63 +170,4 @@ defmodule Mix.Tasks.Pleroma.Database do end) |> Stream.run() end - - def run(["transfer_hashtags"]) do - import Ecto.Query - - start_pleroma() - - Logger.info("Starting transferring object embedded hashtags to `hashtags` table...") - - # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out) - from( - object in Object, - left_join: hashtag in assoc(object, :hashtags), - where: is_nil(hashtag.id), - where: - fragment("(?)->'tag' IS NOT NULL AND (?)->'tag' != '[]'::jsonb", object.data, object.data), - select: %{ - id: object.id, - tag: fragment("(?)->'tag'", object.data) - } - ) - |> Repo.chunk_stream(100, :one, timeout: :infinity) - |> Stream.each(&transfer_object_hashtags(&1)) - |> Stream.run() - - Logger.info("Done transferring hashtags. Please check logs to ensure no errors.") - end - - defp transfer_object_hashtags(object) do - hashtags = Object.object_data_hashtags(%{"tag" => object.tag}) - - Repo.transaction(fn -> - with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do - for hashtag_record <- hashtag_records do - with {:ok, _} <- - Repo.query( - "insert into hashtags_objects(hashtag_id, object_id) values ($1, $2);", - [hashtag_record.id, object.id] - ) do - nil - else - {:error, e} -> - error = - "ERROR: could not link object #{object.id} and hashtag " <> - "#{hashtag_record.id}: #{inspect(e)}" - - Logger.error(error) - Repo.rollback(object.id) - end - end - - object.id - else - e -> - error = "ERROR: could not create hashtags for object #{object.id}: #{inspect(e)}" - Logger.error(error) - Repo.rollback(object.id) - end - end) - end end diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index bd568d858..962529dfd 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -104,7 +104,8 @@ defmodule Pleroma.Application do chat_child(chat_enabled?()) ++ [ Pleroma.Web.Endpoint, - Pleroma.Gopher.Server + Pleroma.Gopher.Server, + Pleroma.Migrators.HashtagsTableMigrator ] # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html diff --git a/lib/pleroma/config.ex b/lib/pleroma/config.ex index ee0167f4e..dbfb114d6 100644 --- a/lib/pleroma/config.ex +++ b/lib/pleroma/config.ex @@ -96,7 +96,9 @@ defmodule Pleroma.Config do end end - def object_embedded_hashtags?, do: !get([:instance, :improved_hashtag_timeline]) + def improved_hashtag_timeline_path, do: [:instance, :improved_hashtag_timeline] + def improved_hashtag_timeline, do: get(improved_hashtag_timeline_path()) + def object_embedded_hashtags?, do: !improved_hashtag_timeline() def oauth_consumer_strategies, do: get([:auth, :oauth_consumer_strategies], []) diff --git a/lib/pleroma/data_migration.ex b/lib/pleroma/data_migration.ex new file mode 100644 index 000000000..64fa155ff --- /dev/null +++ b/lib/pleroma/data_migration.ex @@ -0,0 +1,46 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2021 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.DataMigration do + use Ecto.Schema + + alias Pleroma.DataMigration + alias Pleroma.DataMigration.State + alias Pleroma.Repo + + import Ecto.Changeset + + schema "data_migrations" do + field(:name, :string) + field(:state, State, default: :pending) + field(:feature_lock, :boolean, default: false) + field(:params, :map, default: %{}) + field(:data, :map, default: %{}) + + timestamps() + end + + def changeset(data_migration, params \\ %{}) do + data_migration + |> cast(params, [:name, :state, :feature_lock, :params, :data]) + |> validate_required([:name]) + |> unique_constraint(:name) + end + + def update(data_migration, params \\ %{}) do + data_migration + |> changeset(params) + |> Repo.update() + end + + def update_state(data_migration, new_state) do + update(data_migration, %{state: new_state}) + end + + def get_by_name(name) do + Repo.get_by(DataMigration, name: name) + end + + def populate_hashtags_table, do: get_by_name("populate_hashtags_table") +end diff --git a/lib/pleroma/delivery.ex b/lib/pleroma/delivery.ex index 0ded2855c..baf79dda7 100644 --- a/lib/pleroma/delivery.ex +++ b/lib/pleroma/delivery.ex @@ -9,7 +9,6 @@ defmodule Pleroma.Delivery do alias Pleroma.Object alias Pleroma.Repo alias Pleroma.User - alias Pleroma.User import Ecto.Changeset import Ecto.Query diff --git a/lib/pleroma/ecto_enums.ex b/lib/pleroma/ecto_enums.ex index 6fc47620c..f0ae658a4 100644 --- a/lib/pleroma/ecto_enums.ex +++ b/lib/pleroma/ecto_enums.ex @@ -17,3 +17,11 @@ defenum(Pleroma.FollowingRelationship.State, follow_accept: 2, follow_reject: 3 ) + +defenum(Pleroma.DataMigration.State, + pending: 1, + running: 2, + complete: 3, + failed: 4, + manual: 5 +) diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex new file mode 100644 index 000000000..a7e3de542 --- /dev/null +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -0,0 +1,211 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2021 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Migrators.HashtagsTableMigrator do + defmodule State do + use Agent + + @init_state %{} + + def start_link(_) do + Agent.start_link(fn -> @init_state end, name: __MODULE__) + end + + def get do + Agent.get(__MODULE__, & &1) + end + + def put(key, value) do + Agent.update(__MODULE__, fn state -> + Map.put(state, key, value) + end) + end + + def increment(key, increment \\ 1) do + Agent.update(__MODULE__, fn state -> + updated_value = (state[key] || 0) + increment + Map.put(state, key, updated_value) + end) + end + end + + use GenServer + + require Logger + + import Ecto.Query + + alias Pleroma.Config + alias Pleroma.DataMigration + alias Pleroma.Hashtag + alias Pleroma.Object + alias Pleroma.Repo + + defdelegate state(), to: State, as: :get + defdelegate put_state(key, value), to: State, as: :put + defdelegate increment_state(key, increment), to: State, as: :increment + + defdelegate data_migration(), to: DataMigration, as: :populate_hashtags_table + + def start_link(_) do + GenServer.start_link(__MODULE__, nil, name: __MODULE__) + end + + @impl true + def init(_) do + {:ok, nil, {:continue, :init_state}} + end + + @impl true + def handle_continue(:init_state, _state) do + {:ok, _} = State.start_link(nil) + + put_state(:status, :init) + + dm = data_migration() + + cond do + Config.get(:env) == :test -> + put_state(:status, :noop) + + is_nil(dm) -> + put_state(:status, :halt) + put_state(:message, "Data migration does not exist.") + + dm.state == :manual -> + put_state(:status, :noop) + put_state(:message, "Data migration is in manual execution state.") + + dm.state == :complete -> + handle_success() + + true -> + send(self(), :migrate_hashtags) + end + + {:noreply, nil} + end + + @impl true + def handle_info(:migrate_hashtags, state) do + data_migration = data_migration() + + {:ok, data_migration} = DataMigration.update_state(data_migration, :running) + put_state(:status, :running) + + Logger.info("Starting transferring object embedded hashtags to `hashtags` table...") + + max_processed_id = data_migration.data["max_processed_id"] || 0 + + # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out) + from( + object in Object, + left_join: hashtag in assoc(object, :hashtags), + where: object.id > ^max_processed_id, + where: is_nil(hashtag.id), + where: + fragment("(?)->'tag' IS NOT NULL AND (?)->'tag' != '[]'::jsonb", object.data, object.data), + select: %{ + id: object.id, + tag: fragment("(?)->'tag'", object.data) + } + ) + |> Repo.chunk_stream(100, :batches, timeout: :infinity) + |> Stream.each(fn objects -> + object_ids = Enum.map(objects, & &1.id) + + failed_ids = + objects + |> Enum.map(&transfer_object_hashtags(&1)) + |> Enum.filter(&(elem(&1, 0) == :error)) + |> Enum.map(&elem(&1, 1)) + + for failed_id <- failed_ids do + _ = + Repo.query( + "INSERT INTO data_migration_failed_ids(data_migration_id, record_id) " <> + "VALUES ($1, $2) ON CONFLICT DO NOTHING;", + [data_migration.id, failed_id] + ) + end + + _ = + Repo.query( + "DELETE FROM data_migration_failed_ids WHERE id = ANY($1)", + [object_ids -- failed_ids] + ) + + max_object_id = Enum.at(object_ids, -1) + _ = DataMigration.update(data_migration, %{data: %{"max_processed_id" => max_object_id}}) + + increment_state(:processed_count, length(object_ids)) + increment_state(:failed_count, length(failed_ids)) + + # A quick and dirty approach to controlling the load this background migration imposes + sleep_interval = Config.get([:populate_hashtags_table, :sleep_interval_ms], 0) + Process.sleep(sleep_interval) + end) + |> Stream.run() + + with {:ok, %{rows: [[0]]}} <- + Repo.query( + "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;", + [data_migration.id] + ) do + put_state(:status, :complete) + _ = DataMigration.update_state(data_migration, :complete) + + handle_success() + else + _ -> + put_state(:status, :failed) + put_state(:message, "Please check data_migration_failed_ids records.") + end + + {:noreply, state} + end + + defp transfer_object_hashtags(object) do + hashtags = Object.object_data_hashtags(%{"tag" => object.tag}) + + Repo.transaction(fn -> + with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do + for hashtag_record <- hashtag_records do + with {:ok, _} <- + Repo.query( + "insert into hashtags_objects(hashtag_id, object_id) values ($1, $2);", + [hashtag_record.id, object.id] + ) do + nil + else + {:error, e} -> + error = + "ERROR: could not link object #{object.id} and hashtag " <> + "#{hashtag_record.id}: #{inspect(e)}" + + Logger.error(error) + Repo.rollback(object.id) + end + end + + object.id + else + e -> + error = "ERROR: could not create hashtags for object #{object.id}: #{inspect(e)}" + Logger.error(error) + Repo.rollback(object.id) + end + end) + end + + defp handle_success do + put_state(:status, :complete) + + unless Config.improved_hashtag_timeline() do + Config.put(Config.improved_hashtag_timeline_path(), true) + end + + :ok + end +end diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 339843330..6131ae85b 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -1276,7 +1276,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do |> restrict_tag_all(opts) # TODO: benchmark (initial approach preferring non-aggregate ops when possible) - Config.get([:instance, :improved_hashtag_timeline]) == :join -> + Config.improved_hashtag_timeline() == :join -> query |> distinct([activity], true) |> restrict_hashtag_any(opts) -- cgit v1.2.3 From f5f267fa764f53ef617bc9504c7ecb68b5d3d7ab Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Thu, 14 Jan 2021 22:41:27 +0300 Subject: [#3213] Refactoring of HashtagsTableMigrator. --- lib/pleroma/migrators/hashtags_table_migrator.ex | 98 ++++++++++++---------- .../migrators/hashtags_table_migrator/state.ex | 26 ++++++ 2 files changed, 78 insertions(+), 46 deletions(-) create mode 100644 lib/pleroma/migrators/hashtags_table_migrator/state.ex (limited to 'lib') diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index a7e3de542..9f1a00f9c 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -3,39 +3,13 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Migrators.HashtagsTableMigrator do - defmodule State do - use Agent - - @init_state %{} - - def start_link(_) do - Agent.start_link(fn -> @init_state end, name: __MODULE__) - end - - def get do - Agent.get(__MODULE__, & &1) - end - - def put(key, value) do - Agent.update(__MODULE__, fn state -> - Map.put(state, key, value) - end) - end - - def increment(key, increment \\ 1) do - Agent.update(__MODULE__, fn state -> - updated_value = (state[key] || 0) + increment - Map.put(state, key, updated_value) - end) - end - end - use GenServer require Logger import Ecto.Query + alias __MODULE__.State alias Pleroma.Config alias Pleroma.DataMigration alias Pleroma.Hashtag @@ -43,13 +17,23 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do alias Pleroma.Repo defdelegate state(), to: State, as: :get - defdelegate put_state(key, value), to: State, as: :put - defdelegate increment_state(key, increment), to: State, as: :increment + defdelegate put_stat(key, value), to: State, as: :put + defdelegate increment_stat(key, increment), to: State, as: :increment defdelegate data_migration(), to: DataMigration, as: :populate_hashtags_table + @reg_name {:global, __MODULE__} + + def whereis, do: GenServer.whereis(@reg_name) + def start_link(_) do - GenServer.start_link(__MODULE__, nil, name: __MODULE__) + case whereis() do + nil -> + GenServer.start_link(__MODULE__, nil, name: @reg_name) + + pid -> + {:ok, pid} + end end @impl true @@ -61,21 +45,22 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do def handle_continue(:init_state, _state) do {:ok, _} = State.start_link(nil) - put_state(:status, :init) + put_stat(:status, :init) dm = data_migration() + manual_migrations = Config.get([:instance, :manual_data_migrations], []) cond do Config.get(:env) == :test -> - put_state(:status, :noop) + put_stat(:status, :noop) is_nil(dm) -> - put_state(:status, :halt) - put_state(:message, "Data migration does not exist.") + put_stat(:status, :halt) + put_stat(:message, "Data migration does not exist.") - dm.state == :manual -> - put_state(:status, :noop) - put_state(:message, "Data migration is in manual execution state.") + dm.state == :manual or dm.name in manual_migrations -> + put_stat(:status, :noop) + put_stat(:message, "Data migration is in manual execution state.") dm.state == :complete -> handle_success() @@ -91,8 +76,12 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do def handle_info(:migrate_hashtags, state) do data_migration = data_migration() - {:ok, data_migration} = DataMigration.update_state(data_migration, :running) - put_state(:status, :running) + persistent_data = Map.take(data_migration.data, ["max_processed_id"]) + + {:ok, data_migration} = + DataMigration.update(data_migration, %{state: :running, data: persistent_data}) + + put_stat(:status, :running) Logger.info("Starting transferring object embedded hashtags to `hashtags` table...") @@ -137,10 +126,12 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do ) max_object_id = Enum.at(object_ids, -1) - _ = DataMigration.update(data_migration, %{data: %{"max_processed_id" => max_object_id}}) - increment_state(:processed_count, length(object_ids)) - increment_state(:failed_count, length(failed_ids)) + put_stat(:max_processed_id, max_object_id) + increment_stat(:processed_count, length(object_ids)) + increment_stat(:failed_count, length(failed_ids)) + + persist_stats(data_migration) # A quick and dirty approach to controlling the load this background migration imposes sleep_interval = Config.get([:populate_hashtags_table, :sleep_interval_ms], 0) @@ -153,14 +144,15 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;", [data_migration.id] ) do - put_state(:status, :complete) _ = DataMigration.update_state(data_migration, :complete) handle_success() else _ -> - put_state(:status, :failed) - put_state(:message, "Please check data_migration_failed_ids records.") + _ = DataMigration.update_state(data_migration, :failed) + + put_stat(:status, :failed) + put_stat(:message, "Please check data_migration_failed_ids records.") end {:noreply, state} @@ -199,8 +191,13 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do end) end + defp persist_stats(data_migration) do + runner_state = Map.drop(state(), [:status]) + _ = DataMigration.update(data_migration, %{data: runner_state}) + end + defp handle_success do - put_state(:status, :complete) + put_stat(:status, :complete) unless Config.improved_hashtag_timeline() do Config.put(Config.improved_hashtag_timeline_path(), true) @@ -208,4 +205,13 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do :ok end + + def force_continue do + send(whereis(), :migrate_hashtags) + end + + def force_restart do + {:ok, _} = DataMigration.update(data_migration(), %{state: :pending, data: %{}}) + force_continue() + end end diff --git a/lib/pleroma/migrators/hashtags_table_migrator/state.ex b/lib/pleroma/migrators/hashtags_table_migrator/state.ex new file mode 100644 index 000000000..79926892c --- /dev/null +++ b/lib/pleroma/migrators/hashtags_table_migrator/state.ex @@ -0,0 +1,26 @@ +defmodule Pleroma.Migrators.HashtagsTableMigrator.State do + use Agent + + @init_state %{} + + def start_link(_) do + Agent.start_link(fn -> @init_state end, name: __MODULE__) + end + + def get do + Agent.get(__MODULE__, & &1) + end + + def put(key, value) do + Agent.update(__MODULE__, fn state -> + Map.put(state, key, value) + end) + end + + def increment(key, increment \\ 1) do + Agent.update(__MODULE__, fn state -> + updated_value = (state[key] || 0) + increment + Map.put(state, key, updated_value) + end) + end +end -- cgit v1.2.3 From 48b399cedb7d46ea0f08181cfbe4df222861f65b Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Sat, 16 Jan 2021 20:22:14 +0300 Subject: [#3213] Refactoring of HashtagsTableMigrator. Hashtag timeline performance optimization (auto switch to non-aggregate join strategy when efficient). --- lib/pleroma/migrators/hashtags_table_migrator.ex | 47 +++++++++------ .../migrators/hashtags_table_migrator/state.ex | 9 +-- lib/pleroma/web/activity_pub/activity_pub.ex | 68 +++++++++++++--------- 3 files changed, 75 insertions(+), 49 deletions(-) (limited to 'lib') diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index 9f1a00f9c..b40578d50 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -45,25 +45,23 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do def handle_continue(:init_state, _state) do {:ok, _} = State.start_link(nil) - put_stat(:status, :init) + update_status(:init) - dm = data_migration() + data_migration = data_migration() manual_migrations = Config.get([:instance, :manual_data_migrations], []) cond do Config.get(:env) == :test -> - put_stat(:status, :noop) + update_status(:noop) - is_nil(dm) -> - put_stat(:status, :halt) - put_stat(:message, "Data migration does not exist.") + is_nil(data_migration) -> + update_status(:halt, "Data migration does not exist.") - dm.state == :manual or dm.name in manual_migrations -> - put_stat(:status, :noop) - put_stat(:message, "Data migration is in manual execution state.") + data_migration.state == :manual or data_migration.name in manual_migrations -> + update_status(:noop, "Data migration is in manual execution state.") - dm.state == :complete -> - handle_success() + data_migration.state == :complete -> + handle_success(data_migration) true -> send(self(), :migrate_hashtags) @@ -81,7 +79,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do {:ok, data_migration} = DataMigration.update(data_migration, %{state: :running, data: persistent_data}) - put_stat(:status, :running) + update_status(:running) Logger.info("Starting transferring object embedded hashtags to `hashtags` table...") @@ -146,13 +144,12 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do ) do _ = DataMigration.update_state(data_migration, :complete) - handle_success() + handle_success(data_migration) else _ -> _ = DataMigration.update_state(data_migration, :failed) - put_stat(:status, :failed) - put_stat(:message, "Please check data_migration_failed_ids records.") + update_status(:failed, "Please check data_migration_failed_ids records.") end {:noreply, state} @@ -196,16 +193,25 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do _ = DataMigration.update(data_migration, %{data: runner_state}) end - defp handle_success do - put_stat(:status, :complete) + defp handle_success(data_migration) do + update_status(:complete) - unless Config.improved_hashtag_timeline() do + unless data_migration.feature_lock || Config.improved_hashtag_timeline() do Config.put(Config.improved_hashtag_timeline_path(), true) end :ok end + def failed_objects_query do + from(o in Object) + |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"), + on: dmf.record_id == o.id + ) + |> where([_o, dmf], dmf.data_migration_id == ^data_migration().id) + |> order_by([o], asc: o.id) + end + def force_continue do send(whereis(), :migrate_hashtags) end @@ -214,4 +220,9 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do {:ok, _} = DataMigration.update(data_migration(), %{state: :pending, data: %{}}) force_continue() end + + defp update_status(status, message \\ nil) do + put_stat(:status, status) + put_stat(:message, message) + end end diff --git a/lib/pleroma/migrators/hashtags_table_migrator/state.ex b/lib/pleroma/migrators/hashtags_table_migrator/state.ex index 79926892c..c1a2709fc 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator/state.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator/state.ex @@ -2,23 +2,24 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator.State do use Agent @init_state %{} + @reg_name {:global, __MODULE__} def start_link(_) do - Agent.start_link(fn -> @init_state end, name: __MODULE__) + Agent.start_link(fn -> @init_state end, name: @reg_name) end def get do - Agent.get(__MODULE__, & &1) + Agent.get(@reg_name, & &1) end def put(key, value) do - Agent.update(__MODULE__, fn state -> + Agent.update(@reg_name, fn state -> Map.put(state, key, value) end) end def increment(key, increment \\ 1) do - Agent.update(__MODULE__, fn state -> + Agent.update(@reg_name, fn state -> updated_value = (state[key] || 0) + increment Map.put(state, key, updated_value) end) diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index f5563b0fd..0609827ec 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -669,63 +669,66 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do defp restrict_since(query, _), do: query - defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do + defp restrict_embedded_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do raise_on_missing_preload() end - defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_list(tag_reject) do + defp restrict_embedded_tag_reject(query, %{tag_reject: tag_reject}) when is_list(tag_reject) do from( [_activity, object] in query, where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject) ) end - defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do - restrict_tag_reject(query, %{tag_reject: [tag_reject]}) + defp restrict_embedded_tag_reject(query, %{tag_reject: tag_reject}) + when is_binary(tag_reject) do + restrict_embedded_tag_reject(query, %{tag_reject: [tag_reject]}) end - defp restrict_tag_reject(query, _), do: query + defp restrict_embedded_tag_reject(query, _), do: query - defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do + defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do raise_on_missing_preload() end - defp restrict_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do + defp restrict_embedded_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do from( [_activity, object] in query, where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all) ) end - defp restrict_tag_all(query, %{tag_all: tag}) when is_binary(tag) do - restrict_tag(query, %{tag: tag}) + defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do + restrict_embedded_tag(query, %{tag: tag}) end - defp restrict_tag_all(query, _), do: query + defp restrict_embedded_tag_all(query, _), do: query - defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do + defp restrict_embedded_tag(_query, %{tag: _tag, skip_preload: true}) do raise_on_missing_preload() end - defp restrict_tag(query, %{tag: tag}) when is_list(tag) do + defp restrict_embedded_tag(query, %{tag: tag}) when is_list(tag) do from( [_activity, object] in query, where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag) ) end - defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do - restrict_tag(query, %{tag: [tag]}) + defp restrict_embedded_tag(query, %{tag: tag}) when is_binary(tag) do + restrict_embedded_tag(query, %{tag: [tag]}) end - defp restrict_tag(query, _), do: query + defp restrict_embedded_tag(query, _), do: query - defp restrict_hashtag(query, opts) do - [tag_any, tag_all, tag_reject] = - [:tag, :tag_all, :tag_reject] - |> Enum.map(&opts[&1]) - |> Enum.map(&List.wrap(&1)) + defp hashtag_conditions(opts) do + [:tag, :tag_all, :tag_reject] + |> Enum.map(&opts[&1]) + |> Enum.map(&List.wrap(&1)) + end + defp restrict_hashtag_agg(query, opts) do + [tag_any, tag_all, tag_reject] = hashtag_conditions(opts) has_conditions = Enum.any?([tag_any, tag_all, tag_reject], &Enum.any?(&1)) cond do @@ -1275,15 +1278,19 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do |> exclude_invisible_actors(opts) |> exclude_visibility(opts) + hashtag_timeline_strategy = Config.improved_hashtag_timeline() + cond do - Config.object_embedded_hashtags?() -> + !hashtag_timeline_strategy -> query - |> restrict_tag(opts) - |> restrict_tag_reject(opts) - |> restrict_tag_all(opts) + |> restrict_embedded_tag(opts) + |> restrict_embedded_tag_reject(opts) + |> restrict_embedded_tag_all(opts) + + hashtag_timeline_strategy == :prefer_aggregation -> + restrict_hashtag_agg(query, opts) - # TODO: benchmark (initial approach preferring non-aggregate ops when possible) - Config.improved_hashtag_timeline() == :join -> + hashtag_timeline_strategy == :avoid_aggregation or avoid_hashtags_aggregation?(opts) -> query |> distinct([activity], true) |> restrict_hashtag_any(opts) @@ -1291,10 +1298,17 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do |> restrict_hashtag_reject_any(opts) true -> - restrict_hashtag(query, opts) + restrict_hashtag_agg(query, opts) end end + defp avoid_hashtags_aggregation?(opts) do + [tag_any, tag_all, tag_reject] = hashtag_conditions(opts) + + joins_count = length(tag_all) + if Enum.any?(tag_any), do: 1, else: 0 + Enum.empty?(tag_reject) and joins_count <= 2 + end + def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do list_memberships = Pleroma.List.memberships(opts[:user]) -- cgit v1.2.3 From 85f7ef4d13adea9d64d279d1395d17c6ebc20678 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Sun, 17 Jan 2021 10:57:06 +0300 Subject: [#3213] Feature lock adjustment for HashtagsTableMigrator. --- lib/pleroma/migrators/hashtags_table_migrator.ex | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) (limited to 'lib') diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index b40578d50..47de5e134 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -196,11 +196,17 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do defp handle_success(data_migration) do update_status(:complete) - unless data_migration.feature_lock || Config.improved_hashtag_timeline() do - Config.put(Config.improved_hashtag_timeline_path(), true) - end + cond do + data_migration.feature_lock -> + :noop + + not is_nil(Config.improved_hashtag_timeline()) -> + :noop - :ok + true -> + Config.put(Config.improved_hashtag_timeline_path(), true) + :ok + end end def failed_objects_query do -- cgit v1.2.3 From 9d28a7ebfbc7bd8fb893cf1e2ad555ed71f4c812 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Sun, 17 Jan 2021 21:58:15 +0300 Subject: [#3213] Missing copyright header for HashtagsTableMigrator.State. --- lib/pleroma/migrators/hashtags_table_migrator/state.ex | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'lib') diff --git a/lib/pleroma/migrators/hashtags_table_migrator/state.ex b/lib/pleroma/migrators/hashtags_table_migrator/state.ex index c1a2709fc..43f7270e2 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator/state.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator/state.ex @@ -1,3 +1,7 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2021 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + defmodule Pleroma.Migrators.HashtagsTableMigrator.State do use Agent -- cgit v1.2.3 From 7f07909a7b56eb368b3f8aab4752def1551c12fe Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Tue, 19 Jan 2021 21:13:32 +0300 Subject: [#3213] Added `HashtagsTableMigrator.count/1`. --- lib/pleroma/migrators/hashtags_table_migrator.ex | 42 ++++++++++++++++-------- 1 file changed, 29 insertions(+), 13 deletions(-) (limited to 'lib') diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index 47de5e134..048f3c8ee 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -85,19 +85,8 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do max_processed_id = data_migration.data["max_processed_id"] || 0 - # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out) - from( - object in Object, - left_join: hashtag in assoc(object, :hashtags), - where: object.id > ^max_processed_id, - where: is_nil(hashtag.id), - where: - fragment("(?)->'tag' IS NOT NULL AND (?)->'tag' != '[]'::jsonb", object.data, object.data), - select: %{ - id: object.id, - tag: fragment("(?)->'tag'", object.data) - } - ) + query() + |> where([object], object.id > ^max_processed_id) |> Repo.chunk_stream(100, :batches, timeout: :infinity) |> Stream.each(fn objects -> object_ids = Enum.map(objects, & &1.id) @@ -155,6 +144,21 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do {:noreply, state} end + defp query do + # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out) + from( + object in Object, + left_join: hashtag in assoc(object, :hashtags), + where: is_nil(hashtag.id), + where: + fragment("(?)->'tag' IS NOT NULL AND (?)->'tag' != '[]'::jsonb", object.data, object.data), + select: %{ + id: object.id, + tag: fragment("(?)->'tag'", object.data) + } + ) + end + defp transfer_object_hashtags(object) do hashtags = Object.object_data_hashtags(%{"tag" => object.tag}) @@ -188,6 +192,18 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do end) end + def count(force \\ false) do + stored_count = state()[:count] + + if stored_count && !force do + stored_count + else + count = Repo.aggregate(query(), :count, :id) + put_stat(:count, count) + count + end + end + defp persist_stats(data_migration) do runner_state = Map.drop(state(), [:status]) _ = DataMigration.update(data_migration, %{data: runner_state}) -- cgit v1.2.3 From f0f0f2af00e8b73a7013c1308289795961b23f4b Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Tue, 19 Jan 2021 21:17:06 +0300 Subject: [#3213] `timeout` option for `HashtagsTableMigrator.count/_`. --- lib/pleroma/migrators/hashtags_table_migrator.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'lib') diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index 048f3c8ee..6a6a7b5b8 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -192,13 +192,13 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do end) end - def count(force \\ false) do + def count(force \\ false, timeout \\ :infinity) do stored_count = state()[:count] if stored_count && !force do stored_count else - count = Repo.aggregate(query(), :count, :id) + count = Repo.aggregate(query(), :count, :id, timeout: timeout) put_stat(:count, count) count end -- cgit v1.2.3 From b830605577f369d6b1a8730a5b3476ceea4fef5a Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Tue, 19 Jan 2021 22:03:25 +0300 Subject: [#3213] Performance-related stat in HashtagsTableMigrator. Reworked `count/_` to indicate approximate total count for current iteration. --- lib/pleroma/migrators/hashtags_table_migrator.ex | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) (limited to 'lib') diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index 6a6a7b5b8..e9dd9b70c 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -80,6 +80,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do DataMigration.update(data_migration, %{state: :running, data: persistent_data}) update_status(:running) + put_stat(:started_at, NaiveDateTime.utc_now()) Logger.info("Starting transferring object embedded hashtags to `hashtags` table...") @@ -118,6 +119,12 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do increment_stat(:processed_count, length(object_ids)) increment_stat(:failed_count, length(failed_ids)) + put_stat( + :records_per_second, + state()[:processed_count] / + Enum.max([NaiveDateTime.diff(NaiveDateTime.utc_now(), state()[:started_at]), 1]) + ) + persist_stats(data_migration) # A quick and dirty approach to controlling the load this background migration imposes @@ -192,13 +199,18 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do end) end + @doc "Approximate count for current iteration (including processed records count)" def count(force \\ false, timeout \\ :infinity) do stored_count = state()[:count] if stored_count && !force do stored_count else - count = Repo.aggregate(query(), :count, :id, timeout: timeout) + processed_count = state()[:processed_count] || 0 + max_processed_id = data_migration().data["max_processed_id"] || 0 + query = where(query(), [object], object.id > ^max_processed_id) + + count = Repo.aggregate(query, :count, :id, timeout: timeout) + processed_count put_stat(:count, count) count end -- cgit v1.2.3 From c041e9c6300726a40a00146bba04d3ec752219d9 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Thu, 21 Jan 2021 20:19:09 +0300 Subject: [#3213] HashtagsTableMigrator: failures handling fix, retry function. Changed default hashtags filtering strategy to non-aggregate approach. --- lib/pleroma/migrators/hashtags_table_migrator.ex | 52 ++++++++++++++++++++---- lib/pleroma/web/activity_pub/activity_pub.ex | 13 +----- 2 files changed, 45 insertions(+), 20 deletions(-) (limited to 'lib') diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index e9dd9b70c..8ad2c8c73 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -109,8 +109,9 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do _ = Repo.query( - "DELETE FROM data_migration_failed_ids WHERE id = ANY($1)", - [object_ids -- failed_ids] + "DELETE FROM data_migration_failed_ids " <> + "WHERE data_migration_id = $1 AND record_id = ANY($2)", + [data_migration.id, object_ids -- failed_ids] ) max_object_id = Enum.at(object_ids, -1) @@ -133,12 +134,8 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do end) |> Stream.run() - with {:ok, %{rows: [[0]]}} <- - Repo.query( - "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;", - [data_migration.id] - ) do - _ = DataMigration.update_state(data_migration, :complete) + with 0 <- failures_count(data_migration.id) do + {:ok, data_migration} = DataMigration.update_state(data_migration, :complete) handle_success(data_migration) else @@ -167,7 +164,8 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do end defp transfer_object_hashtags(object) do - hashtags = Object.object_data_hashtags(%{"tag" => object.tag}) + embedded_tags = (Map.has_key?(object, :tag) && object.tag) || object.data["tag"] + hashtags = Object.object_data_hashtags(%{"tag" => embedded_tags}) Repo.transaction(fn -> with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do @@ -246,6 +244,36 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do |> order_by([o], asc: o.id) end + def failures_count(data_migration_id \\ nil) do + data_migration_id = data_migration_id || data_migration().id + + with {:ok, %{rows: [[count]]}} <- + Repo.query( + "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;", + [data_migration_id] + ) do + count + end + end + + def retry_failed do + data_migration = data_migration() + + failed_objects_query() + |> Repo.chunk_stream(100, :one) + |> Stream.each(fn object -> + with {:ok, _} <- transfer_object_hashtags(object) do + _ = + Repo.query( + "DELETE FROM data_migration_failed_ids " <> + "WHERE data_migration_id = $1 AND record_id = $2", + [data_migration.id, object.id] + ) + end + end) + |> Stream.run() + end + def force_continue do send(whereis(), :migrate_hashtags) end @@ -255,6 +283,12 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do force_continue() end + def force_complete do + {:ok, data_migration} = DataMigration.update_state(data_migration(), :complete) + + handle_success(data_migration) + end + defp update_status(status, message \\ nil) do put_stat(:status, status) put_stat(:message, message) diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 0609827ec..dbfd3839d 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -727,6 +727,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do |> Enum.map(&List.wrap(&1)) end + # Note: times out on larger instances (with default timeout), intended for complex queries defp restrict_hashtag_agg(query, opts) do [tag_any, tag_all, tag_reject] = hashtag_conditions(opts) has_conditions = Enum.any?([tag_any, tag_all, tag_reject], &Enum.any?(&1)) @@ -1290,25 +1291,15 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do hashtag_timeline_strategy == :prefer_aggregation -> restrict_hashtag_agg(query, opts) - hashtag_timeline_strategy == :avoid_aggregation or avoid_hashtags_aggregation?(opts) -> + true -> query |> distinct([activity], true) |> restrict_hashtag_any(opts) |> restrict_hashtag_all(opts) |> restrict_hashtag_reject_any(opts) - - true -> - restrict_hashtag_agg(query, opts) end end - defp avoid_hashtags_aggregation?(opts) do - [tag_any, tag_all, tag_reject] = hashtag_conditions(opts) - - joins_count = length(tag_all) + if Enum.any?(tag_any), do: 1, else: 0 - Enum.empty?(tag_reject) and joins_count <= 2 - end - def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do list_memberships = Pleroma.List.memberships(opts[:user]) -- cgit v1.2.3 From ca7f24064304945587fc232325dce4b834ff6c94 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Thu, 21 Jan 2021 20:50:06 +0300 Subject: [#3213] Ignoring of blank elements from objects.data->tag. --- lib/pleroma/object.ex | 2 ++ 1 file changed, 2 insertions(+) (limited to 'lib') diff --git a/lib/pleroma/object.ex b/lib/pleroma/object.ex index 5102be1de..9b5c1bec1 100644 --- a/lib/pleroma/object.ex +++ b/lib/pleroma/object.ex @@ -420,6 +420,8 @@ defmodule Pleroma.Object do hashtag when is_bitstring(hashtag) -> String.downcase(hashtag) end) |> Enum.uniq() + # Note: "" elements (plain text) might occur in `data.tag` for incoming objects + |> Enum.filter(&(&1 not in [nil, ""])) end def object_data_hashtags(_), do: [] -- cgit v1.2.3 From f264d930cc00c463d0f506a94f6f6b494aab7022 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Sun, 24 Jan 2021 23:27:02 +0300 Subject: [#3213] Speedup of HashtagsTableMigrator (query optimization). State handling fix. --- lib/pleroma/migrators/hashtags_table_migrator.ex | 18 +++++++++++++++--- lib/pleroma/migrators/hashtags_table_migrator/state.ex | 4 ++++ 2 files changed, 19 insertions(+), 3 deletions(-) (limited to 'lib') diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index 8ad2c8c73..6a1c9592c 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -72,6 +72,8 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do @impl true def handle_info(:migrate_hashtags, state) do + State.clear() + data_migration = data_migration() persistent_data = Map.take(data_migration.data, ["max_processed_id"]) @@ -152,8 +154,6 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out) from( object in Object, - left_join: hashtag in assoc(object, :hashtags), - where: is_nil(hashtag.id), where: fragment("(?)->'tag' IS NOT NULL AND (?)->'tag' != '[]'::jsonb", object.data, object.data), select: %{ @@ -161,12 +161,24 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do tag: fragment("(?)->'tag'", object.data) } ) + |> join(:left, [o], hashtags_objects in fragment("SELECT object_id FROM hashtags_objects"), + on: hashtags_objects.object_id == o.id + ) + |> where([_o, hashtags_objects], is_nil(hashtags_objects.object_id)) end defp transfer_object_hashtags(object) do - embedded_tags = (Map.has_key?(object, :tag) && object.tag) || object.data["tag"] + embedded_tags = if Map.has_key?(object, :tag), do: object.tag, else: object.data["tag"] hashtags = Object.object_data_hashtags(%{"tag" => embedded_tags}) + if Enum.any?(hashtags) do + transfer_object_hashtags(object, hashtags) + else + {:ok, object.id} + end + end + + defp transfer_object_hashtags(object, hashtags) do Repo.transaction(fn -> with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do for hashtag_record <- hashtag_records do diff --git a/lib/pleroma/migrators/hashtags_table_migrator/state.ex b/lib/pleroma/migrators/hashtags_table_migrator/state.ex index 43f7270e2..901563426 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator/state.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator/state.ex @@ -12,6 +12,10 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator.State do Agent.start_link(fn -> @init_state end, name: @reg_name) end + def clear do + Agent.update(@reg_name, fn _state -> @init_state end) + end + def get do Agent.get(@reg_name, & &1) end -- cgit v1.2.3 From ea4785213a449f3bcd68bcb4ecb3bb6d794736b1 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Mon, 25 Jan 2021 20:12:09 +0300 Subject: [#3213] Switched to using embedded hashtags in Object.hashtags/1 (to avoid extra joins / preload in timeline queries). --- lib/pleroma/config.ex | 1 - lib/pleroma/object.ex | 18 +++++------------- 2 files changed, 5 insertions(+), 14 deletions(-) (limited to 'lib') diff --git a/lib/pleroma/config.ex b/lib/pleroma/config.ex index ceb8c8b5a..0a6ac0ad0 100644 --- a/lib/pleroma/config.ex +++ b/lib/pleroma/config.ex @@ -98,7 +98,6 @@ defmodule Pleroma.Config do def improved_hashtag_timeline_path, do: [:instance, :improved_hashtag_timeline] def improved_hashtag_timeline, do: get(improved_hashtag_timeline_path()) - def object_embedded_hashtags?, do: !improved_hashtag_timeline() def oauth_consumer_strategies, do: get([:auth, :oauth_consumer_strategies], []) diff --git a/lib/pleroma/object.ex b/lib/pleroma/object.ex index 9b5c1bec1..9edf43e04 100644 --- a/lib/pleroma/object.ex +++ b/lib/pleroma/object.ex @@ -388,24 +388,16 @@ defmodule Pleroma.Object do def tags(_), do: [] def hashtags(%Object{} = object) do - cond do - Config.object_embedded_hashtags?() -> - embedded_hashtags(object) - - object.id == "pleroma:fake_object_id" -> - [] - - true -> - hashtag_records = Repo.preload(object, :hashtags).hashtags - Enum.map(hashtag_records, & &1.name) - end + # Note: always using embedded hashtags regardless whether they are migrated to hashtags table + # (embedded hashtags stay in sync anyways, and we avoid extra joins and preload hassle) + embedded_hashtags(object) end - defp embedded_hashtags(%Object{data: data}) do + def embedded_hashtags(%Object{data: data}) do object_data_hashtags(data) end - defp embedded_hashtags(_), do: [] + def embedded_hashtags(_), do: [] def object_data_hashtags(%{"tag" => tags}) when is_list(tags) do tags -- cgit v1.2.3 From e7864a32d7c9930e5f6c62bd77cef64c68f1eb21 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Mon, 25 Jan 2021 22:31:23 +0300 Subject: [#3213] Removed DISTINCT clause from ActivityPub.fetch_activities_query/2. --- lib/pleroma/web/activity_pub/activity_pub.ex | 1 - 1 file changed, 1 deletion(-) (limited to 'lib') diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index fbda89a25..be81e0833 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -1293,7 +1293,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do true -> query - |> distinct([activity], true) |> restrict_hashtag_any(opts) |> restrict_hashtag_all(opts) |> restrict_hashtag_reject_any(opts) -- cgit v1.2.3 From 380d0cce6b802baf4d13031a4a39f169dd65bffd Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Fri, 29 Jan 2021 00:17:33 +0300 Subject: [#3213] Reinstated DISTINCT clause for hashtag "any" filtering with 2+ terms. Added test. --- lib/pleroma/web/activity_pub/activity_pub.ex | 17 ++++++++++++----- .../web/mastodon_api/controllers/timeline_controller.ex | 2 +- 2 files changed, 13 insertions(+), 6 deletions(-) (limited to 'lib') diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index be81e0833..0a21ac2f2 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -846,11 +846,18 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end defp restrict_hashtag_any(query, %{tag: tags}) when is_list(tags) do - from( - [_activity, object] in query, - join: hashtag in assoc(object, :hashtags), - where: hashtag.name in ^tags - ) + query = + from( + [_activity, object] in query, + join: hashtag in assoc(object, :hashtags), + where: hashtag.name in ^tags + ) + + if length(tags) > 1 do + distinct(query, [activity], true) + else + query + end end defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do diff --git a/lib/pleroma/web/mastodon_api/controllers/timeline_controller.ex b/lib/pleroma/web/mastodon_api/controllers/timeline_controller.ex index 08e6f23b9..1fb954a9b 100644 --- a/lib/pleroma/web/mastodon_api/controllers/timeline_controller.ex +++ b/lib/pleroma/web/mastodon_api/controllers/timeline_controller.ex @@ -134,9 +134,9 @@ defmodule Pleroma.Web.MastodonAPI.TimelineController do tags = [params[:tag], params[:any]] |> List.flatten() - |> Enum.uniq() |> Enum.reject(&is_nil/1) |> Enum.map(&String.downcase/1) + |> Enum.uniq() tag_all = params -- cgit v1.2.3 From 9948ff3356f9e9e214584207a53eba614c73383c Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Sun, 31 Jan 2021 18:24:19 +0300 Subject: [#3213] Added HashtagsCleanupWorker periodic job. --- lib/pleroma/migrators/hashtags_table_migrator.ex | 1 + lib/pleroma/object.ex | 1 + .../workers/cron/hashtags_cleanup_worker.ex | 57 ++++++++++++++++++++++ 3 files changed, 59 insertions(+) create mode 100644 lib/pleroma/workers/cron/hashtags_cleanup_worker.ex (limited to 'lib') diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index 6a1c9592c..07b42a7f4 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -152,6 +152,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do defp query do # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out) + # Note: not checking activity type; HashtagsCleanupWorker should clean up unused records later from( object in Object, where: diff --git a/lib/pleroma/object.ex b/lib/pleroma/object.ex index 9edf43e04..52b77e41c 100644 --- a/lib/pleroma/object.ex +++ b/lib/pleroma/object.ex @@ -65,6 +65,7 @@ defmodule Pleroma.Object do |> maybe_handle_hashtags_change(struct) end + # Note: not checking activity type; HashtagsCleanupWorker should clean up unused records later defp maybe_handle_hashtags_change(changeset, struct) do with data_hashtags_change = get_change(changeset, :data), true <- hashtags_changed?(struct, data_hashtags_change), diff --git a/lib/pleroma/workers/cron/hashtags_cleanup_worker.ex b/lib/pleroma/workers/cron/hashtags_cleanup_worker.ex new file mode 100644 index 000000000..b319067ca --- /dev/null +++ b/lib/pleroma/workers/cron/hashtags_cleanup_worker.ex @@ -0,0 +1,57 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2021 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.Cron.HashtagsCleanupWorker do + @moduledoc """ + The worker to clean up unused hashtags_objects and hashtags. + """ + + use Oban.Worker, queue: "hashtags_cleanup" + + alias Pleroma.Repo + + require Logger + + @hashtags_objects_query """ + DELETE FROM hashtags_objects WHERE object_id IN + (SELECT DISTINCT objects.id FROM objects + JOIN hashtags_objects ON hashtags_objects.object_id = objects.id LEFT JOIN activities + ON COALESCE(activities.data->'object'->>'id', activities.data->>'object') = + (objects.data->>'id') + AND activities.data->>'type' = 'Create' + WHERE activities.id IS NULL); + """ + + @hashtags_query """ + DELETE FROM hashtags WHERE id IN + (SELECT hashtags.id FROM hashtags + LEFT OUTER JOIN hashtags_objects + ON hashtags_objects.hashtag_id = hashtags.id + WHERE hashtags_objects.hashtag_id IS NULL AND hashtags.inserted_at < $1); + """ + + @impl Oban.Worker + def perform(_job) do + Logger.info("Cleaning up unused `hashtags_objects` records...") + + {:ok, %{num_rows: hashtags_objects_count}} = + Repo.query(@hashtags_objects_query, [], timeout: :infinity) + + Logger.info("Deleted #{hashtags_objects_count} unused `hashtags_objects` records.") + + Logger.info("Cleaning up unused `hashtags` records...") + + # Note: ignoring recently created hashtags since references are added after hashtag is created + {:ok, %{num_rows: hashtags_count}} = + Repo.query(@hashtags_query, [NaiveDateTime.add(NaiveDateTime.utc_now(), -3600 * 24)], + timeout: :infinity + ) + + Logger.info("Deleted #{hashtags_count} unused `hashtags` records.") + + Logger.info("HashtagsCleanupWorker complete.") + + :ok + end +end -- cgit v1.2.3 From 6fd4163ab60be07b1a20ac8911e105ddca8e2095 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Sun, 31 Jan 2021 20:37:33 +0300 Subject: [#3213] ActivityPub: implemented subqueries-based hashtags filtering, removed aggregation-based hashtags filtering. --- lib/pleroma/web/activity_pub/activity_pub.ex | 228 +++++++++------------------ 1 file changed, 78 insertions(+), 150 deletions(-) (limited to 'lib') diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 0a21ac2f2..cda8d3f54 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -669,24 +669,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do defp restrict_since(query, _), do: query - defp restrict_embedded_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do - raise_on_missing_preload() - end - - defp restrict_embedded_tag_reject(query, %{tag_reject: tag_reject}) when is_list(tag_reject) do - from( - [_activity, object] in query, - where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject) - ) - end - - defp restrict_embedded_tag_reject(query, %{tag_reject: tag_reject}) - when is_binary(tag_reject) do - restrict_embedded_tag_reject(query, %{tag_reject: [tag_reject]}) - end - - defp restrict_embedded_tag_reject(query, _), do: query - defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do raise_on_missing_preload() end @@ -699,139 +681,65 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do - restrict_embedded_tag(query, %{tag: tag}) + restrict_embedded_tag_any(query, %{tag: tag}) end defp restrict_embedded_tag_all(query, _), do: query - defp restrict_embedded_tag(_query, %{tag: _tag, skip_preload: true}) do + defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do raise_on_missing_preload() end - defp restrict_embedded_tag(query, %{tag: tag}) when is_list(tag) do + defp restrict_embedded_tag_any(query, %{tag: tag}) when is_list(tag) do from( [_activity, object] in query, where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag) ) end - defp restrict_embedded_tag(query, %{tag: tag}) when is_binary(tag) do - restrict_embedded_tag(query, %{tag: [tag]}) - end - - defp restrict_embedded_tag(query, _), do: query - - defp hashtag_conditions(opts) do - [:tag, :tag_all, :tag_reject] - |> Enum.map(&opts[&1]) - |> Enum.map(&List.wrap(&1)) - end - - # Note: times out on larger instances (with default timeout), intended for complex queries - defp restrict_hashtag_agg(query, opts) do - [tag_any, tag_all, tag_reject] = hashtag_conditions(opts) - has_conditions = Enum.any?([tag_any, tag_all, tag_reject], &Enum.any?(&1)) - - cond do - !has_conditions -> - query - - opts[:skip_preload] -> - raise_on_missing_preload() - - true -> - query - |> group_by_all_bindings() - |> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag) - |> maybe_restrict_hashtag_any(tag_any) - |> maybe_restrict_hashtag_all(tag_all) - |> maybe_restrict_hashtag_reject_any(tag_reject) - end - end - - # Groups by all bindings to allow aggregation on hashtags - defp group_by_all_bindings(query) do - # Expecting named bindings: :object, :bookmark, :thread_mute, :report_note - cond do - Enum.count(query.aliases) == 4 -> - from([a, o, b3, b4, b5] in query, group_by: [a.id, o.id, b3.id, b4.id, b5.id]) - - Enum.count(query.aliases) == 3 -> - from([a, o, b3, b4] in query, group_by: [a.id, o.id, b3.id, b4.id]) - - Enum.count(query.aliases) == 2 -> - from([a, o, b3] in query, group_by: [a.id, o.id, b3.id]) - - true -> - from([a, o] in query, group_by: [a.id, o.id]) - end - end - - defp maybe_restrict_hashtag_any(query, []) do - query - end - - defp maybe_restrict_hashtag_any(query, tags) do - having( - query, - [hashtag: hashtag], - fragment("array_agg(?) && (?)", hashtag.name, ^tags) - ) - end - - defp maybe_restrict_hashtag_all(query, []) do - query + defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do + restrict_embedded_tag_any(query, %{tag: [tag]}) end - defp maybe_restrict_hashtag_all(query, tags) do - having( - query, - [hashtag: hashtag], - fragment("array_agg(?) @> (?)", hashtag.name, ^tags) - ) - end + defp restrict_embedded_tag_any(query, _), do: query - defp maybe_restrict_hashtag_reject_any(query, []) do - query - end - - defp maybe_restrict_hashtag_reject_any(query, tags) do - having( - query, - [hashtag: hashtag], - fragment("not(array_agg(?) && (?))", hashtag.name, ^tags) - ) - end - - defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do + defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do raise_on_missing_preload() end - defp restrict_hashtag_reject_any(query, %{tag_reject: tags_reject}) when is_list(tags_reject) do - query - |> group_by_all_bindings() - |> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag) - |> having( - [hashtag: hashtag], - fragment("not(array_agg(?) && (?))", hashtag.name, ^tags_reject) + defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject}) + when is_list(tag_reject) do + from( + [_activity, object] in query, + where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject) ) end - defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do - restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]}) + defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject}) + when is_binary(tag_reject) do + restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]}) end - defp restrict_hashtag_reject_any(query, _), do: query + defp restrict_embedded_tag_reject_any(query, _), do: query defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do raise_on_missing_preload() end defp restrict_hashtag_all(query, %{tag_all: tags}) when is_list(tags) do - Enum.reduce( - tags, - query, - fn tag, acc -> restrict_hashtag_any(acc, %{tag: tag}) end + from( + [_activity, object] in query, + where: + fragment( + """ + (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects + ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?) + AND hashtags_objects.object_id = ?) @> ? + """, + ^tags, + object.id, + ^tags + ) ) end @@ -846,18 +754,19 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end defp restrict_hashtag_any(query, %{tag: tags}) when is_list(tags) do - query = - from( - [_activity, object] in query, - join: hashtag in assoc(object, :hashtags), - where: hashtag.name in ^tags - ) - - if length(tags) > 1 do - distinct(query, [activity], true) - else - query - end + from( + [_activity, object] in query, + where: + fragment( + """ + EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects + ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?) + AND hashtags_objects.object_id = ? LIMIT 1) + """, + ^tags, + object.id + ) + ) end defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do @@ -866,6 +775,32 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do defp restrict_hashtag_any(query, _), do: query + defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do + raise_on_missing_preload() + end + + defp restrict_hashtag_reject_any(query, %{tag_reject: tags_reject}) when is_list(tags_reject) do + from( + [_activity, object] in query, + where: + fragment( + """ + NOT EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects + ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?) + AND hashtags_objects.object_id = ? LIMIT 1) + """, + ^tags_reject, + object.id + ) + ) + end + + defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do + restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]}) + end + + defp restrict_hashtag_reject_any(query, _), do: query + defp raise_on_missing_preload do raise "Can't use the child object without preloading!" end @@ -1286,23 +1221,16 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do |> exclude_invisible_actors(opts) |> exclude_visibility(opts) - hashtag_timeline_strategy = Config.improved_hashtag_timeline() - - cond do - !hashtag_timeline_strategy -> - query - |> restrict_embedded_tag(opts) - |> restrict_embedded_tag_reject(opts) - |> restrict_embedded_tag_all(opts) - - hashtag_timeline_strategy == :prefer_aggregation -> - restrict_hashtag_agg(query, opts) - - true -> - query - |> restrict_hashtag_any(opts) - |> restrict_hashtag_all(opts) - |> restrict_hashtag_reject_any(opts) + if Config.improved_hashtag_timeline() do + query + |> restrict_hashtag_any(opts) + |> restrict_hashtag_all(opts) + |> restrict_hashtag_reject_any(opts) + else + query + |> restrict_embedded_tag_any(opts) + |> restrict_embedded_tag_all(opts) + |> restrict_embedded_tag_reject_any(opts) end end -- cgit v1.2.3 From 108e90b18edcfb57b9839e7c5d6d444a63ae2069 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Sun, 31 Jan 2021 22:03:59 +0300 Subject: [#3213] Explicitly defined PKs in hashtags_objects and data_migration_failed_ids. Added "pleroma.database rollback" task to revert a single migration. --- lib/mix/tasks/pleroma/database.ex | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) (limited to 'lib') diff --git a/lib/mix/tasks/pleroma/database.ex b/lib/mix/tasks/pleroma/database.ex index 4ddace9c9..30c0d2bf1 100644 --- a/lib/mix/tasks/pleroma/database.ex +++ b/lib/mix/tasks/pleroma/database.ex @@ -20,6 +20,30 @@ defmodule Mix.Tasks.Pleroma.Database do @shortdoc "A collection of database related tasks" @moduledoc File.read!("docs/administration/CLI_tasks/database.md") + # Rolls back a specific migration (leaving subsequent migrations applied) + # Based on https://stackoverflow.com/a/53825840 + def run(["rollback", version]) do + start_pleroma() + + version = String.to_integer(version) + re = ~r/^#{version}_.*\.exs/ + path = Application.app_dir(:pleroma, Path.join(["priv", "repo", "migrations"])) + + result = + with {:find, "" <> file} <- {:find, Enum.find(File.ls!(path), &String.match?(&1, re))}, + {:compile, [{mod, _} | _]} <- {:compile, Code.compile_file(Path.join(path, file))}, + {:rollback, :ok} <- {:rollback, Ecto.Migrator.down(Repo, version, mod)} do + {:ok, "Reversed migration: #{file}"} + else + {:find, _} -> {:error, "No migration found with version prefix: #{version}"} + {:compile, e} -> {:error, "Problem compiling migration module: #{inspect(e)}"} + {:rollback, e} -> {:error, "Problem reversing migration: #{inspect(e)}"} + e -> {:error, "Something unexpected happened: #{inspect(e)}"} + end + + IO.inspect(result) + end + def run(["remove_embedded_objects" | args]) do {options, [], []} = OptionParser.parse( -- cgit v1.2.3 From 10207f840ce3515dddfde36288575f203c52840f Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Sun, 31 Jan 2021 22:36:46 +0300 Subject: [#3213] ActivityPub: temporarily reverted to previous hashtags filtering implementation due to blank results issue. --- lib/pleroma/web/activity_pub/activity_pub.ex | 106 ++++++++++++++------------- 1 file changed, 54 insertions(+), 52 deletions(-) (limited to 'lib') diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index cda8d3f54..fd0144aad 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -722,24 +722,53 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do defp restrict_embedded_tag_reject_any(query, _), do: query + # Groups by all bindings to allow aggregation on hashtags + defp group_by_all_bindings(query) do + # Expecting named bindings: :object, :bookmark, :thread_mute, :report_note + cond do + Enum.count(query.aliases) == 4 -> + from([a, o, b3, b4, b5] in query, group_by: [a.id, o.id, b3.id, b4.id, b5.id]) + + Enum.count(query.aliases) == 3 -> + from([a, o, b3, b4] in query, group_by: [a.id, o.id, b3.id, b4.id]) + + Enum.count(query.aliases) == 2 -> + from([a, o, b3] in query, group_by: [a.id, o.id, b3.id]) + + true -> + from([a, o] in query, group_by: [a.id, o.id]) + end + end + + defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do + raise_on_missing_preload() + end + + defp restrict_hashtag_reject_any(query, %{tag_reject: tags_reject}) when is_list(tags_reject) do + query + |> group_by_all_bindings() + |> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag) + |> having( + [hashtag: hashtag], + fragment("not(array_agg(?) && (?))", hashtag.name, ^tags_reject) + ) + end + + defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do + restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]}) + end + + defp restrict_hashtag_reject_any(query, _), do: query + defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do raise_on_missing_preload() end defp restrict_hashtag_all(query, %{tag_all: tags}) when is_list(tags) do - from( - [_activity, object] in query, - where: - fragment( - """ - (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects - ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?) - AND hashtags_objects.object_id = ?) @> ? - """, - ^tags, - object.id, - ^tags - ) + Enum.reduce( + tags, + query, + fn tag, acc -> restrict_hashtag_any(acc, %{tag: tag}) end ) end @@ -754,19 +783,18 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end defp restrict_hashtag_any(query, %{tag: tags}) when is_list(tags) do - from( - [_activity, object] in query, - where: - fragment( - """ - EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects - ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?) - AND hashtags_objects.object_id = ? LIMIT 1) - """, - ^tags, - object.id - ) - ) + query = + from( + [_activity, object] in query, + join: hashtag in assoc(object, :hashtags), + where: hashtag.name in ^tags + ) + + if length(tags) > 1 do + distinct(query, [activity], true) + else + query + end end defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do @@ -775,32 +803,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do defp restrict_hashtag_any(query, _), do: query - defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do - raise_on_missing_preload() - end - - defp restrict_hashtag_reject_any(query, %{tag_reject: tags_reject}) when is_list(tags_reject) do - from( - [_activity, object] in query, - where: - fragment( - """ - NOT EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects - ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?) - AND hashtags_objects.object_id = ? LIMIT 1) - """, - ^tags_reject, - object.id - ) - ) - end - - defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do - restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]}) - end - - defp restrict_hashtag_reject_any(query, _), do: query - defp raise_on_missing_preload do raise "Can't use the child object without preloading!" end -- cgit v1.2.3 From cf4765af4098098fa4d6996193432bd19c439a75 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Sun, 31 Jan 2021 23:06:38 +0300 Subject: [#3213] ActivityPub: fixed subquery-based hashtags filtering implementation (addressed empty list options issue). Added regression test. --- lib/pleroma/web/activity_pub/activity_pub.ex | 115 +++++++++++++-------------- 1 file changed, 56 insertions(+), 59 deletions(-) (limited to 'lib') diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index fd0144aad..6cf4093fb 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -673,7 +673,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do raise_on_missing_preload() end - defp restrict_embedded_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do + defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do from( [_activity, object] in query, where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all) @@ -690,7 +690,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do raise_on_missing_preload() end - defp restrict_embedded_tag_any(query, %{tag: tag}) when is_list(tag) do + defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag}) do from( [_activity, object] in query, where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag) @@ -707,8 +707,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do raise_on_missing_preload() end - defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject}) - when is_list(tag_reject) do + defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do from( [_activity, object] in query, where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject) @@ -722,86 +721,84 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do defp restrict_embedded_tag_reject_any(query, _), do: query - # Groups by all bindings to allow aggregation on hashtags - defp group_by_all_bindings(query) do - # Expecting named bindings: :object, :bookmark, :thread_mute, :report_note - cond do - Enum.count(query.aliases) == 4 -> - from([a, o, b3, b4, b5] in query, group_by: [a.id, o.id, b3.id, b4.id, b5.id]) - - Enum.count(query.aliases) == 3 -> - from([a, o, b3, b4] in query, group_by: [a.id, o.id, b3.id, b4.id]) - - Enum.count(query.aliases) == 2 -> - from([a, o, b3] in query, group_by: [a.id, o.id, b3.id]) - - true -> - from([a, o] in query, group_by: [a.id, o.id]) - end - end - - defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do + defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do raise_on_missing_preload() end - defp restrict_hashtag_reject_any(query, %{tag_reject: tags_reject}) when is_list(tags_reject) do - query - |> group_by_all_bindings() - |> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag) - |> having( - [hashtag: hashtag], - fragment("not(array_agg(?) && (?))", hashtag.name, ^tags_reject) + defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do + from( + [_activity, object] in query, + where: + fragment( + """ + (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects + ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?) + AND hashtags_objects.object_id = ?) @> ? + """, + ^tags, + object.id, + ^tags + ) ) end - defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do - restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]}) + defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do + restrict_hashtag_any(query, %{tag: tag}) end - defp restrict_hashtag_reject_any(query, _), do: query + defp restrict_hashtag_all(query, _), do: query - defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do + defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do raise_on_missing_preload() end - defp restrict_hashtag_all(query, %{tag_all: tags}) when is_list(tags) do - Enum.reduce( - tags, - query, - fn tag, acc -> restrict_hashtag_any(acc, %{tag: tag}) end + defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do + from( + [_activity, object] in query, + where: + fragment( + """ + EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects + ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?) + AND hashtags_objects.object_id = ? LIMIT 1) + """, + ^tags, + object.id + ) ) end - defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do - restrict_hashtag_any(query, %{tag: tag}) + defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do + restrict_hashtag_any(query, %{tag: [tag]}) end - defp restrict_hashtag_all(query, _), do: query + defp restrict_hashtag_any(query, _), do: query - defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do + defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do raise_on_missing_preload() end - defp restrict_hashtag_any(query, %{tag: tags}) when is_list(tags) do - query = - from( - [_activity, object] in query, - join: hashtag in assoc(object, :hashtags), - where: hashtag.name in ^tags - ) - - if length(tags) > 1 do - distinct(query, [activity], true) - else - query - end + defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do + from( + [_activity, object] in query, + where: + fragment( + """ + NOT EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects + ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?) + AND hashtags_objects.object_id = ? LIMIT 1) + """, + ^tags_reject, + object.id + ) + ) end - defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do - restrict_hashtag_any(query, %{tag: [tag]}) + defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do + restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]}) end - defp restrict_hashtag_any(query, _), do: query + defp restrict_hashtag_reject_any(query, _), do: query defp raise_on_missing_preload do raise "Can't use the child object without preloading!" -- cgit v1.2.3 From d1c6dd97aa503ca7c897d67d98fe8c924e113a61 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Sun, 7 Feb 2021 22:24:12 +0300 Subject: [#3213] Partially addressed code review points. migration rollback task changes, hashtags-related config handling tweaks, `hashtags.data` deletion (unused). --- lib/mix/tasks/pleroma/database.ex | 53 +++++++++++++----------- lib/pleroma/config.ex | 3 -- lib/pleroma/hashtag.ex | 5 +-- lib/pleroma/migrators/hashtags_table_migrator.ex | 4 +- lib/pleroma/web/activity_pub/activity_pub.ex | 2 +- 5 files changed, 33 insertions(+), 34 deletions(-) (limited to 'lib') diff --git a/lib/mix/tasks/pleroma/database.ex b/lib/mix/tasks/pleroma/database.ex index 30c0d2bf1..7c4f54141 100644 --- a/lib/mix/tasks/pleroma/database.ex +++ b/lib/mix/tasks/pleroma/database.ex @@ -20,30 +20,6 @@ defmodule Mix.Tasks.Pleroma.Database do @shortdoc "A collection of database related tasks" @moduledoc File.read!("docs/administration/CLI_tasks/database.md") - # Rolls back a specific migration (leaving subsequent migrations applied) - # Based on https://stackoverflow.com/a/53825840 - def run(["rollback", version]) do - start_pleroma() - - version = String.to_integer(version) - re = ~r/^#{version}_.*\.exs/ - path = Application.app_dir(:pleroma, Path.join(["priv", "repo", "migrations"])) - - result = - with {:find, "" <> file} <- {:find, Enum.find(File.ls!(path), &String.match?(&1, re))}, - {:compile, [{mod, _} | _]} <- {:compile, Code.compile_file(Path.join(path, file))}, - {:rollback, :ok} <- {:rollback, Ecto.Migrator.down(Repo, version, mod)} do - {:ok, "Reversed migration: #{file}"} - else - {:find, _} -> {:error, "No migration found with version prefix: #{version}"} - {:compile, e} -> {:error, "Problem compiling migration module: #{inspect(e)}"} - {:rollback, e} -> {:error, "Problem reversing migration: #{inspect(e)}"} - e -> {:error, "Something unexpected happened: #{inspect(e)}"} - end - - IO.inspect(result) - end - def run(["remove_embedded_objects" | args]) do {options, [], []} = OptionParser.parse( @@ -194,4 +170,33 @@ defmodule Mix.Tasks.Pleroma.Database do end) |> Stream.run() end + + # Rolls back a specific migration (leaving subsequent migrations applied). + # WARNING: imposes a risk of unrecoverable data loss — proceed at your own responsibility. + # Based on https://stackoverflow.com/a/53825840 + def run(["rollback", version]) do + prompt = "SEVERE WARNING: this operation may result in unrecoverable data loss. Continue?" + + if shell_prompt(prompt, "n") in ~w(Yn Y y) do + {_, result, _} = + Ecto.Migrator.with_repo(Pleroma.Repo, fn repo -> + version = String.to_integer(version) + re = ~r/^#{version}_.*\.exs/ + path = Ecto.Migrator.migrations_path(repo) + + with {:find, "" <> file} <- {:find, Enum.find(File.ls!(path), &String.match?(&1, re))}, + {:compile, [{mod, _} | _]} <- {:compile, Code.compile_file(Path.join(path, file))}, + {:rollback, :ok} <- {:rollback, Ecto.Migrator.down(repo, version, mod)} do + {:ok, "Reversed migration: #{file}"} + else + {:find, _} -> {:error, "No migration found with version prefix: #{version}"} + {:compile, e} -> {:error, "Problem compiling migration module: #{inspect(e)}"} + {:rollback, e} -> {:error, "Problem reversing migration: #{inspect(e)}"} + e -> {:error, "Something unexpected happened: #{inspect(e)}"} + end + end) + + IO.inspect(result) + end + end end diff --git a/lib/pleroma/config.ex b/lib/pleroma/config.ex index 0a6ac0ad0..f17e14128 100644 --- a/lib/pleroma/config.ex +++ b/lib/pleroma/config.ex @@ -96,9 +96,6 @@ defmodule Pleroma.Config do end end - def improved_hashtag_timeline_path, do: [:instance, :improved_hashtag_timeline] - def improved_hashtag_timeline, do: get(improved_hashtag_timeline_path()) - def oauth_consumer_strategies, do: get([:auth, :oauth_consumer_strategies], []) def oauth_consumer_enabled?, do: oauth_consumer_strategies() != [] diff --git a/lib/pleroma/hashtag.ex b/lib/pleroma/hashtag.ex index b05927563..9e4c6c894 100644 --- a/lib/pleroma/hashtag.ex +++ b/lib/pleroma/hashtag.ex @@ -10,11 +10,8 @@ defmodule Pleroma.Hashtag do alias Pleroma.Hashtag alias Pleroma.Repo - @derive {Jason.Encoder, only: [:data]} - schema "hashtags" do field(:name, :string) - field(:data, :map, default: %{}) many_to_many(:objects, Pleroma.Object, join_through: "hashtags_objects", on_replace: :delete) @@ -50,7 +47,7 @@ defmodule Pleroma.Hashtag do def changeset(%Hashtag{} = struct, params) do struct - |> cast(params, [:name, :data]) + |> cast(params, [:name]) |> update_change(:name, &String.downcase/1) |> validate_required([:name]) |> unique_constraint(:name) diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index 07b42a7f4..9a036e0b2 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -239,11 +239,11 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do data_migration.feature_lock -> :noop - not is_nil(Config.improved_hashtag_timeline()) -> + not is_nil(Config.get([:database, :improved_hashtag_timeline])) -> :noop true -> - Config.put(Config.improved_hashtag_timeline_path(), true) + Config.put([:database, :improved_hashtag_timeline], true) :ok end end diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 573b4243c..7ac18e5c5 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -1227,7 +1227,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do |> exclude_invisible_actors(opts) |> exclude_visibility(opts) - if Config.improved_hashtag_timeline() do + if Config.get([:database, :improved_hashtag_timeline]) do query |> restrict_hashtag_any(opts) |> restrict_hashtag_all(opts) -- cgit v1.2.3 From a996ab46a54acbfa7a19da3eae12c78ed6466a1a Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Thu, 11 Feb 2021 19:30:21 +0300 Subject: [#3213] Reorganized hashtags cleanup. Transaction-wrapped Hashtag.get_or_create_by_names/1. Misc. improvements. --- lib/pleroma/hashtag.ex | 60 +++++++++++++++---- lib/pleroma/migrators/hashtags_table_migrator.ex | 70 +++++++++++++++------- lib/pleroma/object.ex | 27 +++++---- .../workers/cron/hashtags_cleanup_worker.ex | 57 ------------------ 4 files changed, 114 insertions(+), 100 deletions(-) delete mode 100644 lib/pleroma/workers/cron/hashtags_cleanup_worker.ex (limited to 'lib') diff --git a/lib/pleroma/hashtag.ex b/lib/pleroma/hashtag.ex index 9e4c6c894..de52c4dae 100644 --- a/lib/pleroma/hashtag.ex +++ b/lib/pleroma/hashtag.ex @@ -6,14 +6,17 @@ defmodule Pleroma.Hashtag do use Ecto.Schema import Ecto.Changeset + import Ecto.Query + alias Ecto.Multi alias Pleroma.Hashtag + alias Pleroma.Object alias Pleroma.Repo schema "hashtags" do field(:name, :string) - many_to_many(:objects, Pleroma.Object, join_through: "hashtags_objects", on_replace: :delete) + many_to_many(:objects, Object, join_through: "hashtags_objects", on_replace: :delete) timestamps() end @@ -34,15 +37,27 @@ defmodule Pleroma.Hashtag do end def get_or_create_by_names(names) when is_list(names) do - Enum.reduce_while(names, {:ok, []}, fn name, {:ok, list} -> - case get_or_create_by_name(name) do - {:ok, %Hashtag{} = hashtag} -> - {:cont, {:ok, list ++ [hashtag]}} - - error -> - {:halt, error} - end - end) + timestamp = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second) + + structs = + Enum.map(names, fn name -> + %Hashtag{} + |> changeset(%{name: name}) + |> Map.get(:changes) + |> Map.merge(%{inserted_at: timestamp, updated_at: timestamp}) + end) + + with {:ok, %{query_op: hashtags}} <- + Multi.new() + |> Multi.insert_all(:insert_all_op, Hashtag, structs, on_conflict: :nothing) + |> Multi.run(:query_op, fn _repo, _changes -> + {:ok, Repo.all(from(ht in Hashtag, where: ht.name in ^names))} + end) + |> Repo.transaction() do + {:ok, hashtags} + else + {:error, _name, value, _changes_so_far} -> {:error, value} + end end def changeset(%Hashtag{} = struct, params) do @@ -52,4 +67,29 @@ defmodule Pleroma.Hashtag do |> validate_required([:name]) |> unique_constraint(:name) end + + def unlink(%Object{id: object_id}) do + with {_, hashtag_ids} <- + from(hto in "hashtags_objects", + where: hto.object_id == ^object_id, + select: hto.hashtag_id + ) + |> Repo.delete_all() do + delete_unreferenced(hashtag_ids) + end + end + + @delete_unreferenced_query """ + DELETE FROM hashtags WHERE id IN + (SELECT hashtags.id FROM hashtags + LEFT OUTER JOIN hashtags_objects + ON hashtags_objects.hashtag_id = hashtags.id + WHERE hashtags_objects.hashtag_id IS NULL AND hashtags.id = ANY($1)); + """ + + def delete_unreferenced(ids) do + with {:ok, %{num_rows: deleted_count}} <- Repo.query(@delete_unreferenced_query, [ids]) do + {:ok, deleted_count} + end + end end diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index 9a036e0b2..c53f6be12 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -74,16 +74,15 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do def handle_info(:migrate_hashtags, state) do State.clear() - data_migration = data_migration() + update_status(:running) + put_stat(:started_at, NaiveDateTime.utc_now()) + data_migration = data_migration() persistent_data = Map.take(data_migration.data, ["max_processed_id"]) {:ok, data_migration} = DataMigration.update(data_migration, %{state: :running, data: persistent_data}) - update_status(:running) - put_stat(:started_at, NaiveDateTime.utc_now()) - Logger.info("Starting transferring object embedded hashtags to `hashtags` table...") max_processed_id = data_migration.data["max_processed_id"] || 0 @@ -137,6 +136,8 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do |> Stream.run() with 0 <- failures_count(data_migration.id) do + _ = delete_non_create_activities_hashtags() + {:ok, data_migration} = DataMigration.update_state(data_migration, :complete) handle_success(data_migration) @@ -150,9 +151,37 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do {:noreply, state} end + @hashtags_objects_cleanup_query """ + DELETE FROM hashtags_objects WHERE object_id IN + (SELECT DISTINCT objects.id FROM objects + JOIN hashtags_objects ON hashtags_objects.object_id = objects.id LEFT JOIN activities + ON COALESCE(activities.data->'object'->>'id', activities.data->>'object') = + (objects.data->>'id') + AND activities.data->>'type' = 'Create' + WHERE activities.id IS NULL); + """ + + @hashtags_cleanup_query """ + DELETE FROM hashtags WHERE id IN + (SELECT hashtags.id FROM hashtags + LEFT OUTER JOIN hashtags_objects + ON hashtags_objects.hashtag_id = hashtags.id + WHERE hashtags_objects.hashtag_id IS NULL); + """ + + def delete_non_create_activities_hashtags do + {:ok, %{num_rows: hashtags_objects_count}} = + Repo.query(@hashtags_objects_cleanup_query, [], timeout: :infinity) + + {:ok, %{num_rows: hashtags_count}} = + Repo.query(@hashtags_cleanup_query, [], timeout: :infinity) + + {:ok, hashtags_objects_count, hashtags_count} + end + defp query do # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out) - # Note: not checking activity type; HashtagsCleanupWorker should clean up unused records later + # Note: not checking activity type, expecting remove_non_create_objects_hashtags/_ to clean up from( object in Object, where: @@ -182,25 +211,20 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do defp transfer_object_hashtags(object, hashtags) do Repo.transaction(fn -> with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do - for hashtag_record <- hashtag_records do - with {:ok, _} <- - Repo.query( - "insert into hashtags_objects(hashtag_id, object_id) values ($1, $2);", - [hashtag_record.id, object.id] - ) do - nil - else - {:error, e} -> - error = - "ERROR: could not link object #{object.id} and hashtag " <> - "#{hashtag_record.id}: #{inspect(e)}" - - Logger.error(error) - Repo.rollback(object.id) - end + maps = Enum.map(hashtag_records, &%{hashtag_id: &1.id, object_id: object.id}) + expected_rows = length(hashtag_records) + + with {^expected_rows, _} <- Repo.insert_all("hashtags_objects", maps) do + object.id + else + e -> + error = + "ERROR when inserting #{expected_rows} hashtags_objects " <> + "for object #{object.id}: #{inspect(e)}" + + Logger.error(error) + Repo.rollback(object.id) end - - object.id else e -> error = "ERROR: could not create hashtags for object #{object.id}: #{inspect(e)}" diff --git a/lib/pleroma/object.ex b/lib/pleroma/object.ex index 52b77e41c..3ba749d1a 100644 --- a/lib/pleroma/object.ex +++ b/lib/pleroma/object.ex @@ -62,27 +62,30 @@ defmodule Pleroma.Object do |> cast(params, [:data]) |> validate_required([:data]) |> unique_constraint(:ap_id, name: :objects_unique_apid_index) + # Expecting `maybe_handle_hashtags_change/1` to run last: |> maybe_handle_hashtags_change(struct) end - # Note: not checking activity type; HashtagsCleanupWorker should clean up unused records later + # Note: not checking activity type (assuming non-legacy objects are associated with Create act.) defp maybe_handle_hashtags_change(changeset, struct) do - with data_hashtags_change = get_change(changeset, :data), - true <- hashtags_changed?(struct, data_hashtags_change), + with %Ecto.Changeset{valid?: true} <- changeset, + data_hashtags_change = get_change(changeset, :data), + {_, true} <- {:changed, hashtags_changed?(struct, data_hashtags_change)}, {:ok, hashtag_records} <- data_hashtags_change |> object_data_hashtags() |> Hashtag.get_or_create_by_names() do put_assoc(changeset, :hashtags, hashtag_records) else - false -> + %{valid?: false} -> changeset - {:error, hashtag_changeset} -> - failed_hashtag = get_field(hashtag_changeset, :name) + {:changed, false} -> + changeset + {:error, _} -> validate_change(changeset, :data, fn _, _ -> - [data: "error referencing hashtag: #{failed_hashtag}"] + [data: "error referencing hashtags"] end) end end @@ -221,9 +224,13 @@ defmodule Pleroma.Object do def swap_object_with_tombstone(object) do tombstone = make_tombstone(object) - object - |> Object.change(%{data: tombstone}) - |> Repo.update() + with {:ok, object} <- + object + |> Object.change(%{data: tombstone}) + |> Repo.update() do + Hashtag.unlink(object) + {:ok, object} + end end def delete(%Object{data: %{"id" => id}} = object) do diff --git a/lib/pleroma/workers/cron/hashtags_cleanup_worker.ex b/lib/pleroma/workers/cron/hashtags_cleanup_worker.ex deleted file mode 100644 index b319067ca..000000000 --- a/lib/pleroma/workers/cron/hashtags_cleanup_worker.ex +++ /dev/null @@ -1,57 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2021 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Workers.Cron.HashtagsCleanupWorker do - @moduledoc """ - The worker to clean up unused hashtags_objects and hashtags. - """ - - use Oban.Worker, queue: "hashtags_cleanup" - - alias Pleroma.Repo - - require Logger - - @hashtags_objects_query """ - DELETE FROM hashtags_objects WHERE object_id IN - (SELECT DISTINCT objects.id FROM objects - JOIN hashtags_objects ON hashtags_objects.object_id = objects.id LEFT JOIN activities - ON COALESCE(activities.data->'object'->>'id', activities.data->>'object') = - (objects.data->>'id') - AND activities.data->>'type' = 'Create' - WHERE activities.id IS NULL); - """ - - @hashtags_query """ - DELETE FROM hashtags WHERE id IN - (SELECT hashtags.id FROM hashtags - LEFT OUTER JOIN hashtags_objects - ON hashtags_objects.hashtag_id = hashtags.id - WHERE hashtags_objects.hashtag_id IS NULL AND hashtags.inserted_at < $1); - """ - - @impl Oban.Worker - def perform(_job) do - Logger.info("Cleaning up unused `hashtags_objects` records...") - - {:ok, %{num_rows: hashtags_objects_count}} = - Repo.query(@hashtags_objects_query, [], timeout: :infinity) - - Logger.info("Deleted #{hashtags_objects_count} unused `hashtags_objects` records.") - - Logger.info("Cleaning up unused `hashtags` records...") - - # Note: ignoring recently created hashtags since references are added after hashtag is created - {:ok, %{num_rows: hashtags_count}} = - Repo.query(@hashtags_query, [NaiveDateTime.add(NaiveDateTime.utc_now(), -3600 * 24)], - timeout: :infinity - ) - - Logger.info("Deleted #{hashtags_count} unused `hashtags` records.") - - Logger.info("HashtagsCleanupWorker complete.") - - :ok - end -end -- cgit v1.2.3 From 349b8b0f4fb1c2b86f913e1840f15c052ff43c24 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Sat, 13 Feb 2021 22:01:11 +0300 Subject: [#3213] `rescue` around potentially-raising `Repo.insert_all/_` calls. Misc. improvements (docs etc.). --- lib/pleroma/hashtag.ex | 29 ++++++++++++++---------- lib/pleroma/migrators/hashtags_table_migrator.ex | 21 ++++++++++------- 2 files changed, 30 insertions(+), 20 deletions(-) (limited to 'lib') diff --git a/lib/pleroma/hashtag.ex b/lib/pleroma/hashtag.ex index de52c4dae..0d6a4d09e 100644 --- a/lib/pleroma/hashtag.ex +++ b/lib/pleroma/hashtag.ex @@ -47,16 +47,20 @@ defmodule Pleroma.Hashtag do |> Map.merge(%{inserted_at: timestamp, updated_at: timestamp}) end) - with {:ok, %{query_op: hashtags}} <- - Multi.new() - |> Multi.insert_all(:insert_all_op, Hashtag, structs, on_conflict: :nothing) - |> Multi.run(:query_op, fn _repo, _changes -> - {:ok, Repo.all(from(ht in Hashtag, where: ht.name in ^names))} - end) - |> Repo.transaction() do - {:ok, hashtags} - else - {:error, _name, value, _changes_so_far} -> {:error, value} + try do + with {:ok, %{query_op: hashtags}} <- + Multi.new() + |> Multi.insert_all(:insert_all_op, Hashtag, structs, on_conflict: :nothing) + |> Multi.run(:query_op, fn _repo, _changes -> + {:ok, Repo.all(from(ht in Hashtag, where: ht.name in ^names))} + end) + |> Repo.transaction() do + {:ok, hashtags} + else + {:error, _name, value, _changes_so_far} -> {:error, value} + end + rescue + e -> {:error, e} end end @@ -74,8 +78,9 @@ defmodule Pleroma.Hashtag do where: hto.object_id == ^object_id, select: hto.hashtag_id ) - |> Repo.delete_all() do - delete_unreferenced(hashtag_ids) + |> Repo.delete_all(), + {:ok, unreferenced_count} <- delete_unreferenced(hashtag_ids) do + {:ok, length(hashtag_ids), unreferenced_count} end end diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index c53f6be12..432c3401a 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -214,15 +214,20 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do maps = Enum.map(hashtag_records, &%{hashtag_id: &1.id, object_id: object.id}) expected_rows = length(hashtag_records) - with {^expected_rows, _} <- Repo.insert_all("hashtags_objects", maps) do - object.id - else + base_error = + "ERROR when inserting #{expected_rows} hashtags_objects for obj. #{object.id}" + + try do + with {^expected_rows, _} <- Repo.insert_all("hashtags_objects", maps) do + object.id + else + e -> + Logger.error("#{base_error}: #{inspect(e)}") + Repo.rollback(object.id) + end + rescue e -> - error = - "ERROR when inserting #{expected_rows} hashtags_objects " <> - "for object #{object.id}: #{inspect(e)}" - - Logger.error(error) + Logger.error("#{base_error}: #{inspect(e)}") Repo.rollback(object.id) end else -- cgit v1.2.3 From 1dac7d14623f36744953a523650211540d90d1fc Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Mon, 15 Feb 2021 21:13:14 +0300 Subject: [#3213] Fixed `hashtags.name` lookup (must use `citext` type to do index scan). Fixed embedded hashtags lookup (lowercasing), adjusted tests. --- lib/pleroma/hashtag.ex | 8 ++++++-- lib/pleroma/web/activity_pub/activity_pub.ex | 22 ++++++++++++++++------ 2 files changed, 22 insertions(+), 8 deletions(-) (limited to 'lib') diff --git a/lib/pleroma/hashtag.ex b/lib/pleroma/hashtag.ex index 0d6a4d09e..a6d033816 100644 --- a/lib/pleroma/hashtag.ex +++ b/lib/pleroma/hashtag.ex @@ -22,7 +22,9 @@ defmodule Pleroma.Hashtag do end def get_by_name(name) do - Repo.get_by(Hashtag, name: name) + from(h in Hashtag) + |> where([h], fragment("name = ?::citext", ^String.downcase(name))) + |> Repo.one() end def get_or_create_by_name(name) when is_bitstring(name) do @@ -37,6 +39,7 @@ defmodule Pleroma.Hashtag do end def get_or_create_by_names(names) when is_list(names) do + names = Enum.map(names, &String.downcase/1) timestamp = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second) structs = @@ -52,7 +55,8 @@ defmodule Pleroma.Hashtag do Multi.new() |> Multi.insert_all(:insert_all_op, Hashtag, structs, on_conflict: :nothing) |> Multi.run(:query_op, fn _repo, _changes -> - {:ok, Repo.all(from(ht in Hashtag, where: ht.name in ^names))} + {:ok, + Repo.all(from(ht in Hashtag, where: ht.name in fragment("?::citext[]", ^names)))} end) |> Repo.transaction() do {:ok, hashtags} diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 9623e635a..e012f2779 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -698,6 +698,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do + tag_all = Enum.map(tag_all, &String.downcase/1) + from( [_activity, object] in query, where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all) @@ -714,10 +716,12 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do raise_on_missing_preload() end - defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag}) do + defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do + tag_any = Enum.map(tag_any, &String.downcase/1) + from( [_activity, object] in query, - where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag) + where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any) ) end @@ -732,6 +736,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do + tag_reject = Enum.map(tag_reject, &String.downcase/1) + from( [_activity, object] in query, where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject) @@ -749,6 +755,10 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do raise_on_missing_preload() end + defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do + restrict_hashtag_any(query, %{tag: single_tag}) + end + defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do from( [_activity, object] in query, @@ -756,7 +766,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do fragment( """ (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects - ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?) + ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?::citext[]) AND hashtags_objects.object_id = ?) @> ? """, ^tags, @@ -767,7 +777,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do - restrict_hashtag_any(query, %{tag: tag}) + restrict_hashtag_all(query, %{tag_all: [tag]}) end defp restrict_hashtag_all(query, _), do: query @@ -783,7 +793,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do fragment( """ EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects - ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?) + ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?::citext[]) AND hashtags_objects.object_id = ? LIMIT 1) """, ^tags, @@ -809,7 +819,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do fragment( """ NOT EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects - ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?) + ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?::citext[]) AND hashtags_objects.object_id = ? LIMIT 1) """, ^tags_reject, -- cgit v1.2.3 From 938823c73040f6b55896581daf5baf732f859f02 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Tue, 16 Feb 2021 23:14:15 +0300 Subject: [#3213] HashtagsTableMigrator state management refactoring & improvements (proper stats serialization etc.). --- lib/pleroma/data_migration.ex | 15 ++-- lib/pleroma/migrators/hashtags_table_migrator.ex | 87 ++++++++++------------ .../migrators/hashtags_table_migrator/state.ex | 87 +++++++++++++++++++--- 3 files changed, 122 insertions(+), 67 deletions(-) (limited to 'lib') diff --git a/lib/pleroma/data_migration.ex b/lib/pleroma/data_migration.ex index 64fa155ff..1377af16e 100644 --- a/lib/pleroma/data_migration.ex +++ b/lib/pleroma/data_migration.ex @@ -10,6 +10,7 @@ defmodule Pleroma.DataMigration do alias Pleroma.Repo import Ecto.Changeset + import Ecto.Query schema "data_migrations" do field(:name, :string) @@ -28,14 +29,12 @@ defmodule Pleroma.DataMigration do |> unique_constraint(:name) end - def update(data_migration, params \\ %{}) do - data_migration - |> changeset(params) - |> Repo.update() - end - - def update_state(data_migration, new_state) do - update(data_migration, %{state: new_state}) + def update_one_by_id(id, params \\ %{}) do + with {1, _} <- + from(dm in DataMigration, where: dm.id == ^id) + |> Repo.update_all(set: params) do + :ok + end end def get_by_name(name) do diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index 432c3401a..a226d9d29 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -11,16 +11,16 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do alias __MODULE__.State alias Pleroma.Config - alias Pleroma.DataMigration alias Pleroma.Hashtag alias Pleroma.Object alias Pleroma.Repo - defdelegate state(), to: State, as: :get - defdelegate put_stat(key, value), to: State, as: :put - defdelegate increment_stat(key, increment), to: State, as: :increment + defdelegate data_migration(), to: State - defdelegate data_migration(), to: DataMigration, as: :populate_hashtags_table + defdelegate state(), to: State + defdelegate get_stat(key, value), to: State, as: :get_data_key + defdelegate put_stat(key, value), to: State, as: :put_data_key + defdelegate increment_stat(key, increment), to: State, as: :increment_data_key @reg_name {:global, __MODULE__} @@ -45,7 +45,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do def handle_continue(:init_state, _state) do {:ok, _} = State.start_link(nil) - update_status(:init) + update_status(:pending) data_migration = data_migration() manual_migrations = Config.get([:instance, :manual_data_migrations], []) @@ -55,13 +55,13 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do update_status(:noop) is_nil(data_migration) -> - update_status(:halt, "Data migration does not exist.") + update_status(:failed, "Data migration does not exist.") data_migration.state == :manual or data_migration.name in manual_migrations -> - update_status(:noop, "Data migration is in manual execution state.") + update_status(:manual, "Data migration is in manual execution state.") data_migration.state == :complete -> - handle_success(data_migration) + on_complete(data_migration) true -> send(self(), :migrate_hashtags) @@ -72,20 +72,15 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do @impl true def handle_info(:migrate_hashtags, state) do - State.clear() + State.reinit() update_status(:running) put_stat(:started_at, NaiveDateTime.utc_now()) - data_migration = data_migration() - persistent_data = Map.take(data_migration.data, ["max_processed_id"]) - - {:ok, data_migration} = - DataMigration.update(data_migration, %{state: :running, data: persistent_data}) - - Logger.info("Starting transferring object embedded hashtags to `hashtags` table...") + %{id: data_migration_id} = data_migration() + max_processed_id = get_stat(:max_processed_id, 0) - max_processed_id = data_migration.data["max_processed_id"] || 0 + Logger.info("Transferring embedded hashtags to `hashtags` (from oid: #{max_processed_id})...") query() |> where([object], object.id > ^max_processed_id) @@ -104,7 +99,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do Repo.query( "INSERT INTO data_migration_failed_ids(data_migration_id, record_id) " <> "VALUES ($1, $2) ON CONFLICT DO NOTHING;", - [data_migration.id, failed_id] + [data_migration_id, failed_id] ) end @@ -112,7 +107,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do Repo.query( "DELETE FROM data_migration_failed_ids " <> "WHERE data_migration_id = $1 AND record_id = ANY($2)", - [data_migration.id, object_ids -- failed_ids] + [data_migration_id, object_ids -- failed_ids] ) max_object_id = Enum.at(object_ids, -1) @@ -120,14 +115,8 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do put_stat(:max_processed_id, max_object_id) increment_stat(:processed_count, length(object_ids)) increment_stat(:failed_count, length(failed_ids)) - - put_stat( - :records_per_second, - state()[:processed_count] / - Enum.max([NaiveDateTime.diff(NaiveDateTime.utc_now(), state()[:started_at]), 1]) - ) - - persist_stats(data_migration) + put_stat(:records_per_second, records_per_second()) + _ = State.persist_to_db() # A quick and dirty approach to controlling the load this background migration imposes sleep_interval = Config.get([:populate_hashtags_table, :sleep_interval_ms], 0) @@ -135,22 +124,25 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do end) |> Stream.run() - with 0 <- failures_count(data_migration.id) do + with 0 <- failures_count(data_migration_id) do _ = delete_non_create_activities_hashtags() - - {:ok, data_migration} = DataMigration.update_state(data_migration, :complete) - - handle_success(data_migration) + set_complete() else _ -> - _ = DataMigration.update_state(data_migration, :failed) - update_status(:failed, "Please check data_migration_failed_ids records.") end {:noreply, state} end + defp records_per_second do + get_stat(:processed_count, 0) / Enum.max([running_time(), 1]) + end + + defp running_time do + NaiveDateTime.diff(NaiveDateTime.utc_now(), get_stat(:started_at, NaiveDateTime.utc_now())) + end + @hashtags_objects_cleanup_query """ DELETE FROM hashtags_objects WHERE object_id IN (SELECT DISTINCT objects.id FROM objects @@ -169,6 +161,10 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do WHERE hashtags_objects.hashtag_id IS NULL); """ + @doc """ + Deletes `hashtags_objects` for legacy objects not asoociated with Create activity. + Also deletes unreferenced `hashtags` records (might occur after deletion of `hashtags_objects`). + """ def delete_non_create_activities_hashtags do {:ok, %{num_rows: hashtags_objects_count}} = Repo.query(@hashtags_objects_cleanup_query, [], timeout: :infinity) @@ -256,14 +252,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do end end - defp persist_stats(data_migration) do - runner_state = Map.drop(state(), [:status]) - _ = DataMigration.update(data_migration, %{data: runner_state}) - end - - defp handle_success(data_migration) do - update_status(:complete) - + defp on_complete(data_migration) do cond do data_migration.feature_lock -> :noop @@ -321,18 +310,18 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do end def force_restart do - {:ok, _} = DataMigration.update(data_migration(), %{state: :pending, data: %{}}) + :ok = State.reset() force_continue() end - def force_complete do - {:ok, data_migration} = DataMigration.update_state(data_migration(), :complete) - - handle_success(data_migration) + def set_complete do + update_status(:complete) + _ = State.persist_to_db() + on_complete(data_migration()) end defp update_status(status, message \\ nil) do - put_stat(:status, status) + put_stat(:state, status) put_stat(:message, message) end end diff --git a/lib/pleroma/migrators/hashtags_table_migrator/state.ex b/lib/pleroma/migrators/hashtags_table_migrator/state.ex index 901563426..ed9848824 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator/state.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator/state.ex @@ -5,31 +5,98 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator.State do use Agent - @init_state %{} + alias Pleroma.DataMigration + + defdelegate data_migration(), to: DataMigration, as: :populate_hashtags_table + @reg_name {:global, __MODULE__} def start_link(_) do - Agent.start_link(fn -> @init_state end, name: @reg_name) + Agent.start_link(fn -> load_state_from_db() end, name: @reg_name) + end + + defp load_state_from_db do + data_migration = data_migration() + + data = + if data_migration do + Map.new(data_migration.data, fn {k, v} -> {String.to_atom(k), v} end) + else + %{} + end + + %{ + data_migration_id: data_migration && data_migration.id, + data: data + } end - def clear do - Agent.update(@reg_name, fn _state -> @init_state end) + def persist_to_db do + %{data_migration_id: data_migration_id, data: data} = state() + + if data_migration_id do + DataMigration.update_one_by_id(data_migration_id, data: data) + else + {:error, :nil_data_migration_id} + end + end + + def reset do + %{data_migration_id: data_migration_id} = state() + + with false <- is_nil(data_migration_id), + :ok <- + DataMigration.update_one_by_id(data_migration_id, + state: :pending, + data: %{} + ) do + reinit() + else + true -> {:error, :nil_data_migration_id} + e -> e + end end - def get do + def reinit do + Agent.update(@reg_name, fn _state -> load_state_from_db() end) + end + + def state do Agent.get(@reg_name, & &1) end - def put(key, value) do + def get_data_key(key, default \\ nil) do + get_in(state(), [:data, key]) || default + end + + def put_data_key(key, value) do + _ = persist_non_data_change(key, value) + Agent.update(@reg_name, fn state -> - Map.put(state, key, value) + put_in(state, [:data, key], value) end) end - def increment(key, increment \\ 1) do + def increment_data_key(key, increment \\ 1) do Agent.update(@reg_name, fn state -> - updated_value = (state[key] || 0) + increment - Map.put(state, key, updated_value) + initial_value = get_in(state, [:data, key]) || 0 + updated_value = initial_value + increment + put_in(state, [:data, key], updated_value) end) end + + defp persist_non_data_change(:state, value) do + with true <- get_data_key(:state) != value, + true <- value in Pleroma.DataMigration.State.__valid_values__(), + %{data_migration_id: data_migration_id} when not is_nil(data_migration_id) <- state() do + DataMigration.update_one_by_id(data_migration_id, state: value) + else + false -> :ok + _ -> {:error, :nil_data_migration_id} + end + end + + defp persist_non_data_change(_, _) do + nil + end end -- cgit v1.2.3 From 854ea1aefb5ff4e03e9e9af6e8dd50f66c61c913 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Wed, 17 Feb 2021 09:23:35 +0300 Subject: [#3213] Fixed `HashtagsTableMigrator.count/1`. --- lib/pleroma/migrators/hashtags_table_migrator.ex | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) (limited to 'lib') diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index a226d9d29..ac17f91cc 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -18,7 +18,8 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do defdelegate data_migration(), to: State defdelegate state(), to: State - defdelegate get_stat(key, value), to: State, as: :get_data_key + defdelegate persist_state(), to: State, as: :persist_to_db + defdelegate get_stat(key, value \\ nil), to: State, as: :get_data_key defdelegate put_stat(key, value), to: State, as: :put_data_key defdelegate increment_stat(key, increment), to: State, as: :increment_data_key @@ -116,7 +117,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do increment_stat(:processed_count, length(object_ids)) increment_stat(:failed_count, length(failed_ids)) put_stat(:records_per_second, records_per_second()) - _ = State.persist_to_db() + persist_state() # A quick and dirty approach to controlling the load this background migration imposes sleep_interval = Config.get([:populate_hashtags_table, :sleep_interval_ms], 0) @@ -237,17 +238,19 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do @doc "Approximate count for current iteration (including processed records count)" def count(force \\ false, timeout \\ :infinity) do - stored_count = state()[:count] + stored_count = get_stat(:count) if stored_count && !force do stored_count else - processed_count = state()[:processed_count] || 0 - max_processed_id = data_migration().data["max_processed_id"] || 0 + processed_count = get_stat(:processed_count, 0) + max_processed_id = get_stat(:max_processed_id, 0) query = where(query(), [object], object.id > ^max_processed_id) count = Repo.aggregate(query, :count, :id, timeout: timeout) + processed_count put_stat(:count, count) + persist_state() + count end end @@ -316,7 +319,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do def set_complete do update_status(:complete) - _ = State.persist_to_db() + persist_state() on_complete(data_migration()) end -- cgit v1.2.3 From b981edad8a7d8f27b231bc6164fc0546efbdb646 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Thu, 18 Feb 2021 20:40:10 +0300 Subject: [#3213] HashtagsTableMigrator: fault rate allowance to enable the feature (defaults to 1%), counting of affected objects, misc. tweaks. --- lib/pleroma/migrators/hashtags_table_migrator.ex | 101 +++++++++++++++------ .../migrators/hashtags_table_migrator/state.ex | 4 +- 2 files changed, 74 insertions(+), 31 deletions(-) (limited to 'lib') diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index ac17f91cc..45dab8470 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -15,7 +15,8 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do alias Pleroma.Object alias Pleroma.Repo - defdelegate data_migration(), to: State + defdelegate data_migration(), to: Pleroma.DataMigration, as: :populate_hashtags_table + defdelegate data_migration_id(), to: State defdelegate state(), to: State defdelegate persist_state(), to: State, as: :persist_to_db @@ -23,10 +24,13 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do defdelegate put_stat(key, value), to: State, as: :put_data_key defdelegate increment_stat(key, increment), to: State, as: :increment_data_key + @feature_config_path [:database, :improved_hashtag_timeline] @reg_name {:global, __MODULE__} def whereis, do: GenServer.whereis(@reg_name) + def feature_state, do: Config.get(@feature_config_path) + def start_link(_) do case whereis() do nil -> @@ -46,8 +50,6 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do def handle_continue(:init_state, _state) do {:ok, _} = State.start_link(nil) - update_status(:pending) - data_migration = data_migration() manual_migrations = Config.get([:instance, :manual_data_migrations], []) @@ -56,10 +58,14 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do update_status(:noop) is_nil(data_migration) -> - update_status(:failed, "Data migration does not exist.") + message = "Data migration does not exist." + update_status(:failed, message) + Logger.error("#{__MODULE__}: #{message}") data_migration.state == :manual or data_migration.name in manual_migrations -> - update_status(:manual, "Data migration is in manual execution state.") + message = "Data migration is in manual execution or manual fix mode." + update_status(:manual, message) + Logger.warn("#{__MODULE__}: #{message}") data_migration.state == :complete -> on_complete(data_migration) @@ -78,7 +84,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do update_status(:running) put_stat(:started_at, NaiveDateTime.utc_now()) - %{id: data_migration_id} = data_migration() + data_migration_id = data_migration_id() max_processed_id = get_stat(:max_processed_id, 0) Logger.info("Transferring embedded hashtags to `hashtags` (from oid: #{max_processed_id})...") @@ -89,12 +95,19 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do |> Stream.each(fn objects -> object_ids = Enum.map(objects, & &1.id) + results = Enum.map(objects, &transfer_object_hashtags(&1)) + failed_ids = - objects - |> Enum.map(&transfer_object_hashtags(&1)) + results |> Enum.filter(&(elem(&1, 0) == :error)) |> Enum.map(&elem(&1, 1)) + # Count of objects with hashtags (`{:noop, id}` is returned for objects having other AS2 tags) + chunk_affected_count = + results + |> Enum.filter(&(elem(&1, 0) == :ok)) + |> length() + for failed_id <- failed_ids do _ = Repo.query( @@ -116,6 +129,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do put_stat(:max_processed_id, max_object_id) increment_stat(:processed_count, length(object_ids)) increment_stat(:failed_count, length(failed_ids)) + increment_stat(:affected_count, chunk_affected_count) put_stat(:records_per_second, records_per_second()) persist_state() @@ -125,17 +139,42 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do end) |> Stream.run() - with 0 <- failures_count(data_migration_id) do - _ = delete_non_create_activities_hashtags() - set_complete() - else - _ -> - update_status(:failed, "Please check data_migration_failed_ids records.") + fault_rate = fault_rate() + put_stat(:fault_rate, fault_rate) + fault_rate_allowance = Config.get([:populate_hashtags_table, :fault_rate_allowance], 0) + + cond do + fault_rate == 0 -> + set_complete() + + is_float(fault_rate) and fault_rate <= fault_rate_allowance -> + message = """ + Done with fault rate of #{fault_rate} which doesn't exceed #{fault_rate_allowance}. + Putting data migration to manual fix mode. Check `retry_failed/0`. + """ + + Logger.warn("#{__MODULE__}: #{message}") + update_status(:manual, message) + on_complete(data_migration()) + + true -> + message = "Too many failures. Check data_migration_failed_ids records / `retry_failed/0`." + Logger.error("#{__MODULE__}: #{message}") + update_status(:failed, message) end + persist_state() {:noreply, state} end + def fault_rate do + with failures_count when is_integer(failures_count) <- failures_count() do + failures_count / Enum.max([get_stat(:affected_count, 0), 1]) + else + _ -> :error + end + end + defp records_per_second do get_stat(:processed_count, 0) / Enum.max([running_time(), 1]) end @@ -194,6 +233,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do |> where([_o, hashtags_objects], is_nil(hashtags_objects.object_id)) end + @spec transfer_object_hashtags(Map.t()) :: {:noop | :ok | :error, integer()} defp transfer_object_hashtags(object) do embedded_tags = if Map.has_key?(object, :tag), do: object.tag, else: object.data["tag"] hashtags = Object.object_data_hashtags(%{"tag" => embedded_tags}) @@ -201,7 +241,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do if Enum.any?(hashtags) do transfer_object_hashtags(object, hashtags) else - {:ok, object.id} + {:noop, object.id} end end @@ -209,13 +249,11 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do Repo.transaction(fn -> with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do maps = Enum.map(hashtag_records, &%{hashtag_id: &1.id, object_id: object.id}) - expected_rows = length(hashtag_records) - - base_error = - "ERROR when inserting #{expected_rows} hashtags_objects for obj. #{object.id}" + base_error = "ERROR when inserting hashtags_objects for object with id #{object.id}" try do - with {^expected_rows, _} <- Repo.insert_all("hashtags_objects", maps) do + with {rows_count, _} when is_integer(rows_count) <- + Repo.insert_all("hashtags_objects", maps, on_conflict: :nothing) do object.id else e -> @@ -260,11 +298,11 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do data_migration.feature_lock -> :noop - not is_nil(Config.get([:database, :improved_hashtag_timeline])) -> + not is_nil(feature_state()) -> :noop true -> - Config.put([:database, :improved_hashtag_timeline], true) + Config.put(@feature_config_path, true) :ok end end @@ -274,38 +312,41 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"), on: dmf.record_id == o.id ) - |> where([_o, dmf], dmf.data_migration_id == ^data_migration().id) + |> where([_o, dmf], dmf.data_migration_id == ^data_migration_id()) |> order_by([o], asc: o.id) end - def failures_count(data_migration_id \\ nil) do - data_migration_id = data_migration_id || data_migration().id - + def failures_count do with {:ok, %{rows: [[count]]}} <- Repo.query( "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;", - [data_migration_id] + [data_migration_id()] ) do count end end def retry_failed do - data_migration = data_migration() + data_migration_id = data_migration_id() failed_objects_query() |> Repo.chunk_stream(100, :one) |> Stream.each(fn object -> - with {:ok, _} <- transfer_object_hashtags(object) do + with {res, _} when res != :error <- transfer_object_hashtags(object) do _ = Repo.query( "DELETE FROM data_migration_failed_ids " <> "WHERE data_migration_id = $1 AND record_id = $2", - [data_migration.id, object.id] + [data_migration_id, object.id] ) end end) |> Stream.run() + + put_stat(:failed_count, failures_count()) + persist_state() + + force_continue() end def force_continue do diff --git a/lib/pleroma/migrators/hashtags_table_migrator/state.ex b/lib/pleroma/migrators/hashtags_table_migrator/state.ex index ed9848824..ee0009b2e 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator/state.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator/state.ex @@ -7,7 +7,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator.State do alias Pleroma.DataMigration - defdelegate data_migration(), to: DataMigration, as: :populate_hashtags_table + defdelegate data_migration(), to: Pleroma.Migrators.HashtagsTableMigrator @reg_name {:global, __MODULE__} @@ -99,4 +99,6 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator.State do defp persist_non_data_change(_, _) do nil end + + def data_migration_id, do: Map.get(state(), :data_migration_id) end -- cgit v1.2.3 From 998437d4a4111055e019f28dd84a8af1f9a27047 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Thu, 18 Feb 2021 21:03:06 +0300 Subject: [#3213] Experimental / debug feature: `database: [improved_hashtag_timeline: :preselect_hashtag_ids]`. --- lib/pleroma/web/activity_pub/activity_pub.ex | 47 +++++++++++++++++++++------- 1 file changed, 35 insertions(+), 12 deletions(-) (limited to 'lib') diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index e012f2779..5392ce7c9 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -787,19 +787,42 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do - from( - [_activity, object] in query, - where: - fragment( - """ - EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects - ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?::citext[]) - AND hashtags_objects.object_id = ? LIMIT 1) - """, - ^tags, - object.id + # TODO: refactor: debug / experimental feature + if Config.get([:database, :improved_hashtag_timeline]) == :preselect_hashtag_ids do + hashtag_ids = + from(ht in Pleroma.Hashtag, + where: fragment("name = ANY(?::citext[])", ^tags), + select: ht.id ) - ) + |> Repo.all() + + from( + [_activity, object] in query, + where: + fragment( + """ + EXISTS ( + SELECT 1 FROM hashtags_objects WHERE hashtag_id = ANY(?) AND object_id = ? LIMIT 1) + """, + ^hashtag_ids, + object.id + ) + ) + else + from( + [_activity, object] in query, + where: + fragment( + """ + EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects + ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?::citext[]) + AND hashtags_objects.object_id = ? LIMIT 1) + """, + ^tags, + object.id + ) + ) + end end defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do -- cgit v1.2.3 From 6531eddf361fa52db3906ab011a4e33c7a5f9552 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Mon, 22 Feb 2021 23:26:07 +0300 Subject: [#3213] `hashtags`: altered `name` type to `text`. `hashtags_objects`: removed unused index. HashtagsTableMigrator: records_per_second calculation fix. ActivityPub: hashtags-related options normalization. --- lib/pleroma/hashtag.ex | 22 ++++--- lib/pleroma/migrators/hashtags_table_migrator.ex | 4 +- lib/pleroma/web/activity_pub/activity_pub.ex | 75 ++++++++++-------------- 3 files changed, 49 insertions(+), 52 deletions(-) (limited to 'lib') diff --git a/lib/pleroma/hashtag.ex b/lib/pleroma/hashtag.ex index a6d033816..e9d143fb1 100644 --- a/lib/pleroma/hashtag.ex +++ b/lib/pleroma/hashtag.ex @@ -21,10 +21,14 @@ defmodule Pleroma.Hashtag do timestamps() end + def normalize_name(name) do + name + |> String.downcase() + |> String.trim() + end + def get_by_name(name) do - from(h in Hashtag) - |> where([h], fragment("name = ?::citext", ^String.downcase(name))) - |> Repo.one() + Repo.get_by(Hashtag, name: normalize_name(name)) end def get_or_create_by_name(name) when is_bitstring(name) do @@ -39,7 +43,7 @@ defmodule Pleroma.Hashtag do end def get_or_create_by_names(names) when is_list(names) do - names = Enum.map(names, &String.downcase/1) + names = Enum.map(names, &normalize_name/1) timestamp = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second) structs = @@ -53,10 +57,12 @@ defmodule Pleroma.Hashtag do try do with {:ok, %{query_op: hashtags}} <- Multi.new() - |> Multi.insert_all(:insert_all_op, Hashtag, structs, on_conflict: :nothing) + |> Multi.insert_all(:insert_all_op, Hashtag, structs, + on_conflict: :nothing, + conflict_target: :name + ) |> Multi.run(:query_op, fn _repo, _changes -> - {:ok, - Repo.all(from(ht in Hashtag, where: ht.name in fragment("?::citext[]", ^names)))} + {:ok, Repo.all(from(ht in Hashtag, where: ht.name in ^names))} end) |> Repo.transaction() do {:ok, hashtags} @@ -71,7 +77,7 @@ defmodule Pleroma.Hashtag do def changeset(%Hashtag{} = struct, params) do struct |> cast(params, [:name]) - |> update_change(:name, &String.downcase/1) + |> update_change(:name, &normalize_name/1) |> validate_required([:name]) |> unique_constraint(:name) end diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index 45dab8470..07bb9aeb2 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -82,6 +82,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do State.reinit() update_status(:running) + put_stat(:iteration_processed_count, 0) put_stat(:started_at, NaiveDateTime.utc_now()) data_migration_id = data_migration_id() @@ -127,6 +128,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do max_object_id = Enum.at(object_ids, -1) put_stat(:max_processed_id, max_object_id) + increment_stat(:iteration_processed_count, length(object_ids)) increment_stat(:processed_count, length(object_ids)) increment_stat(:failed_count, length(failed_ids)) increment_stat(:affected_count, chunk_affected_count) @@ -176,7 +178,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do end defp records_per_second do - get_stat(:processed_count, 0) / Enum.max([running_time(), 1]) + get_stat(:iteration_processed_count, 0) / Enum.max([running_time(), 1]) end defp running_time do diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 5392ce7c9..8182bc205 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -10,6 +10,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do alias Pleroma.Conversation alias Pleroma.Conversation.Participation alias Pleroma.Filter + alias Pleroma.Hashtag alias Pleroma.Maps alias Pleroma.Notification alias Pleroma.Object @@ -698,8 +699,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do - tag_all = Enum.map(tag_all, &String.downcase/1) - from( [_activity, object] in query, where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all) @@ -717,8 +716,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do - tag_any = Enum.map(tag_any, &String.downcase/1) - from( [_activity, object] in query, where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any) @@ -736,8 +733,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do - tag_reject = Enum.map(tag_reject, &String.downcase/1) - from( [_activity, object] in query, where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject) @@ -766,7 +761,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do fragment( """ (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects - ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?::citext[]) + ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?) AND hashtags_objects.object_id = ?) @> ? """, ^tags, @@ -787,42 +782,19 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do - # TODO: refactor: debug / experimental feature - if Config.get([:database, :improved_hashtag_timeline]) == :preselect_hashtag_ids do - hashtag_ids = - from(ht in Pleroma.Hashtag, - where: fragment("name = ANY(?::citext[])", ^tags), - select: ht.id + from( + [_activity, object] in query, + where: + fragment( + """ + EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects + ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?) + AND hashtags_objects.object_id = ? LIMIT 1) + """, + ^tags, + object.id ) - |> Repo.all() - - from( - [_activity, object] in query, - where: - fragment( - """ - EXISTS ( - SELECT 1 FROM hashtags_objects WHERE hashtag_id = ANY(?) AND object_id = ? LIMIT 1) - """, - ^hashtag_ids, - object.id - ) - ) - else - from( - [_activity, object] in query, - where: - fragment( - """ - EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects - ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?::citext[]) - AND hashtags_objects.object_id = ? LIMIT 1) - """, - ^tags, - object.id - ) - ) - end + ) end defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do @@ -842,7 +814,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do fragment( """ NOT EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects - ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?::citext[]) + ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?) AND hashtags_objects.object_id = ? LIMIT 1) """, ^tags_reject, @@ -1220,6 +1192,21 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do defp maybe_order(query, _), do: query + defp normalize_fetch_activities_query_opts(opts) do + Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts -> + case opts[key] do + value when is_bitstring(value) -> + Map.put(opts, key, Hashtag.normalize_name(value)) + + value when is_list(value) -> + Map.put(opts, key, Enum.map(value, &Hashtag.normalize_name/1)) + + _ -> + opts + end + end) + end + defp fetch_activities_query_ap_ids_ops(opts) do source_user = opts[:muting_user] ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: [] @@ -1243,6 +1230,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end def fetch_activities_query(recipients, opts \\ %{}) do + opts = normalize_fetch_activities_query_opts(opts) + {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} = fetch_activities_query_ap_ids_ops(opts) -- cgit v1.2.3 From 77f3da035894e2add911101466bfe41b99ee481e Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Tue, 23 Feb 2021 13:52:28 +0300 Subject: [#3213] Misc. tweaks: proper upsert in Hashtag, better feature toggle management. --- lib/pleroma/config.ex | 4 ++++ lib/pleroma/hashtag.ex | 22 +++++++++------------- lib/pleroma/migrators/hashtags_table_migrator.ex | 18 +++++++----------- lib/pleroma/web/activity_pub/activity_pub.ex | 2 +- 4 files changed, 21 insertions(+), 25 deletions(-) (limited to 'lib') diff --git a/lib/pleroma/config.ex b/lib/pleroma/config.ex index f17e14128..e057d8c02 100644 --- a/lib/pleroma/config.ex +++ b/lib/pleroma/config.ex @@ -111,4 +111,8 @@ defmodule Pleroma.Config do end ) end + + def feature_enabled?(feature_name) do + get([:features, feature_name]) not in [nil, false, :disabled, :auto] + end end diff --git a/lib/pleroma/hashtag.ex b/lib/pleroma/hashtag.ex index e9d143fb1..53e2e9c89 100644 --- a/lib/pleroma/hashtag.ex +++ b/lib/pleroma/hashtag.ex @@ -27,19 +27,15 @@ defmodule Pleroma.Hashtag do |> String.trim() end - def get_by_name(name) do - Repo.get_by(Hashtag, name: normalize_name(name)) - end - - def get_or_create_by_name(name) when is_bitstring(name) do - with %Hashtag{} = hashtag <- get_by_name(name) do - {:ok, hashtag} - else - _ -> - %Hashtag{} - |> changeset(%{name: name}) - |> Repo.insert() - end + def get_or_create_by_name(name) do + changeset = changeset(%Hashtag{}, %{name: name}) + + Repo.insert( + changeset, + on_conflict: [set: [name: get_field(changeset, :name)]], + conflict_target: :name, + returning: true + ) end def get_or_create_by_names(names) when is_list(names) do diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index 07bb9aeb2..6123c88e0 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -24,7 +24,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do defdelegate put_stat(key, value), to: State, as: :put_data_key defdelegate increment_stat(key, increment), to: State, as: :increment_data_key - @feature_config_path [:database, :improved_hashtag_timeline] + @feature_config_path [:features, :improved_hashtag_timeline] @reg_name {:global, __MODULE__} def whereis, do: GenServer.whereis(@reg_name) @@ -296,16 +296,12 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do end defp on_complete(data_migration) do - cond do - data_migration.feature_lock -> - :noop - - not is_nil(feature_state()) -> - :noop - - true -> - Config.put(@feature_config_path, true) - :ok + if data_migration.feature_lock || feature_state() == :disabled do + Logger.warn("#{__MODULE__}: migration complete but feature is locked; consider enabling.") + :noop + else + Config.put(@feature_config_path, :enabled) + :ok end end diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 8182bc205..9d557c2cd 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -1273,7 +1273,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do |> exclude_invisible_actors(opts) |> exclude_visibility(opts) - if Config.get([:database, :improved_hashtag_timeline]) do + if Config.feature_enabled?(:improved_hashtag_timeline) do query |> restrict_hashtag_any(opts) |> restrict_hashtag_all(opts) -- cgit v1.2.3 From 40d4362261abaf0856a1b4397a4bff6344137120 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Tue, 23 Feb 2021 18:11:25 +0300 Subject: [#3213] `mix pleroma.database rollback` tweaks. --- lib/mix/tasks/pleroma/database.ex | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) (limited to 'lib') diff --git a/lib/mix/tasks/pleroma/database.ex b/lib/mix/tasks/pleroma/database.ex index 2136ddb02..e7f4b67a4 100644 --- a/lib/mix/tasks/pleroma/database.ex +++ b/lib/mix/tasks/pleroma/database.ex @@ -231,19 +231,18 @@ defmodule Mix.Tasks.Pleroma.Database do re = ~r/^#{version}_.*\.exs/ path = Ecto.Migrator.migrations_path(repo) - with {:find, "" <> file} <- {:find, Enum.find(File.ls!(path), &String.match?(&1, re))}, - {:compile, [{mod, _} | _]} <- {:compile, Code.compile_file(Path.join(path, file))}, - {:rollback, :ok} <- {:rollback, Ecto.Migrator.down(repo, version, mod)} do + with {_, "" <> file} <- {:find, Enum.find(File.ls!(path), &String.match?(&1, re))}, + {_, [{mod, _} | _]} <- {:compile, Code.compile_file(Path.join(path, file))}, + {_, :ok} <- {:rollback, Ecto.Migrator.down(repo, version, mod)} do {:ok, "Reversed migration: #{file}"} else {:find, _} -> {:error, "No migration found with version prefix: #{version}"} {:compile, e} -> {:error, "Problem compiling migration module: #{inspect(e)}"} {:rollback, e} -> {:error, "Problem reversing migration: #{inspect(e)}"} - e -> {:error, "Something unexpected happened: #{inspect(e)}"} end end) - IO.inspect(result) + shell_info(inspect(result)) end end end -- cgit v1.2.3 From 5856f51717c12f4c6b0b89e480ff689c8480393d Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Wed, 3 Mar 2021 23:09:30 +0300 Subject: [#3213] ActivityPub hashtags filtering refactoring. Test fix. --- lib/pleroma/repo.ex | 2 ++ lib/pleroma/web/activity_pub/activity_pub.ex | 29 +++++++++------------------- 2 files changed, 11 insertions(+), 20 deletions(-) (limited to 'lib') diff --git a/lib/pleroma/repo.ex b/lib/pleroma/repo.ex index 61b64ed3e..b8ea06e33 100644 --- a/lib/pleroma/repo.ex +++ b/lib/pleroma/repo.ex @@ -8,6 +8,8 @@ defmodule Pleroma.Repo do adapter: Ecto.Adapters.Postgres, migration_timestamps: [type: :naive_datetime_usec] + use Ecto.Explain + import Ecto.Query require Logger diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 9d557c2cd..a4b48ec9b 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -746,6 +746,13 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do defp restrict_embedded_tag_reject_any(query, _), do: query + defp object_ids_query_for_tags(tags) do + from(hto in "hashtags_objects") + |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id) + |> where([hto, ht], ht.name in ^tags) + |> select([hto], hto.object_id) + end + defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do raise_on_missing_preload() end @@ -784,16 +791,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do from( [_activity, object] in query, - where: - fragment( - """ - EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects - ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?) - AND hashtags_objects.object_id = ? LIMIT 1) - """, - ^tags, - object.id - ) + where: object.id in subquery(object_ids_query_for_tags(tags)) ) end @@ -810,16 +808,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do from( [_activity, object] in query, - where: - fragment( - """ - NOT EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects - ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?) - AND hashtags_objects.object_id = ? LIMIT 1) - """, - ^tags_reject, - object.id - ) + where: object.id not in subquery(object_ids_query_for_tags(tags_reject)) ) end -- cgit v1.2.3 From 7f8785fd9be11fbb09283c2dbd32aeb7903a4f58 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Sun, 7 Mar 2021 11:33:21 +0300 Subject: [#3213] Performance optimization of filtering by hashtags ("any" condition). --- lib/pleroma/pagination.ex | 3 ++ lib/pleroma/web/activity_pub/activity_pub.ex | 47 ++++++++++++++++------ .../controllers/timeline_controller.ex | 43 ++++++++------------ 3 files changed, 55 insertions(+), 38 deletions(-) (limited to 'lib') diff --git a/lib/pleroma/pagination.ex b/lib/pleroma/pagination.ex index 0d24e1010..33e45a0eb 100644 --- a/lib/pleroma/pagination.ex +++ b/lib/pleroma/pagination.ex @@ -93,6 +93,7 @@ defmodule Pleroma.Pagination do max_id: :string, offset: :integer, limit: :integer, + skip_extra_order: :boolean, skip_order: :boolean } @@ -114,6 +115,8 @@ defmodule Pleroma.Pagination do defp restrict(query, :order, %{skip_order: true}, _), do: query + defp restrict(%{order_bys: [_ | _]} = query, :order, %{skip_extra_order: true}, _), do: query + defp restrict(query, :order, %{min_id: _}, table_binding) do order_by( query, diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index a4b48ec9b..230faf024 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -466,6 +466,23 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do |> Repo.one() end + defp fetch_paginated_optimized(query, opts, pagination) do + # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC", + # and extra sorting on "activities.id DESC NULLS LAST" would worse the query plan + opts = Map.put(opts, :skip_extra_order, true) + + Pagination.fetch_paginated(query, opts, pagination) + end + + def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do + list_memberships = Pleroma.List.memberships(opts[:user]) + + fetch_activities_query(recipients ++ list_memberships, opts) + |> fetch_paginated_optimized(opts, pagination) + |> Enum.reverse() + |> maybe_update_cc(list_memberships, opts[:user]) + end + @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()] def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do opts = Map.delete(opts, :user) @@ -473,7 +490,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do [Constants.as_public()] |> fetch_activities_query(opts) |> restrict_unlisted(opts) - |> Pagination.fetch_paginated(opts, pagination) + |> fetch_paginated_optimized(opts, pagination) end @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()] @@ -751,6 +768,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id) |> where([hto, ht], ht.name in ^tags) |> select([hto], hto.object_id) + |> distinct([hto], true) end defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do @@ -789,9 +807,18 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do + hashtag_ids = + from(ht in Hashtag, where: ht.name in ^tags, select: ht.id) + |> Repo.all() + + # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan from( [_activity, object] in query, - where: object.id in subquery(object_ids_query_for_tags(tags)) + join: hto in "hashtags_objects", + on: hto.object_id == object.id, + where: hto.hashtag_id in ^hashtag_ids, + distinct: [desc: object.id], + order_by: [desc: object.id] ) end @@ -1188,7 +1215,12 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do Map.put(opts, key, Hashtag.normalize_name(value)) value when is_list(value) -> - Map.put(opts, key, Enum.map(value, &Hashtag.normalize_name/1)) + normalized_value = + value + |> Enum.map(&Hashtag.normalize_name/1) + |> Enum.uniq() + + Map.put(opts, key, normalized_value) _ -> opts @@ -1275,15 +1307,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end end - def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do - list_memberships = Pleroma.List.memberships(opts[:user]) - - fetch_activities_query(recipients ++ list_memberships, opts) - |> Pagination.fetch_paginated(opts, pagination) - |> Enum.reverse() - |> maybe_update_cc(list_memberships, opts[:user]) - end - @doc """ Fetch favorites activities of user with order by sort adds to favorites """ diff --git a/lib/pleroma/web/mastodon_api/controllers/timeline_controller.ex b/lib/pleroma/web/mastodon_api/controllers/timeline_controller.ex index 87effa00b..c611958be 100644 --- a/lib/pleroma/web/mastodon_api/controllers/timeline_controller.ex +++ b/lib/pleroma/web/mastodon_api/controllers/timeline_controller.ex @@ -133,34 +133,25 @@ defmodule Pleroma.Web.MastodonAPI.TimelineController do end defp hashtag_fetching(params, user, local_only) do - tags = + # Note: not sanitizing tag options at this stage (may be mix-cased, have duplicates etc.) + tags_any = [params[:tag], params[:any]] |> List.flatten() - |> Enum.reject(&is_nil/1) - |> Enum.map(&String.downcase/1) - |> Enum.uniq() - - tag_all = - params - |> Map.get(:all, []) - |> Enum.map(&String.downcase/1) - - tag_reject = - params - |> Map.get(:none, []) - |> Enum.map(&String.downcase/1) - - _activities = - params - |> Map.put(:type, "Create") - |> Map.put(:local_only, local_only) - |> Map.put(:blocking_user, user) - |> Map.put(:muting_user, user) - |> Map.put(:user, user) - |> Map.put(:tag, tags) - |> Map.put(:tag_all, tag_all) - |> Map.put(:tag_reject, tag_reject) - |> ActivityPub.fetch_public_activities() + |> Enum.filter(& &1) + + tag_all = Map.get(params, :all, []) + tag_reject = Map.get(params, :none, []) + + params + |> Map.put(:type, "Create") + |> Map.put(:local_only, local_only) + |> Map.put(:blocking_user, user) + |> Map.put(:muting_user, user) + |> Map.put(:user, user) + |> Map.put(:tag, tags_any) + |> Map.put(:tag_all, tag_all) + |> Map.put(:tag_reject, tag_reject) + |> ActivityPub.fetch_public_activities() end # GET /api/v1/timelines/tag/:tag -- cgit v1.2.3 From 3edf45021eb6c3fba06bc083b346f7db54cd073f Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Fri, 12 Mar 2021 12:18:11 +0300 Subject: [#3213] Background migration infrastructure refactoring. Extracted BaseMigrator and BaseMigratorState. --- lib/pleroma/application.ex | 11 +- lib/pleroma/migrators/hashtags_table_migrator.ex | 265 ++++----------------- .../migrators/hashtags_table_migrator/state.ex | 104 -------- lib/pleroma/migrators/support/base_migrator.ex | 210 ++++++++++++++++ .../migrators/support/base_migrator_state.ex | 116 +++++++++ 5 files changed, 385 insertions(+), 321 deletions(-) delete mode 100644 lib/pleroma/migrators/hashtags_table_migrator/state.ex create mode 100644 lib/pleroma/migrators/support/base_migrator.ex create mode 100644 lib/pleroma/migrators/support/base_migrator_state.ex (limited to 'lib') diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 2ff7562e2..06d399b2e 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -103,10 +103,7 @@ defmodule Pleroma.Application do task_children(@mix_env) ++ dont_run_in_test(@mix_env) ++ chat_child(chat_enabled?()) ++ - [ - Pleroma.Migrators.HashtagsTableMigrator, - Pleroma.Gopher.Server - ] + [Pleroma.Gopher.Server] # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html # for other strategies and supported options @@ -231,6 +228,12 @@ defmodule Pleroma.Application do keys: :duplicate, partitions: System.schedulers_online() ]} + ] ++ background_migrators() + end + + defp background_migrators do + [ + Pleroma.Migrators.HashtagsTableMigrator ] end diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index 6123c88e0..b84058e11 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -3,88 +3,27 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Migrators.HashtagsTableMigrator do - use GenServer + defmodule State do + use Pleroma.Migrators.Support.BaseMigratorState - require Logger + @impl Pleroma.Migrators.Support.BaseMigratorState + defdelegate data_migration(), to: Pleroma.DataMigration, as: :populate_hashtags_table + end - import Ecto.Query + use Pleroma.Migrators.Support.BaseMigrator - alias __MODULE__.State - alias Pleroma.Config alias Pleroma.Hashtag + alias Pleroma.Migrators.Support.BaseMigrator alias Pleroma.Object - alias Pleroma.Repo - - defdelegate data_migration(), to: Pleroma.DataMigration, as: :populate_hashtags_table - defdelegate data_migration_id(), to: State - - defdelegate state(), to: State - defdelegate persist_state(), to: State, as: :persist_to_db - defdelegate get_stat(key, value \\ nil), to: State, as: :get_data_key - defdelegate put_stat(key, value), to: State, as: :put_data_key - defdelegate increment_stat(key, increment), to: State, as: :increment_data_key - - @feature_config_path [:features, :improved_hashtag_timeline] - @reg_name {:global, __MODULE__} - - def whereis, do: GenServer.whereis(@reg_name) - - def feature_state, do: Config.get(@feature_config_path) - - def start_link(_) do - case whereis() do - nil -> - GenServer.start_link(__MODULE__, nil, name: @reg_name) - - pid -> - {:ok, pid} - end - end - - @impl true - def init(_) do - {:ok, nil, {:continue, :init_state}} - end - @impl true - def handle_continue(:init_state, _state) do - {:ok, _} = State.start_link(nil) + @impl BaseMigrator + def feature_config_path, do: [:features, :improved_hashtag_timeline] - data_migration = data_migration() - manual_migrations = Config.get([:instance, :manual_data_migrations], []) - - cond do - Config.get(:env) == :test -> - update_status(:noop) - - is_nil(data_migration) -> - message = "Data migration does not exist." - update_status(:failed, message) - Logger.error("#{__MODULE__}: #{message}") - - data_migration.state == :manual or data_migration.name in manual_migrations -> - message = "Data migration is in manual execution or manual fix mode." - update_status(:manual, message) - Logger.warn("#{__MODULE__}: #{message}") - - data_migration.state == :complete -> - on_complete(data_migration) - - true -> - send(self(), :migrate_hashtags) - end - - {:noreply, nil} - end - - @impl true - def handle_info(:migrate_hashtags, state) do - State.reinit() - - update_status(:running) - put_stat(:iteration_processed_count, 0) - put_stat(:started_at, NaiveDateTime.utc_now()) + @impl BaseMigrator + def fault_rate_allowance, do: Config.get([:populate_hashtags_table, :fault_rate_allowance], 0) + @impl BaseMigrator + def perform do data_migration_id = data_migration_id() max_processed_id = get_stat(:max_processed_id, 0) @@ -103,7 +42,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do |> Enum.filter(&(elem(&1, 0) == :error)) |> Enum.map(&elem(&1, 1)) - # Count of objects with hashtags (`{:noop, id}` is returned for objects having other AS2 tags) + # Count of objects with hashtags: `{:noop, id}` is returned for objects having other AS2 tags chunk_affected_count = results |> Enum.filter(&(elem(&1, 0) == :ok)) @@ -140,84 +79,10 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do Process.sleep(sleep_interval) end) |> Stream.run() - - fault_rate = fault_rate() - put_stat(:fault_rate, fault_rate) - fault_rate_allowance = Config.get([:populate_hashtags_table, :fault_rate_allowance], 0) - - cond do - fault_rate == 0 -> - set_complete() - - is_float(fault_rate) and fault_rate <= fault_rate_allowance -> - message = """ - Done with fault rate of #{fault_rate} which doesn't exceed #{fault_rate_allowance}. - Putting data migration to manual fix mode. Check `retry_failed/0`. - """ - - Logger.warn("#{__MODULE__}: #{message}") - update_status(:manual, message) - on_complete(data_migration()) - - true -> - message = "Too many failures. Check data_migration_failed_ids records / `retry_failed/0`." - Logger.error("#{__MODULE__}: #{message}") - update_status(:failed, message) - end - - persist_state() - {:noreply, state} - end - - def fault_rate do - with failures_count when is_integer(failures_count) <- failures_count() do - failures_count / Enum.max([get_stat(:affected_count, 0), 1]) - else - _ -> :error - end - end - - defp records_per_second do - get_stat(:iteration_processed_count, 0) / Enum.max([running_time(), 1]) - end - - defp running_time do - NaiveDateTime.diff(NaiveDateTime.utc_now(), get_stat(:started_at, NaiveDateTime.utc_now())) - end - - @hashtags_objects_cleanup_query """ - DELETE FROM hashtags_objects WHERE object_id IN - (SELECT DISTINCT objects.id FROM objects - JOIN hashtags_objects ON hashtags_objects.object_id = objects.id LEFT JOIN activities - ON COALESCE(activities.data->'object'->>'id', activities.data->>'object') = - (objects.data->>'id') - AND activities.data->>'type' = 'Create' - WHERE activities.id IS NULL); - """ - - @hashtags_cleanup_query """ - DELETE FROM hashtags WHERE id IN - (SELECT hashtags.id FROM hashtags - LEFT OUTER JOIN hashtags_objects - ON hashtags_objects.hashtag_id = hashtags.id - WHERE hashtags_objects.hashtag_id IS NULL); - """ - - @doc """ - Deletes `hashtags_objects` for legacy objects not asoociated with Create activity. - Also deletes unreferenced `hashtags` records (might occur after deletion of `hashtags_objects`). - """ - def delete_non_create_activities_hashtags do - {:ok, %{num_rows: hashtags_objects_count}} = - Repo.query(@hashtags_objects_cleanup_query, [], timeout: :infinity) - - {:ok, %{num_rows: hashtags_count}} = - Repo.query(@hashtags_cleanup_query, [], timeout: :infinity) - - {:ok, hashtags_objects_count, hashtags_count} end - defp query do + @impl BaseMigrator + def query do # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out) # Note: not checking activity type, expecting remove_non_create_objects_hashtags/_ to clean up from( @@ -276,54 +141,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do end) end - @doc "Approximate count for current iteration (including processed records count)" - def count(force \\ false, timeout \\ :infinity) do - stored_count = get_stat(:count) - - if stored_count && !force do - stored_count - else - processed_count = get_stat(:processed_count, 0) - max_processed_id = get_stat(:max_processed_id, 0) - query = where(query(), [object], object.id > ^max_processed_id) - - count = Repo.aggregate(query, :count, :id, timeout: timeout) + processed_count - put_stat(:count, count) - persist_state() - - count - end - end - - defp on_complete(data_migration) do - if data_migration.feature_lock || feature_state() == :disabled do - Logger.warn("#{__MODULE__}: migration complete but feature is locked; consider enabling.") - :noop - else - Config.put(@feature_config_path, :enabled) - :ok - end - end - - def failed_objects_query do - from(o in Object) - |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"), - on: dmf.record_id == o.id - ) - |> where([_o, dmf], dmf.data_migration_id == ^data_migration_id()) - |> order_by([o], asc: o.id) - end - - def failures_count do - with {:ok, %{rows: [[count]]}} <- - Repo.query( - "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;", - [data_migration_id()] - ) do - count - end - end - + @impl BaseMigrator def retry_failed do data_migration_id = data_migration_id() @@ -347,23 +165,44 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do force_continue() end - def force_continue do - send(whereis(), :migrate_hashtags) + defp failed_objects_query do + from(o in Object) + |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"), + on: dmf.record_id == o.id + ) + |> where([_o, dmf], dmf.data_migration_id == ^data_migration_id()) + |> order_by([o], asc: o.id) end - def force_restart do - :ok = State.reset() - force_continue() - end + @doc """ + Service func to delete `hashtags_objects` for legacy objects not associated with Create activity. + Also deletes unreferenced `hashtags` records (might occur after deletion of `hashtags_objects`). + """ + def delete_non_create_activities_hashtags do + hashtags_objects_cleanup_query = """ + DELETE FROM hashtags_objects WHERE object_id IN + (SELECT DISTINCT objects.id FROM objects + JOIN hashtags_objects ON hashtags_objects.object_id = objects.id LEFT JOIN activities + ON COALESCE(activities.data->'object'->>'id', activities.data->>'object') = + (objects.data->>'id') + AND activities.data->>'type' = 'Create' + WHERE activities.id IS NULL); + """ + + hashtags_cleanup_query = """ + DELETE FROM hashtags WHERE id IN + (SELECT hashtags.id FROM hashtags + LEFT OUTER JOIN hashtags_objects + ON hashtags_objects.hashtag_id = hashtags.id + WHERE hashtags_objects.hashtag_id IS NULL); + """ - def set_complete do - update_status(:complete) - persist_state() - on_complete(data_migration()) - end + {:ok, %{num_rows: hashtags_objects_count}} = + Repo.query(hashtags_objects_cleanup_query, [], timeout: :infinity) - defp update_status(status, message \\ nil) do - put_stat(:state, status) - put_stat(:message, message) + {:ok, %{num_rows: hashtags_count}} = + Repo.query(hashtags_cleanup_query, [], timeout: :infinity) + + {:ok, hashtags_objects_count, hashtags_count} end end diff --git a/lib/pleroma/migrators/hashtags_table_migrator/state.ex b/lib/pleroma/migrators/hashtags_table_migrator/state.ex deleted file mode 100644 index ee0009b2e..000000000 --- a/lib/pleroma/migrators/hashtags_table_migrator/state.ex +++ /dev/null @@ -1,104 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2021 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Migrators.HashtagsTableMigrator.State do - use Agent - - alias Pleroma.DataMigration - - defdelegate data_migration(), to: Pleroma.Migrators.HashtagsTableMigrator - - @reg_name {:global, __MODULE__} - - def start_link(_) do - Agent.start_link(fn -> load_state_from_db() end, name: @reg_name) - end - - defp load_state_from_db do - data_migration = data_migration() - - data = - if data_migration do - Map.new(data_migration.data, fn {k, v} -> {String.to_atom(k), v} end) - else - %{} - end - - %{ - data_migration_id: data_migration && data_migration.id, - data: data - } - end - - def persist_to_db do - %{data_migration_id: data_migration_id, data: data} = state() - - if data_migration_id do - DataMigration.update_one_by_id(data_migration_id, data: data) - else - {:error, :nil_data_migration_id} - end - end - - def reset do - %{data_migration_id: data_migration_id} = state() - - with false <- is_nil(data_migration_id), - :ok <- - DataMigration.update_one_by_id(data_migration_id, - state: :pending, - data: %{} - ) do - reinit() - else - true -> {:error, :nil_data_migration_id} - e -> e - end - end - - def reinit do - Agent.update(@reg_name, fn _state -> load_state_from_db() end) - end - - def state do - Agent.get(@reg_name, & &1) - end - - def get_data_key(key, default \\ nil) do - get_in(state(), [:data, key]) || default - end - - def put_data_key(key, value) do - _ = persist_non_data_change(key, value) - - Agent.update(@reg_name, fn state -> - put_in(state, [:data, key], value) - end) - end - - def increment_data_key(key, increment \\ 1) do - Agent.update(@reg_name, fn state -> - initial_value = get_in(state, [:data, key]) || 0 - updated_value = initial_value + increment - put_in(state, [:data, key], updated_value) - end) - end - - defp persist_non_data_change(:state, value) do - with true <- get_data_key(:state) != value, - true <- value in Pleroma.DataMigration.State.__valid_values__(), - %{data_migration_id: data_migration_id} when not is_nil(data_migration_id) <- state() do - DataMigration.update_one_by_id(data_migration_id, state: value) - else - false -> :ok - _ -> {:error, :nil_data_migration_id} - end - end - - defp persist_non_data_change(_, _) do - nil - end - - def data_migration_id, do: Map.get(state(), :data_migration_id) -end diff --git a/lib/pleroma/migrators/support/base_migrator.ex b/lib/pleroma/migrators/support/base_migrator.ex new file mode 100644 index 000000000..1f8a5402b --- /dev/null +++ b/lib/pleroma/migrators/support/base_migrator.ex @@ -0,0 +1,210 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2021 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Migrators.Support.BaseMigrator do + @moduledoc """ + Base background migrator functionality. + """ + + @callback perform() :: any() + @callback retry_failed() :: any() + @callback feature_config_path() :: list(atom()) + @callback query() :: Ecto.Query.t() + @callback fault_rate_allowance() :: integer() | float() + + defmacro __using__(_opts) do + quote do + use GenServer + + require Logger + + import Ecto.Query + + alias __MODULE__.State + alias Pleroma.Config + alias Pleroma.Repo + + @behaviour Pleroma.Migrators.Support.BaseMigrator + + defdelegate data_migration(), to: State + defdelegate data_migration_id(), to: State + defdelegate state(), to: State + defdelegate persist_state(), to: State, as: :persist_to_db + defdelegate get_stat(key, value \\ nil), to: State, as: :get_data_key + defdelegate put_stat(key, value), to: State, as: :put_data_key + defdelegate increment_stat(key, increment), to: State, as: :increment_data_key + + @reg_name {:global, __MODULE__} + + def whereis, do: GenServer.whereis(@reg_name) + + def start_link(_) do + case whereis() do + nil -> + GenServer.start_link(__MODULE__, nil, name: @reg_name) + + pid -> + {:ok, pid} + end + end + + @impl true + def init(_) do + {:ok, nil, {:continue, :init_state}} + end + + @impl true + def handle_continue(:init_state, _state) do + {:ok, _} = State.start_link(nil) + + data_migration = data_migration() + manual_migrations = Config.get([:instance, :manual_data_migrations], []) + + cond do + Config.get(:env) == :test -> + update_status(:noop) + + is_nil(data_migration) -> + message = "Data migration does not exist." + update_status(:failed, message) + Logger.error("#{__MODULE__}: #{message}") + + data_migration.state == :manual or data_migration.name in manual_migrations -> + message = "Data migration is in manual execution or manual fix mode." + update_status(:manual, message) + Logger.warn("#{__MODULE__}: #{message}") + + data_migration.state == :complete -> + on_complete(data_migration) + + true -> + send(self(), :perform) + end + + {:noreply, nil} + end + + @impl true + def handle_info(:perform, state) do + State.reinit() + + update_status(:running) + put_stat(:iteration_processed_count, 0) + put_stat(:started_at, NaiveDateTime.utc_now()) + + perform() + + fault_rate = fault_rate() + put_stat(:fault_rate, fault_rate) + fault_rate_allowance = fault_rate_allowance() + + cond do + fault_rate == 0 -> + set_complete() + + is_float(fault_rate) and fault_rate <= fault_rate_allowance -> + message = """ + Done with fault rate of #{fault_rate} which doesn't exceed #{fault_rate_allowance}. + Putting data migration to manual fix mode. Try running `#{__MODULE__}.retry_failed/0`. + """ + + Logger.warn("#{__MODULE__}: #{message}") + update_status(:manual, message) + on_complete(data_migration()) + + true -> + message = "Too many failures. Try running `#{__MODULE__}.retry_failed/0`." + Logger.error("#{__MODULE__}: #{message}") + update_status(:failed, message) + end + + persist_state() + {:noreply, state} + end + + defp on_complete(data_migration) do + if data_migration.feature_lock || feature_state() == :disabled do + Logger.warn( + "#{__MODULE__}: migration complete but feature is locked; consider enabling." + ) + + :noop + else + Config.put(feature_config_path(), :enabled) + :ok + end + end + + @doc "Approximate count for current iteration (including processed records count)" + def count(force \\ false, timeout \\ :infinity) do + stored_count = get_stat(:count) + + if stored_count && !force do + stored_count + else + processed_count = get_stat(:processed_count, 0) + max_processed_id = get_stat(:max_processed_id, 0) + query = where(query(), [entity], entity.id > ^max_processed_id) + + count = Repo.aggregate(query, :count, :id, timeout: timeout) + processed_count + put_stat(:count, count) + persist_state() + + count + end + end + + def failures_count do + with {:ok, %{rows: [[count]]}} <- + Repo.query( + "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;", + [data_migration_id()] + ) do + count + end + end + + def feature_state, do: Config.get(feature_config_path()) + + def force_continue do + send(whereis(), :perform) + end + + def force_restart do + :ok = State.reset() + force_continue() + end + + def set_complete do + update_status(:complete) + persist_state() + on_complete(data_migration()) + end + + defp update_status(status, message \\ nil) do + put_stat(:state, status) + put_stat(:message, message) + end + + defp fault_rate do + with failures_count when is_integer(failures_count) <- failures_count() do + failures_count / Enum.max([get_stat(:affected_count, 0), 1]) + else + _ -> :error + end + end + + defp records_per_second do + get_stat(:iteration_processed_count, 0) / Enum.max([running_time(), 1]) + end + + defp running_time do + NaiveDateTime.diff( + NaiveDateTime.utc_now(), + get_stat(:started_at, NaiveDateTime.utc_now()) + ) + end + end + end +end diff --git a/lib/pleroma/migrators/support/base_migrator_state.ex b/lib/pleroma/migrators/support/base_migrator_state.ex new file mode 100644 index 000000000..69724ae79 --- /dev/null +++ b/lib/pleroma/migrators/support/base_migrator_state.ex @@ -0,0 +1,116 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2021 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Migrators.Support.BaseMigratorState do + @moduledoc """ + Base background migrator state functionality. + """ + + @callback data_migration() :: Pleroma.DataMigration.t() + + defmacro __using__(_opts) do + quote do + use Agent + + alias Pleroma.DataMigration + + @behaviour Pleroma.Migrators.Support.BaseMigratorState + @reg_name {:global, __MODULE__} + + def start_link(_) do + Agent.start_link(fn -> load_state_from_db() end, name: @reg_name) + end + + def data_migration, do: raise("data_migration/0 is not implemented") + defoverridable data_migration: 0 + + defp load_state_from_db do + data_migration = data_migration() + + data = + if data_migration do + Map.new(data_migration.data, fn {k, v} -> {String.to_atom(k), v} end) + else + %{} + end + + %{ + data_migration_id: data_migration && data_migration.id, + data: data + } + end + + def persist_to_db do + %{data_migration_id: data_migration_id, data: data} = state() + + if data_migration_id do + DataMigration.update_one_by_id(data_migration_id, data: data) + else + {:error, :nil_data_migration_id} + end + end + + def reset do + %{data_migration_id: data_migration_id} = state() + + with false <- is_nil(data_migration_id), + :ok <- + DataMigration.update_one_by_id(data_migration_id, + state: :pending, + data: %{} + ) do + reinit() + else + true -> {:error, :nil_data_migration_id} + e -> e + end + end + + def reinit do + Agent.update(@reg_name, fn _state -> load_state_from_db() end) + end + + def state do + Agent.get(@reg_name, & &1) + end + + def get_data_key(key, default \\ nil) do + get_in(state(), [:data, key]) || default + end + + def put_data_key(key, value) do + _ = persist_non_data_change(key, value) + + Agent.update(@reg_name, fn state -> + put_in(state, [:data, key], value) + end) + end + + def increment_data_key(key, increment \\ 1) do + Agent.update(@reg_name, fn state -> + initial_value = get_in(state, [:data, key]) || 0 + updated_value = initial_value + increment + put_in(state, [:data, key], updated_value) + end) + end + + defp persist_non_data_change(:state, value) do + with true <- get_data_key(:state) != value, + true <- value in Pleroma.DataMigration.State.__valid_values__(), + %{data_migration_id: data_migration_id} when not is_nil(data_migration_id) <- state() do + DataMigration.update_one_by_id(data_migration_id, state: value) + else + false -> :ok + _ -> {:error, :nil_data_migration_id} + end + end + + defp persist_non_data_change(_, _) do + nil + end + + def data_migration_id, do: Map.get(state(), :data_migration_id) + end + end +end -- cgit v1.2.3 From cb734566093f406fc3db12de2408fc166486f417 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Fri, 12 Mar 2021 12:25:18 +0300 Subject: [#3213] Code formatting fix. --- lib/pleroma/migrators/support/base_migrator_state.ex | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'lib') diff --git a/lib/pleroma/migrators/support/base_migrator_state.ex b/lib/pleroma/migrators/support/base_migrator_state.ex index 69724ae79..b698587f2 100644 --- a/lib/pleroma/migrators/support/base_migrator_state.ex +++ b/lib/pleroma/migrators/support/base_migrator_state.ex @@ -98,7 +98,8 @@ defmodule Pleroma.Migrators.Support.BaseMigratorState do defp persist_non_data_change(:state, value) do with true <- get_data_key(:state) != value, true <- value in Pleroma.DataMigration.State.__valid_values__(), - %{data_migration_id: data_migration_id} when not is_nil(data_migration_id) <- state() do + %{data_migration_id: data_migration_id} when not is_nil(data_migration_id) <- + state() do DataMigration.update_one_by_id(data_migration_id, state: value) else false -> :ok -- cgit v1.2.3