connection_pool.rb 12.3 KB
Newer Older
1
require 'monitor'
2 3
require 'set'

N
Nick 已提交
4
module ActiveRecord
5 6 7 8 9
  # Raised when a connection could not be obtained within the connection
  # acquisition timeout period.
  class ConnectionTimeoutError < ConnectionNotEstablished
    # Adds no behavior of its own; it only gives the timeout raised by
    # FixedSizeConnectionPool#checkout a distinct exception class.
  end

N
Nick 已提交
10
  module ConnectionAdapters
11 12 13 14 15 16
    # Connection pool base class for managing ActiveRecord database
    # connections.
    #
    # Connections can be obtained and used from a connection pool in several
    # ways:
    #
17 18 19 20 21 22
    # 1. Simply use ActiveRecord::Base.connection as with ActiveRecord 2.1 and
    #    earlier (pre-connection-pooling). Eventually, when you're done with
    #    the connection(s) and wish it to be returned to the pool, you call
    #    ActiveRecord::Base.clear_active_connections!. This will be the
    #    default behavior for ActiveRecord when used in conjunction with
    #    ActionPack's request handling cycle.
23 24 25 26 27 28 29
    # 2. Manually check out a connection from the pool with
    #    ActiveRecord::Base.connection_pool.checkout. You are responsible for
    #    returning this connection to the pool when finished by calling
    #    ActiveRecord::Base.connection_pool.checkin(connection).
    # 3. Use ActiveRecord::Base.connection_pool.with_connection(&block), which
    #    obtains a connection, yields it as the sole argument to the block,
    #    and returns it to the pool after the block completes.
30
    class ConnectionPool
31
      # Factory method for connection pools.
32 33 34 35 36 37
      # Determines pool type to use based on contents of connection
      # specification. Additional options for connection specification:
      #
      # * +pool+: number indicating size of fixed connection pool to use
      # * +wait_timeout+ (optional): number of seconds to block and wait
      #   for a connection before giving up and raising a timeout error.
38
      def self.create(spec)
        settings = spec.config
        pool_size = settings[:pool]

        # Pick the pool class first, then instantiate it once at the end.
        pool_class =
          if pool_size && pool_size.to_i > 0
            FixedSizeConnectionPool
          elsif settings[:jndi] # JRuby appserver datasource pool; passthrough
            NewConnectionEveryTime
          else
            CachedConnectionPerThread
          end

        pool_class.new(spec)
      end
N
Nick 已提交
47

48 49
      # verification_timeout is not stored on the pool; calls are forwarded to
      # ::ActiveRecord::Base.
      delegate :verification_timeout, :to => "::ActiveRecord::Base"
      # The connection specification that was passed to the constructor.
      attr_reader :spec
50

N
Nick 已提交
51 52
      # Stores +spec+ and sets up the per-thread reservation cache and the
      # monitor that guards it.
      def initialize(spec)
        # The mutex used to synchronize pool access
        @connection_mutex = Monitor.new
        # The cache of reserved connections mapped to threads
        @reserved_connections = {}
        @spec = spec
      end

59 60 61 62 63 64
      # Retrieve the connection associated with the current thread, or call
      # #checkout to obtain one if necessary.
      #
      # #connection can be called any number of times; the connection is
      # held in a hash keyed by the thread id.
      def connection
        thread_key = active_connection_name
        reserved = @reserved_connections[thread_key]

        unless reserved
          # Nothing reserved under this key yet: check one out and remember it.
          return @reserved_connections[thread_key] = checkout
        end

        # Already reserved: verify it before handing it back.
        reserved.verify!(verification_timeout)
        reserved
      end

73
      # Signal that the thread is finished with the current connection.
74
      # #release_connection releases the connection-thread association
75
      # and returns the connection to the pool.
76
      def release_connection
        released = @reserved_connections.delete(active_connection_name)
        # Nothing was reserved under this key; there is nothing to check in.
        return unless released

        checkin(released)
      end

      # Reserve a connection, and yield it to a block. Ensure the connection is
      # checked back in when finished.
      def with_connection
        conn = checkout
        yield conn
      ensure
        # checkout may have raised before conn was assigned; checking in nil
        # would raise a second error and hide the original one.
        checkin conn if conn
      end

90 91 92
      # Returns true if a connection has already been opened.
      def connected?
        # The pool counts as connected as soon as it holds any connection.
        return false if connections.empty?

        true
      end

95 96 97
      # Disconnect all connections in the pool.
      def disconnect!
        # Hand every reserved connection back first ...
        @reserved_connections.each_value { |reserved| checkin reserved }
        # ... then close whatever the pool still holds ...
        connections.each { |open_conn| open_conn.disconnect! }
        # ... and forget all reservations.
        @reserved_connections = {}
      end

106
      # Checks in and forgets all reserved connections, then disconnects and
      # removes every connection that requires reloading.
N
Nick 已提交
107
      def clear_reloadable_connections!
        # Return every reserved connection and forget the reservations.
        @reserved_connections.each do |name, conn|
          checkin conn
        end
        @reserved_connections = {}

        # Collect the stale connections before touching the pool:
        # remove_connection may delete from the very collection that
        # #connections returns, and deleting while iterating skips elements.
        stale = connections.select { |conn| conn.requires_reloading? }
        stale.each do |conn|
          conn.disconnect!
          remove_connection conn
        end
      end

120 121
      # Verify active connections and remove and disconnect connections
      # associated with stale threads.
N
Nick 已提交
122
      def verify_active_connections! #:nodoc:
        # Reservations held by threads that are no longer alive are checked in.
        remove_stale_cached_threads!(@reserved_connections) { |_name, orphaned| checkin orphaned }
        # Every connection still in the pool is then verified.
        connections.each { |pooled| pooled.verify!(verification_timeout) }
      end

131 132 133 134
      # Check-out a database connection from the pool.
      def checkout
        # Abstract hook: the base class has no checkout strategy of its own.
        message = "checkout is an abstract method"
        raise NotImplementedError, message
      end
N
Nick 已提交
135

136 137 138
      # Check-in a database connection back into the pool.
      def checkin(connection)
        # Abstract hook: the base class has no checkin strategy of its own.
        message = "checkin is an abstract method"
        raise NotImplementedError, message
      end

141
      # Abstract hook for dropping +conn+ from the pool; always raises here.
      def remove_connection(conn) #:nodoc:
        message = "remove_connection is an abstract method"
        raise NotImplementedError, message
      end
      private :remove_connection
N
Nick 已提交
145

146
      # Abstract hook for listing the pool's connections; always raises here.
      def connections #:nodoc:
        message = "connections is an abstract method"
        raise NotImplementedError, message
      end
      private :connections

151
      # Declares, through the synchronize macro (defined outside this file),
      # that the listed public methods are guarded by @connection_mutex.
      synchronize :connection,
                  :release_connection,
                  :clear_reloadable_connections!,
                  :verify_active_connections!,
                  :connected?,
                  :disconnect!,
                  :with => :@connection_mutex

      private
156 157 158 159 160
      # Opens a brand-new connection by sending the spec's adapter method to
      # ActiveRecord::Base. :allow_concurrency is filled in from
      # ActiveRecord::Base through reverse_merge.
      def new_connection
        defaults = { :allow_concurrency => ActiveRecord::Base.allow_concurrency }
        ActiveRecord::Base.send(spec.adapter_method, spec.config.reverse_merge(defaults))
      end

161 162 163
      # Key under which the calling thread's connection is reserved.
      def active_connection_name #:nodoc:
        current_thread = Thread.current
        current_thread.object_id
      end
N
Nick 已提交
164

165 166 167 168 169 170 171 172 173 174 175
      # Remove stale threads from the cache.
      def remove_stale_cached_threads!(cache, &block)
        # Start from every cached key, then strike out the ones that belong to
        # a thread that is still alive; what remains is stale.
        stale_keys = Set.new(cache.keys)
        live_ids = Thread.list.select { |thread| thread.alive? }.map { |thread| thread.object_id }
        live_ids.each { |id| stale_keys.delete(id) }

        stale_keys.each do |stale_key|
          # The block may already have removed other entries from the cache.
          if cache.has_key?(stale_key)
            block.call(stale_key, cache[stale_key])
            cache.delete(stale_key)
          end
        end
      end
178 179
    end

180 181 182
    # NewConnectionEveryTime is a simple implementation: always
    # create/disconnect on checkout/checkin.
    class NewConnectionEveryTime < ConnectionPool
      # The connection reserved under the current thread's key, or nil.
      def active_connection
        @reserved_connections[active_connection_name]
      end

      # The whole reservation cache (key => connection).
      def active_connections
        @reserved_connections
      end

      # Checking out always opens a fresh connection.
      def checkout
        new_connection
      end

      # Checking in closes the connection instead of keeping it around.
      def checkin(conn)
        conn.disconnect!
      end

      # This pool keeps no list of its own; its connections are the reserved ones.
      def connections
        @reserved_connections.values
      end

      # Drops every reservation that points at +conn+.
      def remove_connection(conn)
        @reserved_connections.delete_if { |_key, reserved| reserved == conn }
      end

      private :connections, :remove_connection
    end
206

207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
    # CachedConnectionPerThread is a compatible pseudo-connection pool that
    # caches connections per-thread. In order to hold onto connections in the same
    # manner as ActiveRecord 2.1 and earlier, it only disconnects the
    # connection when the connection is checked in, or when calling
    # ActiveRecord::Base.clear_all_connections!, and not during
    # #release_connection.
    class CachedConnectionPerThread < NewConnectionEveryTime
      # Deliberately does nothing, so the reserved connection stays cached.
      def release_connection
        nil # no-op; keep the connection
      end
    end

    # FixedSizeConnectionPool provides a full, fixed-size connection pool with
    # timed waits when the pool is exhausted.
    class FixedSizeConnectionPool < ConnectionPool
222 223 224 225 226 227 228 229 230 231 232 233 234
      # Reads the pool size and wait timeout from the spec's config and sets
      # up empty bookkeeping lists plus the condition variable used for waiting.
      def initialize(spec)
        super
        pool_config = spec.config
        @size = pool_config[:pool].to_i
        @timeout = pool_config[:wait_timeout] || 5 # default 5 second timeout
        @connections = []
        @checked_out = []
        @queue = @connection_mutex.new_cond
      end

      # Returns a connection for exclusive use by the caller: a free existing
      # one if available, otherwise a new one while the pool is below its size
      # limit. When the pool is exhausted, waits up to @timeout seconds for a
      # checkin and raises ConnectionTimeoutError if none arrives in time.
      def checkout
        # The availability check and the wait share one critical section, so a
        # checkin cannot slip in between them and have its signal missed.
        @connection_mutex.synchronize do
          deadline = Time.now + @timeout

          loop do
            conn = if @checked_out.size < @connections.size
                     checkout_existing_connection
                   elsif @connections.size < @size
                     checkout_new_connection
                   end
            return conn if conn

            # Track the deadline ourselves instead of trusting the return
            # value of wait to tell a timeout from a wakeup. Re-checking the
            # pool after every wakeup also covers the case where another
            # thread took the freed connection first.
            remaining = deadline - Time.now
            if remaining <= 0
              raise ConnectionTimeoutError, "could not obtain a database connection in a timely fashion"
            end
            @queue.wait(remaining)
          end
        end
      end

      # Marks +conn+ as free again and wakes one thread waiting in #checkout.
      def checkin(conn)
        @connection_mutex.synchronize do
          @checked_out.delete(conn)
          @queue.signal
        end
      end

      private
      # Opens a new connection, adds it to the pool and marks it checked out.
      def checkout_new_connection
        fresh = new_connection
        [@connections, @checked_out].each { |list| list << fresh }
        fresh
      end

      # Marks the first pooled connection that is not checked out as checked
      # out and returns it. Returns nil, and records nothing, when every
      # pooled connection is already in use.
      def checkout_existing_connection
        c = (@connections - @checked_out).first
        # Never record nil: a nil entry would inflate @checked_out.size and
        # make the pool look fuller than it is.
        @checked_out << c if c
        c
      end

      # Returns the pool's own array of connections, not a copy, so callers
      # that modify the result modify the pool.
      def connections
        @connections
      end

      # Drops +conn+ from the pool. Returns the removed connection, or nil if
      # it was not pooled.
      def remove_connection(conn)
        # Also forget any checkout of it. A leftover entry would make
        # @checked_out.size overstate the number of busy connections and hide
        # a free one from #checkout.
        @checked_out.delete conn
        @connections.delete conn
      end
    end

283 284 285 286
    # Behavior shared by the connection handlers: keeps a hash of connection
    # pools keyed by class name and forwards bulk operations to every pool.
    module ConnectionHandlerMethods
      # pools - optional hash (class name => pool) to start with.
      def initialize(pools = {})
        @connection_pools = pools
      end

      # Returns the hash of pools, creating an empty one if none was set.
      def connection_pools
        @connection_pools ||= {}
      end

      # Builds a pool for +spec+ through ConnectionPool.create and stores it
      # under +name+, replacing any pool previously stored for that name.
      def establish_connection(name, spec)
        @connection_pools[name] = ConnectionAdapters::ConnectionPool.create(spec)
      end

      # for internal use only and for testing;
      # only works with pool classes that respond to #active_connection
      # (in this file: NewConnectionEveryTime and CachedConnectionPerThread)
      def active_connections #:nodoc:
        @connection_pools.inject({}) do |active, (name, pool)|
          conn = pool.active_connection
          # Pools without an active connection are left out of the result.
          active[name] = conn if conn
          active
        end
      end

      # Asks every pool to release its reserved connection.
      def clear_active_connections!
        @connection_pools.each_value { |pool| pool.release_connection }
      end

      # Asks every pool to clear its reloadable connections.
      def clear_reloadable_connections!
        @connection_pools.each_value { |pool| pool.clear_reloadable_connections! }
      end

      # Disconnects every pool.
      def clear_all_connections!
        @connection_pools.each_value { |pool| pool.disconnect! }
      end

      # Verify active connections.
      def verify_active_connections! #:nodoc:
        @connection_pools.each_value { |pool| pool.verify_active_connections! }
      end

      # Returns the connection of the pool registered for +klass+ or for its
      # nearest superclass. Raises ConnectionNotEstablished when no pool is
      # found or the pool yields no connection.
      def retrieve_connection(klass) #:nodoc:
        pool = retrieve_connection_pool(klass)
        conn = pool && pool.connection
        raise ConnectionNotEstablished unless conn
        conn
      end

      # Returns true if a connection that's accessible to this class has
      # already been opened. Returns false when no pool is registered for the
      # class or any of its superclasses.
      def connected?(klass)
        pool = retrieve_connection_pool(klass)
        pool ? pool.connected? : false
      end

      # Remove the connection for this class. This will close the active
      # connection and the defined connection (if they exist). The result
      # can be used as an argument for establish_connection, for easily
      # re-establishing the connection.
      def remove_connection(klass)
        pool = @connection_pools[klass.name]
        # The same pool may be registered under several names; drop them all.
        @connection_pools.delete_if { |_name, registered| registered == pool }
        return unless pool

        pool.disconnect!
        pool.spec.config
      end

      # Walks up the superclass chain from +klass+ and returns the first pool
      # registered under a class name, or nil once ActiveRecord::Base has
      # been checked without a match.
      def retrieve_connection_pool(klass)
        loop do
          pool = @connection_pools[klass.name]
          return pool if pool
          return nil if ActiveRecord::Base == klass
          klass = klass.superclass
        end
      end
    end
360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384

    # This connection handler is not thread-safe, as it does not protect access
    # to the underlying connection pools.
    class SingleThreadConnectionHandler
      # All behavior comes from ConnectionHandlerMethods; no locking is added.
      include ConnectionHandlerMethods
    end

    # This connection handler is thread-safe. Each access or modification of a
    # connection pool is synchronized by an internal monitor.
    class MultipleThreadConnectionHandler
      include ConnectionHandlerMethods

      # The monitor named in the synchronize declaration below.
      attr_reader :connection_pools_lock

      # Creates the lock, then lets ConnectionHandlerMethods store +pools+.
      def initialize(pools = {})
        @connection_pools_lock = Monitor.new
        super
      end

      # Apply monitor to all public methods that access the pool.
      synchronize :establish_connection,
                  :retrieve_connection,
                  :connected?,
                  :remove_connection,
                  :active_connections,
                  :clear_active_connections!,
                  :clear_reloadable_connections!,
                  :clear_all_connections!,
                  :verify_active_connections!,
                  :with => :connection_pools_lock
    end
N
Nick 已提交
385 386
  end
end