提交 dc407392 编写于 作者: F fatkodima 提交者: fatkodima

Add support for connection pooling on RedisCacheStore

上级 2417f3c5
......@@ -52,7 +52,7 @@ end
gem "dalli", ">= 2.2.1"
gem "listen", ">= 3.0.5", "< 3.2", require: false
gem "libxml-ruby", platforms: :ruby
gem "connection_pool"
gem "connection_pool", require: false
# for railties app_generator_test
gem "bootsnap", ">= 1.1.0", require: false
......
* Add support for connection pooling on RedisCacheStore.
*fatkodima*
Please check [5-2-stable](https://github.com/rails/rails/blob/5-2-stable/activesupport/CHANGELOG.md) for previous changes.
......@@ -160,6 +160,23 @@ class Store
attr_reader :silence, :options
alias :silence? :silence
class << self
private
def retrieve_pool_options(options)
{}.tap do |pool_options|
pool_options[:size] = options.delete(:pool_size) if options[:pool_size]
pool_options[:timeout] = options.delete(:pool_timeout) if options[:pool_timeout]
end
end
def ensure_connection_pool_added!
require "connection_pool"
rescue LoadError => e
$stderr.puts "You don't have connection_pool installed in your application. Please add it to your Gemfile and run bundle install"
raise e
end
end
# Creates a new cache. The options will be passed to any write method calls
# except for <tt>:namespace</tt> which can be used to set the global
# namespace for the cache.
......
......@@ -63,21 +63,12 @@ def self.build_mem_cache(*addresses) # :nodoc:
addresses = addresses.flatten
options = addresses.extract_options!
addresses = ["localhost:11211"] if addresses.empty?
pool_options = {}
pool_options[:size] = options[:pool_size] if options[:pool_size]
pool_options[:timeout] = options[:pool_timeout] if options[:pool_timeout]
pool_options = retrieve_pool_options(options)
if pool_options.empty?
Dalli::Client.new(addresses, options)
else
begin
require "connection_pool"
rescue LoadError => e
$stderr.puts "You don't have connection_pool installed in your application. Please add it to your Gemfile and run bundle install"
raise e
end
ensure_connection_pool_added!
ConnectionPool.new(pool_options) { Dalli::Client.new(addresses, options.merge(threadsafe: false)) }
end
end
......
......@@ -20,6 +20,15 @@
module ActiveSupport
module Cache
module ConnectionPoolLike
def with
yield self
end
end
::Redis.include(ConnectionPoolLike)
::Redis::Distributed.include(ConnectionPoolLike)
# Redis cache store.
#
# Deployment note: Take care to use a *dedicated Redis cache* rather
......@@ -172,7 +181,16 @@ def initialize(namespace: nil, compress: true, compress_threshold: 1.kilobyte, e
end
def redis
@redis ||= self.class.build_redis(**redis_options)
@redis ||= begin
pool_options = self.class.send(:retrieve_pool_options, redis_options)
if pool_options.any?
self.class.send(:ensure_connection_pool_added!)
::ConnectionPool.new(pool_options) { self.class.build_redis(**redis_options) }
else
self.class.build_redis(**redis_options)
end
end
end
def inspect
......@@ -211,7 +229,7 @@ def delete_matched(matcher, options = nil)
instrument :delete_matched, matcher do
case matcher
when String
redis.eval DELETE_GLOB_LUA, [], [namespace_key(matcher, options)]
redis.with { |c| c.eval DELETE_GLOB_LUA, [], [namespace_key(matcher, options)] }
else
raise ArgumentError, "Only Redis glob strings are supported: #{matcher.inspect}"
end
......@@ -229,7 +247,7 @@ def delete_matched(matcher, options = nil)
def increment(name, amount = 1, options = nil)
instrument :increment, name, amount: amount do
failsafe :increment do
redis.incrby normalize_key(name, options), amount
redis.with { |c| c.incrby normalize_key(name, options), amount }
end
end
end
......@@ -245,7 +263,7 @@ def increment(name, amount = 1, options = nil)
def decrement(name, amount = 1, options = nil)
instrument :decrement, name, amount: amount do
failsafe :decrement do
redis.decrby normalize_key(name, options), amount
redis.with { |c| c.decrby normalize_key(name, options), amount }
end
end
end
......@@ -267,7 +285,7 @@ def clear(options = nil)
if namespace = merged_options(options)[namespace]
delete_matched "*", namespace: namespace
else
redis.flushdb
redis.with { |c| c.flushdb }
end
end
end
......@@ -298,7 +316,7 @@ def set_redis_capabilities
# Read an entry from the cache.
def read_entry(key, options = nil)
failsafe :read_entry do
deserialize_entry redis.get(key)
deserialize_entry redis.with { |c| c.get(key) }
end
end
......@@ -309,7 +327,7 @@ def read_multi_mget(*names)
keys = names.map { |name| normalize_key(name, options) }
values = failsafe(:read_multi_mget, returning: {}) do
redis.mget(*keys)
redis.with { |c| c.mget(*keys) }
end
names.zip(values).each_with_object({}) do |(name, value), results|
......@@ -341,9 +359,9 @@ def write_entry(key, entry, unless_exist: false, raw: false, expires_in: nil, ra
modifiers[:nx] = unless_exist
modifiers[:px] = (1000 * expires_in.to_f).ceil if expires_in
redis.set key, value, modifiers
redis.with { |c| c.set key, value, modifiers }
else
redis.set key, value
redis.with { |c| c.set key, value }
end
end
end
......@@ -351,7 +369,7 @@ def write_entry(key, entry, unless_exist: false, raw: false, expires_in: nil, ra
# Delete an entry from the cache.
def delete_entry(key, options)
failsafe :delete_entry, returning: false do
redis.del key
redis.with { |c| c.del key }
end
end
......@@ -360,7 +378,7 @@ def write_multi_entries(entries, expires_in: nil, **options)
if entries.any?
if mset_capable? && expires_in.nil?
failsafe :write_multi_entries do
redis.mapped_mset(entries)
redis.with { |c| c.mapped_mset(entries) }
end
else
super
......
......@@ -6,7 +6,7 @@ def test_connection_pool
emulating_latency do
begin
cache = ActiveSupport::Cache.lookup_store(store, pool_size: 2, pool_timeout: 1)
cache = ActiveSupport::Cache.lookup_store(store, { pool_size: 2, pool_timeout: 1 }.merge(store_options))
cache.clear
threads = []
......@@ -33,7 +33,7 @@ def test_connection_pool
def test_no_connection_pool
emulating_latency do
begin
cache = ActiveSupport::Cache.lookup_store(store)
cache = ActiveSupport::Cache.lookup_store(store, store_options)
cache.clear
threads = []
......@@ -54,4 +54,7 @@ def test_no_connection_pool
end
end
end
private
def store_options; {}; end
end
......@@ -5,6 +5,24 @@
require "active_support/cache/redis_cache_store"
require_relative "../behaviors"
driver_name = %w[ ruby hiredis ].include?(ENV["REDIS_DRIVER"]) ? ENV["REDIS_DRIVER"] : "hiredis"
driver = Object.const_get("Redis::Connection::#{driver_name.camelize}")
Redis::Connection.drivers.clear
Redis::Connection.drivers.append(driver)
# Emulates a latency on Redis's back-end for the key latency to facilitate
# connection pool testing.
class SlowRedis < Redis
def get(key, options = {})
if key =~ /latency/
sleep 3
else
super
end
end
end
module ActiveSupport::Cache::RedisCacheStoreTests
DRIVER = %w[ ruby hiredis ].include?(ENV["REDIS_DRIVER"]) ? ENV["REDIS_DRIVER"] : "hiredis"
......@@ -110,6 +128,33 @@ class RedisCacheStoreCommonBehaviorTest < StoreTest
include AutoloadingCacheBehavior
end
class ConnectionPoolBehaviourTest < StoreTest
include ConnectionPoolBehavior
private
def store
:redis_cache_store
end
def emulating_latency
old_redis = Object.send(:remove_const, :Redis)
Object.const_set(:Redis, SlowRedis)
yield
ensure
Object.send(:remove_const, :Redis)
Object.const_set(:Redis, old_redis)
end
end
class RedisDistributedConnectionPoolBehaviourTest < ConnectionPoolBehaviourTest
private
def store_options
{ url: %w[ redis://localhost:6379/0 redis://localhost:6379/0 ] }
end
end
# Separate test class so we can omit the namespace which causes expected,
# appropriate complaints about incompatible string encodings.
class KeyEncodingSafetyTest < StoreTest
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册