diff options
Diffstat (limited to 'lib/pleroma/web/activity_pub')
-rw-r--r-- | lib/pleroma/web/activity_pub/activity_pub.ex | 19 | ||||
-rw-r--r-- | lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex | 56 | ||||
-rw-r--r-- | lib/pleroma/web/activity_pub/transmogrifier.ex | 123 | ||||
-rw-r--r-- | lib/pleroma/web/activity_pub/utils.ex | 35 |
4 files changed, 172 insertions, 61 deletions
diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index c0e3d1478..55315d66e 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -189,6 +189,22 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end) end + def stream_out_participations(%Object{data: %{"context" => context}}, user) do + with %Conversation{} = conversation <- Conversation.get_for_ap_id(context), + conversation = Repo.preload(conversation, :participations), + last_activity_id = + fetch_latest_activity_id_for_context(conversation.ap_id, %{ + "user" => user, + "blocking_user" => user + }) do + if last_activity_id do + stream_out_participations(conversation.participations) + end + end + end + + def stream_out_participations(_, _), do: :noop + def stream_out(activity) do public = "https://www.w3.org/ns/activitystreams#Public" @@ -401,7 +417,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do "to" => to, "deleted_activity_id" => activity && activity.id }, - {:ok, activity} <- insert(data, local), + {:ok, activity} <- insert(data, local, false), + stream_out_participations(object, user), _ <- decrease_replies_count_if_reply(object), # Changing note count prior to enqueuing federation task in order to avoid # race conditions on updating user.info diff --git a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex new file mode 100644 index 000000000..01d21a299 --- /dev/null +++ b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex @@ -0,0 +1,56 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do + @moduledoc "Preloads any attachments in the MediaProxy cache by prefetching them" + @behaviour Pleroma.Web.ActivityPub.MRF + + alias Pleroma.HTTP + alias Pleroma.Web.MediaProxy + + require Logger + + @hackney_options [ + pool: :media, + recv_timeout: 10_000 + ] + + def perform(:prefetch, url) do + Logger.info("Prefetching #{inspect(url)}") + + url + |> MediaProxy.url() + |> HTTP.get([], adapter: @hackney_options) + end + + def perform(:preload, %{"object" => %{"attachment" => attachments}} = _message) do + Enum.each(attachments, fn + %{"url" => url} when is_list(url) -> + url + |> Enum.each(fn + %{"href" => href} -> + PleromaJobQueue.enqueue(:background, __MODULE__, [:prefetch, href]) + + x -> + Logger.debug("Unhandled attachment URL object #{inspect(x)}") + end) + + x -> + Logger.debug("Unhandled attachment #{inspect(x)}") + end) + end + + @impl true + def filter( + %{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message + ) + when is_list(attachments) and length(attachments) > 0 do + PleromaJobQueue.enqueue(:background, __MODULE__, [:preload, message]) + + {:ok, message} + end + + @impl true + def filter(message), do: {:ok, message} +end diff --git a/lib/pleroma/web/activity_pub/transmogrifier.ex b/lib/pleroma/web/activity_pub/transmogrifier.ex index 3bb8b40b5..543d4bb7d 100644 --- a/lib/pleroma/web/activity_pub/transmogrifier.ex +++ b/lib/pleroma/web/activity_pub/transmogrifier.ex @@ -14,6 +14,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.ActivityPub.Visibility + alias Pleroma.Web.Federator import Ecto.Query @@ -22,20 +23,20 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do @doc """ Modifies an incoming AP object (mastodon format) to our internal format. """ - def fix_object(object) do + def fix_object(object, options \\ []) do object |> fix_actor |> fix_url |> fix_attachments |> fix_context - |> fix_in_reply_to + |> fix_in_reply_to(options) |> fix_emoji |> fix_tag |> fix_content_map |> fix_likes |> fix_addressing |> fix_summary - |> fix_type + |> fix_type(options) end def fix_summary(%{"summary" => nil} = object) do @@ -164,7 +165,9 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do object end - def fix_in_reply_to(%{"inReplyTo" => in_reply_to} = object) + def fix_in_reply_to(object, options \\ []) + + def fix_in_reply_to(%{"inReplyTo" => in_reply_to} = object, options) when not is_nil(in_reply_to) do in_reply_to_id = cond do @@ -182,28 +185,34 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do "" end - case get_obj_helper(in_reply_to_id) do - {:ok, replied_object} -> - with %Activity{} = _activity <- - Activity.get_create_by_object_ap_id(replied_object.data["id"]) do - object - |> Map.put("inReplyTo", replied_object.data["id"]) - |> Map.put("inReplyToAtomUri", object["inReplyToAtomUri"] || in_reply_to_id) - |> Map.put("conversation", replied_object.data["context"] || object["conversation"]) - |> Map.put("context", replied_object.data["context"] || object["conversation"]) - else - e -> - Logger.error("Couldn't fetch \"#{inspect(in_reply_to_id)}\", error: #{inspect(e)}") + object = Map.put(object, "inReplyToAtomUri", in_reply_to_id) + + if Federator.allowed_incoming_reply_depth?(options[:depth]) do + case get_obj_helper(in_reply_to_id, options) do + {:ok, replied_object} -> + with %Activity{} = _activity <- + Activity.get_create_by_object_ap_id(replied_object.data["id"]) do object - end + |> Map.put("inReplyTo", replied_object.data["id"]) + |> Map.put("inReplyToAtomUri", object["inReplyToAtomUri"] || in_reply_to_id) + |> Map.put("conversation", replied_object.data["context"] || object["conversation"]) + |> Map.put("context", replied_object.data["context"] || object["conversation"]) + else + e -> + Logger.error("Couldn't fetch \"#{inspect(in_reply_to_id)}\", error: #{inspect(e)}") + object + end - e -> - Logger.error("Couldn't fetch \"#{inspect(in_reply_to_id)}\", error: #{inspect(e)}") - object + e -> + Logger.error("Couldn't fetch \"#{inspect(in_reply_to_id)}\", error: #{inspect(e)}") + object + end + else + object end end - def fix_in_reply_to(object), do: object + def fix_in_reply_to(object, _options), do: object def fix_context(object) do context = object["context"] || object["conversation"] || Utils.generate_context_id() @@ -336,8 +345,13 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do def fix_content_map(object), do: object - def fix_type(%{"inReplyTo" => reply_id} = object) when is_binary(reply_id) do - reply = Object.normalize(reply_id) + def fix_type(object, options \\ []) + + def fix_type(%{"inReplyTo" => reply_id} = object, options) when is_binary(reply_id) do + reply = + if Federator.allowed_incoming_reply_depth?(options[:depth]) do + Object.normalize(reply_id, true) + end if reply && (reply.data["type"] == "Question" and object["name"]) do Map.put(object, "type", "Answer") @@ -346,7 +360,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do end end - def fix_type(object), do: object + def fix_type(object, _), do: object defp mastodon_follow_hack(%{"id" => id, "actor" => follower_id}, followed) do with true <- id =~ "follows", @@ -374,9 +388,11 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do end end + def handle_incoming(data, options \\ []) + # Flag objects are placed ahead of the ID check because Mastodon 2.8 and earlier send them # with nil ID. - def handle_incoming(%{"type" => "Flag", "object" => objects, "actor" => actor} = data) do + def handle_incoming(%{"type" => "Flag", "object" => objects, "actor" => actor} = data, _options) do with context <- data["context"] || Utils.generate_context_id(), content <- data["content"] || "", %User{} = actor <- User.get_cached_by_ap_id(actor), @@ -409,15 +425,19 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do end # disallow objects with bogus IDs - def handle_incoming(%{"id" => nil}), do: :error - def handle_incoming(%{"id" => ""}), do: :error + def handle_incoming(%{"id" => nil}, _options), do: :error + def handle_incoming(%{"id" => ""}, _options), do: :error # length of https:// = 8, should validate better, but good enough for now. - def handle_incoming(%{"id" => id}) when not (is_binary(id) and length(id) > 8), do: :error + def handle_incoming(%{"id" => id}, _options) when not (is_binary(id) and length(id) > 8), + do: :error # TODO: validate those with a Ecto scheme # - tags # - emoji - def handle_incoming(%{"type" => "Create", "object" => %{"type" => objtype} = object} = data) + def handle_incoming( + %{"type" => "Create", "object" => %{"type" => objtype} = object} = data, + options + ) when objtype in ["Article", "Note", "Video", "Page", "Question", "Answer"] do actor = Containment.get_actor(data) @@ -427,7 +447,8 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do with nil <- Activity.get_create_by_object_ap_id(object["id"]), {:ok, %User{} = user} <- User.get_or_fetch_by_ap_id(data["actor"]) do - object = fix_object(data["object"]) + options = Keyword.put(options, :depth, (options[:depth] || 0) + 1) + object = fix_object(data["object"], options) params = %{ to: data["to"], @@ -452,7 +473,8 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do end def handle_incoming( - %{"type" => "Follow", "object" => followed, "actor" => follower, "id" => id} = data + %{"type" => "Follow", "object" => followed, "actor" => follower, "id" => id} = data, + _options ) do with %User{local: true} = followed <- User.get_cached_by_ap_id(followed), {:ok, %User{} = follower} <- User.get_or_fetch_by_ap_id(follower), @@ -503,7 +525,8 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do end def handle_incoming( - %{"type" => "Accept", "object" => follow_object, "actor" => _actor, "id" => _id} = data + %{"type" => "Accept", "object" => follow_object, "actor" => _actor, "id" => _id} = data, + _options ) do with actor <- Containment.get_actor(data), {:ok, %User{} = followed} <- User.get_or_fetch_by_ap_id(actor), @@ -524,7 +547,8 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do end def handle_incoming( - %{"type" => "Reject", "object" => follow_object, "actor" => _actor, "id" => _id} = data + %{"type" => "Reject", "object" => follow_object, "actor" => _actor, "id" => _id} = data, + _options ) do with actor <- Containment.get_actor(data), {:ok, %User{} = followed} <- User.get_or_fetch_by_ap_id(actor), @@ -548,7 +572,8 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do end def handle_incoming( - %{"type" => "Like", "object" => object_id, "actor" => _actor, "id" => id} = data + %{"type" => "Like", "object" => object_id, "actor" => _actor, "id" => id} = data, + _options ) do with actor <- Containment.get_actor(data), {:ok, %User{} = actor} <- User.get_or_fetch_by_ap_id(actor), @@ -561,7 +586,8 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do end def handle_incoming( - %{"type" => "Announce", "object" => object_id, "actor" => _actor, "id" => id} = data + %{"type" => "Announce", "object" => object_id, "actor" => _actor, "id" => id} = data, + _options ) do with actor <- Containment.get_actor(data), {:ok, %User{} = actor} <- User.get_or_fetch_by_ap_id(actor), @@ -576,7 +602,8 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do def handle_incoming( %{"type" => "Update", "object" => %{"type" => object_type} = object, "actor" => actor_id} = - data + data, + _options ) when object_type in ["Person", "Application", "Service", "Organization"] do with %User{ap_id: ^actor_id} = actor <- User.get_cached_by_ap_id(object["id"]) do @@ -614,7 +641,8 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do # an error or a tombstone. This would allow us to verify that a deletion actually took # place. def handle_incoming( - %{"type" => "Delete", "object" => object_id, "actor" => _actor, "id" => _id} = data + %{"type" => "Delete", "object" => object_id, "actor" => _actor, "id" => _id} = data, + _options ) do object_id = Utils.get_ap_id(object_id) @@ -635,7 +663,8 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do "object" => %{"type" => "Announce", "object" => object_id}, "actor" => _actor, "id" => id - } = data + } = data, + _options ) do with actor <- Containment.get_actor(data), {:ok, %User{} = actor} <- User.get_or_fetch_by_ap_id(actor), @@ -653,7 +682,8 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do "object" => %{"type" => "Follow", "object" => followed}, "actor" => follower, "id" => id - } = _data + } = _data, + _options ) do with %User{local: true} = followed <- User.get_cached_by_ap_id(followed), {:ok, %User{} = follower} <- User.get_or_fetch_by_ap_id(follower), @@ -671,7 +701,8 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do "object" => %{"type" => "Block", "object" => blocked}, "actor" => blocker, "id" => id - } = _data + } = _data, + _options ) do with true <- Pleroma.Config.get([:activitypub, :accept_blocks]), %User{local: true} = blocked <- User.get_cached_by_ap_id(blocked), @@ -685,7 +716,8 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do end def handle_incoming( - %{"type" => "Block", "object" => blocked, "actor" => blocker, "id" => id} = _data + %{"type" => "Block", "object" => blocked, "actor" => blocker, "id" => id} = _data, + _options ) do with true <- Pleroma.Config.get([:activitypub, :accept_blocks]), %User{local: true} = blocked = User.get_cached_by_ap_id(blocked), @@ -705,7 +737,8 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do "object" => %{"type" => "Like", "object" => object_id}, "actor" => _actor, "id" => id - } = data + } = data, + _options ) do with actor <- Containment.get_actor(data), {:ok, %User{} = actor} <- User.get_or_fetch_by_ap_id(actor), @@ -717,10 +750,10 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do end end - def handle_incoming(_), do: :error + def handle_incoming(_, _), do: :error - def get_obj_helper(id) do - if object = Object.normalize(id), do: {:ok, object}, else: nil + def get_obj_helper(id, options \\ []) do + if object = Object.normalize(id, true, options), do: {:ok, object}, else: nil end def set_reply_to_uri(%{"inReplyTo" => in_reply_to} = object) when is_binary(in_reply_to) do diff --git a/lib/pleroma/web/activity_pub/utils.ex b/lib/pleroma/web/activity_pub/utils.ex index 10ff572a2..4288ea4c8 100644 --- a/lib/pleroma/web/activity_pub/utils.ex +++ b/lib/pleroma/web/activity_pub/utils.ex @@ -151,16 +151,18 @@ defmodule Pleroma.Web.ActivityPub.Utils do def create_context(context) do context = context || generate_id("contexts") - changeset = Object.context_mapping(context) - case Repo.insert(changeset) do - {:ok, object} -> - object + # Ecto has problems accessing the constraint inside the jsonb, + # so we explicitly check for the existed object before insert + object = Object.get_cached_by_ap_id(context) - # This should be solved by an upsert, but it seems ecto - # has problems accessing the constraint inside the jsonb. - {:error, _} -> - Object.get_cached_by_ap_id(context) + with true <- is_nil(object), + changeset <- Object.context_mapping(context), + {:ok, inserted_object} <- Repo.insert(changeset) do + inserted_object + else + _ -> + object end end @@ -168,14 +170,17 @@ defmodule Pleroma.Web.ActivityPub.Utils do Enqueues an activity for federation if it's local """ def maybe_federate(%Activity{local: true} = activity) do - priority = - case activity.data["type"] do - "Delete" -> 10 - "Create" -> 1 - _ -> 5 - end + if Pleroma.Config.get!([:instance, :federating]) do + priority = + case activity.data["type"] do + "Delete" -> 10 + "Create" -> 1 + _ -> 5 + end + + Pleroma.Web.Federator.publish(activity, priority) + end - Pleroma.Web.Federator.publish(activity, priority) :ok end |