提交 e92f5a99 编写于 作者: T thedarkone

AR::ConPool - establish connections outside of critical section.

上级 a3923e66
......@@ -129,17 +129,15 @@ def clear
# - ConnectionTimeoutError if +timeout+ is given and no element
# becomes available within +timeout+ seconds,
def poll(timeout = nil)
synchronize do
if timeout
no_wait_poll || wait_poll(timeout)
else
no_wait_poll
end
end
synchronize { internal_poll(timeout) }
end
private
def internal_poll(timeout)
no_wait_poll || (timeout && wait_poll(timeout))
end
def synchronize(&block)
@lock.synchronize(&block)
end
......@@ -193,6 +191,20 @@ def wait_poll(timeout)
end
end
# Connections must be leased while holding the main pool mutex. This is
# an internal subclass that also +.leases+ returned connections while
# still in queue's critical section (queue synchronizes with the same
# +@lock+ as the main pool) so that a returned connection is already
# leased and there is no need to re-enter synchronized block.
class ConnectionLeasingQueue < Queue # :nodoc:
private
def internal_poll(timeout)
conn = super
conn.lease if conn
conn
end
end
# Every +frequency+ seconds, the reaper will call +reap+ on +pool+.
# A reaper instantiated with a nil frequency will never reap the
# connection pool.
......@@ -247,7 +259,12 @@ def initialize(spec)
@connections = []
@automatic_reconnect = true
@available = Queue.new self
# Connection pool allows for concurrent (outside the main `synchronize` section)
# establishment of new connections. This variable tracks the number of threads
# currently in the process of independently establishing connections to the DB.
@now_connecting = 0
@available = ConnectionLeasingQueue.new self
end
# Retrieve the connection associated with the current thread, or call
......@@ -340,8 +357,7 @@ def clear_reloadable_connections!
# Raises:
# - ConnectionTimeoutError: no connection can be obtained from the pool.
def checkout
conn = synchronize { acquire_connection.tap(&:lease) }
checkout_and_verify(conn)
checkout_and_verify(acquire_connection)
end
# Check-in a database connection back into the pool, indicating that you
......@@ -366,6 +382,8 @@ def checkin(conn)
# Remove a connection from the connection pool. The connection will
# remain open and active but will no longer be managed by this pool.
def remove(conn)
needs_new_connection = false
synchronize do
@connections.delete conn
@available.delete conn
......@@ -381,7 +399,17 @@ def remove(conn)
# that are "stuck" there are helpless, they have no way of creating
# new connections and are completely reliant on us feeding available
# connections into the Queue.
@available.add checkout_new_connection if @available.any_waiting?
needs_new_connection = @available.any_waiting?
end
# This is intentionally done outside of the synchronized section as we
# would like not to hold the main mutex while checking out new connections,
# thus there is some chance that needs_new_connection information is now
# stale, we can live with that (try_to_checkout_new_connection will make
# sure not to exceed the pool's @size limit).
if needs_new_connection && new_conn = try_to_checkout_new_connection
# make the new_conn available to the starving threads stuck @available Queue
checkin new_conn
end
end
......@@ -409,14 +437,19 @@ def reap
private
def synchronized_connection_retrieval
conn = synchronize do
# re-checking under lock for correct DCL semantics
@reserved_connections[current_connection_id] ||= acquire_connection.tap(&:lease)
end
begin
checkout_and_verify(conn)
rescue Exception # clean up if something goes wrong in post_checkout
synchronize { @reserved_connections.delete_pair(current_connection_id, conn) }
conn = checkout
previous_value = nil
synchronize do # re-checking under lock for correct DCL semantics
# Cache#put_if_absent returns either `nil` (if insertion was successful, ie there was
# no previous current_connection_id mapping) or an existing value (if insertion
# failed because there already was a current_connection_id mapping)
previous_value = @reserved_connections.put_if_absent(current_connection_id, conn)
end
if previous_value # if we were too late and insertion failed
checkin(conn)
previous_value
else
conn
end
end
......@@ -427,11 +460,19 @@ def synchronized_connection_retrieval
#
# Raises:
# - ConnectionTimeoutError if a connection could not be acquired
#
#--
# Implementation detail: the connection returned by +acquire_connection+
# will already be "+connection.lease+ -ed" to the current thread.
def acquire_connection
if conn = @available.poll
# NOTE: we rely on `@available.poll` and `try_to_checkout_new_connection` to
# `conn.lease` the returned connection (and to do this in a `synchronized`
# section), this is not the cleanest implementation, as ideally we would
# `synchronize { conn.lease }` in this method, but by leaving it to `@available.poll`
# and `try_to_checkout_new_connection` we can piggyback on `synchronize` sections
# of the said methods and avoid an additional `synchronize` overhead.
if conn = @available.poll || try_to_checkout_new_connection
conn
elsif @connections.size < @size
checkout_new_connection
else
reap
@available.poll(@checkout_timeout)
......@@ -456,13 +497,46 @@ def current_connection_id #:nodoc:
Base.connection_id ||= Thread.current.object_id
end
# If the pool is not at a +@size+ limit, establish new connection. Connecting
# to the DB is done outside main synchronized section.
#--
# Implementation constraint: a newly established connection returned by this
# method must be in the +.leased+ state.
def try_to_checkout_new_connection
# first in synchronized section check if establishing new conns is allowed
# and increment @now_connecting, to prevent overstepping this pool's @size
# constraint
do_checkout = synchronize do
if (@connections.size + @now_connecting) < @size
@now_connecting += 1
end
end
if do_checkout
begin
# if successfully incremented @now_connecting establish new connection
# outside of synchronized section
conn = checkout_new_connection
ensure
synchronize do
if conn
adopt_connection(conn)
# returned conn needs to be already leased
conn.lease
end
@now_connecting -= 1
end
end
end
end
def adopt_connection(conn)
conn.pool = self
@connections << conn
end
def checkout_new_connection
raise ConnectionNotEstablished unless @automatic_reconnect
c = new_connection
c.pool = self
@connections << c
c
new_connection
end
def checkout_and_verify(c)
......
......@@ -6,7 +6,7 @@ class AdapterLeasingTest < ActiveRecord::TestCase
class Pool < ConnectionPool
def insert_connection_for_test!(c)
synchronize do
@connections << c
adopt_connection(c)
@available.add c
end
end
......
......@@ -356,6 +356,40 @@ def test_pool_sets_connection_schema_cache
pool.checkin connection
end
def test_concurrent_connection_establishment
all_threads_in_new_connection = ActiveSupport::Concurrency::Latch.new(@pool.size)
all_go = ActiveSupport::Concurrency::Latch.new
@pool.singleton_class.class_eval do
define_method(:new_connection) do
all_threads_in_new_connection.release
all_go.await
super()
end
end
connecting_threads = []
@pool.size.times do
connecting_threads << Thread.new { @pool.checkout }
end
begin
Timeout.timeout(5) do
# the kernel of the whole test is here, everything else is just scaffolding,
# this latch will not be released unless conn. pool allows for concurrent
# connection creation
all_threads_in_new_connection.await
end
rescue Timeout::Error
flunk 'pool unable to establish connections concurrently or implementation has ' <<
'changed, this test then needs to patch a different :new_connection method'
ensure
# clean up the threads
all_go.release
connecting_threads.map(&:join)
end
end
end
end
end
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册