Skip to Content Skip to Search

Action Cable Server Base

A singleton ActionCable::Server instance is available via ActionCable.server. It’s used by the Rack process that starts the Action Cable server, but is also used by the user to reach the RemoteConnections object, which is used for finding and disconnecting connections across all servers.

Also, this is the server instance used for broadcasting. See Broadcasting for more information.

Methods
A
C
D
E
L
N
P
R
W
Included Modules

Attributes

[R] config
[R] mutex

Class Public methods

logger()

# File actioncable/lib/action_cable/server/base.rb, line 49
def self.logger; config.logger; end

new(config: self.class.config)

# File actioncable/lib/action_cable/server/base.rb, line 54
def initialize(config: self.class.config)
  @config = config
  @mutex = Monitor.new
  @remote_connections = @event_loop = @worker_pool = @executor = @pubsub = nil
end

Instance Public methods

allow_request_origin?(env)

Check if the request origin is allowed to connect to the Action Cable server.

# File actioncable/lib/action_cable/server/base.rb, line 146
def allow_request_origin?(env)
  return true if config.disable_request_forgery_protection

  request = ActionDispatch::Request.new(env)
  proto = request.ssl? ? "https" : "http"
  if config.allow_same_origin_as_host && env["HTTP_ORIGIN"] == "#{proto}://#{request.host_with_port}"
    true
  elsif Array(config.allowed_request_origins).any? { |allowed_origin|  allowed_origin === env["HTTP_ORIGIN"] }
    true
  else
    logger.error("Request origin not allowed: #{env['HTTP_ORIGIN']}")
    false
  end
end

call(env)

Called by Rack to set up the server.

# File actioncable/lib/action_cable/server/base.rb, line 61
def call(env)
  return config.health_check_application.call(env) if env["PATH_INFO"] == config.health_check_path
  setup_heartbeat_timer
  Socket.new(self, env).process
end

connection_identifiers()

All of the identifiers applied to the connection class associated with this server.

# File actioncable/lib/action_cable/server/base.rb, line 134
def connection_identifiers
  config.connection_class.call.identifiers
end

disconnect(identifiers)

Disconnect all the connections identified by identifiers on this server or any others via RemoteConnections.

# File actioncable/lib/action_cable/server/base.rb, line 69
def disconnect(identifiers)
  remote_connections.where(identifiers).disconnect
end

event_loop()

# File actioncable/lib/action_cable/server/base.rb, line 98
def event_loop
  @event_loop || @mutex.synchronize { @event_loop ||= StreamEventLoop.new }
end

executor()

Executor is used by various actions within Action Cable (e.g., pub/sub operations) to run code asynchronously.

# File actioncable/lib/action_cable/server/base.rb, line 123
def executor
  @executor || @mutex.synchronize { @executor ||= ThreadedExecutor.new(max_size: config.executor_pool_size, name: "streamer") }
end

new_tagged_logger(request = nil, &block)

Tags are declared in the server but computed in the connection. This allows us per-connection tailored tags. You can pass request object either directly or via block to lazily evaluate it.

# File actioncable/lib/action_cable/server/base.rb, line 140
def new_tagged_logger(request = nil, &block)
  TaggedLoggerProxy.new logger,
    tags: config.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request ||= block.call) : tag.to_s.camelize }
end

pubsub()

Adapter used for all streams/broadcasting.

# File actioncable/lib/action_cable/server/base.rb, line 128
def pubsub
  @pubsub || (executor && @mutex.synchronize { @pubsub ||= config.pubsub_adapter.new(self) })
end

remote_connections()

Gateway to RemoteConnections. See that class for details.

# File actioncable/lib/action_cable/server/base.rb, line 94
def remote_connections
  @remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) }
end

restart()

# File actioncable/lib/action_cable/server/base.rb, line 73
def restart
  each_connection do |connection|
    connection.close(reason: ActionCable::INTERNAL[:disconnect_reasons][:server_restart])
  end

  @mutex.synchronize do
    # Shutdown the worker pool
    @worker_pool.halt if @worker_pool
    @worker_pool = nil

    # Shutdown the executor
    @executor.shutdown if @executor
    @executor = nil

    # Shutdown the pub/sub adapter
    @pubsub.shutdown if @pubsub
    @pubsub = nil
  end
end

worker_pool()

The worker pool is where we run connection callbacks and channel actions. We do as little as possible on the server’s main thread. The worker pool is an executor service that’s backed by a pool of threads working from a task queue. The thread pool size maxes out at 4 worker threads by default. Tune the size yourself with config.action_cable.worker_pool_size.

Using Active Record, Redis, etc within your channel actions means you’ll get a separate connection from each thread in the worker pool. Plan your deployment accordingly: 5 servers each running 5 Puma workers each running an 8-thread worker pool means at least 200 database connections.

Also, ensure that your database connection pool size is as least as large as your worker pool size. Otherwise, workers may oversubscribe the database connection pool and block while they wait for other workers to release their connections. Use a smaller worker pool or a larger database connection pool instead.

# File actioncable/lib/action_cable/server/base.rb, line 118
def worker_pool
  @worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) }
end