提交 451437c6 编写于 作者: X Xavier Noria

adds support for limits in batch processing

上级 3b55ac22
* The flag `error_on_ignored_order_or_limit` has been deprecated in favor of
the current `error_on_ignored_order`.
*Xavier Noria*
* Batch processing methods support `limit`:
Post.limit(10_000).find_each do |post|
# ...
end
It also works in `find_in_batches` and `in_batches`.
*Xavier Noria*
* Using `group` with an attribute that has a custom type will properly cast
the hash keys after calling a calculation method like `count`. Fixes #25595.
......
......@@ -72,11 +72,20 @@ def self.configurations
##
# :singleton-method:
# Specifies if an error should be raised on query limit or order being
# Specifies if an error should be raised if the query has an order being
# ignored when doing batch queries. Useful in applications where the
# limit or scope being ignored is error-worthy, rather than a warning.
# scope being ignored is error-worthy, rather than a warning.
mattr_accessor :error_on_ignored_order, instance_writer: false
self.error_on_ignored_order = false
mattr_accessor :error_on_ignored_order_or_limit, instance_writer: false
self.error_on_ignored_order_or_limit = false
def self.error_on_ignored_order_or_limit=(value)
ActiveSupport::Deprecation.warn(<<-MSG.squish)
The flag error_on_ignored_order_or_limit is deprecated. Limits are
now supported. Please use error_on_ignored_order= instead.
MSG
self.error_on_ignored_order = value
end
##
# :singleton-method:
......
require "active_record/relation/batches/batch_enumerator"
require 'active_record/relation/batches/batch_enumerator'
module ActiveRecord
module Batches
ORDER_OR_LIMIT_IGNORED_MESSAGE = "Scoped order and limit are ignored, it's forced to be batch order and batch size."
ORDER_IGNORE_MESSAGE = "Scoped order is ignored, it's forced to be batch order."
# Looping through a collection of records from the database
# (using the Scoping::Named::ClassMethods.all method, for example)
......@@ -34,15 +34,19 @@ module Batches
# * <tt>:start</tt> - Specifies the primary key value to start from, inclusive of the value.
# * <tt>:finish</tt> - Specifies the primary key value to end at, inclusive of the value.
# * <tt>:error_on_ignore</tt> - Overrides the application config to specify if an error should be raised when
# the order and limit have to be ignored due to batching.
# the order has to be ignored due to batching.
#
# This is especially useful if you want multiple workers dealing with
# the same processing queue. You can make worker 1 handle all the records
# between id 0 and 10,000 and worker 2 handle from 10,000 and beyond
# (by setting the +:start+ and +:finish+ option on each worker).
# Limits are honored, and if present there is no requirement for the batch
# size, it can be less than, equal, or greater than the limit.
#
# # Let's process for a batch of 2000 records, skipping the first 2000 rows
# Person.find_each(start: 2000, batch_size: 2000) do |person|
# The options +start+ and +finish+ are especially useful if you want
# multiple workers dealing with the same processing queue. You can make
# worker 1 handle all the records between id 1 and 9999 and worker 2
# handle from 10000 and beyond by setting the +:start+ and +:finish+
# option on each worker.
#
# # Let's process from record 10_000 on.
# Person.find_each(start: 10_000) do |person|
# person.party_all_night!
# end
#
......@@ -51,8 +55,8 @@ module Batches
# work. This also means that this method only works when the primary key is
# orderable (e.g. an integer or string).
#
# NOTE: You can't set the limit either, that's used to control
# the batch sizes.
# NOTE: By its nature, batch processing is subject to race conditions if
# other processes are modifying the database.
def find_each(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil)
if block_given?
find_in_batches(start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore) do |records|
......@@ -89,15 +93,19 @@ def find_each(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil)
# * <tt>:start</tt> - Specifies the primary key value to start from, inclusive of the value.
# * <tt>:finish</tt> - Specifies the primary key value to end at, inclusive of the value.
# * <tt>:error_on_ignore</tt> - Overrides the application config to specify if an error should be raised when
# the order and limit have to be ignored due to batching.
# the order has to be ignored due to batching.
#
# Limits are honored, and if present there is no requirement for the batch
# size, it can be less than, equal, or greater than the limit.
#
# This is especially useful if you want multiple workers dealing with
# the same processing queue. You can make worker 1 handle all the records
# between id 0 and 10,000 and worker 2 handle from 10,000 and beyond
# (by setting the +:start+ and +:finish+ option on each worker).
# The options +start+ and +finish+ are especially useful if you want
# multiple workers dealing with the same processing queue. You can make
# worker 1 handle all the records between id 1 and 9999 and worker 2
# handle from 10000 and beyond by setting the +:start+ and +:finish+
# option on each worker.
#
# # Let's process the next 2000 records
# Person.find_in_batches(start: 2000, batch_size: 2000) do |group|
# # Let's process from record 10_000 on.
# Person.find_in_batches(start: 10_000) do |group|
# group.each { |person| person.party_all_night! }
# end
#
......@@ -106,8 +114,8 @@ def find_each(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil)
# work. This also means that this method only works when the primary key is
# orderable (e.g. an integer or string).
#
# NOTE: You can't set the limit either, that's used to control
# the batch sizes.
# NOTE: By its nature, batch processing is subject to race conditions if
# other processes are modifying the database.
def find_in_batches(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil)
relation = self
unless block_given?
......@@ -149,17 +157,19 @@ def find_in_batches(start: nil, finish: nil, batch_size: 1000, error_on_ignore:
# * <tt>:start</tt> - Specifies the primary key value to start from, inclusive of the value.
# * <tt>:finish</tt> - Specifies the primary key value to end at, inclusive of the value.
# * <tt>:error_on_ignore</tt> - Overrides the application config to specify if an error should be raised when
# the order and limit have to be ignored due to batching.
# the order has to be ignored due to batching.
#
# Limits are honored, and if present there is no requirement for the batch
# size, it can be less than, equal, or greater than the limit.
#
# This is especially useful if you want to work with the
# ActiveRecord::Relation object instead of the array of records, or if
# you want multiple workers dealing with the same processing queue. You can
# make worker 1 handle all the records between id 0 and 10,000 and worker 2
# handle from 10,000 and beyond (by setting the +:start+ and +:finish+
# option on each worker).
# The options +start+ and +finish+ are especially useful if you want
# multiple workers dealing with the same processing queue. You can make
# worker 1 handle all the records between id 1 and 9999 and worker 2
# handle from 10000 and beyond by setting the +:start+ and +:finish+
# option on each worker.
#
# # Let's process the next 2000 records
# Person.in_batches(of: 2000, start: 2000).update_all(awesome: true)
# # Let's process from record 10_000 on.
# Person.in_batches(start: 10_000).update_all(awesome: true)
#
# An example of calling where query method on the relation:
#
......@@ -179,19 +189,26 @@ def find_in_batches(start: nil, finish: nil, batch_size: 1000, error_on_ignore:
# consistent. Therefore the primary key must be orderable, e.g an integer
# or a string.
#
# NOTE: You can't set the limit either, that's used to control the batch
# sizes.
# NOTE: By its nature, batch processing is subject to race conditions if
# other processes are modifying the database.
def in_batches(of: 1000, start: nil, finish: nil, load: false, error_on_ignore: nil)
relation = self
unless block_given?
return BatchEnumerator.new(of: of, start: start, finish: finish, relation: self)
end
if arel.orders.present? || arel.taken.present?
act_on_order_or_limit_ignored(error_on_ignore)
if arel.orders.present?
act_on_ignored_order(error_on_ignore)
end
batch_limit = of
if limit_value
remaining = limit_value
batch_limit = remaining if remaining < batch_limit
relation = relation.limit(nil) # new relation without the limit
end
relation = relation.reorder(batch_order).limit(of)
relation = relation.reorder(batch_order).limit(batch_limit)
relation = apply_limits(relation, start, finish)
batch_relation = relation
......@@ -199,11 +216,11 @@ def in_batches(of: 1000, start: nil, finish: nil, load: false, error_on_ignore:
if load
records = batch_relation.records
ids = records.map(&:id)
yielded_relation = self.where(primary_key => ids)
yielded_relation = where(primary_key => ids)
yielded_relation.load_records(records)
else
ids = batch_relation.pluck(primary_key)
yielded_relation = self.where(primary_key => ids)
yielded_relation = where(primary_key => ids)
end
break if ids.empty?
......@@ -213,7 +230,20 @@ def in_batches(of: 1000, start: nil, finish: nil, load: false, error_on_ignore:
yield yielded_relation
break if ids.length < of
break if ids.length < batch_limit
if limit_value
remaining -= ids.length
if remaining == 0
# Saves a useless iteration when the limit is a multiple of the
# batch size.
break
elsif remaining < batch_limit
relation = relation.limit(remaining)
end
end
batch_relation = relation.where(arel_attribute(primary_key).gt(primary_key_offset))
end
end
......@@ -230,13 +260,13 @@ def batch_order
"#{quoted_table_name}.#{quoted_primary_key} ASC"
end
def act_on_order_or_limit_ignored(error_on_ignore)
raise_error = (error_on_ignore.nil? ? self.klass.error_on_ignored_order_or_limit : error_on_ignore)
def act_on_ignored_order(error_on_ignore)
raise_error = (error_on_ignore.nil? ? self.klass.error_on_ignored_order : error_on_ignore)
if raise_error
raise ArgumentError.new(ORDER_OR_LIMIT_IGNORED_MESSAGE)
raise ArgumentError.new(ORDER_IGNORE_MESSAGE)
elsif logger
logger.warn(ORDER_OR_LIMIT_IGNORED_MESSAGE)
logger.warn(ORDER_IGNORE_MESSAGE)
end
end
end
......
......@@ -68,10 +68,26 @@ def test_each_should_execute_if_id_is_in_select
end
end
def test_warn_if_limit_scope_is_set
assert_called(ActiveRecord::Base.logger, :warn) do
Post.limit(1).find_each { |post| post }
test 'find_each should honor limit if passed a block' do
limit = @total - 1
total = 0
Post.limit(limit).find_each do |post|
total += 1
end
assert_equal limit, total
end
test 'find_each should honor limit if no block is passed' do
limit = @total - 1
total = 0
Post.limit(limit).find_each.each do |post|
total += 1
end
assert_equal limit, total
end
def test_warn_if_order_scope_is_set
......@@ -84,7 +100,7 @@ def test_logger_not_required
previous_logger = ActiveRecord::Base.logger
ActiveRecord::Base.logger = nil
assert_nothing_raised do
Post.limit(1).find_each { |post| post }
Post.order('comments_count DESC').find_each { |post| post }
end
ensure
ActiveRecord::Base.logger = previous_logger
......@@ -172,26 +188,26 @@ def test_find_in_batches_should_error_on_ignore_the_order
def test_find_in_batches_should_not_error_if_config_overridden
# Set the config option which will be overridden
prev = ActiveRecord::Base.error_on_ignored_order_or_limit
ActiveRecord::Base.error_on_ignored_order_or_limit = true
prev = ActiveRecord::Base.error_on_ignored_order
ActiveRecord::Base.error_on_ignored_order = true
assert_nothing_raised do
PostWithDefaultScope.find_in_batches(error_on_ignore: false){}
end
ensure
# Set back to default
ActiveRecord::Base.error_on_ignored_order_or_limit = prev
ActiveRecord::Base.error_on_ignored_order = prev
end
def test_find_in_batches_should_error_on_config_specified_to_error
# Set the config option
prev = ActiveRecord::Base.error_on_ignored_order_or_limit
ActiveRecord::Base.error_on_ignored_order_or_limit = true
prev = ActiveRecord::Base.error_on_ignored_order
ActiveRecord::Base.error_on_ignored_order = true
assert_raise(ArgumentError) do
PostWithDefaultScope.find_in_batches(){}
end
ensure
# Set back to default
ActiveRecord::Base.error_on_ignored_order_or_limit = prev
ActiveRecord::Base.error_on_ignored_order = prev
end
def test_find_in_batches_should_not_error_by_default
......@@ -249,6 +265,28 @@ def test_find_in_batches_should_return_an_enumerator
end
end
test 'find_in_batches should honor limit if passed a block' do
limit = @total - 1
total = 0
Post.limit(limit).find_in_batches do |batch|
total += batch.size
end
assert_equal limit, total
end
test 'find_in_batches should honor limit if no block is passed' do
limit = @total - 1
total = 0
Post.limit(limit).find_in_batches.each do |batch|
total += batch.size
end
assert_equal limit, total
end
def test_in_batches_should_not_execute_any_query
assert_no_queries do
assert_kind_of ActiveRecord::Batches::BatchEnumerator, Post.in_batches(of: 2)
......@@ -486,4 +524,78 @@ def test_find_in_batches_should_return_a_sized_enumerator
assert_equal 1, Post.find_in_batches(:batch_size => 10_000).size
end
end
[true, false].each do |load|
test "in_batches should return limit records when limit is less than batch size and load is #{load}" do
limit = 3
batch_size = 5
total = 0
Post.limit(limit).in_batches(of: batch_size, load: load) do |batch|
total += batch.count
end
assert_equal limit, total
end
test "in_batches should return limit records when limit is greater than batch size and load is #{load}" do
limit = 5
batch_size = 3
total = 0
Post.limit(limit).in_batches(of: batch_size, load: load) do |batch|
total += batch.count
end
assert_equal limit, total
end
test "in_batches should return limit records when limit is a multiple of the batch size and load is #{load}" do
limit = 6
batch_size = 3
total = 0
Post.limit(limit).in_batches(of: batch_size, load: load) do |batch|
total += batch.count
end
assert_equal limit, total
end
test "in_batches should return no records if the limit is 0 and load is #{load}" do
limit = 0
batch_size = 1
total = 0
Post.limit(limit).in_batches(of: batch_size, load: load) do |batch|
total += batch.count
end
assert_equal limit, total
end
test "in_batches should return all if the limit is greater than the number of records when load is #{load}" do
limit = @total + 1
batch_size = 1
total = 0
Post.limit(limit).in_batches(of: batch_size, load: load) do |batch|
total += batch.count
end
assert_equal @total, total
end
end
test 'error_on_ignored_order_or_limit= is deprecated' do
begin
prev = ActiveRecord::Base.error_on_ignored_order
assert_deprecated 'Please use error_on_ignored_order= instead.' do
ActiveRecord::Base.error_on_ignored_order_or_limit = true
end
assert ActiveRecord::Base.error_on_ignored_order
ensure
ActiveRecord::Base.error_on_ignored_order = prev
end
end
end
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册