class ActiveSupport::Notifications::Fanout
This is a default queue implementation that ships with Notifications. It just pushes events to all registered log subscribers.
This class is thread safe. All methods are reentrant.
Public class methods
Source code GitHub
# File activesupport/lib/active_support/notifications/fanout.rb, line 50
# Sets up an empty fanout: no subscribers and empty lookup caches.
def initialize
  # Lock taken by subscribe, unsubscribe and the slow path of all_listeners_for.
  @mutex = Mutex.new

  # Subscribers registered under an exact String pattern, keyed by that
  # string. A missing key is lazily populated with a fresh empty Array.
  @string_subscribers = Concurrent::Map.new do |map, key|
    map.compute_if_absent(key) { [] }
  end

  # Subscribers registered with a Regexp or with no pattern at all.
  @other_subscribers = []

  # Per-event-name cache of the combined subscriber list.
  @all_listeners_for = Concurrent::Map.new

  # NOTE(review): the two maps below are not read anywhere in the code
  # visible here; presumably they cache subscriber groupings per event
  # name. Confirm against the rest of the file.
  @groups_for = Concurrent::Map.new
  @silenceable_groups_for = Concurrent::Map.new
end
Public instance methods
Source code GitHub
# File activesupport/lib/active_support/notifications/fanout.rb, line 297
# Returns every subscriber interested in +name+, whether or not it is
# currently silenced. The list is built once per name and then served from
# the @all_listeners_for cache.
def all_listeners_for(name)
  # Fast path without the lock. This is double-checked locking; the original
  # author notes it is safe because Concurrent::Map's lookups have volatile
  # semantics.
  cached = @all_listeners_for[name]
  return cached if cached

  # Slow path: @string_subscribers and @other_subscribers are mutated under
  # this same mutex, so read them only while holding it.
  @mutex.synchronize do
    @all_listeners_for[name] ||= begin
      exact_matches = @string_subscribers[name]
      pattern_matches = @other_subscribers.select { |subscriber| subscriber.subscribed_to?(name) }
      exact_matches + pattern_matches
    end
  end
end
Source code GitHub
# File activesupport/lib/active_support/notifications/fanout.rb, line 272
# Creates a Handle for a single occurrence of the event +name+.
#
# The handle is given this fanout (+self+) together with the event name,
# the instrumenter +id+ and the event +payload+. It is returned without
# being started.
def build_handle(name, id, payload)
  handle = Handle.new(self, name, id, payload)
  handle
end
Source code GitHub
# File activesupport/lib/active_support/notifications/fanout.rb, line 283
# Finishes the most recently started event of the current execution context.
#
# Pops the top handle off the stack stored under :_fanout_handle_stack in
# IsolatedExecutionState and calls finish_with_values on it with the given
# +name+, +id+ and +payload+. Returns whatever finish_with_values returns.
#
# Assumes a matching #start call has already created the stack.
# +listeners+ is accepted but never read here; presumably it is retained so
# that existing callers passing a fourth argument keep working (confirm).
def finish(name, id, payload, listeners = nil)
  IsolatedExecutionState[:_fanout_handle_stack]
    .pop
    .finish_with_values(name, id, payload)
end
Source code GitHub
# File activesupport/lib/active_support/notifications/fanout.rb, line 306
# Returns a new Array holding the subscribers for +name+ that are not
# currently silenced for it.
def listeners_for(name)
  all_listeners_for(name).select { |subscriber| !subscriber.silenced?(name) }
end
Source code GitHub
# File activesupport/lib/active_support/notifications/fanout.rb, line 310
# Returns true when at least one subscriber for +name+ is not silenced,
# false otherwise (including when there are no subscribers at all).
def listening?(name)
  # "Some subscriber is not silenced" is the same as "not every subscriber
  # is silenced"; both forms stop at the first non-silenced subscriber.
  !all_listeners_for(name).all? { |subscriber| subscriber.silenced?(name) }
end
Source code GitHub
# File activesupport/lib/active_support/notifications/fanout.rb, line 289
# Delivers an event to every non-silenced subscriber of +name+ by calling
# subscriber.publish(name, *args) on each of them.
#
# Iteration is delegated to iterate_guarding_exceptions, which is defined
# outside this view. NOTE(review): presumably it keeps one raising
# subscriber from preventing delivery to the rest; confirm.
def publish(name, *args)
  recipients = listeners_for(name)
  iterate_guarding_exceptions(recipients) do |subscriber|
    subscriber.publish(name, *args)
  end
end
Source code GitHub
# File activesupport/lib/active_support/notifications/fanout.rb, line 293
# Delivers an already-built +event+ object to every non-silenced subscriber
# of event.name by calling subscriber.publish_event(event) on each of them.
#
# Iteration is delegated to iterate_guarding_exceptions, which is defined
# outside this view. NOTE(review): presumably it keeps one raising
# subscriber from preventing delivery to the rest; confirm.
def publish_event(event)
  recipients = listeners_for(event.name)
  iterate_guarding_exceptions(recipients) do |subscriber|
    subscriber.publish_event(event)
  end
end
Source code GitHub
# File activesupport/lib/active_support/notifications/fanout.rb, line 276
# Begins an event: builds a handle for it, pushes that handle onto the
# stack stored under :_fanout_handle_stack in IsolatedExecutionState
# (creating the stack on first use) and starts the handle.
#
# Returns whatever the handle's #start returns. The matching #finish call
# pops the handle pushed here.
def start(name, id, payload)
  # Fetch (or create) the stack first, before the handle is built.
  pending = (IsolatedExecutionState[:_fanout_handle_stack] ||= [])

  build_handle(name, id, payload)
    .tap { |handle| pending.push(handle) }
    .start
end
Source code GitHub
# File activesupport/lib/active_support/notifications/fanout.rb, line 64
# Registers a new subscriber and returns it.
#
# pattern   - a String (exact event name), a Regexp, or nil.
# callable  - the object to notify; when nil, the given block is used.
# monotonic - forwarded unchanged to Subscribers.new.
#
# Raises ArgumentError when +pattern+ is none of String, Regexp or nil.
# The subscriber object is built before the pattern is checked.
def subscribe(pattern = nil, callable = nil, monotonic: false, &block)
  listener = Subscribers.new(pattern, callable || block, monotonic)

  @mutex.synchronize do
    if String === pattern
      # Exact-name subscription: only that name's cache entry is cleared.
      @string_subscribers[pattern] << listener
      clear_cache(pattern)
    elsif NilClass === pattern || Regexp === pattern
      # A Regexp or catch-all subscription can affect any event name, so
      # clear_cache is called without a key.
      @other_subscribers << listener
      clear_cache
    else
      raise ArgumentError, "pattern must be specified as a String, Regexp or empty"
    end
  end

  listener
end
Source code GitHub
# File activesupport/lib/active_support/notifications/fanout.rb, line 81
# Removes subscriptions, either by event name or by subscriber object.
#
# Given a String, drops every exact-name subscriber for that name and calls
# unsubscribe!(name) on each Regexp/catch-all subscriber.
#
# Given anything else, treats it as a subscriber object and removes it from
# whichever collection its +pattern+ places it in: the per-name list when
# the pattern is a String, @other_subscribers otherwise.
#
# All work happens while holding @mutex.
def unsubscribe(subscriber_or_name)
  @mutex.synchronize do
    if String === subscriber_or_name
      event_name = subscriber_or_name
      @string_subscribers[event_name].clear
      clear_cache(event_name)
      @other_subscribers.each { |listener| listener.unsubscribe!(event_name) }
    else
      subscriber = subscriber_or_name
      # try returns nil when the object has no #pattern, which sends it
      # down the @other_subscribers branch.
      registered_under = subscriber.try(:pattern)

      case registered_under
      when String
        @string_subscribers[registered_under].delete(subscriber)
        clear_cache(registered_under)
      else
        @other_subscribers.delete(subscriber)
        clear_cache
      end
    end
  end
end
This is a synchronous queue, so there is nothing to wait for.
Source code GitHub
# File activesupport/lib/active_support/notifications/fanout.rb, line 315
# Intentionally does nothing and returns nil; the method body performs no
# work, so there is never anything pending to wait on.
def wait
  nil
end