connection_pool_test.rb 17.5 KB
Newer Older
1
require "cases/helper"
2
require 'concurrent/atomic/count_down_latch'
3 4 5 6

module ActiveRecord
  module ConnectionAdapters
    class ConnectionPoolTest < ActiveRecord::TestCase
7 8
      attr_reader :pool

9
      def setup
10 11
        super

12 13
        # Keep a duplicate pool so we do not bother others
        @pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec
14 15 16 17 18 19 20 21 22 23

        if in_memory_db?
          # Separate connections to an in-memory database create an entirely new database,
          # with an empty schema etc, so we just stub out this schema on the fly.
          @pool.with_connection do |connection|
            connection.create_table :posts do |t|
              t.integer :cololumn
            end
          end
        end
24 25
      end

G
Guo Xiang Tan 已提交
26
      teardown do
27
        @pool.disconnect!
28 29
      end

30 31 32 33
      def active_connections(pool)
        pool.connections.find_all(&:in_use?)
      end

34 35 36 37 38 39 40 41 42 43
      def test_checkout_after_close
        connection = pool.connection
        assert connection.in_use?

        connection.close
        assert !connection.in_use?

        assert pool.connection.in_use?
      end

44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
      def test_released_connection_moves_between_threads
        thread_conn = nil

        Thread.new {
          pool.with_connection do |conn|
            thread_conn = conn
          end
        }.join

        assert thread_conn

        Thread.new {
          pool.with_connection do |conn|
            assert_equal thread_conn, conn
          end
        }.join
      end

62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
      def test_with_connection
        assert_equal 0, active_connections(pool).size

        main_thread = pool.connection
        assert_equal 1, active_connections(pool).size

        Thread.new {
          pool.with_connection do |conn|
            assert conn
            assert_equal 2, active_connections(pool).size
          end
          assert_equal 1, active_connections(pool).size
        }.join

        main_thread.close
        assert_equal 0, active_connections(pool).size
      end

      def test_active_connection_in_use
        assert !pool.active_connection?
        main_thread = pool.connection

        assert pool.active_connection?

        main_thread.close

        assert !pool.active_connection?
      end

91
      def test_full_pool_exception
92
        @pool.size.times { @pool.checkout }
93
        assert_raises(ConnectionTimeoutError) do
94
          @pool.checkout
95 96 97
        end
      end

98 99 100 101 102
      def test_full_pool_blocks
        cs = @pool.size.times.map { @pool.checkout }
        t = Thread.new { @pool.checkout }

        # make sure our thread is in the timeout section
103
        Thread.pass until @pool.num_waiting_in_queue == 1
104 105 106 107 108 109 110 111 112 113 114

        connection = cs.first
        connection.close
        assert_equal connection, t.join.value
      end

      def test_removing_releases_latch
        cs = @pool.size.times.map { @pool.checkout }
        t = Thread.new { @pool.checkout }

        # make sure our thread is in the timeout section
115
        Thread.pass until @pool.num_waiting_in_queue == 1
116 117 118 119

        connection = cs.first
        @pool.remove connection
        assert_respond_to t.join.value, :execute
120
        connection.close
121 122
      end

123
      def test_reap_and_active
124 125 126 127 128 129 130 131
        @pool.checkout
        @pool.checkout
        @pool.checkout

        connections = @pool.connections.dup

        @pool.reap

132 133 134 135
        assert_equal connections.length, @pool.connections.length
      end

      def test_reap_inactive
136
        ready = Concurrent::CountDownLatch.new
137
        @pool.checkout
138 139 140
        child = Thread.new do
          @pool.checkout
          @pool.checkout
141
          ready.count_down
142
          Thread.stop
143
        end
144
        ready.wait
145 146

        assert_equal 3, active_connections(@pool).size
147

148 149
        child.terminate
        child.join
150 151
        @pool.reap

152
        assert_equal 1, active_connections(@pool).size
153
      ensure
154
        @pool.connections.each { |conn| conn.close if conn.in_use? }
155 156
      end

157 158 159 160 161 162 163 164 165 166 167 168
      def test_remove_connection
        conn = @pool.checkout
        assert conn.in_use?

        length = @pool.connections.length
        @pool.remove conn
        assert conn.in_use?
        assert_equal(length - 1, @pool.connections.length)
      ensure
        conn.close
      end

169 170 171 172 173
      def test_remove_connection_for_thread
        conn = @pool.connection
        @pool.remove conn
        assert_not_equal(conn, @pool.connection)
      ensure
174
        conn.close if conn
175 176
      end

177 178 179 180 181 182 183 184
      def test_active_connection?
        assert !@pool.active_connection?
        assert @pool.connection
        assert @pool.active_connection?
        @pool.release_connection
        assert !@pool.active_connection?
      end

185 186 187 188 189 190
      def test_checkout_behaviour
        pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec
        connection = pool.connection
        assert_not_nil connection
        threads = []
        4.times do |i|
V
Vipul A M 已提交
191
          threads << Thread.new(i) do
192 193
            connection = pool.connection
            assert_not_nil connection
194
            connection.close
195 196
          end
        end
A
Aaron Patterson 已提交
197

198
        threads.each(&:join)
A
Aaron Patterson 已提交
199

200
        Thread.new do
201
          assert pool.connection
202 203
          pool.connection.close
        end.join
204
      end
205

206
      # The connection pool is "fair" if threads waiting for
207
      # connections receive them in the order in which they began
208 209 210 211 212
      # waiting.  This ensures that we don't timeout one HTTP request
      # even while well under capacity in a multi-threaded environment
      # such as a Java servlet container.
      #
      # We don't need strict fairness: if two connections become
213
      # available at the same time, it's fine if two threads that were
214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230
      # waiting acquire the connections out of order.
      #
      # Thus this test prepares waiting threads and then trickles in
      # available connections slowly, ensuring the wakeup order is
      # correct in this case.
      def test_checkout_fairness
        @pool.instance_variable_set(:@size, 10)
        expected = (1..@pool.size).to_a.freeze
        # check out all connections so our threads start out waiting
        conns = expected.map { @pool.checkout }
        mutex = Mutex.new
        order = []
        errors = []

        threads = expected.map do |i|
          t = Thread.new {
            begin
231
              @pool.checkout # never checked back in
232 233 234 235 236
              mutex.synchronize { order << i }
            rescue => e
              mutex.synchronize { errors << e }
            end
          }
237
          Thread.pass until @pool.num_waiting_in_queue == i
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
          t
        end

        # this should wake up the waiting threads one by one in order
        conns.each { |conn| @pool.checkin(conn); sleep 0.1 }

        threads.each(&:join)

        raise errors.first if errors.any?

        assert_equal(expected, order)
      end

      # As mentioned in #test_checkout_fairness, we don't care about
      # strict fairness.  This test creates two groups of threads:
      # group1 whose members all start waiting before any thread in
      # group2.  Enough connections are checked in to wakeup all
      # group1 threads, and the fact that only group1 and no group2
      # threads acquired a connection is enforced.
      def test_checkout_fairness_by_group
        @pool.instance_variable_set(:@size, 10)
        # take all the connections
        conns = (1..10).map { @pool.checkout }
        mutex = Mutex.new
        successes = []    # threads that successfully got a connection
        errors = []

        make_thread = proc do |i|
          t = Thread.new {
            begin
268
              @pool.checkout # never checked back in
269 270 271 272 273
              mutex.synchronize { successes << i }
            rescue => e
              mutex.synchronize { errors << e }
            end
          }
274
          Thread.pass until @pool.num_waiting_in_queue == i
275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309
          t
        end

        # all group1 threads start waiting before any in group2
        group1 = (1..5).map(&make_thread)
        group2 = (6..10).map(&make_thread)

        # checkin n connections back to the pool
        checkin = proc do |n|
          n.times do
            c = conns.pop
            @pool.checkin(c)
          end
        end

        checkin.call(group1.size)         # should wake up all group1

        loop do
          sleep 0.1
          break if mutex.synchronize { (successes.size + errors.size) == group1.size }
        end

        winners = mutex.synchronize { successes.dup }
        checkin.call(group2.size)         # should wake up everyone remaining

        group1.each(&:join)
        group2.each(&:join)

        assert_equal((1..group1.size).to_a, winners.sort)

        if errors.any?
          raise errors.first
        end
      end

310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328
      def test_automatic_reconnect=
        pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec
        assert pool.automatic_reconnect
        assert pool.connection

        pool.disconnect!
        assert pool.connection

        pool.disconnect!
        pool.automatic_reconnect = false

        assert_raises(ConnectionNotEstablished) do
          pool.connection
        end

        assert_raises(ConnectionNotEstablished) do
          pool.with_connection
        end
      end
329 330 331 332

      def test_pool_sets_connection_visitor
        assert @pool.connection.visitor.is_a?(Arel::Visitors::ToSql)
      end
D
David 已提交
333

334
      # make sure exceptions are thrown when establish_connection
A
Angelo Capilleri 已提交
335
      # is called with an anonymous class
A
Arthur Neves 已提交
336 337 338 339 340 341 342
      def test_anonymous_class_exception
        anonymous = Class.new(ActiveRecord::Base)

        assert_raises(RuntimeError) do
          anonymous.establish_connection
        end
      end
343

344 345 346 347 348 349 350 351 352 353 354 355
      def test_connection_notification_is_called
        payloads = []
        subscription = ActiveSupport::Notifications.subscribe('!connection.active_record') do |name, started, finished, unique_id, payload|
          payloads << payload
        end
        ActiveRecord::Base.establish_connection :arunit
        assert_equal [:class_name, :config, :connection_id], payloads[0].keys.sort
        assert_equal 'primary', payloads[0][:class_name]
      ensure
        ActiveSupport::Notifications.unsubscribe(subscription) if subscription
      end

356 357 358 359 360 361 362 363 364 365 366 367 368 369
      def test_pool_sets_connection_schema_cache
        connection = pool.checkout
        schema_cache = SchemaCache.new connection
        schema_cache.add(:posts)
        pool.schema_cache = schema_cache

        pool.with_connection do |conn|
          assert_not_same pool.schema_cache, conn.schema_cache
          assert_equal pool.schema_cache.size, conn.schema_cache.size
          assert_same pool.schema_cache.columns(:posts), conn.schema_cache.columns(:posts)
        end

        pool.checkin connection
      end
370 371

      def test_concurrent_connection_establishment
372 373
        assert_operator @pool.connections.size, :<=, 1

374 375
        all_threads_in_new_connection = Concurrent::CountDownLatch.new(@pool.size - @pool.connections.size)
        all_go                        = Concurrent::CountDownLatch.new
376 377 378

        @pool.singleton_class.class_eval do
          define_method(:new_connection) do
379 380
            all_threads_in_new_connection.count_down
            all_go.wait
381 382 383 384 385 386 387 388 389 390 391 392 393 394
            super()
          end
        end

        connecting_threads = []
        @pool.size.times do
          connecting_threads << Thread.new { @pool.checkout }
        end

        begin
          Timeout.timeout(5) do
            # the kernel of the whole test is here, everything else is just scaffolding,
            # this latch will not be released unless conn. pool allows for concurrent
            # connection creation
395
            all_threads_in_new_connection.wait
396 397 398 399 400 401
          end
        rescue Timeout::Error
          flunk 'pool unable to establish connections concurrently or implementation has ' <<
                'changed, this test then needs to patch a different :new_connection method'
        ensure
          # clean up the threads
402
          all_go.count_down
403 404 405
          connecting_threads.map(&:join)
        end
      end
406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446

      def test_non_bang_disconnect_and_clear_reloadable_connections_throw_exception_if_threads_dont_return_their_conns
        @pool.checkout_timeout = 0.001 # no need to delay test suite by waiting the whole full default timeout
        [:disconnect, :clear_reloadable_connections].each do |group_action_method|
          @pool.with_connection do |connection|
            assert_raises(ExclusiveConnectionTimeoutError) do
              Thread.new { @pool.send(group_action_method) }.join
            end
          end
        end
      end

      def test_disconnect_and_clear_reloadable_connections_attempt_to_wait_for_threads_to_return_their_conns
        [:disconnect, :disconnect!, :clear_reloadable_connections, :clear_reloadable_connections!].each do |group_action_method|
          begin
            thread = timed_join_result = nil
            @pool.with_connection do |connection|
              thread = Thread.new { @pool.send(group_action_method) }

              # give the other `thread` some time to get stuck in `group_action_method`
              timed_join_result = thread.join(0.3)
              # thread.join # => `nil` means the other thread hasn't finished running and is still waiting for us to
              # release our connection
              assert_nil timed_join_result

              # assert that since this is within default timeout our connection hasn't been forcefully taken away from us
              assert @pool.active_connection?
            end
          ensure
            thread.join if thread && !timed_join_result # clean up the other thread
          end
        end
      end

      def test_bang_versions_of_disconnect_and_clear_reloadable_connections_if_unable_to_aquire_all_connections_proceed_anyway
        @pool.checkout_timeout = 0.001 # no need to delay test suite by waiting the whole full default timeout
        [:disconnect!, :clear_reloadable_connections!].each do |group_action_method|
          @pool.with_connection do |connection|
            Thread.new { @pool.send(group_action_method) }.join
            # assert connection has been forcefully taken away from us
            assert_not @pool.active_connection?
447 448 449

            # make a new connection for with_connection to clean up
            @pool.connection
450 451 452 453 454 455 456 457
          end
        end
      end

      def test_disconnect_and_clear_reloadable_connections_are_able_to_preempt_other_waiting_threads
        with_single_connection_pool do |pool|
          [:disconnect, :disconnect!, :clear_reloadable_connections, :clear_reloadable_connections!].each do |group_action_method|
            conn               = pool.connection # drain the only available connection
458
            second_thread_done = Concurrent::CountDownLatch.new
459 460 461

            # create a first_thread and let it get into the FIFO queue first
            first_thread = Thread.new do
462
              pool.with_connection { second_thread_done.wait }
463 464 465 466 467 468 469 470 471 472
            end

            # wait for first_thread to get in queue
            Thread.pass until pool.num_waiting_in_queue == 1

            # create a different, later thread, that will attempt to do a "group action",
            # but because of the group action semantics it should be able to preempt the
            # first_thread when a connection is made available
            second_thread = Thread.new do
              pool.send(group_action_method)
473
              second_thread_done.count_down
474 475 476 477 478 479 480 481 482 483 484 485 486 487
            end

            # wait for second_thread to get in queue
            Thread.pass until pool.num_waiting_in_queue == 2

            # return the only available connection
            pool.checkin(conn)

            # if the second_thread is not able to preempt the first_thread,
            # they will temporarily (until either of them timeouts with ConnectionTimeoutError)
            # deadlock and a join(2) timeout will be reached
            failed = true unless second_thread.join(2)

            #--- post test clean up start
488
            second_thread_done.count_down if failed
489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536

            # after `pool.disconnect()` the first thread will be left stuck in queue, no need to wait for
            # it to timeout with ConnectionTimeoutError
            if (group_action_method == :disconnect || group_action_method == :disconnect!) && pool.num_waiting_in_queue > 0
              pool.with_connection {} # create a new connection in case there are threads still stuck in a queue
            end

            first_thread.join
            second_thread.join
            #--- post test clean up end

            flunk "#{group_action_method} is not able to preempt other waiting threads" if failed
          end
        end
      end

      def test_clear_reloadable_connections_creates_new_connections_for_waiting_threads_if_necessary
        with_single_connection_pool do |pool|
          conn = pool.connection # drain the only available connection
          def conn.requires_reloading? # make sure it gets removed from the pool by clear_reloadable_connections
            true
          end

          stuck_thread = Thread.new do
            pool.with_connection {}
          end

          # wait for stuck_thread to get in queue
          Thread.pass until pool.num_waiting_in_queue == 1

          pool.clear_reloadable_connections

          unless stuck_thread.join(2)
            flunk 'clear_reloadable_connections must not let other connection waiting threads get stuck in queue'
          end

          assert_equal 0, pool.num_waiting_in_queue
        end
      end

      private
      def with_single_connection_pool
        one_conn_spec = ActiveRecord::Base.connection_pool.spec.dup
        one_conn_spec.config[:pool] = 1 # this is safe to do, because .dupped ConnectionSpecification also auto-dups its config
        yield(pool = ConnectionPool.new(one_conn_spec))
      ensure
        pool.disconnect! if pool
      end
537 538 539
    end
  end
end