提交 bf8e2058 编写于 作者: A Aaron Patterson

move fanout back to a global variable, add a mutex for safety

上级 ceba010e
......@@ -135,24 +135,9 @@ module ActiveSupport
# to log subscribers in a thread. You can use any queue implementation you want.
#
module Notifications
class Registry # :nodoc:
def self.instance
Thread.current[name] ||= new
end
attr_reader :notifier, :instrumenter
def initialize
self.notifier = Fanout.new
end
def notifier=(notifier)
@notifier = notifier
@instrumenter = Instrumenter.new(notifier)
end
end
class << self
attr_accessor :notifier
def publish(name, *args)
notifier.publish(name, *args)
end
......@@ -181,16 +166,10 @@ def unsubscribe(args)
end
def instrumenter
Registry.instance.instrumenter
end
def notifier
Registry.instance.notifier
end
def notifier=(notifier)
Registry.instance.notifier = notifier
Thread.current[:"instrumentation_#{notifier.object_id}"] ||= Instrumenter.new(notifier)
end
end
self.notifier = Fanout.new
end
end
require 'mutex_m'
module ActiveSupport
module Notifications
# This is a default queue implementation that ships with Notifications.
# It just pushes events to all registered log subscribers.
#
# Only one of these objects should instantiated per thread. Concurrent
# access to this class is not allowed.
# This class is thread safe. All methods are reentrant.
class Fanout
include Mutex_m
def initialize
@subscribers = []
@listeners_for = {}
super
end
def subscribe(pattern = nil, block = Proc.new)
subscriber = Subscribers.new pattern, block
@subscribers << subscriber
@listeners_for.clear
synchronize do
@subscribers << subscriber
@listeners_for.clear
end
subscriber
end
def unsubscribe(subscriber)
@subscribers.reject! { |s| s.matches?(subscriber) }
@listeners_for.clear
synchronize do
@subscribers.reject! { |s| s.matches?(subscriber) }
@listeners_for.clear
end
end
def start(name, id, payload)
......@@ -36,7 +44,9 @@ def publish(name, *args)
end
def listeners_for(name)
@listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) }
synchronize do
@listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) }
end
end
def listening?(name)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册