diff options
| -rw-r--r-- | changelog.d/publisher.change | 1 | ||||
| -rw-r--r-- | lib/pleroma/web/activity_pub/publisher.ex | 54 | ||||
| -rw-r--r-- | lib/pleroma/web/common_api.ex | 4 | ||||
| -rw-r--r-- | priv/repo/migrations/20240729163838_publisher_job_change.exs | 27 | ||||
| -rw-r--r-- | test/pleroma/repo/migrations/publisher_migration_change_test.exs | 43 | ||||
| -rw-r--r-- | test/pleroma/web/activity_pub/publisher_test.exs | 76 | ||||
| -rw-r--r-- | test/pleroma/web/common_api_test.exs | 32 | 
7 files changed, 149 insertions, 88 deletions
diff --git a/changelog.d/publisher.change b/changelog.d/publisher.change new file mode 100644 index 000000000..0d26d7b00 --- /dev/null +++ b/changelog.d/publisher.change @@ -0,0 +1 @@ +Publisher jobs now store the the activity id instead of inserting duplicate JSON data in the Oban queue for each delivery. diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex index c8bdf2250..e040753dc 100644 --- a/lib/pleroma/web/activity_pub/publisher.ex +++ b/lib/pleroma/web/activity_pub/publisher.ex @@ -80,13 +80,26 @@ defmodule Pleroma.Web.ActivityPub.Publisher do    parameters set:    * `inbox`: the inbox to publish to -  * `json`: the JSON message body representing the ActivityPub message -  * `actor`: the actor which is signing the message -  * `id`: the ActivityStreams URI of the message +  * `activity_id`: the internal activity id +  * `cc`: the cc recipients relevant to this inbox (optional)    """ -  def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = params) do -    Logger.debug("Federating #{id} to #{inbox}") +  def publish_one(%{inbox: inbox, activity_id: activity_id} = params) do +    activity = Activity.get_by_id_with_user_actor(activity_id) +    actor = activity.user_actor + +    ap_id = activity.data["id"] +    Logger.debug("Federating #{ap_id} to #{inbox}")      uri = %{path: path} = URI.parse(inbox) + +    {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) + +    cc = Map.get(params, :cc) + +    json = +      data +      |> Map.put("cc", cc) +      |> Jason.encode!() +      digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())      date = Pleroma.Signature.signed_date() @@ -119,7 +132,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do      else        {_post_result, %{status: code} = response} = e ->          unless params[:unreachable_since], do: Instances.set_unreachable(inbox) -        Logger.metadata(activity: id, inbox: inbox, status: code) +        Logger.metadata(activity: activity_id, inbox: inbox, status: code)          Logger.error("Publisher failed to inbox #{inbox} with status #{code}")          case response do @@ -136,21 +149,12 @@ defmodule Pleroma.Web.ActivityPub.Publisher do        e ->          unless params[:unreachable_since], do: Instances.set_unreachable(inbox) -        Logger.metadata(activity: id, inbox: inbox) +        Logger.metadata(activity: activity_id, inbox: inbox)          Logger.error("Publisher failed to inbox #{inbox} #{inspect(e)}")          {:error, e}      end    end -  def publish_one(%{actor_id: actor_id} = params) do -    actor = User.get_cached_by_id(actor_id) - -    params -    |> Map.delete(:actor_id) -    |> Map.put(:actor, actor) -    |> publish_one() -  end -    defp signature_host(%URI{port: port, scheme: scheme, host: host}) do      if port == URI.default_port(scheme) do        host @@ -251,7 +255,6 @@ defmodule Pleroma.Web.ActivityPub.Publisher do    def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity)        when is_list(bcc) and bcc != [] do      public = public?(activity) -    {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)      [priority_recipients, recipients] = recipients(actor, activity) @@ -276,16 +279,10 @@ defmodule Pleroma.Web.ActivityPub.Publisher do            # instance would only accept a first message for the first recipient and ignore the rest.            cc = get_cc_ap_ids(ap_id, recipients) -          json = -            data -            |> Map.put("cc", cc) -            |> Jason.encode!() -            __MODULE__.enqueue_one(%{              inbox: inbox, -            json: json, -            actor_id: actor.id, -            id: activity.data["id"], +            cc: cc, +            activity_id: activity.id,              unreachable_since: unreachable_since            })          end) @@ -302,9 +299,6 @@ defmodule Pleroma.Web.ActivityPub.Publisher do        Relay.publish(activity)      end -    {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) -    json = Jason.encode!(data) -      [priority_inboxes, inboxes] =        recipients(actor, activity)        |> Enum.map(fn recipients -> @@ -326,9 +320,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do          __MODULE__.enqueue_one(            %{              inbox: inbox, -            json: json, -            actor_id: actor.id, -            id: activity.data["id"], +            activity_id: activity.id,              unreachable_since: unreachable_since            },            priority: priority diff --git a/lib/pleroma/web/common_api.ex b/lib/pleroma/web/common_api.ex index 06faf845e..b90b6a6d9 100644 --- a/lib/pleroma/web/common_api.ex +++ b/lib/pleroma/web/common_api.ex @@ -714,11 +714,11 @@ defmodule Pleroma.Web.CommonAPI do      end    end -  defp maybe_cancel_jobs(%Activity{data: %{"id" => ap_id}}) do +  defp maybe_cancel_jobs(%Activity{id: activity_id}) do      Oban.Job      |> where([j], j.worker == "Pleroma.Workers.PublisherWorker")      |> where([j], j.args["op"] == "publish_one") -    |> where([j], j.args["params"]["id"] == ^ap_id) +    |> where([j], j.args["params"]["activity_id"] == ^activity_id)      |> Oban.cancel_all_jobs()    end diff --git a/priv/repo/migrations/20240729163838_publisher_job_change.exs b/priv/repo/migrations/20240729163838_publisher_job_change.exs new file mode 100644 index 000000000..3449e3b3b --- /dev/null +++ b/priv/repo/migrations/20240729163838_publisher_job_change.exs @@ -0,0 +1,27 @@ +defmodule Pleroma.Repo.Migrations.PublisherJobChange do +  use Ecto.Migration + +  alias Pleroma.Activity +  alias Pleroma.Repo +  import Ecto.Query + +  def up do +    query = +      from(j in Oban.Job, +        where: j.worker == "Pleroma.Workers.PublisherWorker", +        where: j.state in ["available", "retryable"] +      ) + +    jobs = Repo.all(query) + +    Enum.each(jobs, fn job -> +      args = job.args +      activity = Activity.get_by_ap_id(args["id"]) + +      updated_args = Map.put(args, "activity_id", activity.id) + +      Pleroma.Workers.PublisherWorker.new(updated_args) +      |> Oban.insert() +    end) +  end +end diff --git a/test/pleroma/repo/migrations/publisher_migration_change_test.exs b/test/pleroma/repo/migrations/publisher_migration_change_test.exs new file mode 100644 index 000000000..9c035e604 --- /dev/null +++ b/test/pleroma/repo/migrations/publisher_migration_change_test.exs @@ -0,0 +1,43 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Repo.Migrations.PublisherMigrationChangeTest do +  use Oban.Testing, repo: Pleroma.Repo +  use Pleroma.DataCase +  import Pleroma.Factory +  import Pleroma.Tests.Helpers + +  alias Pleroma.Activity +  alias Pleroma.Workers.PublisherWorker + +  setup_all do: require_migration("20240729163838_publisher_job_change") + +  describe "up/0" do +    test "migrates publisher jobs to new format", %{migration: migration} do +      user = insert(:user) + +      %Activity{id: activity_id, data: %{"id" => ap_id}} = +        insert(:note_activity, user: user) + +      {:ok, %{id: job_id}} = +        PublisherWorker.new(%{ +          "actor_id" => user.id, +          "json" => "{}", +          "id" => ap_id, +          "inbox" => "https://example.com/inbox", +          "unreachable_since" => nil +        }) +        |> Oban.insert() + +      assert [%{id: ^job_id, args: %{"id" => ^ap_id}}] = all_enqueued(worker: PublisherWorker) + +      assert migration.up() == :ok + +      assert_enqueued( +        worker: PublisherWorker, +        args: %{"id" => ap_id, "activity_id" => activity_id} +      ) +    end +  end +end diff --git a/test/pleroma/web/activity_pub/publisher_test.exs b/test/pleroma/web/activity_pub/publisher_test.exs index 6f48a0227..569b6af1a 100644 --- a/test/pleroma/web/activity_pub/publisher_test.exs +++ b/test/pleroma/web/activity_pub/publisher_test.exs @@ -137,6 +137,7 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do      test "publish to url with with different ports" do        inbox80 = "http://42.site/users/nick1/inbox"        inbox42 = "http://42.site:42/users/nick1/inbox" +      activity = insert(:note_activity)        mock(fn          %{method: :post, url: "http://42.site:42/users/nick1/inbox"} -> @@ -146,23 +147,19 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do            {:ok, %Tesla.Env{status: 200, body: "port 80"}}        end) -      actor = insert(:user) +      _actor = insert(:user)        assert {:ok, %{body: "port 42"}} =                 Publisher.publish_one(%{                   inbox: inbox42, -                 json: "{}", -                 actor: actor, -                 id: 1, +                 activity_id: activity.id,                   unreachable_since: true                 })        assert {:ok, %{body: "port 80"}} =                 Publisher.publish_one(%{                   inbox: inbox80, -                 json: "{}", -                 actor: actor, -                 id: 1, +                 activity_id: activity.id,                   unreachable_since: true                 })      end @@ -171,10 +168,13 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do                     Instances,                     [:passthrough],                     [] do -      actor = insert(:user) +      _actor = insert(:user)        inbox = "http://200.site/users/nick1/inbox" +      activity = insert(:note_activity) + +      assert {:ok, _} = +               Publisher.publish_one(%{inbox: inbox, activity_id: activity.id}) -      assert {:ok, _} = Publisher.publish_one(%{inbox: inbox, json: "{}", actor: actor, id: 1})        assert called(Instances.set_reachable(inbox))      end @@ -182,15 +182,14 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do                     Instances,                     [:passthrough],                     [] do -      actor = insert(:user) +      _actor = insert(:user)        inbox = "http://200.site/users/nick1/inbox" +      activity = insert(:note_activity)        assert {:ok, _} =                 Publisher.publish_one(%{                   inbox: inbox, -                 json: "{}", -                 actor: actor, -                 id: 1, +                 activity_id: activity.id,                   unreachable_since: NaiveDateTime.utc_now()                 }) @@ -201,15 +200,14 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do                     Instances,                     [:passthrough],                     [] do -      actor = insert(:user) +      _actor = insert(:user)        inbox = "http://200.site/users/nick1/inbox" +      activity = insert(:note_activity)        assert {:ok, _} =                 Publisher.publish_one(%{                   inbox: inbox, -                 json: "{}", -                 actor: actor, -                 id: 1, +                 activity_id: activity.id,                   unreachable_since: nil                 }) @@ -220,11 +218,12 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do                     Instances,                     [:passthrough],                     [] do -      actor = insert(:user) +      _actor = insert(:user)        inbox = "http://404.site/users/nick1/inbox" +      activity = insert(:note_activity)        assert {:cancel, _} = -               Publisher.publish_one(%{inbox: inbox, json: "{}", actor: actor, id: 1}) +               Publisher.publish_one(%{inbox: inbox, activity_id: activity.id})        assert called(Instances.set_unreachable(inbox))      end @@ -233,12 +232,16 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do                     Instances,                     [:passthrough],                     [] do -      actor = insert(:user) +      _actor = insert(:user)        inbox = "http://connrefused.site/users/nick1/inbox" +      activity = insert(:note_activity)        assert capture_log(fn ->                 assert {:error, _} = -                        Publisher.publish_one(%{inbox: inbox, json: "{}", actor: actor, id: 1}) +                        Publisher.publish_one(%{ +                          inbox: inbox, +                          activity_id: activity.id +                        })               end) =~ "connrefused"        assert called(Instances.set_unreachable(inbox)) @@ -248,10 +251,12 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do                     Instances,                     [:passthrough],                     [] do -      actor = insert(:user) +      _actor = insert(:user)        inbox = "http://200.site/users/nick1/inbox" +      activity = insert(:note_activity) -      assert {:ok, _} = Publisher.publish_one(%{inbox: inbox, json: "{}", actor: actor, id: 1}) +      assert {:ok, _} = +               Publisher.publish_one(%{inbox: inbox, activity_id: activity.id})        refute called(Instances.set_unreachable(inbox))      end @@ -260,16 +265,15 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do                     Instances,                     [:passthrough],                     [] do -      actor = insert(:user) +      _actor = insert(:user)        inbox = "http://connrefused.site/users/nick1/inbox" +      activity = insert(:note_activity)        assert capture_log(fn ->                 assert {:error, _} =                          Publisher.publish_one(%{                            inbox: inbox, -                          json: "{}", -                          actor: actor, -                          id: 1, +                          activity_id: activity.id,                            unreachable_since: NaiveDateTime.utc_now()                          })               end) =~ "connrefused" @@ -309,8 +313,7 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do        assert not called(                 Publisher.enqueue_one(%{                   inbox: "https://domain.com/users/nick1/inbox", -                 actor_id: actor.id, -                 id: note_activity.data["id"] +                 activity_id: note_activity.id                 })               )      end @@ -346,8 +349,7 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do                 Publisher.enqueue_one(                   %{                     inbox: "https://domain.com/users/nick1/inbox", -                   actor_id: actor.id, -                   id: note_activity.data["id"] +                   activity_id: note_activity.id                   },                   priority: 1                 ) @@ -370,8 +372,7 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do                 Publisher.enqueue_one(                   %{                     inbox: :_, -                   actor_id: actor.id, -                   id: note_activity.data["id"] +                   activity_id: note_activity.id                   },                   priority: 0                 ) @@ -405,8 +406,7 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do        assert called(                 Publisher.enqueue_one(%{                   inbox: "https://domain.com/users/nick1/inbox", -                 actor_id: actor.id, -                 id: note_activity.data["id"] +                 activity_id: note_activity.id                 })               )      end @@ -456,8 +456,7 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do                 Publisher.enqueue_one(                   %{                     inbox: "https://domain.com/users/nick1/inbox", -                   actor_id: actor.id, -                   id: delete.data["id"] +                   activity_id: delete.id                   },                   priority: 1                 ) @@ -467,8 +466,7 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do                 Publisher.enqueue_one(                   %{                     inbox: "https://domain2.com/users/nick1/inbox", -                   actor_id: actor.id, -                   id: delete.data["id"] +                   activity_id: delete.id                   },                   priority: 1                 ) diff --git a/test/pleroma/web/common_api_test.exs b/test/pleroma/web/common_api_test.exs index b6fba6999..4cdd3cffa 100644 --- a/test/pleroma/web/common_api_test.exs +++ b/test/pleroma/web/common_api_test.exs @@ -1957,7 +1957,7 @@ defmodule Pleroma.Web.CommonAPITest do        {:ok, _, _} = Pleroma.User.follow(remote_one, local_user)        {:ok, _, _} = Pleroma.User.follow(remote_two, local_user) -      {:ok, %{data: %{"id" => ap_id}} = activity} = +      {:ok, %{id: activity_id} = _activity} =          CommonAPI.post(local_user, %{status: "Happy Friday everyone!"})        # Generate the publish_one jobs @@ -1971,7 +1971,7 @@ defmodule Pleroma.Web.CommonAPITest do                state: "available",                queue: "federator_outgoing",                worker: "Pleroma.Workers.PublisherWorker", -              args: %{"op" => "publish_one", "params" => %{"id" => ^ap_id}} +              args: %{"op" => "publish_one", "params" => %{"activity_id" => ^activity_id}}              },              job            ) @@ -1980,7 +1980,7 @@ defmodule Pleroma.Web.CommonAPITest do        assert length(publish_one_jobs) == 2        # The delete should have triggered cancelling the publish_one jobs -      assert {:ok, _delete} = CommonAPI.delete(activity.id, local_user) +      assert {:ok, _delete} = CommonAPI.delete(activity_id, local_user)        # all_enqueued/1 will not return cancelled jobs        cancelled_jobs = @@ -1988,7 +1988,7 @@ defmodule Pleroma.Web.CommonAPITest do          |> where([j], j.worker == "Pleroma.Workers.PublisherWorker")          |> where([j], j.state == "cancelled")          |> where([j], j.args["op"] == "publish_one") -        |> where([j], j.args["params"]["id"] == ^ap_id) +        |> where([j], j.args["params"]["activity_id"] == ^activity_id)          |> Pleroma.Repo.all()        assert length(cancelled_jobs) == 2 @@ -2001,7 +2001,7 @@ defmodule Pleroma.Web.CommonAPITest do        {:ok, activity} =          CommonAPI.post(remote_user, %{status: "I like turtles!"}) -      {:ok, %{data: %{"id" => ap_id}} = _favorite} = +      {:ok, %{id: favorite_id} = _favorite} =          CommonAPI.favorite(activity.id, local_user)        # Generate the publish_one jobs @@ -2015,7 +2015,7 @@ defmodule Pleroma.Web.CommonAPITest do                state: "available",                queue: "federator_outgoing",                worker: "Pleroma.Workers.PublisherWorker", -              args: %{"op" => "publish_one", "params" => %{"id" => ^ap_id}} +              args: %{"op" => "publish_one", "params" => %{"activity_id" => ^favorite_id}}              },              job            ) @@ -2032,7 +2032,7 @@ defmodule Pleroma.Web.CommonAPITest do          |> where([j], j.worker == "Pleroma.Workers.PublisherWorker")          |> where([j], j.state == "cancelled")          |> where([j], j.args["op"] == "publish_one") -        |> where([j], j.args["params"]["id"] == ^ap_id) +        |> where([j], j.args["params"]["activity_id"] == ^favorite_id)          |> Pleroma.Repo.all()        assert length(cancelled_jobs) == 1 @@ -2049,7 +2049,7 @@ defmodule Pleroma.Web.CommonAPITest do        {:ok, activity} =          CommonAPI.post(remote_one, %{status: "This is an unpleasant post"}) -      {:ok, %{data: %{"id" => ap_id}} = _repeat} = +      {:ok, %{id: repeat_id} = _repeat} =          CommonAPI.repeat(activity.id, local_user)        # Generate the publish_one jobs @@ -2063,7 +2063,7 @@ defmodule Pleroma.Web.CommonAPITest do                state: "available",                queue: "federator_outgoing",                worker: "Pleroma.Workers.PublisherWorker", -              args: %{"op" => "publish_one", "params" => %{"id" => ^ap_id}} +              args: %{"op" => "publish_one", "params" => %{"activity_id" => ^repeat_id}}              },              job            ) @@ -2080,7 +2080,7 @@ defmodule Pleroma.Web.CommonAPITest do          |> where([j], j.worker == "Pleroma.Workers.PublisherWorker")          |> where([j], j.state == "cancelled")          |> where([j], j.args["op"] == "publish_one") -        |> where([j], j.args["params"]["id"] == ^ap_id) +        |> where([j], j.args["params"]["activity_id"] == ^repeat_id)          |> Pleroma.Repo.all()        assert length(cancelled_jobs) == 2 @@ -2094,11 +2094,11 @@ defmodule Pleroma.Web.CommonAPITest do        {:ok, _, _} = Pleroma.User.follow(remote_one, local_user)        {:ok, _, _} = Pleroma.User.follow(remote_two, local_user) -      {:ok, activity} = +      {:ok, %{id: activity_id}} =          CommonAPI.post(remote_one, %{status: "Gang gang!!!!"}) -      {:ok, %{data: %{"id" => ap_id}} = _react} = -        CommonAPI.react_with_emoji(activity.id, local_user, "👍") +      {:ok, %{id: react_id} = _react} = +        CommonAPI.react_with_emoji(activity_id, local_user, "👍")        # Generate the publish_one jobs        ObanHelpers.perform_all() @@ -2111,7 +2111,7 @@ defmodule Pleroma.Web.CommonAPITest do                state: "available",                queue: "federator_outgoing",                worker: "Pleroma.Workers.PublisherWorker", -              args: %{"op" => "publish_one", "params" => %{"id" => ^ap_id}} +              args: %{"op" => "publish_one", "params" => %{"activity_id" => ^react_id}}              },              job            ) @@ -2120,7 +2120,7 @@ defmodule Pleroma.Web.CommonAPITest do        assert length(publish_one_jobs) == 2        # The unreact should have triggered cancelling the publish_one jobs -      assert {:ok, _unreact} = CommonAPI.unreact_with_emoji(activity.id, local_user, "👍") +      assert {:ok, _unreact} = CommonAPI.unreact_with_emoji(activity_id, local_user, "👍")        # all_enqueued/1 will not return cancelled jobs        cancelled_jobs = @@ -2128,7 +2128,7 @@ defmodule Pleroma.Web.CommonAPITest do          |> where([j], j.worker == "Pleroma.Workers.PublisherWorker")          |> where([j], j.state == "cancelled")          |> where([j], j.args["op"] == "publish_one") -        |> where([j], j.args["params"]["id"] == ^ap_id) +        |> where([j], j.args["params"]["activity_id"] == ^react_id)          |> Pleroma.Repo.all()        assert length(cancelled_jobs) == 2  | 
