dingus

/ˈdɪŋəs/ Something whose name is unknown or forgotten

Server-sent events with Rails and Rack hijack

February 2022

Server-sent events1 (SSE) are a simple way to push data to clients over plain-old HTTP and rails has also provided a tidy DLS for SSE (via [ActionController::Live]) since Rails 4.

Unfortunately long-running HTTP connections in Rails controllers tie-up server threads, causing incoming requests to queue. Borrowing from Action Cable it’s possible to move these long running connections to their own threads and put those server threads back to work serving incoming requests.

The secret sauce is Rack “hijack”2 which let’s us take control of the actual [TCPSocket] backing the incoming request. When combined with the myriad concurrency primitives in modern Rails apps (via concurrent-ruby) it’s possible to handle as many open connections as system RAM and ulimit will allow.

class ApplicationController < ActionController::API
  def stream
    # Get the `[TCPSocket]` instance backing the request
    io = request.env["rack.hijack"].call

    # Send HTTP response line and relevant headers
    io.write(
      "HTTP/1.1 200\r\n" \
      "Content-Type: text/event-stream\r\n" \
      "Cache-Control: no-cache\r\n" \
      "\r\n"
    )

    # Periodically spawn a thread to send a keepalive
    keepalive = Concurrent::TimerTask.execute(execution_interval: 5) do
      io.write(":keepalive\n\n")
    end

    # Watch for and handle failed keepalives
    keepalive.add_observer do |_time, _result, ex|
      break unless ex.present?

      if ex.is_a?(Errno::EPIPE)
        # We expect "broken pipe" errors if we've written to a closed socket
        logger.debug("Client disconnected")
      end

      # Stop the timer task spawning new threads
      keepalive.shutdown

      # Close the socket
      io.close

      # Dereference everything so it can be garbage collected
      io = keepalive = nil
    end
  end
end

Testing our new action with curl we see the following:

$> curl -v --no-buffer http://localhost:3000/
*   Trying ::1:3000...
* Connected to localhost (::1) port 3000 (#0)
> GET / HTTP/1.1
> Host: localhost:3000
> User-Agent: curl/7.77.0
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200
< Content-Type: text/event-stream
< Cache-Control: no-cache
* no chunk, no close, no size. Assume close to signal end
<
:keepalive

:keepalive

By “hijacking” the socket and passing it to a separate thread of sending data it’s possible to hold open as many connections as ulimit or system memory will allow, event on a single threaded server, while also still serving regular requests.

Reusing the configured Action Cable pub/sub adapter, available through the global ActionCable.server.pubsub, it’s possible to subscribe to and deliver events to clients in near realtime.

class ApplicationController < ActionController::API
  def stream
    # Get the `[TCPSocket]` instance backing the request
    io = request.env["rack.hijack"].call

    # Handler for new broadcasts
    on_message = ->(data) { io.write("data: #{data}\n\n") }

    # Send HTTP response line and relevant headers
    io.write(
      "HTTP/1.1 200\r\n" \
      "Content-Type: text/event-stream\r\n" \
      "Cache-Control: no-cache\r\n" \
      "\r\n"
    )

    # Subscribe to the "/sse/test" channel
    ActionCable.server.pubsub.subscribe("/sse/test", on_message)

    # Periodically spawn a thread to send a keepalive
    keepalive = Concurrent::TimerTask.execute(execution_interval: 5) do
      io.write(":keepalive\n\n")
    end

    # Watch for and handle failed keepalives
    keepalive.add_observer do |_time, _result, ex|
      break unless ex.present?

      if ex.is_a?(Errno::EPIPE)
        # We expect "broken pipe" errors if we've written to a closed socket
        logger.debug("Client disconnected")
      end

      # Unsubscribe from the "/sse/test" channel
      ActionCable.server.pubsub.unsubscribe("/sse/test", on_message)

      # Stop the timer task spawning new threads
      keepalive.shutdown

      # Close the socket
      io.close

      # Dereference everything so it can be garbage collected
      io = keepalive = on_message = nil
    end
  end
end

Broadcasting from the Rails console:

$> bin/rails c
Loading development environment (Rails 7.0.2)
irb(main):001:0> ActionCable.server.pubsub.broadcast("/sse/test", {"foo" => "bar"}.to_json)
=> 1

In curl we see the following:

$> curl -v --no-buffer http://localhost:3000/
*   Trying ::1:3000...
* Connected to localhost (::1) port 3000 (#0)
> GET / HTTP/1.1
> Host: localhost:3000
> User-Agent: curl/7.77.0
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200
< Content-Type: text/event-stream
< Cache-Control: no-cache
* no chunk, no close, no size. Assume close to signal end
<
:keepalive

:keepalive

data: {"foo":"bar"}

:keepalive

📬