From 384e7d139eb3c2da1ff857033605284d6e70aba7 Mon Sep 17 00:00:00 2001 From: eileencodes Date: Fri, 24 Jan 2020 09:01:32 -0500 Subject: [PATCH] Add support for horizontal sharding Applications can now connect to multiple shards and switch between their shards in an application. Note that the shard swapping is still a manual process as this change does not include an API for automatic shard swapping. Usage: Given the following configuration: ```yaml production: primary: database: my_database primary_shard_one: database: my_database_shard_one ``` Connect to multiple shards: ```ruby class ApplicationRecord < ActiveRecord::Base self.abstract_class = true connects_to shards: { default: { writing: :primary }, shard_one: { writing: :primary_shard_one } } ``` Swap between shards in your controller / model code: ```ruby ActiveRecord::Base.connected_to(shard: :shard_one) do # Read from shard one end ``` The horizontal sharding API also supports read replicas. See guides for more details. This PR also moves some no-doc'd methods into the private namespace as they were unnecessarily public. We've updated some error messages and documentation. Co-authored-by: John Crepezzi --- activerecord/CHANGELOG.md | 40 ++ .../abstract/connection_pool.rb | 25 +- .../lib/active_record/connection_handling.rb | 117 +++-- activerecord/lib/active_record/core.rb | 11 + .../connection_handlers_multi_db_test.rb | 8 +- .../connection_handlers_sharding_db_test.rb | 412 ++++++++++++++++++ .../active_record_multiple_databases.md | 75 +++- 7 files changed, 636 insertions(+), 52 deletions(-) create mode 100644 activerecord/test/cases/connection_adapters/connection_handlers_sharding_db_test.rb diff --git a/activerecord/CHANGELOG.md b/activerecord/CHANGELOG.md index 614c2f2e3d..ca5eae7976 100644 --- a/activerecord/CHANGELOG.md +++ b/activerecord/CHANGELOG.md @@ -1,3 +1,43 @@ +* Add support for horizontal sharding to `connects_to` and `connected_to`. + + Applications can now connect to multiple shards and switch between their shards in an application. Note that the shard swapping is still a manual process as this change does not include an API for automatic shard swapping. + + Usage: + + Given the following configuration: + + ```yaml + # config/database.yml + production: + primary: + database: my_database + primary_shard_one: + database: my_database_shard_one + ``` + + Connect to multiple shards: + + ```ruby + class ApplicationRecord < ActiveRecord::Base + self.abstract_class = true + + connects_to shards: { + default: { writing: :primary }, + shard_one: { writing: :primary_shard_one } + } + ``` + + Swap between shards in your controller / model code: + + ```ruby + ActiveRecord::Base.connected_to(shard: :shard_one) do + # Read from shard one + end + ``` + + The horizontal sharding API also supports read replicas. See guides for more details. + + *Eileen M. Uchitelle*, *John Crepezzi* * Deprecate `spec_name` in favor of `name` on database configurations The accessors for `spec_name` on `configs_for` and `DatabaseConfig` are deprecated. Please use `name` instead. diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index 589573cfe9..dab6ce272b 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -1039,7 +1039,7 @@ def connection_pool_list end alias :connection_pools :connection_pool_list - def establish_connection(config, pool_key = :default) + def establish_connection(config, pool_key = ActiveRecord::Base.default_pool_key) pool_config = resolve_pool_config(config) db_config = pool_config.db_config @@ -1100,16 +1100,19 @@ def flush_idle_connections! # active or defined connection: if it is the latter, it will be # opened and set as the active connection for the class it was defined # for (not necessarily the current class). - def retrieve_connection(spec_name) # :nodoc: - pool = retrieve_connection_pool(spec_name) + def retrieve_connection(spec_name, pool_key = ActiveRecord::Base.default_pool_key) # :nodoc: + pool = retrieve_connection_pool(spec_name, pool_key) unless pool - # multiple database application - if ActiveRecord::Base.connection_handler != ActiveRecord::Base.default_connection_handler - raise ConnectionNotEstablished, "No connection pool for '#{spec_name}' found for the '#{ActiveRecord::Base.current_role}' role." + if pool_key != ActiveRecord::Base.default_pool_key + message = "No connection pool for '#{spec_name}' found for the '#{pool_key}' shard." + elsif ActiveRecord::Base.connection_handler != ActiveRecord::Base.default_connection_handler + message = "No connection pool for '#{spec_name}' found for the '#{ActiveRecord::Base.current_role}' role." else - raise ConnectionNotEstablished, "No connection pool for '#{spec_name}' found." + message = "No connection pool for '#{spec_name}' found." end + + raise ConnectionNotEstablished, message end pool.connection @@ -1117,7 +1120,7 @@ def retrieve_connection(spec_name) # :nodoc: # Returns true if a connection that's accessible to this class has # already been opened. - def connected?(spec_name, pool_key = :default) + def connected?(spec_name, pool_key = ActiveRecord::Base.default_pool_key) pool = retrieve_connection_pool(spec_name, pool_key) pool && pool.connected? end @@ -1126,12 +1129,12 @@ def connected?(spec_name, pool_key = :default) # connection and the defined connection (if they exist). The result # can be used as an argument for #establish_connection, for easily # re-establishing the connection. - def remove_connection(owner, pool_key = :default) + def remove_connection(owner, pool_key = ActiveRecord::Base.default_pool_key) remove_connection_pool(owner, pool_key)&.configuration_hash end deprecate remove_connection: "Use #remove_connection_pool, which now returns a DatabaseConfig object instead of a Hash" - def remove_connection_pool(owner, pool_key = :default) + def remove_connection_pool(owner, pool_key = ActiveRecord::Base.default_pool_key) if pool_manager = get_pool_manager(owner) pool_config = pool_manager.remove_pool_config(pool_key) @@ -1145,7 +1148,7 @@ def remove_connection_pool(owner, pool_key = :default) # Retrieving the connection pool happens a lot, so we cache it in @owner_to_pool_manager. # This makes retrieving the connection pool O(1) once the process is warm. # When a connection is established or removed, we invalidate the cache. - def retrieve_connection_pool(owner, pool_key = :default) + def retrieve_connection_pool(owner, pool_key = ActiveRecord::Base.default_pool_key) pool_config = get_pool_manager(owner)&.get_pool_config(pool_key) pool_config&.pool end diff --git a/activerecord/lib/active_record/connection_handling.rb b/activerecord/lib/active_record/connection_handling.rb index bb652ba760..ce538e201c 100644 --- a/activerecord/lib/active_record/connection_handling.rb +++ b/activerecord/lib/active_record/connection_handling.rb @@ -48,7 +48,7 @@ module ConnectionHandling # may be returned on an error. def establish_connection(config_or_env = nil) db_config = resolve_config_for_connection(config_or_env) - connection_handler.establish_connection(db_config) + connection_handler.establish_connection(db_config, current_pool_key) end # Connects a model to the databases specified. The +database+ keyword @@ -64,8 +64,24 @@ def establish_connection(config_or_env = nil) # connects_to database: { writing: :primary, reading: :primary_replica } # end # - # Returns an array of established connections. - def connects_to(database: {}) + # +connects_to+ also supports horizontal sharding. The horizontal sharding API + # also supports read replicas. Connect a model to a list of shards like this: + # + # class AnimalsModel < ApplicationRecord + # self.abstract_class = true + # + # connects_to shards: { + # default: { writing: :primary, reading: :primary_replica }, + # shard_two: { writing: :primary_shard_two, reading: :primary_shard_replica_two } + # } + # end + # + # Returns an array of database connections. + def connects_to(database: {}, shards: {}) + if database.present? && shards.present? + raise ArgumentError, "connects_to can only accept a `database` or `shards` argument, but not both arguments." + end + connections = [] database.each do |role, database_key| @@ -75,14 +91,25 @@ def connects_to(database: {}) connections << handler.establish_connection(db_config) end + shards.each do |pool_key, database_keys| + database_keys.each do |role, database_key| + db_config = resolve_config_for_connection(database_key) + handler = lookup_connection_handler(role.to_sym) + + connections << handler.establish_connection(db_config, pool_key.to_sym) + end + end + connections end - # Connects to a database or role (ex writing, reading, or another - # custom role) for the duration of the block. + # Connects to a role (ex writing, reading or a custom role) and/or + # shard for the duration of the block. At the end of the block the + # connection will be returned to the original role / shard. # - # If a role is passed, Active Record will look up the connection - # based on the requested role: + # If only a role is passed, Active Record will look up the connection + # based on the requested role. If a non-established role is requested + # an `ActiveRecord::ConnectionNotEstablished` error will be raised: # # ActiveRecord::Base.connected_to(role: :writing) do # Dog.create! # creates dog using dog writing connection @@ -92,16 +119,29 @@ def connects_to(database: {}) # Dog.create! # throws exception because we're on a replica # end # - # ActiveRecord::Base.connected_to(role: :unknown_role) do - # # raises exception due to non-existent role + # If only a shard is passed, Active Record will look up the shard on the + # current role. If a non-existent shard is passed, an + # `ActiveRecord::ConnectionNotEstablished` error will be raised. + # + # ActiveRecord::Base.connected_to(shard: :default) do + # # Dog.create! # creates dog in shard with the default key # end - def connected_to(database: nil, role: nil, prevent_writes: false, &blk) + # + # If a shard and role is passed, Active Record will first lookup the role, + # and then look up the connection by shard key. + # + # ActiveRecord::Base.connected_to(role: :reading, shard: :shard_one_replica) do + # # Dog.create! # would raise as we're on a readonly connection + # end + # + # The database kwarg is deprecated and will be removed in 6.2.0 without replacement. + def connected_to(database: nil, role: nil, shard: nil, prevent_writes: false, &blk) if database ActiveSupport::Deprecation.warn("The database key in `connected_to` is deprecated. It will be removed in Rails 6.2.0 without replacement.") end - if database && role - raise ArgumentError, "connected_to can only accept a `database` or a `role` argument, but not both arguments." + if database && (role || shard) + raise ArgumentError, "`connected_to` cannot accept a `database` argument with any other arguments." elsif database if database.is_a?(Hash) role, database = database.first @@ -114,14 +154,12 @@ def connected_to(database: nil, role: nil, prevent_writes: false, &blk) handler.establish_connection(db_config) with_handler(role, &blk) + elsif shard + with_shard(connection_specification_name, shard, role || current_role, prevent_writes, &blk) elsif role - prevent_writes = true if role == reading_role - - with_handler(role.to_sym) do - connection_handler.while_preventing_writes(prevent_writes, &blk) - end + with_role(role, prevent_writes, &blk) else - raise ArgumentError, "must provide a `database` or a `role`." + raise ArgumentError, "must provide a `shard` and/or `role`." end end @@ -131,8 +169,8 @@ def connected_to(database: nil, role: nil, prevent_writes: false, &blk) # ActiveRecord::Base.connected_to?(role: :writing) #=> true # ActiveRecord::Base.connected_to?(role: :reading) #=> false # end - def connected_to?(role:) - current_role == role.to_sym + def connected_to?(role:, shard: ActiveRecord::Base.default_pool_key) + current_role == role.to_sym && current_pool_key == shard.to_sym end # Returns the symbol representing the current connected role. @@ -153,11 +191,6 @@ def lookup_connection_handler(handler_key) # :nodoc: connection_handlers[handler_key] ||= ActiveRecord::ConnectionAdapters::ConnectionHandler.new end - def with_handler(handler_key, &blk) # :nodoc: - handler = lookup_connection_handler(handler_key) - swap_connection_handler(handler, &blk) - end - # Clears the query cache for all connections associated with the current thread. def clear_query_caches_for_current_thread ActiveRecord::Base.connection_handlers.each_value do |handler| @@ -211,16 +244,16 @@ def connection_db_config end def connection_pool - connection_handler.retrieve_connection_pool(connection_specification_name) || raise(ConnectionNotEstablished) + connection_handler.retrieve_connection_pool(connection_specification_name, current_pool_key) || raise(ConnectionNotEstablished) end def retrieve_connection - connection_handler.retrieve_connection(connection_specification_name) + connection_handler.retrieve_connection(connection_specification_name, current_pool_key) end # Returns +true+ if Active Record is connected. def connected? - connection_handler.connected?(connection_specification_name) + connection_handler.connected?(connection_specification_name, current_pool_key) end def remove_connection(name = nil) @@ -228,11 +261,11 @@ def remove_connection(name = nil) # if removing a connection that has a pool, we reset the # connection_specification_name so it will use the parent # pool. - if connection_handler.retrieve_connection_pool(name) + if connection_handler.retrieve_connection_pool(name, current_pool_key) self.connection_specification_name = nil end - connection_handler.remove_connection_pool(name) + connection_handler.remove_connection_pool(name, current_pool_key) end def clear_cache! # :nodoc: @@ -255,6 +288,30 @@ def resolve_config_for_connection(config_or_env) db_config end + def with_handler(handler_key, &blk) + handler = lookup_connection_handler(handler_key) + swap_connection_handler(handler, &blk) + end + + def with_role(role, prevent_writes, &blk) + prevent_writes = true if role == reading_role + + with_handler(role.to_sym) do + connection_handler.while_preventing_writes(prevent_writes, &blk) + end + end + + def with_shard(connection_specification_name, pool_key, role, prevent_writes) + old_pool_key = current_pool_key + + with_role(role, prevent_writes) do + self.current_pool_key = pool_key + yield + end + ensure + self.current_pool_key = old_pool_key + end + def swap_connection_handler(handler, &blk) # :nodoc: old_handler, ActiveRecord::Base.connection_handler = ActiveRecord::Base.connection_handler, handler return_value = yield diff --git a/activerecord/lib/active_record/core.rb b/activerecord/lib/active_record/core.rb index f2e2d2978e..4e5af4ac00 100644 --- a/activerecord/lib/active_record/core.rb +++ b/activerecord/lib/active_record/core.rb @@ -132,6 +132,8 @@ def self.configurations class_attribute :default_connection_handler, instance_writer: false + class_attribute :default_pool_key, instance_writer: false + self.filter_attributes = [] def self.connection_handler @@ -142,7 +144,16 @@ def self.connection_handler=(handler) Thread.current.thread_variable_set(:ar_connection_handler, handler) end + def self.current_pool_key + Thread.current.thread_variable_get(:ar_pool_key) || default_pool_key + end + + def self.current_pool_key=(pool_key) + Thread.current.thread_variable_set(:ar_pool_key, pool_key) + end + self.default_connection_handler = ConnectionAdapters::ConnectionHandler.new + self.default_pool_key = :default end module ClassMethods diff --git a/activerecord/test/cases/connection_adapters/connection_handlers_multi_db_test.rb b/activerecord/test/cases/connection_adapters/connection_handlers_multi_db_test.rb index 97796ebfa7..5e2fd952e9 100644 --- a/activerecord/test/cases/connection_adapters/connection_handlers_multi_db_test.rb +++ b/activerecord/test/cases/connection_adapters/connection_handlers_multi_db_test.rb @@ -215,14 +215,14 @@ def test_switching_connections_with_database_and_role_raises ActiveRecord::Base.connected_to(database: :readonly, role: :writing) { } end end - assert_equal "connected_to can only accept a `database` or a `role` argument, but not both arguments.", error.message + assert_equal "`connected_to` cannot accept a `database` argument with any other arguments.", error.message end def test_switching_connections_without_database_and_role_raises error = assert_raises(ArgumentError) do ActiveRecord::Base.connected_to { } end - assert_equal "must provide a `database` or a `role`.", error.message + assert_equal "must provide a `shard` and/or `role`.", error.message end def test_switching_connections_with_database_symbol_uses_default_role @@ -376,7 +376,7 @@ def test_connection_handlers_are_per_thread_and_not_per_fiber reading_handler = ActiveRecord::Base.connection_handlers[:reading] - reading = ActiveRecord::Base.with_handler(:reading) do + reading = ActiveRecord::Base.connected_to(role: :reading) do Person.connection_handler end @@ -397,7 +397,7 @@ def test_connection_handlers_swapping_connections_in_fiber r << ActiveRecord::Base.connection_handler end - reading = ActiveRecord::Base.with_handler(:reading) do + reading = ActiveRecord::Base.connected_to(role: :reading) do enum.next end diff --git a/activerecord/test/cases/connection_adapters/connection_handlers_sharding_db_test.rb b/activerecord/test/cases/connection_adapters/connection_handlers_sharding_db_test.rb new file mode 100644 index 0000000000..c2fe3c28bb --- /dev/null +++ b/activerecord/test/cases/connection_adapters/connection_handlers_sharding_db_test.rb @@ -0,0 +1,412 @@ +# frozen_string_literal: true + +require "cases/helper" +require "models/person" + +module ActiveRecord + module ConnectionAdapters + class ConnectionHandlersShardingDbTest < ActiveRecord::TestCase + self.use_transactional_tests = false + + fixtures :people + + def setup + @handlers = { writing: ConnectionHandler.new, reading: ConnectionHandler.new } + @rw_handler = @handlers[:writing] + @ro_handler = @handlers[:reading] + @owner_name = "ActiveRecord::Base" + db_config = ActiveRecord::Base.configurations.configs_for(env_name: "arunit", spec_name: "primary") + @rw_pool = @handlers[:writing].establish_connection(db_config) + @ro_pool = @handlers[:reading].establish_connection(db_config) + end + + def teardown + ActiveRecord::Base.connection_handlers = { writing: ActiveRecord::Base.default_connection_handler } + end + + unless in_memory_db? + def test_establish_connection_using_3_levels_config + previous_env, ENV["RAILS_ENV"] = ENV["RAILS_ENV"], "default_env" + + config = { + "default_env" => { + "primary" => { "adapter" => "sqlite3", "database" => "db/primary.sqlite3" }, + "primary_shard_one" => { "adapter" => "sqlite3", "database" => "db/primary_shard_one.sqlite3" }, + } + } + + @prev_configs, ActiveRecord::Base.configurations = ActiveRecord::Base.configurations, config + + ActiveRecord::Base.connects_to(shards: { + default: { writing: :primary }, + shard_one: { writing: :primary_shard_one } + }) + + base_pool = ActiveRecord::Base.connection_handlers[:writing].retrieve_connection_pool("ActiveRecord::Base") + default_pool = ActiveRecord::Base.connection_handlers[:writing].retrieve_connection_pool("ActiveRecord::Base", :default) + + assert_equal base_pool, default_pool + assert_equal "db/primary.sqlite3", default_pool.db_config.database + assert_equal "primary", default_pool.db_config.spec_name + + assert_not_nil pool = ActiveRecord::Base.connection_handlers[:writing].retrieve_connection_pool("ActiveRecord::Base", :shard_one) + assert_equal "db/primary_shard_one.sqlite3", pool.db_config.database + assert_equal "primary_shard_one", pool.db_config.spec_name + ensure + ActiveRecord::Base.configurations = @prev_configs + ActiveRecord::Base.establish_connection(:arunit) + ENV["RAILS_ENV"] = previous_env + end + + def test_establish_connection_using_3_levels_config_with_shards_and_replica + previous_env, ENV["RAILS_ENV"] = ENV["RAILS_ENV"], "default_env" + + config = { + "default_env" => { + "primary" => { "adapter" => "sqlite3", "database" => "db/primary.sqlite3" }, + "primary_replica" => { "adapter" => "sqlite3", "database" => "db/primary.sqlite3", "replica" => true }, + "primary_shard_one" => { "adapter" => "sqlite3", "database" => "db/primary_shard_one.sqlite3" }, + "primary_shard_one_replica" => { "adapter" => "sqlite3", "database" => "db/primary_shard_one.sqlite3", "replica" => true } + } + } + + @prev_configs, ActiveRecord::Base.configurations = ActiveRecord::Base.configurations, config + + ActiveRecord::Base.connects_to(shards: { + default: { writing: :primary, reading: :primary_replica }, + shard_one: { writing: :primary_shard_one, reading: :primary_shard_one_replica } + }) + + default_writing_pool = ActiveRecord::Base.connection_handlers[:writing].retrieve_connection_pool("ActiveRecord::Base", :default) + base_writing_pool = ActiveRecord::Base.connection_handlers[:writing].retrieve_connection_pool("ActiveRecord::Base") + assert_equal base_writing_pool, default_writing_pool + assert_equal "db/primary.sqlite3", default_writing_pool.db_config.database + assert_equal "primary", default_writing_pool.db_config.spec_name + + default_reading_pool = ActiveRecord::Base.connection_handlers[:reading].retrieve_connection_pool("ActiveRecord::Base", :default) + base_reading_pool = ActiveRecord::Base.connection_handlers[:reading].retrieve_connection_pool("ActiveRecord::Base") + assert_equal base_reading_pool, default_reading_pool + assert_equal "db/primary.sqlite3", default_reading_pool.db_config.database + assert_equal "primary_replica", default_reading_pool.db_config.spec_name + + assert_not_nil pool = ActiveRecord::Base.connection_handlers[:writing].retrieve_connection_pool("ActiveRecord::Base", :shard_one) + assert_equal "db/primary_shard_one.sqlite3", pool.db_config.database + assert_equal "primary_shard_one", pool.db_config.spec_name + + assert_not_nil pool = ActiveRecord::Base.connection_handlers[:reading].retrieve_connection_pool("ActiveRecord::Base", :shard_one) + assert_equal "db/primary_shard_one.sqlite3", pool.db_config.database + assert_equal "primary_shard_one_replica", pool.db_config.spec_name + ensure + ActiveRecord::Base.configurations = @prev_configs + ActiveRecord::Base.establish_connection(:arunit) + ENV["RAILS_ENV"] = previous_env + end + + def test_switching_connections_via_handler + previous_env, ENV["RAILS_ENV"] = ENV["RAILS_ENV"], "default_env" + + config = { + "default_env" => { + "primary" => { "adapter" => "sqlite3", "database" => "db/primary.sqlite3" }, + "primary_replica" => { "adapter" => "sqlite3", "database" => "db/primary.sqlite3", "replica" => true }, + "primary_shard_one" => { "adapter" => "sqlite3", "database" => "db/primary_shard_one.sqlite3" }, + "primary_shard_one_replica" => { "adapter" => "sqlite3", "database" => "db/primary_shard_one.sqlite3", "replica" => true } + } + } + + @prev_configs, ActiveRecord::Base.configurations = ActiveRecord::Base.configurations, config + + ActiveRecord::Base.connects_to(shards: { + default: { writing: :primary, reading: :primary_replica }, + shard_one: { writing: :primary_shard_one, reading: :primary_shard_one_replica } + }) + + ActiveRecord::Base.connected_to(role: :reading, shard: :default) do + @ro_handler = ActiveRecord::Base.connection_handler + assert_equal ActiveRecord::Base.connection_handler, ActiveRecord::Base.connection_handlers[:reading] + assert_equal :reading, ActiveRecord::Base.current_role + assert ActiveRecord::Base.connected_to?(role: :reading, shard: :default) + assert_not ActiveRecord::Base.connected_to?(role: :writing, shard: :default) + assert_not ActiveRecord::Base.connected_to?(role: :writing, shard: :shard_one) + assert_not ActiveRecord::Base.connected_to?(role: :reading, shard: :shard_one) + assert_predicate ActiveRecord::Base.connection, :preventing_writes? + end + + ActiveRecord::Base.connected_to(role: :writing, shard: :default) do + assert_equal ActiveRecord::Base.connection_handler, ActiveRecord::Base.connection_handlers[:writing] + assert_not_equal @ro_handler, ActiveRecord::Base.connection_handler + assert_equal :writing, ActiveRecord::Base.current_role + assert ActiveRecord::Base.connected_to?(role: :writing, shard: :default) + assert_not ActiveRecord::Base.connected_to?(role: :reading, shard: :default) + assert_not ActiveRecord::Base.connected_to?(role: :reading, shard: :shard_one) + assert_not ActiveRecord::Base.connected_to?(role: :writing, shard: :shard_one) + assert_not_predicate ActiveRecord::Base.connection, :preventing_writes? + end + + ActiveRecord::Base.connected_to(role: :reading, shard: :shard_one) do + @ro_handler = ActiveRecord::Base.connection_handler + assert_equal ActiveRecord::Base.connection_handler, ActiveRecord::Base.connection_handlers[:reading] + assert_equal :reading, ActiveRecord::Base.current_role + assert ActiveRecord::Base.connected_to?(role: :reading, shard: :shard_one) + assert_not ActiveRecord::Base.connected_to?(role: :writing, shard: :shard_one) + assert_not ActiveRecord::Base.connected_to?(role: :writing, shard: :default) + assert_not ActiveRecord::Base.connected_to?(role: :reading, shard: :default) + assert_predicate ActiveRecord::Base.connection, :preventing_writes? + end + + ActiveRecord::Base.connected_to(role: :writing, shard: :shard_one) do + assert_equal ActiveRecord::Base.connection_handler, ActiveRecord::Base.connection_handlers[:writing] + assert_not_equal @ro_handler, ActiveRecord::Base.connection_handler + assert_equal :writing, ActiveRecord::Base.current_role + assert ActiveRecord::Base.connected_to?(role: :writing, shard: :shard_one) + assert_not ActiveRecord::Base.connected_to?(role: :reading, shard: :shard_one) + assert_not ActiveRecord::Base.connected_to?(role: :reading, shard: :default) + assert_not ActiveRecord::Base.connected_to?(role: :writing, shard: :default) + assert_not_predicate ActiveRecord::Base.connection, :preventing_writes? + end + ensure + ActiveRecord::Base.configurations = @prev_configs + ActiveRecord::Base.establish_connection(:arunit) + ENV["RAILS_ENV"] = previous_env + FileUtils.rm_rf("db") + end + + def test_retrieves_proper_connection_with_nested_connected_to + previous_env, ENV["RAILS_ENV"] = ENV["RAILS_ENV"], "default_env" + + config = { + "default_env" => { + "primary" => { "adapter" => "sqlite3", "database" => "db/primary.sqlite3" }, + "primary_replica" => { "adapter" => "sqlite3", "database" => "db/primary.sqlite3", "replica" => true }, + "primary_shard_one" => { "adapter" => "sqlite3", "database" => "db/primary_shard_one.sqlite3" }, + "primary_shard_one_replica" => { "adapter" => "sqlite3", "database" => "db/primary_shard_one.sqlite3", "replica" => true } + } + } + + @prev_configs, ActiveRecord::Base.configurations = ActiveRecord::Base.configurations, config + + ActiveRecord::Base.connects_to(shards: { + default: { writing: :primary, reading: :primary_replica }, + shard_one: { writing: :primary_shard_one, reading: :primary_shard_one_replica } + }) + + ActiveRecord::Base.connected_to(role: :reading, shard: :shard_one) do + # Uses the correct connection + assert_equal "primary_shard_one_replica", ActiveRecord::Base.connection_pool.db_config.spec_name + + # Uses the shard currently in use + ActiveRecord::Base.connected_to(role: :writing) do + assert_equal "primary_shard_one", ActiveRecord::Base.connection_pool.db_config.spec_name + end + + # Allows overriding the shard as well + ActiveRecord::Base.connected_to(role: :reading, shard: :default) do + assert_equal "primary_replica", ActiveRecord::Base.connection_pool.db_config.spec_name + end + + # Uses the current role + ActiveRecord::Base.connected_to(shard: :default) do + assert_equal "primary_replica", ActiveRecord::Base.connection_pool.db_config.spec_name + end + + # Resets correctly + assert_equal "primary_shard_one_replica", ActiveRecord::Base.connection_pool.db_config.spec_name + end + ensure + ActiveRecord::Base.configurations = @prev_configs + ActiveRecord::Base.establish_connection(:arunit) + ENV["RAILS_ENV"] = previous_env + FileUtils.rm_rf("db") + end + + def test_connected_to_raises_without_a_shard_or_role + error = assert_raises(ArgumentError) do + ActiveRecord::Base.connected_to { } + end + assert_equal "must provide a `shard` and/or `role`.", error.message + end + + def test_connects_to_raises_with_a_shard_and_database_key + error = assert_raises(ArgumentError) do + ActiveRecord::Base.connects_to(database: { writing: :arunit }, shards: { shard_one: { writing: :arunit } }) + end + assert_equal "connects_to can only accept a `database` or `shards` argument, but not both arguments.", error.message + end + + def test_retrieve_connection_pool_with_invalid_shard + assert_not_nil @rw_handler.retrieve_connection_pool("ActiveRecord::Base") + assert_nil @rw_handler.retrieve_connection_pool("ActiveRecord::Base", :foo) + + assert_not_nil @ro_handler.retrieve_connection_pool("ActiveRecord::Base") + assert_nil @ro_handler.retrieve_connection_pool("ActiveRecord::Base", :foo) + end + + def test_calling_connected_to_on_a_non_existent_shard_raises + ActiveRecord::Base.connects_to(shards: { + default: { writing: :arunit, reading: :arunit } + }) + + error = assert_raises ActiveRecord::ConnectionNotEstablished do + ActiveRecord::Base.connected_to(role: :reading, shard: :foo) do + Person.first + end + end + + assert_equal "No connection pool for 'ActiveRecord::Base' found for the 'foo' shard.", error.message + end + end + + class ShardConnectionTestModel < ActiveRecord::Base + end + + class ShardConnectionTestModelB < ActiveRecord::Base + end + + def test_same_shards_across_clusters + ShardConnectionTestModel.connects_to shards: { one: { writing: { database: ":memory:", adapter: "sqlite3" } } } + ShardConnectionTestModelB.connects_to shards: { one: { writing: { database: ":memory:", adapter: "sqlite3" } } } + + ActiveRecord::Base.connected_to(shard: :one) do + ShardConnectionTestModel.connection.execute("CREATE TABLE `shard_connection_test_models` (shard_key VARCHAR (255))") + ShardConnectionTestModel.create!(shard_key: "test_model_default") + + ShardConnectionTestModelB.connection.execute("CREATE TABLE `shard_connection_test_model_bs` (shard_key VARCHAR (255))") + ShardConnectionTestModelB.create!(shard_key: "test_model_b_default") + + assert_equal "test_model_default", ShardConnectionTestModel.where(shard_key: "test_model_default").first.shard_key + assert_equal "test_model_b_default", ShardConnectionTestModelB.where(shard_key: "test_model_b_default").first.shard_key + end + end + + def test_sharding_separation + ShardConnectionTestModel.connects_to shards: { + default: { writing: { database: ":memory:", adapter: "sqlite3" } }, + one: { writing: { database: ":memory:", adapter: "sqlite3" } } + } + + [:default, :one].each do |shard_name| + ActiveRecord::Base.connected_to(shard: shard_name) do + ShardConnectionTestModel.connection.execute("CREATE TABLE `shard_connection_test_models` (shard_key VARCHAR (255))") + end + end + + # Create a record on :default + ShardConnectionTestModel.create!(shard_key: "foo") + + # Make sure we can read it when explicitly connecting to :default + ActiveRecord::Base.connected_to(shard: :default) do + assert ShardConnectionTestModel.find_by_shard_key("foo") + end + + # Switch to shard and make sure we can't read the record from :default + # Also add a new record on :one + ActiveRecord::Base.connected_to(shard: :one) do + assert_not ShardConnectionTestModel.find_by_shard_key("foo") + ShardConnectionTestModel.create!(shard_key: "bar") + end + + # Make sure we can't read the record from :one but can read the record + # from :default + assert_not ShardConnectionTestModel.find_by_shard_key("bar") + assert ShardConnectionTestModel.find_by_shard_key("foo") + end + + def test_swapping_shards_in_a_multi_threaded_environment + tf_default = Tempfile.open "shard_key_default" + tf_shard_one = Tempfile.open "shard_key_one" + + ShardConnectionTestModel.connects_to shards: { + default: { writing: { database: tf_default.path, adapter: "sqlite3" } }, + one: { writing: { database: tf_shard_one.path, adapter: "sqlite3" } } + } + + [:default, :one].each do |shard_name| + ActiveRecord::Base.connected_to(shard: shard_name) do + ShardConnectionTestModel.connection.execute("CREATE TABLE `shard_connection_test_models` (shard_key VARCHAR (255))") + ShardConnectionTestModel.connection.execute("INSERT INTO `shard_connection_test_models` VALUES ('shard_key_#{shard_name}')") + end + end + + shard_one_latch = Concurrent::CountDownLatch.new + shard_default_latch = Concurrent::CountDownLatch.new + + ShardConnectionTestModel.connection + + thread = Thread.new do + ShardConnectionTestModel.connection + + shard_default_latch.wait + assert_equal "shard_key_default", ShardConnectionTestModel.connection.select_value("SELECT shard_key from shard_connection_test_models") + shard_one_latch.count_down + end + + ActiveRecord::Base.connected_to(shard: :one) do + shard_default_latch.count_down + assert_equal "shard_key_one", ShardConnectionTestModel.connection.select_value("SELECT shard_key from shard_connection_test_models") + shard_one_latch.wait + end + + thread.join + ensure + tf_shard_one.close + tf_shard_one.unlink + tf_default.close + tf_default.unlink + end + + def test_swapping_shards_and_roles_in_a_multi_threaded_environment + tf_default = Tempfile.open "shard_key_default" + tf_shard_one = Tempfile.open "shard_key_one" + tf_default_reading = Tempfile.open "shard_key_default_reading" + tf_shard_one_reading = Tempfile.open "shard_key_one_reading" + + ShardConnectionTestModel.connects_to shards: { + default: { writing: { database: tf_default.path, adapter: "sqlite3" }, secondary: { database: tf_default_reading.path, adapter: "sqlite3" } }, + one: { writing: { database: tf_shard_one.path, adapter: "sqlite3" }, secondary: { database: tf_shard_one_reading.path, adapter: "sqlite3" } } + } + + [:default, :one].each do |shard_name| + ActiveRecord::Base.connected_to(shard: shard_name) do + ShardConnectionTestModel.connection.execute("CREATE TABLE `shard_connection_test_models` (shard_key VARCHAR (255))") + ShardConnectionTestModel.connection.execute("INSERT INTO `shard_connection_test_models` VALUES ('shard_key_#{shard_name}')") + end + + ActiveRecord::Base.connected_to(shard: shard_name, role: :secondary) do + ShardConnectionTestModel.connection.execute("CREATE TABLE `shard_connection_test_models` (shard_key VARCHAR (255))") + ShardConnectionTestModel.connection.execute("INSERT INTO `shard_connection_test_models` VALUES ('shard_key_#{shard_name}_secondary')") + end + end + + shard_one_latch = Concurrent::CountDownLatch.new + shard_default_latch = Concurrent::CountDownLatch.new + + ShardConnectionTestModel.connection + + thread = Thread.new do + ShardConnectionTestModel.connection + + shard_default_latch.wait + assert_equal "shard_key_default", ShardConnectionTestModel.connection.select_value("SELECT shard_key from shard_connection_test_models") + shard_one_latch.count_down + end + + ActiveRecord::Base.connected_to(shard: :one, role: :secondary) do + shard_default_latch.count_down + assert_equal "shard_key_one_secondary", ShardConnectionTestModel.connection.select_value("SELECT shard_key from shard_connection_test_models") + shard_one_latch.wait + end + + thread.join + ensure + tf_shard_one.close + tf_shard_one.unlink + tf_default.close + tf_default.unlink + tf_shard_one_reading.close + tf_shard_one_reading.unlink + tf_default_reading.close + tf_default_reading.unlink + end + end + end +end diff --git a/guides/source/active_record_multiple_databases.md b/guides/source/active_record_multiple_databases.md index 4f4b2de327..3e8909ef25 100644 --- a/guides/source/active_record_multiple_databases.md +++ b/guides/source/active_record_multiple_databases.md @@ -9,6 +9,7 @@ After reading this guide you will know: * How to set up your application for multiple databases. * How automatic connection switching works. +* How to use horizontal sharding for multiple databases. * What features are supported and what's still a work in progress. -------------------------------------------------------------------------------- @@ -29,7 +30,7 @@ databases The following features are not (yet) supported: -* Sharding +* Automatic swapping for horizontal sharding * Joining across clusters * Load balancing replicas * Dumping schema caches for multiple databases @@ -259,15 +260,75 @@ like `connected_to(role: :nonexistent)` you will get an error that says `ActiveRecord::ConnectionNotEstablished (No connection pool with 'AnimalsBase' found for the 'nonexistent' role.)` +## Horizontal sharding + +Horizontal sharding is when you split up your database to reduce the number of rows on each +database server, but maintain the same schema across "shards". This is commonly called "multi-tenant" +sharding. + +The API for supporting horizontal sharding in Rails is similar to the multiple database / vertical +sharding API that's existed since Rails 6.0. + +Shards are declared in the three-tier config like this: + +```yaml +production: + primary: + database: my_primary_database + adapter: mysql + primary_replica: + database: my_primary_database + adapter: mysql + replica: true + primary_shard_one: + database: my_primary_shard_one + adapter: mysql + primary_shard_one_replica: + database: my_primary_shard_one + adapter: mysql + replica: true +``` + +Models are then connected with the `connects_to` API via the `shards` key: + +```ruby +class ApplicationRecord < ActiveRecord::Base + self.abstract_class = true + + connects_to shards: { + default: { writing: :primary, reading: :primary_replica }, + shard_one: { writing: :primary_shard_one, reading: :primary_shard_one_replica } + } +end + +Then models can swap connections manually via the `connected_to` API: + +```ruby +ActiveRecord::Base.connected_to(shard: :default) do + @id = Record.create! # creates a record in shard one +end + +ActiveRecord::Base.connected_to(shard: :shard_one) do + Record.find(@id) # can't find record, doesn't exist +end +``` + +The horizontal sharding API also supports read replicas. You can swap the +role and the shard with the `connected_to` API. + +```ruby +ActiveRecord::Base.connected_to(role: :reading, shard: :shard_one) do + Record.first # lookup record from read replica of shard one +end +``` + ## Caveats -### Sharding +### Automatic swapping for horizontal sharding -As noted at the top, Rails doesn't (yet) support sharding. We had to do a lot of work -to support multiple databases for Rails 6.0. The lack of support for sharding isn't -an oversight, but does require additional work that didn't make it in for 6.0. For now -if you need sharding it may be advisable to continue using one of the many gems -that supports this. +While Rails now supports an API for connecting to and swapping connections of shards, it does +not yet support an automatic swapping strategy. Any shard swapping will need to be done manually +in your app via a middleware or `around_action`. ### Load Balancing Replicas -- GitLab