Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
张重言
rails
提交
d2901f0f
R
rails
项目概览
张重言
/
rails
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
rails
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
d2901f0f
编写于
5月 21, 2012
作者:
A
Aaron Patterson
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #6416 from pmahoney/threadsafe-connection-pool
Make connection pool fair with respect to waiting threads.
上级
525839fd
d06674d5
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
202 addition
and
35 deletion
+202
-35
activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
...ve_record/connection_adapters/abstract/connection_pool.rb
+86
-34
activerecord/test/cases/connection_adapters/abstract_adapter_test.rb
...d/test/cases/connection_adapters/abstract_adapter_test.rb
+1
-1
activerecord/test/cases/connection_pool_test.rb
activerecord/test/cases/connection_pool_test.rb
+115
-0
未找到文件。
activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
浏览文件 @
d2901f0f
...
...
@@ -2,7 +2,6 @@
require
'monitor'
require
'set'
require
'active_support/core_ext/module/deprecation'
require
'timeout'
module
ActiveRecord
# Raised when a connection could not be obtained within the connection
...
...
@@ -92,21 +91,6 @@ def run
attr_accessor
:automatic_reconnect
,
:timeout
attr_reader
:spec
,
:connections
,
:size
,
:reaper
class
Latch
# :nodoc:
def
initialize
@mutex
=
Mutex
.
new
@cond
=
ConditionVariable
.
new
end
def
release
@mutex
.
synchronize
{
@cond
.
broadcast
}
end
def
await
@mutex
.
synchronize
{
@cond
.
wait
@mutex
}
end
end
# Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
# object which describes database connection information (e.g. adapter,
# host name, username, password, etc), as well as the maximum size for
...
...
@@ -128,9 +112,25 @@ def initialize(spec)
# default max pool size to 5
@size
=
(
spec
.
config
[
:pool
]
&&
spec
.
config
[
:pool
].
to_i
)
||
5
@latch
=
Latch
.
new
@connections
=
[]
@automatic_reconnect
=
true
# connections available to be checked out
@available
=
[]
# number of threads waiting to check out a connection
@num_waiting
=
0
# signal threads waiting
@cond
=
new_cond
end
# Hack for tests to be able to add connections. Do not call outside of tests
def
insert_connection_for_test!
(
c
)
synchronize
do
@connections
<<
c
@available
<<
c
end
end
# Retrieve the connection associated with the current thread, or call
...
...
@@ -188,6 +188,7 @@ def disconnect!
conn
.
disconnect!
end
@connections
=
[]
@available
=
[]
end
end
...
...
@@ -202,6 +203,9 @@ def clear_reloadable_connections!
@connections
.
delete_if
do
|
conn
|
conn
.
requires_reloading?
end
@available
.
delete_if
do
|
conn
|
conn
.
requires_reloading?
end
end
end
...
...
@@ -225,23 +229,19 @@ def clear_stale_cached_connections! # :nodoc:
# Raises:
# - PoolFullError: no connection can be obtained from the pool.
def
checkout
loop
do
# Checkout an available connection
synchronize
do
# Try to find a connection that hasn't been leased, and lease it
conn
=
connections
.
find
{
|
c
|
c
.
lease
}
# If all connections were leased, and we have room to expand,
# create a new connection and lease it.
if
!
conn
&&
connections
.
size
<
size
conn
=
checkout_new_connection
conn
.
lease
end
synchronize
do
conn
=
nil
if
@num_waiting
==
0
conn
=
acquire_connection
end
return
checkout_and_verify
(
conn
)
if
conn
unless
conn
conn
=
wait_until
(
@timeout
)
{
acquire_connection
}
end
Timeout
.
timeout
(
@timeout
,
PoolFullError
)
{
@latch
.
await
}
conn
.
lease
checkout_and_verify
(
conn
)
end
end
...
...
@@ -257,8 +257,10 @@ def checkin(conn)
end
release
conn
@available
.
unshift
conn
@cond
.
signal
end
@latch
.
release
end
# Remove a connection from the connection pool. The connection will
...
...
@@ -266,12 +268,14 @@ def checkin(conn)
def
remove
(
conn
)
synchronize
do
@connections
.
delete
conn
@available
.
delete
conn
# FIXME: we might want to store the key on the connection so that removing
# from the reserved hash will be a little easier.
release
conn
@cond
.
signal
# can make a new connection now
end
@latch
.
release
end
# Removes dead connections from the pool. A dead connection can occur
...
...
@@ -283,12 +287,60 @@ def reap
connections
.
dup
.
each
do
|
conn
|
remove
conn
if
conn
.
in_use?
&&
stale
>
conn
.
last_use
&&
!
conn
.
active?
end
@cond
.
broadcast
# may violate fairness
end
@latch
.
release
end
private
# Take an available connection or, if possible, create a new
# one, or nil.
#
# Monitor must be held while calling this method.
#
# Returns: a newly acquired connection.
def
acquire_connection
if
@available
.
any?
@available
.
pop
elsif
connections
.
size
<
size
checkout_new_connection
end
end
# Wait on +@cond+ until the block returns non-nil. Note that
# unlike MonitorMixin::ConditionVariable#wait_until, this method
# does not test the block before the first wait period.
#
# Monitor must be held when calling this method.
#
# +timeout+: Integer timeout in seconds
#
# Returns: the result of the block
#
# Raises:
# - PoolFullError: timeout elapsed before +&block+ returned a connection
def
wait_until
(
timeout
,
&
block
)
@num_waiting
+=
1
begin
t0
=
Time
.
now
loop
do
elapsed
=
Time
.
now
-
t0
if
elapsed
>=
timeout
msg
=
'could not obtain a database connection within %0.3f seconds (waited %0.3f seconds)'
%
[
timeout
,
elapsed
]
raise
PoolFullError
,
msg
end
@cond
.
wait
(
timeout
-
elapsed
)
conn
=
yield
return
conn
if
conn
end
ensure
@num_waiting
-=
1
end
end
def
release
(
conn
)
thread_id
=
if
@reserved_connections
[
current_connection_id
]
==
conn
current_connection_id
...
...
activerecord/test/cases/connection_adapters/abstract_adapter_test.rb
浏览文件 @
d2901f0f
...
...
@@ -36,7 +36,7 @@ def test_expire_mutates_in_use
def
test_close
pool
=
ConnectionPool
.
new
(
ConnectionSpecification
.
new
({},
nil
))
pool
.
connections
<<
adapter
pool
.
insert_connection_for_test!
adapter
adapter
.
pool
=
pool
# Make sure the pool marks the connection in use
...
...
activerecord/test/cases/connection_pool_test.rb
浏览文件 @
d2901f0f
...
...
@@ -200,6 +200,121 @@ def test_checkout_behaviour
end
.
join
end
# The connection pool is "fair" if threads waiting for
# connections receive them the order in which they began
# waiting. This ensures that we don't timeout one HTTP request
# even while well under capacity in a multi-threaded environment
# such as a Java servlet container.
#
# We don't need strict fairness: if two connections become
# available at the same time, it's fine of two threads that were
# waiting acquire the connections out of order.
#
# Thus this test prepares waiting threads and then trickles in
# available connections slowly, ensuring the wakeup order is
# correct in this case.
#
# Try a few times since it might work out just by chance.
def
test_checkout_fairness
4
.
times
{
setup
;
do_checkout_fairness
}
end
def
do_checkout_fairness
expected
=
(
1
..
@pool
.
size
).
to_a
.
freeze
# check out all connections so our threads start out waiting
conns
=
expected
.
map
{
@pool
.
checkout
}
mutex
=
Mutex
.
new
order
=
[]
errors
=
[]
threads
=
expected
.
map
do
|
i
|
t
=
Thread
.
new
{
begin
conn
=
@pool
.
checkout
# never checked back in
mutex
.
synchronize
{
order
<<
i
}
rescue
=>
e
mutex
.
synchronize
{
errors
<<
e
}
end
}
Thread
.
pass
until
t
.
status
==
"sleep"
t
end
# this should wake up the waiting threads one by one in order
conns
.
each
{
|
conn
|
@pool
.
checkin
(
conn
);
sleep
0.1
}
threads
.
each
(
&
:join
)
raise
errors
.
first
if
errors
.
any?
assert_equal
(
expected
,
order
)
end
# As mentioned in #test_checkout_fairness, we don't care about
# strict fairness. This test creates two groups of threads:
# group1 whose members all start waiting before any thread in
# group2. Enough connections are checked in to wakeup all
# group1 threads, and the fact that only group1 and no group2
# threads acquired a connection is enforced.
#
# Try a few times since it might work out just by chance.
def
test_checkout_fairness_by_group
4
.
times
{
setup
;
do_checkout_fairness_by_group
}
end
def
do_checkout_fairness_by_group
@pool
.
instance_variable_set
(
:@size
,
10
)
# take all the connections
conns
=
(
1
..
10
).
map
{
@pool
.
checkout
}
mutex
=
Mutex
.
new
successes
=
[]
# threads that successfully got a connection
errors
=
[]
make_thread
=
proc
do
|
i
|
t
=
Thread
.
new
{
begin
conn
=
@pool
.
checkout
# never checked back in
mutex
.
synchronize
{
successes
<<
i
}
rescue
=>
e
mutex
.
synchronize
{
errors
<<
e
}
end
}
Thread
.
pass
until
t
.
status
==
"sleep"
t
end
# all group1 threads start waiting before any in group2
group1
=
(
1
..
5
).
map
(
&
make_thread
)
group2
=
(
6
..
10
).
map
(
&
make_thread
)
# checkin n connections back to the pool
checkin
=
proc
do
|
n
|
n
.
times
do
c
=
conns
.
pop
@pool
.
checkin
(
c
)
end
end
checkin
.
call
(
group1
.
size
)
# should wake up all group1
loop
do
sleep
0.1
break
if
mutex
.
synchronize
{
(
successes
.
size
+
errors
.
size
)
==
group1
.
size
}
end
winners
=
mutex
.
synchronize
{
successes
.
dup
}
checkin
.
call
(
group2
.
size
)
# should wake up everyone remaining
group1
.
each
(
&
:join
)
group2
.
each
(
&
:join
)
assert_equal
((
1
..
group1
.
size
).
to_a
,
winners
.
sort
)
if
errors
.
any?
raise
errors
.
first
end
end
def
test_automatic_reconnect
=
pool
=
ConnectionPool
.
new
ActiveRecord
::
Base
.
connection_pool
.
spec
assert
pool
.
automatic_reconnect
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录