提交 b3f5d3c7 编写于 作者: D David Heinemeier Hansson

Merge pull request #20933 from siadat/relations-in-batches-using-ids

in_batches using ids
* Add `ActiveRecord::Relation#in_batches` to work with records and relations
in batches.
Available options are `of` (batch size), `load`, `begin_at`, and `end_at`.
Examples:
Person.in_batches.each_record(&:party_all_night!)
Person.in_batches.update_all(awesome: true)
Person.in_batches.delete_all
Person.in_batches.each do |relation|
relation.delete_all
sleep 10 # Throttles the delete queries
end
Closes #20933.
*Sina Siadat*
* Added methods for PostgreSQL geometric data types to use in migrations
Example:
......
......@@ -6,7 +6,7 @@ module Querying
delegate :find_or_create_by, :find_or_create_by!, :find_or_initialize_by, to: :all
delegate :find_by, :find_by!, to: :all
delegate :destroy, :destroy_all, :delete, :delete_all, :update, :update_all, to: :all
delegate :find_each, :find_in_batches, to: :all
delegate :find_each, :find_in_batches, :in_batches, to: :all
delegate :select, :group, :order, :except, :reorder, :limit, :offset, :joins, :or,
:where, :rewhere, :preload, :eager_load, :includes, :from, :lock, :readonly,
:having, :create_with, :uniq, :distinct, :references, :none, :unscope, to: :all
......
......@@ -667,6 +667,13 @@ def inspect
"#<#{self.class.name} [#{entries.join(', ')}]>"
end
protected
def load_records(records)
@records = records
@loaded = true
end
private
def exec_queries
......
require "active_record/relation/batches/batch_enumerator"
module ActiveRecord
module Batches
# Looping through a collection of records from the database
......@@ -122,24 +124,102 @@ def find_in_batches(begin_at: nil, end_at: nil, batch_size: 1000, start: nil)
end
end
in_batches(of: batch_size, begin_at: begin_at, end_at: end_at, load: true) do |batch|
yield batch.to_a
end
end
# Yields ActiveRecord::Relation objects to work with a batch of records.
#
# Person.where("age > 21").in_batches do |relation|
# relation.delete_all
# sleep(10) # Throttle the delete queries
# end
#
# If you do not provide a block to #in_batches, it will return a
# BatchEnumerator which is enumerable.
#
# Person.in_batches.with_index do |relation, batch_index|
# puts "Processing relation ##{batch_index}"
# relation.each { |relation| relation.delete_all }
# end
#
# Examples of calling methods on the returned BatchEnumerator object:
#
# Person.in_batches.delete_all
# Person.in_batches.update_all(awesome: true)
# Person.in_batches.each_record(&:party_all_night!)
#
# ==== Options
# * <tt>:of</tt> - Specifies the size of the batch. Default to 1000.
# * <tt>:load</tt> - Specifies if the relation should be loaded. Default to false.
# * <tt>:begin_at</tt> - Specifies the primary key value to start from, inclusive of the value.
# * <tt>:end_at</tt> - Specifies the primary key value to end at, inclusive of the value.
#
# This is especially useful if you want to work with the
# ActiveRecord::Relation object instead of the array of records, or if
# you want multiple workers dealing with the same processing queue. You can
# make worker 1 handle all the records between id 0 and 10,000 and worker 2
# handle from 10,000 and beyond (by setting the +:begin_at+ and +:end_at+
# option on each worker).
#
# # Let's process the next 2000 records
# Person.in_batches(of: 2000, begin_at: 2000).update_all(awesome: true)
#
# An example of calling where query method on the relation:
#
# Person.in_batches.each do |relation|
# relation.update_all('age = age + 1')
# relation.where('age > 21').update_all(should_party: true)
# relation.where('age <= 21').delete_all
# end
#
# NOTE: If you are going to iterate through each record, you should call
# #each_record on the yielded BatchEnumerator:
#
# Person.in_batches.each_record(&:party_all_night!)
#
# NOTE: It's not possible to set the order. That is automatically set to
# ascending on the primary key ("id ASC") to make the batch ordering
# consistent. Therefore the primary key must be orderable, e.g an integer
# or a string.
#
# NOTE: You can't set the limit either, that's used to control the batch
# sizes.
def in_batches(of: 1000, begin_at: nil, end_at: nil, load: false)
relation = self
unless block_given?
return BatchEnumerator.new(of: of, begin_at: begin_at, end_at: end_at, relation: self)
end
if logger && (arel.orders.present? || arel.taken.present?)
logger.warn("Scoped order and limit are ignored, it's forced to be batch order and batch size")
end
relation = relation.reorder(batch_order).limit(batch_size)
relation = relation.reorder(batch_order).limit(of)
relation = apply_limits(relation, begin_at, end_at)
records = relation.to_a
batch_relation = relation
loop do
if load
records = batch_relation.to_a
ids = records.map(&:id)
yielded_relation = self.where(primary_key => ids)
yielded_relation.load_records(records)
else
ids = batch_relation.pluck(primary_key)
yielded_relation = self.where(primary_key => ids)
end
while records.any?
records_size = records.size
primary_key_offset = records.last.id
raise "Primary key not included in the custom select clause" unless primary_key_offset
break if ids.empty?
yield records
primary_key_offset = ids.last
raise ArgumentError.new("Primary key not included in the custom select clause") unless primary_key_offset
break if records_size < batch_size
yield yielded_relation
records = relation.where(table[primary_key].gt(primary_key_offset)).to_a
break if ids.length < of
batch_relation = relation.where(table[primary_key].gt(primary_key_offset))
end
end
......
module ActiveRecord
module Batches
class BatchEnumerator
include Enumerable
def initialize(of: 1000, begin_at: nil, end_at: nil, relation:) #:nodoc:
@of = of
@relation = relation
@begin_at = begin_at
@end_at = end_at
end
# Looping through a collection of records from the database (using the
# +all+ method, for example) is very inefficient since it will try to
# instantiate all the objects at once.
#
# In that case, batch processing methods allow you to work with the
# records in batches, thereby greatly reducing memory consumption.
#
# Person.in_batches.each_record do |person|
# person.do_awesome_stuff
# end
#
# Person.where("age > 21").in_batches(of: 10).each_record do |person|
# person.party_all_night!
# end
#
# If you do not provide a block to #each_record, it will return an Enumerator
# for chaining with other methods:
#
# Person.in_batches.each_record.with_index do |person, index|
# person.award_trophy(index + 1)
# end
def each_record
return to_enum(:each_record) unless block_given?
@relation.to_enum(:in_batches, of: @of, begin_at: @begin_at, end_at: @end_at, load: true).each do |relation|
relation.to_a.each { |record| yield record }
end
end
# Delegates #delete_all, #update_all, #destroy_all methods to each batch.
#
# People.in_batches.delete_all
# People.in_batches.destroy_all('age < 10')
# People.in_batches.update_all('age = age + 1')
[:delete_all, :update_all, :destroy_all].each do |method|
define_method(method) do |*args, &block|
@relation.to_enum(:in_batches, of: @of, begin_at: @begin_at, end_at: @end_at, load: false).each do |relation|
relation.send(method, *args, &block)
end
end
end
# Yields an ActiveRecord::Relation object for each batch of records.
#
# Person.in_batches.each do |relation|
# relation.update_all(awesome: true)
# end
def each
enum = @relation.to_enum(:in_batches, of: @of, begin_at: @begin_at, end_at: @end_at, load: false)
return enum.each { |relation| yield relation } if block_given?
enum
end
end
end
end
......@@ -53,7 +53,7 @@ def test_each_enumerator_should_execute_one_query_per_batch
end
def test_each_should_raise_if_select_is_set_without_id
assert_raise(RuntimeError) do
assert_raise(ArgumentError) do
Post.select(:title).find_each(batch_size: 1) { |post|
flunk "should not call this block"
}
......@@ -199,7 +199,7 @@ def test_find_in_batches_should_use_any_column_as_primary_key_when_start_is_not_
def test_find_in_batches_should_return_an_enumerator
enum = nil
assert_queries(0) do
assert_no_queries do
enum = Post.find_in_batches(:batch_size => 1)
end
assert_queries(4) do
......@@ -210,6 +210,234 @@ def test_find_in_batches_should_return_an_enumerator
end
end
def test_in_batches_should_not_execute_any_query
assert_no_queries do
assert_kind_of ActiveRecord::Batches::BatchEnumerator, Post.in_batches(of: 2)
end
end
def test_in_batches_should_yield_relation_if_block_given
assert_queries(6) do
Post.in_batches(of: 2) do |relation|
assert_kind_of ActiveRecord::Relation, relation
end
end
end
def test_in_batches_should_be_enumerable_if_no_block_given
assert_queries(6) do
Post.in_batches(of: 2).each do |relation|
assert_kind_of ActiveRecord::Relation, relation
end
end
end
def test_in_batches_each_record_should_yield_record_if_block_is_given
assert_queries(6) do
Post.in_batches(of: 2).each_record do |post|
assert post.title.present?
assert_kind_of Post, post
end
end
end
def test_in_batches_each_record_should_return_enumerator_if_no_block_given
assert_queries(6) do
Post.in_batches(of: 2).each_record.with_index do |post, i|
assert post.title.present?
assert_kind_of Post, post
end
end
end
def test_in_batches_each_record_should_be_ordered_by_id
ids = Post.order('id ASC').pluck(:id)
assert_queries(6) do
Post.in_batches(of: 2).each_record.with_index do |post, i|
assert_equal ids[i], post.id
end
end
end
def test_in_batches_update_all_affect_all_records
assert_queries(6 + 6) do # 6 selects, 6 updates
Post.in_batches(of: 2).update_all(title: "updated-title")
end
assert_equal Post.all.pluck(:title), ["updated-title"] * Post.count
end
def test_in_batches_delete_all_should_not_delete_records_in_other_batches
not_deleted_count = Post.where('id <= 2').count
Post.where('id > 2').in_batches(of: 2).delete_all
assert_equal 0, Post.where('id > 2').count
assert_equal not_deleted_count, Post.count
end
def test_in_batches_should_not_be_loaded
Post.in_batches(of: 1) do |relation|
assert_not relation.loaded?
end
Post.in_batches(of: 1, load: false) do |relation|
assert_not relation.loaded?
end
end
def test_in_batches_should_be_loaded
Post.in_batches(of: 1, load: true) do |relation|
assert relation.loaded?
end
end
def test_in_batches_if_not_loaded_executes_more_queries
assert_queries(@total + 1) do
Post.in_batches(of: 1, load: false) do |relation|
assert_not relation.loaded?
end
end
end
def test_in_batches_should_return_relations
assert_queries(@total + 1) do
Post.in_batches(of: 1) do |relation|
assert_kind_of ActiveRecord::Relation, relation
end
end
end
def test_in_batches_should_start_from_the_start_option
post = Post.order('id ASC').where('id >= ?', 2).first
assert_queries(2) do
relation = Post.in_batches(of: 1, begin_at: 2).first
assert_equal post, relation.first
end
end
def test_in_batches_should_end_at_the_end_option
post = Post.order('id DESC').where('id <= ?', 5).first
assert_queries(7) do
relation = Post.in_batches(of: 1, end_at: 5, load: true).reverse_each.first
assert_equal post, relation.last
end
end
def test_in_batches_shouldnt_execute_query_unless_needed
assert_queries(2) do
Post.in_batches(of: @total) { |relation| assert_kind_of ActiveRecord::Relation, relation }
end
assert_queries(1) do
Post.in_batches(of: @total + 1) { |relation| assert_kind_of ActiveRecord::Relation, relation }
end
end
def test_in_batches_should_quote_batch_order
c = Post.connection
assert_sql(/ORDER BY #{c.quote_table_name('posts')}.#{c.quote_column_name('id')}/) do
Post.in_batches(of: 1) do |relation|
assert_kind_of ActiveRecord::Relation, relation
assert_kind_of Post, relation.first
end
end
end
def test_in_batches_should_not_use_records_after_yielding_them_in_case_original_array_is_modified
not_a_post = "not a post"
def not_a_post.id
raise StandardError.new("not_a_post had #id called on it")
end
assert_nothing_raised do
Post.in_batches(of: 1) do |relation|
assert_kind_of ActiveRecord::Relation, relation
assert_kind_of Post, relation.first
relation = [not_a_post] * relation.count
end
end
end
def test_in_batches_should_not_ignore_default_scope_without_order_statements
special_posts_ids = SpecialPostWithDefaultScope.all.map(&:id).sort
posts = []
SpecialPostWithDefaultScope.in_batches do |relation|
posts.concat(relation)
end
assert_equal special_posts_ids, posts.map(&:id)
end
def test_in_batches_should_not_modify_passed_options
assert_nothing_raised do
Post.in_batches({ of: 42, begin_at: 1 }.freeze){}
end
end
def test_in_batches_should_use_any_column_as_primary_key
nick_order_subscribers = Subscriber.order('nick asc')
start_nick = nick_order_subscribers.second.nick
subscribers = []
Subscriber.in_batches(of: 1, begin_at: start_nick) do |relation|
subscribers.concat(relation)
end
assert_equal nick_order_subscribers[1..-1].map(&:id), subscribers.map(&:id)
end
def test_in_batches_should_use_any_column_as_primary_key_when_start_is_not_specified
assert_queries(Subscriber.count + 1) do
Subscriber.in_batches(of: 1, load: true) do |relation|
assert_kind_of ActiveRecord::Relation, relation
assert_kind_of Subscriber, relation.first
end
end
end
def test_in_batches_should_return_an_enumerator
enum = nil
assert_no_queries do
enum = Post.in_batches(of: 1)
end
assert_queries(4) do
enum.first(4) do |relation|
assert_kind_of ActiveRecord::Relation, relation
assert_kind_of Post, relation.first
end
end
end
def test_in_batches_relations_should_not_overlap_with_each_other
seen_posts = []
Post.in_batches(of: 2, load: true) do |relation|
relation.to_a.each do |post|
assert_not seen_posts.include?(post)
seen_posts << post
end
end
end
def test_in_batches_relations_with_condition_should_not_overlap_with_each_other
seen_posts = []
author_id = Post.first.author_id
posts_by_author = Post.where(author_id: author_id)
Post.in_batches(of: 2) do |batch|
seen_posts += batch.where(author_id: author_id)
end
assert_equal posts_by_author.pluck(:id).sort, seen_posts.map(&:id).sort
end
def test_in_batches_relations_update_all_should_not_affect_matching_records_in_other_batches
Post.update_all(author_id: 0)
person = Post.last
person.update_attributes(author_id: 1)
Post.in_batches(of: 2) do |batch|
batch.where('author_id >= 1').update_all('author_id = author_id + 1')
end
assert_equal 2, person.reload.author_id # incremented only once
end
def test_find_in_batches_start_deprecated
assert_deprecated do
assert_queries(@total) do
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册