未验证 提交 384e7d13 编写于 作者: E eileencodes

Add support for horizontal sharding

Applications can now connect to multiple shards and switch between
their shards in an application. Note that the shard swapping is
still a manual process as this change does not include an API for
automatic shard swapping.

Usage:

Given the following configuration:

```yaml
production:
  primary:
    database: my_database
  primary_shard_one:
    database: my_database_shard_one
```

Connect to multiple shards:

```ruby
class ApplicationRecord < ActiveRecord::Base
  self.abstract_class = true

  connects_to shards: {
    default: { writing: :primary },
    shard_one: { writing: :primary_shard_one }
  }
```

Swap between shards in your controller / model code:

```ruby
  ActiveRecord::Base.connected_to(shard: :shard_one) do
    # Read from shard one
  end
```

The horizontal sharding API also supports read replicas. See
guides for more details.

This PR also moves some no-doc'd methods into the private namespace as
they were unnecessarily public. We've updated some error messages and
documentation.
Co-authored-by: NJohn Crepezzi <john.crepezzi@gmail.com>
上级 541ec328
* Add support for horizontal sharding to `connects_to` and `connected_to`.
Applications can now connect to multiple shards and switch between their shards in an application. Note that the shard swapping is still a manual process as this change does not include an API for automatic shard swapping.
Usage:
Given the following configuration:
```yaml
# config/database.yml
production:
primary:
database: my_database
primary_shard_one:
database: my_database_shard_one
```
Connect to multiple shards:
```ruby
class ApplicationRecord < ActiveRecord::Base
self.abstract_class = true
connects_to shards: {
default: { writing: :primary },
shard_one: { writing: :primary_shard_one }
}
```
Swap between shards in your controller / model code:
```ruby
ActiveRecord::Base.connected_to(shard: :shard_one) do
# Read from shard one
end
```
The horizontal sharding API also supports read replicas. See guides for more details.
*Eileen M. Uchitelle*, *John Crepezzi*
* Deprecate `spec_name` in favor of `name` on database configurations
The accessors for `spec_name` on `configs_for` and `DatabaseConfig` are deprecated. Please use `name` instead.
......
......@@ -1039,7 +1039,7 @@ def connection_pool_list
end
alias :connection_pools :connection_pool_list
def establish_connection(config, pool_key = :default)
def establish_connection(config, pool_key = ActiveRecord::Base.default_pool_key)
pool_config = resolve_pool_config(config)
db_config = pool_config.db_config
......@@ -1100,16 +1100,19 @@ def flush_idle_connections!
# active or defined connection: if it is the latter, it will be
# opened and set as the active connection for the class it was defined
# for (not necessarily the current class).
def retrieve_connection(spec_name) # :nodoc:
pool = retrieve_connection_pool(spec_name)
def retrieve_connection(spec_name, pool_key = ActiveRecord::Base.default_pool_key) # :nodoc:
pool = retrieve_connection_pool(spec_name, pool_key)
unless pool
# multiple database application
if ActiveRecord::Base.connection_handler != ActiveRecord::Base.default_connection_handler
raise ConnectionNotEstablished, "No connection pool for '#{spec_name}' found for the '#{ActiveRecord::Base.current_role}' role."
if pool_key != ActiveRecord::Base.default_pool_key
message = "No connection pool for '#{spec_name}' found for the '#{pool_key}' shard."
elsif ActiveRecord::Base.connection_handler != ActiveRecord::Base.default_connection_handler
message = "No connection pool for '#{spec_name}' found for the '#{ActiveRecord::Base.current_role}' role."
else
raise ConnectionNotEstablished, "No connection pool for '#{spec_name}' found."
message = "No connection pool for '#{spec_name}' found."
end
raise ConnectionNotEstablished, message
end
pool.connection
......@@ -1117,7 +1120,7 @@ def retrieve_connection(spec_name) # :nodoc:
# Returns true if a connection that's accessible to this class has
# already been opened.
def connected?(spec_name, pool_key = :default)
def connected?(spec_name, pool_key = ActiveRecord::Base.default_pool_key)
pool = retrieve_connection_pool(spec_name, pool_key)
pool && pool.connected?
end
......@@ -1126,12 +1129,12 @@ def connected?(spec_name, pool_key = :default)
# connection and the defined connection (if they exist). The result
# can be used as an argument for #establish_connection, for easily
# re-establishing the connection.
def remove_connection(owner, pool_key = :default)
def remove_connection(owner, pool_key = ActiveRecord::Base.default_pool_key)
remove_connection_pool(owner, pool_key)&.configuration_hash
end
deprecate remove_connection: "Use #remove_connection_pool, which now returns a DatabaseConfig object instead of a Hash"
def remove_connection_pool(owner, pool_key = :default)
def remove_connection_pool(owner, pool_key = ActiveRecord::Base.default_pool_key)
if pool_manager = get_pool_manager(owner)
pool_config = pool_manager.remove_pool_config(pool_key)
......@@ -1145,7 +1148,7 @@ def remove_connection_pool(owner, pool_key = :default)
# Retrieving the connection pool happens a lot, so we cache it in @owner_to_pool_manager.
# This makes retrieving the connection pool O(1) once the process is warm.
# When a connection is established or removed, we invalidate the cache.
def retrieve_connection_pool(owner, pool_key = :default)
def retrieve_connection_pool(owner, pool_key = ActiveRecord::Base.default_pool_key)
pool_config = get_pool_manager(owner)&.get_pool_config(pool_key)
pool_config&.pool
end
......
......@@ -48,7 +48,7 @@ module ConnectionHandling
# may be returned on an error.
def establish_connection(config_or_env = nil)
db_config = resolve_config_for_connection(config_or_env)
connection_handler.establish_connection(db_config)
connection_handler.establish_connection(db_config, current_pool_key)
end
# Connects a model to the databases specified. The +database+ keyword
......@@ -64,8 +64,24 @@ def establish_connection(config_or_env = nil)
# connects_to database: { writing: :primary, reading: :primary_replica }
# end
#
# Returns an array of established connections.
def connects_to(database: {})
# +connects_to+ also supports horizontal sharding. The horizontal sharding API
# also supports read replicas. Connect a model to a list of shards like this:
#
# class AnimalsModel < ApplicationRecord
# self.abstract_class = true
#
# connects_to shards: {
# default: { writing: :primary, reading: :primary_replica },
# shard_two: { writing: :primary_shard_two, reading: :primary_shard_replica_two }
# }
# end
#
# Returns an array of database connections.
def connects_to(database: {}, shards: {})
if database.present? && shards.present?
raise ArgumentError, "connects_to can only accept a `database` or `shards` argument, but not both arguments."
end
connections = []
database.each do |role, database_key|
......@@ -75,14 +91,25 @@ def connects_to(database: {})
connections << handler.establish_connection(db_config)
end
shards.each do |pool_key, database_keys|
database_keys.each do |role, database_key|
db_config = resolve_config_for_connection(database_key)
handler = lookup_connection_handler(role.to_sym)
connections << handler.establish_connection(db_config, pool_key.to_sym)
end
end
connections
end
# Connects to a database or role (ex writing, reading, or another
# custom role) for the duration of the block.
# Connects to a role (ex writing, reading or a custom role) and/or
# shard for the duration of the block. At the end of the block the
# connection will be returned to the original role / shard.
#
# If a role is passed, Active Record will look up the connection
# based on the requested role:
# If only a role is passed, Active Record will look up the connection
# based on the requested role. If a non-established role is requested
# an `ActiveRecord::ConnectionNotEstablished` error will be raised:
#
# ActiveRecord::Base.connected_to(role: :writing) do
# Dog.create! # creates dog using dog writing connection
......@@ -92,16 +119,29 @@ def connects_to(database: {})
# Dog.create! # throws exception because we're on a replica
# end
#
# ActiveRecord::Base.connected_to(role: :unknown_role) do
# # raises exception due to non-existent role
# If only a shard is passed, Active Record will look up the shard on the
# current role. If a non-existent shard is passed, an
# `ActiveRecord::ConnectionNotEstablished` error will be raised.
#
# ActiveRecord::Base.connected_to(shard: :default) do
# # Dog.create! # creates dog in shard with the default key
# end
def connected_to(database: nil, role: nil, prevent_writes: false, &blk)
#
# If a shard and role is passed, Active Record will first lookup the role,
# and then look up the connection by shard key.
#
# ActiveRecord::Base.connected_to(role: :reading, shard: :shard_one_replica) do
# # Dog.create! # would raise as we're on a readonly connection
# end
#
# The database kwarg is deprecated and will be removed in 6.2.0 without replacement.
def connected_to(database: nil, role: nil, shard: nil, prevent_writes: false, &blk)
if database
ActiveSupport::Deprecation.warn("The database key in `connected_to` is deprecated. It will be removed in Rails 6.2.0 without replacement.")
end
if database && role
raise ArgumentError, "connected_to can only accept a `database` or a `role` argument, but not both arguments."
if database && (role || shard)
raise ArgumentError, "`connected_to` cannot accept a `database` argument with any other arguments."
elsif database
if database.is_a?(Hash)
role, database = database.first
......@@ -114,14 +154,12 @@ def connected_to(database: nil, role: nil, prevent_writes: false, &blk)
handler.establish_connection(db_config)
with_handler(role, &blk)
elsif shard
with_shard(connection_specification_name, shard, role || current_role, prevent_writes, &blk)
elsif role
prevent_writes = true if role == reading_role
with_handler(role.to_sym) do
connection_handler.while_preventing_writes(prevent_writes, &blk)
end
with_role(role, prevent_writes, &blk)
else
raise ArgumentError, "must provide a `database` or a `role`."
raise ArgumentError, "must provide a `shard` and/or `role`."
end
end
......@@ -131,8 +169,8 @@ def connected_to(database: nil, role: nil, prevent_writes: false, &blk)
# ActiveRecord::Base.connected_to?(role: :writing) #=> true
# ActiveRecord::Base.connected_to?(role: :reading) #=> false
# end
def connected_to?(role:)
current_role == role.to_sym
def connected_to?(role:, shard: ActiveRecord::Base.default_pool_key)
current_role == role.to_sym && current_pool_key == shard.to_sym
end
# Returns the symbol representing the current connected role.
......@@ -153,11 +191,6 @@ def lookup_connection_handler(handler_key) # :nodoc:
connection_handlers[handler_key] ||= ActiveRecord::ConnectionAdapters::ConnectionHandler.new
end
def with_handler(handler_key, &blk) # :nodoc:
handler = lookup_connection_handler(handler_key)
swap_connection_handler(handler, &blk)
end
# Clears the query cache for all connections associated with the current thread.
def clear_query_caches_for_current_thread
ActiveRecord::Base.connection_handlers.each_value do |handler|
......@@ -211,16 +244,16 @@ def connection_db_config
end
def connection_pool
connection_handler.retrieve_connection_pool(connection_specification_name) || raise(ConnectionNotEstablished)
connection_handler.retrieve_connection_pool(connection_specification_name, current_pool_key) || raise(ConnectionNotEstablished)
end
def retrieve_connection
connection_handler.retrieve_connection(connection_specification_name)
connection_handler.retrieve_connection(connection_specification_name, current_pool_key)
end
# Returns +true+ if Active Record is connected.
def connected?
connection_handler.connected?(connection_specification_name)
connection_handler.connected?(connection_specification_name, current_pool_key)
end
def remove_connection(name = nil)
......@@ -228,11 +261,11 @@ def remove_connection(name = nil)
# if removing a connection that has a pool, we reset the
# connection_specification_name so it will use the parent
# pool.
if connection_handler.retrieve_connection_pool(name)
if connection_handler.retrieve_connection_pool(name, current_pool_key)
self.connection_specification_name = nil
end
connection_handler.remove_connection_pool(name)
connection_handler.remove_connection_pool(name, current_pool_key)
end
def clear_cache! # :nodoc:
......@@ -255,6 +288,30 @@ def resolve_config_for_connection(config_or_env)
db_config
end
def with_handler(handler_key, &blk)
handler = lookup_connection_handler(handler_key)
swap_connection_handler(handler, &blk)
end
def with_role(role, prevent_writes, &blk)
prevent_writes = true if role == reading_role
with_handler(role.to_sym) do
connection_handler.while_preventing_writes(prevent_writes, &blk)
end
end
def with_shard(connection_specification_name, pool_key, role, prevent_writes)
old_pool_key = current_pool_key
with_role(role, prevent_writes) do
self.current_pool_key = pool_key
yield
end
ensure
self.current_pool_key = old_pool_key
end
def swap_connection_handler(handler, &blk) # :nodoc:
old_handler, ActiveRecord::Base.connection_handler = ActiveRecord::Base.connection_handler, handler
return_value = yield
......
......@@ -132,6 +132,8 @@ def self.configurations
class_attribute :default_connection_handler, instance_writer: false
class_attribute :default_pool_key, instance_writer: false
self.filter_attributes = []
def self.connection_handler
......@@ -142,7 +144,16 @@ def self.connection_handler=(handler)
Thread.current.thread_variable_set(:ar_connection_handler, handler)
end
def self.current_pool_key
Thread.current.thread_variable_get(:ar_pool_key) || default_pool_key
end
def self.current_pool_key=(pool_key)
Thread.current.thread_variable_set(:ar_pool_key, pool_key)
end
self.default_connection_handler = ConnectionAdapters::ConnectionHandler.new
self.default_pool_key = :default
end
module ClassMethods
......
......@@ -215,14 +215,14 @@ def test_switching_connections_with_database_and_role_raises
ActiveRecord::Base.connected_to(database: :readonly, role: :writing) { }
end
end
assert_equal "connected_to can only accept a `database` or a `role` argument, but not both arguments.", error.message
assert_equal "`connected_to` cannot accept a `database` argument with any other arguments.", error.message
end
def test_switching_connections_without_database_and_role_raises
error = assert_raises(ArgumentError) do
ActiveRecord::Base.connected_to { }
end
assert_equal "must provide a `database` or a `role`.", error.message
assert_equal "must provide a `shard` and/or `role`.", error.message
end
def test_switching_connections_with_database_symbol_uses_default_role
......@@ -376,7 +376,7 @@ def test_connection_handlers_are_per_thread_and_not_per_fiber
reading_handler = ActiveRecord::Base.connection_handlers[:reading]
reading = ActiveRecord::Base.with_handler(:reading) do
reading = ActiveRecord::Base.connected_to(role: :reading) do
Person.connection_handler
end
......@@ -397,7 +397,7 @@ def test_connection_handlers_swapping_connections_in_fiber
r << ActiveRecord::Base.connection_handler
end
reading = ActiveRecord::Base.with_handler(:reading) do
reading = ActiveRecord::Base.connected_to(role: :reading) do
enum.next
end
......
# frozen_string_literal: true
require "cases/helper"
require "models/person"
module ActiveRecord
module ConnectionAdapters
class ConnectionHandlersShardingDbTest < ActiveRecord::TestCase
self.use_transactional_tests = false
fixtures :people
def setup
@handlers = { writing: ConnectionHandler.new, reading: ConnectionHandler.new }
@rw_handler = @handlers[:writing]
@ro_handler = @handlers[:reading]
@owner_name = "ActiveRecord::Base"
db_config = ActiveRecord::Base.configurations.configs_for(env_name: "arunit", spec_name: "primary")
@rw_pool = @handlers[:writing].establish_connection(db_config)
@ro_pool = @handlers[:reading].establish_connection(db_config)
end
def teardown
ActiveRecord::Base.connection_handlers = { writing: ActiveRecord::Base.default_connection_handler }
end
unless in_memory_db?
def test_establish_connection_using_3_levels_config
previous_env, ENV["RAILS_ENV"] = ENV["RAILS_ENV"], "default_env"
config = {
"default_env" => {
"primary" => { "adapter" => "sqlite3", "database" => "db/primary.sqlite3" },
"primary_shard_one" => { "adapter" => "sqlite3", "database" => "db/primary_shard_one.sqlite3" },
}
}
@prev_configs, ActiveRecord::Base.configurations = ActiveRecord::Base.configurations, config
ActiveRecord::Base.connects_to(shards: {
default: { writing: :primary },
shard_one: { writing: :primary_shard_one }
})
base_pool = ActiveRecord::Base.connection_handlers[:writing].retrieve_connection_pool("ActiveRecord::Base")
default_pool = ActiveRecord::Base.connection_handlers[:writing].retrieve_connection_pool("ActiveRecord::Base", :default)
assert_equal base_pool, default_pool
assert_equal "db/primary.sqlite3", default_pool.db_config.database
assert_equal "primary", default_pool.db_config.spec_name
assert_not_nil pool = ActiveRecord::Base.connection_handlers[:writing].retrieve_connection_pool("ActiveRecord::Base", :shard_one)
assert_equal "db/primary_shard_one.sqlite3", pool.db_config.database
assert_equal "primary_shard_one", pool.db_config.spec_name
ensure
ActiveRecord::Base.configurations = @prev_configs
ActiveRecord::Base.establish_connection(:arunit)
ENV["RAILS_ENV"] = previous_env
end
def test_establish_connection_using_3_levels_config_with_shards_and_replica
previous_env, ENV["RAILS_ENV"] = ENV["RAILS_ENV"], "default_env"
config = {
"default_env" => {
"primary" => { "adapter" => "sqlite3", "database" => "db/primary.sqlite3" },
"primary_replica" => { "adapter" => "sqlite3", "database" => "db/primary.sqlite3", "replica" => true },
"primary_shard_one" => { "adapter" => "sqlite3", "database" => "db/primary_shard_one.sqlite3" },
"primary_shard_one_replica" => { "adapter" => "sqlite3", "database" => "db/primary_shard_one.sqlite3", "replica" => true }
}
}
@prev_configs, ActiveRecord::Base.configurations = ActiveRecord::Base.configurations, config
ActiveRecord::Base.connects_to(shards: {
default: { writing: :primary, reading: :primary_replica },
shard_one: { writing: :primary_shard_one, reading: :primary_shard_one_replica }
})
default_writing_pool = ActiveRecord::Base.connection_handlers[:writing].retrieve_connection_pool("ActiveRecord::Base", :default)
base_writing_pool = ActiveRecord::Base.connection_handlers[:writing].retrieve_connection_pool("ActiveRecord::Base")
assert_equal base_writing_pool, default_writing_pool
assert_equal "db/primary.sqlite3", default_writing_pool.db_config.database
assert_equal "primary", default_writing_pool.db_config.spec_name
default_reading_pool = ActiveRecord::Base.connection_handlers[:reading].retrieve_connection_pool("ActiveRecord::Base", :default)
base_reading_pool = ActiveRecord::Base.connection_handlers[:reading].retrieve_connection_pool("ActiveRecord::Base")
assert_equal base_reading_pool, default_reading_pool
assert_equal "db/primary.sqlite3", default_reading_pool.db_config.database
assert_equal "primary_replica", default_reading_pool.db_config.spec_name
assert_not_nil pool = ActiveRecord::Base.connection_handlers[:writing].retrieve_connection_pool("ActiveRecord::Base", :shard_one)
assert_equal "db/primary_shard_one.sqlite3", pool.db_config.database
assert_equal "primary_shard_one", pool.db_config.spec_name
assert_not_nil pool = ActiveRecord::Base.connection_handlers[:reading].retrieve_connection_pool("ActiveRecord::Base", :shard_one)
assert_equal "db/primary_shard_one.sqlite3", pool.db_config.database
assert_equal "primary_shard_one_replica", pool.db_config.spec_name
ensure
ActiveRecord::Base.configurations = @prev_configs
ActiveRecord::Base.establish_connection(:arunit)
ENV["RAILS_ENV"] = previous_env
end
def test_switching_connections_via_handler
previous_env, ENV["RAILS_ENV"] = ENV["RAILS_ENV"], "default_env"
config = {
"default_env" => {
"primary" => { "adapter" => "sqlite3", "database" => "db/primary.sqlite3" },
"primary_replica" => { "adapter" => "sqlite3", "database" => "db/primary.sqlite3", "replica" => true },
"primary_shard_one" => { "adapter" => "sqlite3", "database" => "db/primary_shard_one.sqlite3" },
"primary_shard_one_replica" => { "adapter" => "sqlite3", "database" => "db/primary_shard_one.sqlite3", "replica" => true }
}
}
@prev_configs, ActiveRecord::Base.configurations = ActiveRecord::Base.configurations, config
ActiveRecord::Base.connects_to(shards: {
default: { writing: :primary, reading: :primary_replica },
shard_one: { writing: :primary_shard_one, reading: :primary_shard_one_replica }
})
ActiveRecord::Base.connected_to(role: :reading, shard: :default) do
@ro_handler = ActiveRecord::Base.connection_handler
assert_equal ActiveRecord::Base.connection_handler, ActiveRecord::Base.connection_handlers[:reading]
assert_equal :reading, ActiveRecord::Base.current_role
assert ActiveRecord::Base.connected_to?(role: :reading, shard: :default)
assert_not ActiveRecord::Base.connected_to?(role: :writing, shard: :default)
assert_not ActiveRecord::Base.connected_to?(role: :writing, shard: :shard_one)
assert_not ActiveRecord::Base.connected_to?(role: :reading, shard: :shard_one)
assert_predicate ActiveRecord::Base.connection, :preventing_writes?
end
ActiveRecord::Base.connected_to(role: :writing, shard: :default) do
assert_equal ActiveRecord::Base.connection_handler, ActiveRecord::Base.connection_handlers[:writing]
assert_not_equal @ro_handler, ActiveRecord::Base.connection_handler
assert_equal :writing, ActiveRecord::Base.current_role
assert ActiveRecord::Base.connected_to?(role: :writing, shard: :default)
assert_not ActiveRecord::Base.connected_to?(role: :reading, shard: :default)
assert_not ActiveRecord::Base.connected_to?(role: :reading, shard: :shard_one)
assert_not ActiveRecord::Base.connected_to?(role: :writing, shard: :shard_one)
assert_not_predicate ActiveRecord::Base.connection, :preventing_writes?
end
ActiveRecord::Base.connected_to(role: :reading, shard: :shard_one) do
@ro_handler = ActiveRecord::Base.connection_handler
assert_equal ActiveRecord::Base.connection_handler, ActiveRecord::Base.connection_handlers[:reading]
assert_equal :reading, ActiveRecord::Base.current_role
assert ActiveRecord::Base.connected_to?(role: :reading, shard: :shard_one)
assert_not ActiveRecord::Base.connected_to?(role: :writing, shard: :shard_one)
assert_not ActiveRecord::Base.connected_to?(role: :writing, shard: :default)
assert_not ActiveRecord::Base.connected_to?(role: :reading, shard: :default)
assert_predicate ActiveRecord::Base.connection, :preventing_writes?
end
ActiveRecord::Base.connected_to(role: :writing, shard: :shard_one) do
assert_equal ActiveRecord::Base.connection_handler, ActiveRecord::Base.connection_handlers[:writing]
assert_not_equal @ro_handler, ActiveRecord::Base.connection_handler
assert_equal :writing, ActiveRecord::Base.current_role
assert ActiveRecord::Base.connected_to?(role: :writing, shard: :shard_one)
assert_not ActiveRecord::Base.connected_to?(role: :reading, shard: :shard_one)
assert_not ActiveRecord::Base.connected_to?(role: :reading, shard: :default)
assert_not ActiveRecord::Base.connected_to?(role: :writing, shard: :default)
assert_not_predicate ActiveRecord::Base.connection, :preventing_writes?
end
ensure
ActiveRecord::Base.configurations = @prev_configs
ActiveRecord::Base.establish_connection(:arunit)
ENV["RAILS_ENV"] = previous_env
FileUtils.rm_rf("db")
end
def test_retrieves_proper_connection_with_nested_connected_to
previous_env, ENV["RAILS_ENV"] = ENV["RAILS_ENV"], "default_env"
config = {
"default_env" => {
"primary" => { "adapter" => "sqlite3", "database" => "db/primary.sqlite3" },
"primary_replica" => { "adapter" => "sqlite3", "database" => "db/primary.sqlite3", "replica" => true },
"primary_shard_one" => { "adapter" => "sqlite3", "database" => "db/primary_shard_one.sqlite3" },
"primary_shard_one_replica" => { "adapter" => "sqlite3", "database" => "db/primary_shard_one.sqlite3", "replica" => true }
}
}
@prev_configs, ActiveRecord::Base.configurations = ActiveRecord::Base.configurations, config
ActiveRecord::Base.connects_to(shards: {
default: { writing: :primary, reading: :primary_replica },
shard_one: { writing: :primary_shard_one, reading: :primary_shard_one_replica }
})
ActiveRecord::Base.connected_to(role: :reading, shard: :shard_one) do
# Uses the correct connection
assert_equal "primary_shard_one_replica", ActiveRecord::Base.connection_pool.db_config.spec_name
# Uses the shard currently in use
ActiveRecord::Base.connected_to(role: :writing) do
assert_equal "primary_shard_one", ActiveRecord::Base.connection_pool.db_config.spec_name
end
# Allows overriding the shard as well
ActiveRecord::Base.connected_to(role: :reading, shard: :default) do
assert_equal "primary_replica", ActiveRecord::Base.connection_pool.db_config.spec_name
end
# Uses the current role
ActiveRecord::Base.connected_to(shard: :default) do
assert_equal "primary_replica", ActiveRecord::Base.connection_pool.db_config.spec_name
end
# Resets correctly
assert_equal "primary_shard_one_replica", ActiveRecord::Base.connection_pool.db_config.spec_name
end
ensure
ActiveRecord::Base.configurations = @prev_configs
ActiveRecord::Base.establish_connection(:arunit)
ENV["RAILS_ENV"] = previous_env
FileUtils.rm_rf("db")
end
def test_connected_to_raises_without_a_shard_or_role
error = assert_raises(ArgumentError) do
ActiveRecord::Base.connected_to { }
end
assert_equal "must provide a `shard` and/or `role`.", error.message
end
def test_connects_to_raises_with_a_shard_and_database_key
error = assert_raises(ArgumentError) do
ActiveRecord::Base.connects_to(database: { writing: :arunit }, shards: { shard_one: { writing: :arunit } })
end
assert_equal "connects_to can only accept a `database` or `shards` argument, but not both arguments.", error.message
end
def test_retrieve_connection_pool_with_invalid_shard
assert_not_nil @rw_handler.retrieve_connection_pool("ActiveRecord::Base")
assert_nil @rw_handler.retrieve_connection_pool("ActiveRecord::Base", :foo)
assert_not_nil @ro_handler.retrieve_connection_pool("ActiveRecord::Base")
assert_nil @ro_handler.retrieve_connection_pool("ActiveRecord::Base", :foo)
end
def test_calling_connected_to_on_a_non_existent_shard_raises
ActiveRecord::Base.connects_to(shards: {
default: { writing: :arunit, reading: :arunit }
})
error = assert_raises ActiveRecord::ConnectionNotEstablished do
ActiveRecord::Base.connected_to(role: :reading, shard: :foo) do
Person.first
end
end
assert_equal "No connection pool for 'ActiveRecord::Base' found for the 'foo' shard.", error.message
end
end
class ShardConnectionTestModel < ActiveRecord::Base
end
class ShardConnectionTestModelB < ActiveRecord::Base
end
def test_same_shards_across_clusters
ShardConnectionTestModel.connects_to shards: { one: { writing: { database: ":memory:", adapter: "sqlite3" } } }
ShardConnectionTestModelB.connects_to shards: { one: { writing: { database: ":memory:", adapter: "sqlite3" } } }
ActiveRecord::Base.connected_to(shard: :one) do
ShardConnectionTestModel.connection.execute("CREATE TABLE `shard_connection_test_models` (shard_key VARCHAR (255))")
ShardConnectionTestModel.create!(shard_key: "test_model_default")
ShardConnectionTestModelB.connection.execute("CREATE TABLE `shard_connection_test_model_bs` (shard_key VARCHAR (255))")
ShardConnectionTestModelB.create!(shard_key: "test_model_b_default")
assert_equal "test_model_default", ShardConnectionTestModel.where(shard_key: "test_model_default").first.shard_key
assert_equal "test_model_b_default", ShardConnectionTestModelB.where(shard_key: "test_model_b_default").first.shard_key
end
end
def test_sharding_separation
ShardConnectionTestModel.connects_to shards: {
default: { writing: { database: ":memory:", adapter: "sqlite3" } },
one: { writing: { database: ":memory:", adapter: "sqlite3" } }
}
[:default, :one].each do |shard_name|
ActiveRecord::Base.connected_to(shard: shard_name) do
ShardConnectionTestModel.connection.execute("CREATE TABLE `shard_connection_test_models` (shard_key VARCHAR (255))")
end
end
# Create a record on :default
ShardConnectionTestModel.create!(shard_key: "foo")
# Make sure we can read it when explicitly connecting to :default
ActiveRecord::Base.connected_to(shard: :default) do
assert ShardConnectionTestModel.find_by_shard_key("foo")
end
# Switch to shard and make sure we can't read the record from :default
# Also add a new record on :one
ActiveRecord::Base.connected_to(shard: :one) do
assert_not ShardConnectionTestModel.find_by_shard_key("foo")
ShardConnectionTestModel.create!(shard_key: "bar")
end
# Make sure we can't read the record from :one but can read the record
# from :default
assert_not ShardConnectionTestModel.find_by_shard_key("bar")
assert ShardConnectionTestModel.find_by_shard_key("foo")
end
def test_swapping_shards_in_a_multi_threaded_environment
tf_default = Tempfile.open "shard_key_default"
tf_shard_one = Tempfile.open "shard_key_one"
ShardConnectionTestModel.connects_to shards: {
default: { writing: { database: tf_default.path, adapter: "sqlite3" } },
one: { writing: { database: tf_shard_one.path, adapter: "sqlite3" } }
}
[:default, :one].each do |shard_name|
ActiveRecord::Base.connected_to(shard: shard_name) do
ShardConnectionTestModel.connection.execute("CREATE TABLE `shard_connection_test_models` (shard_key VARCHAR (255))")
ShardConnectionTestModel.connection.execute("INSERT INTO `shard_connection_test_models` VALUES ('shard_key_#{shard_name}')")
end
end
shard_one_latch = Concurrent::CountDownLatch.new
shard_default_latch = Concurrent::CountDownLatch.new
ShardConnectionTestModel.connection
thread = Thread.new do
ShardConnectionTestModel.connection
shard_default_latch.wait
assert_equal "shard_key_default", ShardConnectionTestModel.connection.select_value("SELECT shard_key from shard_connection_test_models")
shard_one_latch.count_down
end
ActiveRecord::Base.connected_to(shard: :one) do
shard_default_latch.count_down
assert_equal "shard_key_one", ShardConnectionTestModel.connection.select_value("SELECT shard_key from shard_connection_test_models")
shard_one_latch.wait
end
thread.join
ensure
tf_shard_one.close
tf_shard_one.unlink
tf_default.close
tf_default.unlink
end
def test_swapping_shards_and_roles_in_a_multi_threaded_environment
tf_default = Tempfile.open "shard_key_default"
tf_shard_one = Tempfile.open "shard_key_one"
tf_default_reading = Tempfile.open "shard_key_default_reading"
tf_shard_one_reading = Tempfile.open "shard_key_one_reading"
ShardConnectionTestModel.connects_to shards: {
default: { writing: { database: tf_default.path, adapter: "sqlite3" }, secondary: { database: tf_default_reading.path, adapter: "sqlite3" } },
one: { writing: { database: tf_shard_one.path, adapter: "sqlite3" }, secondary: { database: tf_shard_one_reading.path, adapter: "sqlite3" } }
}
[:default, :one].each do |shard_name|
ActiveRecord::Base.connected_to(shard: shard_name) do
ShardConnectionTestModel.connection.execute("CREATE TABLE `shard_connection_test_models` (shard_key VARCHAR (255))")
ShardConnectionTestModel.connection.execute("INSERT INTO `shard_connection_test_models` VALUES ('shard_key_#{shard_name}')")
end
ActiveRecord::Base.connected_to(shard: shard_name, role: :secondary) do
ShardConnectionTestModel.connection.execute("CREATE TABLE `shard_connection_test_models` (shard_key VARCHAR (255))")
ShardConnectionTestModel.connection.execute("INSERT INTO `shard_connection_test_models` VALUES ('shard_key_#{shard_name}_secondary')")
end
end
shard_one_latch = Concurrent::CountDownLatch.new
shard_default_latch = Concurrent::CountDownLatch.new
ShardConnectionTestModel.connection
thread = Thread.new do
ShardConnectionTestModel.connection
shard_default_latch.wait
assert_equal "shard_key_default", ShardConnectionTestModel.connection.select_value("SELECT shard_key from shard_connection_test_models")
shard_one_latch.count_down
end
ActiveRecord::Base.connected_to(shard: :one, role: :secondary) do
shard_default_latch.count_down
assert_equal "shard_key_one_secondary", ShardConnectionTestModel.connection.select_value("SELECT shard_key from shard_connection_test_models")
shard_one_latch.wait
end
thread.join
ensure
tf_shard_one.close
tf_shard_one.unlink
tf_default.close
tf_default.unlink
tf_shard_one_reading.close
tf_shard_one_reading.unlink
tf_default_reading.close
tf_default_reading.unlink
end
end
end
end
......@@ -9,6 +9,7 @@ After reading this guide you will know:
* How to set up your application for multiple databases.
* How automatic connection switching works.
* How to use horizontal sharding for multiple databases.
* What features are supported and what's still a work in progress.
--------------------------------------------------------------------------------
......@@ -29,7 +30,7 @@ databases
The following features are not (yet) supported:
* Sharding
* Automatic swapping for horizontal sharding
* Joining across clusters
* Load balancing replicas
* Dumping schema caches for multiple databases
......@@ -259,15 +260,75 @@ like `connected_to(role: :nonexistent)` you will get an error that says
`ActiveRecord::ConnectionNotEstablished (No connection pool with 'AnimalsBase' found
for the 'nonexistent' role.)`
## Horizontal sharding
Horizontal sharding is when you split up your database to reduce the number of rows on each
database server, but maintain the same schema across "shards". This is commonly called "multi-tenant"
sharding.
The API for supporting horizontal sharding in Rails is similar to the multiple database / vertical
sharding API that's existed since Rails 6.0.
Shards are declared in the three-tier config like this:
```yaml
production:
primary:
database: my_primary_database
adapter: mysql
primary_replica:
database: my_primary_database
adapter: mysql
replica: true
primary_shard_one:
database: my_primary_shard_one
adapter: mysql
primary_shard_one_replica:
database: my_primary_shard_one
adapter: mysql
replica: true
```
Models are then connected with the `connects_to` API via the `shards` key:
```ruby
class ApplicationRecord < ActiveRecord::Base
self.abstract_class = true
connects_to shards: {
default: { writing: :primary, reading: :primary_replica },
shard_one: { writing: :primary_shard_one, reading: :primary_shard_one_replica }
}
end
Then models can swap connections manually via the `connected_to` API:
```ruby
ActiveRecord::Base.connected_to(shard: :default) do
@id = Record.create! # creates a record in shard one
end
ActiveRecord::Base.connected_to(shard: :shard_one) do
Record.find(@id) # can't find record, doesn't exist
end
```
The horizontal sharding API also supports read replicas. You can swap the
role and the shard with the `connected_to` API.
```ruby
ActiveRecord::Base.connected_to(role: :reading, shard: :shard_one) do
Record.first # lookup record from read replica of shard one
end
```
## Caveats
### Sharding
### Automatic swapping for horizontal sharding
As noted at the top, Rails doesn't (yet) support sharding. We had to do a lot of work
to support multiple databases for Rails 6.0. The lack of support for sharding isn't
an oversight, but does require additional work that didn't make it in for 6.0. For now
if you need sharding it may be advisable to continue using one of the many gems
that supports this.
While Rails now supports an API for connecting to and swapping connections of shards, it does
not yet support an automatic swapping strategy. Any shard swapping will need to be done manually
in your app via a middleware or `around_action`.
### Load Balancing Replicas
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册