summary | refs | log | tree | commit | diff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r-- lib/mix/tasks/pleroma/search/indexer.ex 80
-rw-r--r-- lib/pleroma/search/database_search.ex 6
-rw-r--r-- lib/pleroma/search/meilisearch.ex 6
-rw-r--r-- lib/pleroma/search/qdrant_search.ex 182
-rw-r--r-- lib/pleroma/search/search_backend.ex 10
-rw-r--r-- lib/pleroma/web/activity_pub/activity_pub_controller.ex 8
-rw-r--r-- lib/pleroma/web/endpoint.ex 2
-rw-r--r-- lib/pleroma/web/plugs/logger_metadata_path.ex 12
-rw-r--r-- lib/pleroma/web/plugs/logger_metadata_user.ex 18
-rw-r--r-- lib/pleroma/web/router.ex 8
10 files changed, 332 insertions, 0 deletions
diff --git a/lib/mix/tasks/pleroma/search/indexer.ex b/lib/mix/tasks/pleroma/search/indexer.ex
new file mode 100644
index 000000000..81a9fced6
--- /dev/null
+++ b/lib/mix/tasks/pleroma/search/indexer.ex
@@ -0,0 +1,80 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Mix.Tasks.Pleroma.Search.Indexer do
+  @moduledoc """
+  Mix task for managing the index of the configured search backend.
+
+  The backend module is read from the `[Pleroma.Search, :module]` config key.
+
+  Supported arguments:
+
+    * `create_index` - calls `create_index/0` on the backend
+    * `drop_index` - calls `drop_index/0` on the backend
+    * `index [--limit N]` - queues indexing jobs for up to `N` activities
+      (default `100_000`), highest ids first
+  """
+
+  import Mix.Pleroma
+  import Ecto.Query
+
+  alias Pleroma.Workers.SearchIndexingWorker
+
+  # Maximum number of activity ids fetched per database query.
+  @per_step 1000
+
+  # Maximum number of jobs handed to a single `Oban.insert_all/1` call.
+  @jobs_per_insert 100
+
+  @doc """
+  Runs the task with the given command line arguments.
+
+  See the module documentation for the supported arguments. Any other argument
+  list raises a `FunctionClauseError`; unknown or malformed options passed to
+  `index` raise a `MatchError`.
+  """
+  def run(["create_index"]) do
+    start_pleroma()
+
+    case Pleroma.Config.get([Pleroma.Search, :module]).create_index() do
+      :ok -> IO.puts("Index created")
+      e -> IO.puts("Could not create index: #{inspect(e)}")
+    end
+  end
+
+  def run(["drop_index"]) do
+    start_pleroma()
+
+    case Pleroma.Config.get([Pleroma.Search, :module]).drop_index() do
+      :ok -> IO.puts("Index dropped")
+      e -> IO.puts("Could not drop index: #{inspect(e)}")
+    end
+  end
+
+  def run(["index" | options]) do
+    {options, [], []} = OptionParser.parse(options, strict: [limit: :integer])
+
+    start_pleroma()
+
+    limit = Keyword.get(options, :limit, 100_000)
+
+    # Walk the activities in pages of at most @per_step ids. Offsets stop before
+    # `limit` and the last page is shortened, so at most `limit` ids are queued
+    # rather than a multiple of @per_step.
+    0
+    |> Stream.iterate(&(&1 + @per_step))
+    |> Stream.take_while(&(&1 < limit))
+    |> Enum.reduce_while(:ok, fn offset, :ok ->
+      page_size = min(@per_step, limit - offset)
+      ids = fetch_activity_ids(offset, page_size)
+
+      IO.puts("Got #{length(ids)} activities, adding to indexer")
+      enqueue(ids)
+
+      # A short page means there are no further rows; later pages would be empty.
+      if length(ids) < page_size, do: {:halt, :ok}, else: {:cont, :ok}
+    end)
+  end
+
+  # Returns up to `page_size` activity ids, skipping the first `offset` rows of
+  # the activities ordered by descending id.
+  defp fetch_activity_ids(offset, page_size) do
+    query =
+      from(a in Pleroma.Activity,
+        limit: ^page_size,
+        offset: ^offset,
+        select: [:id],
+        order_by: [desc: :id]
+      )
+
+    # The stream is consumed inside the transaction callback.
+    {:ok, ids} =
+      Pleroma.Repo.transaction(fn ->
+        query
+        |> Pleroma.Repo.stream(timeout: :infinity)
+        |> Enum.map(& &1.id)
+      end)
+
+    ids
+  end
+
+  # Inserts one "add_to_index" job per id, in batches of @jobs_per_insert.
+  defp enqueue(ids) do
+    ids
+    |> Enum.chunk_every(@jobs_per_insert)
+    |> Enum.each(fn chunk ->
+      IO.puts("Adding #{length(chunk)} activities to indexing queue")
+
+      chunk
+      |> Enum.map(&SearchIndexingWorker.new(%{"op" => "add_to_index", "activity" => &1}))
+      |> Oban.insert_all()
+    end)
+  end
+end
diff --git a/lib/pleroma/search/database_search.ex b/lib/pleroma/search/database_search.ex
index 11e99e7f1..c6fe8a9bd 100644
--- a/lib/pleroma/search/database_search.ex
+++ b/lib/pleroma/search/database_search.ex
@@ -49,6 +49,12 @@ defmodule Pleroma.Search.DatabaseSearch do
def remove_from_index(_object), do: :ok
@impl true
+ # No-op for this backend: nothing is created and `:ok` is always returned.
+ def create_index do
+   :ok
+ end
+
+ # No-op for this backend: nothing is dropped and `:ok` is always returned.
+ @impl true
+ def drop_index do
+   :ok
+ end
+
+ @impl true
def healthcheck_endpoints, do: nil
def maybe_restrict_author(query, %User{} = author) do
diff --git a/lib/pleroma/search/meilisearch.ex b/lib/pleroma/search/meilisearch.ex
index 08c2f3d86..9bba5b30f 100644
--- a/lib/pleroma/search/meilisearch.ex
+++ b/lib/pleroma/search/meilisearch.ex
@@ -10,6 +10,12 @@ defmodule Pleroma.Search.Meilisearch do
@behaviour Pleroma.Search.SearchBackend
+ # No-op for this backend: nothing is created and `:ok` is always returned.
+ @impl true
+ def create_index do
+   :ok
+ end
+
+ # No-op for this backend: nothing is dropped and `:ok` is always returned.
+ @impl true
+ def drop_index do
+   :ok
+ end
+
defp meili_headers do
private_key = Config.get([Pleroma.Search.Meilisearch, :private_key])
diff --git a/lib/pleroma/search/qdrant_search.ex b/lib/pleroma/search/qdrant_search.ex
new file mode 100644
index 000000000..b659bb682
--- /dev/null
+++ b/lib/pleroma/search/qdrant_search.ex
@@ -0,0 +1,182 @@
+defmodule Pleroma.Search.QdrantSearch do
+  @moduledoc """
+  Search backend that stores one embedding vector per activity in a Qdrant
+  collection and obtains the vectors from a `/v1/embeddings` HTTP endpoint.
+
+  All settings are read from the `Pleroma.Search.QdrantSearch` config key.
+  """
+
+  @behaviour Pleroma.Search.SearchBackend
+  import Ecto.Query
+
+  alias Pleroma.Activity
+  alias Pleroma.Config.Getting, as: Config
+
+  alias __MODULE__.OpenAIClient
+  alias __MODULE__.QdrantClient
+
+  import Pleroma.Search.Meilisearch, only: [object_to_search_data: 1]
+  import Pleroma.Search.DatabaseSearch, only: [maybe_fetch: 3]
+
+  # REST path of the Qdrant collection that holds the indexed posts.
+  @collection_path "/collections/posts"
+
+  @doc """
+  Creates the Qdrant collection from the `:qdrant_index_configuration` setting.
+
+  Returns `:ok` on a 200 response, otherwise `{:error, response_or_error}`.
+  """
+  @impl true
+  def create_index do
+    payload = Config.get([Pleroma.Search.QdrantSearch, :qdrant_index_configuration])
+
+    case QdrantClient.put(@collection_path, payload) do
+      {:ok, %{status: 200}} -> :ok
+      e -> {:error, e}
+    end
+  end
+
+  @doc """
+  Deletes the Qdrant collection.
+
+  Returns `:ok` on a 200 response, otherwise `{:error, response_or_error}`.
+  """
+  @impl true
+  def drop_index do
+    case QdrantClient.delete(@collection_path) do
+      {:ok, %{status: 200}} -> :ok
+      e -> {:error, e}
+    end
+  end
+
+  @doc """
+  Requests an embedding for `text` using the configured `:openai_model`.
+
+  Returns `{:ok, embedding}` when the response body contains exactly one
+  embedding, otherwise `{:error, "Failed to get embedding"}`.
+  """
+  def get_embedding(text) do
+    request = %{
+      input: text,
+      model: Config.get([Pleroma.Search.QdrantSearch, :openai_model])
+    }
+
+    case OpenAIClient.post("/v1/embeddings", request) do
+      {:ok, %{body: %{"data" => [%{"embedding" => embedding}]}}} -> {:ok, embedding}
+      _ -> {:error, "Failed to get embedding"}
+    end
+  end
+
+  # Converts an activity's id into the UUID string used as the Qdrant point id.
+  defp point_id(activity) do
+    activity.id |> FlakeId.from_string() |> Ecto.UUID.cast!()
+  end
+
+  defp actor_from_activity(%{data: %{"actor" => actor}}), do: actor
+  defp actor_from_activity(_), do: nil
+
+  # Builds the upsert body: a single point carrying the embedding plus the
+  # actor and publication date as payload.
+  defp build_index_payload(activity, embedding) do
+    %{
+      points: [
+        %{
+          id: point_id(activity),
+          vector: embedding,
+          payload: %{
+            actor: actor_from_activity(activity),
+            published_at: activity.data["published"]
+          }
+        }
+      ]
+    }
+  end
+
+  # Builds the search body. `:limit` defaults to 20 and `:offset` to 0; when an
+  # `:author` option is given, results are restricted to that actor's ap_id.
+  defp build_search_payload(embedding, options) do
+    base = %{
+      vector: embedding,
+      limit: options[:limit] || 20,
+      offset: options[:offset] || 0
+    }
+
+    case options[:author] do
+      nil ->
+        base
+
+      false ->
+        base
+
+      author ->
+        Map.put(base, :filter, %{
+          must: [%{key: "actor", match: %{value: author.ap_id}}]
+        })
+    end
+  end
+
+  @doc """
+  Indexes a `Create` activity whose object yields search data.
+
+  Other activities are ignored and reported as `:ok`. Returns `{:error, term}`
+  when the embedding cannot be obtained or Qdrant does not answer with 200.
+  """
+  @impl true
+  def add_to_index(activity) do
+    # This will only index public or unlisted notes
+    maybe_search_data = object_to_search_data(activity.object)
+
+    if activity.data["type"] == "Create" and maybe_search_data do
+      with {:ok, embedding} <- get_embedding(maybe_search_data.content),
+           {:ok, %{status: 200}} <-
+             QdrantClient.put(
+               @collection_path <> "/points",
+               build_index_payload(activity, embedding)
+             ) do
+        :ok
+      else
+        e -> {:error, e}
+      end
+    else
+      :ok
+    end
+  end
+
+  @doc """
+  Removes the point belonging to the activity that created `object`.
+
+  When no activity is found for the object there is nothing to delete and
+  `:ok` is returned instead of crashing on a `nil` activity.
+  """
+  @impl true
+  def remove_from_index(object) do
+    # NOTE(review): assumes the lookup returns nil when nothing matches - confirm.
+    with %Activity{} = activity <-
+           Activity.get_by_object_ap_id_with_object(object.data["id"]),
+         {:ok, %{status: 200}} <-
+           QdrantClient.post(
+             @collection_path <> "/points/delete",
+             %{"points" => [point_id(activity)]}
+           ) do
+      :ok
+    else
+      nil -> :ok
+      e -> {:error, e}
+    end
+  end
+
+  @doc """
+  Searches the collection for activities similar to `original_query`.
+
+  Returns the matching activities in the order Qdrant ranked them, or `[]`
+  when the embedding or the Qdrant request fails.
+  """
+  @impl true
+  def search(user, original_query, options) do
+    # Fixed instruction prefix sent to the embedding model with every query.
+    query = "Represent this sentence for searching relevant passages: #{original_query}"
+
+    with {:ok, embedding} <- get_embedding(query),
+         {:ok, %{body: %{"result" => result}}} <-
+           QdrantClient.post(
+             @collection_path <> "/points/search",
+             build_search_payload(embedding, options)
+           ) do
+      ids = Enum.map(result, fn %{"id" => id} -> Ecto.UUID.dump!(id) end)
+
+      from(a in Activity, where: a.id in ^ids)
+      |> Activity.with_preloaded_object()
+      |> Activity.restrict_deactivated_users()
+      # Keep the order in which Qdrant returned the points.
+      |> order_by([a], fragment("array_position(?, ?)", ^ids, a.id))
+      |> Pleroma.Repo.all()
+      |> maybe_fetch(user, original_query)
+    else
+      _ -> []
+    end
+  end
+
+  @doc """
+  Returns the health check URLs for this backend.
+
+  The list holds the configured `:qdrant_url` with its path replaced by
+  `/healthz`, followed by `:openai_healthcheck_url` when that is set.
+  """
+  @impl true
+  def healthcheck_endpoints do
+    qdrant_health =
+      Config.get([Pleroma.Search.QdrantSearch, :qdrant_url])
+      |> URI.parse()
+      |> Map.put(:path, "/healthz")
+      |> URI.to_string()
+
+    openai_health = Config.get([Pleroma.Search.QdrantSearch, :openai_healthcheck_url])
+
+    Enum.filter([qdrant_health, openai_health], & &1)
+  end
+end
+
+defmodule Pleroma.Search.QdrantSearch.OpenAIClient do
+ # Tesla HTTP client for the embeddings service called by
+ # Pleroma.Search.QdrantSearch.
+ #
+ # Requests are sent relative to the configured `:openai_url`, bodies go
+ # through Tesla's JSON middleware, and every request carries an
+ # `Authorization: Bearer <:openai_api_key>` header.
+ use Tesla
+ alias Pleroma.Config.Getting, as: Config
+
+ plug(Tesla.Middleware.BaseUrl, Config.get([Pleroma.Search.QdrantSearch, :openai_url]))
+ plug(Tesla.Middleware.JSON)
+
+ # NOTE(review): the API key is read through Pleroma.Config directly while the
+ # base URL goes through the Config alias (Pleroma.Config.Getting) - confirm
+ # the difference is intentional.
+ plug(Tesla.Middleware.Headers, [
+ {"Authorization",
+ "Bearer #{Pleroma.Config.get([Pleroma.Search.QdrantSearch, :openai_api_key])}"}
+ ])
+end
+
+defmodule Pleroma.Search.QdrantSearch.QdrantClient do
+ # Tesla HTTP client for the Qdrant server called by
+ # Pleroma.Search.QdrantSearch.
+ #
+ # Requests are sent relative to the configured `:qdrant_url`, bodies go
+ # through Tesla's JSON middleware, and every request carries an `api-key`
+ # header holding the configured `:qdrant_api_key`.
+ use Tesla
+ alias Pleroma.Config.Getting, as: Config
+
+ plug(Tesla.Middleware.BaseUrl, Config.get([Pleroma.Search.QdrantSearch, :qdrant_url]))
+ plug(Tesla.Middleware.JSON)
+
+ # NOTE(review): the API key is read through Pleroma.Config directly while the
+ # base URL goes through the Config alias (Pleroma.Config.Getting) - confirm
+ # the difference is intentional.
+ plug(Tesla.Middleware.Headers, [
+ {"api-key", Pleroma.Config.get([Pleroma.Search.QdrantSearch, :qdrant_api_key])}
+ ])
+end
diff --git a/lib/pleroma/search/search_backend.ex b/lib/pleroma/search/search_backend.ex
index 13c887bc2..f4ed13c36 100644
--- a/lib/pleroma/search/search_backend.ex
+++ b/lib/pleroma/search/search_backend.ex
@@ -23,6 +23,16 @@ defmodule Pleroma.Search.SearchBackend do
@callback remove_from_index(object :: Pleroma.Object.t()) :: :ok | {:error, any()}
@doc """
+ Creates the index used by the search backend.
+
+ Returns `:ok` on success or `{:error, reason}` on failure.
+ """
+ @callback create_index() :: :ok | {:error, any()}
+
+ @doc """
+ Drops the index used by the search backend.
+
+ Returns `:ok` on success or `{:error, reason}` on failure.
+ """
+ @callback drop_index() :: :ok | {:error, any()}
+
+ @doc """
Healthcheck endpoints of search backend infrastructure to monitor for controlling
processing of jobs in the Oban queue.
diff --git a/lib/pleroma/web/activity_pub/activity_pub_controller.ex b/lib/pleroma/web/activity_pub/activity_pub_controller.ex
index e38a94966..d2b2cae0b 100644
--- a/lib/pleroma/web/activity_pub/activity_pub_controller.ex
+++ b/lib/pleroma/web/activity_pub/activity_pub_controller.ex
@@ -52,6 +52,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
when action in [:activity, :object]
)
+ plug(:log_inbox_metadata when action in [:inbox])
plug(:set_requester_reachable when action in [:inbox])
plug(:relay_active? when action in [:relay])
@@ -521,6 +522,13 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
conn
end
+ # Adds the posted activity's "actor" and "type" params to the Logger
+ # metadata when both are present; the conn is always returned unchanged.
+ defp log_inbox_metadata(conn, _opts) do
+   case conn do
+     %{params: %{"actor" => actor, "type" => type}} ->
+       Logger.metadata(actor: actor, type: type)
+
+     _ ->
+       :ok
+   end
+
+   conn
+ end
+
def upload_media(%{assigns: %{user: %User{} = user}} = conn, %{"file" => file} = data) do
with {:ok, object} <-
ActivityPub.upload(
diff --git a/lib/pleroma/web/endpoint.ex b/lib/pleroma/web/endpoint.ex
index 2e2104904..fef907ace 100644
--- a/lib/pleroma/web/endpoint.ex
+++ b/lib/pleroma/web/endpoint.ex
@@ -38,6 +38,8 @@ defmodule Pleroma.Web.Endpoint do
plug(Plug.Telemetry, event_prefix: [:phoenix, :endpoint])
+ plug(Pleroma.Web.Plugs.LoggerMetadataPath)
+
plug(Pleroma.Web.Plugs.SetLocalePlug)
plug(CORSPlug)
plug(Pleroma.Web.Plugs.HTTPSecurityPlug)
diff --git a/lib/pleroma/web/plugs/logger_metadata_path.ex b/lib/pleroma/web/plugs/logger_metadata_path.ex
new file mode 100644
index 000000000..a5553cfc8
--- /dev/null
+++ b/lib/pleroma/web/plugs/logger_metadata_path.ex
@@ -0,0 +1,12 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.Plugs.LoggerMetadataPath do
+  @moduledoc """
+  Plug that stores the request path in the Logger metadata under `:path`.
+  """
+
+  @behaviour Plug
+
+  @impl true
+  def init(opts), do: opts
+
+  @impl true
+  def call(conn, _opts) do
+    Logger.metadata(path: conn.request_path)
+    conn
+  end
+end
diff --git a/lib/pleroma/web/plugs/logger_metadata_user.ex b/lib/pleroma/web/plugs/logger_metadata_user.ex
new file mode 100644
index 000000000..6a5c0041d
--- /dev/null
+++ b/lib/pleroma/web/plugs/logger_metadata_user.ex
@@ -0,0 +1,18 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.Plugs.LoggerMetadataUser do
+  @moduledoc """
+  Plug that stores the nickname of the assigned user in the Logger metadata
+  under `:user`.
+
+  Connections without a `Pleroma.User` struct in `conn.assigns.user` pass
+  through untouched.
+  """
+
+  @behaviour Plug
+
+  alias Pleroma.User
+
+  @impl true
+  def init(opts), do: opts
+
+  @impl true
+  def call(%{assigns: %{user: %User{nickname: nickname}}} = conn, _opts) do
+    Logger.metadata(user: nickname)
+    conn
+  end
+
+  def call(conn, _opts), do: conn
+end
diff --git a/lib/pleroma/web/router.ex b/lib/pleroma/web/router.ex
index 368a04df0..56c457e90 100644
--- a/lib/pleroma/web/router.ex
+++ b/lib/pleroma/web/router.ex
@@ -29,6 +29,7 @@ defmodule Pleroma.Web.Router do
pipeline :browser do
plug(:accepts, ["html"])
plug(:fetch_session)
+ plug(Pleroma.Web.Plugs.LoggerMetadataUser)
end
pipeline :oauth do
@@ -67,12 +68,14 @@ defmodule Pleroma.Web.Router do
plug(:fetch_session)
plug(:authenticate)
plug(OpenApiSpex.Plug.PutApiSpec, module: Pleroma.Web.ApiSpec)
+ plug(Pleroma.Web.Plugs.LoggerMetadataUser)
end
pipeline :no_auth_or_privacy_expectations_api do
plug(:base_api)
plug(:after_auth)
plug(Pleroma.Web.Plugs.IdempotencyPlug)
+ plug(Pleroma.Web.Plugs.LoggerMetadataUser)
end
# Pipeline for app-related endpoints (no user auth checks — app-bound tokens must be supported)
@@ -83,12 +86,14 @@ defmodule Pleroma.Web.Router do
pipeline :api do
plug(:expect_public_instance_or_user_authentication)
plug(:no_auth_or_privacy_expectations_api)
+ plug(Pleroma.Web.Plugs.LoggerMetadataUser)
end
pipeline :authenticated_api do
plug(:expect_user_authentication)
plug(:no_auth_or_privacy_expectations_api)
plug(Pleroma.Web.Plugs.EnsureAuthenticatedPlug)
+ plug(Pleroma.Web.Plugs.LoggerMetadataUser)
end
pipeline :admin_api do
@@ -99,6 +104,7 @@ defmodule Pleroma.Web.Router do
plug(Pleroma.Web.Plugs.EnsureAuthenticatedPlug)
plug(Pleroma.Web.Plugs.UserIsStaffPlug)
plug(Pleroma.Web.Plugs.IdempotencyPlug)
+ plug(Pleroma.Web.Plugs.LoggerMetadataUser)
end
pipeline :require_admin do
@@ -179,6 +185,7 @@ defmodule Pleroma.Web.Router do
plug(:browser)
plug(:authenticate)
plug(Pleroma.Web.Plugs.EnsureUserTokenAssignsPlug)
+ plug(Pleroma.Web.Plugs.LoggerMetadataUser)
end
pipeline :well_known do
@@ -193,6 +200,7 @@ defmodule Pleroma.Web.Router do
pipeline :pleroma_api do
plug(:accepts, ["html", "json"])
plug(OpenApiSpex.Plug.PutApiSpec, module: Pleroma.Web.ApiSpec)
+ plug(Pleroma.Web.Plugs.LoggerMetadataUser)
end
pipeline :mailbox_preview do