提交 22a26d24 编写于 作者: E Edouard CHIN

Build a multi-statement query when inserting fixtures:

- The `insert_fixtures` method can be optimized by making a single multi statement query for all fixtures having the same connection instead of doing a single query per table
  - The previous code was bulk inserting fixtures for a single table, making X query for X fixture files
  - This patch builds a single **multi statement query** for every tables. Given a set of 3 fixtures (authors, dogs, computers):
  ```ruby
    # before
    %w(authors dogs computers).each do |table|
      sql = build_sql(table)
      connection.query(sql)
    end

    # after

    sql = build_sql(authors, dogs, computers)
    connection.query(sql)
  ```
- `insert_fixtures` is now deprecated, `insert_fixtures_set` is the new way to go with performance improvement
- My tests were done with an app having more than 700 fixtures, the time it takes to insert all of them was around 15s. Using a single multi statement query, it took on average of 8 seconds
- In order for a multi statement to be executed, mysql needs to be connected with the `MULTI_STATEMENTS` [flag](https://dev.mysql.com/doc/refman/5.7/en/c-api-multiple-queries.html), which is done before inserting the fixtures by reconnecting to da the database with the flag declared. Reconnecting to the database creates some caveats:
  1. We loose all open transactions; Inside the original code, when inserting fixtures, a transaction is open. Multple delete statements are [executed](https://github.com/rails/rails/blob/a681eaf22955734c142609961a6d71746cfa0583/activerecord/lib/active_record/fixtures.rb#L566) and finally the fixtures are inserted. The problem with this patch is that we need to open the transaction only after we reconnect to the DB otherwise reconnecting drops the open transaction which doesn't commit all delete statements and inserting fixtures doesn't work since we duplicated them (Primary key duplicate exception)...
    - In order to fix this problem, the transaction is now open directly inside the `insert_fixtures` method, right after we reconnect to the db
    - As an effect, since the transaction is open inside the `insert_fixtures` method, the DELETE statements need to be executed here since the transaction is open later
  2. The same problem happens for the `disable_referential_integrity` since we reconnect, the `FOREIGN_KEY_CHECKS` is reset to the original value
    - Same solution as 1. , the disable_referential_integrity can be called after we reconnect to the transaction
  3. When the multi statement query is executed, no other queries can be performed until we paginate over the set of results, otherwise mysql throws a "Commands out of sync" [Ref](https://dev.mysql.com/doc/refman/5.7/en/commands-out-of-sync.html)
    - Iterating over the set of results until `mysql_client.next_result` is false. [Ref](https://github.com/brianmario/mysql2#multiple-result-sets)
- Removed the `active_record.sql "Fixture delete"` notification, the delete statements are now inside the INSERT's one
- On mysql the `max_allowed_packet` is looked up:
  1. Before executing the multi-statements query, we check the packet length of each statements, if the packet is bigger than the max_allowed_packet config, an `ActiveRecordError` is raised
  2. Otherwise we concatenate the current sql statement into the previous and so on until the packet is `< max_allowed_packet`
上级 a681eaf2
......@@ -356,35 +356,32 @@ def insert_fixture(fixture, table_name)
# Inserts a set of fixtures into the table. Overridden in adapters that require
# something beyond a simple insert (eg. Oracle).
def insert_fixtures(fixtures, table_name)
ActiveSupport::Deprecation.warn(<<-MSG.squish)
`insert_fixtures` is deprecated and will be removed in the next version of Rails.
Consider using `insert_fixtures_set` for performance improvement.
MSG
return if fixtures.empty?
columns = schema_cache.columns_hash(table_name)
execute(build_fixture_sql(fixtures, table_name), "Fixtures Insert")
end
values = fixtures.map do |fixture|
fixture = fixture.stringify_keys
def insert_fixtures_set(fixture_set, tables_to_delete = [])
fixture_inserts = fixture_set.map do |table_name, fixtures|
next if fixtures.empty?
unknown_columns = fixture.keys - columns.keys
if unknown_columns.any?
raise Fixture::FixtureError, %(table "#{table_name}" has no columns named #{unknown_columns.map(&:inspect).join(', ')}.)
end
build_fixture_sql(fixtures, table_name)
end.compact
columns.map do |name, column|
if fixture.key?(name)
type = lookup_cast_type_from_column(column)
bind = Relation::QueryAttribute.new(name, fixture[name], type)
with_yaml_fallback(bind.value_for_database)
else
Arel.sql("DEFAULT")
total_sql = Array.wrap(combine_multi_statements(fixture_inserts, tables_to_delete))
disable_referential_integrity do
transaction(requires_new: true) do
total_sql.each do |sql|
execute sql, "Fixtures Insert"
yield if block_given?
end
end
end
table = Arel::Table.new(table_name)
manager = Arel::InsertManager.new
manager.into(table)
columns.each_key { |column| manager.columns << table[column] }
manager.values = manager.create_values_list(values)
execute manager.to_sql, "Fixtures Insert"
end
def empty_insert_statement_value
......@@ -417,6 +414,43 @@ def join_to_update(update, select, key) # :nodoc:
private
def build_fixture_sql(fixtures, table_name)
columns = schema_cache.columns_hash(table_name)
values = fixtures.map do |fixture|
fixture = fixture.stringify_keys
unknown_columns = fixture.keys - columns.keys
if unknown_columns.any?
raise Fixture::FixtureError, %(table "#{table_name}" has no columns named #{unknown_columns.map(&:inspect).join(', ')}.)
end
columns.map do |name, column|
if fixture.key?(name)
type = lookup_cast_type_from_column(column)
bind = Relation::QueryAttribute.new(name, fixture[name], type)
with_yaml_fallback(bind.value_for_database)
else
Arel.sql("DEFAULT")
end
end
end
table = Arel::Table.new(table_name)
manager = Arel::InsertManager.new
manager.into(table)
columns.each_key { |column| manager.columns << table[column] }
manager.values = manager.create_values_list(values)
manager.to_sql
end
def combine_multi_statements(fixture_inserts, tables_to_delete)
tables_to_delete.each { |table| delete "DELETE FROM #{quote_table_name(table)}", "Fixture Delete" }
fixture_inserts.join(";\n")
end
# Returns a subquery for the given key using the join information.
def subquery_for(key, select)
subselect = select.clone
......
......@@ -533,8 +533,57 @@ def insert_fixtures(*)
without_sql_mode("NO_AUTO_VALUE_ON_ZERO") { super }
end
def insert_fixtures_set(fixture_set, tables_to_delete = [])
iterate_over_results = -> { while raw_connection.next_result; end; }
with_multi_statements do
without_sql_mode("NO_AUTO_VALUE_ON_ZERO") do
super(fixture_set, tables_to_delete, &iterate_over_results)
end
end
end
private
def combine_multi_statements(fixture_inserts, tables_to_delete)
super
fixture_inserts.each_with_object([]) do |sql, total_sql|
previous_packet = total_sql.last
sql << ";\n"
if max_allowed_packet_reached?(sql, previous_packet) || total_sql.empty?
total_sql << sql
else
previous_packet << sql
end
end
end
def max_allowed_packet_reached?(current_packet, previous_packet)
if current_packet.bytesize > max_allowed_packet
raise ActiveRecordError, "Fixtures set is too large #{current_packet.bytesize}. Consider increasing the max_allowed_packet variable."
elsif previous_packet.nil?
false
else
(current_packet.bytesize + previous_packet.bytesize) > max_allowed_packet
end
end
def max_allowed_packet
@max_allowed_packet ||= show_variable("max_allowed_packet")
end
def with_multi_statements
previous_flags = @config[:flags]
@config[:flags] = Mysql2::Client::MULTI_STATEMENTS
reconnect!
yield
ensure
@config[:flags] = previous_flags
reconnect!
end
def without_sql_mode(mode)
result = execute("SELECT @@SESSION.sql_mode")
current_mode = result.first[0]
......
......@@ -373,6 +373,18 @@ def insert_fixtures(rows, table_name)
end
end
def insert_fixtures_set(fixture_set, tables_to_delete = [])
disable_referential_integrity do
transaction(requires_new: true) do
tables_to_delete.each { |table| delete "DELETE FROM #{quote_table_name(table)}", "Fixture Delete" }
fixture_set.each do |table_name, rows|
rows.each { |row| insert_fixture(row, table_name) }
end
end
end
end
private
def initialize_type_map(m = type_map)
super
......
......@@ -540,47 +540,38 @@ def self.create_fixtures(fixtures_directory, fixture_set_names, class_names = {}
}
unless files_to_read.empty?
connection.disable_referential_integrity do
fixtures_map = {}
fixture_sets = files_to_read.map do |fs_name|
klass = class_names[fs_name]
conn = klass ? klass.connection : connection
fixtures_map[fs_name] = new( # ActiveRecord::FixtureSet.new
conn,
fs_name,
klass,
::File.join(fixtures_directory, fs_name))
end
update_all_loaded_fixtures fixtures_map
connection.transaction(requires_new: true) do
deleted_tables = Hash.new { |h, k| h[k] = Set.new }
fixture_sets.each do |fs|
conn = fs.model_class.respond_to?(:connection) ? fs.model_class.connection : connection
table_rows = fs.table_rows
fixtures_map = {}
fixture_sets = files_to_read.map do |fs_name|
klass = class_names[fs_name]
conn = klass ? klass.connection : connection
fixtures_map[fs_name] = new( # ActiveRecord::FixtureSet.new
conn,
fs_name,
klass,
::File.join(fixtures_directory, fs_name))
end
table_rows.each_key do |table|
unless deleted_tables[conn].include? table
conn.delete "DELETE FROM #{conn.quote_table_name(table)}", "Fixture Delete"
end
deleted_tables[conn] << table
end
update_all_loaded_fixtures fixtures_map
fixture_sets_by_connection = fixture_sets.group_by { |fs| fs.model_class ? fs.model_class.connection : connection }
table_rows.each do |fixture_set_name, rows|
conn.insert_fixtures(rows, fixture_set_name)
end
fixture_sets_by_connection.each do |conn, set|
table_rows_for_connection = Hash.new { |h, k| h[k] = [] }
# Cap primary key sequences to max(pk).
if conn.respond_to?(:reset_pk_sequence!)
conn.reset_pk_sequence!(fs.table_name)
end
set.each do |fs|
fs.table_rows.each do |table, rows|
table_rows_for_connection[table].unshift(*rows)
end
end
conn.insert_fixtures_set(table_rows_for_connection, table_rows_for_connection.keys)
cache_fixtures(connection, fixtures_map)
# Cap primary key sequences to max(pk).
if conn.respond_to?(:reset_pk_sequence!)
set.each { |fs| conn.reset_pk_sequence!(fs.table_name) }
end
end
cache_fixtures(connection, fixtures_map)
end
cached_fixtures(connection, fixture_set_names)
end
......
......@@ -228,7 +228,9 @@ def test_insert_fixture
def test_insert_fixtures
tag_values = ["val1", "val2", "val3_with_'_multiple_quote_'_chars"]
@connection.insert_fixtures([{ "tags" => tag_values }], "pg_arrays")
assert_deprecated do
@connection.insert_fixtures([{ "tags" => tag_values }], "pg_arrays")
end
assert_equal(PgArray.last.tags, tag_values)
end
......
......@@ -79,6 +79,122 @@ def test_bulk_insert
ActiveSupport::Notifications.unsubscribe(subscription)
end
end
def test_bulk_insert_multiple_table_with_a_multi_statement_query
subscriber = InsertQuerySubscriber.new
subscription = ActiveSupport::Notifications.subscribe("sql.active_record", subscriber)
create_fixtures("bulbs", "authors", "computers")
expected_sql = <<-EOS.strip_heredoc.chop
INSERT INTO #{ActiveRecord::Base.connection.quote_table_name("bulbs")} .*
INSERT INTO #{ActiveRecord::Base.connection.quote_table_name("authors")} .*
INSERT INTO #{ActiveRecord::Base.connection.quote_table_name("computers")} .*
EOS
assert_equal 1, subscriber.events.size
assert_match(/#{expected_sql}/, subscriber.events.first)
ensure
ActiveSupport::Notifications.unsubscribe(subscription)
end
def test_bulk_insert_with_a_multi_statement_query_raises_an_exception_when_any_insert_fails
require "models/aircraft"
assert_equal false, Aircraft.columns_hash["wheels_count"].null
fixtures = {
"aircraft" => [
{ "name" => "working_aircrafts", "wheels_count" => 2 },
{ "name" => "broken_aircrafts", "wheels_count" => nil },
]
}
assert_no_difference "Aircraft.count" do
assert_raises(ActiveRecord::NotNullViolation) do
ActiveRecord::Base.connection.insert_fixtures_set(fixtures)
end
end
end
end
if current_adapter?(:Mysql2Adapter)
def test_insert_fixtures_set_raises_an_error_when_max_allowed_packet_is_smaller_than_fixtures_set_size
conn = ActiveRecord::Base.connection
packet_size = 1024
fixtures = {
"traffic_lights" => [
{ "location" => "US", "state" => ["NY"], "long_state" => ["a" * packet_size] },
]
}
conn.stubs(:max_allowed_packet).returns(packet_size)
error = assert_raises(ActiveRecord::ActiveRecordError) { conn.insert_fixtures_set(fixtures) }
assert_match(/Fixtures set is too large/, error.message)
end
def test_insert_fixture_set_when_max_allowed_packet_is_bigger_than_fixtures_set_size
conn = ActiveRecord::Base.connection
packet_size = 1024
fixtures = {
"traffic_lights" => [
{ "location" => "US", "state" => ["NY"], "long_state" => ["a" * 51] },
]
}
conn.stubs(:max_allowed_packet).returns(packet_size)
assert_difference "TrafficLight.count" do
conn.insert_fixtures_set(fixtures)
end
end
def test_insert_fixtures_set_split_the_total_sql_into_two_chunks_smaller_than_max_allowed_packet
subscriber = InsertQuerySubscriber.new
subscription = ActiveSupport::Notifications.subscribe("sql.active_record", subscriber)
conn = ActiveRecord::Base.connection
packet_size = 1024
fixtures = {
"traffic_lights" => [
{ "location" => "US", "state" => ["NY"], "long_state" => ["a" * 450] },
],
"comments" => [
{ "post_id" => 1, "body" => "a" * 450 },
]
}
conn.stubs(:max_allowed_packet).returns(packet_size)
conn.insert_fixtures_set(fixtures)
assert_equal 2, subscriber.events.size
assert_operator subscriber.events.first.bytesize, :<, packet_size
assert_operator subscriber.events.second.bytesize, :<, packet_size
ensure
ActiveSupport::Notifications.unsubscribe(subscription)
end
def test_insert_fixtures_set_concat_total_sql_into_a_single_packet_smaller_than_max_allowed_packet
subscriber = InsertQuerySubscriber.new
subscription = ActiveSupport::Notifications.subscribe("sql.active_record", subscriber)
conn = ActiveRecord::Base.connection
packet_size = 1024
fixtures = {
"traffic_lights" => [
{ "location" => "US", "state" => ["NY"], "long_state" => ["a" * 200] },
],
"comments" => [
{ "post_id" => 1, "body" => "a" * 200 },
]
}
conn.stubs(:max_allowed_packet).returns(packet_size)
assert_difference ["TrafficLight.count", "Comment.count"], +1 do
conn.insert_fixtures_set(fixtures)
end
assert_equal 1, subscriber.events.size
ensure
ActiveSupport::Notifications.unsubscribe(subscription)
end
end
def test_broken_yaml_exception
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册