diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/mix/tasks/pleroma/search/indexer.ex | 80 | ||||
-rw-r--r-- | lib/pleroma/search/database_search.ex | 6 | ||||
-rw-r--r-- | lib/pleroma/search/meilisearch.ex | 6 | ||||
-rw-r--r-- | lib/pleroma/search/qdrant_search.ex | 182 | ||||
-rw-r--r-- | lib/pleroma/search/search_backend.ex | 10 | ||||
-rw-r--r-- | lib/pleroma/web/activity_pub/activity_pub_controller.ex | 8 | ||||
-rw-r--r-- | lib/pleroma/web/endpoint.ex | 2 | ||||
-rw-r--r-- | lib/pleroma/web/plugs/logger_metadata_path.ex | 12 | ||||
-rw-r--r-- | lib/pleroma/web/plugs/logger_metadata_user.ex | 18 | ||||
-rw-r--r-- | lib/pleroma/web/router.ex | 8 |
10 files changed, 332 insertions, 0 deletions
diff --git a/lib/mix/tasks/pleroma/search/indexer.ex b/lib/mix/tasks/pleroma/search/indexer.ex new file mode 100644 index 000000000..81a9fced6 --- /dev/null +++ b/lib/mix/tasks/pleroma/search/indexer.ex @@ -0,0 +1,80 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Mix.Tasks.Pleroma.Search.Indexer do + import Mix.Pleroma + import Ecto.Query + + alias Pleroma.Workers.SearchIndexingWorker + + def run(["create_index"]) do + start_pleroma() + + with :ok <- Pleroma.Config.get([Pleroma.Search, :module]).create_index() do + IO.puts("Index created") + else + e -> IO.puts("Could not create index: #{inspect(e)}") + end + end + + def run(["drop_index"]) do + start_pleroma() + + with :ok <- Pleroma.Config.get([Pleroma.Search, :module]).drop_index() do + IO.puts("Index dropped") + else + e -> IO.puts("Could not drop index: #{inspect(e)}") + end + end + + def run(["index" | options]) do + {options, [], []} = + OptionParser.parse( + options, + strict: [ + limit: :integer + ] + ) + + start_pleroma() + + limit = Keyword.get(options, :limit, 100_000) + + per_step = 1000 + chunks = max(div(limit, per_step), 1) + + 1..chunks + |> Enum.each(fn step -> + q = + from(a in Pleroma.Activity, + limit: ^per_step, + offset: ^per_step * (^step - 1), + select: [:id], + order_by: [desc: :id] + ) + + {:ok, ids} = + Pleroma.Repo.transaction(fn -> + Pleroma.Repo.stream(q, timeout: :infinity) + |> Enum.map(fn a -> + a.id + end) + end) + + IO.puts("Got #{length(ids)} activities, adding to indexer") + + ids + |> Enum.chunk_every(100) + |> Enum.each(fn chunk -> + IO.puts("Adding #{length(chunk)} activities to indexing queue") + + chunk + |> Enum.map(fn id -> + SearchIndexingWorker.new(%{"op" => "add_to_index", "activity" => id}) + end) + |> Oban.insert_all() + end) + end) + end +end diff --git a/lib/pleroma/search/database_search.ex b/lib/pleroma/search/database_search.ex index 11e99e7f1..c6fe8a9bd 100644 --- a/lib/pleroma/search/database_search.ex +++ b/lib/pleroma/search/database_search.ex @@ -49,6 +49,12 @@ defmodule Pleroma.Search.DatabaseSearch do def remove_from_index(_object), do: :ok @impl true + def create_index, do: :ok + + @impl true + def drop_index, do: :ok + + @impl true def healthcheck_endpoints, do: nil def maybe_restrict_author(query, %User{} = author) do diff --git a/lib/pleroma/search/meilisearch.ex b/lib/pleroma/search/meilisearch.ex index 08c2f3d86..9bba5b30f 100644 --- a/lib/pleroma/search/meilisearch.ex +++ b/lib/pleroma/search/meilisearch.ex @@ -10,6 +10,12 @@ defmodule Pleroma.Search.Meilisearch do @behaviour Pleroma.Search.SearchBackend + @impl true + def create_index, do: :ok + + @impl true + def drop_index, do: :ok + defp meili_headers do private_key = Config.get([Pleroma.Search.Meilisearch, :private_key]) diff --git a/lib/pleroma/search/qdrant_search.ex b/lib/pleroma/search/qdrant_search.ex new file mode 100644 index 000000000..b659bb682 --- /dev/null +++ b/lib/pleroma/search/qdrant_search.ex @@ -0,0 +1,182 @@ +defmodule Pleroma.Search.QdrantSearch do + @behaviour Pleroma.Search.SearchBackend + import Ecto.Query + + alias Pleroma.Activity + alias Pleroma.Config.Getting, as: Config + + alias __MODULE__.OpenAIClient + alias __MODULE__.QdrantClient + + import Pleroma.Search.Meilisearch, only: [object_to_search_data: 1] + import Pleroma.Search.DatabaseSearch, only: [maybe_fetch: 3] + + @impl true + def create_index do + payload = Config.get([Pleroma.Search.QdrantSearch, :qdrant_index_configuration]) + + with {:ok, %{status: 200}} <- QdrantClient.put("/collections/posts", payload) do + :ok + else + e -> {:error, e} + end + end + + @impl true + def drop_index do + with {:ok, %{status: 200}} <- QdrantClient.delete("/collections/posts") do + :ok + else + e -> {:error, e} + end + end + + def get_embedding(text) do + with {:ok, %{body: %{"data" => [%{"embedding" => embedding}]}}} <- + OpenAIClient.post("/v1/embeddings", %{ + input: text, + model: Config.get([Pleroma.Search.QdrantSearch, :openai_model]) + }) do + {:ok, embedding} + else + _ -> + {:error, "Failed to get embedding"} + end + end + + defp actor_from_activity(%{data: %{"actor" => actor}}) do + actor + end + + defp actor_from_activity(_), do: nil + + defp build_index_payload(activity, embedding) do + actor = actor_from_activity(activity) + published_at = activity.data["published"] + + %{ + points: [ + %{ + id: activity.id |> FlakeId.from_string() |> Ecto.UUID.cast!(), + vector: embedding, + payload: %{actor: actor, published_at: published_at} + } + ] + } + end + + defp build_search_payload(embedding, options) do + base = %{ + vector: embedding, + limit: options[:limit] || 20, + offset: options[:offset] || 0 + } + + if author = options[:author] do + Map.put(base, :filter, %{ + must: [%{key: "actor", match: %{value: author.ap_id}}] + }) + else + base + end + end + + @impl true + def add_to_index(activity) do + # This will only index public or unlisted notes + maybe_search_data = object_to_search_data(activity.object) + + if activity.data["type"] == "Create" and maybe_search_data do + with {:ok, embedding} <- get_embedding(maybe_search_data.content), + {:ok, %{status: 200}} <- + QdrantClient.put( + "/collections/posts/points", + build_index_payload(activity, embedding) + ) do + :ok + else + e -> {:error, e} + end + else + :ok + end + end + + @impl true + def remove_from_index(object) do + activity = Activity.get_by_object_ap_id_with_object(object.data["id"]) + id = activity.id |> FlakeId.from_string() |> Ecto.UUID.cast!() + + with {:ok, %{status: 200}} <- + QdrantClient.post("/collections/posts/points/delete", %{"points" => [id]}) do + :ok + else + e -> {:error, e} + end + end + + @impl true + def search(user, original_query, options) do + query = "Represent this sentence for searching relevant passages: #{original_query}" + + with {:ok, embedding} <- get_embedding(query), + {:ok, %{body: %{"result" => result}}} <- + QdrantClient.post( + "/collections/posts/points/search", + build_search_payload(embedding, options) + ) do + ids = + Enum.map(result, fn %{"id" => id} -> + Ecto.UUID.dump!(id) + end) + + from(a in Activity, where: a.id in ^ids) + |> Activity.with_preloaded_object() + |> Activity.restrict_deactivated_users() + |> Ecto.Query.order_by([a], fragment("array_position(?, ?)", ^ids, a.id)) + |> Pleroma.Repo.all() + |> maybe_fetch(user, original_query) + else + _ -> + [] + end + end + + @impl true + def healthcheck_endpoints do + qdrant_health = + Config.get([Pleroma.Search.QdrantSearch, :qdrant_url]) + |> URI.parse() + |> Map.put(:path, "/healthz") + |> URI.to_string() + + openai_health = Config.get([Pleroma.Search.QdrantSearch, :openai_healthcheck_url]) + + [qdrant_health, openai_health] |> Enum.filter(& &1) + end +end + +defmodule Pleroma.Search.QdrantSearch.OpenAIClient do + use Tesla + alias Pleroma.Config.Getting, as: Config + + plug(Tesla.Middleware.BaseUrl, Config.get([Pleroma.Search.QdrantSearch, :openai_url])) + plug(Tesla.Middleware.JSON) + + plug(Tesla.Middleware.Headers, [ + {"Authorization", + "Bearer #{Pleroma.Config.get([Pleroma.Search.QdrantSearch, :openai_api_key])}"} + ]) +end + +defmodule Pleroma.Search.QdrantSearch.QdrantClient do + use Tesla + alias Pleroma.Config.Getting, as: Config + + plug(Tesla.Middleware.BaseUrl, Config.get([Pleroma.Search.QdrantSearch, :qdrant_url])) + plug(Tesla.Middleware.JSON) + + plug(Tesla.Middleware.Headers, [ + {"api-key", Pleroma.Config.get([Pleroma.Search.QdrantSearch, :qdrant_api_key])} + ]) +end diff --git a/lib/pleroma/search/search_backend.ex b/lib/pleroma/search/search_backend.ex index 13c887bc2..f4ed13c36 100644 --- a/lib/pleroma/search/search_backend.ex +++ b/lib/pleroma/search/search_backend.ex @@ -23,6 +23,16 @@ defmodule Pleroma.Search.SearchBackend do @callback remove_from_index(object :: Pleroma.Object.t()) :: :ok | {:error, any()} @doc """ + Create the index + """ + @callback create_index() :: :ok | {:error, any()} + + @doc """ + Drop the index + """ + @callback drop_index() :: :ok | {:error, any()} + + @doc """ Healthcheck endpoints of search backend infrastructure to monitor for controlling processing of jobs in the Oban queue. diff --git a/lib/pleroma/web/activity_pub/activity_pub_controller.ex b/lib/pleroma/web/activity_pub/activity_pub_controller.ex index e38a94966..d2b2cae0b 100644 --- a/lib/pleroma/web/activity_pub/activity_pub_controller.ex +++ b/lib/pleroma/web/activity_pub/activity_pub_controller.ex @@ -52,6 +52,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do when action in [:activity, :object] ) + plug(:log_inbox_metadata when action in [:inbox]) plug(:set_requester_reachable when action in [:inbox]) plug(:relay_active? when action in [:relay]) @@ -521,6 +522,13 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do conn end + defp log_inbox_metadata(conn = %{params: %{"actor" => actor, "type" => type}}, _) do + Logger.metadata(actor: actor, type: type) + conn + end + + defp log_inbox_metadata(conn, _), do: conn + def upload_media(%{assigns: %{user: %User{} = user}} = conn, %{"file" => file} = data) do with {:ok, object} <- ActivityPub.upload( diff --git a/lib/pleroma/web/endpoint.ex b/lib/pleroma/web/endpoint.ex index 2e2104904..fef907ace 100644 --- a/lib/pleroma/web/endpoint.ex +++ b/lib/pleroma/web/endpoint.ex @@ -38,6 +38,8 @@ defmodule Pleroma.Web.Endpoint do plug(Plug.Telemetry, event_prefix: [:phoenix, :endpoint]) + plug(Pleroma.Web.Plugs.LoggerMetadataPath) + plug(Pleroma.Web.Plugs.SetLocalePlug) plug(CORSPlug) plug(Pleroma.Web.Plugs.HTTPSecurityPlug) diff --git a/lib/pleroma/web/plugs/logger_metadata_path.ex b/lib/pleroma/web/plugs/logger_metadata_path.ex new file mode 100644 index 000000000..a5553cfc8 --- /dev/null +++ b/lib/pleroma/web/plugs/logger_metadata_path.ex @@ -0,0 +1,12 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Web.Plugs.LoggerMetadataPath do + def init(opts), do: opts + + def call(conn, _) do + Logger.metadata(path: conn.request_path) + conn + end +end diff --git a/lib/pleroma/web/plugs/logger_metadata_user.ex b/lib/pleroma/web/plugs/logger_metadata_user.ex new file mode 100644 index 000000000..6a5c0041d --- /dev/null +++ b/lib/pleroma/web/plugs/logger_metadata_user.ex @@ -0,0 +1,18 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Web.Plugs.LoggerMetadataUser do + alias Pleroma.User + + def init(opts), do: opts + + def call(%{assigns: %{user: user = %User{}}} = conn, _) do + Logger.metadata(user: user.nickname) + conn + end + + def call(conn, _) do + conn + end +end diff --git a/lib/pleroma/web/router.ex b/lib/pleroma/web/router.ex index 368a04df0..56c457e90 100644 --- a/lib/pleroma/web/router.ex +++ b/lib/pleroma/web/router.ex @@ -29,6 +29,7 @@ defmodule Pleroma.Web.Router do pipeline :browser do plug(:accepts, ["html"]) plug(:fetch_session) + plug(Pleroma.Web.Plugs.LoggerMetadataUser) end pipeline :oauth do @@ -67,12 +68,14 @@ defmodule Pleroma.Web.Router do plug(:fetch_session) plug(:authenticate) plug(OpenApiSpex.Plug.PutApiSpec, module: Pleroma.Web.ApiSpec) + plug(Pleroma.Web.Plugs.LoggerMetadataUser) end pipeline :no_auth_or_privacy_expectations_api do plug(:base_api) plug(:after_auth) plug(Pleroma.Web.Plugs.IdempotencyPlug) + plug(Pleroma.Web.Plugs.LoggerMetadataUser) end # Pipeline for app-related endpoints (no user auth checks — app-bound tokens must be supported) @@ -83,12 +86,14 @@ defmodule Pleroma.Web.Router do pipeline :api do plug(:expect_public_instance_or_user_authentication) plug(:no_auth_or_privacy_expectations_api) + plug(Pleroma.Web.Plugs.LoggerMetadataUser) end pipeline :authenticated_api do plug(:expect_user_authentication) plug(:no_auth_or_privacy_expectations_api) plug(Pleroma.Web.Plugs.EnsureAuthenticatedPlug) + plug(Pleroma.Web.Plugs.LoggerMetadataUser) end pipeline :admin_api do @@ -99,6 +104,7 @@ defmodule Pleroma.Web.Router do plug(Pleroma.Web.Plugs.EnsureAuthenticatedPlug) plug(Pleroma.Web.Plugs.UserIsStaffPlug) plug(Pleroma.Web.Plugs.IdempotencyPlug) + plug(Pleroma.Web.Plugs.LoggerMetadataUser) end pipeline :require_admin do @@ -179,6 +185,7 @@ defmodule Pleroma.Web.Router do plug(:browser) plug(:authenticate) plug(Pleroma.Web.Plugs.EnsureUserTokenAssignsPlug) + plug(Pleroma.Web.Plugs.LoggerMetadataUser) end pipeline :well_known do @@ -193,6 +200,7 @@ defmodule Pleroma.Web.Router do pipeline :pleroma_api do plug(:accepts, ["html", "json"]) plug(OpenApiSpex.Plug.PutApiSpec, module: Pleroma.Web.ApiSpec) + plug(Pleroma.Web.Plugs.LoggerMetadataUser) end pipeline :mailbox_preview do |