diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index aeaaa48945531b314c08349de7308a0ddd788017..1d550c0b397033b128a84d980e34f0ff7ce1e255 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -129,17 +129,15 @@ def clear # - ConnectionTimeoutError if +timeout+ is given and no element # becomes available within +timeout+ seconds, def poll(timeout = nil) - synchronize do - if timeout - no_wait_poll || wait_poll(timeout) - else - no_wait_poll - end - end + synchronize { internal_poll(timeout) } end private + def internal_poll(timeout) + no_wait_poll || (timeout && wait_poll(timeout)) + end + def synchronize(&block) @lock.synchronize(&block) end @@ -193,6 +191,20 @@ def wait_poll(timeout) end end + # Connections must be leased while holding the main pool mutex. This is + # an internal subclass that also +.leases+ returned connections while + # still in queue's critical section (queue synchronizes with the same + # +@lock+ as the main pool) so that a returned connection is already + # leased and there is no need to re-enter synchronized block. + class ConnectionLeasingQueue < Queue # :nodoc: + private + def internal_poll(timeout) + conn = super + conn.lease if conn + conn + end + end + # Every +frequency+ seconds, the reaper will call +reap+ on +pool+. # A reaper instantiated with a nil frequency will never reap the # connection pool. @@ -247,7 +259,12 @@ def initialize(spec) @connections = [] @automatic_reconnect = true - @available = Queue.new self + # Connection pool allows for concurrent (outside the main `synchronize` section) + # establishment of new connections. This variable tracks the number of threads + # currently in the process of independently establishing connections to the DB. + @now_connecting = 0 + + @available = ConnectionLeasingQueue.new self end # Retrieve the connection associated with the current thread, or call @@ -340,8 +357,7 @@ def clear_reloadable_connections! # Raises: # - ConnectionTimeoutError: no connection can be obtained from the pool. def checkout - conn = synchronize { acquire_connection.tap(&:lease) } - checkout_and_verify(conn) + checkout_and_verify(acquire_connection) end # Check-in a database connection back into the pool, indicating that you @@ -366,6 +382,8 @@ def checkin(conn) # Remove a connection from the connection pool. The connection will # remain open and active but will no longer be managed by this pool. def remove(conn) + needs_new_connection = false + synchronize do @connections.delete conn @available.delete conn @@ -381,7 +399,17 @@ def remove(conn) # that are "stuck" there are helpless, they have no way of creating # new connections and are completely reliant on us feeding available # connections into the Queue. - @available.add checkout_new_connection if @available.any_waiting? + needs_new_connection = @available.any_waiting? + end + + # This is intentionally done outside of the synchronized section as we + # would like not to hold the main mutex while checking out new connections, + # thus there is some chance that needs_new_connection information is now + # stale, we can live with that (try_to_checkout_new_connection will make + # sure not to exceed the pool's @size limit). + if needs_new_connection && new_conn = try_to_checkout_new_connection + # make the new_conn available to the starving threads stuck @available Queue + checkin new_conn end end @@ -409,14 +437,19 @@ def reap private def synchronized_connection_retrieval - conn = synchronize do - # re-checking under lock for correct DCL semantics - @reserved_connections[current_connection_id] ||= acquire_connection.tap(&:lease) - end - begin - checkout_and_verify(conn) - rescue Exception # clean up if something goes wrong in post_checkout - synchronize { @reserved_connections.delete_pair(current_connection_id, conn) } + conn = checkout + previous_value = nil + synchronize do # re-checking under lock for correct DCL semantics + # Cache#put_if_absent returns either `nil` (if insertion was successful, ie there was + # no previous current_connection_id mapping) or an existing value (if insertion + # failed because there already was a current_connection_id mapping) + previous_value = @reserved_connections.put_if_absent(current_connection_id, conn) + end + if previous_value # if we were too late and insertion failed + checkin(conn) + previous_value + else + conn end end @@ -427,11 +460,19 @@ def synchronized_connection_retrieval # # Raises: # - ConnectionTimeoutError if a connection could not be acquired + # + #-- + # Implementation detail: the connection returned by +acquire_connection+ + # will already be "+connection.lease+ -ed" to the current thread. def acquire_connection - if conn = @available.poll + # NOTE: we rely on `@available.poll` and `try_to_checkout_new_connection` to + # `conn.lease` the returned connection (and to do this in a `synchronized` + # section), this is not the cleanest implementation, as ideally we would + # `synchronize { conn.lease }` in this method, but by leaving it to `@available.poll` + # and `try_to_checkout_new_connection` we can piggyback on `synchronize` sections + # of the said methods and avoid an additional `synchronize` overhead. + if conn = @available.poll || try_to_checkout_new_connection conn - elsif @connections.size < @size - checkout_new_connection else reap @available.poll(@checkout_timeout) @@ -456,13 +497,46 @@ def current_connection_id #:nodoc: Base.connection_id ||= Thread.current.object_id end + # If the pool is not at a +@size+ limit, establish new connection. Connecting + # to the DB is done outside main synchronized section. + #-- + # Implementation constraint: a newly established connection returned by this + # method must be in the +.leased+ state. + def try_to_checkout_new_connection + # first in synchronized section check if establishing new conns is allowed + # and increment @now_connecting, to prevent overstepping this pool's @size + # constraint + do_checkout = synchronize do + if (@connections.size + @now_connecting) < @size + @now_connecting += 1 + end + end + if do_checkout + begin + # if successfully incremented @now_connecting establish new connection + # outside of synchronized section + conn = checkout_new_connection + ensure + synchronize do + if conn + adopt_connection(conn) + # returned conn needs to be already leased + conn.lease + end + @now_connecting -= 1 + end + end + end + end + + def adopt_connection(conn) + conn.pool = self + @connections << conn + end + def checkout_new_connection raise ConnectionNotEstablished unless @automatic_reconnect - - c = new_connection - c.pool = self - @connections << c - c + new_connection end def checkout_and_verify(c) diff --git a/activerecord/test/cases/connection_adapters/adapter_leasing_test.rb b/activerecord/test/cases/connection_adapters/adapter_leasing_test.rb index fd5f183ab0e3711a6cce0a8fed4cef82e47fac20..580568c8ac3da57c792ba45d942657f0716880c4 100644 --- a/activerecord/test/cases/connection_adapters/adapter_leasing_test.rb +++ b/activerecord/test/cases/connection_adapters/adapter_leasing_test.rb @@ -6,7 +6,7 @@ class AdapterLeasingTest < ActiveRecord::TestCase class Pool < ConnectionPool def insert_connection_for_test!(c) synchronize do - @connections << c + adopt_connection(c) @available.add c end end diff --git a/activerecord/test/cases/connection_pool_test.rb b/activerecord/test/cases/connection_pool_test.rb index f5928814a31cd52ca937bffa8786d96edf34fbba..3e563cd7cff14b18db99fbb6faa39c4d119506ac 100644 --- a/activerecord/test/cases/connection_pool_test.rb +++ b/activerecord/test/cases/connection_pool_test.rb @@ -356,6 +356,40 @@ def test_pool_sets_connection_schema_cache pool.checkin connection end + + def test_concurrent_connection_establishment + all_threads_in_new_connection = ActiveSupport::Concurrency::Latch.new(@pool.size) + all_go = ActiveSupport::Concurrency::Latch.new + + @pool.singleton_class.class_eval do + define_method(:new_connection) do + all_threads_in_new_connection.release + all_go.await + super() + end + end + + connecting_threads = [] + @pool.size.times do + connecting_threads << Thread.new { @pool.checkout } + end + + begin + Timeout.timeout(5) do + # the kernel of the whole test is here, everything else is just scaffolding, + # this latch will not be released unless conn. pool allows for concurrent + # connection creation + all_threads_in_new_connection.await + end + rescue Timeout::Error + flunk 'pool unable to establish connections concurrently or implementation has ' << + 'changed, this test then needs to patch a different :new_connection method' + ensure + # clean up the threads + all_go.release + connecting_threads.map(&:join) + end + end end end end