diff options
| -rw-r--r-- | lib/pleroma/web/mastodon_api/mastodon_socket.ex | 19 | ||||
| -rw-r--r-- | lib/pleroma/web/streamer.ex | 31 | 
2 files changed, 44 insertions, 6 deletions
| diff --git a/lib/pleroma/web/mastodon_api/mastodon_socket.ex b/lib/pleroma/web/mastodon_api/mastodon_socket.ex index c27d025c4..f9c8cec32 100644 --- a/lib/pleroma/web/mastodon_api/mastodon_socket.ex +++ b/lib/pleroma/web/mastodon_api/mastodon_socket.ex @@ -1,12 +1,18 @@  defmodule Pleroma.Web.MastodonAPI.MastodonSocket do    use Phoenix.Socket -  transport :streaming, Phoenix.Transports.WebSocket.Raw +  transport :streaming, Phoenix.Transports.WebSocket.Raw, +    timeout: :infinity # We never receive data.    def connect(params, socket) do -    IO.inspect(params) -    Pleroma.Web.Streamer.add_socket(params["stream"], socket) -    {:ok, socket} +    if params["stream"] == "public" do +      socket = socket +      |> assign(:topic, params["stream"]) +      Pleroma.Web.Streamer.add_socket(params["stream"], socket) +      {:ok, socket} +    else +      :error +    end    end    def id(socket), do: nil @@ -21,7 +27,8 @@ defmodule Pleroma.Web.MastodonAPI.MastodonSocket do      {:text, message}    end -  def handle(:closed, reason, _state) do -    IO.inspect reason +  def handle(:closed, reason, %{socket: socket}) do +    topic = socket.assigns[:topic] +    Pleroma.Web.Streamer.remove_socket(topic, socket)    end  end diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex index cc3805894..3a7b91743 100644 --- a/lib/pleroma/web/streamer.ex +++ b/lib/pleroma/web/streamer.ex @@ -4,6 +4,10 @@ defmodule Pleroma.Web.Streamer do    import Plug.Conn    def start_link do +    spawn(fn -> +      Process.sleep(1000 * 30) # 30 seconds +      GenServer.cast(__MODULE__, %{action: :ping}) +    end)      GenServer.start_link(__MODULE__, %{}, name: __MODULE__)    end @@ -11,10 +15,28 @@ defmodule Pleroma.Web.Streamer do      GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})    end +  def remove_socket(topic, socket) do +    GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic}) +  end +    def stream(topic, item) do      GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})    end +  def handle_cast(%{action: :ping}, topics) do +    Map.values(topics) +    |> List.flatten +    |> Enum.each(fn (socket) -> +      Logger.debug("Sending keepalive ping") +      send socket.transport_pid, {:text, ""} +    end) +    spawn(fn -> +      Process.sleep(1000 * 30) # 30 seconds +      GenServer.cast(__MODULE__, %{action: :ping}) +    end) +    {:noreply, topics} +  end +    def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do      Logger.debug("Trying to push to #{topic}")      Logger.debug("Pushing item to #{topic}") @@ -38,6 +60,15 @@ defmodule Pleroma.Web.Streamer do      {:noreply, sockets}    end +  def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do +    sockets_for_topic = sockets[topic] || [] +    sockets_for_topic = List.delete(sockets_for_topic, socket) +    sockets = Map.put(sockets, topic, sockets_for_topic) +    Logger.debug("Removed conn for #{topic}") +    IO.inspect(sockets) +    {:noreply, sockets} +  end +    def handle_cast(m, state) do      IO.inspect("Unknown: #{inspect(m)}, #{inspect(state)}")      {:noreply, state} | 
