diff options
| author | Roger Braun <roger@rogerbraun.net> | 2017-11-11 14:59:25 +0100 | 
|---|---|---|
| committer | Roger Braun <roger@rogerbraun.net> | 2017-11-11 14:59:25 +0100 | 
| commit | bd5bdc4c247e2ebb239215540a51b69c356da65c (patch) | |
| tree | cf9376c31e2a26286dc38da78fab4d9f82089313 | |
| parent | a1923d20e850c6b4f187928dd739314df84047b6 (diff) | |
| download | pleroma-bd5bdc4c247e2ebb239215540a51b69c356da65c.tar.gz pleroma-bd5bdc4c247e2ebb239215540a51b69c356da65c.zip | |
MastoAPI: Basic streaming.
| -rw-r--r-- | lib/pleroma/application.ex | 3 | ||||
| -rw-r--r-- | lib/pleroma/web/activity_pub/activity_pub.ex | 3 | ||||
| -rw-r--r-- | lib/pleroma/web/endpoint.ex | 1 | ||||
| -rw-r--r-- | lib/pleroma/web/mastodon_api/mastodon_socket.ex | 27 | ||||
| -rw-r--r-- | lib/pleroma/web/streamer.ex | 45 | ||||
| -rw-r--r-- | lib/transports.ex | 77 | 
6 files changed, 155 insertions, 1 deletions
| diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 1f0a05568..5422cbc28 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -19,7 +19,8 @@ defmodule Pleroma.Application do                           ttl_interval: 1000,                           limit: 2500                         ]]), -      worker(Pleroma.Web.Federator, []) +      worker(Pleroma.Web.Federator, []), +      worker(Pleroma.Web.Streamer, [])      ]      # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 1624c6545..35536a1e4 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -22,6 +22,9 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do      with create_data <- make_create_data(%{to: to, actor: actor, published: published, context: context, object: object}, additional),           {:ok, activity} <- insert(create_data, local),           :ok <- maybe_federate(activity) do +      if activity.data["type"] == "Create" and Enum.member?(activity.data["to"], "https://www.w3.org/ns/activitystreams#Public") do +        Pleroma.Web.Streamer.stream("public", activity) +      end        {:ok, activity}      end    end diff --git a/lib/pleroma/web/endpoint.ex b/lib/pleroma/web/endpoint.ex index a1b4108cd..dc1ba2a05 100644 --- a/lib/pleroma/web/endpoint.ex +++ b/lib/pleroma/web/endpoint.ex @@ -2,6 +2,7 @@ defmodule Pleroma.Web.Endpoint do    use Phoenix.Endpoint, otp_app: :pleroma    socket "/socket", Pleroma.Web.UserSocket +  socket "/api/v1", Pleroma.Web.MastodonAPI.MastodonSocket    # Serve at "/" the static files from "priv/static" directory.    # diff --git a/lib/pleroma/web/mastodon_api/mastodon_socket.ex b/lib/pleroma/web/mastodon_api/mastodon_socket.ex new file mode 100644 index 000000000..c27d025c4 --- /dev/null +++ b/lib/pleroma/web/mastodon_api/mastodon_socket.ex @@ -0,0 +1,27 @@ +defmodule Pleroma.Web.MastodonAPI.MastodonSocket do +  use Phoenix.Socket + +  transport :streaming, Phoenix.Transports.WebSocket.Raw + +  def connect(params, socket) do +    IO.inspect(params) +    Pleroma.Web.Streamer.add_socket(params["stream"], socket) +    {:ok, socket} +  end + +  def id(socket), do: nil + +  def handle(:text, message, state) do +    IO.inspect message +    #| :ok +    #| state +    #| {:text, message} +    #| {:text, message, state} +    #| {:close, "Goodbye!"} +    {:text, message} +  end + +  def handle(:closed, reason, _state) do +    IO.inspect reason +  end +end diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex new file mode 100644 index 000000000..cc3805894 --- /dev/null +++ b/lib/pleroma/web/streamer.ex @@ -0,0 +1,45 @@ +defmodule Pleroma.Web.Streamer do +  use GenServer +  require Logger +  import Plug.Conn + +  def start_link do +    GenServer.start_link(__MODULE__, %{}, name: __MODULE__) +  end + +  def add_socket(topic, socket) do +    GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic}) +  end + +  def stream(topic, item) do +    GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item}) +  end + +  def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do +    Logger.debug("Trying to push to #{topic}") +    Logger.debug("Pushing item to #{topic}") +    Enum.each(topics[topic] || [], fn (socket) -> +      json = %{ +        event: "update", +        payload: Pleroma.Web.MastodonAPI.StatusView.render("status.json", activity: item) |> Poison.encode! +      } |> Poison.encode! + +      send socket.transport_pid, {:text, json} +    end) +    {:noreply, topics} +  end + +  def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do +    sockets_for_topic = sockets[topic] || [] +    sockets_for_topic = Enum.uniq([socket | sockets_for_topic]) +    sockets = Map.put(sockets, topic, sockets_for_topic) +    Logger.debug("Got new conn for #{topic}") +    IO.inspect(sockets) +    {:noreply, sockets} +  end + +  def handle_cast(m, state) do +    IO.inspect("Unknown: #{inspect(m)}, #{inspect(state)}") +    {:noreply, state} +  end +end diff --git a/lib/transports.ex b/lib/transports.ex new file mode 100644 index 000000000..5600a4fdd --- /dev/null +++ b/lib/transports.ex @@ -0,0 +1,77 @@ +defmodule Phoenix.Transports.WebSocket.Raw do +  import Plug.Conn, only: [ +    fetch_query_params: 1, +    send_resp: 3 +  ] +  alias Phoenix.Socket.Transport + +  def default_config do +    [ +      timeout: 60_000, +      transport_log: false, +      cowboy: Phoenix.Endpoint.CowboyWebSocket +    ] +  end + +  def init(%Plug.Conn{method: "GET"} = conn, {endpoint, handler, transport}) do +    {_, opts} = handler.__transport__(transport) + +    conn = conn +    |> fetch_query_params +    |> Transport.transport_log(opts[:transport_log]) +    |> Transport.force_ssl(handler, endpoint, opts) +    |> Transport.check_origin(handler, endpoint, opts) + +    case conn do +      %{halted: false} = conn -> +        case Transport.connect(endpoint, handler, transport, __MODULE__, nil, conn.params) do +          {:ok, socket} -> +            {:ok, conn, {__MODULE__, {socket, opts}}} +          :error -> +            send_resp(conn, :forbidden, "") +            {:error, conn} +        end +      _ -> +        {:error, conn} +    end +  end + +  def init(conn, _) do +    send_resp(conn, :bad_request, "") +    {:error, conn} +  end + +  def ws_init({socket, config}) do +    Process.flag(:trap_exit, true) +    {:ok, %{socket: socket}, config[:timeout]} +  end + +  def ws_handle(op, data, state) do +    state.socket.handler +    |> apply(:handle, [op, data, state]) +    |> case do +      {op, data} -> +        {:reply, {op, data}, state} +      {op, data, state} -> +        {:reply, {op, data}, state} +      %{} = state -> +        {:ok, state} +      _ -> +        {:ok, state} +    end +  end + +  def ws_info({op, data} = tuple, state) do +    {:reply, tuple, state} +  end + +  def ws_info(_tuple, state), do: {:ok, state} + +  def ws_close(state) do +    ws_handle(:closed, :normal, state) +  end + +  def ws_terminate(reason, state) do +    ws_handle(:closed, reason, state) +  end +end | 
