提交 d6466beb 编写于 作者: E eileencodes

Ensure test threads share a DB connection

This ensures multiple threads inside a transactional test to see consistent
database state.

When a system test starts Puma spins up one thread and Capybara spins up
another thread. Because of this when tests are run the database cannot
see what was inserted into the database on teardown. This is because
there are two threads using two different connections.

This change uses the statement cache to lock the threads to using a
single connection ID instead of each not being able to see each other.
This code only runs in the fixture setup and teardown so it does not
affect real production databases.

When a transaction is opened we set `lock_thread` to `Thread.current` so
we can keep track of which connection the thread is using. When we
rollback the transaction we unlock the thread and then there will be no
left-over data in the database because the transaction will roll back
the correct connections.

[ Eileen M. Uchitelle, Matthew Draper ]
上级 71da3909
......@@ -353,6 +353,16 @@ def initialize(spec)
@threads_blocking_new_connections = 0
@available = ConnectionLeasingQueue.new self
@lock_thread = false
end
def lock_thread=(lock_thread)
if lock_thread
@lock_thread = Thread.current
else
@lock_thread = nil
end
end
# Retrieve the connection associated with the current thread, or call
......@@ -361,7 +371,7 @@ def initialize(spec)
# #connection can be called any number of times; the connection is
# held in a cache keyed by a thread.
def connection
@thread_cached_conns[connection_cache_key(Thread.current)] ||= checkout
@thread_cached_conns[connection_cache_key(@lock_thread || Thread.current)] ||= checkout
end
# Returns true if there is an open connection being used for the current thread.
......
......@@ -83,7 +83,9 @@ def uncached
# the same SQL query and repeatedly return the same result each time, silently
# undermining the randomness you were expecting.
def clear_query_cache
@query_cache.clear
@lock.synchronize do
@query_cache.clear
end
end
def select_all(arel, name = nil, binds = [], preparable: nil)
......@@ -99,21 +101,23 @@ def select_all(arel, name = nil, binds = [], preparable: nil)
private
def cache_sql(sql, name, binds)
result =
if @query_cache[sql].key?(binds)
ActiveSupport::Notifications.instrument(
"sql.active_record",
sql: sql,
binds: binds,
name: name,
connection_id: object_id,
cached: true,
)
@query_cache[sql][binds]
else
@query_cache[sql][binds] = yield
end
result.dup
@lock.synchronize do
result =
if @query_cache[sql].key?(binds)
ActiveSupport::Notifications.instrument(
"sql.active_record",
sql: sql,
binds: binds,
name: name,
connection_id: object_id,
cached: true,
)
@query_cache[sql][binds]
else
@query_cache[sql][binds] = yield
end
result.dup
end
end
# If arel is locked this is a SELECT ... FOR UPDATE or somesuch. Such
......
......@@ -107,6 +107,7 @@ def initialize(connection, logger = nil, config = {}) # :nodoc:
@schema_cache = SchemaCache.new self
@quoted_column_names, @quoted_table_names = {}, {}
@visitor = arel_visitor
@lock = Monitor.new
if self.class.type_cast_config_to_boolean(config.fetch(:prepared_statements) { true })
@prepared_statements = true
......@@ -605,7 +606,11 @@ def log(sql, name = "SQL", binds = [], type_casted_binds = [], statement_name =
binds: binds,
type_casted_binds: type_casted_binds,
statement_name: statement_name,
connection_id: object_id) { yield }
connection_id: object_id) do
@lock.synchronize do
yield
end
end
rescue => e
raise translate_exception_class(e, sql)
end
......
......@@ -236,7 +236,9 @@ def initialize(connection, logger, connection_parameters, config)
# Clears the prepared statements cache.
def clear_cache!
@statements.clear
@lock.synchronize do
@statements.clear
end
end
def truncate(table_name, name = nil)
......@@ -637,8 +639,10 @@ def exec_cache(sql, name, binds)
if in_transaction?
raise ActiveRecord::PreparedStatementCacheExpired.new(e.cause.message)
else
# outside of transactions we can simply flush this query and retry
@statements.delete sql_key(sql)
@lock.synchronize do
# outside of transactions we can simply flush this query and retry
@statements.delete sql_key(sql)
end
retry
end
end
......@@ -674,19 +678,21 @@ def sql_key(sql)
# Prepare the statement if it hasn't been prepared, return
# the statement key.
def prepare_statement(sql)
sql_key = sql_key(sql)
unless @statements.key? sql_key
nextkey = @statements.next_key
begin
@connection.prepare nextkey, sql
rescue => e
raise translate_exception_class(e, sql)
@lock.synchronize do
sql_key = sql_key(sql)
unless @statements.key? sql_key
nextkey = @statements.next_key
begin
@connection.prepare nextkey, sql
rescue => e
raise translate_exception_class(e, sql)
end
# Clear the queue
@connection.get_last_result
@statements[sql_key] = nextkey
end
# Clear the queue
@connection.get_last_result
@statements[sql_key] = nextkey
@statements[sql_key]
end
@statements[sql_key]
end
# Connects to a PostgreSQL server and sets up the adapter depending on the
......
......@@ -970,6 +970,7 @@ def setup_fixtures(config = ActiveRecord::Base)
@fixture_connections = enlist_fixture_connections
@fixture_connections.each do |connection|
connection.begin_transaction joinable: false
connection.pool.lock_thread = true
end
# When connections are established in the future, begin a transaction too
......@@ -985,6 +986,7 @@ def setup_fixtures(config = ActiveRecord::Base)
if connection && !@fixture_connections.include?(connection)
connection.begin_transaction joinable: false
connection.pool.lock_thread = true
@fixture_connections << connection
end
end
......@@ -1007,6 +1009,7 @@ def teardown_fixtures
ActiveSupport::Notifications.unsubscribe(@connection_subscriber) if @connection_subscriber
@fixture_connections.each do |connection|
connection.rollback_transaction if connection.transaction_open?
connection.pool.lock_thread = false
end
@fixture_connections.clear
else
......
......@@ -640,6 +640,8 @@ class TransactionalFixturesOnConnectionNotification < ActiveRecord::TestCase
def test_transaction_created_on_connection_notification
connection = stub(transaction_open?: false)
connection.expects(:begin_transaction).with(joinable: false)
pool = connection.stubs(:pool).returns(ActiveRecord::ConnectionAdapters::ConnectionPool.new(ActiveRecord::Base.connection_pool.spec))
pool.stubs(:lock_thread=).with(false)
fire_connection_notification(connection)
end
......@@ -647,12 +649,16 @@ def test_notification_established_transactions_are_rolled_back
# Mocha is not thread-safe so define our own stub to test
connection = Class.new do
attr_accessor :rollback_transaction_called
attr_accessor :pool
def transaction_open?; true; end
def begin_transaction(*args); end
def rollback_transaction(*args)
@rollback_transaction_called = true
end
end.new
connection.pool = Class.new do
def lock_thread=(lock_thread); false; end
end.new
fire_connection_notification(connection)
teardown_fixtures
assert(connection.rollback_transaction_called, "Expected <mock connection>#rollback_transaction to be called but was not")
......
......@@ -532,4 +532,16 @@ def test_cache_is_expired_by_habtm_delete
end
end
end
test "threads use the same connection" do
@connection_1 = ActiveRecord::Base.connection.object_id
thread_a = Thread.new do
@connection_2 = ActiveRecord::Base.connection.object_id
end
thread_a.join
assert_equal @connection_1, @connection_2
end
end
......@@ -10,6 +10,8 @@
class DefaultScopingTest < ActiveRecord::TestCase
fixtures :developers, :posts, :comments
self.use_transactional_tests = false
def test_default_scope
expected = Developer.all.merge!(order: "salary DESC").to_a.collect(&:salary)
received = DeveloperOrderedBySalary.all.collect(&:salary)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册