提交 ccfc2d63 编写于 作者: R Rafael Mendonça França

Merge pull request #31424 from sobrinho/sobrinho/connection-pool-memcache

Support for connection pooling on mem cache store
......@@ -52,6 +52,7 @@ end
gem "dalli", ">= 2.2.1"
gem "listen", ">= 3.0.5", "< 3.2", require: false
gem "libxml-ruby", platforms: :ruby
gem "connection_pool"
# for railties app_generator_test
gem "bootsnap", ">= 1.1.0", require: false
......
......@@ -496,8 +496,6 @@ GEM
websocket (1.2.4)
websocket-driver (0.6.5)
websocket-extensions (>= 0.1.0)
websocket-driver (0.6.5-java)
websocket-extensions (>= 0.1.0)
websocket-extensions (0.1.2)
xpath (2.1.0)
nokogiri (~> 1.3)
......@@ -524,6 +522,7 @@ DEPENDENCIES
capybara (~> 2.15)
chromedriver-helper
coffee-rails
connection_pool
dalli (>= 2.2.1)
delayed_job
delayed_job_active_record
......
......@@ -63,7 +63,23 @@ def self.build_mem_cache(*addresses) # :nodoc:
addresses = addresses.flatten
options = addresses.extract_options!
addresses = ["localhost:11211"] if addresses.empty?
Dalli::Client.new(addresses, options)
pool_options = {}
pool_options[:size] = options[:pool_size] if options[:pool_size]
pool_options[:timeout] = options[:pool_timeout] if options[:pool_timeout]
if pool_options.empty?
Dalli::Client.new(addresses, options)
else
begin
require "connection_pool"
rescue LoadError => e
$stderr.puts "You don't have connection_pool installed in your application. Please add it to your Gemfile and run bundle install"
raise e
end
ConnectionPool.new(pool_options) { Dalli::Client.new(addresses, options.merge(threadsafe: false)) }
end
end
# Creates a new MemCacheStore object, with the given memcached server
......@@ -99,7 +115,7 @@ def increment(name, amount = 1, options = nil)
options = merged_options(options)
instrument(:increment, name, amount: amount) do
rescue_error_with nil do
@data.incr(normalize_key(name, options), amount, options[:expires_in])
@data.with { |c| c.incr(normalize_key(name, options), amount, options[:expires_in]) }
end
end
end
......@@ -112,7 +128,7 @@ def decrement(name, amount = 1, options = nil)
options = merged_options(options)
instrument(:decrement, name, amount: amount) do
rescue_error_with nil do
@data.decr(normalize_key(name, options), amount, options[:expires_in])
@data.with { |c| c.decr(normalize_key(name, options), amount, options[:expires_in]) }
end
end
end
......@@ -120,18 +136,18 @@ def decrement(name, amount = 1, options = nil)
# Clear the entire cache on all memcached servers. This method should
# be used with care when shared cache is being used.
def clear(options = nil)
rescue_error_with(nil) { @data.flush_all }
rescue_error_with(nil) { @data.with { |c| c.flush_all } }
end
# Get the statistics from the memcached servers.
def stats
@data.stats
@data.with { |c| c.stats }
end
private
# Read an entry from the cache.
def read_entry(key, options)
rescue_error_with(nil) { deserialize_entry(@data.get(key, options)) }
rescue_error_with(nil) { deserialize_entry(@data.with { |c| c.get(key, options) }) }
end
# Write an entry to the cache.
......@@ -144,7 +160,7 @@ def write_entry(key, entry, options)
expires_in += 5.minutes
end
rescue_error_with false do
@data.send(method, key, value, expires_in, options)
@data.with { |c| c.send(method, key, value, expires_in, options) }
end
end
......@@ -152,7 +168,7 @@ def write_entry(key, entry, options)
def read_multi_entries(names, options)
keys_to_names = Hash[names.map { |name| [normalize_key(name, options), name] }]
raw_values = @data.get_multi(keys_to_names.keys)
raw_values = @data.with { |c| c.get_multi(keys_to_names.keys) }
values = {}
raw_values.each do |key, value|
......@@ -168,7 +184,7 @@ def read_multi_entries(names, options)
# Delete an entry from the cache.
def delete_entry(key, options)
rescue_error_with(false) { @data.delete(key) }
rescue_error_with(false) { @data.with { |c| c.delete(key) } }
end
# Memcache keys are binaries. So we need to force their encoding to binary
......
......@@ -5,6 +5,18 @@
require_relative "../behaviors"
require "dalli"
# Emulates a latency on Dalli's back-end for the key latency to facilitate
# connection pool testing.
class SlowDalliClient < Dalli::Client
def get(key, options = {})
if key =~ /latency/
sleep 3
else
super
end
end
end
class MemCacheStoreTest < ActiveSupport::TestCase
begin
ss = Dalli::Client.new("localhost:11211").stats
......@@ -34,6 +46,56 @@ def setup
include EncodedKeyCacheBehavior
include AutoloadingCacheBehavior
def test_connection_pool
emulating_latency do
begin
cache = ActiveSupport::Cache.lookup_store(:mem_cache_store, pool_size: 2, pool_timeout: 1)
cache.clear
threads = []
assert_raises Timeout::Error do
# One of the three threads will fail in 1 second because our pool size
# is only two.
3.times do
threads << Thread.new do
cache.read("latency")
end
end
threads.each(&:join)
end
ensure
threads.each(&:kill)
end
end
end
def test_no_connection_pool
emulating_latency do
begin
cache = ActiveSupport::Cache.lookup_store(:mem_cache_store)
cache.clear
threads = []
assert_nothing_raised do
# Default connection pool size is 5, assuming 10 will make sure that
# the connection pool isn't used at all.
10.times do
threads << Thread.new do
cache.read("latency")
end
end
threads.each(&:join)
end
ensure
threads.each(&:kill)
end
end
end
def test_raw_values
cache = ActiveSupport::Cache.lookup_store(:mem_cache_store, raw: true)
cache.clear
......@@ -89,4 +151,16 @@ def test_read_should_return_a_different_object_id_each_time_it_is_called
value << "bingo"
assert_not_equal value, @cache.read("foo")
end
private
def emulating_latency
old_client = Dalli.send(:remove_const, :Client)
Dalli.const_set(:Client, SlowDalliClient)
yield
ensure
Dalli.send(:remove_const, :Client)
Dalli.const_set(:Client, old_client)
end
end
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册