diff options
Diffstat (limited to 'lib/pleroma/web/activity_pub')
-rw-r--r-- | lib/pleroma/web/activity_pub/activity_pub.ex | 53 | ||||
-rw-r--r-- | lib/pleroma/web/activity_pub/activity_pub_controller.ex | 27 | ||||
-rw-r--r-- | lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex | 5 | ||||
-rw-r--r-- | lib/pleroma/web/activity_pub/publisher.ex | 31 | ||||
-rw-r--r-- | lib/pleroma/web/activity_pub/transmogrifier.ex | 3 | ||||
-rw-r--r-- | lib/pleroma/web/activity_pub/utils.ex | 9 |
6 files changed, 73 insertions, 55 deletions
diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index bc9a7a2d6..eac0b5a99 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -4,6 +4,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do alias Pleroma.Activity + alias Pleroma.Activity.Ir.Topics alias Pleroma.Config alias Pleroma.Conversation alias Pleroma.Notification @@ -17,7 +18,9 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do alias Pleroma.User alias Pleroma.Web.ActivityPub.MRF alias Pleroma.Web.ActivityPub.Transmogrifier + alias Pleroma.Web.Streamer alias Pleroma.Web.WebFinger + alias Pleroma.Workers.BackgroundWorker import Ecto.Query import Pleroma.Web.ActivityPub.Utils @@ -146,7 +149,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do activity end - PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity]) + BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id}) Notification.create_notifications(activity) SubscriptionNotification.create_notifications(activity) @@ -188,9 +191,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do participations |> Repo.preload(:user) - Enum.each(participations, fn participation -> - Pleroma.Web.Streamer.stream("participation", participation) - end) + Streamer.stream("participation", participations) end def stream_out_participations(%Object{data: %{"context" => context}}, user) do @@ -209,41 +210,15 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do def stream_out_participations(_, _), do: :noop - def stream_out(activity) do - if activity.data["type"] in ["Create", "Announce", "Delete"] do - object = Object.normalize(activity) - # Do not stream out poll replies - unless object.data["type"] == "Answer" do - Pleroma.Web.Streamer.stream("user", activity) - Pleroma.Web.Streamer.stream("list", activity) - - if get_visibility(activity) == "public" do - Pleroma.Web.Streamer.stream("public", activity) - - if activity.local do - Pleroma.Web.Streamer.stream("public:local", activity) - end - - if activity.data["type"] in ["Create"] do - object.data - |> Map.get("tag", []) - |> Enum.filter(fn tag -> is_bitstring(tag) end) - |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end) - - if object.data["attachment"] != [] do - Pleroma.Web.Streamer.stream("public:media", activity) - - if activity.local do - Pleroma.Web.Streamer.stream("public:local:media", activity) - end - end - end - else - if get_visibility(activity) == "direct", - do: Pleroma.Web.Streamer.stream("direct", activity) - end - end - end + def stream_out(%Activity{data: %{"type" => data_type}} = activity) + when data_type in ["Create", "Announce", "Delete"] do + activity + |> Topics.get_activity_topics() + |> Streamer.stream(activity) + end + + def stream_out(_activity) do + :noop end def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do diff --git a/lib/pleroma/web/activity_pub/activity_pub_controller.ex b/lib/pleroma/web/activity_pub/activity_pub_controller.ex index 705dbc1c2..01b34fb1d 100644 --- a/lib/pleroma/web/activity_pub/activity_pub_controller.ex +++ b/lib/pleroma/web/activity_pub/activity_pub_controller.ex @@ -6,6 +6,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do use Pleroma.Web, :controller alias Pleroma.Activity + alias Pleroma.Delivery alias Pleroma.Object alias Pleroma.Object.Fetcher alias Pleroma.User @@ -23,7 +24,12 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do action_fallback(:errors) - plug(Pleroma.Plugs.Cache, [query_params: false] when action in [:activity, :object]) + plug( + Pleroma.Plugs.Cache, + [query_params: false, tracking_fun: &__MODULE__.track_object_fetch/2] + when action in [:activity, :object] + ) + plug(Pleroma.Web.FederatingPlug when action in [:inbox, :relay]) plug(:set_requester_reachable when action in [:inbox]) plug(:relay_active? when action in [:relay]) @@ -54,6 +60,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do %Object{} = object <- Object.get_cached_by_ap_id(ap_id), {_, true} <- {:public?, Visibility.is_public?(object)} do conn + |> assign(:tracking_fun_data, object.id) |> set_cache_ttl_for(object) |> put_resp_content_type("application/activity+json") |> put_view(ObjectView) @@ -64,6 +71,16 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do end end + def track_object_fetch(conn, nil), do: conn + + def track_object_fetch(conn, object_id) do + with %{assigns: %{user: %User{id: user_id}}} <- conn do + Delivery.create(object_id, user_id) + end + + conn + end + def object_likes(conn, %{"uuid" => uuid, "page" => page}) do with ap_id <- o_status_url(conn, :object, uuid), %Object{} = object <- Object.get_cached_by_ap_id(ap_id), @@ -99,6 +116,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do %Activity{} = activity <- Activity.normalize(ap_id), {_, true} <- {:public?, Visibility.is_public?(activity)} do conn + |> maybe_set_tracking_data(activity) |> set_cache_ttl_for(activity) |> put_resp_content_type("application/activity+json") |> put_view(ObjectView) @@ -109,6 +127,13 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do end end + defp maybe_set_tracking_data(conn, %Activity{data: %{"type" => "Create"}} = activity) do + object_id = Object.normalize(activity).id + assign(conn, :tracking_fun_data, object_id) + end + + defp maybe_set_tracking_data(conn, _activity), do: conn + defp set_cache_ttl_for(conn, %Activity{object: object}) do set_cache_ttl_for(conn, object) end diff --git a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex index a179dd54d..26b8539fe 100644 --- a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex +++ b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex @@ -8,6 +8,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do alias Pleroma.HTTP alias Pleroma.Web.MediaProxy + alias Pleroma.Workers.BackgroundWorker require Logger @@ -30,7 +31,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do url |> Enum.each(fn %{"href" => href} -> - PleromaJobQueue.enqueue(:background, __MODULE__, [:prefetch, href]) + BackgroundWorker.enqueue("media_proxy_prefetch", %{"url" => href}) x -> Logger.debug("Unhandled attachment URL object #{inspect(x)}") @@ -46,7 +47,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do %{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message ) when is_list(attachments) and length(attachments) > 0 do - PleromaJobQueue.enqueue(:background, __MODULE__, [:preload, message]) + BackgroundWorker.enqueue("media_proxy_preload", %{"message" => message}) {:ok, message} end diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex index c97405690..114251b24 100644 --- a/lib/pleroma/web/activity_pub/publisher.ex +++ b/lib/pleroma/web/activity_pub/publisher.ex @@ -5,8 +5,10 @@ defmodule Pleroma.Web.ActivityPub.Publisher do alias Pleroma.Activity alias Pleroma.Config + alias Pleroma.Delivery alias Pleroma.HTTP alias Pleroma.Instances + alias Pleroma.Object alias Pleroma.User alias Pleroma.Web.ActivityPub.Relay alias Pleroma.Web.ActivityPub.Transmogrifier @@ -84,6 +86,15 @@ defmodule Pleroma.Web.ActivityPub.Publisher do end end + def publish_one(%{actor_id: actor_id} = params) do + actor = User.get_cached_by_id(actor_id) + + params + |> Map.delete(:actor_id) + |> Map.put(:actor, actor) + |> publish_one() + end + defp should_federate?(inbox, public) do if public do true @@ -107,7 +118,18 @@ defmodule Pleroma.Web.ActivityPub.Publisher do {:ok, []} end - Pleroma.Web.Salmon.remote_users(actor, activity) ++ followers + fetchers = + with %Activity{data: %{"type" => "Delete"}} <- activity, + %Object{id: object_id} <- Object.normalize(activity), + fetchers <- User.get_delivered_users_by_object_id(object_id), + _ <- Delivery.delete_all_by_object_id(object_id) do + fetchers + else + _ -> + [] + end + + Pleroma.Web.Salmon.remote_users(actor, activity) ++ followers ++ fetchers end defp get_cc_ap_ids(ap_id, recipients) do @@ -159,7 +181,8 @@ defmodule Pleroma.Web.ActivityPub.Publisher do Publishes an activity with BCC to all relevant peers. """ - def publish(actor, %{data: %{"bcc" => bcc}} = activity) when is_list(bcc) and bcc != [] do + def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity) + when is_list(bcc) and bcc != [] do public = is_public?(activity) {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) @@ -186,7 +209,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{ inbox: inbox, json: json, - actor: actor, + actor_id: actor.id, id: activity.data["id"], unreachable_since: unreachable_since }) @@ -221,7 +244,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do %{ inbox: inbox, json: json, - actor: actor, + actor_id: actor.id, id: activity.data["id"], unreachable_since: unreachable_since } diff --git a/lib/pleroma/web/activity_pub/transmogrifier.ex b/lib/pleroma/web/activity_pub/transmogrifier.ex index 350b83abb..acb3087d0 100644 --- a/lib/pleroma/web/activity_pub/transmogrifier.ex +++ b/lib/pleroma/web/activity_pub/transmogrifier.ex @@ -15,6 +15,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.ActivityPub.Visibility alias Pleroma.Web.Federator + alias Pleroma.Workers.TransmogrifierWorker import Ecto.Query @@ -1051,7 +1052,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do already_ap <- User.ap_enabled?(user), {:ok, user} <- user |> User.upgrade_changeset(data) |> User.update_and_set_cache() do unless already_ap do - PleromaJobQueue.enqueue(:transmogrifier, __MODULE__, [:user_upgrade, user]) + TransmogrifierWorker.enqueue("user_upgrade", %{"user_id" => user.id}) end {:ok, user} diff --git a/lib/pleroma/web/activity_pub/utils.ex b/lib/pleroma/web/activity_pub/utils.ex index 47917f5d3..258e56066 100644 --- a/lib/pleroma/web/activity_pub/utils.ex +++ b/lib/pleroma/web/activity_pub/utils.ex @@ -167,14 +167,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do @spec maybe_federate(any()) :: :ok def maybe_federate(%Activity{local: true} = activity) do if Pleroma.Config.get!([:instance, :federating]) do - priority = - case activity.data["type"] do - "Delete" -> 10 - "Create" -> 1 - _ -> 5 - end - - Pleroma.Web.Federator.publish(activity, priority) + Pleroma.Web.Federator.publish(activity) end :ok |