notifications.rb 4.6 KB
Newer Older
J
José Valim 已提交
1
require 'thread'
2
require 'active_support/core_ext/module/delegation'
3
require 'active_support/core_ext/module/attribute_accessors'
J
Jeremy Kemper 已提交
4
require 'active_support/secure_random'
J
José Valim 已提交
5 6

module ActiveSupport
7 8
  # Notifications provides an instrumentation API for Ruby. To instrument an
  # action in Ruby you just need to do:
J
José Valim 已提交
9
  #
10
  #   ActiveSupport::Notifications.instrument(:render, :extra => :information) do
J
José Valim 已提交
11 12 13
  #     render :text => "Foo"
  #   end
  #
14 15
  # You can consume those events and the information they provide by registering
  # a subscriber. For instance, let's store all instrumented events in an array:
J
José Valim 已提交
16
  #
17 18
  #   @events = []
  #
19
  #   ActiveSupport::Notifications.subscribe do |event|
20 21
  #     @events << event
  #   end
J
José Valim 已提交
22
  #
23
  #   ActiveSupport::Notifications.instrument(:render, :extra => :information) do
J
José Valim 已提交
24 25 26
  #     render :text => "Foo"
  #   end
  #
27
  #   event = @events.first
28
  #   event.class     #=> ActiveSupport::Notifications::Event
J
José Valim 已提交
29 30 31 32 33
  #   event.name      #=> :render
  #   event.duration  #=> 10 (in miliseconds)
  #   event.result    #=> "Foo"
  #   event.payload   #=> { :extra => :information }
  #
34
  # When subscribing to Notifications, you can pass a pattern, to only consume
35 36
  # events that match the pattern:
  #
37
  #   ActiveSupport::Notifications.subscribe(/render/) do |event|
38 39 40
  #     @render_events << event
  #   end
  #
41
  # Notifications ships with a queue implementation that consumes and publish events
42
  # to subscribers in a thread. You can use any queue implementation you want.
J
José Valim 已提交
43
  #
44
  module Notifications
45 46
    mattr_accessor :queue

47
    class << self
48
      delegate :instrument, :transaction_id, :generate_id, :to => :instrumenter
49 50

      def instrumenter
51
        Thread.current[:notifications_instrumeter] ||= Instrumenter.new(publisher)
52 53 54 55 56 57 58 59 60 61 62 63 64 65
      end

      def publisher
        @publisher ||= Publisher.new(queue)
      end

      def subscribe(pattern=nil, &block)
        Subscriber.new(queue).bind(pattern).subscribe(&block)
      end
    end

    class Instrumenter
      def initialize(publisher)
        @publisher = publisher
66 67 68 69 70 71 72 73 74 75 76 77
        @id        = random_id
      end

      def transaction
        @id, old_id = random_id, @id
        yield
      ensure
        @id = old_id
      end

      def transaction_id
        @id
78 79
      end

J
José Valim 已提交
80
      def instrument(name, payload={})
81 82
        time   = Time.now
        result = yield if block_given?
83
      ensure
84
        @publisher.publish(name, time, Time.now, result, @id, payload)
85
      end
86 87 88 89 90

    private
      def random_id
        SecureRandom.hex(10)
      end
J
José Valim 已提交
91 92
    end

93 94 95 96 97
    class Publisher
      def initialize(queue)
        @queue = queue
      end

98 99
      def publish(*args)
        @queue.publish(*args)
100 101 102 103 104 105 106 107 108 109 110 111 112 113
      end
    end

    class Subscriber
      def initialize(queue)
        @queue = queue
      end

      def bind(pattern)
        @pattern = pattern
        self
      end

      def subscribe
114
        @queue.subscribe(@pattern) do |*args|
115
          yield *args
116 117
        end
      end
J
José Valim 已提交
118 119 120
    end

    class Event
121
      attr_reader :name, :time, :end, :thread_id, :result, :payload
J
José Valim 已提交
122

123
      def initialize(name, start, ending, result, thread_id, payload)
J
José Valim 已提交
124
        @name      = name
J
José Valim 已提交
125
        @payload   = payload.dup
126 127 128 129 130 131 132 133
        @time      = start
        @thread_id = thread_id
        @end       = ending
        @result    = result
      end

      def duration
        @duration ||= 1000.0 * (@end - @time)
J
José Valim 已提交
134
      end
135 136 137

      def parent_of?(event)
        start = (self.time - event.time) * 1000
138
        start <= 0 && (start + duration >= event.duration)
139
      end
J
José Valim 已提交
140 141
    end

142
    # This is a default queue implementation that ships with Notifications. It
143 144 145
    # consumes events in a thread and publish them to all registered subscribers.
    #
    class LittleFanout
J
José Valim 已提交
146
      def initialize
147 148 149
        @listeners = []
        @stream    = Queue.new
        Thread.new { consume }
J
José Valim 已提交
150 151
      end

152 153
      def publish(*args)
        @stream.push(args)
J
José Valim 已提交
154 155
      end

156 157
      def subscribe(pattern=nil, &block)
        @listeners << Listener.new(pattern, &block)
J
José Valim 已提交
158 159
      end

160
      def consume
161 162
        while args = @stream.shift
          @listeners.each { |l| l.publish(*args) }
163
        end
164 165 166
      end

      class Listener
167
        # attr_reader :thread
168

169 170 171
        def initialize(pattern, &block)
          @pattern = pattern
          @subscriber = block
172
          @queue = Queue.new
173
          Thread.new { consume }
174 175
        end

176 177 178
        def publish(name, *args)
          if !@pattern || @pattern === name.to_s
            @queue << args.unshift(name)
179 180 181 182
          end
        end

        def consume
183 184
          while args = @queue.shift
            @subscriber.call(*args)
185 186
          end
        end
J
José Valim 已提交
187 188 189
      end
    end
  end
190

191
  Notifications.queue = Notifications::LittleFanout.new
J
José Valim 已提交
192
end