connection_pool.rb 12.5 KB
Newer Older
1
require 'monitor'
2 3
require 'set'

N
Nick 已提交
4
module ActiveRecord
5 6 7 8 9
  # Raised when a connection could not be obtained within the connection
  # acquisition timeout period.
  class ConnectionTimeoutError < ConnectionNotEstablished
  end

N
Nick 已提交
10
  module ConnectionAdapters
11 12 13
    # Connection pool base class for managing ActiveRecord database
    # connections.
    #
P
Pratik Naik 已提交
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
    # == Introduction
    #
    # A connection pool synchronizes thread access to a limited number of
    # database connections. The basic idea is that each thread checks out a
    # database connection from the pool, uses that connection, and checks the
    # connection back in. ConnectionPool is completely thread-safe, and will
    # ensure that a connection cannot be used by two threads at the same time,
    # as long as ConnectionPool's contract is correctly followed. It will also
    # handle cases in which there are more threads than connections: if all
    # connections have been checked out, and a thread tries to checkout a
    # connection anyway, then ConnectionPool will wait until some other thread
    # has checked in a connection.
    #
    # == Obtaining (checking out) a connection
    #
29 30 31
    # Connections can be obtained and used from a connection pool in several
    # ways:
    #
32 33 34 35 36 37
    # 1. Simply use ActiveRecord::Base.connection as with ActiveRecord 2.1 and
    #    earlier (pre-connection-pooling). Eventually, when you're done with
    #    the connection(s) and wish it to be returned to the pool, you call
    #    ActiveRecord::Base.clear_active_connections!. This will be the
    #    default behavior for ActiveRecord when used in conjunction with
    #    ActionPack's request handling cycle.
38 39 40 41 42 43 44
    # 2. Manually check out a connection from the pool with
    #    ActiveRecord::Base.connection_pool.checkout. You are responsible for
    #    returning this connection to the pool when finished by calling
    #    ActiveRecord::Base.connection_pool.checkin(connection).
    # 3. Use ActiveRecord::Base.connection_pool.with_connection(&block), which
    #    obtains a connection, yields it as the sole argument to the block,
    #    and returns it to the pool after the block completes.
45
    #
P
Pratik Naik 已提交
46 47 48 49 50
    # Connections in the pool are actually AbstractAdapter objects (or objects
    # compatible with AbstractAdapter's interface).
    #
    # == Options
    #
51 52 53 54 55 56
    # There are two connection-pooling-related options that you can add to
    # your database connection configuration:
    #
    # * +pool+: number indicating size of connection pool (default 5)
    # * +wait_timeout+: number of seconds to block and wait for a connection
    #   before giving up and raising a timeout error (default 5 seconds).
57
    class ConnectionPool
58
      attr_reader :spec
59

P
Pratik Naik 已提交
60 61 62 63 64 65
      # Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
      # object which describes database connection information (e.g. adapter,
      # host name, username, password, etc), as well as the maximum size for
      # this ConnectionPool.
      #
      # The default ConnectionPool maximum size is 5.
N
Nick 已提交
66 67
      def initialize(spec)
        @spec = spec
68 69
        # The cache of reserved connections mapped to threads
        @reserved_connections = {}
70 71
        # The mutex used to synchronize pool access
        @connection_mutex = Monitor.new
72 73 74 75 76 77 78
        @queue = @connection_mutex.new_cond
        # default 5 second timeout
        @timeout = spec.config[:wait_timeout] || 5
        # default max pool size to 5
        @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5
        @connections = []
        @checked_out = []
N
Nick 已提交
79 80
      end

81 82 83 84 85 86
      # Retrieve the connection associated with the current thread, or call
      # #checkout to obtain one if necessary.
      #
      # #connection can be called any number of times; the connection is
      # held in a hash keyed by the thread id.
      def connection
87
        if conn = @reserved_connections[current_connection_id]
88 89
          conn
        else
90
          @reserved_connections[current_connection_id] = checkout
91
        end
N
Nick 已提交
92 93
      end

94
      # Signal that the thread is finished with the current connection.
95
      # #release_connection releases the connection-thread association
96
      # and returns the connection to the pool.
97
      def release_connection
98
        conn = @reserved_connections.delete(current_connection_id)
99 100 101 102 103 104 105 106 107 108
        checkin conn if conn
      end

      # Reserve a connection, and yield it to a block. Ensure the connection is
      # checked back in when finished.
      def with_connection
        conn = checkout
        yield conn
      ensure
        checkin conn
N
Nick 已提交
109 110
      end

111 112
      # Returns true if a connection has already been opened.
      def connected?
113
        !@connections.empty?
N
Nick 已提交
114 115
      end

P
Pratik Naik 已提交
116
      # Disconnects all connections in the pool, and clears the pool.
117 118
      def disconnect!
        @reserved_connections.each do |name,conn|
119
          checkin conn
120
        end
121 122
        @reserved_connections = {}
        @connections.each do |conn|
N
Nick 已提交
123 124
          conn.disconnect!
        end
125
        @connections = []
N
Nick 已提交
126 127
      end

128
      # Clears the cache which maps classes
N
Nick 已提交
129
      def clear_reloadable_connections!
130
        @reserved_connections.each do |name, conn|
131
          checkin conn
132 133
        end
        @reserved_connections = {}
134 135
        @connections.each do |conn|
          conn.disconnect! if conn.requires_reloading?
N
Nick 已提交
136
        end
137
        @connections = []
N
Nick 已提交
138 139
      end

140 141
      # Verify active connections and remove and disconnect connections
      # associated with stale threads.
N
Nick 已提交
142
      def verify_active_connections! #:nodoc:
143
        clear_stale_cached_connections!
144
        @connections.each do |connection|
145
          connection.verify!
N
Nick 已提交
146 147 148
        end
      end

149 150 151 152 153 154 155 156
      # Return any checked-out connections back to the pool by threads that
      # are no longer alive.
      def clear_stale_cached_connections!
        remove_stale_cached_threads!(@reserved_connections) do |name, conn|
          checkin conn
        end
      end

P
Pratik Naik 已提交
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
      # Check-out a database connection from the pool, indicating that you want
      # to use it. You should call #checkin when you no longer need this.
      #
      # This is done by either returning an existing connection, or by creating
      # a new connection. If the maximum number of connections for this pool has
      # already been reached, but the pool is empty (i.e. they're all being used),
      # then this method will wait until a thread has checked in a connection.
      # The wait time is bounded however: if no connection can be checked out
      # within the timeout specified for this pool, then a ConnectionTimeoutError
      # exception will be raised.
      #
      # Returns: an AbstractAdapter object.
      #
      # Raises:
      # - ConnectionTimeoutError: no connection can be obtained from the pool
      #   within the timeout period.
173
      def checkout
174 175
        # Checkout an available connection
        @connection_mutex.synchronize do
176 177 178 179 180 181 182 183 184 185 186
          loop do
            conn = if @checked_out.size < @connections.size
                     checkout_existing_connection
                   elsif @connections.size < @size
                     checkout_new_connection
                   end
            return conn if conn
            # No connections available; wait for one
            if @queue.wait(@timeout)
              next
            else
187 188 189 190 191
              # try looting dead threads
              clear_stale_cached_connections!
              if @size == @checked_out.size
                raise ConnectionTimeoutError, "could not obtain a database connection within #{@timeout} seconds.  The pool size is currently #{@size}, perhaps you need to increase it?"
              end
192
            end
193 194
          end
        end
N
Nick 已提交
195 196
      end

P
Pratik Naik 已提交
197 198 199 200 201
      # Check-in a database connection back into the pool, indicating that you
      # no longer need this connection.
      #
      # +conn+: an AbstractAdapter object, which was obtained by earlier by
      # calling +checkout+ on this pool.
202 203 204 205 206 207
      def checkin(conn)
        @connection_mutex.synchronize do
          conn.run_callbacks :checkin
          @checked_out.delete conn
          @queue.signal
        end
N
Nick 已提交
208
      end
209

210
      synchronize :clear_reloadable_connections!, :verify_active_connections!,
211 212 213
        :connected?, :disconnect!, :with => :@connection_mutex

      private
214
      def new_connection
215
        ActiveRecord::Base.send(spec.adapter_method, spec.config)
216 217
      end

218
      def current_connection_id #:nodoc:
219 220
        Thread.current.object_id
      end
N
Nick 已提交
221

222 223 224 225 226 227 228 229 230 231 232
      # Remove stale threads from the cache.
      def remove_stale_cached_threads!(cache, &block)
        keys = Set.new(cache.keys)

        Thread.list.each do |thread|
          keys.delete(thread.object_id) if thread.alive?
        end
        keys.each do |key|
          next unless cache.has_key?(key)
          block.call(key, cache[key])
          cache.delete(key)
N
Nick 已提交
233 234
        end
      end
235 236 237 238

      def checkout_new_connection
        c = new_connection
        @connections << c
239
        checkout_and_verify(c)
240 241 242
      end

      def checkout_existing_connection
243
        c = (@connections - @checked_out).first
244 245 246 247
        checkout_and_verify(c)
      end

      def checkout_and_verify(c)
248
        c.verify!
249
        c.run_callbacks :checkout
250 251 252 253 254
        @checked_out << c
        c
      end
    end

P
Pratik Naik 已提交
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277
    # ConnectionHandler is a collection of ConnectionPool objects. It is used
    # for keeping separate connection pools for ActiveRecord models that connect
    # to different databases.
    #
    # For example, suppose that you have 5 models, with the following hierarchy:
    #
    #  |
    #  +-- Book
    #  |    |
    #  |    +-- ScaryBook
    #  |    +-- GoodBook
    #  +-- Author
    #  +-- BankAccount
    #
    # Suppose that Book is to connect to a separate database (i.e. one other
    # than the default database). Then Book, ScaryBook and GoodBook will all use
    # the same connection pool. Likewise, Author and BankAccount will use the
    # same connection pool. However, the connection pool used by Author/BankAccount
    # is not the same as the one used by Book/ScaryBook/GoodBook.
    #
    # Normally there is only a single ConnectionHandler instance, accessible via
    # ActiveRecord::Base.connection_handler. ActiveRecord models use this to
    # determine that connection pool that they should use.
278
    class ConnectionHandler
279 280 281
      def initialize(pools = {})
        @connection_pools = pools
      end
282

283 284
      def connection_pools
        @connection_pools ||= {}
285 286 287
      end

      def establish_connection(name, spec)
288
        @connection_pools[name] = ConnectionAdapters::ConnectionPool.new(spec)
289 290
      end

291 292 293
      # Returns any connections in use by the current thread back to the pool,
      # and also returns connections to the pool cached by threads that are no
      # longer alive.
294
      def clear_active_connections!
295
        @connection_pools.each_value {|pool| pool.release_connection }
296 297 298 299 300 301 302 303
      end

      # Clears the cache which maps classes
      def clear_reloadable_connections!
        @connection_pools.each_value {|pool| pool.clear_reloadable_connections! }
      end

      def clear_all_connections!
304
        @connection_pools.each_value {|pool| pool.disconnect! }
305 306 307 308
      end

      # Verify active connections.
      def verify_active_connections! #:nodoc:
309
        @connection_pools.each_value {|pool| pool.verify_active_connections! }
310 311 312 313 314 315 316 317 318 319 320
      end

      # Locate the connection of the nearest super class. This can be an
      # active or defined connection: if it is the latter, it will be
      # opened and set as the active connection for the class it was defined
      # for (not necessarily the current class).
      def retrieve_connection(klass) #:nodoc:
        pool = retrieve_connection_pool(klass)
        (pool && pool.connection) or raise ConnectionNotEstablished
      end

321 322
      # Returns true if a connection that's accessible to this class has
      # already been opened.
323
      def connected?(klass)
324 325
        conn = retrieve_connection_pool(klass)
        conn ? conn.connected? : false
326 327 328 329 330 331 332 333 334 335 336 337 338
      end

      # Remove the connection for this class. This will close the active
      # connection and the defined connection (if they exist). The result
      # can be used as an argument for establish_connection, for easily
      # re-establishing the connection.
      def remove_connection(klass)
        pool = @connection_pools[klass.name]
        @connection_pools.delete_if { |key, value| value == pool }
        pool.disconnect! if pool
        pool.spec.config if pool
      end

339
      def retrieve_connection_pool(klass)
340 341 342 343
        pool = @connection_pools[klass.name]
        return pool if pool
        return nil if ActiveRecord::Base == klass
        retrieve_connection_pool klass.superclass
344
      end
345
    end
N
Nick 已提交
346
  end
347
end