notifications.rb 4.7 KB
Newer Older
J
José Valim 已提交
1
require 'thread'
2
require 'active_support/core_ext/module/delegation'
3
require 'active_support/core_ext/module/attribute_accessors'
J
Jeremy Kemper 已提交
4
require 'active_support/secure_random'
J
José Valim 已提交
5 6

module ActiveSupport
7 8
  # Notifications provides an instrumentation API for Ruby. To instrument an
  # action in Ruby you just need to do:
J
José Valim 已提交
9
  #
10
  #   ActiveSupport::Notifications.instrument(:render, :extra => :information) do
J
José Valim 已提交
11 12 13
  #     render :text => "Foo"
  #   end
  #
14 15
  # You can consume those events and the information they provide by registering
  # a subscriber. For instance, let's store all instrumented events in an array:
J
José Valim 已提交
16
  #
17 18
  #   @events = []
  #
19 20
  #   ActiveSupport::Notifications.subscribe do |*args|
  #     @events << ActiveSupport::Notifications::Event.new(*args)
21
  #   end
J
José Valim 已提交
22
  #
23
  #   ActiveSupport::Notifications.instrument(:render, :extra => :information) do
J
José Valim 已提交
24 25 26
  #     render :text => "Foo"
  #   end
  #
27
  #   event = @events.first
J
José Valim 已提交
28 29 30 31 32
  #   event.name      #=> :render
  #   event.duration  #=> 10 (in milliseconds)
  #   event.result    #=> "Foo"
  #   event.payload   #=> { :extra => :information }
  #
33
  # When subscribing to Notifications, you can pass a pattern, to only consume
34 35
  # events that match the pattern:
  #
36
  #   ActiveSupport::Notifications.subscribe(/render/) do |*args|
  #     @render_events << ActiveSupport::Notifications::Event.new(*args)
  #   end
  #
40
  # Notifications ships with a queue implementation that consumes and publishes events
41
  # to subscribers in a thread. You can use any queue implementation you want.
J
José Valim 已提交
42
  #
43
  module Notifications
44
    mattr_accessor :queue
45

46
    class << self
      # instrument, transaction_id and transaction are forwarded to the
      # instrumenter returned by #instrumenter.
      delegate :instrument, :transaction_id, :transaction, :to => :instrumenter

      # Returns the Instrumenter kept in the current thread's locals, building
      # one around the shared publisher the first time it is requested.
      def instrumenter
        locals = Thread.current
        locals[:notifications_instrumeter] ||= Instrumenter.new(publisher)
      end

      # Returns the memoized Publisher. It is built from whatever +queue+ holds
      # at the time of the first call and is reused afterwards.
      def publisher
        @publisher ||= Publisher.new(queue)
      end

      # Registers +block+ on the current queue. When +pattern+ is given it is
      # handed to the queue together with the block.
      def subscribe(pattern=nil, &block)
        bound = Subscriber.new(queue).bind(pattern)
        bound.subscribe(&block)
      end
    end

    # Publishes instrumented events through a publisher, tagging every event
    # with the transaction id that is current for this instrumenter.
    class Instrumenter
      # publisher - object responding to +publish+; it receives every event.
      def initialize(publisher)
        @publisher = publisher
        @id        = random_id
      end

      # Runs the block under a freshly generated transaction id and restores
      # the previous id afterwards, even when the block raises. Returns the
      # value of the block.
      def transaction
        # Remember the current id before asking for a new one: if random_id
        # itself raises, the ensure clause must restore the real previous id
        # instead of overwriting it with nil.
        old_id = @id
        @id    = random_id
        yield
      ensure
        @id = old_id
      end

      # The id attached to events published at this moment.
      def transaction_id
        @id
      end

      # Runs the optional block and publishes
      # (name, start time, end time, result, transaction id, payload).
      # The event is published from an ensure clause, so it is also sent when
      # the block raises; +result+ is nil in that case. Returns the value of
      # the block, or nil when no block is given.
      def instrument(name, payload={})
        time   = Time.now
        result = yield if block_given?
      ensure
        @publisher.publish(name, time, Time.now, result, @id, payload)
      end

    private
      # Generates a new transaction id: a hex string built from 10 random bytes.
      def random_id
        SecureRandom.hex(10)
      end
    end

92 93 94 95 96
    # Thin front for a queue: whatever is handed to #publish is forwarded to
    # the queue unchanged.
    class Publisher
      # queue - object responding to +publish+.
      def initialize(queue)
        @queue = queue
      end

      # Passes all arguments on to the queue and returns what the queue returns.
      def publish(*event)
        @queue.publish(*event)
      end
    end

    # Registers a block on a queue, optionally restricted by a pattern set
    # through #bind.
    class Subscriber
      # queue - object responding to +subscribe(pattern, &block)+.
      def initialize(queue)
        @queue = queue
      end

      # Stores the pattern used by the next #subscribe call. Returns self so
      # the call can be chained.
      def bind(pattern)
        @pattern = pattern
        self
      end

      # Subscribes to the queue with the bound pattern. Every set of arguments
      # the queue delivers is yielded, splatted, to the given block.
      def subscribe
        @queue.subscribe(@pattern) { |*event| yield(*event) }
      end
    end

    # Wraps the arguments published for one instrumented call.
    class Event
      attr_reader :name, :time, :end, :transaction_id, :result, :payload

      # start and ending are exposed as +time+ and +end+. The payload is
      # duplicated so the event keeps its own (shallow) copy.
      def initialize(name, start, ending, result, transaction_id, payload)
        @name, @result, @transaction_id = name, result, transaction_id
        @time, @end = start, ending
        @payload = payload.dup
      end

      # Difference between +end+ and +time+ multiplied by 1000
      # (milliseconds when both are Time objects). Computed once and cached.
      def duration
        @duration ||= 1000.0 * (@end - @time)
      end

      # True when this event did not start after +event+ and the remainder of
      # its duration, measured from the start of +event+, is at least as long
      # as the duration of +event+.
      def parent_of?(event)
        offset = (time - event.time) * 1000
        return false unless offset <= 0
        offset + duration >= event.duration
      end
    end

141
    # This is a default queue implementation that ships with Notifications. It
142 143 144
    # consumes events in a thread and publishes them to all registered subscribers.
    #
    class LittleFanout
      def initialize
        @listeners = []
      end

      # Offers the event to every registered listener; each listener decides
      # for itself whether the event matches its pattern.
      def publish(*args)
        @listeners.each { |listener| listener.publish(*args) }
      end

      # Registers a listener that calls +block+ with the arguments of every
      # published event whose name matches +pattern+ (all events when nil).
      def subscribe(pattern=nil, &block)
        @listeners << Listener.new(pattern, &block)
      end

      # Blocks, polling every 50ms, until every listener has finished handling
      # all events accepted so far.
      def wait
        sleep 0.05 until drained?
      end

      private
        def drained?
          @listeners.all?(&:drained?)
        end

      # Used for internal implementation only.
      class Listener #:nodoc:
        def initialize(pattern, &block)
          @pattern = pattern
          @subscriber = block
          @queue = Queue.new
          # Number of accepted events whose subscriber call has not finished
          # yet. Guarded by @lock since it is touched by publishing threads
          # and by the consumer thread.
          @lock = Mutex.new
          @pending = 0
          Thread.new { consume }
        end

        # Enqueues the event when no pattern is set or the pattern matches the
        # stringified event name.
        def publish(name, *args)
          if !@pattern || @pattern === name.to_s
            # Count the event before enqueuing it, so drained? can never see
            # an accepted event as already handled.
            @lock.synchronize { @pending += 1 }
            @queue << args.unshift(name)
          end
        end

        # Consumer loop run by the thread started in #initialize: blocks on
        # the queue and hands each event to the subscriber block.
        def consume
          while (args = @queue.shift)
            begin
              @subscriber.call(*args)
            ensure
              @lock.synchronize { @pending -= 1 }
            end
          end
        end

        # True only when every accepted event has been fully processed. An
        # empty queue alone is not enough: the consumer removes an event from
        # the queue before the subscriber has run.
        def drained?
          @lock.synchronize { @pending.zero? }
        end
      end
    end
  end
193

194
  # Install an instance of the bundled LittleFanout as the queue used by
  # Notifications.
  Notifications.queue = Notifications::LittleFanout.new
J
José Valim 已提交
195
end