提交 aeb58ab7 编写于 作者: M Matthew Draper

Block new share attempts if there's an exclusive waiter

上级 92203edb
......@@ -48,14 +48,14 @@ def initialize
def start_exclusive(purpose: nil, compatible: [], no_wait: false)
synchronize do
unless @exclusive_thread == Thread.current
if busy?(purpose)
if busy_for_exclusive?(purpose)
return false if no_wait
loose_shares = @sharing.delete(Thread.current)
@waiting[Thread.current] = compatible if loose_shares
begin
@cv.wait_while { busy?(purpose) }
@cv.wait_while { busy_for_exclusive?(purpose) }
ensure
@waiting.delete Thread.current
@sharing[Thread.current] = loose_shares if loose_shares
......@@ -83,10 +83,10 @@ def stop_exclusive
end
end
def start_sharing
def start_sharing(purpose: :share)
synchronize do
if @exclusive_thread && @exclusive_thread != Thread.current
@cv.wait_while { @exclusive_thread }
if busy_for_sharing?(purpose)
@cv.wait_while { busy_for_sharing?(purpose) }
end
@sharing[Thread.current] += 1
end
......@@ -132,11 +132,15 @@ def sharing
private
# Must be called within synchronize
def busy?(purpose)
(@exclusive_thread && @exclusive_thread != Thread.current) ||
@waiting.any? { |k, v| k != Thread.current && !v.include?(purpose) } ||
def busy_for_exclusive?(purpose)
busy_for_sharing?(purpose) ||
@sharing.size > (@sharing[Thread.current] > 0 ? 1 : 0)
end
def busy_for_sharing?(purpose)
(@exclusive_thread && @exclusive_thread != Thread.current) ||
@waiting.any? { |k, v| k != Thread.current && !v.include?(purpose) }
end
end
end
end
......@@ -114,14 +114,17 @@ def test_exclusive_conflicting_purpose
[true, false].each do |use_upgrading|
with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch|
begin
together = Concurrent::CyclicBarrier.new(2)
conflicting_exclusive_threads = [
Thread.new do
@lock.send(use_upgrading ? :sharing : :tap) do
together.wait
@lock.exclusive(purpose: :red, compatible: [:green, :purple]) {}
end
end,
Thread.new do
@lock.send(use_upgrading ? :sharing : :tap) do
together.wait
@lock.exclusive(purpose: :blue, compatible: [:green]) {}
end
end
......@@ -183,11 +186,14 @@ def test_exclusive_ordering
load_params = [:load, [:load]]
unload_params = [:unload, [:unload, :load]]
all_sharing = Concurrent::CyclicBarrier.new(4)
[load_params, load_params, unload_params, unload_params].permutation do |thread_params|
with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch|
threads = thread_params.map do |purpose, compatible|
Thread.new do
@lock.sharing do
all_sharing.wait
@lock.exclusive(purpose: purpose, compatible: compatible) do
scratch_pad_mutex.synchronize { scratch_pad << purpose }
end
......@@ -209,6 +215,32 @@ def test_exclusive_ordering
end
end
def test_new_share_attempts_block_on_waiting_exclusive
with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch|
release_exclusive = Concurrent::CountDownLatch.new
waiting_exclusive = Thread.new do
@lock.sharing do
@lock.exclusive do
release_exclusive.wait
end
end
end
assert_threads_stuck waiting_exclusive
late_share_attempt = Thread.new do
@lock.sharing {}
end
assert_threads_stuck late_share_attempt
sharing_thread_release_latch.count_down
assert_threads_stuck late_share_attempt
release_exclusive.count_down
assert_threads_not_stuck late_share_attempt
end
end
def test_in_shared_section_incompatible_non_upgrading_threads_cannot_preempt_upgrading_threads
scratch_pad = []
scratch_pad_mutex = Mutex.new
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册