summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorMark Felder <feld@feld.me>2024-08-22 12:49:32 -0400
committerMark Felder <feld@feld.me>2024-08-22 13:24:07 -0400
commita9aa810d3dadaac5a40d18f56ab41b6276206db1 (patch)
treef8c4b058db16c284ff3971eda74d568adb494b9f /lib
parent649e51b581327eb34d31e0160ea70d1cba281f9a (diff)
downloadpleroma-a9aa810d3dadaac5a40d18f56ab41b6276206db1.tar.gz
pleroma-a9aa810d3dadaac5a40d18f56ab41b6276206db1.zip
Change imports to generate an Oban job per each task
Diffstat (limited to 'lib')
-rw-r--r--lib/pleroma/user/import.ex144
-rw-r--r--lib/pleroma/web/pleroma_api/controllers/user_import_controller.ex8
-rw-r--r--lib/pleroma/workers/background_worker.ex6
3 files changed, 91 insertions, 67 deletions
diff --git a/lib/pleroma/user/import.ex b/lib/pleroma/user/import.ex
index bee586234..400e62153 100644
--- a/lib/pleroma/user/import.ex
+++ b/lib/pleroma/user/import.ex
@@ -5,6 +5,7 @@
defmodule Pleroma.User.Import do
use Ecto.Schema
+ alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.Web.CommonAPI
alias Pleroma.Workers.BackgroundWorker
@@ -12,80 +13,103 @@ defmodule Pleroma.User.Import do
require Logger
@spec perform(atom(), User.t(), list()) :: :ok | list() | {:error, any()}
- def perform(:mutes_import, %User{} = user, [_ | _] = identifiers) do
- Enum.map(
- identifiers,
- fn identifier ->
- with {:ok, %User{} = muted_user} <- User.get_or_fetch(identifier),
- {:ok, _} <- User.mute(user, muted_user) do
- {:ok, muted_user}
- else
- error -> handle_error(:mutes_import, identifier, error)
- end
- end
- )
+ def perform(:mute_import, %User{} = user, actor) do
+ with {:ok, %User{} = muted_user} <- User.get_or_fetch(actor),
+ {_, false} <- {:existing_mute, User.mutes_user?(user, muted_user)},
+ {:ok, _} <- User.mute(user, muted_user),
+ # User.mute/2 returns a FollowingRelationship not a %User{} like we get
+ # from CommonAPI.block/2 or CommonAPI.follow/2, so we fetch again to
+ # return the target actor for consistency
+ {:ok, muted_user} <- User.get_or_fetch(actor) do
+ {:ok, muted_user}
+ else
+ {:existing_mute, true} -> :ok
+ error -> handle_error(:mutes_import, actor, error)
+ end
end
- def perform(:blocks_import, %User{} = blocker, [_ | _] = identifiers) do
- Enum.map(
- identifiers,
- fn identifier ->
- with {:ok, %User{} = blocked} <- User.get_or_fetch(identifier),
- {:ok, _block} <- CommonAPI.block(blocked, blocker) do
- {:ok, blocked}
- else
- error -> handle_error(:blocks_import, identifier, error)
- end
- end
- )
+ def perform(:block_import, %User{} = user, actor) do
+ with {:ok, %User{} = blocked} <- User.get_or_fetch(actor),
+ {_, false} <- {:existing_block, User.blocks_user?(user, blocked)},
+ {:ok, _block} <- CommonAPI.block(blocked, user) do
+ {:ok, blocked}
+ else
+ {:existing_block, true} -> :ok
+ error -> handle_error(:blocks_import, actor, error)
+ end
end
- def perform(:follow_import, %User{} = follower, [_ | _] = identifiers) do
- Enum.map(
- identifiers,
- fn identifier ->
- with {:ok, %User{} = followed} <- User.get_or_fetch(identifier),
- {:ok, follower, followed} <- User.maybe_direct_follow(follower, followed),
- {:ok, _, _, _} <- CommonAPI.follow(followed, follower) do
- {:ok, followed}
- else
- error -> handle_error(:follow_import, identifier, error)
- end
- end
- )
+ def perform(:follow_import, %User{} = user, actor) do
+ with {:ok, %User{} = followed} <- User.get_or_fetch(actor),
+ {_, false} <- {:existing_follow, User.following?(user, followed)},
+ {:ok, user, followed} <- User.maybe_direct_follow(user, followed),
+ {:ok, _, _, _} <- CommonAPI.follow(followed, user) do
+ {:ok, followed}
+ else
+ {:existing_follow, true} -> :ok
+ error -> handle_error(:follow_import, actor, error)
+ end
end
- def perform(_, _, _), do: :ok
-
defp handle_error(op, user_id, error) do
Logger.debug("#{op} failed for #{user_id} with: #{inspect(error)}")
error
end
- def blocks_import(%User{} = blocker, [_ | _] = identifiers) do
- BackgroundWorker.new(%{
- "op" => "blocks_import",
- "user_id" => blocker.id,
- "identifiers" => identifiers
- })
- |> Oban.insert()
+ def blocks_import(%User{} = user, [_ | _] = actors) do
+ jobs =
+ Repo.checkout(fn ->
+ Enum.reduce(actors, [], fn actor, acc ->
+ {:ok, job} =
+ BackgroundWorker.new(%{
+ "op" => "block_import",
+ "user_id" => user.id,
+ "actor" => actor
+ })
+ |> Oban.insert()
+
+ acc ++ [job]
+ end)
+ end)
+
+ {:ok, jobs}
end
- def follow_import(%User{} = follower, [_ | _] = identifiers) do
- BackgroundWorker.new(%{
- "op" => "follow_import",
- "user_id" => follower.id,
- "identifiers" => identifiers
- })
- |> Oban.insert()
+ def follows_import(%User{} = user, [_ | _] = actors) do
+ jobs =
+ Repo.checkout(fn ->
+ Enum.reduce(actors, [], fn actor, acc ->
+ {:ok, job} =
+ BackgroundWorker.new(%{
+ "op" => "follow_import",
+ "user_id" => user.id,
+ "actor" => actor
+ })
+ |> Oban.insert()
+
+ acc ++ [job]
+ end)
+ end)
+
+ {:ok, jobs}
end
- def mutes_import(%User{} = user, [_ | _] = identifiers) do
- BackgroundWorker.new(%{
- "op" => "mutes_import",
- "user_id" => user.id,
- "identifiers" => identifiers
- })
- |> Oban.insert()
+ def mutes_import(%User{} = user, [_ | _] = actors) do
+ jobs =
+ Repo.checkout(fn ->
+ Enum.reduce(actors, [], fn actor, acc ->
+ {:ok, job} =
+ BackgroundWorker.new(%{
+ "op" => "mute_import",
+ "user_id" => user.id,
+ "actor" => actor
+ })
+ |> Oban.insert()
+
+ acc ++ [job]
+ end)
+ end)
+
+ {:ok, jobs}
end
end
diff --git a/lib/pleroma/web/pleroma_api/controllers/user_import_controller.ex b/lib/pleroma/web/pleroma_api/controllers/user_import_controller.ex
index 96466f192..d65c30dab 100644
--- a/lib/pleroma/web/pleroma_api/controllers/user_import_controller.ex
+++ b/lib/pleroma/web/pleroma_api/controllers/user_import_controller.ex
@@ -38,8 +38,8 @@ defmodule Pleroma.Web.PleromaAPI.UserImportController do
|> Enum.map(&(&1 |> String.trim() |> String.trim_leading("@")))
|> Enum.reject(&(&1 == ""))
- User.Import.follow_import(follower, identifiers)
- json(conn, "job started")
+ User.Import.follows_import(follower, identifiers)
+ json(conn, "jobs started")
end
def blocks(
@@ -55,7 +55,7 @@ defmodule Pleroma.Web.PleromaAPI.UserImportController do
defp do_block(%{assigns: %{user: blocker}} = conn, list) do
User.Import.blocks_import(blocker, prepare_user_identifiers(list))
- json(conn, "job started")
+ json(conn, "jobs started")
end
def mutes(
@@ -71,7 +71,7 @@ defmodule Pleroma.Web.PleromaAPI.UserImportController do
defp do_mute(%{assigns: %{user: user}} = conn, list) do
User.Import.mutes_import(user, prepare_user_identifiers(list))
- json(conn, "job started")
+ json(conn, "jobs started")
end
defp prepare_user_identifiers(list) do
diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex
index 60da2d5ca..4737c6ea2 100644
--- a/lib/pleroma/workers/background_worker.ex
+++ b/lib/pleroma/workers/background_worker.ex
@@ -19,10 +19,10 @@ defmodule Pleroma.Workers.BackgroundWorker do
User.perform(:force_password_reset, user)
end
- def perform(%Job{args: %{"op" => op, "user_id" => user_id, "identifiers" => identifiers}})
- when op in ["blocks_import", "follow_import", "mutes_import"] do
+ def perform(%Job{args: %{"op" => op, "user_id" => user_id, "actor" => actor}})
+ when op in ["block_import", "follow_import", "mute_import"] do
user = User.get_cached_by_id(user_id)
- {:ok, User.Import.perform(String.to_existing_atom(op), user, identifiers)}
+ User.Import.perform(String.to_existing_atom(op), user, actor)
end
def perform(%Job{