提交 1248c632 编写于 作者: P Pratik Naik

Merge commit 'mainstream/master'

*Edge*
* Added ActiveRecord::Base.each and ActiveRecord::Base.find_in_batches for batch processing [DHH/Jamis Buck]
* Added that ActiveRecord::Base.exists? can be called with no arguments #1817 [Scott Taylor]
......
......@@ -48,6 +48,7 @@ def self.load_all!
autoload :AttributeMethods, 'active_record/attribute_methods'
autoload :AutosaveAssociation, 'active_record/autosave_association'
autoload :Base, 'active_record/base'
autoload :Batches, 'active_record/batches'
autoload :Calculations, 'active_record/calculations'
autoload :Callbacks, 'active_record/callbacks'
autoload :Dirty, 'active_record/dirty'
......
......@@ -3148,7 +3148,7 @@ def clone_attribute_value(reader_method, attribute_name)
# #save_with_autosave_associations to be wrapped inside a transaction.
include AutosaveAssociation, NestedAttributes
include Aggregations, Transactions, Reflection, Calculations, Serialization
include Aggregations, Transactions, Reflection, Batches, Calculations, Serialization
end
end
......
module ActiveRecord
module Batches # :nodoc:
def self.included(base)
base.extend(ClassMethods)
end
# When processing large numbers of records, it's often a good idea to do so in batches to prevent memory ballooning.
module ClassMethods
# Yields each record that was found by the find +options+. The find is performed by find_in_batches
# with a batch size of 1000 (or as specified by the +batch_size+ option).
#
# Example:
#
# Person.each(:conditions => "age > 21") do |person|
# person.party_all_night!
# end
#
# Note: This method is only intended to use for batch processing of large amounts of records that wouldn't fit in
# memory all at once. If you just need to loop over less than 1000 records, it's probably better just to use the
# regular find methods.
def each(options = {})
find_in_batches(options) do |records|
records.each { |record| yield record }
end
self
end
# Yields each batch of records that was found by the find +options+ as an array. The size of each batch is
# set by the +batch_size+ option; the default is 1000.
#
# You can control the starting point for the batch processing by supplying the +start+ option. This is especially
# useful if you want multiple workers dealing with the same processing queue. You can make worker 1 handle all the
# records between id 0 and 10,000 and worker 2 handle from 10,000 and beyond (by setting the +start+ option on that
# worker).
#
# It's not possible to set the order. That is automatically set to ascending on the primary key ("id ASC")
# to make the batch ordering work. This also mean that this method only works with integer-based primary keys.
# You can't set the limit either, that's used to control the the batch sizes.
#
# Example:
#
# Person.find_in_batches(:conditions => "age > 21") do |group|
# sleep(50) # Make sure it doesn't get too crowded in there!
# group.each { |person| person.party_all_night! }
# end
def find_in_batches(options = {})
raise "You can't specify an order, it's forced to be #{batch_order}" if options[:order]
raise "You can't specify a limit, it's forced to be the batch_size" if options[:limit]
start = options.delete(:start).to_i
with_scope(:find => options.merge(:order => batch_order, :limit => options.delete(:batch_size) || 1000)) do
records = find(:all, :conditions => [ "#{table_name}.#{primary_key} >= ?", start ])
while records.any?
yield records
records = find(:all, :conditions => [ "#{table_name}.#{primary_key} > ?", records.last.id ])
end
end
end
private
def batch_order
"#{table_name}.#{primary_key} ASC"
end
end
end
end
\ No newline at end of file
require 'cases/helper'
require 'models/post'
class EachTest < ActiveRecord::TestCase
fixtures :posts
def setup
@posts = Post.all(:order => "id asc")
@total = Post.count
end
def test_each_should_excecute_one_query_per_batch
assert_queries(Post.count + 1) do
Post.each(:batch_size => 1) do |post|
assert_kind_of Post, post
end
end
end
def test_each_should_raise_if_the_order_is_set
assert_raise(RuntimeError) do
Post.each(:order => "title") { |post| post }
end
end
def test_each_should_raise_if_the_limit_is_set
assert_raise(RuntimeError) do
Post.each(:limit => 1) { |post| post }
end
end
def test_find_in_batches_should_return_batches
assert_queries(Post.count + 1) do
Post.find_in_batches(:batch_size => 1) do |batch|
assert_kind_of Array, batch
assert_kind_of Post, batch.first
end
end
end
def test_find_in_batches_should_start_from_the_start_option
assert_queries(Post.count) do
Post.find_in_batches(:batch_size => 1, :start => 2) do |batch|
assert_kind_of Array, batch
assert_kind_of Post, batch.first
end
end
end
end
\ No newline at end of file
*Edge*
* Update bundled memcache-client from 1.5.0.5 to 1.6.4.99. See http://www.mikeperham.com/2009/02/15/memcache-client-performance/ [Mike Perham]
* Ruby 1.9.1p0 fix: URI.unescape can decode multibyte chars. #2033 [MOROHASHI Kyosuke]
* Time#to_s(:rfc822) uses #formatted_offset instead of unreliable and non-standard %z directive #1899 [Zachary Zolton]
......
......@@ -9,9 +9,9 @@
require 'builder'
begin
gem 'memcache-client', '>= 1.5.0.5'
gem 'memcache-client', '>= 1.6.5'
rescue Gem::LoadError
$:.unshift "#{File.dirname(__FILE__)}/vendor/memcache-client-1.5.0.5"
$:.unshift "#{File.dirname(__FILE__)}/vendor/memcache-client-1.6.5"
end
begin
......
# All original code copyright 2005, 2006, 2007 Bob Cottrell, Eric Hodel,
# The Robot Co-op. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the names of the authors nor the names of their contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
# OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
# OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
# OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
$TESTING = defined?($TESTING) && $TESTING
require 'socket'
require 'thread'
require 'timeout'
require 'rubygems'
require 'zlib'
require 'digest/sha1'
##
# A Ruby client library for memcached.
#
# This is intended to provide access to basic memcached functionality. It
# does not attempt to be complete implementation of the entire API, but it is
# approaching a complete implementation.
class MemCache
##
# The version of MemCache you are using.
VERSION = '1.5.0.5'
VERSION = '1.6.4.99'
##
# Default options for the cache object.
......@@ -54,8 +23,10 @@ class MemCache
DEFAULT_OPTIONS = {
:namespace => nil,
:readonly => false,
:multithread => false,
:failover => true
:multithread => true,
:failover => true,
:timeout => 0.5,
:logger => nil,
}
##
......@@ -68,13 +39,6 @@ class MemCache
DEFAULT_WEIGHT = 1
##
# The amount of time to wait for a response from a memcached server. If a
# response is not completed within this time, the connection to the server
# will be closed and an error will be raised.
attr_accessor :request_timeout
##
# The namespace for this instance
......@@ -91,9 +55,22 @@ class MemCache
attr_reader :servers
##
# Whether this client should failover reads and writes to another server
# Socket timeout limit with this client, defaults to 0.5 sec.
# Set to nil to disable timeouts.
attr_reader :timeout
##
# Should the client try to failover to another server if the
# first server is down? Defaults to true.
attr_reader :failover
##
# Log debug/info/warn/error to the given Logger, defaults to nil.
attr_reader :logger
attr_accessor :failover
##
# Accepts a list of +servers+ and a list of +opts+. +servers+ may be
# omitted. See +servers=+ for acceptable server list arguments.
......@@ -103,7 +80,11 @@ class MemCache
# [:namespace] Prepends this value to all keys added or retrieved.
# [:readonly] Raises an exception on cache writes when true.
# [:multithread] Wraps cache access in a Mutex for thread safety.
#
# [:failover] Should the client try to failover to another server if the
# first server is down? Defaults to true.
# [:timeout] Time to use as the socket read timeout. Defaults to 0.5 sec,
# set to nil to disable timeouts (this is a major performance penalty in Ruby 1.8).
# [:logger] Logger to use for info/debug output, defaults to nil
# Other options are ignored.
def initialize(*args)
......@@ -130,9 +111,15 @@ def initialize(*args)
@namespace = opts[:namespace]
@readonly = opts[:readonly]
@multithread = opts[:multithread]
@failover = opts[:failover]
@timeout = opts[:timeout]
@failover = opts[:failover]
@logger = opts[:logger]
@mutex = Mutex.new if @multithread
@buckets = []
logger.info { "memcache-client #{VERSION} #{Array(servers).inspect}" } if logger
Thread.current[:memcache_client] = self.object_id if !@multithread
self.servers = servers
end
......@@ -140,8 +127,8 @@ def initialize(*args)
# Returns a string representation of the cache object.
def inspect
"<MemCache: %d servers, %d buckets, ns: %p, ro: %p>" %
[@servers.length, @buckets.length, @namespace, @readonly]
"<MemCache: %d servers, ns: %p, ro: %p>" %
[@servers.length, @namespace, @readonly]
end
##
......@@ -162,7 +149,7 @@ def readonly?
# Set the servers that the requests will be distributed between. Entries
# can be either strings of the form "hostname:port" or
# "hostname:port:weight" or MemCache::Server objects.
#
def servers=(servers)
# Create the server objects.
@servers = Array(servers).collect do |server|
......@@ -172,21 +159,17 @@ def servers=(servers)
port ||= DEFAULT_PORT
weight ||= DEFAULT_WEIGHT
Server.new self, host, port, weight
when Server
if server.memcache.multithread != @multithread then
raise ArgumentError, "can't mix threaded and non-threaded servers"
end
server
else
raise TypeError, "cannot convert #{server.class} into MemCache::Server"
server
end
end
# Create an array of server buckets for weight selection of servers.
@buckets = []
@servers.each do |server|
server.weight.times { @buckets.push(server) }
end
logger.debug { "Servers now: #{@servers.inspect}" } if logger
# There's no point in doing this if there's only one server
@continuum = create_continuum_for(@servers) if @servers.size > 1
@servers
end
##
......@@ -210,6 +193,7 @@ def decr(key, amount = 1)
def get(key, raw = false)
with_server(key) do |server, cache_key|
value = cache_get server, cache_key
logger.debug { "GET #{key} from #{server.inspect}: #{value ? value.to_s.size : 'nil'}" } if logger
return nil if value.nil?
value = Marshal.load value unless raw
return value
......@@ -233,6 +217,8 @@ def get(key, raw = false)
# cache["a"] = 1
# cache["b"] = 2
# cache.get_multi "a", "b" # => { "a" => 1, "b" => 2 }
#
# Note that get_multi assumes the values are marshalled.
def get_multi(*keys)
raise MemCacheError, 'No active servers' unless active?
......@@ -252,15 +238,20 @@ def get_multi(*keys)
results = {}
server_keys.each do |server, keys_for_server|
keys_for_server = keys_for_server.join ' '
values = cache_get_multi server, keys_for_server
values.each do |key, value|
results[cache_keys[key]] = Marshal.load value
keys_for_server_str = keys_for_server.join ' '
begin
values = cache_get_multi server, keys_for_server_str
values.each do |key, value|
results[cache_keys[key]] = Marshal.load value
end
rescue IndexError => e
# Ignore this server and try the others
logger.warn { "Unable to retrieve #{keys_for_server.size} elements from #{server.inspect}: #{e.message}"} if logger
end
end
return results
rescue TypeError, IndexError => err
rescue TypeError => err
handle_error nil, err
end
......@@ -285,12 +276,19 @@ def incr(key, amount = 1)
# Warning: Readers should not call this method in the event of a cache miss;
# see MemCache#add.
ONE_MB = 1024 * 1024
def set(key, value, expiry = 0, raw = false)
raise MemCacheError, "Update of readonly cache" if @readonly
with_server(key) do |server, cache_key|
value = Marshal.dump value unless raw
command = "set #{cache_key} 0 #{expiry} #{value.to_s.size}\r\n#{value}\r\n"
logger.debug { "SET #{key} to #{server.inspect}: #{value ? value.to_s.size : 'nil'}" } if logger
data = value.to_s
raise MemCacheError, "Value too large, memcached can only store 1MB of data per key" if data.size > ONE_MB
command = "set #{cache_key} 0 #{expiry} #{data.size}\r\n#{data}\r\n"
with_socket_management(server) do |socket|
socket.write command
......@@ -319,7 +317,8 @@ def add(key, value, expiry = 0, raw = false)
raise MemCacheError, "Update of readonly cache" if @readonly
with_server(key) do |server, cache_key|
value = Marshal.dump value unless raw
command = "add #{cache_key} 0 #{expiry} #{value.size}\r\n#{value}\r\n"
logger.debug { "ADD #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger
command = "add #{cache_key} 0 #{expiry} #{value.to_s.size}\r\n#{value}\r\n"
with_socket_management(server) do |socket|
socket.write command
......@@ -353,7 +352,6 @@ def flush_all
raise MemCacheError, "Update of readonly cache" if @readonly
begin
@mutex.lock if @multithread
@servers.each do |server|
with_socket_management(server) do |socket|
socket.write "flush_all\r\n"
......@@ -364,8 +362,6 @@ def flush_all
end
rescue IndexError => err
handle_error nil, err
ensure
@mutex.unlock if @multithread
end
end
......@@ -424,7 +420,7 @@ def stats
while line = socket.gets do
raise_on_error_response! line
break if line == "END\r\n"
if line =~ /\ASTAT ([\w]+) ([\w\.\:]+)/ then
if line =~ /\ASTAT ([\S]+) ([\w\.\:]+)/ then
name, value = $1, $2
stats[name] = case name
when 'version'
......@@ -477,6 +473,14 @@ def make_cache_key(key)
end
end
##
# Returns an interoperable hash value for +key+. (I think, docs are
# sketchy for down servers).
def hash_for(key)
Zlib.crc32(key)
end
##
# Pick a server to handle the request based on a hash of the key.
......@@ -487,27 +491,17 @@ def get_server_for_key(key, options = {})
raise MemCacheError, "No servers available" if @servers.empty?
return @servers.first if @servers.length == 1
hkey = hash_for key
if @failover
20.times do |try|
server = @buckets[hkey % @buckets.compact.size]
return server if server.alive?
hkey += hash_for "#{try}#{key}"
end
else
return @buckets[hkey % @buckets.compact.size]
end
raise MemCacheError, "No servers available"
end
hkey = hash_for(key)
##
# Returns an interoperable hash value for +key+. (I think, docs are
# sketchy for down servers).
20.times do |try|
entryidx = Continuum.binary_search(@continuum, hkey)
server = @continuum[entryidx].server
return server if server.alive?
break unless failover
hkey = hash_for "#{try}#{key}"
end
def hash_for(key)
(Zlib.crc32(key) >> 16) & 0x7fff
raise MemCacheError, "No servers available"
end
##
......@@ -608,24 +602,28 @@ def cache_incr(server, cache_key, amount)
# failures (but does still apply to unexpectedly lost connections etc.).
def with_socket_management(server, &block)
check_multithread_status!
@mutex.lock if @multithread
retried = false
begin
socket = server.socket
# Raise an IndexError to show this server is out of whack. If were inside
# a with_server block, we'll catch it and attempt to restart the operation.
raise IndexError, "No connection to server (#{server.status})" if socket.nil?
block.call(socket)
rescue SocketError => err
server.mark_dead(err.message)
logger.warn { "Socket failure: #{err.message}" } if logger
server.mark_dead(err)
handle_error(server, err)
rescue MemCacheError, SocketError, SystemCallError, IOError => err
rescue MemCacheError, SystemCallError, IOError => err
logger.warn { "Generic failure: #{err.class.name}: #{err.message}" } if logger
handle_error(server, err) if retried || socket.nil?
retried = true
retry
......@@ -640,8 +638,9 @@ def with_server(key)
server, cache_key = request_setup(key)
yield server, cache_key
rescue IndexError => e
logger.warn { "Server failed: #{e.class.name}: #{e.message}" } if logger
if !retried && @servers.size > 1
puts "Connection to server #{server.inspect} DIED! Retrying operation..."
logger.info { "Connection to server #{server.inspect} DIED! Retrying operation..." } if logger
retried = true
retry
end
......@@ -677,6 +676,37 @@ def raise_on_error_response!(response)
end
end
def create_continuum_for(servers)
total_weight = servers.inject(0) { |memo, srv| memo + srv.weight }
continuum = []
servers.each do |server|
entry_count_for(server, servers.size, total_weight).times do |idx|
hash = Digest::SHA1.hexdigest("#{server.host}:#{server.port}:#{idx}")
value = Integer("0x#{hash[0..7]}")
continuum << Continuum::Entry.new(value, server)
end
end
continuum.sort { |a, b| a.value <=> b.value }
end
def entry_count_for(server, total_servers, total_weight)
((total_servers * Continuum::POINTS_PER_SERVER * server.weight) / Float(total_weight)).floor
end
def check_multithread_status!
return if @multithread
if Thread.current[:memcache_client] != self.object_id
raise MemCacheError, <<-EOM
You are accessing this memcache-client instance from multiple threads but have not enabled multithread support.
Normally: MemCache.new(['localhost:11211'], :multithread => true)
In Rails: config.cache_store = [:mem_cache_store, 'localhost:11211', { :multithread => true }]
EOM
end
end
##
# This class represents a memcached server instance.
......@@ -689,13 +719,6 @@ class Server
CONNECT_TIMEOUT = 0.25
##
# The amount of time to wait for a response from a memcached server.
# If a response isn't received within this time limit,
# the server will be marked as down.
SOCKET_TIMEOUT = 0.5
##
# The amount of time to wait before attempting to re-establish a
# connection with a server that is marked dead.
......@@ -727,6 +750,8 @@ class Server
attr_reader :status
attr_reader :logger
##
# Create a new MemCache::Server object for the memcached instance
# listening on the given host and port, weighted by the given weight.
......@@ -735,17 +760,15 @@ def initialize(memcache, host, port = DEFAULT_PORT, weight = DEFAULT_WEIGHT)
raise ArgumentError, "No host specified" if host.nil? or host.empty?
raise ArgumentError, "No port specified" if port.nil? or port.to_i.zero?
@memcache = memcache
@host = host
@port = port.to_i
@weight = weight.to_i
@multithread = @memcache.multithread
@mutex = Mutex.new
@sock = nil
@retry = nil
@status = 'NOT CONNECTED'
@timeout = memcache.timeout
@logger = memcache.logger
end
##
......@@ -770,7 +793,6 @@ def alive?
# Returns the connected socket object on success or nil on failure.
def socket
@mutex.lock if @multithread
return @sock if @sock and not @sock.closed?
@sock = nil
......@@ -780,8 +802,7 @@ def socket
# Attempt to connect if not already connected.
begin
@sock = TCPTimeoutSocket.new @host, @port
@sock = @timeout ? TCPTimeoutSocket.new(@host, @port, @timeout) : TCPSocket.new(@host, @port)
if Socket.constants.include? 'TCP_NODELAY' then
@sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1
......@@ -789,12 +810,11 @@ def socket
@retry = nil
@status = 'CONNECTED'
rescue SocketError, SystemCallError, IOError, Timeout::Error => err
mark_dead err.message
logger.warn { "Unable to open socket: #{err.class.name}, #{err.message}" } if logger
mark_dead err
end
return @sock
ensure
@mutex.unlock if @multithread
end
##
......@@ -802,24 +822,23 @@ def socket
# object. The server is not considered dead.
def close
@mutex.lock if @multithread
@sock.close if @sock && !@sock.closed?
@sock = nil
@retry = nil
@status = "NOT CONNECTED"
ensure
@mutex.unlock if @multithread
end
##
# Mark the server as dead and close its socket.
def mark_dead(reason = "Unknown error")
def mark_dead(error)
@sock.close if @sock && !@sock.closed?
@sock = nil
@retry = Time.now + RETRY_DELAY
@status = sprintf "%s:%s DEAD: %s, will retry at %s", @host, @port, reason, @retry
reason = "#{error.class.name}: #{error.message}"
@status = sprintf "%s:%s DEAD (%s), will retry at %s", @host, @port, reason, @retry
@logger.info { @status } if @logger
end
end
......@@ -833,36 +852,84 @@ class MemCacheError < RuntimeError; end
# TCPSocket facade class which implements timeouts.
class TCPTimeoutSocket
def initialize(*args)
def initialize(host, port, timeout)
Timeout::timeout(MemCache::Server::CONNECT_TIMEOUT, SocketError) do
@sock = TCPSocket.new(*args)
@len = MemCache::Server::SOCKET_TIMEOUT.to_f || 0.5
@sock = TCPSocket.new(host, port)
@len = timeout
end
end
def write(*args)
Timeout::timeout(@len, SocketError) do
@sock.write(*args)
end
end
def gets(*args)
Timeout::timeout(@len, SocketError) do
@sock.gets(*args)
end
end
def read(*args)
Timeout::timeout(@len, SocketError) do
@sock.read(*args)
end
end
def _socket
@sock
end
def method_missing(meth, *args)
@sock.__send__(meth, *args)
end
end
\ No newline at end of file
def closed?
@sock.closed?
end
def close
@sock.close
end
end
module Continuum
POINTS_PER_SERVER = 160 # this is the default in libmemcached
# Find the closest index in Continuum with value <= the given value
def self.binary_search(ary, value, &block)
upper = ary.size - 1
lower = 0
idx = 0
while(lower <= upper) do
idx = (lower + upper) / 2
comp = ary[idx].value <=> value
if comp == 0
return idx
elsif comp > 0
upper = idx - 1
else
lower = idx + 1
end
end
return upper
end
class Entry
attr_reader :value
attr_reader :server
def initialize(val, srv)
@value = val
@server = srv
end
def inspect
"<#{value}, #{server.host}:#{server.port}>"
end
end
end
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册