summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/pleroma/application.ex4
-rw-r--r--lib/pleroma/web/activity_pub/publisher.ex16
-rw-r--r--lib/pleroma/web/activity_pub/utils.ex9
-rw-r--r--lib/pleroma/web/federator/federator.ex146
-rw-r--r--lib/pleroma/web/federator/publisher.ex28
-rw-r--r--lib/pleroma/web/federator/retry_queue.ex239
-rw-r--r--lib/pleroma/web/salmon/salmon.ex11
-rw-r--r--lib/pleroma/workers/publisher.ex35
-rw-r--r--lib/pleroma/workers/receiver.ex61
-rw-r--r--lib/pleroma/workers/subscriber.ex44
10 files changed, 209 insertions, 384 deletions
diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex
index 00b06f723..5550a4902 100644
--- a/lib/pleroma/application.ex
+++ b/lib/pleroma/application.ex
@@ -120,8 +120,8 @@ defmodule Pleroma.Application do
hackney_pool_children() ++
[
%{
- id: Pleroma.Web.Federator.RetryQueue,
- start: {Pleroma.Web.Federator.RetryQueue, :start_link, []}
+ id: Oban,
+ start: {Oban, :start_link, [Application.get_env(:pleroma, Oban)]}
},
%{
id: Pleroma.Web.OAuth.Token.CleanWorker,
diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex
index 46edab0bd..29f3221d1 100644
--- a/lib/pleroma/web/activity_pub/publisher.ex
+++ b/lib/pleroma/web/activity_pub/publisher.ex
@@ -85,6 +85,15 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
end
end
+ def publish_one(%{actor_id: actor_id} = params) do
+ actor = User.get_by_id(actor_id)
+
+ params
+ |> Map.delete(:actor_id)
+ |> Map.put(:actor, actor)
+ |> publish_one()
+ end
+
defp should_federate?(inbox, public) do
if public do
true
@@ -160,7 +169,8 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
Publishes an activity with BCC to all relevant peers.
"""
- def publish(actor, %{data: %{"bcc" => bcc}} = activity) when is_list(bcc) and bcc != [] do
+ def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity)
+ when is_list(bcc) and bcc != [] do
public = is_public?(activity)
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
@@ -187,7 +197,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{
inbox: inbox,
json: json,
- actor: actor,
+ actor_id: actor.id,
id: activity.data["id"],
unreachable_since: unreachable_since
})
@@ -222,7 +232,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
%{
inbox: inbox,
json: json,
- actor: actor,
+ actor_id: actor.id,
id: activity.data["id"],
unreachable_since: unreachable_since
}
diff --git a/lib/pleroma/web/activity_pub/utils.ex b/lib/pleroma/web/activity_pub/utils.ex
index fc5305c58..4f68acc78 100644
--- a/lib/pleroma/web/activity_pub/utils.ex
+++ b/lib/pleroma/web/activity_pub/utils.ex
@@ -168,14 +168,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
"""
def maybe_federate(%Activity{local: true} = activity) do
if Pleroma.Config.get!([:instance, :federating]) do
- priority =
- case activity.data["type"] do
- "Delete" -> 10
- "Create" -> 1
- _ -> 5
- end
-
- Pleroma.Web.Federator.publish(activity, priority)
+ Pleroma.Web.Federator.publish(activity)
end
:ok
diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex
index f4f9e83e0..bb9eadfee 100644
--- a/lib/pleroma/web/federator/federator.ex
+++ b/lib/pleroma/web/federator/federator.ex
@@ -3,23 +3,15 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Federator do
- alias Pleroma.Activity
- alias Pleroma.Object.Containment
- alias Pleroma.User
- alias Pleroma.Web.ActivityPub.ActivityPub
- alias Pleroma.Web.ActivityPub.Transmogrifier
- alias Pleroma.Web.ActivityPub.Utils
- alias Pleroma.Web.Federator.Publisher
- alias Pleroma.Web.Federator.RetryQueue
- alias Pleroma.Web.OStatus
- alias Pleroma.Web.Websub
+ alias Pleroma.Workers.Publisher, as: PublisherWorker
+ alias Pleroma.Workers.Receiver, as: ReceiverWorker
+ alias Pleroma.Workers.Subscriber, as: SubscriberWorker
require Logger
def init do
# 1 minute
- Process.sleep(1000 * 60)
- refresh_subscriptions()
+ refresh_subscriptions(schedule_in: 60)
end
@doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)"
@@ -37,124 +29,50 @@ defmodule Pleroma.Web.Federator do
# Client API
def incoming_doc(doc) do
- PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc])
+ %{"op" => "incoming_doc", "body" => doc}
+ |> ReceiverWorker.new(worker_args(:federator_incoming))
+ |> Pleroma.Repo.insert()
end
def incoming_ap_doc(params) do
- PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params])
+ %{"op" => "incoming_ap_doc", "params" => params}
+ |> ReceiverWorker.new(worker_args(:federator_incoming))
+ |> Pleroma.Repo.insert()
end
- def publish(activity, priority \\ 1) do
- PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority)
+ def publish(%{id: "pleroma:fakeid"} = activity) do
+ PublisherWorker.perform_publish(activity)
end
- def verify_websub(websub) do
- PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub])
- end
-
- def request_subscription(sub) do
- PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub])
- end
-
- def refresh_subscriptions do
- PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions])
- end
-
- # Job Worker Callbacks
-
- def perform(:refresh_subscriptions) do
- Logger.debug("Federator running refresh subscriptions")
- Websub.refresh_subscriptions()
-
- spawn(fn ->
- # 6 hours
- Process.sleep(1000 * 60 * 60 * 6)
- refresh_subscriptions()
- end)
- end
-
- def perform(:request_subscription, websub) do
- Logger.debug("Refreshing #{websub.topic}")
-
- with {:ok, websub} <- Websub.request_subscription(websub) do
- Logger.debug("Successfully refreshed #{websub.topic}")
- else
- _e -> Logger.debug("Couldn't refresh #{websub.topic}")
- end
- end
-
- def perform(:publish, activity) do
- Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)
-
- with %User{} = actor <- User.get_cached_by_ap_id(activity.data["actor"]),
- {:ok, actor} <- User.ensure_keys_present(actor) do
- Publisher.publish(actor, activity)
- end
- end
-
- def perform(:verify_websub, websub) do
- Logger.debug(fn ->
- "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
- end)
-
- Websub.verify(websub)
- end
-
- def perform(:incoming_doc, doc) do
- Logger.info("Got document, trying to parse")
- OStatus.handle_incoming(doc)
+ def publish(activity) do
+ %{"op" => "publish", "activity_id" => activity.id}
+ |> PublisherWorker.new(worker_args(:federator_outgoing))
+ |> Pleroma.Repo.insert()
end
- def perform(:incoming_ap_doc, params) do
- Logger.info("Handling incoming AP activity")
-
- params = Utils.normalize_params(params)
-
- # NOTE: we use the actor ID to do the containment, this is fine because an
- # actor shouldn't be acting on objects outside their own AP server.
- with {:ok, _user} <- ap_enabled_actor(params["actor"]),
- nil <- Activity.normalize(params["id"]),
- :ok <- Containment.contain_origin_from_id(params["actor"], params),
- {:ok, activity} <- Transmogrifier.handle_incoming(params) do
- {:ok, activity}
- else
- %Activity{} ->
- Logger.info("Already had #{params["id"]}")
- :error
-
- _e ->
- # Just drop those for now
- Logger.info("Unhandled activity")
- Logger.info(Jason.encode!(params, pretty: true))
- :error
- end
+ def verify_websub(websub) do
+ %{"op" => "verify_websub", "websub_id" => websub.id}
+ |> SubscriberWorker.new(worker_args(:federator_outgoing))
+ |> Pleroma.Repo.insert()
end
- def perform(
- :publish_single_websub,
- %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params
- ) do
- case Websub.publish_one(params) do
- {:ok, _} ->
- :ok
-
- {:error, _} ->
- RetryQueue.enqueue(params, Websub)
- end
+ def request_subscription(websub) do
+ %{"op" => "request_subscription", "websub_id" => websub.id}
+ |> SubscriberWorker.new(worker_args(:federator_outgoing))
+ |> Pleroma.Repo.insert()
end
- def perform(type, _) do
- Logger.debug(fn -> "Unknown task: #{type}" end)
- {:error, "Don't know what to do with this"}
+ def refresh_subscriptions(worker_args \\ []) do
+ %{"op" => "refresh_subscriptions"}
+ |> SubscriberWorker.new(worker_args ++ [max_attempts: 1] ++ worker_args(:federator_outgoing))
+ |> Pleroma.Repo.insert()
end
- def ap_enabled_actor(id) do
- user = User.get_cached_by_ap_id(id)
-
- if User.ap_enabled?(user) do
- {:ok, user}
+ defp worker_args(queue) do
+ if max_attempts = Pleroma.Config.get([:workers, :retries, queue]) do
+ [max_attempts: max_attempts]
else
- ActivityPub.make_user_from_ap_id(id)
+ []
end
end
end
diff --git a/lib/pleroma/web/federator/publisher.ex b/lib/pleroma/web/federator/publisher.ex
index 70f870244..05d2be615 100644
--- a/lib/pleroma/web/federator/publisher.ex
+++ b/lib/pleroma/web/federator/publisher.ex
@@ -6,7 +6,7 @@ defmodule Pleroma.Web.Federator.Publisher do
alias Pleroma.Activity
alias Pleroma.Config
alias Pleroma.User
- alias Pleroma.Web.Federator.RetryQueue
+ alias Pleroma.Workers.Publisher, as: PublisherWorker
require Logger
@@ -30,23 +30,17 @@ defmodule Pleroma.Web.Federator.Publisher do
Enqueue publishing a single activity.
"""
@spec enqueue_one(module(), Map.t()) :: :ok
- def enqueue_one(module, %{} = params),
- do: PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_one, module, params])
-
- @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()}
- def perform(:publish_one, module, params) do
- case apply(module, :publish_one, [params]) do
- {:ok, _} ->
- :ok
-
- {:error, _e} ->
- RetryQueue.enqueue(params, module)
- end
- end
+ def enqueue_one(module, %{} = params) do
+ worker_args =
+ if max_attempts = Pleroma.Config.get([:workers, :retries, :federator_outgoing]) do
+ [max_attempts: max_attempts]
+ else
+ []
+ end
- def perform(type, _, _) do
- Logger.debug("Unknown task: #{type}")
- {:error, "Don't know what to do with this"}
+ %{"op" => "publish_one", "module" => to_string(module), "params" => params}
+ |> PublisherWorker.new(worker_args)
+ |> Pleroma.Repo.insert()
end
@doc """
diff --git a/lib/pleroma/web/federator/retry_queue.ex b/lib/pleroma/web/federator/retry_queue.ex
deleted file mode 100644
index 3db948c2e..000000000
--- a/lib/pleroma/web/federator/retry_queue.ex
+++ /dev/null
@@ -1,239 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.Federator.RetryQueue do
- use GenServer
-
- require Logger
-
- def init(args) do
- queue_table = :ets.new(:pleroma_retry_queue, [:bag, :protected])
-
- {:ok, %{args | queue_table: queue_table, running_jobs: :sets.new()}}
- end
-
- def start_link do
- enabled =
- if Pleroma.Config.get(:env) == :test,
- do: true,
- else: Pleroma.Config.get([__MODULE__, :enabled], false)
-
- if enabled do
- Logger.info("Starting retry queue")
-
- linkres =
- GenServer.start_link(
- __MODULE__,
- %{delivered: 0, dropped: 0, queue_table: nil, running_jobs: nil},
- name: __MODULE__
- )
-
- maybe_kickoff_timer()
- linkres
- else
- Logger.info("Retry queue disabled")
- :ignore
- end
- end
-
- def enqueue(data, transport, retries \\ 0) do
- GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
- end
-
- def get_stats do
- GenServer.call(__MODULE__, :get_stats)
- end
-
- def reset_stats do
- GenServer.call(__MODULE__, :reset_stats)
- end
-
- def get_retry_params(retries) do
- if retries > Pleroma.Config.get([__MODULE__, :max_retries]) do
- {:drop, "Max retries reached"}
- else
- {:retry, growth_function(retries)}
- end
- end
-
- def get_retry_timer_interval do
- Pleroma.Config.get([:retry_queue, :interval], 1000)
- end
-
- defp ets_count_expires(table, current_time) do
- :ets.select_count(
- table,
- [
- {
- {:"$1", :"$2"},
- [{:"=<", :"$1", {:const, current_time}}],
- [true]
- }
- ]
- )
- end
-
- defp ets_pop_n_expired(table, current_time, desired) do
- {popped, _continuation} =
- :ets.select(
- table,
- [
- {
- {:"$1", :"$2"},
- [{:"=<", :"$1", {:const, current_time}}],
- [:"$_"]
- }
- ],
- desired
- )
-
- popped
- |> Enum.each(fn e ->
- :ets.delete_object(table, e)
- end)
-
- popped
- end
-
- def maybe_start_job(running_jobs, queue_table) do
- # we don't want to hit the ets or the DateTime more times than we have to
- # could optimize slightly further by not using the count, and instead grabbing
- # up to N objects early...
- current_time = DateTime.to_unix(DateTime.utc_now())
- n_running_jobs = :sets.size(running_jobs)
-
- if n_running_jobs < Pleroma.Config.get([__MODULE__, :max_jobs]) do
- n_ready_jobs = ets_count_expires(queue_table, current_time)
-
- if n_ready_jobs > 0 do
- # figure out how many we could start
- available_job_slots = Pleroma.Config.get([__MODULE__, :max_jobs]) - n_running_jobs
- start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
- else
- running_jobs
- end
- else
- running_jobs
- end
- end
-
- defp start_n_jobs(running_jobs, _queue_table, _current_time, 0) do
- running_jobs
- end
-
- defp start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
- when available_job_slots > 0 do
- candidates = ets_pop_n_expired(queue_table, current_time, available_job_slots)
-
- candidates
- |> List.foldl(running_jobs, fn {_, e}, rj ->
- {:ok, pid} = Task.start(fn -> worker(e) end)
- mref = Process.monitor(pid)
- :sets.add_element(mref, rj)
- end)
- end
-
- def worker({:send, data, transport, retries}) do
- case transport.publish_one(data) do
- {:ok, _} ->
- GenServer.cast(__MODULE__, :inc_delivered)
- :delivered
-
- {:error, _reason} ->
- enqueue(data, transport, retries)
- :retry
- end
- end
-
- def handle_call(:get_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
- {:reply, %{delivered: delivery_count, dropped: drop_count}, state}
- end
-
- def handle_call(:reset_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
- {:reply, %{delivered: delivery_count, dropped: drop_count},
- %{state | delivered: 0, dropped: 0}}
- end
-
- def handle_cast(:reset_stats, state) do
- {:noreply, %{state | delivered: 0, dropped: 0}}
- end
-
- def handle_cast(
- {:maybe_enqueue, data, transport, retries},
- %{dropped: drop_count, queue_table: queue_table, running_jobs: running_jobs} = state
- ) do
- case get_retry_params(retries) do
- {:retry, timeout} ->
- :ets.insert(queue_table, {timeout, {:send, data, transport, retries}})
- running_jobs = maybe_start_job(running_jobs, queue_table)
- {:noreply, %{state | running_jobs: running_jobs}}
-
- {:drop, message} ->
- Logger.debug(message)
- {:noreply, %{state | dropped: drop_count + 1}}
- end
- end
-
- def handle_cast(:kickoff_timer, state) do
- retry_interval = get_retry_timer_interval()
- Process.send_after(__MODULE__, :retry_timer_run, retry_interval)
- {:noreply, state}
- end
-
- def handle_cast(:inc_delivered, %{delivered: delivery_count} = state) do
- {:noreply, %{state | delivered: delivery_count + 1}}
- end
-
- def handle_cast(:inc_dropped, %{dropped: drop_count} = state) do
- {:noreply, %{state | dropped: drop_count + 1}}
- end
-
- def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do
- case transport.publish_one(data) do
- {:ok, _} ->
- {:noreply, %{state | delivered: delivery_count + 1}}
-
- {:error, _reason} ->
- enqueue(data, transport, retries)
- {:noreply, state}
- end
- end
-
- def handle_info(
- :retry_timer_run,
- %{queue_table: queue_table, running_jobs: running_jobs} = state
- ) do
- maybe_kickoff_timer()
- running_jobs = maybe_start_job(running_jobs, queue_table)
- {:noreply, %{state | running_jobs: running_jobs}}
- end
-
- def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
- %{running_jobs: running_jobs, queue_table: queue_table} = state
- running_jobs = :sets.del_element(ref, running_jobs)
- running_jobs = maybe_start_job(running_jobs, queue_table)
- {:noreply, %{state | running_jobs: running_jobs}}
- end
-
- def handle_info(unknown, state) do
- Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring")
- {:noreply, state}
- end
-
- if Pleroma.Config.get(:env) == :test do
- defp growth_function(_retries) do
- _shutit = Pleroma.Config.get([__MODULE__, :initial_timeout])
- DateTime.to_unix(DateTime.utc_now()) - 1
- end
- else
- defp growth_function(retries) do
- round(Pleroma.Config.get([__MODULE__, :initial_timeout]) * :math.pow(retries, 3)) +
- DateTime.to_unix(DateTime.utc_now())
- end
- end
-
- defp maybe_kickoff_timer do
- GenServer.cast(__MODULE__, :kickoff_timer)
- end
-end
diff --git a/lib/pleroma/web/salmon/salmon.ex b/lib/pleroma/web/salmon/salmon.ex
index 9b01ebcc6..bbaa293fd 100644
--- a/lib/pleroma/web/salmon/salmon.ex
+++ b/lib/pleroma/web/salmon/salmon.ex
@@ -170,6 +170,15 @@ defmodule Pleroma.Web.Salmon do
end
end
+ def publish_one(%{recipient_id: recipient_id} = params) do
+ recipient = User.get_by_id(recipient_id)
+
+ params
+ |> Map.delete(:recipient_id)
+ |> Map.put(:recipient, recipient)
+ |> publish_one()
+ end
+
def publish_one(_), do: :noop
@supported_activities [
@@ -218,7 +227,7 @@ defmodule Pleroma.Web.Salmon do
Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)
Publisher.enqueue_one(__MODULE__, %{
- recipient: remote_user,
+ recipient_id: remote_user.id,
feed: feed,
unreachable_since: reachable_urls_metadata[remote_user.info.salmon]
})
diff --git a/lib/pleroma/workers/publisher.ex b/lib/pleroma/workers/publisher.ex
new file mode 100644
index 000000000..67871977a
--- /dev/null
+++ b/lib/pleroma/workers/publisher.ex
@@ -0,0 +1,35 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Publisher do
+ alias Pleroma.Activity
+ alias Pleroma.User
+
+ # Note: `max_attempts` is intended to be overridden in `new/1` call
+ use Oban.Worker,
+ queue: "federator_outgoing",
+ max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
+
+ @impl Oban.Worker
+ def perform(%{"op" => "publish", "activity_id" => activity_id}) do
+ with %Activity{} = activity <- Activity.get_by_id(activity_id) do
+ perform_publish(activity)
+ else
+ _ -> raise "Non-existing activity: #{activity_id}"
+ end
+ end
+
+ def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}) do
+ module_name
+ |> String.to_atom()
+ |> apply(:publish_one, [params])
+ end
+
+ def perform_publish(%Activity{} = activity) do
+ with %User{} = actor <- User.get_cached_by_ap_id(activity.data["actor"]),
+ {:ok, actor} <- User.ensure_keys_present(actor) do
+ Pleroma.Web.Federator.Publisher.publish(actor, activity)
+ end
+ end
+end
diff --git a/lib/pleroma/workers/receiver.ex b/lib/pleroma/workers/receiver.ex
new file mode 100644
index 000000000..43558b4e6
--- /dev/null
+++ b/lib/pleroma/workers/receiver.ex
@@ -0,0 +1,61 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Receiver do
+ alias Pleroma.Activity
+ alias Pleroma.Object.Containment
+ alias Pleroma.User
+ alias Pleroma.Web.ActivityPub.ActivityPub
+ alias Pleroma.Web.ActivityPub.Transmogrifier
+ alias Pleroma.Web.ActivityPub.Utils
+ alias Pleroma.Web.OStatus
+
+ require Logger
+
+ # Note: `max_attempts` is intended to be overridden in `new/1` call
+ use Oban.Worker,
+ queue: "federator_incoming",
+ max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
+
+ @impl Oban.Worker
+ def perform(%{"op" => "incoming_doc", "body" => doc}) do
+ Logger.info("Got incoming document, trying to parse")
+ OStatus.handle_incoming(doc)
+ end
+
+ def perform(%{"op" => "incoming_ap_doc", "params" => params}) do
+ Logger.info("Handling incoming AP activity")
+
+ params = Utils.normalize_params(params)
+
+ # NOTE: we use the actor ID to do the containment, this is fine because an
+ # actor shouldn't be acting on objects outside their own AP server.
+ with {:ok, _user} <- ap_enabled_actor(params["actor"]),
+ nil <- Activity.normalize(params["id"]),
+ :ok <- Containment.contain_origin_from_id(params["actor"], params),
+ {:ok, activity} <- Transmogrifier.handle_incoming(params) do
+ {:ok, activity}
+ else
+ %Activity{} ->
+ Logger.info("Already had #{params["id"]}")
+ :error
+
+ _e ->
+ # Just drop those for now
+ Logger.info("Unhandled activity")
+ Logger.info(Jason.encode!(params, pretty: true))
+ :error
+ end
+ end
+
+ defp ap_enabled_actor(id) do
+ user = User.get_cached_by_ap_id(id)
+
+ if User.ap_enabled?(user) do
+ {:ok, user}
+ else
+ ActivityPub.make_user_from_ap_id(id)
+ end
+ end
+end
diff --git a/lib/pleroma/workers/subscriber.ex b/lib/pleroma/workers/subscriber.ex
new file mode 100644
index 000000000..a8c01bb10
--- /dev/null
+++ b/lib/pleroma/workers/subscriber.ex
@@ -0,0 +1,44 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Subscriber do
+ alias Pleroma.Repo
+ alias Pleroma.Web.Websub
+ alias Pleroma.Web.Websub.WebsubClientSubscription
+
+ require Logger
+
+ # Note: `max_attempts` is intended to be overridden in `new/1` call
+ use Oban.Worker,
+ queue: "federator_outgoing",
+ max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
+
+ @impl Oban.Worker
+ def perform(%{"op" => "refresh_subscriptions"}) do
+ Websub.refresh_subscriptions()
+ # Schedule the next run in 6 hours
+ Pleroma.Web.Federator.refresh_subscriptions(schedule_in: 3600 * 6)
+ end
+
+ def perform(%{"op" => "request_subscription", "websub_id" => websub_id}) do
+ websub = Repo.get(WebsubClientSubscription, websub_id)
+ Logger.debug("Refreshing #{websub.topic}")
+
+ with {:ok, websub} <- Websub.request_subscription(websub) do
+ Logger.debug("Successfully refreshed #{websub.topic}")
+ else
+ _e -> Logger.debug("Couldn't refresh #{websub.topic}")
+ end
+ end
+
+ def perform(%{"op" => "verify_websub", "websub_id" => websub_id}) do
+ websub = Repo.get(WebsubClientSubscription, websub_id)
+
+ Logger.debug(fn ->
+ "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
+ end)
+
+ Websub.verify(websub)
+ end
+end