diff options
| -rw-r--r-- | lib/pleroma/repo_streamer.ex | 34 | ||||
| -rw-r--r-- | lib/pleroma/user.ex | 46 | ||||
| -rw-r--r-- | lib/pleroma/web/activity_pub/utils.ex | 18 | ||||
| -rw-r--r-- | test/tasks/user_test.exs | 3 | ||||
| -rw-r--r-- | test/user_test.exs | 36 | 
5 files changed, 99 insertions, 38 deletions
diff --git a/lib/pleroma/repo_streamer.ex b/lib/pleroma/repo_streamer.ex new file mode 100644 index 000000000..a4b71a1bb --- /dev/null +++ b/lib/pleroma/repo_streamer.ex @@ -0,0 +1,34 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.RepoStreamer do +  alias Pleroma.Repo +  import Ecto.Query + +  def chunk_stream(query, chunk_size) do +    Stream.unfold(0, fn +      :halt -> +        {[], :halt} + +      last_id -> +        query +        |> order_by(asc: :id) +        |> where([r], r.id > ^last_id) +        |> limit(^chunk_size) +        |> Repo.all() +        |> case do +          [] -> +            {[], :halt} + +          records -> +            last_id = List.last(records).id +            {records, last_id} +        end +    end) +    |> Stream.take_while(fn +      [] -> false +      _ -> true +    end) +  end +end diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index 3a9ae8d73..1e59a4121 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -15,6 +15,7 @@ defmodule Pleroma.User do    alias Pleroma.Object    alias Pleroma.Registration    alias Pleroma.Repo +  alias Pleroma.RepoStreamer    alias Pleroma.User    alias Pleroma.Web    alias Pleroma.Web.ActivityPub.ActivityPub @@ -932,18 +933,24 @@ defmodule Pleroma.User do    @spec perform(atom(), User.t()) :: {:ok, User.t()}    def perform(:delete, %User{} = user) do -    {:ok, user} = User.deactivate(user) -      # Remove all relationships      {:ok, followers} = User.get_followers(user) -    Enum.each(followers, fn follower -> User.unfollow(follower, user) end) +    Enum.each(followers, fn follower -> +      ActivityPub.unfollow(follower, user) +      User.unfollow(follower, user) +    end)      {:ok, friends} = User.get_friends(user) -    Enum.each(friends, fn followed -> User.unfollow(user, followed) end) +    Enum.each(friends, fn followed -> +      ActivityPub.unfollow(user, followed) +      User.unfollow(user, followed) +    end)      delete_user_activities(user) + +    {:ok, _user} = Repo.delete(user)    end    @spec perform(atom(), User.t()) :: {:ok, User.t()} @@ -1016,18 +1023,35 @@ defmodule Pleroma.User do        ])    def delete_user_activities(%User{ap_id: ap_id} = user) do -    stream = -      ap_id -      |> Activity.query_by_actor() -      |> Repo.stream() - -    Repo.transaction(fn -> Enum.each(stream, &delete_activity(&1)) end, timeout: :infinity) +    ap_id +    |> Activity.query_by_actor() +    |> RepoStreamer.chunk_stream(50) +    |> Stream.each(fn activities -> +      Enum.each(activities, &delete_activity(&1)) +    end) +    |> Stream.run()      {:ok, user}    end    defp delete_activity(%{data: %{"type" => "Create"}} = activity) do -    Object.normalize(activity) |> ActivityPub.delete() +    activity +    |> Object.normalize() +    |> ActivityPub.delete() +  end + +  defp delete_activity(%{data: %{"type" => "Like"}} = activity) do +    user = get_cached_by_ap_id(activity.actor) +    object = Object.normalize(activity) + +    ActivityPub.unlike(user, object) +  end + +  defp delete_activity(%{data: %{"type" => "Announce"}} = activity) do +    user = get_cached_by_ap_id(activity.actor) +    object = Object.normalize(activity) + +    ActivityPub.unannounce(user, object)    end    defp delete_activity(_activity), do: "Doing nothing" diff --git a/lib/pleroma/web/activity_pub/utils.ex b/lib/pleroma/web/activity_pub/utils.ex index 10ff572a2..514266cee 100644 --- a/lib/pleroma/web/activity_pub/utils.ex +++ b/lib/pleroma/web/activity_pub/utils.ex @@ -151,16 +151,18 @@ defmodule Pleroma.Web.ActivityPub.Utils do    def create_context(context) do      context = context || generate_id("contexts") -    changeset = Object.context_mapping(context) -    case Repo.insert(changeset) do -      {:ok, object} -> -        object +    # Ecto has problems accessing the constraint inside the jsonb, +    # so we explicitly check for the existed object before insert +    object = Object.get_cached_by_ap_id(context) -      # This should be solved by an upsert, but it seems ecto -      # has problems accessing the constraint inside the jsonb. -      {:error, _} -> -        Object.get_cached_by_ap_id(context) +    with true <- is_nil(object), +         changeset <- Object.context_mapping(context), +         {:ok, inserted_object} <- Repo.insert(changeset) do +      inserted_object +    else +      _ -> +        object      end    end diff --git a/test/tasks/user_test.exs b/test/tasks/user_test.exs index 6fd7c7113..3d4b08fba 100644 --- a/test/tasks/user_test.exs +++ b/test/tasks/user_test.exs @@ -89,8 +89,7 @@ defmodule Mix.Tasks.Pleroma.UserTest do        assert_received {:mix_shell, :info, [message]}        assert message =~ " deleted" -      user = User.get_cached_by_nickname(user.nickname) -      assert user.info.deactivated +      refute User.get_by_nickname(user.nickname)      end      test "no user to delete" do diff --git a/test/user_test.exs b/test/user_test.exs index a8176025c..198a97fae 100644 --- a/test/user_test.exs +++ b/test/user_test.exs @@ -920,42 +920,44 @@ defmodule Pleroma.UserTest do      {:ok, activity} = CommonAPI.post(user, %{"status" => "2hu"}) -    Ecto.Adapters.SQL.Sandbox.unboxed_run(Repo, fn -> -      {:ok, _} = User.delete_user_activities(user) -      # TODO: Remove favorites, repeats, delete activities. -      refute Activity.get_by_id(activity.id) -    end) +    {:ok, _} = User.delete_user_activities(user) + +    # TODO: Remove favorites, repeats, delete activities. +    refute Activity.get_by_id(activity.id)    end -  test ".delete deactivates a user, all follow relationships and all create activities" do +  test ".delete deactivates a user, all follow relationships and all activities" do      user = insert(:user) -    followed = insert(:user)      follower = insert(:user) -    {:ok, user} = User.follow(user, followed)      {:ok, follower} = User.follow(follower, user)      {:ok, activity} = CommonAPI.post(user, %{"status" => "2hu"})      {:ok, activity_two} = CommonAPI.post(follower, %{"status" => "3hu"}) -    {:ok, _, _} = CommonAPI.favorite(activity_two.id, user) -    {:ok, _, _} = CommonAPI.favorite(activity.id, follower) -    {:ok, _, _} = CommonAPI.repeat(activity.id, follower) +    {:ok, like, _} = CommonAPI.favorite(activity_two.id, user) +    {:ok, like_two, _} = CommonAPI.favorite(activity.id, follower) +    {:ok, repeat, _} = CommonAPI.repeat(activity_two.id, user)      {:ok, _} = User.delete(user) -    followed = User.get_cached_by_id(followed.id)      follower = User.get_cached_by_id(follower.id) -    user = User.get_cached_by_id(user.id) -    assert user.info.deactivated +    refute User.following?(follower, user) +    refute User.get_by_id(user.id) -    refute User.following?(user, followed) -    refute User.following?(followed, follower) +    user_activities = +      user.ap_id +      |> Activity.query_by_actor() +      |> Repo.all() +      |> Enum.map(fn act -> act.data["type"] end) -    # TODO: Remove favorites, repeats, delete activities. +    assert Enum.all?(user_activities, fn act -> act in ~w(Delete Undo) end)      refute Activity.get_by_id(activity.id) +    refute Activity.get_by_id(like.id) +    refute Activity.get_by_id(like_two.id) +    refute Activity.get_by_id(repeat.id)    end    test "get_public_key_for_ap_id fetches a user that's not in the db" do  | 
