提交 40ce686b 编写于 作者: M Matthew Draper

Merge pull request #20928 from matthewd/unload-interlock

We need stricter locking before we can unload
......@@ -9,7 +9,7 @@ module Concurrency
#--
# Note that a pending Exclusive lock attempt does not block incoming
# Share requests (i.e., we are "read-preferring"). That seems
# consistent with the behavior of +loose_upgrades+, but may be the
# consistent with the behavior of "loose" upgrades, but may be the
# wrong choice otherwise: it nominally reduces the possibility of
# deadlock by risking starvation instead.
class ShareLock
......@@ -20,47 +20,48 @@ class ShareLock
# to upgrade share locks to exclusive.
# If +loose_upgrades+ is false (the default), then a thread that
# is waiting on an Exclusive lock will continue to hold any Share
# lock that it has already established. This is safer, but can
# lead to deadlock.
#
# If +loose_upgrades+ is true, a thread waiting on an Exclusive
# lock will temporarily relinquish its Share lock. Being less
# strict, this behavior prevents some classes of deadlocks. For
# many resources, loose upgrades are sufficient: if a thread is
# awaiting a lock, it is not running any other code.
attr_reader :loose_upgrades
def initialize(loose_upgrades = false)
@loose_upgrades = loose_upgrades
def initialize
super()
@cv = new_cond
@sharing = Hash.new(0)
@waiting = {}
@exclusive_thread = nil
@exclusive_depth = 0
end
# Returns false if +no_wait+ is specified and the lock is not
# Returns false if +no_wait+ is set and the lock is not
# immediately available. Otherwise, returns true after the lock
# has been acquired.
def start_exclusive(no_wait=false)
#
# +purpose+ and +compatible+ work together; while this thread is
# waiting for the exclusive lock, it will yield its share (if any)
# to any other attempt whose +purpose+ appears in this attempt's
# +compatible+ list. This allows a "loose" upgrade, which, being
# less strict, prevents some classes of deadlocks.
#
# For many resources, loose upgrades are sufficient: if a thread
# is awaiting a lock, it is not running any other code. With
# +purpose+ matching, it is possible to yield only to other
# threads whose activity will not interfere.
def start_exclusive(purpose: nil, compatible: [], no_wait: false)
synchronize do
unless @exclusive_thread == Thread.current
return false if no_wait && busy?
if busy?(purpose)
return false if no_wait
loose_shares = nil
if @loose_upgrades
loose_shares = @sharing.delete(Thread.current)
@waiting[Thread.current] = compatible if loose_shares
begin
@cv.wait_while { busy?(purpose) }
ensure
@waiting.delete Thread.current
@sharing[Thread.current] = loose_shares if loose_shares
end
end
@cv.wait_while { busy? } if busy?
@exclusive_thread = Thread.current
@sharing[Thread.current] = loose_shares if loose_shares
end
@exclusive_depth += 1
......@@ -106,8 +107,10 @@ def stop_sharing
# +no_wait+ is set and the lock is not immediately available,
# returns +nil+ without yielding. Otherwise, returns the result of
# the block.
def exclusive(no_wait=false)
if start_exclusive(no_wait)
#
# See +start_exclusive+ for other options.
def exclusive(purpose: nil, compatible: [], no_wait: false)
if start_exclusive(purpose: purpose, compatible: compatible, no_wait: no_wait)
begin
yield
ensure
......@@ -129,8 +132,9 @@ def sharing
private
# Must be called within synchronize
def busy?
def busy?(purpose)
(@exclusive_thread && @exclusive_thread != Thread.current) ||
@waiting.any? { |k, v| k != Thread.current && !v.include?(purpose) } ||
@sharing.size > (@sharing[Thread.current] > 0 ? 1 : 0)
end
end
......
......@@ -37,6 +37,13 @@ def self.load_interlock
Dependencies.interlock.loading { yield }
end
# Execute the supplied block while holding an exclusive lock,
# preventing any other thread from being inside a #run_interlock
# block at the same time
def self.unload_interlock
Dependencies.interlock.unloading { yield }
end
# :nodoc:
# Should we turn on Ruby warnings on the first load of dependent files?
......@@ -346,7 +353,7 @@ def depend_on(file_name, message = "No such file to load -- %s.rb")
def clear
log_call
Dependencies.load_interlock do
Dependencies.unload_interlock do
loaded.clear
loading.clear
remove_unloadable_constants!
......
......@@ -4,21 +4,27 @@ module ActiveSupport #:nodoc:
module Dependencies #:nodoc:
class Interlock
def initialize # :nodoc:
@lock = ActiveSupport::Concurrency::ShareLock.new(true)
@lock = ActiveSupport::Concurrency::ShareLock.new
end
def loading
@lock.exclusive do
@lock.exclusive(purpose: :load, compatible: [:load]) do
yield
end
end
# Attempt to obtain a "loading" (exclusive) lock. If possible,
def unloading
@lock.exclusive(purpose: :unload, compatible: [:load, :unload]) do
yield
end
end
# Attempt to obtain an "unloading" (exclusive) lock. If possible,
# execute the supplied block while holding the lock. If there is
# concurrent activity, return immediately (without executing the
# block) instead of waiting.
def attempt_loading
@lock.exclusive(true) do
def attempt_unloading
@lock.exclusive(purpose: :unload, compatible: [:load, :unload], no_wait: true) do
yield
end
end
......
require 'abstract_unit'
require 'concurrent/atomics'
require 'active_support/concurrency/share_lock'
class ShareLockTest < ActiveSupport::TestCase
def setup
@lock = ActiveSupport::Concurrency::ShareLock.new
end
def test_reentrancy
thread = Thread.new do
@lock.sharing { @lock.sharing {} }
@lock.exclusive { @lock.exclusive {} }
end
assert_threads_not_stuck thread
end
def test_sharing_doesnt_block
with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_latch|
assert_threads_not_stuck(Thread.new {@lock.sharing {} })
end
end
def test_sharing_blocks_exclusive
with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch|
@lock.exclusive(no_wait: true) { flunk } # polling should fail
exclusive_thread = Thread.new { @lock.exclusive {} }
assert_threads_stuck_but_releasable_by_latch exclusive_thread, sharing_thread_release_latch
end
end
def test_exclusive_blocks_sharing
with_thread_waiting_in_lock_section(:exclusive) do |exclusive_thread_release_latch|
sharing_thread = Thread.new { @lock.sharing {} }
assert_threads_stuck_but_releasable_by_latch sharing_thread, exclusive_thread_release_latch
end
end
def test_multiple_exlusives_are_able_to_progress
with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch|
exclusive_threads = (1..2).map do
Thread.new do
@lock.exclusive {}
end
end
assert_threads_stuck_but_releasable_by_latch exclusive_threads, sharing_thread_release_latch
end
end
def test_sharing_is_upgradeable_to_exclusive
upgrading_thread = Thread.new do
@lock.sharing do
@lock.exclusive {}
end
end
assert_threads_not_stuck upgrading_thread
end
def test_exclusive_upgrade_waits_for_other_sharers_to_leave
with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch|
in_sharing = Concurrent::CountDownLatch.new
upgrading_thread = Thread.new do
@lock.sharing do
in_sharing.count_down
@lock.exclusive {}
end
end
in_sharing.wait
assert_threads_stuck_but_releasable_by_latch upgrading_thread, sharing_thread_release_latch
end
end
def test_exclusive_matching_purpose
[true, false].each do |use_upgrading|
with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch|
exclusive_threads = (1..2).map do
Thread.new do
@lock.send(use_upgrading ? :sharing : :tap) do
@lock.exclusive(purpose: :load, compatible: [:load, :unload]) {}
end
end
end
assert_threads_stuck_but_releasable_by_latch exclusive_threads, sharing_thread_release_latch
end
end
end
def test_killed_thread_loses_lock
with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch|
thread = Thread.new do
@lock.sharing do
@lock.exclusive {}
end
end
assert_threads_stuck thread
thread.kill
sharing_thread_release_latch.count_down
thread = Thread.new do
@lock.exclusive {}
end
assert_threads_not_stuck thread
end
end
def test_exclusive_conflicting_purpose
[true, false].each do |use_upgrading|
with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch|
begin
conflicting_exclusive_threads = [
Thread.new do
@lock.send(use_upgrading ? :sharing : :tap) do
@lock.exclusive(purpose: :red, compatible: [:green, :purple]) {}
end
end,
Thread.new do
@lock.send(use_upgrading ? :sharing : :tap) do
@lock.exclusive(purpose: :blue, compatible: [:green]) {}
end
end
]
assert_threads_stuck conflicting_exclusive_threads # wait for threads to get into their respective `exclusive {}` blocks
# This thread will be stuck as long as any other thread is in
# a sharing block. While it's blocked, it holds no lock, so it
# doesn't interfere with any other attempts.
no_purpose_thread = Thread.new do
@lock.exclusive {}
end
assert_threads_stuck no_purpose_thread
# This thread is compatible with both of the "primary"
# attempts above. It's initially stuck on the outer share
# lock, but as soon as that's released, it can run --
# regardless of whether those threads hold share locks.
compatible_thread = Thread.new do
@lock.exclusive(purpose: :green, compatible: []) {}
end
assert_threads_stuck compatible_thread
assert_threads_stuck conflicting_exclusive_threads
sharing_thread_release_latch.count_down
assert_threads_not_stuck compatible_thread # compatible thread is now able to squeak through
if use_upgrading
# The "primary" threads both each hold a share lock, and are
# mutually incompatible; they're still stuck.
assert_threads_stuck conflicting_exclusive_threads
# The thread without a specified purpose is also stuck; it's
# not compatible with anything.
assert_threads_stuck no_purpose_thread
else
# As the primaries didn't hold a share lock, as soon as the
# outer one was released, all the exclusive locks are free
# to be acquired in turn.
assert_threads_not_stuck conflicting_exclusive_threads
assert_threads_not_stuck no_purpose_thread
end
ensure
conflicting_exclusive_threads.each(&:kill)
no_purpose_thread.kill
end
end
end
end
def test_exclusive_ordering
scratch_pad = []
scratch_pad_mutex = Mutex.new
load_params = [:load, [:load]]
unload_params = [:unload, [:unload, :load]]
[load_params, load_params, unload_params, unload_params].permutation do |thread_params|
with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch|
threads = thread_params.map do |purpose, compatible|
Thread.new do
@lock.sharing do
@lock.exclusive(purpose: purpose, compatible: compatible) do
scratch_pad_mutex.synchronize { scratch_pad << purpose }
end
end
end
end
sleep(0.01)
scratch_pad_mutex.synchronize { assert_empty scratch_pad }
sharing_thread_release_latch.count_down
assert_threads_not_stuck threads
scratch_pad_mutex.synchronize do
assert_equal [:load, :load, :unload, :unload], scratch_pad
scratch_pad.clear
end
end
end
end
def test_in_shared_section_incompatible_non_upgrading_threads_cannot_preempt_upgrading_threads
scratch_pad = []
scratch_pad_mutex = Mutex.new
upgrading_load_params = [:load, [:load], true]
non_upgrading_unload_params = [:unload, [:load, :unload], false]
[upgrading_load_params, non_upgrading_unload_params].permutation do |thread_params|
with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch|
threads = thread_params.map do |purpose, compatible, use_upgrading|
Thread.new do
@lock.send(use_upgrading ? :sharing : :tap) do
@lock.exclusive(purpose: purpose, compatible: compatible) do
scratch_pad_mutex.synchronize { scratch_pad << purpose }
end
end
end
end
assert_threads_stuck threads
scratch_pad_mutex.synchronize { assert_empty scratch_pad }
sharing_thread_release_latch.count_down
assert_threads_not_stuck threads
scratch_pad_mutex.synchronize do
assert_equal [:load, :unload], scratch_pad
scratch_pad.clear
end
end
end
end
private
module CustomAssertions
SUFFICIENT_TIMEOUT = 0.2
private
def assert_threads_stuck_but_releasable_by_latch(threads, latch)
assert_threads_stuck threads
latch.count_down
assert_threads_not_stuck threads
end
def assert_threads_stuck(threads)
sleep(SUFFICIENT_TIMEOUT) # give threads time to do their business
assert(Array(threads).all? { |t| t.join(0.001).nil? })
end
def assert_threads_not_stuck(threads)
assert(Array(threads).all? { |t| t.join(SUFFICIENT_TIMEOUT) })
end
end
class CustomAssertionsTest < ActiveSupport::TestCase
include CustomAssertions
def setup
@latch = Concurrent::CountDownLatch.new
@thread = Thread.new { @latch.wait }
end
def teardown
@latch.count_down
@thread.join
end
def test_happy_path
assert_threads_stuck_but_releasable_by_latch @thread, @latch
end
def test_detects_stuck_thread
assert_raises(Minitest::Assertion) do
assert_threads_not_stuck @thread
end
end
def test_detects_free_thread
@latch.count_down
assert_raises(Minitest::Assertion) do
assert_threads_stuck @thread
end
end
def test_detects_already_released
@latch.count_down
assert_raises(Minitest::Assertion) do
assert_threads_stuck_but_releasable_by_latch @thread, @latch
end
end
def test_detects_remains_latched
another_latch = Concurrent::CountDownLatch.new
assert_raises(Minitest::Assertion) do
assert_threads_stuck_but_releasable_by_latch @thread, another_latch
end
end
end
include CustomAssertions
def with_thread_waiting_in_lock_section(lock_section)
in_section = Concurrent::CountDownLatch.new
section_release = Concurrent::CountDownLatch.new
stuck_thread = Thread.new do
@lock.send(lock_section) do
in_section.count_down
section_release.wait
end
end
in_section.wait
yield section_release
ensure
section_release.count_down
stuck_thread.join # clean up
end
end
......@@ -86,7 +86,7 @@ module Finisher
# added in the hook are taken into account.
initializer :set_clear_dependencies_hook, group: :all do
callback = lambda do
ActiveSupport::Dependencies.interlock.attempt_loading do
ActiveSupport::Dependencies.interlock.attempt_unloading do
ActiveSupport::DescendantsTracker.clear
ActiveSupport::Dependencies.clear
end
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册