diff options
Diffstat (limited to 'lib')
24 files changed, 437 insertions, 146 deletions
| diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index d67e2cdc8..d2523c045 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -108,9 +108,10 @@ defmodule Pleroma.Application do          hackney_pool_children() ++          [            worker(Pleroma.Web.Federator.RetryQueue, []), -          worker(Pleroma.Web.Federator, []),            worker(Pleroma.Stats, []), -          worker(Pleroma.Web.Push, []) +          worker(Pleroma.Web.Push, []), +          worker(Pleroma.Jobs, []), +          worker(Task, [&Pleroma.Web.Federator.init/0], restart: :temporary)          ] ++          streamer_child() ++          chat_child() ++ diff --git a/lib/pleroma/jobs.ex b/lib/pleroma/jobs.ex new file mode 100644 index 000000000..24b7e5e46 --- /dev/null +++ b/lib/pleroma/jobs.ex @@ -0,0 +1,152 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Jobs do +  @moduledoc """ +  A basic job queue +  """ +  use GenServer + +  require Logger + +  def init(args) do +    {:ok, args} +  end + +  def start_link do +    queues = +      Pleroma.Config.get(Pleroma.Jobs) +      |> Enum.map(fn {name, _} -> create_queue(name) end) +      |> Enum.into(%{}) + +    state = %{ +      queues: queues, +      refs: %{} +    } + +    GenServer.start_link(__MODULE__, state, name: __MODULE__) +  end + +  def create_queue(name) do +    {name, {:sets.new(), []}} +  end + +  @doc """ +  Enqueues a job. + +  Returns `:ok`. + +  ## Arguments + +  - `queue_name` - a queue name(must be specified in the config). +  - `mod` - a worker module (must have `perform` function). +  - `args` - a list of arguments for the `perform` function of the worker module. +  - `priority` - a job priority (`0` by default). + +  ## Examples + +  Enqueue `Module.perform/0` with `priority=1`: + +      iex> Pleroma.Jobs.enqueue(:example_queue, Module, []) +      :ok + +  Enqueue `Module.perform(:job_name)` with `priority=5`: + +      iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:job_name], 5) +      :ok + +  Enqueue `Module.perform(:another_job, data)` with `priority=1`: + +      iex> data = "foobar" +      iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:another_job, data]) +      :ok + +  Enqueue `Module.perform(:foobar_job, :foo, :bar, 42)` with `priority=1`: + +      iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:foobar_job, :foo, :bar, 42]) +      :ok + +  """ + +  def enqueue(queue_name, mod, args, priority \\ 1) + +  if Mix.env() == :test do +    def enqueue(_queue_name, mod, args, _priority) do +      apply(mod, :perform, args) +    end +  else +    @spec enqueue(atom(), atom(), [any()], integer()) :: :ok +    def enqueue(queue_name, mod, args, priority) do +      GenServer.cast(__MODULE__, {:enqueue, queue_name, mod, args, priority}) +    end +  end + +  def handle_cast({:enqueue, queue_name, mod, args, priority}, state) do +    {running_jobs, queue} = state[:queues][queue_name] + +    queue = enqueue_sorted(queue, {mod, args}, priority) + +    state = +      state +      |> update_queue(queue_name, {running_jobs, queue}) +      |> maybe_start_job(queue_name, running_jobs, queue) + +    {:noreply, state} +  end + +  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do +    queue_name = state.refs[ref] + +    {running_jobs, queue} = state[:queues][queue_name] + +    running_jobs = :sets.del_element(ref, running_jobs) + +    state = +      state +      |> remove_ref(ref) +      |> update_queue(queue_name, {running_jobs, queue}) +      |> maybe_start_job(queue_name, running_jobs, queue) + +    {:noreply, state} +  end + +  def maybe_start_job(state, queue_name, running_jobs, queue) do +    if :sets.size(running_jobs) < Pleroma.Config.get([__MODULE__, queue_name, :max_jobs]) && +         queue != [] do +      {{mod, args}, queue} = queue_pop(queue) +      {:ok, pid} = Task.start(fn -> apply(mod, :perform, args) end) +      mref = Process.monitor(pid) + +      state +      |> add_ref(queue_name, mref) +      |> update_queue(queue_name, {:sets.add_element(mref, running_jobs), queue}) +    else +      state +    end +  end + +  def enqueue_sorted(queue, element, priority) do +    [%{item: element, priority: priority} | queue] +    |> Enum.sort_by(fn %{priority: priority} -> priority end) +  end + +  def queue_pop([%{item: element} | queue]) do +    {element, queue} +  end + +  defp add_ref(state, queue_name, ref) do +    refs = Map.put(state[:refs], ref, queue_name) +    Map.put(state, :refs, refs) +  end + +  defp remove_ref(state, ref) do +    refs = Map.delete(state[:refs], ref) +    Map.put(state, :refs, refs) +  end + +  defp update_queue(state, queue_name, data) do +    queues = Map.put(state[:queues], queue_name, data) +    Map.put(state, :queues, queues) +  end +end diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index 3c6a9953d..18bb56667 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -237,6 +237,7 @@ defmodule Pleroma.User do        changeset        |> put_change(:password_hash, hashed)        |> put_change(:ap_id, ap_id) +      |> unique_constraint(:ap_id)        |> put_change(:following, [followers])        |> put_change(:follower_address, followers)      else @@ -619,6 +620,32 @@ defmodule Pleroma.User do      )    end +  def update_follow_request_count(%User{} = user) do +    subquery = +      user +      |> User.get_follow_requests_query() +      |> select([a], %{count: count(a.id)}) + +    User +    |> where(id: ^user.id) +    |> join(:inner, [u], s in subquery(subquery)) +    |> update([u, s], +      set: [ +        info: +          fragment( +            "jsonb_set(?, '{follow_request_count}', ?::varchar::jsonb, true)", +            u.info, +            s.count +          ) +      ] +    ) +    |> Repo.update_all([], returning: true) +    |> case do +      {1, [user]} -> {:ok, user} +      _ -> {:error, user} +    end +  end +    def get_follow_requests(%User{} = user) do      q = get_follow_requests_query(user)      reqs = Repo.all(q) @@ -1174,7 +1201,7 @@ defmodule Pleroma.User do      {:ok, updated_user} =        user        |> change(%{tags: new_tags}) -      |> Repo.update() +      |> update_and_set_cache()      updated_user    end diff --git a/lib/pleroma/user/info.ex b/lib/pleroma/user/info.ex index 9d8779fab..9099d7fbb 100644 --- a/lib/pleroma/user/info.ex +++ b/lib/pleroma/user/info.ex @@ -12,6 +12,7 @@ defmodule Pleroma.User.Info do      field(:source_data, :map, default: %{})      field(:note_count, :integer, default: 0)      field(:follower_count, :integer, default: 0) +    field(:follow_request_count, :integer, default: 0)      field(:locked, :boolean, default: false)      field(:confirmation_pending, :boolean, default: false)      field(:confirmation_token, :string, default: nil) @@ -34,6 +35,7 @@ defmodule Pleroma.User.Info do      field(:hide_followers, :boolean, default: false)      field(:hide_follows, :boolean, default: false)      field(:pinned_activities, {:array, :string}, default: []) +    field(:flavour, :string, default: nil)      # Found in the wild      # ap_id -> Where is this used? @@ -186,6 +188,14 @@ defmodule Pleroma.User.Info do      |> validate_required([:settings])    end +  def mastodon_flavour_update(info, flavour) do +    params = %{flavour: flavour} + +    info +    |> cast(params, [:flavour]) +    |> validate_required([:flavour]) +  end +    def set_source_data(info, source_data) do      params = %{source_data: source_data} diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index ab2872f56..8d3116839 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -172,9 +172,10 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do      # only accept false as false value      local = !(params[:local] == false) -    with data <- %{"to" => to, "type" => "Accept", "actor" => actor, "object" => object}, +    with data <- %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object},           {:ok, activity} <- insert(data, local), -         :ok <- maybe_federate(activity) do +         :ok <- maybe_federate(activity), +         _ <- User.update_follow_request_count(actor) do        {:ok, activity}      end    end @@ -183,9 +184,10 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do      # only accept false as false value      local = !(params[:local] == false) -    with data <- %{"to" => to, "type" => "Reject", "actor" => actor, "object" => object}, +    with data <- %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object},           {:ok, activity} <- insert(data, local), -         :ok <- maybe_federate(activity) do +         :ok <- maybe_federate(activity), +         _ <- User.update_follow_request_count(actor) do        {:ok, activity}      end    end @@ -283,7 +285,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do    def follow(follower, followed, activity_id \\ nil, local \\ true) do      with data <- make_follow_data(follower, followed, activity_id),           {:ok, activity} <- insert(data, local), -         :ok <- maybe_federate(activity) do +         :ok <- maybe_federate(activity), +         _ <- User.update_follow_request_count(followed) do        {:ok, activity}      end    end @@ -293,7 +296,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do           {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),           unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),           {:ok, activity} <- insert(unfollow_data, local), -         :ok <- maybe_federate(activity) do +         :ok <- maybe_federate(activity), +         _ <- User.update_follow_request_count(followed) do        {:ok, activity}      end    end @@ -753,21 +757,19 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do      public = is_public?(activity) -    reachable_inboxes_metadata = -      (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers) -      |> Enum.filter(fn user -> User.ap_enabled?(user) end) -      |> Enum.map(fn %{info: %{source_data: data}} -> -        (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"] -      end) -      |> Enum.uniq() -      |> Enum.filter(fn inbox -> should_federate?(inbox, public) end) -      |> Instances.filter_reachable() -      {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)      json = Jason.encode!(data) -    Enum.each(reachable_inboxes_metadata, fn {inbox, unreachable_since} -> -      Federator.enqueue(:publish_single_ap, %{ +    (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers) +    |> Enum.filter(fn user -> User.ap_enabled?(user) end) +    |> Enum.map(fn %{info: %{source_data: data}} -> +      (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"] +    end) +    |> Enum.uniq() +    |> Enum.filter(fn inbox -> should_federate?(inbox, public) end) +    |> Instances.filter_reachable() +    |> Enum.each(fn {inbox, unreachable_since} -> +      Federator.publish_single_ap(%{          inbox: inbox,          json: json,          actor: actor, @@ -876,7 +878,12 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do    end    def is_private?(activity) do -    !is_public?(activity) && Enum.any?(activity.data["to"], &String.contains?(&1, "/followers")) +    unless is_public?(activity) do +      follower_address = User.get_cached_by_ap_id(activity.data["actor"]).follower_address +      Enum.any?(activity.data["to"], &(&1 == follower_address)) +    else +      false +    end    end    def is_direct?(%Activity{data: %{"directMessage" => true}}), do: true diff --git a/lib/pleroma/web/activity_pub/activity_pub_controller.ex b/lib/pleroma/web/activity_pub/activity_pub_controller.ex index 69879476e..2bea51311 100644 --- a/lib/pleroma/web/activity_pub/activity_pub_controller.ex +++ b/lib/pleroma/web/activity_pub/activity_pub_controller.ex @@ -155,13 +155,13 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do      with %User{} = user <- User.get_cached_by_nickname(nickname),           true <- Utils.recipient_in_message(user.ap_id, params),           params <- Utils.maybe_splice_recipient(user.ap_id, params) do -      Federator.enqueue(:incoming_ap_doc, params) +      Federator.incoming_ap_doc(params)        json(conn, "ok")      end    end    def inbox(%{assigns: %{valid_signature: true}} = conn, params) do -    Federator.enqueue(:incoming_ap_doc, params) +    Federator.incoming_ap_doc(params)      json(conn, "ok")    end diff --git a/lib/pleroma/web/activity_pub/mrf/hellthread_policy.ex b/lib/pleroma/web/activity_pub/mrf/hellthread_policy.ex index 8ab1dd4e5..6736f3cb9 100644 --- a/lib/pleroma/web/activity_pub/mrf/hellthread_policy.ex +++ b/lib/pleroma/web/activity_pub/mrf/hellthread_policy.ex @@ -12,14 +12,14 @@ defmodule Pleroma.Web.ActivityPub.MRF.HellthreadPolicy do      follower_collection? = Enum.member?(message["to"] ++ message["cc"], follower_collection)      message = -      case recipients = get_recipient_count(message) do -        {:public, _} +      case get_recipient_count(message) do +        {:public, recipients}          when follower_collection? and recipients > threshold ->            message            |> Map.put("to", [follower_collection])            |> Map.put("cc", ["https://www.w3.org/ns/activitystreams#Public"]) -        {:public, _} when recipients > threshold -> +        {:public, recipients} when recipients > threshold ->            message            |> Map.put("to", [])            |> Map.put("cc", ["https://www.w3.org/ns/activitystreams#Public"]) diff --git a/lib/pleroma/web/activity_pub/transmogrifier.ex b/lib/pleroma/web/activity_pub/transmogrifier.ex index 26b2dd575..41d89a02b 100644 --- a/lib/pleroma/web/activity_pub/transmogrifier.ex +++ b/lib/pleroma/web/activity_pub/transmogrifier.ex @@ -406,7 +406,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do        if not User.locked?(followed) do          ActivityPub.accept(%{            to: [follower.ap_id], -          actor: followed.ap_id, +          actor: followed,            object: data,            local: true          }) @@ -432,7 +432,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do             ActivityPub.accept(%{               to: follow_activity.data["to"],               type: "Accept", -             actor: followed.ap_id, +             actor: followed,               object: follow_activity.data["id"],               local: false             }) do @@ -458,7 +458,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do             ActivityPub.reject(%{               to: follow_activity.data["to"],               type: "Reject", -             actor: followed.ap_id, +             actor: followed,               object: follow_activity.data["id"],               local: false             }) do diff --git a/lib/pleroma/web/activity_pub/utils.ex b/lib/pleroma/web/activity_pub/utils.ex index 964e11c9d..6a89374d0 100644 --- a/lib/pleroma/web/activity_pub/utils.ex +++ b/lib/pleroma/web/activity_pub/utils.ex @@ -164,7 +164,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do          _ -> 5        end -    Pleroma.Web.Federator.enqueue(:publish, activity, priority) +    Pleroma.Web.Federator.publish(activity, priority)      :ok    end diff --git a/lib/pleroma/web/common_api/common_api.ex b/lib/pleroma/web/common_api/common_api.ex index 86f249c54..90b208e54 100644 --- a/lib/pleroma/web/common_api/common_api.ex +++ b/lib/pleroma/web/common_api/common_api.ex @@ -95,7 +95,7 @@ defmodule Pleroma.Web.CommonAPI do      limit = Pleroma.Config.get([:instance, :limit])      with status <- String.trim(status), -         attachments <- attachments_from_ids(data["media_ids"]), +         attachments <- attachments_from_ids(data),           mentions <- Formatter.parse_mentions(status),           inReplyTo <- get_replied_to_activity(data["in_reply_to_status_id"]),           {to, cc} <- to_for_user_and_mentions(user, mentions, inReplyTo, visibility), diff --git a/lib/pleroma/web/common_api/utils.ex b/lib/pleroma/web/common_api/utils.ex index 123107b56..abdeee947 100644 --- a/lib/pleroma/web/common_api/utils.ex +++ b/lib/pleroma/web/common_api/utils.ex @@ -35,12 +35,28 @@ defmodule Pleroma.Web.CommonAPI.Utils do    def get_replied_to_activity(_), do: nil -  def attachments_from_ids(ids) do +  def attachments_from_ids(data) do +    if Map.has_key?(data, "descriptions") do +      attachments_from_ids_descs(data["media_ids"], data["descriptions"]) +    else +      attachments_from_ids_no_descs(data["media_ids"]) +    end +  end + +  def attachments_from_ids_no_descs(ids) do      Enum.map(ids || [], fn media_id ->        Repo.get(Object, media_id).data      end)    end +  def attachments_from_ids_descs(ids, descs_str) do +    {_, descs} = Jason.decode(descs_str) + +    Enum.map(ids || [], fn media_id -> +      Map.put(Repo.get(Object, media_id).data, "name", descs[media_id]) +    end) +  end +    def to_for_user_and_mentions(user, mentions, inReplyTo, "public") do      mentioned_users = Enum.map(mentions, fn {_, %{ap_id: ap_id}} -> ap_id end) diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index 468959a65..d4e2a9742 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -3,8 +3,6 @@  # SPDX-License-Identifier: AGPL-3.0-only  defmodule Pleroma.Web.Federator do -  use GenServer -    alias Pleroma.Activity    alias Pleroma.User    alias Pleroma.Web.WebFinger @@ -16,45 +14,71 @@ defmodule Pleroma.Web.Federator do    alias Pleroma.Web.ActivityPub.Utils    alias Pleroma.Web.Federator.RetryQueue    alias Pleroma.Web.OStatus +  alias Pleroma.Jobs    require Logger    @websub Application.get_env(:pleroma, :websub)    @ostatus Application.get_env(:pleroma, :ostatus) -  def init(args) do -    {:ok, args} +  def init() do +    # 1 minute +    Process.sleep(1000 * 60 * 1) +    refresh_subscriptions()    end -  def start_link do -    spawn(fn -> -      # 1 minute -      Process.sleep(1000 * 60) -      enqueue(:refresh_subscriptions, nil) -    end) +  # Client API + +  def incoming_doc(doc) do +    Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc]) +  end + +  def incoming_ap_doc(params) do +    Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params]) +  end + +  def publish(activity, priority \\ 1) do +    Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority) +  end + +  def publish_single_ap(params) do +    Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish_single_ap, params]) +  end -    GenServer.start_link( -      __MODULE__, -      %{ -        in: {:sets.new(), []}, -        out: {:sets.new(), []} -      }, -      name: __MODULE__ -    ) +  def publish_single_websub(websub) do +    Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish_single_websub, websub])    end -  def handle(:refresh_subscriptions, _) do +  def verify_websub(websub) do +    Jobs.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub]) +  end + +  def request_subscription(sub) do +    Jobs.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub]) +  end + +  def refresh_subscriptions() do +    Jobs.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions]) +  end + +  def publish_single_salmon(params) do +    Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish_single_salmon, params]) +  end + +  # Job Worker Callbacks + +  def perform(:refresh_subscriptions) do      Logger.debug("Federator running refresh subscriptions")      Websub.refresh_subscriptions()      spawn(fn ->        # 6 hours        Process.sleep(1000 * 60 * 60 * 6) -      enqueue(:refresh_subscriptions, nil) +      refresh_subscriptions()      end)    end -  def handle(:request_subscription, websub) do +  def perform(:request_subscription, websub) do      Logger.debug("Refreshing #{websub.topic}")      with {:ok, websub} <- Websub.request_subscription(websub) do @@ -64,7 +88,7 @@ defmodule Pleroma.Web.Federator do      end    end -  def handle(:publish, activity) do +  def perform(:publish, activity) do      Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)      with actor when not is_nil(actor) <- User.get_cached_by_ap_id(activity.data["actor"]) do @@ -90,7 +114,7 @@ defmodule Pleroma.Web.Federator do      end    end -  def handle(:verify_websub, websub) do +  def perform(:verify_websub, websub) do      Logger.debug(fn ->        "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"      end) @@ -98,12 +122,12 @@ defmodule Pleroma.Web.Federator do      @websub.verify(websub)    end -  def handle(:incoming_doc, doc) do +  def perform(:incoming_doc, doc) do      Logger.info("Got document, trying to parse")      @ostatus.handle_incoming(doc)    end -  def handle(:incoming_ap_doc, params) do +  def perform(:incoming_ap_doc, params) do      Logger.info("Handling incoming AP activity")      params = Utils.normalize_params(params) @@ -128,11 +152,11 @@ defmodule Pleroma.Web.Federator do      end    end -  def handle(:publish_single_salmon, params) do +  def perform(:publish_single_salmon, params) do      Salmon.send_to_user(params)    end -  def handle(:publish_single_ap, params) do +  def perform(:publish_single_ap, params) do      case ActivityPub.publish_one(params) do        {:ok, _} ->          :ok @@ -142,7 +166,7 @@ defmodule Pleroma.Web.Federator do      end    end -  def handle( +  def perform(          :publish_single_websub,          %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params        ) do @@ -155,74 +179,11 @@ defmodule Pleroma.Web.Federator do      end    end -  def handle(type, _) do +  def perform(type, _) do      Logger.debug(fn -> "Unknown task: #{type}" end)      {:error, "Don't know what to do with this"}    end -  if Mix.env() == :test do -    def enqueue(type, payload, _priority \\ 1) do -      if Pleroma.Config.get([:instance, :federating]) do -        handle(type, payload) -      end -    end -  else -    def enqueue(type, payload, priority \\ 1) do -      if Pleroma.Config.get([:instance, :federating]) do -        GenServer.cast(__MODULE__, {:enqueue, type, payload, priority}) -      end -    end -  end - -  def maybe_start_job(running_jobs, queue) do -    if :sets.size(running_jobs) < Pleroma.Config.get([__MODULE__, :max_jobs]) && queue != [] do -      {{type, payload}, queue} = queue_pop(queue) -      {:ok, pid} = Task.start(fn -> handle(type, payload) end) -      mref = Process.monitor(pid) -      {:sets.add_element(mref, running_jobs), queue} -    else -      {running_jobs, queue} -    end -  end - -  def handle_cast({:enqueue, type, payload, _priority}, state) -      when type in [:incoming_doc, :incoming_ap_doc] do -    %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state -    i_queue = enqueue_sorted(i_queue, {type, payload}, 1) -    {i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue) -    {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}} -  end - -  def handle_cast({:enqueue, type, payload, _priority}, state) do -    %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state -    o_queue = enqueue_sorted(o_queue, {type, payload}, 1) -    {o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue) -    {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}} -  end - -  def handle_cast(_, state) do -    {:noreply, state} -  end - -  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do -    %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state -    i_running_jobs = :sets.del_element(ref, i_running_jobs) -    o_running_jobs = :sets.del_element(ref, o_running_jobs) -    {i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue) -    {o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue) - -    {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}} -  end - -  def enqueue_sorted(queue, element, priority) do -    [%{item: element, priority: priority} | queue] -    |> Enum.sort_by(fn %{priority: priority} -> priority end) -  end - -  def queue_pop([%{item: element} | queue]) do -    {element, queue} -  end -    def ap_enabled_actor(id) do      user = User.get_by_ap_id(id) diff --git a/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex b/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex index 942bb4338..72a0f18b6 100644 --- a/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex +++ b/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex @@ -689,7 +689,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do           {:ok, _activity} <-             ActivityPub.accept(%{               to: [follower.ap_id], -             actor: followed.ap_id, +             actor: followed,               object: follow_activity.data["id"],               type: "Accept"             }) do @@ -711,7 +711,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do           {:ok, _activity} <-             ActivityPub.reject(%{               to: [follower.ap_id], -             actor: followed.ap_id, +             actor: followed,               object: follow_activity.data["id"],               type: "Reject"             }) do @@ -1060,6 +1060,8 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do        accounts =          Map.put(%{}, user.id, AccountView.render("account.json", %{user: user, for: user})) +      flavour = get_user_flavour(user) +        initial_state =          %{            meta: %{ @@ -1144,7 +1146,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do        conn        |> put_layout(false)        |> put_view(MastodonView) -      |> render("index.html", %{initial_state: initial_state}) +      |> render("index.html", %{initial_state: initial_state, flavour: flavour})      else        conn        |> redirect(to: "/web/login") @@ -1166,6 +1168,43 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do      end    end +  @supported_flavours ["glitch", "vanilla"] + +  def set_flavour(%{assigns: %{user: user}} = conn, %{"flavour" => flavour} = _params) +      when flavour in @supported_flavours do +    flavour_cng = User.Info.mastodon_flavour_update(user.info, flavour) + +    with changeset <- Ecto.Changeset.change(user), +         changeset <- Ecto.Changeset.put_embed(changeset, :info, flavour_cng), +         {:ok, user} <- User.update_and_set_cache(changeset), +         flavour <- user.info.flavour do +      json(conn, flavour) +    else +      e -> +        conn +        |> put_resp_content_type("application/json") +        |> send_resp(500, Jason.encode!(%{"error" => inspect(e)})) +    end +  end + +  def set_flavour(conn, _params) do +    conn +    |> put_status(400) +    |> json(%{error: "Unsupported flavour"}) +  end + +  def get_flavour(%{assigns: %{user: user}} = conn, _params) do +    json(conn, get_user_flavour(user)) +  end + +  defp get_user_flavour(%User{info: %{flavour: flavour}}) when flavour in @supported_flavours do +    flavour +  end + +  defp get_user_flavour(_) do +    "glitch" +  end +    def login(conn, %{"code" => code}) do      with {:ok, app} <- get_or_make_app(),           %Authorization{} = auth <- Repo.get_by(Authorization, token: code, app_id: app.id), diff --git a/lib/pleroma/web/mastodon_api/views/status_view.ex b/lib/pleroma/web/mastodon_api/views/status_view.ex index 69f5f992c..a49b381c9 100644 --- a/lib/pleroma/web/mastodon_api/views/status_view.ex +++ b/lib/pleroma/web/mastodon_api/views/status_view.ex @@ -166,7 +166,7 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do        sensitive: sensitive,        spoiler_text: object["summary"] || "",        visibility: get_visibility(object), -      media_attachments: attachments |> Enum.take(4), +      media_attachments: attachments,        mentions: mentions,        tags: build_tags(tags),        application: %{ diff --git a/lib/pleroma/web/oauth/token.ex b/lib/pleroma/web/oauth/token.ex index ea4d56a29..ca67632ba 100644 --- a/lib/pleroma/web/oauth/token.ex +++ b/lib/pleroma/web/oauth/token.ex @@ -50,9 +50,27 @@ defmodule Pleroma.Web.OAuth.Token do    def delete_user_tokens(%User{id: user_id}) do      from( -      t in Pleroma.Web.OAuth.Token, +      t in Token,        where: t.user_id == ^user_id      )      |> Repo.delete_all()    end + +  def delete_user_token(%User{id: user_id}, token_id) do +    from( +      t in Token, +      where: t.user_id == ^user_id, +      where: t.id == ^token_id +    ) +    |> Repo.delete_all() +  end + +  def get_user_tokens(%User{id: user_id}) do +    from( +      t in Token, +      where: t.user_id == ^user_id +    ) +    |> Repo.all() +    |> Repo.preload(:app) +  end  end diff --git a/lib/pleroma/web/ostatus/ostatus_controller.ex b/lib/pleroma/web/ostatus/ostatus_controller.ex index db4c8f4da..bab3da2b0 100644 --- a/lib/pleroma/web/ostatus/ostatus_controller.ex +++ b/lib/pleroma/web/ostatus/ostatus_controller.ex @@ -87,7 +87,7 @@ defmodule Pleroma.Web.OStatus.OStatusController do      {:ok, body, _conn} = read_body(conn)      {:ok, doc} = decode_or_retry(body) -    Federator.enqueue(:incoming_doc, doc) +    Federator.incoming_doc(doc)      conn      |> send_resp(200, "") diff --git a/lib/pleroma/web/router.ex b/lib/pleroma/web/router.ex index 81e83579e..3692e13e3 100644 --- a/lib/pleroma/web/router.ex +++ b/lib/pleroma/web/router.ex @@ -230,6 +230,8 @@ defmodule Pleroma.Web.Router do        get("/suggestions", MastodonAPIController, :suggestions)        get("/endorsements", MastodonAPIController, :empty_array) + +      get("/pleroma/flavour", MastodonAPIController, :get_flavour)      end      scope [] do @@ -265,6 +267,8 @@ defmodule Pleroma.Web.Router do        get("/filters/:id", MastodonAPIController, :get_filter)        put("/filters/:id", MastodonAPIController, :update_filter)        delete("/filters/:id", MastodonAPIController, :delete_filter) + +      post("/pleroma/flavour/:flavour", MastodonAPIController, :set_flavour)      end      scope [] do @@ -402,6 +406,9 @@ defmodule Pleroma.Web.Router do    scope "/api", Pleroma.Web, as: :authenticated_twitter_api do      pipe_through(:authenticated_api) +    get("/oauth_tokens", TwitterAPI.Controller, :oauth_tokens) +    delete("/oauth_tokens/:id", TwitterAPI.Controller, :revoke_token) +      scope [] do        pipe_through(:oauth_read) diff --git a/lib/pleroma/web/salmon/salmon.ex b/lib/pleroma/web/salmon/salmon.ex index a5a9e16c6..0a69aa1ec 100644 --- a/lib/pleroma/web/salmon/salmon.ex +++ b/lib/pleroma/web/salmon/salmon.ex @@ -229,7 +229,7 @@ defmodule Pleroma.Web.Salmon do        |> Enum.each(fn remote_user ->          Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end) -        Pleroma.Web.Federator.enqueue(:publish_single_salmon, %{ +        Pleroma.Web.Federator.publish_single_salmon(%{            recipient: remote_user,            feed: feed,            poster: poster, diff --git a/lib/pleroma/web/templates/mastodon_api/mastodon/index.html.eex b/lib/pleroma/web/templates/mastodon_api/mastodon/index.html.eex index 9a725e420..5659c7828 100644 --- a/lib/pleroma/web/templates/mastodon_api/mastodon/index.html.eex +++ b/lib/pleroma/web/templates/mastodon_api/mastodon/index.html.eex @@ -8,7 +8,7 @@  </title>  <link rel="icon" type="image/png" href="/favicon.png"/>  <script crossorigin='anonymous' src="/packs/locales.js"></script> -<script crossorigin='anonymous' src="/packs/locales/glitch/en.js"></script> +<script crossorigin='anonymous' src="/packs/locales/<%= @flavour %>/en.js"></script>  <link rel='preload' as='script' crossorigin='anonymous' href='/packs/features/getting_started.js'>  <link rel='preload' as='script' crossorigin='anonymous' href='/packs/features/compose.js'> @@ -19,10 +19,10 @@  <script src="/packs/core/common.js"></script>  <link rel="stylesheet" media="all" href="/packs/core/common.css" /> -<script src="/packs/flavours/glitch/common.js"></script> -<link rel="stylesheet" media="all" href="/packs/flavours/glitch/common.css" /> +<script src="/packs/flavours/<%= @flavour %>/common.js"></script> +<link rel="stylesheet" media="all" href="/packs/flavours/<%= @flavour %>/common.css" /> -<script src="/packs/flavours/glitch/home.js"></script> +<script src="/packs/flavours/<%= @flavour %>/home.js"></script>  </head>  <body class='app-body no-reduce-motion system-font'>    <div class='app-holder' data-props='{"locale":"en"}' id='mastodon'> diff --git a/lib/pleroma/web/twitter_api/twitter_api_controller.ex b/lib/pleroma/web/twitter_api/twitter_api_controller.ex index c2f0dc2a9..b815379fd 100644 --- a/lib/pleroma/web/twitter_api/twitter_api_controller.ex +++ b/lib/pleroma/web/twitter_api/twitter_api_controller.ex @@ -8,6 +8,10 @@ defmodule Pleroma.Web.TwitterAPI.Controller do    import Pleroma.Web.ControllerHelper, only: [json_response: 3]    alias Ecto.Changeset +  alias Pleroma.Web.TwitterAPI.{TwitterAPI, UserView, ActivityView, NotificationView, TokenView} +  alias Pleroma.Web.CommonAPI +  alias Pleroma.{Repo, Activity, Object, User, Notification} +  alias Pleroma.Web.OAuth.Token    alias Pleroma.Web.ActivityPub.ActivityPub    alias Pleroma.Web.ActivityPub.Utils    alias Pleroma.Web.CommonAPI @@ -524,6 +528,9 @@ defmodule Pleroma.Web.TwitterAPI.Controller do    def friends(%{assigns: %{user: for_user}} = conn, params) do      {:ok, page} = Ecto.Type.cast(:integer, params["page"] || 1) +    {:ok, export} = Ecto.Type.cast(:boolean, params["all"] || false) + +    page = if export, do: nil, else: page      with {:ok, user} <- TwitterAPI.get_user(conn.assigns[:user], params),           {:ok, friends} <- User.get_friends(user, page) do @@ -542,6 +549,20 @@ defmodule Pleroma.Web.TwitterAPI.Controller do      end    end +  def oauth_tokens(%{assigns: %{user: user}} = conn, _params) do +    with oauth_tokens <- Token.get_user_tokens(user) do +      conn +      |> put_view(TokenView) +      |> render("index.json", %{tokens: oauth_tokens}) +    end +  end + +  def revoke_token(%{assigns: %{user: user}} = conn, %{"id" => id} = _params) do +    Token.delete_user_token(user, id) + +    json_reply(conn, 201, "") +  end +    def blocks(%{assigns: %{user: user}} = conn, _params) do      with blocked_users <- User.blocked_users(user) do        conn @@ -570,7 +591,7 @@ defmodule Pleroma.Web.TwitterAPI.Controller do           {:ok, _activity} <-             ActivityPub.accept(%{               to: [follower.ap_id], -             actor: followed.ap_id, +             actor: followed,               object: follow_activity.data["id"],               type: "Accept"             }) do @@ -590,7 +611,7 @@ defmodule Pleroma.Web.TwitterAPI.Controller do           {:ok, _activity} <-             ActivityPub.reject(%{               to: [follower.ap_id], -             actor: followed.ap_id, +             actor: followed,               object: follow_activity.data["id"],               type: "Reject"             }) do diff --git a/lib/pleroma/web/twitter_api/views/token_view.ex b/lib/pleroma/web/twitter_api/views/token_view.ex new file mode 100644 index 000000000..3ff314913 --- /dev/null +++ b/lib/pleroma/web/twitter_api/views/token_view.ex @@ -0,0 +1,21 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Web.TwitterAPI.TokenView do +  use Pleroma.Web, :view + +  def render("index.json", %{tokens: tokens}) do +    tokens +    |> render_many(Pleroma.Web.TwitterAPI.TokenView, "show.json") +    |> Enum.filter(&Enum.any?/1) +  end + +  def render("show.json", %{token: token_entry}) do +    %{ +      id: token_entry.id, +      valid_until: token_entry.valid_until, +      app_name: token_entry.app.client_name +    } +  end +end diff --git a/lib/pleroma/web/twitter_api/views/user_view.ex b/lib/pleroma/web/twitter_api/views/user_view.ex index a09450df7..df7384476 100644 --- a/lib/pleroma/web/twitter_api/views/user_view.ex +++ b/lib/pleroma/web/twitter_api/views/user_view.ex @@ -113,10 +113,12 @@ defmodule Pleroma.Web.TwitterAPI.UserView do        "fields" => fields,        # Pleroma extension -      "pleroma" => %{ -        "confirmation_pending" => user_info.confirmation_pending, -        "tags" => user.tags -      } +      "pleroma" => +        %{ +          "confirmation_pending" => user_info.confirmation_pending, +          "tags" => user.tags +        } +        |> maybe_with_follow_request_count(user, for_user)      }      data = @@ -132,6 +134,14 @@ defmodule Pleroma.Web.TwitterAPI.UserView do      end    end +  defp maybe_with_follow_request_count(data, %User{id: id, info: %{locked: true}} = user, %User{ +         id: id +       }) do +    Map.put(data, "follow_request_count", user.info.follow_request_count) +  end + +  defp maybe_with_follow_request_count(data, _, _), do: data +    defp maybe_with_role(data, %User{id: id} = user, %User{id: id}) do      Map.merge(data, %{"role" => role(user), "show_role" => user.info.show_role})    end diff --git a/lib/pleroma/web/websub/websub.ex b/lib/pleroma/web/websub/websub.ex index a08d7993d..c00ec0858 100644 --- a/lib/pleroma/web/websub/websub.ex +++ b/lib/pleroma/web/websub/websub.ex @@ -13,6 +13,7 @@ defmodule Pleroma.Web.Websub do    alias Pleroma.Web.Endpoint    alias Pleroma.Web.OStatus    alias Pleroma.Web.Router.Helpers +  alias Pleroma.Web.Federator    require Logger    import Ecto.Query @@ -87,7 +88,7 @@ defmodule Pleroma.Web.Websub do          unreachable_since: reachable_callbacks_metadata[sub.callback]        } -      Pleroma.Web.Federator.enqueue(:publish_single_websub, data) +      Federator.publish_single_websub(data)      end)    end @@ -119,7 +120,7 @@ defmodule Pleroma.Web.Websub do        websub = Repo.update!(change) -      Pleroma.Web.Federator.enqueue(:verify_websub, websub) +      Federator.verify_websub(websub)        {:ok, websub}      else @@ -269,7 +270,7 @@ defmodule Pleroma.Web.Websub do      subs = Repo.all(query)      Enum.each(subs, fn sub -> -      Pleroma.Web.Federator.enqueue(:request_subscription, sub) +      Federator.request_subscription(sub)      end)    end diff --git a/lib/pleroma/web/websub/websub_controller.ex b/lib/pleroma/web/websub/websub_controller.ex index 1ad18a8a4..ad40f1b94 100644 --- a/lib/pleroma/web/websub/websub_controller.ex +++ b/lib/pleroma/web/websub/websub_controller.ex @@ -84,7 +84,7 @@ defmodule Pleroma.Web.Websub.WebsubController do           %WebsubClientSubscription{} = websub <- Repo.get(WebsubClientSubscription, id),           {:ok, body, _conn} = read_body(conn),           ^signature <- Websub.sign(websub.secret, body) do -      Federator.enqueue(:incoming_doc, body) +      Federator.incoming_doc(body)        conn        |> send_resp(200, "OK") | 
