summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHaelwenn <contact+git.pleroma.social@hacktivis.me>2023-06-27 12:08:11 +0000
committerHaelwenn <contact+git.pleroma.social@hacktivis.me>2023-06-27 12:08:11 +0000
commit41f2ee69a84178b83eb045145de72728208d399b (patch)
tree5473eca0ff7b1b1e3e6751b7081a10fd6358e0a9
parent4e26fbda082f52bbb2a7db634744978fb4586a6b (diff)
parent179efd94677d1d30bdbbbbaafc899c8c908181d2 (diff)
downloadpleroma-41f2ee69a84178b83eb045145de72728208d399b.tar.gz
pleroma-41f2ee69a84178b83eb045145de72728208d399b.zip
Merge branch 'from/upstream-develop/tusooa/backup-status' into 'develop'
Detail backup states Closes #3024 See merge request pleroma/pleroma!3809
-rw-r--r--config/config.exs4
-rw-r--r--config/description.exs15
-rw-r--r--lib/pleroma/ecto_enums.ex8
-rw-r--r--lib/pleroma/user/backup.ex175
-rw-r--r--lib/pleroma/web/api_spec/operations/pleroma_backup_operation.ex12
-rw-r--r--lib/pleroma/web/pleroma_api/views/backup_view.ex10
-rw-r--r--lib/pleroma/workers/backup_worker.ex2
-rw-r--r--priv/repo/migrations/20221216052127_add_state_to_backups.exs21
-rw-r--r--test/pleroma/user/backup_test.exs98
-rw-r--r--test/pleroma/web/pleroma_api/views/backup_view_test.exs39
10 files changed, 351 insertions, 33 deletions
diff --git a/config/config.exs b/config/config.exs
index 178b6be99..a92ab574b 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -871,7 +871,9 @@ config :pleroma, Pleroma.Web.Auth.Authenticator, Pleroma.Web.Auth.PleromaAuthent
config :pleroma, Pleroma.User.Backup,
purge_after_days: 30,
limit_days: 7,
- dir: nil
+ dir: nil,
+ process_wait_time: 30_000,
+ process_chunk_size: 100
config :pleroma, ConcurrentLimiter, [
{Pleroma.Web.RichMedia.Helpers, [max_running: 5, max_waiting: 5]},
diff --git a/config/description.exs b/config/description.exs
index 5f8add8a4..d18649ae8 100644
--- a/config/description.exs
+++ b/config/description.exs
@@ -3364,6 +3364,21 @@ config :pleroma, :config_description, [
type: :integer,
description: "Limit user to export not more often than once per N days",
suggestions: [7]
+ },
+ %{
+ key: :process_wait_time,
+ type: :integer,
+ label: "Process Wait Time",
+ description:
+ "The amount of time to wait for backup to report progress, in milliseconds. If no progress is received from the backup job for that much time, terminate it and deem it failed.",
+ suggestions: [30_000]
+ },
+ %{
+ key: :process_chunk_size,
+ type: :integer,
+ label: "Process Chunk Size",
+ description: "The number of activities to fetch in the backup job for each chunk.",
+ suggestions: [100]
}
]
},
diff --git a/lib/pleroma/ecto_enums.ex b/lib/pleroma/ecto_enums.ex
index a4890b489..b346b39d6 100644
--- a/lib/pleroma/ecto_enums.ex
+++ b/lib/pleroma/ecto_enums.ex
@@ -27,3 +27,11 @@ defenum(Pleroma.DataMigration.State,
failed: 4,
manual: 5
)
+
+defenum(Pleroma.User.Backup.State,
+ pending: 1,
+ running: 2,
+ complete: 3,
+ failed: 4,
+ invalid: 5
+)
diff --git a/lib/pleroma/user/backup.ex b/lib/pleroma/user/backup.ex
index 9df010605..447fca2a1 100644
--- a/lib/pleroma/user/backup.ex
+++ b/lib/pleroma/user/backup.ex
@@ -9,12 +9,14 @@ defmodule Pleroma.User.Backup do
import Ecto.Query
import Pleroma.Web.Gettext
+ require Logger
require Pleroma.Constants
alias Pleroma.Activity
alias Pleroma.Bookmark
alias Pleroma.Repo
alias Pleroma.User
+ alias Pleroma.User.Backup.State
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.ActivityPub.UserView
@@ -25,6 +27,8 @@ defmodule Pleroma.User.Backup do
field(:file_name, :string)
field(:file_size, :integer, default: 0)
field(:processed, :boolean, default: false)
+ field(:state, State, default: :invalid)
+ field(:processed_number, :integer, default: 0)
belongs_to(:user, User, type: FlakeId.Ecto.CompatType)
@@ -46,7 +50,8 @@ defmodule Pleroma.User.Backup do
%__MODULE__{
user_id: user.id,
content_type: "application/zip",
- file_name: name
+ file_name: name,
+ state: :pending
}
end
@@ -109,27 +114,108 @@ defmodule Pleroma.User.Backup do
def get(id), do: Repo.get(__MODULE__, id)
+ defp set_state(backup, state, processed_number \\ nil) do
+ struct =
+ %{state: state}
+ |> Pleroma.Maps.put_if_present(:processed_number, processed_number)
+
+ backup
+ |> cast(struct, [:state, :processed_number])
+ |> Repo.update()
+ end
+
def process(%__MODULE__{} = backup) do
- with {:ok, zip_file} <- export(backup),
+ set_state(backup, :running, 0)
+
+ current_pid = self()
+
+ task =
+ Task.Supervisor.async_nolink(
+ Pleroma.TaskSupervisor,
+ __MODULE__,
+ :do_process,
+ [backup, current_pid]
+ )
+
+ wait_backup(backup, backup.processed_number, task)
+ end
+
+ def do_process(backup, current_pid) do
+ with {:ok, zip_file} <- export(backup, current_pid),
{:ok, %{size: size}} <- File.stat(zip_file),
{:ok, _upload} <- upload(backup, zip_file) do
backup
- |> cast(%{file_size: size, processed: true}, [:file_size, :processed])
+ |> cast(
+ %{
+ file_size: size,
+ processed: true,
+ state: :complete
+ },
+ [:file_size, :processed, :state]
+ )
|> Repo.update()
end
end
+ defp wait_backup(backup, current_processed, task) do
+ wait_time = Pleroma.Config.get([__MODULE__, :process_wait_time])
+
+ receive do
+ {:progress, new_processed} ->
+ total_processed = current_processed + new_processed
+
+ set_state(backup, :running, total_processed)
+ wait_backup(backup, total_processed, task)
+
+ {:DOWN, _ref, _proc, _pid, reason} ->
+ backup = get(backup.id)
+
+ if reason != :normal do
+ Logger.error("Backup #{backup.id} process ended abnormally: #{inspect(reason)}")
+
+ {:ok, backup} = set_state(backup, :failed)
+
+ cleanup(backup)
+
+ {:error,
+ %{
+ backup: backup,
+ reason: :exit,
+ details: reason
+ }}
+ else
+ {:ok, backup}
+ end
+ after
+ wait_time ->
+ Logger.error(
+ "Backup #{backup.id} timed out after no response for #{wait_time}ms, terminating"
+ )
+
+ Task.Supervisor.terminate_child(Pleroma.TaskSupervisor, task.pid)
+
+ {:ok, backup} = set_state(backup, :failed)
+
+ cleanup(backup)
+
+ {:error,
+ %{
+ backup: backup,
+ reason: :timeout
+ }}
+ end
+ end
+
@files ['actor.json', 'outbox.json', 'likes.json', 'bookmarks.json']
- def export(%__MODULE__{} = backup) do
+ def export(%__MODULE__{} = backup, caller_pid) do
backup = Repo.preload(backup, :user)
- name = String.trim_trailing(backup.file_name, ".zip")
- dir = dir(name)
+ dir = backup_tempdir(backup)
with :ok <- File.mkdir(dir),
- :ok <- actor(dir, backup.user),
- :ok <- statuses(dir, backup.user),
- :ok <- likes(dir, backup.user),
- :ok <- bookmarks(dir, backup.user),
+ :ok <- actor(dir, backup.user, caller_pid),
+ :ok <- statuses(dir, backup.user, caller_pid),
+ :ok <- likes(dir, backup.user, caller_pid),
+ :ok <- bookmarks(dir, backup.user, caller_pid),
{:ok, zip_path} <- :zip.create(String.to_charlist(dir <> ".zip"), @files, cwd: dir),
{:ok, _} <- File.rm_rf(dir) do
{:ok, to_string(zip_path)}
@@ -157,11 +243,12 @@ defmodule Pleroma.User.Backup do
end
end
- defp actor(dir, user) do
+ defp actor(dir, user, caller_pid) do
with {:ok, json} <-
UserView.render("user.json", %{user: user})
|> Map.merge(%{"likes" => "likes.json", "bookmarks" => "bookmarks.json"})
|> Jason.encode() do
+ send(caller_pid, {:progress, 1})
File.write(Path.join(dir, "actor.json"), json)
end
end
@@ -180,47 +267,80 @@ defmodule Pleroma.User.Backup do
)
end
- defp write(query, dir, name, fun) do
+ defp should_report?(num, chunk_size), do: rem(num, chunk_size) == 0
+
+ defp backup_tempdir(backup) do
+ name = String.trim_trailing(backup.file_name, ".zip")
+ dir(name)
+ end
+
+ defp cleanup(backup) do
+ dir = backup_tempdir(backup)
+ File.rm_rf(dir)
+ end
+
+ defp write(query, dir, name, fun, caller_pid) do
path = Path.join(dir, "#{name}.json")
+ chunk_size = Pleroma.Config.get([__MODULE__, :process_chunk_size])
+
with {:ok, file} <- File.open(path, [:write, :utf8]),
:ok <- write_header(file, name) do
total =
query
- |> Pleroma.Repo.chunk_stream(100)
+ |> Pleroma.Repo.chunk_stream(chunk_size, _returns_as = :one, timeout: :infinity)
|> Enum.reduce(0, fn i, acc ->
- with {:ok, data} <- fun.(i),
+ with {:ok, data} <-
+ (try do
+ fun.(i)
+ rescue
+ e -> {:error, e}
+ end),
{:ok, str} <- Jason.encode(data),
:ok <- IO.write(file, str <> ",\n") do
+ if should_report?(acc + 1, chunk_size) do
+ send(caller_pid, {:progress, chunk_size})
+ end
+
acc + 1
else
- _ -> acc
+ {:error, e} ->
+ Logger.warn(
+ "Error processing backup item: #{inspect(e)}\n The item is: #{inspect(i)}"
+ )
+
+ acc
+
+ _ ->
+ acc
end
end)
+ send(caller_pid, {:progress, rem(total, chunk_size)})
+
with :ok <- :file.pwrite(file, {:eof, -2}, "\n],\n \"totalItems\": #{total}}") do
File.close(file)
end
end
end
- defp bookmarks(dir, %{id: user_id} = _user) do
+ defp bookmarks(dir, %{id: user_id} = _user, caller_pid) do
Bookmark
|> where(user_id: ^user_id)
|> join(:inner, [b], activity in assoc(b, :activity))
|> select([b, a], %{id: b.id, object: fragment("(?)->>'object'", a.data)})
- |> write(dir, "bookmarks", fn a -> {:ok, a.object} end)
+ |> write(dir, "bookmarks", fn a -> {:ok, a.object} end, caller_pid)
end
- defp likes(dir, user) do
+ defp likes(dir, user, caller_pid) do
user.ap_id
|> Activity.Queries.by_actor()
|> Activity.Queries.by_type("Like")
|> select([like], %{id: like.id, object: fragment("(?)->>'object'", like.data)})
- |> write(dir, "likes", fn a -> {:ok, a.object} end)
+ |> write(dir, "likes", fn a -> {:ok, a.object} end, caller_pid)
end
- defp statuses(dir, user) do
+ defp statuses(dir, user, caller_pid) do
opts =
%{}
|> Map.put(:type, ["Create", "Announce"])
@@ -233,10 +353,15 @@ defmodule Pleroma.User.Backup do
]
|> Enum.concat()
|> ActivityPub.fetch_activities_query(opts)
- |> write(dir, "outbox", fn a ->
- with {:ok, activity} <- Transmogrifier.prepare_outgoing(a.data) do
- {:ok, Map.delete(activity, "@context")}
- end
- end)
+ |> write(
+ dir,
+ "outbox",
+ fn a ->
+ with {:ok, activity} <- Transmogrifier.prepare_outgoing(a.data) do
+ {:ok, Map.delete(activity, "@context")}
+ end
+ end,
+ caller_pid
+ )
end
end
diff --git a/lib/pleroma/web/api_spec/operations/pleroma_backup_operation.ex b/lib/pleroma/web/api_spec/operations/pleroma_backup_operation.ex
index 45fa2b058..400f3825d 100644
--- a/lib/pleroma/web/api_spec/operations/pleroma_backup_operation.ex
+++ b/lib/pleroma/web/api_spec/operations/pleroma_backup_operation.ex
@@ -64,7 +64,13 @@ defmodule Pleroma.Web.ApiSpec.PleromaBackupOperation do
content_type: %Schema{type: :string},
file_name: %Schema{type: :string},
file_size: %Schema{type: :integer},
- processed: %Schema{type: :boolean}
+ processed: %Schema{type: :boolean, description: "whether this backup has succeeded"},
+ state: %Schema{
+ type: :string,
+ description: "the state of the backup",
+ enum: ["pending", "running", "complete", "failed"]
+ },
+ processed_number: %Schema{type: :integer, description: "the number of records processed"}
},
example: %{
"content_type" => "application/zip",
@@ -72,7 +78,9 @@ defmodule Pleroma.Web.ApiSpec.PleromaBackupOperation do
"https://cofe.fe:4000/media/backups/archive-foobar-20200908T164207-Yr7vuT5Wycv-sN3kSN2iJ0k-9pMo60j9qmvRCdDqIew.zip",
"file_size" => 4105,
"inserted_at" => "2020-09-08T16:42:07.000Z",
- "processed" => true
+ "processed" => true,
+ "state" => "complete",
+ "processed_number" => 20
}
}
end
diff --git a/lib/pleroma/web/pleroma_api/views/backup_view.ex b/lib/pleroma/web/pleroma_api/views/backup_view.ex
index d778590f0..20403aeee 100644
--- a/lib/pleroma/web/pleroma_api/views/backup_view.ex
+++ b/lib/pleroma/web/pleroma_api/views/backup_view.ex
@@ -9,12 +9,22 @@ defmodule Pleroma.Web.PleromaAPI.BackupView do
alias Pleroma.Web.CommonAPI.Utils
def render("show.json", %{backup: %Backup{} = backup}) do
+ # To deal with records before the migration
+ state =
+ if backup.state == :invalid do
+ if backup.processed, do: :complete, else: :failed
+ else
+ backup.state
+ end
+
%{
id: backup.id,
content_type: backup.content_type,
url: download_url(backup),
file_size: backup.file_size,
processed: backup.processed,
+ state: to_string(state),
+ processed_number: backup.processed_number,
inserted_at: Utils.to_masto_date(backup.inserted_at)
}
end
diff --git a/lib/pleroma/workers/backup_worker.ex b/lib/pleroma/workers/backup_worker.ex
index 12ee70f00..a485ddb4b 100644
--- a/lib/pleroma/workers/backup_worker.ex
+++ b/lib/pleroma/workers/backup_worker.ex
@@ -51,7 +51,7 @@ defmodule Pleroma.Workers.BackupWorker do
end
@impl Oban.Worker
- def timeout(_job), do: :timer.seconds(900)
+ def timeout(_job), do: :infinity
defp has_email?(user) do
not is_nil(user.email) and user.email != ""
diff --git a/priv/repo/migrations/20221216052127_add_state_to_backups.exs b/priv/repo/migrations/20221216052127_add_state_to_backups.exs
new file mode 100644
index 000000000..73b30fc35
--- /dev/null
+++ b/priv/repo/migrations/20221216052127_add_state_to_backups.exs
@@ -0,0 +1,21 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Repo.Migrations.AddStateToBackups do
+ use Ecto.Migration
+
+ def up do
+ alter table(:backups) do
+ add(:state, :integer, default: 5)
+ add(:processed_number, :integer, default: 0)
+ end
+ end
+
+ def down do
+ alter table(:backups) do
+ remove(:state)
+ remove(:processed_number)
+ end
+ end
+end
diff --git a/test/pleroma/user/backup_test.exs b/test/pleroma/user/backup_test.exs
index 5c9b94000..066bf6ba8 100644
--- a/test/pleroma/user/backup_test.exs
+++ b/test/pleroma/user/backup_test.exs
@@ -39,7 +39,7 @@ defmodule Pleroma.User.BackupTest do
assert_enqueued(worker: BackupWorker, args: args)
backup = Backup.get(args["backup_id"])
- assert %Backup{user_id: ^user_id, processed: false, file_size: 0} = backup
+ assert %Backup{user_id: ^user_id, processed: false, file_size: 0, state: :pending} = backup
end
test "it return an error if the export limit is over" do
@@ -59,7 +59,30 @@ defmodule Pleroma.User.BackupTest do
assert {:ok, %Oban.Job{args: %{"backup_id" => backup_id} = args}} = Backup.create(user)
assert {:ok, backup} = perform_job(BackupWorker, args)
assert backup.file_size > 0
- assert %Backup{id: ^backup_id, processed: true, user_id: ^user_id} = backup
+ assert %Backup{id: ^backup_id, processed: true, user_id: ^user_id, state: :complete} = backup
+
+ delete_job_args = %{"op" => "delete", "backup_id" => backup_id}
+
+ assert_enqueued(worker: BackupWorker, args: delete_job_args)
+ assert {:ok, backup} = perform_job(BackupWorker, delete_job_args)
+ refute Backup.get(backup_id)
+
+ email = Pleroma.Emails.UserEmail.backup_is_ready_email(backup)
+
+ assert_email_sent(
+ to: {user.name, user.email},
+ html_body: email.html_body
+ )
+ end
+
+ test "it updates states of the backup" do
+ clear_config([Pleroma.Upload, :uploader], Pleroma.Uploaders.Local)
+ %{id: user_id} = user = insert(:user)
+
+ assert {:ok, %Oban.Job{args: %{"backup_id" => backup_id} = args}} = Backup.create(user)
+ assert {:ok, backup} = perform_job(BackupWorker, args)
+ assert backup.file_size > 0
+ assert %Backup{id: ^backup_id, processed: true, user_id: ^user_id, state: :complete} = backup
delete_job_args = %{"op" => "delete", "backup_id" => backup_id}
@@ -148,7 +171,7 @@ defmodule Pleroma.User.BackupTest do
Bookmark.create(user.id, status3.id)
assert {:ok, backup} = user |> Backup.new() |> Repo.insert()
- assert {:ok, path} = Backup.export(backup)
+ assert {:ok, path} = Backup.export(backup, self())
assert {:ok, zipfile} = :zip.zip_open(String.to_charlist(path), [:memory])
assert {:ok, {'actor.json', json}} = :zip.zip_get('actor.json', zipfile)
@@ -230,6 +253,73 @@ defmodule Pleroma.User.BackupTest do
File.rm!(path)
end
+ test "it counts the correct number processed" do
+ user = insert(:user, %{nickname: "cofe", name: "Cofe", ap_id: "http://cofe.io/users/cofe"})
+
+ Enum.map(1..120, fn i ->
+ {:ok, status} = CommonAPI.post(user, %{status: "status #{i}"})
+ CommonAPI.favorite(user, status.id)
+ Bookmark.create(user.id, status.id)
+ end)
+
+ assert {:ok, backup} = user |> Backup.new() |> Repo.insert()
+ {:ok, backup} = Backup.process(backup)
+
+ assert backup.processed_number == 1 + 120 + 120 + 120
+
+ Backup.delete(backup)
+ end
+
+ test "it handles errors" do
+ user = insert(:user, %{nickname: "cofe", name: "Cofe", ap_id: "http://cofe.io/users/cofe"})
+
+ Enum.map(1..120, fn i ->
+ {:ok, _status} = CommonAPI.post(user, %{status: "status #{i}"})
+ end)
+
+ assert {:ok, backup} = user |> Backup.new() |> Repo.insert()
+
+ with_mock Pleroma.Web.ActivityPub.Transmogrifier,
+ [:passthrough],
+ prepare_outgoing: fn data ->
+ object =
+ data["object"]
+ |> Pleroma.Object.normalize(fetch: false)
+ |> Map.get(:data)
+
+ data = data |> Map.put("object", object)
+
+ if String.contains?(data["object"]["content"], "119"),
+ do: raise(%Postgrex.Error{}),
+ else: {:ok, data}
+ end do
+ {:ok, backup} = Backup.process(backup)
+ assert backup.processed
+ assert backup.state == :complete
+ assert backup.processed_number == 1 + 119
+
+ Backup.delete(backup)
+ end
+ end
+
+ test "it handles unrecoverable exceptions" do
+ user = insert(:user, %{nickname: "cofe", name: "Cofe", ap_id: "http://cofe.io/users/cofe"})
+
+ assert {:ok, backup} = user |> Backup.new() |> Repo.insert()
+
+ with_mock Backup, [:passthrough], do_process: fn _, _ -> raise "mock exception" end do
+ {:error, %{backup: backup, reason: :exit}} = Backup.process(backup)
+
+ assert backup.state == :failed
+ end
+
+ with_mock Backup, [:passthrough], do_process: fn _, _ -> Process.sleep(:timer.seconds(32)) end do
+ {:error, %{backup: backup, reason: :timeout}} = Backup.process(backup)
+
+ assert backup.state == :failed
+ end
+ end
+
describe "it uploads and deletes a backup archive" do
setup do
clear_config([Pleroma.Upload, :base_url], "https://s3.amazonaws.com")
@@ -246,7 +336,7 @@ defmodule Pleroma.User.BackupTest do
Bookmark.create(user.id, status3.id)
assert {:ok, backup} = user |> Backup.new() |> Repo.insert()
- assert {:ok, path} = Backup.export(backup)
+ assert {:ok, path} = Backup.export(backup, self())
[path: path, backup: backup]
end
diff --git a/test/pleroma/web/pleroma_api/views/backup_view_test.exs b/test/pleroma/web/pleroma_api/views/backup_view_test.exs
index a86688bc4..6908463d6 100644
--- a/test/pleroma/web/pleroma_api/views/backup_view_test.exs
+++ b/test/pleroma/web/pleroma_api/views/backup_view_test.exs
@@ -15,4 +15,43 @@ defmodule Pleroma.Web.PleromaAPI.BackupViewTest do
result = BackupView.render("show.json", backup: backup)
assert result.id == backup.id
end
+
+ test "it renders the state and processed_number" do
+ user = insert(:user)
+ backup = Backup.new(user)
+
+ result = BackupView.render("show.json", backup: backup)
+ assert result.state == to_string(backup.state)
+ assert result.processed_number == backup.processed_number
+ end
+
+ test "it renders failed state with legacy records" do
+ backup = %Backup{
+ id: 0,
+ content_type: "application/zip",
+ file_name: "dummy",
+ file_size: 1,
+ state: :invalid,
+ processed: true,
+ processed_number: 1,
+ inserted_at: NaiveDateTime.utc_now()
+ }
+
+ result = BackupView.render("show.json", backup: backup)
+ assert result.state == "complete"
+
+ backup = %Backup{
+ id: 0,
+ content_type: "application/zip",
+ file_name: "dummy",
+ file_size: 1,
+ state: :invalid,
+ processed: false,
+ processed_number: 1,
+ inserted_at: NaiveDateTime.utc_now()
+ }
+
+ result = BackupView.render("show.json", backup: backup)
+ assert result.state == "failed"
+ end
end