diff options
25 files changed, 360 insertions, 507 deletions
diff --git a/config/config.exs b/config/config.exs index d2325edbc..258e4d274 100644 --- a/config/config.exs +++ b/config/config.exs @@ -440,13 +440,7 @@ config :pleroma, Pleroma.User, "web" ] -config :pleroma, Pleroma.Web.Federator.RetryQueue, - enabled: false, - max_jobs: 20, - initial_timeout: 30, - max_retries: 5 - -config :pleroma_job_queue, :queues, +job_queues = [ federator_incoming: 50, federator_outgoing: 50, web_push: 50, @@ -454,6 +448,22 @@ config :pleroma_job_queue, :queues, transmogrifier: 20, scheduled_activities: 10, background: 5 +] + +config :pleroma_job_queue, :queues, job_queues + +config :pleroma, Oban, + repo: Pleroma.Repo, + verbose: false, + prune: {:maxage, 60 * 60 * 24 * 7}, + queues: job_queues + +config :pleroma, :workers, + retries: [ + compile_time_default: 1, + federator_incoming: 5, + federator_outgoing: 5 + ] config :pleroma, :fetch_initial_posts, enabled: false, diff --git a/config/test.exs b/config/test.exs index 6f75f39b5..f897b5d48 100644 --- a/config/test.exs +++ b/config/test.exs @@ -63,6 +63,10 @@ config :web_push_encryption, :http_client, Pleroma.Web.WebPushHttpClientMock config :pleroma_job_queue, disabled: true +config :pleroma, Oban, + queues: false, + prune: :disabled + config :pleroma, Pleroma.ScheduledActivity, daily_user_limit: 2, total_user_limit: 3, diff --git a/docs/config.md b/docs/config.md index 703ef67dd..8f8bd22f4 100644 --- a/docs/config.md +++ b/docs/config.md @@ -412,13 +412,6 @@ config :pleroma_job_queue, :queues, This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the `max_jobs` set to `50`. -## Pleroma.Web.Federator.RetryQueue - -* `enabled`: If set to `true`, failed federation jobs will be retried -* `max_jobs`: The maximum amount of parallel federation jobs running at the same time. -* `initial_timeout`: The initial timeout in seconds -* `max_retries`: The maximum number of times a federation job is retried - ## Pleroma.Web.Metadata * `providers`: a list of metadata providers to enable. Providers available: * Pleroma.Web.Metadata.Providers.OpenGraph diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 00b06f723..5550a4902 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -120,8 +120,8 @@ defmodule Pleroma.Application do hackney_pool_children() ++ [ %{ - id: Pleroma.Web.Federator.RetryQueue, - start: {Pleroma.Web.Federator.RetryQueue, :start_link, []} + id: Oban, + start: {Oban, :start_link, [Application.get_env(:pleroma, Oban)]} }, %{ id: Pleroma.Web.OAuth.Token.CleanWorker, diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex index 46edab0bd..29f3221d1 100644 --- a/lib/pleroma/web/activity_pub/publisher.ex +++ b/lib/pleroma/web/activity_pub/publisher.ex @@ -85,6 +85,15 @@ defmodule Pleroma.Web.ActivityPub.Publisher do end end + def publish_one(%{actor_id: actor_id} = params) do + actor = User.get_by_id(actor_id) + + params + |> Map.delete(:actor_id) + |> Map.put(:actor, actor) + |> publish_one() + end + defp should_federate?(inbox, public) do if public do true @@ -160,7 +169,8 @@ defmodule Pleroma.Web.ActivityPub.Publisher do Publishes an activity with BCC to all relevant peers. """ - def publish(actor, %{data: %{"bcc" => bcc}} = activity) when is_list(bcc) and bcc != [] do + def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity) + when is_list(bcc) and bcc != [] do public = is_public?(activity) {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) @@ -187,7 +197,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{ inbox: inbox, json: json, - actor: actor, + actor_id: actor.id, id: activity.data["id"], unreachable_since: unreachable_since }) @@ -222,7 +232,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do %{ inbox: inbox, json: json, - actor: actor, + actor_id: actor.id, id: activity.data["id"], unreachable_since: unreachable_since } diff --git a/lib/pleroma/web/activity_pub/utils.ex b/lib/pleroma/web/activity_pub/utils.ex index fc5305c58..4f68acc78 100644 --- a/lib/pleroma/web/activity_pub/utils.ex +++ b/lib/pleroma/web/activity_pub/utils.ex @@ -168,14 +168,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do """ def maybe_federate(%Activity{local: true} = activity) do if Pleroma.Config.get!([:instance, :federating]) do - priority = - case activity.data["type"] do - "Delete" -> 10 - "Create" -> 1 - _ -> 5 - end - - Pleroma.Web.Federator.publish(activity, priority) + Pleroma.Web.Federator.publish(activity) end :ok diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index f4f9e83e0..bb9eadfee 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -3,23 +3,15 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.Federator do - alias Pleroma.Activity - alias Pleroma.Object.Containment - alias Pleroma.User - alias Pleroma.Web.ActivityPub.ActivityPub - alias Pleroma.Web.ActivityPub.Transmogrifier - alias Pleroma.Web.ActivityPub.Utils - alias Pleroma.Web.Federator.Publisher - alias Pleroma.Web.Federator.RetryQueue - alias Pleroma.Web.OStatus - alias Pleroma.Web.Websub + alias Pleroma.Workers.Publisher, as: PublisherWorker + alias Pleroma.Workers.Receiver, as: ReceiverWorker + alias Pleroma.Workers.Subscriber, as: SubscriberWorker require Logger def init do # 1 minute - Process.sleep(1000 * 60) - refresh_subscriptions() + refresh_subscriptions(schedule_in: 60) end @doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)" @@ -37,124 +29,50 @@ defmodule Pleroma.Web.Federator do # Client API def incoming_doc(doc) do - PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc]) + %{"op" => "incoming_doc", "body" => doc} + |> ReceiverWorker.new(worker_args(:federator_incoming)) + |> Pleroma.Repo.insert() end def incoming_ap_doc(params) do - PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params]) + %{"op" => "incoming_ap_doc", "params" => params} + |> ReceiverWorker.new(worker_args(:federator_incoming)) + |> Pleroma.Repo.insert() end - def publish(activity, priority \\ 1) do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority) + def publish(%{id: "pleroma:fakeid"} = activity) do + PublisherWorker.perform_publish(activity) end - def verify_websub(websub) do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub]) - end - - def request_subscription(sub) do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub]) - end - - def refresh_subscriptions do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions]) - end - - # Job Worker Callbacks - - def perform(:refresh_subscriptions) do - Logger.debug("Federator running refresh subscriptions") - Websub.refresh_subscriptions() - - spawn(fn -> - # 6 hours - Process.sleep(1000 * 60 * 60 * 6) - refresh_subscriptions() - end) - end - - def perform(:request_subscription, websub) do - Logger.debug("Refreshing #{websub.topic}") - - with {:ok, websub} <- Websub.request_subscription(websub) do - Logger.debug("Successfully refreshed #{websub.topic}") - else - _e -> Logger.debug("Couldn't refresh #{websub.topic}") - end - end - - def perform(:publish, activity) do - Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end) - - with %User{} = actor <- User.get_cached_by_ap_id(activity.data["actor"]), - {:ok, actor} <- User.ensure_keys_present(actor) do - Publisher.publish(actor, activity) - end - end - - def perform(:verify_websub, websub) do - Logger.debug(fn -> - "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})" - end) - - Websub.verify(websub) - end - - def perform(:incoming_doc, doc) do - Logger.info("Got document, trying to parse") - OStatus.handle_incoming(doc) + def publish(activity) do + %{"op" => "publish", "activity_id" => activity.id} + |> PublisherWorker.new(worker_args(:federator_outgoing)) + |> Pleroma.Repo.insert() end - def perform(:incoming_ap_doc, params) do - Logger.info("Handling incoming AP activity") - - params = Utils.normalize_params(params) - - # NOTE: we use the actor ID to do the containment, this is fine because an - # actor shouldn't be acting on objects outside their own AP server. - with {:ok, _user} <- ap_enabled_actor(params["actor"]), - nil <- Activity.normalize(params["id"]), - :ok <- Containment.contain_origin_from_id(params["actor"], params), - {:ok, activity} <- Transmogrifier.handle_incoming(params) do - {:ok, activity} - else - %Activity{} -> - Logger.info("Already had #{params["id"]}") - :error - - _e -> - # Just drop those for now - Logger.info("Unhandled activity") - Logger.info(Jason.encode!(params, pretty: true)) - :error - end + def verify_websub(websub) do + %{"op" => "verify_websub", "websub_id" => websub.id} + |> SubscriberWorker.new(worker_args(:federator_outgoing)) + |> Pleroma.Repo.insert() end - def perform( - :publish_single_websub, - %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params - ) do - case Websub.publish_one(params) do - {:ok, _} -> - :ok - - {:error, _} -> - RetryQueue.enqueue(params, Websub) - end + def request_subscription(websub) do + %{"op" => "request_subscription", "websub_id" => websub.id} + |> SubscriberWorker.new(worker_args(:federator_outgoing)) + |> Pleroma.Repo.insert() end - def perform(type, _) do - Logger.debug(fn -> "Unknown task: #{type}" end) - {:error, "Don't know what to do with this"} + def refresh_subscriptions(worker_args \\ []) do + %{"op" => "refresh_subscriptions"} + |> SubscriberWorker.new(worker_args ++ [max_attempts: 1] ++ worker_args(:federator_outgoing)) + |> Pleroma.Repo.insert() end - def ap_enabled_actor(id) do - user = User.get_cached_by_ap_id(id) - - if User.ap_enabled?(user) do - {:ok, user} + defp worker_args(queue) do + if max_attempts = Pleroma.Config.get([:workers, :retries, queue]) do + [max_attempts: max_attempts] else - ActivityPub.make_user_from_ap_id(id) + [] end end end diff --git a/lib/pleroma/web/federator/publisher.ex b/lib/pleroma/web/federator/publisher.ex index 70f870244..05d2be615 100644 --- a/lib/pleroma/web/federator/publisher.ex +++ b/lib/pleroma/web/federator/publisher.ex @@ -6,7 +6,7 @@ defmodule Pleroma.Web.Federator.Publisher do alias Pleroma.Activity alias Pleroma.Config alias Pleroma.User - alias Pleroma.Web.Federator.RetryQueue + alias Pleroma.Workers.Publisher, as: PublisherWorker require Logger @@ -30,23 +30,17 @@ defmodule Pleroma.Web.Federator.Publisher do Enqueue publishing a single activity. """ @spec enqueue_one(module(), Map.t()) :: :ok - def enqueue_one(module, %{} = params), - do: PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_one, module, params]) - - @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()} - def perform(:publish_one, module, params) do - case apply(module, :publish_one, [params]) do - {:ok, _} -> - :ok - - {:error, _e} -> - RetryQueue.enqueue(params, module) - end - end + def enqueue_one(module, %{} = params) do + worker_args = + if max_attempts = Pleroma.Config.get([:workers, :retries, :federator_outgoing]) do + [max_attempts: max_attempts] + else + [] + end - def perform(type, _, _) do - Logger.debug("Unknown task: #{type}") - {:error, "Don't know what to do with this"} + %{"op" => "publish_one", "module" => to_string(module), "params" => params} + |> PublisherWorker.new(worker_args) + |> Pleroma.Repo.insert() end @doc """ diff --git a/lib/pleroma/web/federator/retry_queue.ex b/lib/pleroma/web/federator/retry_queue.ex deleted file mode 100644 index 3db948c2e..000000000 --- a/lib/pleroma/web/federator/retry_queue.ex +++ /dev/null @@ -1,239 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.Federator.RetryQueue do - use GenServer - - require Logger - - def init(args) do - queue_table = :ets.new(:pleroma_retry_queue, [:bag, :protected]) - - {:ok, %{args | queue_table: queue_table, running_jobs: :sets.new()}} - end - - def start_link do - enabled = - if Pleroma.Config.get(:env) == :test, - do: true, - else: Pleroma.Config.get([__MODULE__, :enabled], false) - - if enabled do - Logger.info("Starting retry queue") - - linkres = - GenServer.start_link( - __MODULE__, - %{delivered: 0, dropped: 0, queue_table: nil, running_jobs: nil}, - name: __MODULE__ - ) - - maybe_kickoff_timer() - linkres - else - Logger.info("Retry queue disabled") - :ignore - end - end - - def enqueue(data, transport, retries \\ 0) do - GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1}) - end - - def get_stats do - GenServer.call(__MODULE__, :get_stats) - end - - def reset_stats do - GenServer.call(__MODULE__, :reset_stats) - end - - def get_retry_params(retries) do - if retries > Pleroma.Config.get([__MODULE__, :max_retries]) do - {:drop, "Max retries reached"} - else - {:retry, growth_function(retries)} - end - end - - def get_retry_timer_interval do - Pleroma.Config.get([:retry_queue, :interval], 1000) - end - - defp ets_count_expires(table, current_time) do - :ets.select_count( - table, - [ - { - {:"$1", :"$2"}, - [{:"=<", :"$1", {:const, current_time}}], - [true] - } - ] - ) - end - - defp ets_pop_n_expired(table, current_time, desired) do - {popped, _continuation} = - :ets.select( - table, - [ - { - {:"$1", :"$2"}, - [{:"=<", :"$1", {:const, current_time}}], - [:"$_"] - } - ], - desired - ) - - popped - |> Enum.each(fn e -> - :ets.delete_object(table, e) - end) - - popped - end - - def maybe_start_job(running_jobs, queue_table) do - # we don't want to hit the ets or the DateTime more times than we have to - # could optimize slightly further by not using the count, and instead grabbing - # up to N objects early... - current_time = DateTime.to_unix(DateTime.utc_now()) - n_running_jobs = :sets.size(running_jobs) - - if n_running_jobs < Pleroma.Config.get([__MODULE__, :max_jobs]) do - n_ready_jobs = ets_count_expires(queue_table, current_time) - - if n_ready_jobs > 0 do - # figure out how many we could start - available_job_slots = Pleroma.Config.get([__MODULE__, :max_jobs]) - n_running_jobs - start_n_jobs(running_jobs, queue_table, current_time, available_job_slots) - else - running_jobs - end - else - running_jobs - end - end - - defp start_n_jobs(running_jobs, _queue_table, _current_time, 0) do - running_jobs - end - - defp start_n_jobs(running_jobs, queue_table, current_time, available_job_slots) - when available_job_slots > 0 do - candidates = ets_pop_n_expired(queue_table, current_time, available_job_slots) - - candidates - |> List.foldl(running_jobs, fn {_, e}, rj -> - {:ok, pid} = Task.start(fn -> worker(e) end) - mref = Process.monitor(pid) - :sets.add_element(mref, rj) - end) - end - - def worker({:send, data, transport, retries}) do - case transport.publish_one(data) do - {:ok, _} -> - GenServer.cast(__MODULE__, :inc_delivered) - :delivered - - {:error, _reason} -> - enqueue(data, transport, retries) - :retry - end - end - - def handle_call(:get_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do - {:reply, %{delivered: delivery_count, dropped: drop_count}, state} - end - - def handle_call(:reset_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do - {:reply, %{delivered: delivery_count, dropped: drop_count}, - %{state | delivered: 0, dropped: 0}} - end - - def handle_cast(:reset_stats, state) do - {:noreply, %{state | delivered: 0, dropped: 0}} - end - - def handle_cast( - {:maybe_enqueue, data, transport, retries}, - %{dropped: drop_count, queue_table: queue_table, running_jobs: running_jobs} = state - ) do - case get_retry_params(retries) do - {:retry, timeout} -> - :ets.insert(queue_table, {timeout, {:send, data, transport, retries}}) - running_jobs = maybe_start_job(running_jobs, queue_table) - {:noreply, %{state | running_jobs: running_jobs}} - - {:drop, message} -> - Logger.debug(message) - {:noreply, %{state | dropped: drop_count + 1}} - end - end - - def handle_cast(:kickoff_timer, state) do - retry_interval = get_retry_timer_interval() - Process.send_after(__MODULE__, :retry_timer_run, retry_interval) - {:noreply, state} - end - - def handle_cast(:inc_delivered, %{delivered: delivery_count} = state) do - {:noreply, %{state | delivered: delivery_count + 1}} - end - - def handle_cast(:inc_dropped, %{dropped: drop_count} = state) do - {:noreply, %{state | dropped: drop_count + 1}} - end - - def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do - case transport.publish_one(data) do - {:ok, _} -> - {:noreply, %{state | delivered: delivery_count + 1}} - - {:error, _reason} -> - enqueue(data, transport, retries) - {:noreply, state} - end - end - - def handle_info( - :retry_timer_run, - %{queue_table: queue_table, running_jobs: running_jobs} = state - ) do - maybe_kickoff_timer() - running_jobs = maybe_start_job(running_jobs, queue_table) - {:noreply, %{state | running_jobs: running_jobs}} - end - - def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do - %{running_jobs: running_jobs, queue_table: queue_table} = state - running_jobs = :sets.del_element(ref, running_jobs) - running_jobs = maybe_start_job(running_jobs, queue_table) - {:noreply, %{state | running_jobs: running_jobs}} - end - - def handle_info(unknown, state) do - Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring") - {:noreply, state} - end - - if Pleroma.Config.get(:env) == :test do - defp growth_function(_retries) do - _shutit = Pleroma.Config.get([__MODULE__, :initial_timeout]) - DateTime.to_unix(DateTime.utc_now()) - 1 - end - else - defp growth_function(retries) do - round(Pleroma.Config.get([__MODULE__, :initial_timeout]) * :math.pow(retries, 3)) + - DateTime.to_unix(DateTime.utc_now()) - end - end - - defp maybe_kickoff_timer do - GenServer.cast(__MODULE__, :kickoff_timer) - end -end diff --git a/lib/pleroma/web/salmon/salmon.ex b/lib/pleroma/web/salmon/salmon.ex index 9b01ebcc6..bbaa293fd 100644 --- a/lib/pleroma/web/salmon/salmon.ex +++ b/lib/pleroma/web/salmon/salmon.ex @@ -170,6 +170,15 @@ defmodule Pleroma.Web.Salmon do end end + def publish_one(%{recipient_id: recipient_id} = params) do + recipient = User.get_by_id(recipient_id) + + params + |> Map.delete(:recipient_id) + |> Map.put(:recipient, recipient) + |> publish_one() + end + def publish_one(_), do: :noop @supported_activities [ @@ -218,7 +227,7 @@ defmodule Pleroma.Web.Salmon do Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end) Publisher.enqueue_one(__MODULE__, %{ - recipient: remote_user, + recipient_id: remote_user.id, feed: feed, unreachable_since: reachable_urls_metadata[remote_user.info.salmon] }) diff --git a/lib/pleroma/workers/publisher.ex b/lib/pleroma/workers/publisher.ex new file mode 100644 index 000000000..67871977a --- /dev/null +++ b/lib/pleroma/workers/publisher.ex @@ -0,0 +1,35 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.Publisher do + alias Pleroma.Activity + alias Pleroma.User + + # Note: `max_attempts` is intended to be overridden in `new/1` call + use Oban.Worker, + queue: "federator_outgoing", + max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default]) + + @impl Oban.Worker + def perform(%{"op" => "publish", "activity_id" => activity_id}) do + with %Activity{} = activity <- Activity.get_by_id(activity_id) do + perform_publish(activity) + else + _ -> raise "Non-existing activity: #{activity_id}" + end + end + + def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}) do + module_name + |> String.to_atom() + |> apply(:publish_one, [params]) + end + + def perform_publish(%Activity{} = activity) do + with %User{} = actor <- User.get_cached_by_ap_id(activity.data["actor"]), + {:ok, actor} <- User.ensure_keys_present(actor) do + Pleroma.Web.Federator.Publisher.publish(actor, activity) + end + end +end diff --git a/lib/pleroma/workers/receiver.ex b/lib/pleroma/workers/receiver.ex new file mode 100644 index 000000000..43558b4e6 --- /dev/null +++ b/lib/pleroma/workers/receiver.ex @@ -0,0 +1,61 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.Receiver do + alias Pleroma.Activity + alias Pleroma.Object.Containment + alias Pleroma.User + alias Pleroma.Web.ActivityPub.ActivityPub + alias Pleroma.Web.ActivityPub.Transmogrifier + alias Pleroma.Web.ActivityPub.Utils + alias Pleroma.Web.OStatus + + require Logger + + # Note: `max_attempts` is intended to be overridden in `new/1` call + use Oban.Worker, + queue: "federator_incoming", + max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default]) + + @impl Oban.Worker + def perform(%{"op" => "incoming_doc", "body" => doc}) do + Logger.info("Got incoming document, trying to parse") + OStatus.handle_incoming(doc) + end + + def perform(%{"op" => "incoming_ap_doc", "params" => params}) do + Logger.info("Handling incoming AP activity") + + params = Utils.normalize_params(params) + + # NOTE: we use the actor ID to do the containment, this is fine because an + # actor shouldn't be acting on objects outside their own AP server. + with {:ok, _user} <- ap_enabled_actor(params["actor"]), + nil <- Activity.normalize(params["id"]), + :ok <- Containment.contain_origin_from_id(params["actor"], params), + {:ok, activity} <- Transmogrifier.handle_incoming(params) do + {:ok, activity} + else + %Activity{} -> + Logger.info("Already had #{params["id"]}") + :error + + _e -> + # Just drop those for now + Logger.info("Unhandled activity") + Logger.info(Jason.encode!(params, pretty: true)) + :error + end + end + + defp ap_enabled_actor(id) do + user = User.get_cached_by_ap_id(id) + + if User.ap_enabled?(user) do + {:ok, user} + else + ActivityPub.make_user_from_ap_id(id) + end + end +end diff --git a/lib/pleroma/workers/subscriber.ex b/lib/pleroma/workers/subscriber.ex new file mode 100644 index 000000000..a8c01bb10 --- /dev/null +++ b/lib/pleroma/workers/subscriber.ex @@ -0,0 +1,44 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.Subscriber do + alias Pleroma.Repo + alias Pleroma.Web.Websub + alias Pleroma.Web.Websub.WebsubClientSubscription + + require Logger + + # Note: `max_attempts` is intended to be overridden in `new/1` call + use Oban.Worker, + queue: "federator_outgoing", + max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default]) + + @impl Oban.Worker + def perform(%{"op" => "refresh_subscriptions"}) do + Websub.refresh_subscriptions() + # Schedule the next run in 6 hours + Pleroma.Web.Federator.refresh_subscriptions(schedule_in: 3600 * 6) + end + + def perform(%{"op" => "request_subscription", "websub_id" => websub_id}) do + websub = Repo.get(WebsubClientSubscription, websub_id) + Logger.debug("Refreshing #{websub.topic}") + + with {:ok, websub} <- Websub.request_subscription(websub) do + Logger.debug("Successfully refreshed #{websub.topic}") + else + _e -> Logger.debug("Couldn't refresh #{websub.topic}") + end + end + + def perform(%{"op" => "verify_websub", "websub_id" => websub_id}) do + websub = Repo.get(WebsubClientSubscription, websub_id) + + Logger.debug(fn -> + "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})" + end) + + Websub.verify(websub) + end +end @@ -101,6 +101,7 @@ defmodule Pleroma.Mixfile do {:phoenix_ecto, "~> 4.0"}, {:ecto_sql, "~> 3.1"}, {:postgrex, ">= 0.13.5"}, + {:oban, "~> 0.6"}, {:gettext, "~> 0.15"}, {:comeonin, "~> 4.1.1"}, {:pbkdf2_elixir, "~> 0.12.3"}, @@ -57,6 +57,7 @@ "mogrify": {:hex, :mogrify, "0.6.1", "de1b527514f2d95a7bbe9642eb556061afb337e220cf97adbf3a4e6438ed70af", [:mix], [], "hexpm"}, "mox": {:hex, :mox, "0.5.1", "f86bb36026aac1e6f924a4b6d024b05e9adbed5c63e8daa069bd66fb3292165b", [:mix], [], "hexpm"}, "nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm"}, + "oban": {:hex, :oban, "0.6.0", "8b9b861355610e703e58a878bc29959f3f0e1b4cd1e90d785cf2bb2498d3b893", [:mix], [{:ecto_sql, "~> 3.1", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.14", [hex: :postgrex, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"}, "parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm"}, "pbkdf2_elixir": {:hex, :pbkdf2_elixir, "0.12.3", "6706a148809a29c306062862c803406e88f048277f6e85b68faf73291e820b84", [:mix], [], "hexpm"}, "phoenix": {:hex, :phoenix, "1.4.9", "746d098e10741c334d88143d3c94cab1756435f94387a63441792e66ec0ee974", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 1.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.8.1 or ~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"}, diff --git a/priv/repo/migrations/20190730055101_add_oban_jobs_table.exs b/priv/repo/migrations/20190730055101_add_oban_jobs_table.exs new file mode 100644 index 000000000..2f201bd05 --- /dev/null +++ b/priv/repo/migrations/20190730055101_add_oban_jobs_table.exs @@ -0,0 +1,6 @@ +defmodule Pleroma.Repo.Migrations.AddObanJobsTable do + use Ecto.Migration + + defdelegate up, to: Oban.Migrations + defdelegate down, to: Oban.Migrations +end diff --git a/test/activity_test.exs b/test/activity_test.exs index b27f6fd36..b9c12adb2 100644 --- a/test/activity_test.exs +++ b/test/activity_test.exs @@ -6,6 +6,7 @@ defmodule Pleroma.ActivityTest do use Pleroma.DataCase alias Pleroma.Activity alias Pleroma.Bookmark + alias Pleroma.ObanHelpers alias Pleroma.Object alias Pleroma.ThreadMute import Pleroma.Factory @@ -125,7 +126,8 @@ defmodule Pleroma.ActivityTest do } {:ok, local_activity} = Pleroma.Web.CommonAPI.post(user, %{"status" => "find me!"}) - {:ok, remote_activity} = Pleroma.Web.Federator.incoming_ap_doc(params) + {:ok, job} = Pleroma.Web.Federator.incoming_ap_doc(params) + {:ok, remote_activity} = ObanHelpers.perform(job) %{local_activity: local_activity, remote_activity: remote_activity, user: user} end diff --git a/test/support/oban_helpers.ex b/test/support/oban_helpers.ex new file mode 100644 index 000000000..54b5a9566 --- /dev/null +++ b/test/support/oban_helpers.ex @@ -0,0 +1,36 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.ObanHelpers do + @moduledoc """ + Oban test helpers. + """ + + alias Pleroma.Repo + + def perform(%Oban.Job{} = job) do + res = apply(String.to_existing_atom("Elixir." <> job.worker), :perform, [job]) + Repo.delete(job) + res + end + + def perform(jobs) when is_list(jobs) do + for job <- jobs, do: perform(job) + end + + def member?(%{} = job_args, jobs) when is_list(jobs) do + Enum.any?(jobs, fn job -> + member?(job_args, job.args) + end) + end + + def member?(%{} = test_attrs, %{} = attrs) do + Enum.all?( + test_attrs, + fn {k, _v} -> member?(test_attrs[k], attrs[k]) end + ) + end + + def member?(x, y), do: x == y +end diff --git a/test/user_test.exs b/test/user_test.exs index 8440d456d..8617752d7 100644 --- a/test/user_test.exs +++ b/test/user_test.exs @@ -5,6 +5,7 @@ defmodule Pleroma.UserTest do alias Pleroma.Activity alias Pleroma.Builders.UserBuilder + alias Pleroma.ObanHelpers alias Pleroma.Object alias Pleroma.Repo alias Pleroma.User @@ -12,9 +13,9 @@ defmodule Pleroma.UserTest do alias Pleroma.Web.CommonAPI use Pleroma.DataCase + use Oban.Testing, repo: Pleroma.Repo import Pleroma.Factory - import Mock setup_all do Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end) @@ -1034,11 +1035,7 @@ defmodule Pleroma.UserTest do refute Activity.get_by_id(repeat.id) end - test_with_mock "it sends out User Delete activity", - %{user: user}, - Pleroma.Web.ActivityPub.Publisher, - [:passthrough], - [] do + test "it sends out User Delete activity", %{user: user} do config_path = [:instance, :federating] initial_setting = Pleroma.Config.get(config_path) Pleroma.Config.put(config_path, true) @@ -1048,10 +1045,15 @@ defmodule Pleroma.UserTest do {:ok, _user} = User.delete(user) - assert called( - Pleroma.Web.ActivityPub.Publisher.publish_one(%{ - inbox: "http://mastodon.example.org/inbox" - }) + assert ObanHelpers.member?( + %{ + "op" => "publish_one", + "params" => %{ + "inbox" => "http://mastodon.example.org/inbox", + "id" => "pleroma:fakeid" + } + }, + all_enqueued(worker: Pleroma.Workers.Publisher) ) Pleroma.Config.put(config_path, initial_setting) diff --git a/test/web/activity_pub/activity_pub_controller_test.exs b/test/web/activity_pub/activity_pub_controller_test.exs index 251055ee1..d7f0a8264 100644 --- a/test/web/activity_pub/activity_pub_controller_test.exs +++ b/test/web/activity_pub/activity_pub_controller_test.exs @@ -4,15 +4,19 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do use Pleroma.Web.ConnCase + use Oban.Testing, repo: Pleroma.Repo + import Pleroma.Factory alias Pleroma.Activity alias Pleroma.Instances + alias Pleroma.ObanHelpers alias Pleroma.Object alias Pleroma.User alias Pleroma.Web.ActivityPub.ObjectView alias Pleroma.Web.ActivityPub.UserView alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.CommonAPI + alias Pleroma.Workers.Receiver, as: ReceiverWorker setup_all do Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end) @@ -279,7 +283,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do |> post("/inbox", data) assert "ok" == json_response(conn, 200) - :timer.sleep(500) + + ObanHelpers.perform(all_enqueued(worker: ReceiverWorker)) assert Activity.get_by_ap_id(data["id"]) end @@ -321,7 +326,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do |> post("/users/#{user.nickname}/inbox", data) assert "ok" == json_response(conn, 200) - :timer.sleep(500) + ObanHelpers.perform(all_enqueued(worker: ReceiverWorker)) assert Activity.get_by_ap_id(data["id"]) end @@ -350,7 +355,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do |> post("/users/#{recipient.nickname}/inbox", data) assert "ok" == json_response(conn, 200) - :timer.sleep(500) + ObanHelpers.perform(all_enqueued(worker: ReceiverWorker)) assert Activity.get_by_ap_id(data["id"]) end @@ -429,6 +434,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do |> post("/users/#{recipient.nickname}/inbox", data) |> json_response(200) + ObanHelpers.perform(all_enqueued(worker: ReceiverWorker)) + activity = Activity.get_by_ap_id(data["id"]) assert activity.id @@ -504,6 +511,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do |> post("/users/#{user.nickname}/outbox", data) result = json_response(conn, 201) + assert Activity.get_by_ap_id(result["id"]) end diff --git a/test/web/activity_pub/publisher_test.exs b/test/web/activity_pub/publisher_test.exs index 36a39c84c..26d019878 100644 --- a/test/web/activity_pub/publisher_test.exs +++ b/test/web/activity_pub/publisher_test.exs @@ -257,7 +257,7 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do assert called( Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{ inbox: "https://domain.com/users/nick1/inbox", - actor: actor, + actor_id: actor.id, id: note_activity.data["id"] }) ) diff --git a/test/web/federator_test.exs b/test/web/federator_test.exs index 73cfaa8f1..d3a28d50e 100644 --- a/test/web/federator_test.exs +++ b/test/web/federator_test.exs @@ -4,9 +4,14 @@ defmodule Pleroma.Web.FederatorTest do alias Pleroma.Instances + alias Pleroma.ObanHelpers alias Pleroma.Web.CommonAPI alias Pleroma.Web.Federator + alias Pleroma.Workers.Publisher, as: PublisherWorker + use Pleroma.DataCase + use Oban.Testing, repo: Pleroma.Repo + import Pleroma.Factory import Mock @@ -22,15 +27,6 @@ defmodule Pleroma.Web.FederatorTest do :ok end - describe "Publisher.perform" do - test "call `perform` with unknown task" do - assert { - :error, - "Don't know what to do with this" - } = Pleroma.Web.Federator.Publisher.perform("test", :ok, :ok) - end - end - describe "Publish an activity" do setup do user = insert(:user) @@ -51,6 +47,7 @@ defmodule Pleroma.Web.FederatorTest do } do with_mocks([relay_mock]) do Federator.publish(activity) + ObanHelpers.perform(all_enqueued(worker: PublisherWorker)) end assert_received :relay_publish @@ -64,6 +61,7 @@ defmodule Pleroma.Web.FederatorTest do with_mocks([relay_mock]) do Federator.publish(activity) + ObanHelpers.perform(all_enqueued(worker: PublisherWorker)) end refute_received :relay_publish @@ -73,10 +71,7 @@ defmodule Pleroma.Web.FederatorTest do end describe "Targets reachability filtering in `publish`" do - test_with_mock "it federates only to reachable instances via AP", - Pleroma.Web.ActivityPub.Publisher, - [:passthrough], - [] do + test "it federates only to reachable instances via AP" do user = insert(:user) {inbox1, inbox2} = @@ -104,20 +99,20 @@ defmodule Pleroma.Web.FederatorTest do {:ok, _activity} = CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"}) - assert called( - Pleroma.Web.ActivityPub.Publisher.publish_one(%{ - inbox: inbox1, - unreachable_since: dt - }) - ) + expected_dt = NaiveDateTime.to_iso8601(dt) - refute called(Pleroma.Web.ActivityPub.Publisher.publish_one(%{inbox: inbox2})) + ObanHelpers.perform(all_enqueued(worker: PublisherWorker)) + + assert ObanHelpers.member?( + %{ + "op" => "publish_one", + "params" => %{"inbox" => inbox1, "unreachable_since" => expected_dt} + }, + all_enqueued(worker: PublisherWorker) + ) end - test_with_mock "it federates only to reachable instances via Websub", - Pleroma.Web.Websub, - [:passthrough], - [] do + test "it federates only to reachable instances via Websub" do user = insert(:user) websub_topic = Pleroma.Web.OStatus.feed_path(user) @@ -142,23 +137,27 @@ defmodule Pleroma.Web.FederatorTest do {:ok, _activity} = CommonAPI.post(user, %{"status" => "HI"}) - assert called( - Pleroma.Web.Websub.publish_one(%{ - callback: sub2.callback, - unreachable_since: dt - }) - ) + expected_callback = sub2.callback + expected_dt = NaiveDateTime.to_iso8601(dt) + + ObanHelpers.perform(all_enqueued(worker: PublisherWorker)) - refute called(Pleroma.Web.Websub.publish_one(%{callback: sub1.callback})) + assert ObanHelpers.member?( + %{ + "op" => "publish_one", + "params" => %{ + "callback" => expected_callback, + "unreachable_since" => expected_dt + } + }, + all_enqueued(worker: PublisherWorker) + ) end - test_with_mock "it federates only to reachable instances via Salmon", - Pleroma.Web.Salmon, - [:passthrough], - [] do + test "it federates only to reachable instances via Salmon" do user = insert(:user) - remote_user1 = + _remote_user1 = insert(:user, %{ local: false, nickname: "nick1@domain.com", @@ -174,6 +173,8 @@ defmodule Pleroma.Web.FederatorTest do info: %{salmon: "https://domain2.com/salmon"} }) + remote_user2_id = remote_user2.id + dt = NaiveDateTime.utc_now() Instances.set_unreachable(remote_user2.ap_id, dt) @@ -182,14 +183,20 @@ defmodule Pleroma.Web.FederatorTest do {:ok, _activity} = CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"}) - assert called( - Pleroma.Web.Salmon.publish_one(%{ - recipient: remote_user2, - unreachable_since: dt - }) - ) + expected_dt = NaiveDateTime.to_iso8601(dt) + + ObanHelpers.perform(all_enqueued(worker: PublisherWorker)) - refute called(Pleroma.Web.Salmon.publish_one(%{recipient: remote_user1})) + assert ObanHelpers.member?( + %{ + "op" => "publish_one", + "params" => %{ + "recipient_id" => remote_user2_id, + "unreachable_since" => expected_dt + } + }, + all_enqueued(worker: PublisherWorker) + ) end end @@ -209,7 +216,8 @@ defmodule Pleroma.Web.FederatorTest do "to" => ["https://www.w3.org/ns/activitystreams#Public"] } - {:ok, _activity} = Federator.incoming_ap_doc(params) + assert {:ok, job} = Federator.incoming_ap_doc(params) + assert {:ok, _activity} = ObanHelpers.perform(job) end test "rejects incoming AP docs with incorrect origin" do @@ -227,7 +235,8 @@ defmodule Pleroma.Web.FederatorTest do "to" => ["https://www.w3.org/ns/activitystreams#Public"] } - :error = Federator.incoming_ap_doc(params) + assert {:ok, job} = Federator.incoming_ap_doc(params) + assert :error = ObanHelpers.perform(job) end test "it does not crash if MRF rejects the post" do diff --git a/test/web/retry_queue_test.exs b/test/web/retry_queue_test.exs deleted file mode 100644 index ecb3ce5d0..000000000 --- a/test/web/retry_queue_test.exs +++ /dev/null @@ -1,48 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule MockActivityPub do - def publish_one({ret, waiter}) do - send(waiter, :complete) - {ret, "success"} - end -end - -defmodule Pleroma.Web.Federator.RetryQueueTest do - use Pleroma.DataCase - alias Pleroma.Web.Federator.RetryQueue - - @small_retry_count 0 - @hopeless_retry_count 10 - - setup do - RetryQueue.reset_stats() - end - - test "RetryQueue responds to stats request" do - assert %{delivered: 0, dropped: 0} == RetryQueue.get_stats() - end - - test "failed posts are retried" do - {:retry, _timeout} = RetryQueue.get_retry_params(@small_retry_count) - - wait_task = - Task.async(fn -> - receive do - :complete -> :ok - end - end) - - RetryQueue.enqueue({:ok, wait_task.pid}, MockActivityPub, @small_retry_count) - Task.await(wait_task) - assert %{delivered: 1, dropped: 0} == RetryQueue.get_stats() - end - - test "posts that have been tried too many times are dropped" do - {:drop, _timeout} = RetryQueue.get_retry_params(@hopeless_retry_count) - - RetryQueue.enqueue({:ok, nil}, MockActivityPub, @hopeless_retry_count) - assert %{delivered: 0, dropped: 1} == RetryQueue.get_stats() - end -end diff --git a/test/web/salmon/salmon_test.exs b/test/web/salmon/salmon_test.exs index e86e76fe9..0186f3fef 100644 --- a/test/web/salmon/salmon_test.exs +++ b/test/web/salmon/salmon_test.exs @@ -96,6 +96,6 @@ defmodule Pleroma.Web.Salmon.SalmonTest do Salmon.publish(user, activity) - assert called(Publisher.enqueue_one(Salmon, %{recipient: mentioned_user})) + assert called(Publisher.enqueue_one(Salmon, %{recipient_id: mentioned_user.id})) end end diff --git a/test/web/websub/websub_test.exs b/test/web/websub/websub_test.exs index 74386d7db..b704a558a 100644 --- a/test/web/websub/websub_test.exs +++ b/test/web/websub/websub_test.exs @@ -4,11 +4,14 @@ defmodule Pleroma.Web.WebsubTest do use Pleroma.DataCase + use Oban.Testing, repo: Pleroma.Repo + alias Pleroma.ObanHelpers alias Pleroma.Web.Router.Helpers alias Pleroma.Web.Websub alias Pleroma.Web.Websub.WebsubClientSubscription alias Pleroma.Web.Websub.WebsubServerSubscription + alias Pleroma.Workers.Subscriber, as: SubscriberWorker import Pleroma.Factory import Tesla.Mock @@ -224,6 +227,7 @@ defmodule Pleroma.Web.WebsubTest do }) _refresh = Websub.refresh_subscriptions() + ObanHelpers.perform(all_enqueued(worker: SubscriberWorker)) assert still_good == Repo.get(WebsubClientSubscription, still_good.id) refute needs_refresh == Repo.get(WebsubClientSubscription, needs_refresh.id) |