提交 f836630f 编写于 作者: M Matthew Draper

After completing a load, give other threads a chance too

While we know no user code is running, we should do as much loading as
we can. That way, all the threads will then be able to resume running
user code together.

Otherwise, only the last arriving thread would get to do its load, and
would then return to userspace, leaving the others still blocked.
上级 f02bd2a9
......@@ -51,14 +51,8 @@ def start_exclusive(purpose: nil, compatible: [], no_wait: false)
if busy_for_exclusive?(purpose)
return false if no_wait
loose_shares = @sharing.delete(Thread.current)
@waiting[Thread.current] = compatible if loose_shares
begin
yield_shares(purpose, compatible) do
@cv.wait_while { busy_for_exclusive?(purpose) }
ensure
@waiting.delete Thread.current
@sharing[Thread.current] = loose_shares if loose_shares
end
end
@exclusive_thread = Thread.current
......@@ -71,14 +65,18 @@ def start_exclusive(purpose: nil, compatible: [], no_wait: false)
# Relinquish the exclusive lock. Must only be called by the thread
# that called start_exclusive (and currently holds the lock).
def stop_exclusive
def stop_exclusive(compatible: [])
synchronize do
raise "invalid unlock" if @exclusive_thread != Thread.current
@exclusive_depth -= 1
if @exclusive_depth == 0
@exclusive_thread = nil
@cv.broadcast
yield_shares(nil, compatible) do
@cv.broadcast
@cv.wait_while { @exclusive_thread || eligible_waiters?(compatible) }
end
end
end
end
......@@ -109,12 +107,12 @@ def stop_sharing
# the block.
#
# See +start_exclusive+ for other options.
def exclusive(purpose: nil, compatible: [], no_wait: false)
def exclusive(purpose: nil, compatible: [], after_compatible: [], no_wait: false)
if start_exclusive(purpose: purpose, compatible: compatible, no_wait: no_wait)
begin
yield
ensure
stop_exclusive
stop_exclusive(compatible: after_compatible)
end
end
end
......@@ -139,7 +137,23 @@ def busy_for_exclusive?(purpose)
def busy_for_sharing?(purpose)
(@exclusive_thread && @exclusive_thread != Thread.current) ||
@waiting.any? { |k, v| k != Thread.current && !v.include?(purpose) }
@waiting.any? { |t, (_, c)| t != Thread.current && !c.include?(purpose) }
end
def eligible_waiters?(compatible)
@waiting.any? { |t, (p, _)| compatible.include?(p) && @waiting.all? { |t2, (_, c2)| t == t2 || c2.include?(p) } }
end
def yield_shares(purpose, compatible)
loose_shares = @sharing.delete(Thread.current)
@waiting[Thread.current] = [purpose, compatible] if loose_shares
begin
yield
ensure
@waiting.delete Thread.current
@sharing[Thread.current] = loose_shares if loose_shares
end
end
end
end
......
......@@ -8,13 +8,13 @@ def initialize # :nodoc:
end
def loading
@lock.exclusive(purpose: :load, compatible: [:load]) do
@lock.exclusive(purpose: :load, compatible: [:load], after_compatible: [:load]) do
yield
end
end
def unloading
@lock.exclusive(purpose: :unload, compatible: [:load, :unload]) do
@lock.exclusive(purpose: :unload, compatible: [:load, :unload], after_compatible: [:load, :unload]) do
yield
end
end
......@@ -24,7 +24,7 @@ def unloading
# concurrent activity, return immediately (without executing the
# block) instead of waiting.
def attempt_unloading
@lock.exclusive(purpose: :unload, compatible: [:load, :unload], no_wait: true) do
@lock.exclusive(purpose: :unload, compatible: [:load, :unload], after_compatible: [:load, :unload], no_wait: true) do
yield
end
end
......
......@@ -270,6 +270,23 @@ def test_share_remains_reentrant_ignoring_a_waiting_exclusive
end
end
def test_compatible_exclusives_cooperate_to_both_proceed
ready = Concurrent::CyclicBarrier.new(2)
done = Concurrent::CyclicBarrier.new(2)
threads = 2.times.map do
Thread.new do
@lock.sharing do
ready.wait
@lock.exclusive(purpose: :x, compatible: [:x], after_compatible: [:x]) {}
done.wait
end
end
end
assert_threads_not_stuck threads
end
def test_in_shared_section_incompatible_non_upgrading_threads_cannot_preempt_upgrading_threads
scratch_pad = []
scratch_pad_mutex = Mutex.new
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册