提交 0016e041 编写于 作者: J Jon Moss

Adapterize ActionCable storage and extract behavior

上级 75f1b229
......@@ -47,4 +47,5 @@ module ActionCable
autoload :Connection
autoload :Channel
autoload :RemoteConnections
autoload :StorageAdapter
end
......@@ -133,8 +133,8 @@ def initialize(connection, identifier, params = {})
@identifier = identifier
@params = params
# When a channel is streaming via redis pubsub, we want to delay the confirmation
# transmission until redis pubsub subscription is confirmed.
# When a channel is streaming via pubsub, we want to delay the confirmation
# transmission until pubsub subscription is confirmed.
@defer_subscription_confirmation = false
@reject_subscription = nil
......
......@@ -60,7 +60,7 @@ def initialize(server, env)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
@_internal_redis_subscriptions = nil
@_internal_subscriptions = nil
@started_at = Time.now
end
......
......@@ -5,24 +5,24 @@ module InternalChannel
extend ActiveSupport::Concern
private
def internal_redis_channel
def internal_channel
"action_cable/#{connection_identifier}"
end
def subscribe_to_internal_channel
if connection_identifier.present?
callback = -> (message) { process_internal_message(message) }
@_internal_redis_subscriptions ||= []
@_internal_redis_subscriptions << [ internal_redis_channel, callback ]
@_internal_subscriptions ||= []
@_internal_subscriptions << [ internal_channel, callback ]
EM.next_tick { pubsub.subscribe(internal_redis_channel, &callback) }
EM.next_tick { pubsub.subscribe(internal_channel, &callback) }
logger.info "Registered connection (#{connection_identifier})"
end
end
def unsubscribe_from_internal_channel
if @_internal_redis_subscriptions.present?
@_internal_redis_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe_proc(channel, callback) } }
if @_internal_subscriptions.present?
@_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe_proc(channel, callback) } }
end
end
......
......@@ -24,11 +24,11 @@ class Railtie < Rails::Engine # :nodoc:
options = app.config.action_cable
options.allowed_request_origins ||= "http://localhost:3000" if ::Rails.env.development?
app.paths.add "config/redis/cable", with: "config/redis/cable.yml"
app.paths.add "config/cable", with: "config/cable.yml"
ActiveSupport.on_load(:action_cable) do
if (redis_cable_path = Pathname.new(app.config.paths["config/redis/cable"].first)).exist?
self.redis = Rails.application.config_for(redis_cable_path).with_indifferent_access
if (config_path = Pathname.new(app.config.paths["config/cable"].first)).exist?
self.config_opts = Rails.application.config_for(config_path).with_indifferent_access
end
options.each { |k,v| send("#{k}=", v) }
......
......@@ -39,7 +39,7 @@ def initialize(server, ids)
# Uses the internal channel to disconnect the connection.
def disconnect
server.broadcast internal_redis_channel, type: 'disconnect'
server.broadcast internal_channel, type: 'disconnect'
end
# Returns all the identifiers that were applied to this connection.
......
require 'em-hiredis'
module ActionCable
module Server
# A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but
......@@ -47,20 +45,9 @@ def channel_classes
end
end
# The redis pubsub adapter used for all streams/broadcasting.
# The pubsub adapter used for all streams/broadcasting.
def pubsub
@pubsub ||= redis.pubsub
end
# The EventMachine Redis instance used by the pubsub adapter.
def redis
@redis ||= EM::Hiredis.connect(config.redis[:url]).tap do |redis|
redis.on(:reconnect_failed) do
logger.info "[ActionCable] Redis reconnect failed."
# logger.info "[ActionCable] Redis reconnected. Closing all the open connections."
# @connections.map &:close
end
end
@pubsub ||= config.storage_adapter.new(self).pubsub
end
# All the identifiers applied to the connection class associated with this server.
......
require 'redis'
module ActionCable
module Server
# Broadcasting is how other parts of your application can send messages to the channel subscribers. As explained in Channel, most of the time, these
......@@ -31,11 +29,6 @@ def broadcaster_for(broadcasting)
Broadcaster.new(self, broadcasting)
end
# The redis instance used for broadcasting. Not intended for direct user use.
def broadcasting_redis
@broadcasting_redis ||= Redis.new(config.redis)
end
private
class Broadcaster
attr_reader :server, :broadcasting
......@@ -46,7 +39,8 @@ def initialize(server, broadcasting)
def broadcast(message)
server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}"
server.broadcasting_redis.publish broadcasting, ActiveSupport::JSON.encode(message)
broadcast_storage_adapter = server.config.storage_adapter.new(server).broadcast
broadcast_storage_adapter.publish broadcasting, ActiveSupport::JSON.encode(message)
end
end
end
......
......@@ -5,9 +5,9 @@ module Server
class Configuration
attr_accessor :logger, :log_tags
attr_accessor :connection_class, :worker_pool_size
attr_accessor :redis, :channels_path
attr_accessor :channels_path
attr_accessor :disable_request_forgery_protection, :allowed_request_origins
attr_accessor :url
attr_accessor :config_opts, :url
def initialize
@log_tags = []
......@@ -29,6 +29,22 @@ def channel_class_names
Pathname.new(channel_path).basename.to_s.split('.').first.camelize
end
end
ADAPTER = ActionCable::StorageAdapter
# Returns constant of storage adapter specified in config/cable.yml
# If the adapter cannot be found, this will default to the Redis adapter
def storage_adapter
# "ActionCable::StorageAdapter::#{adapter.capitalize}"
adapter = config_opts['adapter']
adapter_const = "ActionCable::StorageAdapter::#{adapter.capitalize}"
if Object.const_defined?(adapter_const)
adapter_const.constantize
else
ADAPTER_BASE::Redis
end
end
end
end
end
......
module ActionCable
module StorageAdapter
autoload :Base, 'action_cable/storage_adapter/base'
autoload :Redis, 'action_cable/storage_adapter/redis'
end
end
module ActionCable
module StorageAdapter
class Base
attr_reader :logger, :server
def initialize(server)
@server = server
@logger = @server.logger
end
# Storage connection instance used for broadcasting. Not intended for direct user use.
def broadcast
raise NotImplementedError
end
# Storage connection instance used for pubsub.
def pubsub
raise NotImplementedError
end
end
end
end
require 'em-hiredis'
require 'redis'
module ActionCable
module StorageAdapter
class Redis < Base
# The redis instance used for broadcasting. Not intended for direct user use.
def broadcast
@broadcast ||= ::Redis.new(@server.config.config_opts)
end
def pubsub
redis.pubsub
end
private
# The EventMachine Redis instance used by the pubsub adapter.
def redis
@redis ||= EM::Hiredis.connect(@server.config.config_opts[:url]).tap do |redis|
redis.on(:reconnect_failed) do
@logger.info "[ActionCable] Redis reconnect failed."
# logger.info "[ActionCable] Redis reconnected. Closing all the open connections."
# @connections.map &:close
end
end
end
end
end
end
require 'test_helper'
require 'stubs/test_server'
class ActionCable::StorageAdapter::BaseTest < ActionCable::TestCase
## TEST THAT ERRORS ARE RETURNED FOR INHERITORS THAT DON'T OVERRIDE METHODS
class BrokenAdapter < ActionCable::StorageAdapter::Base
end
setup do
@server = TestServer.new
@server.config.allowed_request_origins = %w( http://rubyonrails.com )
end
test "#broadcast returns NotImplementedError by default" do
assert_raises NotImplementedError do
BrokenAdapter.new(@server).broadcast
end
end
test "#pubsub returns NotImplementedError by default" do
assert_raises NotImplementedError do
BrokenAdapter.new(@server).pubsub
end
end
# TEST METHODS THAT ARE REQUIRED OF THE ADAPTER'S BACKEND STORAGE OBJECT
class SuccessAdapterBackend
def publish(channel, message)
end
def subscribe(*channels, &block)
end
def unsubscribe(*channels, &block)
end
end
class SuccessAdapter < ActionCable::StorageAdapter::Base
def broadcast
SuccessAdapterBackend.new
end
def pubsub
SuccessAdapterBackend.new
end
end
test "#broadcast responds to #publish" do
broadcast = SuccessAdapter.new(@server).broadcast
assert_respond_to(broadcast, :publish)
end
test "#pubsub responds to #subscribe" do
pubsub = SuccessAdapter.new(@server).pubsub
assert_respond_to(pubsub, :subscribe)
end
test "#pubsub responds to #unsubscribe" do
pubsub = SuccessAdapter.new(@server).pubsub
assert_respond_to(pubsub, :unsubscribe)
end
end
......@@ -78,11 +78,11 @@ def config
template "application.rb"
template "environment.rb"
template "secrets.yml"
template "cable.yml" unless options[:skip_action_cable]
directory "environments"
directory "initializers"
directory "locales"
directory "redis" unless options[:skip_action_cable]
end
end
......@@ -315,7 +315,7 @@ def delete_active_record_initializers_skipping_active_record
def delete_action_cable_files_skipping_action_cable
if options[:skip_action_cable]
remove_file 'config/redis/cable.yml'
remove_file 'config/cable.yml'
remove_file 'app/assets/javascripts/cable.coffee'
remove_dir 'app/channels'
end
......
# Action Cable uses Redis to administer connections, channels, and sending/receiving messages over the WebSocket.
# Action Cable uses Redis by default to administer connections, channels, and sending/receiving messages over the WebSocket.
production:
adapter: redis
url: redis://localhost:6379/1
development:
adapter: redis
url: redis://localhost:6379/2
test:
adapter: redis
url: redis://localhost:6379/3
......@@ -392,7 +392,7 @@ def test_generator_if_skip_sprockets_is_given
def test_generator_if_skip_action_cable_is_given
run_generator [destination_root, "--skip-action-cable"]
assert_file "config/application.rb", /#\s+require\s+["']action_cable\/engine["']/
assert_no_file "config/redis/cable.yml"
assert_no_file "config/cable.yml"
assert_no_file "app/assets/javascripts/cable.coffee"
assert_no_file "app/channels"
end
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册