edge badge
Methods
A
E
I
L
N
R
S
W
Class Public methods
new(adapter, event_loop)
# File actioncable/lib/action_cable/subscription_adapter/redis.rb, line 61
# Builds a listener that has not yet connected to anything.
#
# adapter    - object asked for `redis_connection_for_subscriptions` when the
#              listener thread is started.
# event_loop - object whose `post` is used to run callbacks.
def initialize(adapter, event_loop)
  super()

  # Collaborators handed in by the caller.
  @adapter, @event_loop = adapter, event_loop

  # Connection state: no listener thread and no raw client until the first
  # subscription is requested and confirmed.
  @thread = nil
  @raw_client = nil

  # Guards the subscription bookkeeping below.
  @subscription_lock = Mutex.new

  # channel => list of callbacks still waiting for a subscribe confirmation;
  # each channel lazily gets its own fresh list on first access.
  @subscribe_callbacks = Hash.new { |pending, channel| pending[channel] = [] }

  # Blocks deferred until a raw client is available.
  @when_connected = []
end
Instance Public methods
add_channel(channel, on_success)
# File actioncable/lib/action_cable/subscription_adapter/redis.rb, line 128
# Requests a subscription to +channel+.
#
# While holding the subscription lock this starts the listener thread if
# needed, queues +on_success+ for the channel, and sends "subscribe" either
# right away or once a connection is available (see +when_connected+).
def add_channel(channel, on_success)
  @subscription_lock.synchronize do
    ensure_listener_running

    # Register the callback before the command can go out.
    pending_callbacks = @subscribe_callbacks[channel]
    pending_callbacks.push(on_success)

    when_connected do
      send_command("subscribe", channel)
    end
  end
end
ensure_listener_running()
# File actioncable/lib/action_cable/subscription_adapter/redis.rb, line 147
# Starts the listener thread on first use and returns it; later calls return
# the already-created thread without starting another one.
def ensure_listener_running
  return @thread if @thread

  @thread = Thread.new do
    # An exception raised in this thread must not go unnoticed.
    Thread.current.abort_on_exception = true

    listen(@adapter.redis_connection_for_subscriptions)
  end
end
invoke_callback(*)
# File actioncable/lib/action_cable/subscription_adapter/redis.rb, line 142
# Defers the inherited +invoke_callback+ by posting it to the event loop.
# The bare +super+ inside the block forwards whatever arguments this method
# received, so the parent implementation runs whenever the event loop
# executes the posted block rather than inline in the caller.
def invoke_callback(*)
  @event_loop.post { super }
end
listen(conn)
# File actioncable/lib/action_cable/subscription_adapter/redis.rb, line 77
# Runs the pub/sub loop on +conn+ and keeps the listener's connection state
# in sync with the subscribe/unsubscribe confirmations it receives.
#
# conn - connection object; must respond to +without_reconnect+, +subscribe+
#        and either +_client+ or +client+.
#
# NOTE(review): presumably `conn.subscribe` blocks until every channel has
# been unsubscribed, which is what keeps the listener thread alive — confirm
# against the Redis client in use.
def listen(conn)
  conn.without_reconnect do
    # Grab the low-level client up front; it is published as @raw_client once
    # the first subscription is confirmed so commands can be written to it.
    original_client = conn.respond_to?(:_client) ? conn._client : conn.client

    # Subscribing to an internal channel first is what produces the initial
    # confirmation (count == 1) handled below.
    conn.subscribe("_action_cable_internal") do |on|
      on.subscribe do |chan, count|
        @subscription_lock.synchronize do
          if count == 1
            # First confirmed subscription: the connection is now usable.
            @raw_client = original_client

            # Run everything that was deferred by when_connected, oldest first.
            until @when_connected.empty?
              @when_connected.shift.call
            end
          end

          # Hand the oldest pending success callback for this channel to the
          # event loop, and drop the channel's entry once nothing is pending.
          if callbacks = @subscribe_callbacks[chan]
            next_callback = callbacks.shift
            @event_loop.post(&next_callback) if next_callback
            @subscribe_callbacks.delete(chan) if callbacks.empty?
          end
        end
      end

      # Incoming messages are passed straight to +broadcast+ (defined outside
      # this method).
      on.message do |chan, message|
        broadcast(chan, message)
      end

      on.unsubscribe do |chan, count|
        # No subscriptions remain: forget the raw client so later commands are
        # queued by when_connected instead of being written to it.
        if count == 0
          @subscription_lock.synchronize do
            @raw_client = nil
          end
        end
      end
    end
  end
end
remove_channel(channel)
# File actioncable/lib/action_cable/subscription_adapter/redis.rb, line 136
# Requests that +channel+ be unsubscribed. The "unsubscribe" command is sent
# immediately when connected, otherwise it is queued by +when_connected+.
def remove_channel(channel)
  unsubscribe = proc { send_command("unsubscribe", channel) }

  @subscription_lock.synchronize { when_connected(&unsubscribe) }
end
send_command(*command)
# File actioncable/lib/action_cable/subscription_adapter/redis.rb, line 164
# Writes +command+ (collected into an array) to the raw client, then flushes
# the innermost connection object when one is exposed and it supports +flush+.
def send_command(*command)
  @raw_client.write(command)

  # The client's connection may wrap another object in @connection; nothing
  # more to do when it does not.
  wrapper = @raw_client.connection
  return unless wrapper.instance_variable_defined?(:@connection)

  socket = wrapper.instance_variable_get(:@connection)
  socket.flush if socket && socket.respond_to?(:flush)
end
shutdown()
# File actioncable/lib/action_cable/subscription_adapter/redis.rb, line 115
  # Stops the listener. Does nothing when no listener thread was ever
  # started; otherwise sends a bare "unsubscribe" (immediately or once
  # connected), forgets the raw client, and waits for the thread to finish.
  def shutdown
    @subscription_lock.synchronize do
      return if @thread.nil?

      disconnect = proc do
        send_command("unsubscribe")
        @raw_client = nil
      end
      when_connected(&disconnect)
    end

    # Wait outside the lock, yielding to other threads until the listener
    # thread has exited.
    while @thread.alive?
      Thread.pass
    end
  end

  # Requests a subscription to +channel+.
  #
  # While holding the subscription lock this starts the listener thread if
  # needed, queues +on_success+ for the channel, and sends "subscribe" either
  # right away or once a connection is available (see +when_connected+).
  def add_channel(channel, on_success)
    @subscription_lock.synchronize do
      ensure_listener_running

      # Register the callback before the command can go out.
      pending_callbacks = @subscribe_callbacks[channel]
      pending_callbacks.push(on_success)

      when_connected do
        send_command("subscribe", channel)
      end
    end
  end

  # Requests that +channel+ be unsubscribed. The "unsubscribe" command is sent
  # immediately when connected, otherwise it is queued by +when_connected+.
  def remove_channel(channel)
    unsubscribe = proc { send_command("unsubscribe", channel) }

    @subscription_lock.synchronize { when_connected(&unsubscribe) }
  end

  # Defers the inherited +invoke_callback+ by posting it to the event loop.
  # The bare +super+ inside the block forwards whatever arguments this method
  # received, so the parent implementation runs whenever the event loop
  # executes the posted block rather than inline in the caller.
  def invoke_callback(*)
    @event_loop.post { super }
  end

  private
    # Starts the listener thread on first use and returns it; later calls
    # return the already-created thread without starting another one.
    def ensure_listener_running
      return @thread if @thread

      @thread = Thread.new do
        # An exception raised in this thread must not go unnoticed.
        Thread.current.abort_on_exception = true

        listen(@adapter.redis_connection_for_subscriptions)
      end
    end

    # Runs +block+ right away when a raw client is available; otherwise
    # appends it to the @when_connected queue for later execution.
    def when_connected(&block)
      return block.call if @raw_client

      @when_connected << block
    end

    # Writes +command+ (collected into an array) to the raw client, then
    # flushes the innermost connection object when one is exposed and it
    # supports +flush+.
    def send_command(*command)
      @raw_client.write(command)

      # The client's connection may wrap another object in @connection;
      # nothing more to do when it does not.
      wrapper = @raw_client.connection
      return unless wrapper.instance_variable_defined?(:@connection)

      socket = wrapper.instance_variable_get(:@connection)
      socket.flush if socket && socket.respond_to?(:flush)
    end
end
when_connected(&block)
# File actioncable/lib/action_cable/subscription_adapter/redis.rb, line 156
# Runs +block+ right away when a raw client is available; otherwise appends
# it to the @when_connected queue for later execution.
def when_connected(&block)
  return block.call if @raw_client

  @when_connected << block
end