提交 185c93eb 编写于 作者: M Matthew Draper

Use AS::Executor / AS::Reloader to support reloading in ActionCable

上级 d3c9d808
* Safely support autoloading and class unloading, by preventing concurrent
loads, and disconnecting all cables during reload.
*Matthew Draper*
* Ensure ActionCable behaves correctly for non-string queue names.
*Jay Hayes*
......
......@@ -48,12 +48,13 @@ class Base
include InternalChannel
include Authorization
attr_reader :server, :env, :subscriptions, :logger
delegate :stream_event_loop, :worker_pool, :pubsub, to: :server
attr_reader :server, :env, :subscriptions, :logger, :worker_pool
delegate :stream_event_loop, :pubsub, to: :server
def initialize(server, env)
@server, @env = server, env
@worker_pool = server.worker_pool
@logger = new_tagged_logger
@websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop)
......
......@@ -51,5 +51,30 @@ class Railtie < Rails::Engine # :nodoc:
end
end
end
initializer "action_cable.set_work_hooks" do |app|
ActiveSupport.on_load(:action_cable) do
ActionCable::Server::Worker.set_callback :work, :around, prepend: true do |_, inner|
app.executor.wrap do
# If we took a while to get the lock, we may have been halted
# in the meantime. As we haven't started doing any real work
# yet, we should pretend that we never made it off the queue.
unless stopping?
inner.call
end
end
end
wrap = lambda do |_, inner|
app.executor.wrap(&inner)
end
ActionCable::Channel::Base.set_callback :subscribe, :around, prepend: true, &wrap
ActionCable::Channel::Base.set_callback :unsubscribe, :around, prepend: true, &wrap
app.reloader.before_class_unload do
ActionCable.server.restart
end
end
end
end
end
......@@ -33,6 +33,16 @@ def disconnect(identifiers)
remote_connections.where(identifiers).disconnect
end
def restart
connections.each(&:close)
@mutex.synchronize do
worker_pool.halt if @worker_pool
@worker_pool = nil
end
end
# Gateway to RemoteConnections. See that class for details.
def remote_connections
@remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) }
......
......@@ -20,6 +20,26 @@ def initialize(max_size: 5)
)
end
# Stop processing work: any work that has not already started
# running will be discarded from the queue
def halt
@pool.kill
end
def stopping?
@pool.shuttingdown?
end
def work(connection)
self.connection = connection
run_callbacks :work do
yield
end
ensure
self.connection = nil
end
def async_invoke(receiver, method, *args)
@pool.post do
invoke(receiver, method, *args)
......@@ -27,19 +47,15 @@ def async_invoke(receiver, method, *args)
end
def invoke(receiver, method, *args)
begin
self.connection = receiver
run_callbacks :work do
work(receiver) do
begin
receiver.send method, *args
end
rescue Exception => e
logger.error "There was an exception - #{e.class}(#{e.message})"
logger.error e.backtrace.join("\n")
rescue Exception => e
logger.error "There was an exception - #{e.class}(#{e.message})"
logger.error e.backtrace.join("\n")
receiver.handle_exception if receiver.respond_to?(:handle_exception)
ensure
self.connection = nil
receiver.handle_exception if receiver.respond_to?(:handle_exception)
end
end
end
......@@ -50,14 +66,8 @@ def async_run_periodic_timer(channel, callback)
end
def run_periodic_timer(channel, callback)
begin
self.connection = channel.connection
run_callbacks :work do
callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
end
ensure
self.connection = nil
work(channel.connection) do
callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
end
end
......
module ActionCable
module Server
class Worker
# Clear active connections between units of work so that way long-running channels or connection processes do not hoard connections.
module ActiveRecordConnectionManagement
extend ActiveSupport::Concern
......@@ -13,8 +12,6 @@ module ActiveRecordConnectionManagement
def with_database_connections
connection.logger.tag(ActiveRecord::Base.logger) { yield }
ensure
ActiveRecord::Base.clear_active_connections!
end
end
end
......
......@@ -127,8 +127,16 @@ def close
end
@ws.close
wait_for_close
end
def wait_for_close
@closed.wait(WAIT_WHEN_EXPECTING_EVENT)
end
def closed?
@closed.set?
end
end
def faye_client(port)
......@@ -220,4 +228,16 @@ def test_unsubscribe_client
assert_equal(0, app.connections.count)
end
end
def test_server_restart
with_puma_server do |port|
c = faye_client(port)
c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
ActionCable.server.restart
c.wait_for_close
assert c.closed?
end
end
end
......@@ -17,4 +17,8 @@ def pubsub
def stream_event_loop
@stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new
end
def worker_pool
@worker_pool ||= ActionCable::Server::Worker.new(max_size: 5)
end
end
# This file is used by Rack-based servers to start the application.
require ::File.expand_path('../config/environment', __FILE__)
<%- unless options[:skip_action_cable] -%>
# Action Cable requires that all classes are loaded in advance
Rails.application.eager_load!
<%- end -%>
run Rails.application
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册