diff options
author | Mark Felder <feld@feld.me> | 2024-08-22 12:49:32 -0400 |
---|---|---|
committer | Mark Felder <feld@feld.me> | 2024-08-22 13:24:07 -0400 |
commit | a9aa810d3dadaac5a40d18f56ab41b6276206db1 (patch) | |
tree | f8c4b058db16c284ff3971eda74d568adb494b9f /lib | |
parent | 649e51b581327eb34d31e0160ea70d1cba281f9a (diff) | |
download | pleroma-a9aa810d3dadaac5a40d18f56ab41b6276206db1.tar.gz pleroma-a9aa810d3dadaac5a40d18f56ab41b6276206db1.zip |
Change imports to generate an Oban job per each task
Diffstat (limited to 'lib')
-rw-r--r-- | lib/pleroma/user/import.ex | 144 | ||||
-rw-r--r-- | lib/pleroma/web/pleroma_api/controllers/user_import_controller.ex | 8 | ||||
-rw-r--r-- | lib/pleroma/workers/background_worker.ex | 6 |
3 files changed, 91 insertions, 67 deletions
diff --git a/lib/pleroma/user/import.ex b/lib/pleroma/user/import.ex index bee586234..400e62153 100644 --- a/lib/pleroma/user/import.ex +++ b/lib/pleroma/user/import.ex @@ -5,6 +5,7 @@ defmodule Pleroma.User.Import do use Ecto.Schema + alias Pleroma.Repo alias Pleroma.User alias Pleroma.Web.CommonAPI alias Pleroma.Workers.BackgroundWorker @@ -12,80 +13,103 @@ defmodule Pleroma.User.Import do require Logger @spec perform(atom(), User.t(), list()) :: :ok | list() | {:error, any()} - def perform(:mutes_import, %User{} = user, [_ | _] = identifiers) do - Enum.map( - identifiers, - fn identifier -> - with {:ok, %User{} = muted_user} <- User.get_or_fetch(identifier), - {:ok, _} <- User.mute(user, muted_user) do - {:ok, muted_user} - else - error -> handle_error(:mutes_import, identifier, error) - end - end - ) + def perform(:mute_import, %User{} = user, actor) do + with {:ok, %User{} = muted_user} <- User.get_or_fetch(actor), + {_, false} <- {:existing_mute, User.mutes_user?(user, muted_user)}, + {:ok, _} <- User.mute(user, muted_user), + # User.mute/2 returns a FollowingRelationship not a %User{} like we get + # from CommonAPI.block/2 or CommonAPI.follow/2, so we fetch again to + # return the target actor for consistency + {:ok, muted_user} <- User.get_or_fetch(actor) do + {:ok, muted_user} + else + {:existing_mute, true} -> :ok + error -> handle_error(:mutes_import, actor, error) + end end - def perform(:blocks_import, %User{} = blocker, [_ | _] = identifiers) do - Enum.map( - identifiers, - fn identifier -> - with {:ok, %User{} = blocked} <- User.get_or_fetch(identifier), - {:ok, _block} <- CommonAPI.block(blocked, blocker) do - {:ok, blocked} - else - error -> handle_error(:blocks_import, identifier, error) - end - end - ) + def perform(:block_import, %User{} = user, actor) do + with {:ok, %User{} = blocked} <- User.get_or_fetch(actor), + {_, false} <- {:existing_block, User.blocks_user?(user, blocked)}, + {:ok, _block} <- CommonAPI.block(blocked, user) do + {:ok, blocked} + else + {:existing_block, true} -> :ok + error -> handle_error(:blocks_import, actor, error) + end end - def perform(:follow_import, %User{} = follower, [_ | _] = identifiers) do - Enum.map( - identifiers, - fn identifier -> - with {:ok, %User{} = followed} <- User.get_or_fetch(identifier), - {:ok, follower, followed} <- User.maybe_direct_follow(follower, followed), - {:ok, _, _, _} <- CommonAPI.follow(followed, follower) do - {:ok, followed} - else - error -> handle_error(:follow_import, identifier, error) - end - end - ) + def perform(:follow_import, %User{} = user, actor) do + with {:ok, %User{} = followed} <- User.get_or_fetch(actor), + {_, false} <- {:existing_follow, User.following?(user, followed)}, + {:ok, user, followed} <- User.maybe_direct_follow(user, followed), + {:ok, _, _, _} <- CommonAPI.follow(followed, user) do + {:ok, followed} + else + {:existing_follow, true} -> :ok + error -> handle_error(:follow_import, actor, error) + end end - def perform(_, _, _), do: :ok - defp handle_error(op, user_id, error) do Logger.debug("#{op} failed for #{user_id} with: #{inspect(error)}") error end - def blocks_import(%User{} = blocker, [_ | _] = identifiers) do - BackgroundWorker.new(%{ - "op" => "blocks_import", - "user_id" => blocker.id, - "identifiers" => identifiers - }) - |> Oban.insert() + def blocks_import(%User{} = user, [_ | _] = actors) do + jobs = + Repo.checkout(fn -> + Enum.reduce(actors, [], fn actor, acc -> + {:ok, job} = + BackgroundWorker.new(%{ + "op" => "block_import", + "user_id" => user.id, + "actor" => actor + }) + |> Oban.insert() + + acc ++ [job] + end) + end) + + {:ok, jobs} end - def follow_import(%User{} = follower, [_ | _] = identifiers) do - BackgroundWorker.new(%{ - "op" => "follow_import", - "user_id" => follower.id, - "identifiers" => identifiers - }) - |> Oban.insert() + def follows_import(%User{} = user, [_ | _] = actors) do + jobs = + Repo.checkout(fn -> + Enum.reduce(actors, [], fn actor, acc -> + {:ok, job} = + BackgroundWorker.new(%{ + "op" => "follow_import", + "user_id" => user.id, + "actor" => actor + }) + |> Oban.insert() + + acc ++ [job] + end) + end) + + {:ok, jobs} end - def mutes_import(%User{} = user, [_ | _] = identifiers) do - BackgroundWorker.new(%{ - "op" => "mutes_import", - "user_id" => user.id, - "identifiers" => identifiers - }) - |> Oban.insert() + def mutes_import(%User{} = user, [_ | _] = actors) do + jobs = + Repo.checkout(fn -> + Enum.reduce(actors, [], fn actor, acc -> + {:ok, job} = + BackgroundWorker.new(%{ + "op" => "mute_import", + "user_id" => user.id, + "actor" => actor + }) + |> Oban.insert() + + acc ++ [job] + end) + end) + + {:ok, jobs} end end diff --git a/lib/pleroma/web/pleroma_api/controllers/user_import_controller.ex b/lib/pleroma/web/pleroma_api/controllers/user_import_controller.ex index 96466f192..d65c30dab 100644 --- a/lib/pleroma/web/pleroma_api/controllers/user_import_controller.ex +++ b/lib/pleroma/web/pleroma_api/controllers/user_import_controller.ex @@ -38,8 +38,8 @@ defmodule Pleroma.Web.PleromaAPI.UserImportController do |> Enum.map(&(&1 |> String.trim() |> String.trim_leading("@"))) |> Enum.reject(&(&1 == "")) - User.Import.follow_import(follower, identifiers) - json(conn, "job started") + User.Import.follows_import(follower, identifiers) + json(conn, "jobs started") end def blocks( @@ -55,7 +55,7 @@ defmodule Pleroma.Web.PleromaAPI.UserImportController do defp do_block(%{assigns: %{user: blocker}} = conn, list) do User.Import.blocks_import(blocker, prepare_user_identifiers(list)) - json(conn, "job started") + json(conn, "jobs started") end def mutes( @@ -71,7 +71,7 @@ defmodule Pleroma.Web.PleromaAPI.UserImportController do defp do_mute(%{assigns: %{user: user}} = conn, list) do User.Import.mutes_import(user, prepare_user_identifiers(list)) - json(conn, "job started") + json(conn, "jobs started") end defp prepare_user_identifiers(list) do diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex index 60da2d5ca..4737c6ea2 100644 --- a/lib/pleroma/workers/background_worker.ex +++ b/lib/pleroma/workers/background_worker.ex @@ -19,10 +19,10 @@ defmodule Pleroma.Workers.BackgroundWorker do User.perform(:force_password_reset, user) end - def perform(%Job{args: %{"op" => op, "user_id" => user_id, "identifiers" => identifiers}}) - when op in ["blocks_import", "follow_import", "mutes_import"] do + def perform(%Job{args: %{"op" => op, "user_id" => user_id, "actor" => actor}}) + when op in ["block_import", "follow_import", "mute_import"] do user = User.get_cached_by_id(user_id) - {:ok, User.Import.perform(String.to_existing_atom(op), user, identifiers)} + User.Import.perform(String.to_existing_atom(op), user, actor) end def perform(%Job{ |