connection_pool.rb 14.1 KB
Newer Older
1
require 'thread'
2
require 'monitor'
3
require 'set'
4
require 'active_support/core_ext/module/deprecation'
5

N
Nick 已提交
6
module ActiveRecord
7 8 9 10 11
  # Error signalling that ConnectionPool#checkout was unable to hand out a
  # database connection before giving up.
  class ConnectionTimeoutError < ConnectionNotEstablished; end

N
Nick 已提交
12
  module ConnectionAdapters
13
    # Connection pool base class for managing Active Record database
    # connections.
    #
    # == Introduction
    #
    # A connection pool synchronizes thread access to a limited number of
    # database connections. The basic idea is that each thread checks out a
    # database connection from the pool, uses that connection, and checks the
    # connection back in. ConnectionPool is completely thread-safe, and will
    # ensure that a connection cannot be used by two threads at the same time,
    # as long as ConnectionPool's contract is correctly followed. It will also
    # handle cases in which there are more threads than connections: if all
    # connections have been checked out, and a thread tries to checkout a
    # connection anyway, then ConnectionPool will wait until some other thread
    # has checked in a connection.
    #
    # == Obtaining (checking out) a connection
    #
    # Connections can be obtained and used from a connection pool in several
    # ways:
    #
    # 1. Simply use ActiveRecord::Base.connection as with Active Record 2.1 and
    #    earlier (pre-connection-pooling). Eventually, when you're done with
    #    the connection(s) and wish it to be returned to the pool, you call
    #    ActiveRecord::Base.clear_active_connections!. This will be the
    #    default behavior for Active Record when used in conjunction with
    #    Action Pack's request handling cycle.
    # 2. Manually check out a connection from the pool with
    #    ActiveRecord::Base.connection_pool.checkout. You are responsible for
    #    returning this connection to the pool when finished by calling
    #    ActiveRecord::Base.connection_pool.checkin(connection).
    # 3. Use ActiveRecord::Base.connection_pool.with_connection(&block), which
    #    obtains a connection, yields it as the sole argument to the block,
    #    and returns it to the pool after the block completes.
    #
    # Connections in the pool are actually AbstractAdapter objects (or objects
    # compatible with AbstractAdapter's interface).
    #
    # == Options
    #
    # There are two connection-pooling-related options that you can add to
    # your database connection configuration:
    #
    # * +pool+: number indicating size of connection pool (default 5)
    # * +wait_timeout+: number of seconds to block and wait for a connection
    #   before giving up and raising a timeout error (default 5 seconds).
    class ConnectionPool
      include MonitorMixin

      attr_accessor :automatic_reconnect, :timeout
      attr_reader :spec, :connections

      # Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
      # object which describes database connection information (e.g. adapter,
      # host name, username, password, etc), as well as the maximum size for
      # this ConnectionPool.
      #
      # The default ConnectionPool maximum size is 5.
      def initialize(spec)
        # MonitorMixin needs its own initializer run before #synchronize or
        # #new_cond can be used.
        super()

        @spec = spec

        # The cache of reserved connections, keyed by connection id
        # (see #current_connection_id).
        @reserved_connections = {}

        @timeout = spec.config[:wait_timeout] || 5

        # default max pool size to 5
        @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5

        @connections         = []
        @automatic_reconnect = true

        # Signalled every time a connection is checked in or removed, so that
        # threads blocked in #checkout can retry.
        @available = new_cond
      end

      # Retrieve the connection associated with the current thread, or call
      # #checkout to obtain one if necessary.
      #
      # #connection can be called any number of times; the connection is
      # held in a hash keyed by the current connection id.
      def connection
        @reserved_connections[current_connection_id] ||= checkout
      end

      # Returns true if any connection in this pool is currently in use,
      # regardless of which thread is using it.
      def active_connection?
        active_connections.any?
      end

      # Signal that the thread is finished with the current connection.
      # #release_connection releases the connection-thread association
      # and returns the connection to the pool.
      #
      # +with_id+ defaults to the current connection id. Nothing happens when
      # no connection is reserved under that id.
      def release_connection(with_id = current_connection_id)
        conn = @reserved_connections.delete(with_id)
        checkin conn if conn
      end

      # If the calling thread already has a reserved connection, yield it to
      # the block. Otherwise checkout a connection, yield it to the block, and
      # checkin the connection when finished.
      def with_connection
        connection_id = current_connection_id

        # Only the caller's own reservation matters here. Looking at whether
        # *any* connection in the pool is in use would skip the release below
        # whenever another thread is busy, leaking the connection checked out
        # for this block.
        fresh_connection = true unless @reserved_connections[connection_id]
        yield connection
      ensure
        release_connection(connection_id) if fresh_connection
      end

      # Returns true if a connection has already been opened.
      def connected?
        synchronize { @connections.any? }
      end

      # Disconnects all connections in the pool, and clears the pool.
      def disconnect!
        synchronize do
          @reserved_connections = {}
          @connections.each do |conn|
            checkin conn
            conn.disconnect!
          end
          @connections = []
        end
      end

      # Drops every reservation and checks all connections back in, then
      # disconnects and removes from the pool each connection that reports
      # +requires_reloading?+.
      def clear_reloadable_connections!
        synchronize do
          @reserved_connections = {}
          @connections.each do |conn|
            checkin conn
            conn.disconnect! if conn.requires_reloading?
          end
          @connections.delete_if do |conn|
            conn.requires_reloading?
          end
        end
      end

      # Calls +verify!+ on every connection in the pool.
      def verify_active_connections! #:nodoc:
        synchronize do
          @connections.each do |connection|
            connection.verify!
          end
        end
      end

      # Deprecated no-op, kept only so that existing callers keep working.
      def clear_stale_cached_connections!
      end
      deprecate :clear_stale_cached_connections!

      # Check-out a database connection from the pool, indicating that you want
      # to use it. You should call #checkin when you no longer need this.
      #
      # This is done by either returning an existing connection, or by creating
      # a new connection. If the maximum number of connections for this pool has
      # already been reached, but the pool is empty (i.e. they're all being used),
      # then this method will wait until a thread has checked in a connection.
      # The wait time is bounded however: if no connection can be checked out
      # within the timeout specified for this pool, then a ConnectionTimeoutError
      # exception will be raised. When #timeout is nil no waiting takes place
      # and the error is raised straight away.
      #
      # Returns: an AbstractAdapter object.
      #
      # Raises:
      # - ConnectionTimeoutError: no connection can be obtained from the pool
      #   within the timeout period.
      def checkout
        synchronize do
          # +to_f+ tolerates a timeout that arrived from configuration as a
          # string.
          deadline = @timeout && (Time.now + @timeout.to_f)

          loop do
            conn = lease_available_connection
            return checkout_and_verify(conn) if conn

            remaining = deadline ? deadline - Time.now : 0
            if remaining <= 0
              raise ConnectionTimeoutError, "could not obtain a database connection#{" within #{@timeout} seconds" if @timeout}. The max pool size is currently #{@size}; consider increasing it."
            end

            # Releases the monitor while sleeping. Another thread may grab the
            # freed connection first, hence the retry loop around the wait.
            @available.wait(remaining)
          end
        end
      end

      # Check-in a database connection back into the pool, indicating that you
      # no longer need this connection.
      #
      # +conn+: an AbstractAdapter object, which was obtained by earlier by
      # calling +checkout+ on this pool.
      def checkin(conn)
        synchronize do
          conn.run_callbacks :checkin do
            conn.expire
          end

          # Wake one thread waiting in #checkout, if any.
          @available.signal
        end
      end

      # Remove a connection from the connection pool.  The connection will
      # remain open and active but will no longer be managed by this pool.
      def remove(conn)
        synchronize do
          @connections.delete conn

          # FIXME: we might want to store the key on the connection so that removing
          # from the reserved hash will be a little easier.
          thread_id = @reserved_connections.key(conn)
          @reserved_connections.delete thread_id if thread_id

          # A slot has been freed; a thread waiting in #checkout may now be
          # able to open a new connection.
          @available.signal
        end
      end

      private

      # Builds a new adapter object by sending the spec's adapter method,
      # together with the spec's config, to ActiveRecord::Base.
      def new_connection
        ActiveRecord::Base.send(spec.adapter_method, spec.config)
      end

      # The key under which the caller's connection is reserved. Falls back to
      # (and stores) the current thread's object id when
      # ActiveRecord::Base.connection_id is not set.
      def current_connection_id #:nodoc:
        ActiveRecord::Base.connection_id ||= Thread.current.object_id
      end

      # Leases and returns a connection: the first pooled connection whose
      # +lease+ call returns a truthy value, otherwise a newly created
      # connection if the pool has room. Returns nil when neither is possible.
      # Must be called with the monitor held.
      def lease_available_connection
        conn = @connections.find { |c| c.lease }

        # If all connections were leased, and we have room to expand,
        # create a new connection and lease it.
        if !conn && @connections.size < @size
          conn = checkout_new_connection
          conn.lease
        end

        conn
      end

      # Creates a connection, attaches it to this pool and returns it.
      #
      # Raises ConnectionNotEstablished when automatic reconnection has been
      # switched off.
      def checkout_new_connection
        raise ConnectionNotEstablished unless @automatic_reconnect

        c = new_connection
        c.pool = self
        @connections << c
        c
      end

      # Runs the connection's :checkout callbacks around +verify!+ and returns
      # the connection.
      def checkout_and_verify(c)
        c.run_callbacks :checkout do
          c.verify!
        end
        c
      end

      # All pooled connections that report +in_use?+.
      def active_connections
        @connections.select { |c| c.in_use? }
      end
    end

P
Pratik Naik 已提交
266
    # ConnectionHandler is a collection of ConnectionPool objects. It is used
    # for keeping separate connection pools for Active Record models that connect
    # to different databases.
    #
    # For example, suppose that you have 5 models, with the following hierarchy:
    #
    #  |
    #  +-- Book
    #  |    |
    #  |    +-- ScaryBook
    #  |    +-- GoodBook
    #  +-- Author
    #  +-- BankAccount
    #
    # Suppose that Book is to connect to a separate database (i.e. one other
    # than the default database). Then Book, ScaryBook and GoodBook will all use
    # the same connection pool. Likewise, Author and BankAccount will use the
    # same connection pool. However, the connection pool used by Author/BankAccount
    # is not the same as the one used by Book/ScaryBook/GoodBook.
    #
    # Normally there is only a single ConnectionHandler instance, accessible via
    # ActiveRecord::Base.connection_handler. Active Record models use this to
    # determine the connection pool that they should use.
    class ConnectionHandler
      attr_reader :connection_pools

      # +pools+ is the hash of existing pools keyed by connection spec; it
      # defaults to an empty hash.
      def initialize(pools = {})
        @connection_pools = pools
        @class_to_pool    = {}
      end

      # Associates the class name +name+ with the pool for +spec+, creating
      # that pool first if no pool exists for +spec+ yet. Returns the pool.
      def establish_connection(name, spec)
        @connection_pools[spec] ||= ConnectionAdapters::ConnectionPool.new(spec)
        @class_to_pool[name] = @connection_pools[spec]
      end

      # Returns true if there are any active connections among the connection
      # pools that the ConnectionHandler is managing.
      def active_connections?
        connection_pools.values.any? { |pool| pool.active_connection? }
      end

      # Asks every managed pool to release the connection reserved for the
      # current connection id, returning it to that pool.
      def clear_active_connections!
        @connection_pools.each_value { |pool| pool.release_connection }
      end

      # Calls +clear_reloadable_connections!+ on every managed pool.
      def clear_reloadable_connections!
        @connection_pools.each_value { |pool| pool.clear_reloadable_connections! }
      end

      # Calls +disconnect!+ on every managed pool.
      def clear_all_connections!
        @connection_pools.each_value { |pool| pool.disconnect! }
      end

      # Verify active connections.
      def verify_active_connections! #:nodoc:
        @connection_pools.each_value { |pool| pool.verify_active_connections! }
      end

      # Returns a connection from the pool of +klass+ or, failing that, of its
      # nearest ancestor that has a pool (see #retrieve_connection_pool).
      #
      # Raises ConnectionNotEstablished when no pool is found or the pool
      # yields no connection.
      def retrieve_connection(klass) #:nodoc:
        pool = retrieve_connection_pool(klass)
        (pool && pool.connection) || raise(ConnectionNotEstablished)
      end

      # Returns true if a connection that's accessible to this class has
      # already been opened.
      def connected?(klass)
        pool = retrieve_connection_pool(klass)
        pool && pool.connected?
      end

      # Remove the connection for this class. The pool registered under the
      # class's own name is unregistered, barred from reconnecting and
      # disconnected. Returns the pool's spec config, which can be used as an
      # argument for establish_connection to re-establish the connection, or
      # nil when the class has no pool registered under its name.
      def remove_connection(klass)
        pool = @class_to_pool.delete(klass.name)
        return nil unless pool

        @connection_pools.delete pool.spec
        pool.automatic_reconnect = false
        pool.disconnect!
        pool.spec.config
      end

      # Looks up the pool registered for +klass+ by name, walking up through
      # +active_record_super+ until a pool is found. Returns nil once
      # ActiveRecord::Model is reached without a match.
      def retrieve_connection_pool(klass)
        pool = @class_to_pool[klass.name]
        return pool if pool
        return nil if ActiveRecord::Model == klass
        retrieve_connection_pool klass.active_record_super
      end
    end
366 367

    # Rack middleware that hands the connections used while serving a request
    # back to their pools, either when the response body is closed or when
    # the downstream application raises.
    class ConnectionManagement
      # Wraps a response body so that closing it also clears the active
      # connections. Every other message is forwarded to the wrapped body.
      class Proxy # :nodoc:
        attr_reader :body, :testing

        # +body+ is the response body to wrap. When +testing+ is true the
        # connections are left alone on #close.
        def initialize(body, testing = false)
          @body    = body
          @testing = testing
        end

        def method_missing(method_sym, *arguments, &block)
          @body.send(method_sym, *arguments, &block)
        end

        def respond_to?(method_sym, include_private = false)
          super || @body.respond_to?(method_sym)
        end

        def each(&block)
          body.each(&block)
        end

        # Closes the wrapped body when it supports +close+, then clears the
        # active connections.
        def close
          body.close if body.respond_to?(:close)
        ensure
          # Runs even when body.close raises, so a failing body cannot keep
          # the request's connections checked out.
          #
          # Don't return connection (and perform implicit rollback) if
          # this request is a part of integration test
          ActiveRecord::Base.clear_active_connections! unless testing
        end
      end

      def initialize(app)
        @app = app
      end

      # Calls the wrapped app and returns its response with the body wrapped
      # in a Proxy. The request counts as a test request when +env+ has a
      # 'rack.test' key.
      def call(env)
        testing = env.key?('rack.test')

        status, headers, body = @app.call(env)

        [status, headers, Proxy.new(body, testing)]
      rescue Exception
        # Exception rather than StandardError: the connections have to go
        # back to the pool whatever was raised, and the error is re-raised
        # untouched.
        ActiveRecord::Base.clear_active_connections! unless testing
        raise
      end
    end
N
Nick 已提交
412
  end
413
end