提交 8e5e02bd 编写于 作者: N Nick Sieger

Collapse connection pool class hierarchy; YAGNI.

- Add connection checkin and checkout callbacks to adapter to allow
  adapter-specific customization of behavior (e.g., JRuby w/ JNDI)
上级 d07a6b1a
...@@ -27,22 +27,14 @@ module ConnectionAdapters ...@@ -27,22 +27,14 @@ module ConnectionAdapters
# 3. Use ActiveRecord::Base.connection_pool.with_connection(&block), which # 3. Use ActiveRecord::Base.connection_pool.with_connection(&block), which
# obtains a connection, yields it as the sole argument to the block, # obtains a connection, yields it as the sole argument to the block,
# and returns it to the pool after the block completes. # and returns it to the pool after the block completes.
#
# There are two connection-pooling-related options that you can add to
# your database connection configuration:
#
# * +pool+: number indicating size of connection pool (default 5)
# * +wait_timeout+: number of seconds to block and wait for a connection
# before giving up and raising a timeout error (default 5 seconds).
class ConnectionPool class ConnectionPool
# Factory method for connection pools.
# Determines pool type to use based on contents of connection
# specification. Additional options for connection specification:
#
# * +pool+: number indicating size of fixed connection pool to use
# * +wait_timeout+ (optional): number of seconds to block and wait
# for a connection before giving up and raising a timeout error.
def self.create(spec)
if spec.config[:jndi] # JRuby appserver datasource pool; passthrough
NewConnectionEveryTime.new(spec)
else
FixedSizeConnectionPool.new(spec)
end
end
delegate :verification_timeout, :to => "::ActiveRecord::Base" delegate :verification_timeout, :to => "::ActiveRecord::Base"
attr_reader :spec attr_reader :spec
...@@ -52,6 +44,13 @@ def initialize(spec) ...@@ -52,6 +44,13 @@ def initialize(spec)
@reserved_connections = {} @reserved_connections = {}
# The mutex used to synchronize pool access # The mutex used to synchronize pool access
@connection_mutex = Monitor.new @connection_mutex = Monitor.new
@queue = @connection_mutex.new_cond
# default 5 second timeout
@timeout = spec.config[:wait_timeout] || 5
# default max pool size to 5
@size = (spec.config[:pool] && spec.config[:pool].to_i) || 5
@connections = []
@checked_out = []
end end
# Retrieve the connection associated with the current thread, or call # Retrieve the connection associated with the current thread, or call
...@@ -87,7 +86,7 @@ def with_connection ...@@ -87,7 +86,7 @@ def with_connection
# Returns true if a connection has already been opened. # Returns true if a connection has already been opened.
def connected? def connected?
!connections.empty? !@connections.empty?
end end
# Disconnect all connections in the pool. # Disconnect all connections in the pool.
...@@ -95,10 +94,11 @@ def disconnect! ...@@ -95,10 +94,11 @@ def disconnect!
@reserved_connections.each do |name,conn| @reserved_connections.each do |name,conn|
checkin conn checkin conn
end end
connections.each do |conn| @reserved_connections = {}
@connections.each do |conn|
conn.disconnect! conn.disconnect!
end end
@reserved_connections = {} @connections = []
end end
# Clears the cache which maps classes # Clears the cache which maps classes
...@@ -107,19 +107,17 @@ def clear_reloadable_connections! ...@@ -107,19 +107,17 @@ def clear_reloadable_connections!
checkin conn checkin conn
end end
@reserved_connections = {} @reserved_connections = {}
connections.each do |conn| @connections.each do |conn|
if conn.requires_reloading? conn.disconnect! if conn.requires_reloading?
conn.disconnect!
remove_connection conn
end
end end
@connections = []
end end
# Verify active connections and remove and disconnect connections # Verify active connections and remove and disconnect connections
# associated with stale threads. # associated with stale threads.
def verify_active_connections! #:nodoc: def verify_active_connections! #:nodoc:
clear_stale_cached_connections! clear_stale_cached_connections!
connections.each do |connection| @connections.each do |connection|
connection.verify!(verification_timeout) connection.verify!(verification_timeout)
end end
end end
...@@ -134,23 +132,34 @@ def clear_stale_cached_connections! ...@@ -134,23 +132,34 @@ def clear_stale_cached_connections!
# Check-out a database connection from the pool. # Check-out a database connection from the pool.
def checkout def checkout
raise NotImplementedError, "checkout is an abstract method" # Checkout an available connection
end conn = @connection_mutex.synchronize do
if @checked_out.size < @connections.size
# Check-in a database connection back into the pool. checkout_existing_connection
def checkin(connection) elsif @connections.size < @size
raise NotImplementedError, "checkin is an abstract method" checkout_new_connection
end end
end
return conn if conn
def remove_connection(conn) #:nodoc: # No connections available; wait for one
raise NotImplementedError, "remove_connection is an abstract method" @connection_mutex.synchronize do
if @queue.wait(@timeout)
checkout_existing_connection
else
raise ConnectionTimeoutError, "could not obtain a database connection in a timely fashion"
end
end
end end
private :remove_connection
def connections #:nodoc: # Check-in a database connection back into the pool.
raise NotImplementedError, "connections is an abstract method" def checkin(conn)
@connection_mutex.synchronize do
conn.run_callbacks :checkin
@checked_out.delete conn
@queue.signal
end
end end
private :connections
synchronize :connection, :release_connection, synchronize :connection, :release_connection,
:clear_reloadable_connections!, :verify_active_connections!, :clear_reloadable_connections!, :verify_active_connections!,
...@@ -179,72 +188,7 @@ def remove_stale_cached_threads!(cache, &block) ...@@ -179,72 +188,7 @@ def remove_stale_cached_threads!(cache, &block)
cache.delete(key) cache.delete(key)
end end
end end
end
# NewConnectionEveryTime is a simple implementation: always
# create/disconnect on checkout/checkin.
class NewConnectionEveryTime < ConnectionPool
def checkout
new_connection
end
def checkin(conn)
conn.disconnect!
end
private
def connections
@reserved_connections.values
end
def remove_connection(conn)
@reserved_connections.delete_if {|k,v| v == conn}
end
end
# FixedSizeConnectionPool provides a full, fixed-size connection pool with
# timed waits when the pool is exhausted.
class FixedSizeConnectionPool < ConnectionPool
def initialize(spec)
super
# default 5 second timeout
@timeout = spec.config[:wait_timeout] || 5
# default max pool size to 5
@size = (spec.config[:pool] && spec.config[:pool].to_i) || 5
@queue = @connection_mutex.new_cond
@connections = []
@checked_out = []
end
def checkout
# Checkout an available connection
conn = @connection_mutex.synchronize do
if @checked_out.size < @connections.size
checkout_existing_connection
elsif @connections.size < @size
checkout_new_connection
end
end
return conn if conn
# No connections available; wait for one
@connection_mutex.synchronize do
if @queue.wait(@timeout)
checkout_existing_connection
else
raise ConnectionTimeoutError, "could not obtain a database connection in a timely fashion"
end
end
end
def checkin(conn)
@connection_mutex.synchronize do
@checked_out.delete conn
@queue.signal
end
end
private
def checkout_new_connection def checkout_new_connection
c = new_connection c = new_connection
@connections << c @connections << c
...@@ -257,19 +201,11 @@ def checkout_existing_connection ...@@ -257,19 +201,11 @@ def checkout_existing_connection
end end
def checkout_and_verify(c) def checkout_and_verify(c)
c.reset! c.run_callbacks :checkout
c.verify!(verification_timeout) c.verify!(verification_timeout)
@checked_out << c @checked_out << c
c c
end end
def connections
@connections
end
def remove_connection(conn)
@connections.delete conn
end
end end
class ConnectionHandler class ConnectionHandler
...@@ -285,7 +221,7 @@ def connection_pools ...@@ -285,7 +221,7 @@ def connection_pools
end end
def establish_connection(name, spec) def establish_connection(name, spec)
@connection_pools[name] = ConnectionAdapters::ConnectionPool.create(spec) @connection_pools[name] = ConnectionAdapters::ConnectionPool.new(spec)
end end
# Returns any connections in use by the current thread back to the pool, # Returns any connections in use by the current thread back to the pool,
......
...@@ -25,6 +25,9 @@ module ConnectionAdapters # :nodoc: ...@@ -25,6 +25,9 @@ module ConnectionAdapters # :nodoc:
class AbstractAdapter class AbstractAdapter
include Quoting, DatabaseStatements, SchemaStatements include Quoting, DatabaseStatements, SchemaStatements
include QueryCache include QueryCache
include ActiveSupport::Callbacks
define_callbacks :checkout, :checkin
checkout :reset!
@@row_even = true @@row_even = true
def initialize(connection, logger = nil) #:nodoc: def initialize(connection, logger = nil) #:nodoc:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册