require 'thread'
require 'monitor'
require 'set'
require 'active_support/core_ext/module/deprecation'

module ActiveRecord
  # Raised when a connection could not be obtained within the connection
  # acquisition timeout period.
  class ConnectionTimeoutError < ConnectionNotEstablished
  end

  # Raised when a connection pool is full and another connection is requested.
  class PoolFullError < ConnectionNotEstablished
  end

  module ConnectionAdapters
    # Connection pool base class for managing Active Record database
    # connections.
    #
    # == Introduction
    #
    # A connection pool synchronizes thread access to a limited number of
    # database connections. The basic idea is that each thread checks out a
    # database connection from the pool, uses that connection, and checks the
    # connection back in. ConnectionPool is completely thread-safe, and will
    # ensure that a connection cannot be used by two threads at the same time,
    # as long as ConnectionPool's contract is correctly followed. It will also
    # handle cases in which there are more threads than connections: if all
    # connections have been checked out, and a thread tries to check out a
    # connection anyway, then ConnectionPool will wait until some other thread
    # has checked in a connection.
    #
    # == Obtaining (checking out) a connection
    #
    # Connections can be obtained and used from a connection pool in several
    # ways:
    #
    # 1. Simply use ActiveRecord::Base.connection as with Active Record 2.1 and
    #    earlier (pre-connection-pooling). Eventually, when you're done with
    #    the connection(s) and wish it to be returned to the pool, you call
    #    ActiveRecord::Base.clear_active_connections!. This will be the
    #    default behavior for Active Record when used in conjunction with
    #    Action Pack's request handling cycle.
    # 2. Manually check out a connection from the pool with
    #    ActiveRecord::Base.connection_pool.checkout. You are responsible for
    #    returning this connection to the pool when finished by calling
    #    ActiveRecord::Base.connection_pool.checkin(connection).
    # 3. Use ActiveRecord::Base.connection_pool.with_connection(&block), which
    #    obtains a connection, yields it as the sole argument to the block,
    #    and returns it to the pool after the block completes.
    #
    # Connections in the pool are actually AbstractAdapter objects (or objects
    # compatible with AbstractAdapter's interface).
    #
    # == Options
    #
    # There are several connection-pooling-related options that you can add to
    # your database connection configuration:
    #
    # * +pool+: number indicating size of connection pool (default 5)
    # * +checkout_timeout+: number of seconds to block and wait for a connection
    #   before giving up and raising a timeout error (default 5 seconds).
    # * +reaping_frequency+: frequency in seconds to periodically run the
    #   Reaper, which attempts to find and close dead connections, which can
    #   occur if a programmer forgets to close a connection at the end of a
    #   thread or a thread dies unexpectedly. (Default nil, which means don't
    #   run the Reaper).
    # * +dead_connection_timeout+: number of seconds from last checkout
    #   after which the Reaper will consider a connection reapable. (default
    #   5 seconds).
    class ConnectionPool
      # Threadsafe, fair, FIFO queue.  Meant to be used by ConnectionPool
      # with which it shares a Monitor.  But could be a generic Queue.
      #
      # The Queue in stdlib's 'thread' could replace this class except
      # stdlib's doesn't support waiting with a timeout.
      class Queue
        # +lock+ is the monitor that guards this queue.  A caller may pass
        # its own monitor so that the queue and its owner share one lock.
        def initialize(lock = Monitor.new)
          @lock        = lock
          @cond        = @lock.new_cond
          @num_waiting = 0
          @queue       = []
        end

        # True when at least one thread is blocked waiting on the queue.
        def any_waiting?
          synchronize { @num_waiting > 0 }
        end

        # The number of threads currently blocked waiting on the queue.
        def num_waiting
          synchronize { @num_waiting }
        end

        # Append +element+ to the tail of the queue and signal the
        # condition variable.  Never blocks.
        def add(element)
          synchronize do
            @queue << element
            @cond.signal
          end
        end

        # Remove +element+ from the queue and return it; returns nil when
        # it is not in the queue.
        def delete(element)
          synchronize { @queue.delete(element) }
        end

        # Drop every element from the queue.
        def clear
          synchronize { @queue.clear }
        end

        # Remove the head of the queue.
        #
        # Without +timeout+: remove and return the head of the queue only
        # if the number of available elements is strictly greater than the
        # number of waiting threads (that is, don't jump ahead in line);
        # otherwise return nil.
        #
        # With +timeout+: if no element can be taken immediately, block for
        # up to +timeout+ seconds until one becomes available.
        #
        # Raises:
        # - ConnectionTimeoutError if +timeout+ is given and no element
        #   becomes available within +timeout+ seconds.
        def poll(timeout = nil)
          synchronize do
            found = no_wait_poll
            timeout && !found ? wait_poll(timeout) : found
          end
        end

        private

        # Run +block+ while holding the queue's lock.
        def synchronize(&block)
          @lock.synchronize(&block)
        end

        # True when the queue holds at least one element.
        def any?
          @queue.size > 0
        end

        # An element may be taken without waiting if and only if there are
        # strictly more elements than waiting threads.
        def can_remove_no_wait?
          @num_waiting < @queue.size
        end

        # Shift the head off the queue; nil when the queue is empty.
        def remove
          @queue.shift
        end

        # Take the head of the queue if that does not jump ahead of a
        # waiting thread; otherwise return nil.
        def no_wait_poll
          can_remove_no_wait? ? remove : nil
        end

        # Register as a waiter and wait on the condition variable until an
        # element is present (it is then removed and returned) or until
        # +timeout+ seconds have elapsed (ConnectionTimeoutError is raised).
        # The waiter count is always restored on exit.
        def wait_poll(timeout)
          @num_waiting += 1
          started_at = Time.now
          waited     = 0

          loop do
            @cond.wait(timeout - waited)
            return remove if any?

            waited = Time.now - started_at
            raise ConnectionTimeoutError if waited >= timeout
          end
        ensure
          @num_waiting -= 1
        end
      end

      # Every +frequency+ seconds, the reaper will call +reap+ on +pool+.
      # A reaper instantiated with a nil frequency will never reap the
      # connection pool.
      #
      # Configure the frequency by setting "reaping_frequency" in your
      # database yaml file.
      class Reaper
        attr_reader :pool, :frequency

        # +pool+ is the object whose +reap+ method gets called; +frequency+
        # is the number of seconds to sleep between calls, or nil to
        # disable reaping.
        def initialize(pool, frequency)
          @pool      = pool
          @frequency = frequency
        end

        # Starts a thread that forever sleeps +frequency+ seconds and then
        # calls +reap+ on the pool.  Returns that Thread, or nil (without
        # starting a thread) when +frequency+ is nil or false.
        def run
          return unless frequency
          # The values are handed to the thread as block arguments; the
          # names avoid shadowing Kernel#p.
          Thread.new(frequency, pool) { |interval, target|
            while true
              sleep interval
              target.reap
            end
          }
        end
      end

      include MonitorMixin

      # Settings that can be changed after the pool has been built.
      attr_accessor :automatic_reconnect, :checkout_timeout, :dead_connection_timeout
      # Read-only access to the pool's specification, its list of
      # connections, its maximum size and its reaper.
      attr_reader :spec, :connections, :size, :reaper

      # Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
      # object which describes database connection information (e.g. adapter,
      # host name, username, password, etc), as well as the maximum size for
      # this ConnectionPool.
      #
      # The following keys of +spec.config+ are read here:
      # * +:checkout_timeout+ (default 5)
      # * +:dead_connection_timeout+ (default 5)
      # * +:reaping_frequency+ (passed to the Reaper unchanged)
      # * +:pool+, the maximum pool size (converted with +to_i+, default 5)
      def initialize(spec)
        super()

        @spec = spec

        # The cache of reserved connections mapped to threads
        @reserved_connections = {}

        @checkout_timeout = spec.config[:checkout_timeout] || 5
        # Fall back to the documented default of 5 seconds: #reap subtracts
        # this value from Time.now, which fails when it is nil.
        @dead_connection_timeout = spec.config[:dead_connection_timeout] || 5
        @reaper  = Reaper.new self, spec.config[:reaping_frequency]
        @reaper.run

        # default max pool size to 5
        @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5

        @connections         = []
        @automatic_reconnect = true

        # The queue of available connections shares this pool's monitor.
        @available = Queue.new self
      end

      # Hack for tests to be able to add connections.  Do not call outside of
      # tests.  Appends +c+ to the list of connections and to the queue of
      # available connections while holding the pool's lock.
      def insert_connection_for_test!(c) #:nodoc:
        synchronize do
          @connections << c
          @available.add c
        end
      end

      # Retrieve the connection reserved under the current connection id, or
      # call #checkout to obtain (and reserve) one if there is none yet.
      #
      # #connection can be called any number of times; the connection is
      # held in a hash keyed by +current_connection_id+.
      def connection
        synchronize do
          @reserved_connections[current_connection_id] ||= checkout
        end
      end

      # Is there an open connection that is being used for the current thread?
      #
      # Returns false when nothing is reserved under the current connection
      # id; otherwise returns what the reserved connection's +in_use?+ says.
      def active_connection?
        synchronize do
          @reserved_connections.fetch(current_connection_id) {
            return false
          }.in_use?
        end
      end

      # Signal that the thread is finished with the current connection.
      # #release_connection releases the connection-thread association
      # and returns the connection to the pool.
      #
      # +with_id+ is the key of the reservation to release; it defaults to
      # the current connection id.  Nothing happens when no connection is
      # reserved under that key.
      def release_connection(with_id = current_connection_id)
        synchronize do
          conn = @reserved_connections.delete(with_id)
          checkin conn if conn
        end
      end

      # If a connection already exists yield it to the block. If no connection
      # exists checkout a connection, yield it to the block, and checkin the
      # connection when finished.
      #
      # Returns the value of the block.
      def with_connection
        # Remember the id up front so the ensure clause releases the
        # reservation that was current when the method was entered.
        connection_id = current_connection_id
        fresh_connection = true unless active_connection?
        yield connection
      ensure
        release_connection(connection_id) if fresh_connection
      end

      # Returns true if a connection has already been opened, i.e. if the
      # pool's list of connections is not empty.
      def connected?
        synchronize { @connections.any? }
      end

      # Disconnects all connections in the pool, and clears the pool.
      #
      # Every connection is checked in and then disconnected; afterwards the
      # reservations, the connection list and the queue of available
      # connections are all empty.
      def disconnect!
        synchronize do
          @reserved_connections = {}
          @connections.each do |conn|
            checkin conn
            conn.disconnect!
          end
          @connections = []
          @available.clear
        end
      end

      # Checks every connection back in, disconnects and drops the ones whose
      # +requires_reloading?+ is true, and rebuilds the queue of available
      # connections from the connections that remain.  All reservations are
      # forgotten.
      def clear_reloadable_connections!
        synchronize do
          @reserved_connections = {}
          @connections.each do |conn|
            checkin conn
            conn.disconnect! if conn.requires_reloading?
          end
          @connections.delete_if do |conn|
            conn.requires_reloading?
          end
          # Start over with an empty queue so it holds exactly the
          # connections that were kept.
          @available.clear
          @connections.each do |conn|
            @available.add conn
          end
        end
      end

      # Deprecated name that simply delegates to #reap.
      def clear_stale_cached_connections! # :nodoc:
        reap
      end
      deprecate :clear_stale_cached_connections! => "Please use #reap instead"

      # Check-out a database connection from the pool, indicating that you want
      # to use it. You should call #checkin when you no longer need this.
      #
      # This is done by either returning and leasing existing connection, or by
      # creating a new connection and leasing it.
      #
      # If all connections are leased and the pool is at capacity (meaning the
      # number of currently leased connections is greater than or equal to the
      # size limit set), an ActiveRecord::PoolFullError exception will be raised.
      #
      # Returns: an AbstractAdapter object.
      #
      # Raises:
      # - PoolFullError: no connection can be obtained from the pool.
      def checkout
        synchronize do
          conn = acquire_connection
          conn.lease
          checkout_and_verify(conn)
        end
      end

      # Check-in a database connection back into the pool, indicating that you
      # no longer need this connection.
      #
      # +conn+: an AbstractAdapter object, which was obtained earlier by
      # calling +checkout+ on this pool.
      def checkin(conn)
        synchronize do
          conn.run_callbacks :checkin do
            conn.expire
          end

          # Drop any reservation that still points at this connection.
          release conn

          @available.add conn
        end
      end

      # Remove a connection from the connection pool.  The connection will
      # remain open and active but will no longer be managed by this pool.
      #
      # If threads are waiting for a connection, a newly created connection
      # is added to the queue to take the place of the removed one.
      def remove(conn)
        synchronize do
          @connections.delete conn
          @available.delete conn

          # FIXME: we might want to store the key on the connection so that removing
          # from the reserved hash will be a little easier.
          release conn

          @available.add checkout_new_connection if @available.any_waiting?
        end
      end

      # Removes dead connections from the pool.  A dead connection can occur
      # if a programmer forgets to close a connection at the end of a thread
      # or a thread dies unexpectedly.
      #
      # A connection is removed when it is in use, its +last_use+ is older
      # than +dead_connection_timeout+ seconds, and it is no longer active.
      def reap
        synchronize do
          stale = Time.now - @dead_connection_timeout
          # Iterate over a copy because #remove mutates the connection list.
          connections.dup.each do |conn|
            remove conn if conn.in_use? && stale > conn.last_use && !conn.active?
          end
        end
      end

      private

      # Acquire a connection by one of 1) immediately removing one
      # from the queue of available connections, 2) creating a new
      # connection if the pool is not at capacity, 3) waiting on the
      # queue for a connection to become available.
      #
      # Raises:
      # - PoolFullError if a connection could not be acquired (FIXME:
      #   why not ConnectionTimeoutError?)
      def acquire_connection
        if conn = @available.poll
          conn
        elsif @connections.size < @size
          checkout_new_connection
        else
          t0 = Time.now
          begin
            @available.poll(@checkout_timeout)
          rescue ConnectionTimeoutError
            # Report both the configured timeout and the time actually waited.
            msg = 'could not obtain a database connection within %0.3f seconds (waited %0.3f seconds)' %
              [@checkout_timeout, Time.now - t0]
            raise PoolFullError, msg
          end
        end
      end

      # Forgets the reservation that points at +conn+, if there is one.  The
      # entry for the current connection id is checked first; otherwise the
      # first key whose value equals +conn+ is used.  Does nothing when
      # +conn+ is not reserved.
      def release(conn)
        thread_id = if @reserved_connections[current_connection_id] == conn
          current_connection_id
        else
          @reserved_connections.key(conn)
        end

        @reserved_connections.delete thread_id if thread_id
      end

      # Builds a connection by sending the spec's adapter method, with the
      # spec's config as its argument, to ActiveRecord::Model.
      def new_connection
        ActiveRecord::Model.send(spec.adapter_method, spec.config)
      end

      # The key under which the caller's connection is reserved: the value of
      # ActiveRecord::Model.connection_id, which is set to the current
      # thread's object id the first time it is found to be unset.
      def current_connection_id #:nodoc:
        ActiveRecord::Model.connection_id ||= Thread.current.object_id
      end

      # Creates a new connection, ties it to this pool and records it in the
      # list of connections.  Returns the new connection.
      #
      # Raises:
      # - ConnectionNotEstablished if +automatic_reconnect+ is turned off.
      def checkout_new_connection
        raise ConnectionNotEstablished unless @automatic_reconnect

        c = new_connection
        c.pool = self
        @connections << c
        c
      end

      # Runs the connection's +:checkout+ callbacks around a call to its
      # +verify!+ method, then returns the connection.
      def checkout_and_verify(c)
        c.run_callbacks :checkout do
          c.verify!
        end
        c
      end
    end

    # ConnectionHandler is a collection of ConnectionPool objects. It is used
    # for keeping separate connection pools for Active Record models that connect
    # to different databases.
    #
    # For example, suppose that you have 5 models, with the following hierarchy:
    #
    #  |
    #  +-- Book
    #  |    |
    #  |    +-- ScaryBook
    #  |    +-- GoodBook
    #  +-- Author
    #  +-- BankAccount
    #
    # Suppose that Book is to connect to a separate database (i.e. one other
    # than the default database). Then Book, ScaryBook and GoodBook will all use
    # the same connection pool. Likewise, Author and BankAccount will use the
    # same connection pool. However, the connection pool used by Author/BankAccount
    # is not the same as the one used by Book/ScaryBook/GoodBook.
    #
    # Normally there is only a single ConnectionHandler instance, accessible via
    # ActiveRecord::Base.connection_handler. Active Record models use this to
    # determine the connection pool that they should use.
    class ConnectionHandler
      # Sets up the two lookup tables.  Each is a hash that creates an empty
      # hash for any key it is asked for; the private +owner_to_pool+ and
      # +class_to_pool+ readers index them by Process.pid.
      def initialize
        @owner_to_pool = Hash.new { |h,k| h[k] = {} }
        @class_to_pool = Hash.new { |h,k| h[k] = {} }
      end

      # The pools registered for the current process.  nil entries (recorded
      # by +pool_for+ for owners without a pool) are left out.
      def connection_pools
        owner_to_pool.values.compact
      end

      # Creates a ConnectionPool for +spec+ and registers it for +owner+ in
      # the current process.  Returns the new pool.
      def establish_connection(owner, spec)
        # Any cached class-to-pool lookup may be outdated now; this empties
        # the cache for every process id, not just the current one.
        @class_to_pool.clear
        owner_to_pool[owner] = ConnectionAdapters::ConnectionPool.new(spec)
      end

      # Returns true if there are any active connections among the connection
      # pools that the ConnectionHandler is managing.
      def active_connections?
        connection_pools.any?(&:active_connection?)
      end

      # Returns any connections in use by the current thread back to their
      # pools, by calling +release_connection+ on every managed pool.
      def clear_active_connections!
        connection_pools.each(&:release_connection)
      end

      # Calls +clear_reloadable_connections!+ on every managed pool.
      def clear_reloadable_connections!
        connection_pools.each(&:clear_reloadable_connections!)
      end

      # Calls +disconnect!+ on every managed pool.
      def clear_all_connections!
        connection_pools.each(&:disconnect!)
      end

      # Locate the connection of the nearest super class. This can be an
      # active or defined connection: if it is the latter, it will be
      # opened and set as the active connection for the class it was defined
      # for (not necessarily the current class).
      #
      # Raises:
      # - ConnectionNotEstablished if no pool is found for +klass+ or the
      #   pool yields no connection.
      def retrieve_connection(klass) #:nodoc:
        pool = retrieve_connection_pool(klass)
        (pool && pool.connection) || raise(ConnectionNotEstablished)
      end

      # Returns true if a connection that's accessible to this class has
      # already been opened.  Returns nil when no pool is found for +klass+.
      def connected?(klass)
        conn = retrieve_connection_pool(klass)
        conn && conn.connected?
      end

      # Remove the connection for this class. This will close the active
      # connection and the defined connection (if they exist). The result
      # can be used as an argument for establish_connection, for easily
      # re-establishing the connection.
      #
      # Returns the removed pool's spec config, or nil when +owner+ has no
      # pool registered in the current process.
      def remove_connection(owner)
        if pool = owner_to_pool.delete(owner)
          @class_to_pool.clear
          # Stop the pool from creating new connections before it is
          # disconnected.
          pool.automatic_reconnect = false
          pool.disconnect!
          pool.spec.config
        end
      end

      # Retrieving the connection pool happens a lot so we cache it in @class_to_pool.
      # This makes retrieving the connection pool O(1) once the process is warm.
      # When a connection is established or removed, we invalidate the cache.
      #
      # Ideally we would use #fetch here, as class_to_pool[klass] may sometimes be nil.
      # However, benchmarking (https://gist.github.com/3552829) showed that #fetch is
      # significantly slower than #[]. So in the nil case, no caching will take place,
      # but that's ok since the nil case is not the common one that we wish to optimise
      # for.
      def retrieve_connection_pool(klass)
        class_to_pool[klass] ||= begin
          # Walk up the superclass chain until a class with a pool is found
          # or the chain leaves the classes below Model::Tag.
          until pool = pool_for(klass)
            klass = klass.superclass
            break unless klass < Model::Tag
          end

          # Fall back to the pool of ActiveRecord::Model; the result is also
          # cached under the class at which the walk stopped.
          class_to_pool[klass] = pool || pool_for(ActiveRecord::Model)
        end
      end

      private

      # The owner-to-pool table of the current process (keyed by Process.pid).
      def owner_to_pool
        @owner_to_pool[Process.pid]
      end

      # The class-to-pool cache of the current process (keyed by Process.pid).
      def class_to_pool
        @class_to_pool[Process.pid]
      end

      # Returns the pool registered for +owner+ in the current process.  When
      # there is no entry yet, one is created: a fresh pool built from the
      # spec of a pool registered under another process id, or nil when no
      # such pool exists.
      def pool_for(owner)
        owner_to_pool.fetch(owner) {
          if ancestor_pool = pool_from_any_process_for(owner)
            # A connection was established in an ancestor process that must have
            # subsequently forked. We can't reuse the connection, but we can copy
            # the specification and establish a new connection with it.
            establish_connection owner, ancestor_pool.spec
          else
            # Record the miss so the next lookup finds the key directly.
            owner_to_pool[owner] = nil
          end
        }
      end

      # Searches the tables of all process ids for a pool registered for
      # +owner+ and returns the first one found, or nil.
      def pool_from_any_process_for(owner)
        # The local is named so that it does not shadow #owner_to_pool.
        table = @owner_to_pool.values.find { |v| v[owner] }
        table && table[owner]
      end
    end

    # Rack middleware that hands active connections back once a request is
    # done: when the response body is closed, or right away when the
    # wrapped app raises.  Nothing is cleared when the Rack env contains
    # the 'rack.test' key.
    class ConnectionManagement
      # +app+ is the next Rack application in the stack.
      def initialize(app)
        @app = app
      end

      # Calls the wrapped app and returns its response with the body
      # wrapped in a Rack::BodyProxy whose block clears the active
      # connections.  Errors raised by the app are re-raised after the
      # connections have been cleared.
      def call(env)
        testing = env.key?('rack.test')

        response = @app.call(env)
        response[2] = ::Rack::BodyProxy.new(response[2]) do
          ActiveRecord::Base.clear_active_connections! unless testing
        end

        response
      rescue
        ActiveRecord::Base.clear_active_connections! unless testing
        raise
      end
    end
  end
end