提交 541e4abb 编写于 作者: M Matthew Draper

Merge pull request #23807 from matthewd/executor

Publish AS::Executor and AS::Reloader APIs
* Safely support autoloading and class unloading, by preventing concurrent
loads, and disconnecting all cables during reload.
*Matthew Draper*
* Ensure ActionCable behaves correctly for non-string queue names.
*Jay Hayes*
......
......@@ -48,12 +48,13 @@ class Base
include InternalChannel
include Authorization
attr_reader :server, :env, :subscriptions, :logger
delegate :stream_event_loop, :worker_pool, :pubsub, to: :server
attr_reader :server, :env, :subscriptions, :logger, :worker_pool
delegate :stream_event_loop, :pubsub, to: :server
def initialize(server, env)
@server, @env = server, env
@worker_pool = server.worker_pool
@logger = new_tagged_logger
@websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop)
......
......@@ -51,5 +51,30 @@ class Railtie < Rails::Engine # :nodoc:
end
end
end
initializer "action_cable.set_work_hooks" do |app|
ActiveSupport.on_load(:action_cable) do
ActionCable::Server::Worker.set_callback :work, :around, prepend: true do |_, inner|
app.executor.wrap do
# If we took a while to get the lock, we may have been halted
# in the meantime. As we haven't started doing any real work
# yet, we should pretend that we never made it off the queue.
unless stopping?
inner.call
end
end
end
wrap = lambda do |_, inner|
app.executor.wrap(&inner)
end
ActionCable::Channel::Base.set_callback :subscribe, :around, prepend: true, &wrap
ActionCable::Channel::Base.set_callback :unsubscribe, :around, prepend: true, &wrap
app.reloader.before_class_unload do
ActionCable.server.restart
end
end
end
end
end
......@@ -33,6 +33,16 @@ def disconnect(identifiers)
remote_connections.where(identifiers).disconnect
end
def restart
connections.each(&:close)
@mutex.synchronize do
worker_pool.halt if @worker_pool
@worker_pool = nil
end
end
# Gateway to RemoteConnections. See that class for details.
def remote_connections
@remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) }
......
......@@ -20,6 +20,26 @@ def initialize(max_size: 5)
)
end
# Stop processing work: any work that has not already started
# running will be discarded from the queue
def halt
@pool.kill
end
def stopping?
@pool.shuttingdown?
end
def work(connection)
self.connection = connection
run_callbacks :work do
yield
end
ensure
self.connection = nil
end
def async_invoke(receiver, method, *args)
@pool.post do
invoke(receiver, method, *args)
......@@ -27,19 +47,15 @@ def async_invoke(receiver, method, *args)
end
def invoke(receiver, method, *args)
begin
self.connection = receiver
run_callbacks :work do
work(receiver) do
begin
receiver.send method, *args
end
rescue Exception => e
logger.error "There was an exception - #{e.class}(#{e.message})"
logger.error e.backtrace.join("\n")
rescue Exception => e
logger.error "There was an exception - #{e.class}(#{e.message})"
logger.error e.backtrace.join("\n")
receiver.handle_exception if receiver.respond_to?(:handle_exception)
ensure
self.connection = nil
receiver.handle_exception if receiver.respond_to?(:handle_exception)
end
end
end
......@@ -50,14 +66,8 @@ def async_run_periodic_timer(channel, callback)
end
def run_periodic_timer(channel, callback)
begin
self.connection = channel.connection
run_callbacks :work do
callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
end
ensure
self.connection = nil
work(channel.connection) do
callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
end
end
......
module ActionCable
module Server
class Worker
# Clear active connections between units of work so that way long-running channels or connection processes do not hoard connections.
module ActiveRecordConnectionManagement
extend ActiveSupport::Concern
......@@ -13,8 +12,6 @@ module ActiveRecordConnectionManagement
def with_database_connections
connection.logger.tag(ActiveRecord::Base.logger) { yield }
ensure
ActiveRecord::Base.clear_active_connections!
end
end
end
......
......@@ -127,8 +127,16 @@ def close
end
@ws.close
wait_for_close
end
def wait_for_close
@closed.wait(WAIT_WHEN_EXPECTING_EVENT)
end
def closed?
@closed.set?
end
end
def faye_client(port)
......@@ -220,4 +228,16 @@ def test_unsubscribe_client
assert_equal(0, app.connections.count)
end
end
def test_server_restart
with_puma_server do |port|
c = faye_client(port)
c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
ActionCable.server.restart
c.wait_for_close
assert c.closed?
end
end
end
......@@ -17,4 +17,8 @@ def pubsub
def stream_event_loop
@stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new
end
def worker_pool
@worker_pool ||= ActionCable::Server::Worker.new(max_size: 5)
end
end
......@@ -51,8 +51,8 @@ class IllegalStateError < StandardError
autoload :Cookies
autoload :DebugExceptions
autoload :ExceptionWrapper
autoload :Executor
autoload :Flash
autoload :LoadInterlock
autoload :ParamsParser
autoload :PublicExceptions
autoload :Reloader
......
......@@ -7,7 +7,16 @@ class Callbacks
define_callbacks :call
class << self
delegate :to_prepare, :to_cleanup, :to => "ActionDispatch::Reloader"
def to_prepare(*args, &block)
ActiveSupport::Reloader.to_prepare(*args, &block)
end
def to_cleanup(*args, &block)
ActiveSupport::Reloader.to_complete(*args, &block)
end
deprecate to_prepare: 'use ActiveSupport::Reloader.to_prepare instead',
to_cleanup: 'use ActiveSupport::Reloader.to_complete instead'
def before(*args, &block)
set_callback(:call, :before, *args, &block)
......
require 'rack/body_proxy'
module ActionDispatch
class Executor
def initialize(app, executor)
@app, @executor = app, executor
end
def call(env)
state = @executor.run!
begin
response = @app.call(env)
returned = response << ::Rack::BodyProxy.new(response.pop) { state.complete! }
ensure
state.complete! unless returned
end
end
end
end
require 'active_support/dependencies'
require 'rack/body_proxy'
module ActionDispatch
class LoadInterlock
def initialize(app)
@app = app
end
def call(env)
interlock = ActiveSupport::Dependencies.interlock
interlock.start_running
response = @app.call(env)
body = Rack::BodyProxy.new(response[2]) { interlock.done_running }
response[2] = body
response
ensure
interlock.done_running unless body
end
end
end
......@@ -23,74 +23,32 @@ module ActionDispatch
# middleware stack, but are executed only when <tt>ActionDispatch::Reloader.prepare!</tt>
# or <tt>ActionDispatch::Reloader.cleanup!</tt> are called manually.
#
class Reloader
include ActiveSupport::Callbacks
include ActiveSupport::Deprecation::Reporting
define_callbacks :prepare
define_callbacks :cleanup
# Add a prepare callback. Prepare callbacks are run before each request, prior
# to ActionDispatch::Callback's before callbacks.
class Reloader < Executor
def self.to_prepare(*args, &block)
unless block_given?
warn "to_prepare without a block is deprecated. Please use a block"
end
set_callback(:prepare, *args, &block)
ActiveSupport::Reloader.to_prepare(*args, &block)
end
# Add a cleanup callback. Cleanup callbacks are run after each request is
# complete (after #close is called on the response body).
def self.to_cleanup(*args, &block)
unless block_given?
warn "to_cleanup without a block is deprecated. Please use a block"
end
set_callback(:cleanup, *args, &block)
ActiveSupport::Reloader.to_complete(*args, &block)
end
# Execute all prepare callbacks.
def self.prepare!
new(nil).prepare!
default_reloader.prepare!
end
# Execute all cleanup callbacks.
def self.cleanup!
new(nil).cleanup!
end
def initialize(app, condition=nil)
@app = app
@condition = condition || lambda { true }
@validated = true
default_reloader.reload!
end
def call(env)
@validated = @condition.call
prepare!
response = @app.call(env)
response[2] = ::Rack::BodyProxy.new(response[2]) { cleanup! }
class << self
attr_accessor :default_reloader # :nodoc:
response
rescue Exception
cleanup!
raise
deprecate to_prepare: 'use ActiveSupport::Reloader.to_prepare instead',
to_cleanup: 'use ActiveSupport::Reloader.to_complete instead',
prepare!: 'use Rails.application.reloader.prepare! instead',
cleanup!: 'use Rails.application.reloader.reload! instead of cleanup + prepare'
end
def prepare! #:nodoc:
run_callbacks :prepare if validated?
end
def cleanup! #:nodoc:
run_callbacks :cleanup if validated?
ensure
@validated = true
end
private
def validated? #:nodoc:
@validated
end
self.default_reloader = ActiveSupport::Reloader
end
end
......@@ -39,6 +39,8 @@ class Railtie < Rails::Railtie # :nodoc:
config.action_dispatch.always_write_cookie = Rails.env.development? if config.action_dispatch.always_write_cookie.nil?
ActionDispatch::Cookies::CookieJar.always_write_cookie = config.action_dispatch.always_write_cookie
ActionDispatch::Reloader.default_reloader = app.reloader
ActionDispatch.test_app = app
end
end
......
......@@ -455,17 +455,24 @@ module Runner
def before_setup # :nodoc:
@app = nil
@integration_session = nil
@execution_context = nil
super
end
def after_teardown # :nodoc:
remove!
super
end
def integration_session
@integration_session ||= create_session(app)
@integration_session ||= create_session(app).tap { @execution_context = app.respond_to?(:executor) && app.executor.run! }
end
# Reset the current session. This is useful for testing multiple sessions
# in a single test case.
def reset!
@integration_session = create_session(app)
remove!
integration_session
end
def create_session(app)
......@@ -481,6 +488,8 @@ def create_session(app)
end
def remove! # :nodoc:
@execution_context.complete! if @execution_context
@execution_context = nil
@integration_session = nil
end
......
......@@ -37,13 +37,19 @@ def test_before_and_after_callbacks
def test_to_prepare_and_cleanup_delegation
prepared = cleaned = false
ActionDispatch::Callbacks.to_prepare { prepared = true }
ActionDispatch::Callbacks.to_prepare { cleaned = true }
assert_deprecated do
ActionDispatch::Callbacks.to_prepare { prepared = true }
ActionDispatch::Callbacks.to_prepare { cleaned = true }
end
ActionDispatch::Reloader.prepare!
assert_deprecated do
ActionDispatch::Reloader.prepare!
end
assert prepared
ActionDispatch::Reloader.cleanup!
assert_deprecated do
ActionDispatch::Reloader.cleanup!
end
assert cleaned
end
......
require 'abstract_unit'
class ExecutorTest < ActiveSupport::TestCase
class MyBody < Array
def initialize(&block)
@on_close = block
end
def foo
"foo"
end
def bar
"bar"
end
def close
@on_close.call if @on_close
end
end
def test_returned_body_object_always_responds_to_close
body = call_and_return_body
assert_respond_to body, :close
end
def test_returned_body_object_always_responds_to_close_even_if_called_twice
body = call_and_return_body
assert_respond_to body, :close
body.close
body = call_and_return_body
assert_respond_to body, :close
body.close
end
def test_returned_body_object_behaves_like_underlying_object
body = call_and_return_body do
b = MyBody.new
b << "hello"
b << "world"
[200, { "Content-Type" => "text/html" }, b]
end
assert_equal 2, body.size
assert_equal "hello", body[0]
assert_equal "world", body[1]
assert_equal "foo", body.foo
assert_equal "bar", body.bar
end
def test_it_calls_close_on_underlying_object_when_close_is_called_on_body
close_called = false
body = call_and_return_body do
b = MyBody.new do
close_called = true
end
[200, { "Content-Type" => "text/html" }, b]
end
body.close
assert close_called
end
def test_returned_body_object_responds_to_all_methods_supported_by_underlying_object
body = call_and_return_body do
[200, { "Content-Type" => "text/html" }, MyBody.new]
end
assert_respond_to body, :size
assert_respond_to body, :each
assert_respond_to body, :foo
assert_respond_to body, :bar
end
def test_run_callbacks_are_called_before_close
running = false
executor.to_run { running = true }
body = call_and_return_body
assert running
running = false
body.close
assert !running
end
def test_complete_callbacks_are_called_on_close
completed = false
executor.to_complete { completed = true }
body = call_and_return_body
assert !completed
body.close
assert completed
end
def test_complete_callbacks_are_called_on_exceptions
completed = false
executor.to_complete { completed = true }
begin
call_and_return_body do
raise "error"
end
rescue
end
assert completed
end
def test_callbacks_execute_in_shared_context
result = false
executor.to_run { @in_shared_context = true }
executor.to_complete { result = @in_shared_context }
call_and_return_body.close
assert result
assert !defined?(@in_shared_context) # it's not in the test itself
end
private
def call_and_return_body(&block)
app = middleware(block || proc { [200, {}, 'response'] })
_, _, body = app.call({'rack.input' => StringIO.new('')})
body
end
def middleware(inner_app)
ActionDispatch::Executor.new(inner_app, executor)
end
def executor
@executor ||= Class.new(ActiveSupport::Executor)
end
end
......@@ -4,15 +4,17 @@ class ReloaderTest < ActiveSupport::TestCase
Reloader = ActionDispatch::Reloader
teardown do
Reloader.reset_callbacks :prepare
Reloader.reset_callbacks :cleanup
ActiveSupport::Reloader.reset_callbacks :prepare
ActiveSupport::Reloader.reset_callbacks :complete
end
def test_prepare_callbacks
a = b = c = nil
Reloader.to_prepare { |*args| a = b = c = 1 }
Reloader.to_prepare { |*args| b = c = 2 }
Reloader.to_prepare { |*args| c = 3 }
assert_deprecated do
Reloader.to_prepare { |*args| a = b = c = 1 }
Reloader.to_prepare { |*args| b = c = 2 }
Reloader.to_prepare { |*args| c = 3 }
end
# Ensure to_prepare callbacks are not run when defined
assert_nil a || b || c
......@@ -60,9 +62,15 @@ def test_returned_body_object_always_responds_to_close_even_if_called_twice
def test_condition_specifies_when_to_reload
i, j = 0, 0, 0, 0
Reloader.to_prepare { |*args| i += 1 }
Reloader.to_cleanup { |*args| j += 1 }
app = Reloader.new(lambda { |env| [200, {}, []] }, lambda { i < 3 })
assert_deprecated do
Reloader.to_prepare { |*args| i += 1 }
Reloader.to_cleanup { |*args| j += 1 }
end
x = Class.new(ActiveSupport::Reloader)
x.check = lambda { i < 3 }
app = Reloader.new(lambda { |env| [200, {}, []] }, x)
5.times do
resp = app.call({})
resp[2].close
......@@ -109,7 +117,9 @@ def test_returned_body_object_responds_to_all_methods_supported_by_underlying_ob
def test_cleanup_callbacks_are_called_when_body_is_closed
cleaned = false
Reloader.to_cleanup { cleaned = true }
assert_deprecated do
Reloader.to_cleanup { cleaned = true }
end
body = call_and_return_body
assert !cleaned
......@@ -120,7 +130,9 @@ def test_cleanup_callbacks_are_called_when_body_is_closed
def test_prepare_callbacks_arent_called_when_body_is_closed
prepared = false
Reloader.to_prepare { prepared = true }
assert_deprecated do
Reloader.to_prepare { prepared = true }
end
body = call_and_return_body
prepared = false
......@@ -131,31 +143,43 @@ def test_prepare_callbacks_arent_called_when_body_is_closed
def test_manual_reloading
prepared = cleaned = false
Reloader.to_prepare { prepared = true }
Reloader.to_cleanup { cleaned = true }
assert_deprecated do
Reloader.to_prepare { prepared = true }
Reloader.to_cleanup { cleaned = true }
end
Reloader.prepare!
assert_deprecated do
Reloader.prepare!
end
assert prepared
assert !cleaned
prepared = cleaned = false
Reloader.cleanup!
assert !prepared
assert_deprecated do
Reloader.cleanup!
end
assert prepared
assert cleaned
end
def test_prepend_prepare_callback
i = 10
Reloader.to_prepare { i += 1 }
Reloader.to_prepare(:prepend => true) { i = 0 }
assert_deprecated do
Reloader.to_prepare { i += 1 }
Reloader.to_prepare(:prepend => true) { i = 0 }
end
Reloader.prepare!
assert_deprecated do
Reloader.prepare!
end
assert_equal 1, i
end
def test_cleanup_callbacks_are_called_on_exceptions
cleaned = false
Reloader.to_cleanup { cleaned = true }
assert_deprecated do
Reloader.to_cleanup { cleaned = true }
end
begin
call_and_return_body do
......@@ -169,8 +193,11 @@ def test_cleanup_callbacks_are_called_on_exceptions
private
def call_and_return_body(&block)
x = Class.new(ActiveSupport::Reloader)
x.check = lambda { true }
@response ||= 'response'
@reloader ||= Reloader.new(block || proc {[200, {}, @response]})
@reloader ||= Reloader.new(block || proc {[200, {}, @response]}, x)
@reloader.call({'rack.input' => StringIO.new('')})[2]
end
end
* Enable class reloading prior to job dispatch, and ensure Active Record
connections are returned to the pool when jobs are run in separate threads.
*Matthew Draper*
* Tune the async adapter for low-footprint dev/test usage. Use a single
thread pool for all queues and limit to 0 to #CPU total threads, down from
2 to 10*#CPU per queue.
......
......@@ -17,6 +17,11 @@ module Callbacks
extend ActiveSupport::Concern
include ActiveSupport::Callbacks
class << self
include ActiveSupport::Callbacks
define_callbacks :execute
end
included do
define_callbacks :perform
define_callbacks :enqueue
......
......@@ -17,8 +17,10 @@ def perform_now(*args)
end
def execute(job_data) #:nodoc:
job = deserialize(job_data)
job.perform_now
ActiveJob::Callbacks.run_callbacks(:execute) do
job = deserialize(job_data)
job.perform_now
end
end
end
......
......@@ -19,5 +19,14 @@ class Railtie < Rails::Railtie # :nodoc:
end
end
initializer "active_job.set_reloader_hook" do |app|
ActiveSupport.on_load(:active_job) do
ActiveJob::Callbacks.singleton_class.set_callback(:execute, :around, prepend: true) do |_, inner|
app.reloader.wrap do
inner.call
end
end
end
end
end
end
......@@ -137,7 +137,6 @@ module ConnectionAdapters
eager_autoload do
autoload :AbstractAdapter
autoload :ConnectionManagement, "active_record/connection_adapters/abstract/connection_pool"
end
end
......
......@@ -951,24 +951,5 @@ def pool_from_any_process_for(owner)
owner_to_pool && owner_to_pool[owner.name]
end
end
class ConnectionManagement
def initialize(app)
@app = app
end
def call(env)
testing = env['rack.test']
status, headers, body = @app.call(env)
proxy = ::Rack::BodyProxy.new(body) do
ActiveRecord::Base.clear_active_connections! unless testing
end
[status, headers, proxy]
rescue Exception
ActiveRecord::Base.clear_active_connections! unless testing
raise
end
end
end
end
......@@ -27,7 +27,6 @@ module ConnectionAdapters # :nodoc:
autoload_at 'active_record/connection_adapters/abstract/connection_pool' do
autoload :ConnectionHandler
autoload :ConnectionManagement
end
autoload_under 'abstract' do
......
......@@ -23,34 +23,26 @@ def uncached(&block)
end
end
def initialize(app)
@app = app
end
def call(env)
connection = ActiveRecord::Base.connection
enabled = connection.query_cache_enabled
connection_id = ActiveRecord::Base.connection_id
connection.enable_query_cache!
response = @app.call(env)
response[2] = Rack::BodyProxy.new(response[2]) do
restore_query_cache_settings(connection_id, enabled)
def self.install_executor_hooks(executor = ActiveSupport::Executor)
executor.to_run do
connection = ActiveRecord::Base.connection
enabled = connection.query_cache_enabled
connection_id = ActiveRecord::Base.connection_id
connection.enable_query_cache!
@restore_query_cache_settings = lambda do
ActiveRecord::Base.connection_id = connection_id
ActiveRecord::Base.connection.clear_query_cache
ActiveRecord::Base.connection.disable_query_cache! unless enabled
end
end
response
rescue Exception => e
restore_query_cache_settings(connection_id, enabled)
raise e
end
private
executor.to_complete do
@restore_query_cache_settings.call if defined?(@restore_query_cache_settings)
def restore_query_cache_settings(connection_id, enabled)
ActiveRecord::Base.connection_id = connection_id
ActiveRecord::Base.connection.clear_query_cache
ActiveRecord::Base.connection.disable_query_cache! unless enabled
# FIXME: This should be skipped when env['rack.test']
ActiveRecord::Base.clear_active_connections!
end
end
end
end
......@@ -16,12 +16,6 @@ class Railtie < Rails::Railtie # :nodoc:
config.app_generators.orm :active_record, :migration => true,
:timestamps => true
config.app_middleware.insert_after ::ActionDispatch::Callbacks,
ActiveRecord::QueryCache
config.app_middleware.insert_after ::ActionDispatch::Callbacks,
ActiveRecord::ConnectionAdapters::ConnectionManagement
config.action_dispatch.rescue_responses.merge!(
'ActiveRecord::RecordNotFound' => :not_found,
'ActiveRecord::StaleObjectError' => :conflict,
......@@ -153,11 +147,9 @@ class Railtie < Rails::Railtie # :nodoc:
end
end
initializer "active_record.set_reloader_hooks" do |app|
hook = app.config.reload_classes_only_on_change ? :to_prepare : :to_cleanup
initializer "active_record.set_reloader_hooks" do
ActiveSupport.on_load(:active_record) do
ActionDispatch::Reloader.send(hook) do
ActiveSupport::Reloader.before_class_unload do
if ActiveRecord::Base.connected?
ActiveRecord::Base.clear_cache!
ActiveRecord::Base.clear_reloadable_connections!
......@@ -166,6 +158,12 @@ class Railtie < Rails::Railtie # :nodoc:
end
end
initializer "active_record.set_executor_hooks" do
ActiveSupport.on_load(:active_record) do
ActiveRecord::QueryCache.install_executor_hooks
end
end
initializer "active_record.add_watchable_files" do |app|
path = app.paths["db"].first
config.watchable_files.concat ["#{path}/schema.rb", "#{path}/structure.sql"]
......
......@@ -19,7 +19,7 @@ def call(env)
def setup
@env = {}
@app = App.new
@management = ConnectionManagement.new(@app)
@management = middleware(@app)
# make sure we have an active connection
assert ActiveRecord::Base.connection
......@@ -27,17 +27,12 @@ def setup
end
def test_app_delegation
manager = ConnectionManagement.new(@app)
manager = middleware(@app)
manager.call @env
assert_equal [@env], @app.calls
end
def test_connections_are_active_after_call
@management.call(@env)
assert ActiveRecord::Base.connection_handler.active_connections?
end
def test_body_responds_to_each
_, _, body = @management.call(@env)
bits = []
......@@ -52,45 +47,40 @@ def test_connections_are_cleared_after_body_close
end
def test_active_connections_are_not_cleared_on_body_close_during_test
@env['rack.test'] = true
_, _, body = @management.call(@env)
body.close
assert ActiveRecord::Base.connection_handler.active_connections?
executor.wrap do
_, _, body = @management.call(@env)
body.close
assert ActiveRecord::Base.connection_handler.active_connections?
end
end
def test_connections_closed_if_exception
app = Class.new(App) { def call(env); raise NotImplementedError; end }.new
explosive = ConnectionManagement.new(app)
explosive = middleware(app)
assert_raises(NotImplementedError) { explosive.call(@env) }
assert !ActiveRecord::Base.connection_handler.active_connections?
end
def test_connections_not_closed_if_exception_and_test
@env['rack.test'] = true
app = Class.new(App) { def call(env); raise; end }.new
explosive = ConnectionManagement.new(app)
assert_raises(RuntimeError) { explosive.call(@env) }
assert ActiveRecord::Base.connection_handler.active_connections?
end
def test_connections_closed_if_exception_and_explicitly_not_test
@env['rack.test'] = false
app = Class.new(App) { def call(env); raise NotImplementedError; end }.new
explosive = ConnectionManagement.new(app)
assert_raises(NotImplementedError) { explosive.call(@env) }
assert !ActiveRecord::Base.connection_handler.active_connections?
executor.wrap do
app = Class.new(App) { def call(env); raise; end }.new
explosive = middleware(app)
assert_raises(RuntimeError) { explosive.call(@env) }
assert ActiveRecord::Base.connection_handler.active_connections?
end
end
test "doesn't clear active connections when running in a test case" do
@env['rack.test'] = true
@management.call(@env)
assert ActiveRecord::Base.connection_handler.active_connections?
executor.wrap do
@management.call(@env)
assert ActiveRecord::Base.connection_handler.active_connections?
end
end
test "proxy is polite to its body and responds to it" do
body = Class.new(String) { def to_path; "/path"; end }.new
app = lambda { |_| [200, {}, body] }
response_body = ConnectionManagement.new(app).call(@env)[2]
response_body = middleware(app).call(@env)[2]
assert response_body.respond_to?(:to_path)
assert_equal "/path", response_body.to_path
end
......@@ -98,9 +88,23 @@ def test_connections_closed_if_exception_and_explicitly_not_test
test "doesn't mutate the original response" do
original_response = [200, {}, 'hi']
app = lambda { |_| original_response }
ConnectionManagement.new(app).call(@env)[2]
middleware(app).call(@env)[2]
assert_equal 'hi', original_response.last
end
private
def executor
@executor ||= Class.new(ActiveSupport::Executor).tap do |exe|
ActiveRecord::QueryCache.install_executor_hooks(exe)
end
end
def middleware(app)
lambda do |env|
a, b, c = executor.wrap { app.call(env) }
[a, b, Rack::BodyProxy.new(c) { }]
end
end
end
end
end
......@@ -16,7 +16,7 @@ class QueryCacheTest < ActiveRecord::TestCase
def test_exceptional_middleware_clears_and_disables_cache_on_error
assert !ActiveRecord::Base.connection.query_cache_enabled, 'cache off'
mw = ActiveRecord::QueryCache.new lambda { |env|
mw = middleware { |env|
Task.find 1
Task.find 1
assert_equal 1, ActiveRecord::Base.connection.query_cache.length
......@@ -31,7 +31,7 @@ def test_exceptional_middleware_clears_and_disables_cache_on_error
def test_exceptional_middleware_leaves_enabled_cache_alone
ActiveRecord::Base.connection.enable_query_cache!
mw = ActiveRecord::QueryCache.new lambda { |env|
mw = middleware { |env|
raise "lol borked"
}
assert_raises(RuntimeError) { mw.call({}) }
......@@ -42,7 +42,7 @@ def test_exceptional_middleware_leaves_enabled_cache_alone
def test_exceptional_middleware_assigns_original_connection_id_on_error
connection_id = ActiveRecord::Base.connection_id
mw = ActiveRecord::QueryCache.new lambda { |env|
mw = middleware { |env|
ActiveRecord::Base.connection_id = self.object_id
raise "lol borked"
}
......@@ -53,7 +53,7 @@ def test_exceptional_middleware_assigns_original_connection_id_on_error
def test_middleware_delegates
called = false
mw = ActiveRecord::QueryCache.new lambda { |env|
mw = middleware { |env|
called = true
[200, {}, nil]
}
......@@ -62,7 +62,7 @@ def test_middleware_delegates
end
def test_middleware_caches
mw = ActiveRecord::QueryCache.new lambda { |env|
mw = middleware { |env|
Task.find 1
Task.find 1
assert_equal 1, ActiveRecord::Base.connection.query_cache.length
......@@ -74,50 +74,13 @@ def test_middleware_caches
def test_cache_enabled_during_call
assert !ActiveRecord::Base.connection.query_cache_enabled, 'cache off'
mw = ActiveRecord::QueryCache.new lambda { |env|
mw = middleware { |env|
assert ActiveRecord::Base.connection.query_cache_enabled, 'cache on'
[200, {}, nil]
}
mw.call({})
end
def test_cache_on_during_body_write
streaming = Class.new do
def each
yield ActiveRecord::Base.connection.query_cache_enabled
end
end
mw = ActiveRecord::QueryCache.new lambda { |env|
[200, {}, streaming.new]
}
body = mw.call({}).last
body.each { |x| assert x, 'cache should be on' }
body.close
assert !ActiveRecord::Base.connection.query_cache_enabled, 'cache disabled'
end
def test_cache_off_after_close
mw = ActiveRecord::QueryCache.new lambda { |env| [200, {}, nil] }
body = mw.call({}).last
assert ActiveRecord::Base.connection.query_cache_enabled, 'cache enabled'
body.close
assert !ActiveRecord::Base.connection.query_cache_enabled, 'cache disabled'
end
def test_cache_clear_after_close
mw = ActiveRecord::QueryCache.new lambda { |env|
Post.first
[200, {}, nil]
}
body = mw.call({}).last
assert !ActiveRecord::Base.connection.query_cache.empty?, 'cache not empty'
body.close
assert ActiveRecord::Base.connection.query_cache.empty?, 'cache should be empty'
end
def test_cache_passing_a_relation
post = Post.first
Post.cache do
......@@ -244,6 +207,13 @@ def test_query_cache_doesnt_leak_cached_results_of_rolled_back_queries
assert_equal 0, Post.where(title: 'rollback').to_a.count
end
end
private
def middleware(&app)
executor = Class.new(ActiveSupport::Executor)
ActiveRecord::QueryCache.install_executor_hooks executor
lambda { |env| executor.wrap { app.call(env) } }
end
end
class QueryCacheExpiryTest < ActiveRecord::TestCase
......
* Publish ActiveSupport::Executor and ActiveSupport::Reloader APIs to allow
components and libraries to manage, and participate in, the execution of
application code, and the application reloading process.
*Matthew Draper*
## Rails 5.0.0.beta3 (February 24, 2016) ##
* Deprecate arguments on `assert_nothing_raised`.
......
......@@ -33,10 +33,13 @@ module ActiveSupport
autoload :Concern
autoload :Dependencies
autoload :DescendantsTracker
autoload :ExecutionWrapper
autoload :Executor
autoload :FileUpdateChecker
autoload :EventedFileUpdateChecker
autoload :LogSubscriber
autoload :Notifications
autoload :Reloader
eager_autoload do
autoload :BacktraceCleaner
......
......@@ -19,14 +19,12 @@ def unloading
end
end
# Attempt to obtain an "unloading" (exclusive) lock. If possible,
# execute the supplied block while holding the lock. If there is
# concurrent activity, return immediately (without executing the
# block) instead of waiting.
def attempt_unloading
@lock.exclusive(purpose: :unload, compatible: [:load, :unload], after_compatible: [:load, :unload], no_wait: true) do
yield
end
def start_unloading
@lock.start_exclusive(purpose: :unload, compatible: [:load, :unload])
end
def done_unloading
@lock.stop_exclusive(compatible: [:load, :unload])
end
def start_running
......
......@@ -37,6 +37,7 @@ def execute
def execute_if_updated
if updated?
yield if block_given?
execute
true
end
......
require 'active_support/callbacks'
module ActiveSupport
class ExecutionWrapper
include ActiveSupport::Callbacks
define_callbacks :run
define_callbacks :complete
def self.to_run(*args, &block)
set_callback(:run, *args, &block)
end
def self.to_complete(*args, &block)
set_callback(:complete, *args, &block)
end
# Run this execution.
#
# Returns an instance, whose +complete!+ method *must* be invoked
# after the work has been performed.
#
# Where possible, prefer +wrap+.
def self.run!
new.tap(&:run!)
end
# Perform the work in the supplied block as an execution.
def self.wrap
return yield if active?
state = run!
begin
yield
ensure
state.complete!
end
end
class << self # :nodoc:
attr_accessor :active
end
def self.inherited(other) # :nodoc:
super
other.active = Concurrent::Hash.new(0)
end
self.active = Concurrent::Hash.new(0)
def self.active? # :nodoc:
@active[Thread.current] > 0
end
def run! # :nodoc:
self.class.active[Thread.current] += 1
run_callbacks(:run)
end
# Complete this in-flight execution. This method *must* be called
# exactly once on the result of any call to +run!+.
#
# Where possible, prefer +wrap+.
def complete!
run_callbacks(:complete)
self.class.active.delete Thread.current if (self.class.active[Thread.current] -= 1) == 0
end
end
end
require 'active_support/execution_wrapper'
module ActiveSupport
class Executor < ExecutionWrapper
end
end
......@@ -81,6 +81,7 @@ def execute
# Execute the block given if updated.
def execute_if_updated
if updated?
yield if block_given?
execute
true
else
......
......@@ -64,8 +64,8 @@ def self.initialize_i18n(app)
end
app.reloaders << reloader
ActionDispatch::Reloader.to_prepare do
reloader.execute_if_updated
app.reloader.to_run do
reloader.execute_if_updated { require_unload_lock! }
# TODO: remove the following line as soon as the return value of
# callbacks is ignored, that is, returning `false` does not
# display a deprecation warning or halts the callback chain.
......
require 'active_support/execution_wrapper'
module ActiveSupport
#--
# This class defines several callbacks:
#
# to_prepare -- Run once at application startup, and also from
# +to_run+.
#
# to_run -- Run before a work run that is reloading. If
# +reload_classes_only_on_change+ is true (the default), the class
# unload will have already occurred.
#
# to_complete -- Run after a work run that has reloaded. If
# +reload_classes_only_on_change+ is false, the class unload will
# have occurred after the work run, but before this callback.
#
# before_class_unload -- Run immediately before the classes are
# unloaded.
#
# after_class_unload -- Run immediately after the classes are
# unloaded.
#
class Reloader < ExecutionWrapper
Null = Class.new(ExecutionWrapper) # :nodoc:
define_callbacks :prepare
define_callbacks :class_unload
def self.to_prepare(*args, &block)
set_callback(:prepare, *args, &block)
end
def self.before_class_unload(*args, &block)
set_callback(:class_unload, *args, &block)
end
def self.after_class_unload(*args, &block)
set_callback(:class_unload, :after, *args, &block)
end
to_run(:after) { self.class.prepare! }
# Initiate a manual reload
def self.reload!
executor.wrap do
new.tap(&:run!).complete!
end
prepare!
end
def self.run! # :nodoc:
if check!
super
else
Null.run!
end
end
# Run the supplied block as a work unit, reloading code as needed
def self.wrap
executor.wrap do
super
end
end
class << self
attr_accessor :executor
attr_accessor :check
end
self.executor = Executor
self.check = lambda { false }
def self.check! # :nodoc:
@should_reload ||= check.call
end
def self.reloaded! # :nodoc:
@should_reload = false
end
def self.prepare! # :nodoc:
new.run_callbacks(:prepare)
end
def initialize
super
@locked = false
end
# Acquire the ActiveSupport::Dependencies::Interlock unload lock,
# ensuring it will be released automatically
def require_unload_lock!
unless @locked
ActiveSupport::Dependencies.interlock.start_unloading
@locked = true
end
end
# Release the unload lock if it has been previously obtained
def release_unload_lock!
if @locked
@locked = false
ActiveSupport::Dependencies.interlock.done_unloading
end
end
def run! # :nodoc:
super
release_unload_lock!
end
def class_unload!(&block) # :nodoc:
require_unload_lock!
run_callbacks(:class_unload, &block)
end
def complete! # :nodoc:
super
self.class.reloaded!
ensure
release_unload_lock!
end
end
end
require 'abstract_unit'
class ExecutorTest < ActiveSupport::TestCase
def test_wrap_invokes_callbacks
called = []
executor.to_run { called << :run }
executor.to_complete { called << :complete }
executor.wrap do
called << :body
end
assert_equal [:run, :body, :complete], called
end
def test_callbacks_share_state
result = false
executor.to_run { @foo = true }
executor.to_complete { result = @foo }
executor.wrap { }
assert result
end
def test_separated_calls_invoke_callbacks
called = []
executor.to_run { called << :run }
executor.to_complete { called << :complete }
state = executor.run!
called << :body
state.complete!
assert_equal [:run, :body, :complete], called
end
def test_avoids_double_wrapping
called = []
executor.to_run { called << :run }
executor.to_complete { called << :complete }
executor.wrap do
called << :early
executor.wrap do
called << :body
end
called << :late
end
assert_equal [:run, :early, :body, :late, :complete], called
end
def test_separate_classes_can_wrap
other_executor = Class.new(ActiveSupport::Executor)
called = []
executor.to_run { called << :run }
executor.to_complete { called << :complete }
other_executor.to_run { called << :other_run }
other_executor.to_complete { called << :other_complete }
executor.wrap do
other_executor.wrap do
called << :body
end
end
assert_equal [:run, :other_run, :body, :other_complete, :complete], called
end
private
def executor
@executor ||= Class.new(ActiveSupport::Executor)
end
end
require 'abstract_unit'
class ReloaderTest < ActiveSupport::TestCase
def test_prepare_callback
prepared = false
reloader.to_prepare { prepared = true }
assert !prepared
reloader.prepare!
assert prepared
prepared = false
reloader.wrap do
assert prepared
prepared = false
end
assert !prepared
end
def test_only_run_when_check_passes
r = new_reloader { true }
invoked = false
r.to_run { invoked = true }
r.wrap { }
assert invoked
r = new_reloader { false }
invoked = false
r.to_run { invoked = true }
r.wrap { }
assert !invoked
end
def test_full_reload_sequence
called = []
reloader.to_prepare { called << :prepare }
reloader.to_run { called << :reloader_run }
reloader.to_complete { called << :reloader_complete }
reloader.executor.to_run { called << :executor_run }
reloader.executor.to_complete { called << :executor_complete }
reloader.wrap { }
assert_equal [:executor_run, :reloader_run, :prepare, :reloader_complete, :executor_complete], called
called = []
reloader.reload!
assert_equal [:executor_run, :reloader_run, :prepare, :reloader_complete, :executor_complete, :prepare], called
reloader.check = lambda { false }
called = []
reloader.wrap { }
assert_equal [:executor_run, :executor_complete], called
called = []
reloader.reload!
assert_equal [:executor_run, :reloader_run, :prepare, :reloader_complete, :executor_complete, :prepare], called
end
def test_class_unload_block
called = []
reloader.before_class_unload { called << :before_unload }
reloader.after_class_unload { called << :after_unload }
reloader.to_run do
class_unload! do
called << :unload
end
end
reloader.wrap { called << :body }
assert_equal [:before_unload, :unload, :after_unload, :body], called
end
private
def new_reloader(&check)
Class.new(ActiveSupport::Reloader).tap do |r|
r.check = check
r.executor = Class.new(ActiveSupport::Executor)
end
end
def reloader
@reloader ||= new_reloader { true }
end
end
......@@ -113,7 +113,7 @@ def find_root(from)
attr_accessor :assets, :sandbox
alias_method :sandbox?, :sandbox
attr_reader :reloaders
attr_reader :reloaders, :reloader, :executor
delegate :default_url_options, :default_url_options=, to: :routes
......@@ -131,6 +131,10 @@ def initialize(initial_variable_values = {}, &block)
@message_verifiers = {}
@ran_load_hooks = false
@executor = Class.new(ActiveSupport::Executor)
@reloader = Class.new(ActiveSupport::Reloader)
@reloader.executor = @executor
# are these actually used?
@initial_variable_values = initial_variable_values
@block = block
......
......@@ -34,22 +34,10 @@ def build_stack
# handling: presumably their code is not threadsafe
middleware.use ::Rack::Lock
elsif config.allow_concurrency == :unsafe
# Do nothing, even if we know this is dangerous. This is the
# historical behaviour for true.
else
# Default concurrency setting: enabled, but safe
unless config.cache_classes && config.eager_load
# Without cache_classes + eager_load, the load interlock
# is required for proper operation
middleware.use ::ActionDispatch::LoadInterlock
end
end
middleware.use ::ActionDispatch::Executor, app.executor
middleware.use ::Rack::Runtime
middleware.use ::Rack::MethodOverride unless config.api_only
middleware.use ::ActionDispatch::RequestId
......@@ -61,7 +49,7 @@ def build_stack
middleware.use ::ActionDispatch::RemoteIp, config.action_dispatch.ip_spoofing_check, config.action_dispatch.trusted_proxies
unless config.cache_classes
middleware.use ::ActionDispatch::Reloader, lambda { reload_dependencies? }
middleware.use ::ActionDispatch::Reloader, app.reloader
end
middleware.use ::ActionDispatch::Callbacks
......@@ -83,10 +71,6 @@ def build_stack
private
def reload_dependencies?
config.reload_classes_only_on_change != true || app.reloaders.map(&:updated?).any?
end
def load_rack_cache
rack_cache = config.action_dispatch.rack_cache
return unless rack_cache
......
......@@ -38,16 +38,16 @@ module Finisher
app.routes.define_mounted_helper(:main_app)
end
initializer :add_to_prepare_blocks do
initializer :add_to_prepare_blocks do |app|
config.to_prepare_blocks.each do |block|
ActionDispatch::Reloader.to_prepare(&block)
app.reloader.to_prepare(&block)
end
end
# This needs to happen before eager load so it happens
# in exactly the same point regardless of config.cache_classes
initializer :run_prepare_callbacks do
ActionDispatch::Reloader.prepare!
initializer :run_prepare_callbacks do |app|
app.reloader.prepare!
end
initializer :eager_load! do
......@@ -62,13 +62,47 @@ module Finisher
ActiveSupport.run_load_hooks(:after_initialize, self)
end
initializer :configure_executor_for_concurrency do |app|
if config.allow_concurrency == false
# User has explicitly opted out of concurrent request
# handling: presumably their code is not threadsafe
mutex = Mutex.new
app.executor.to_run(prepend: true) do
mutex.lock
end
app.executor.to_complete(:after) do
mutex.unlock
end
elsif config.allow_concurrency == :unsafe
# Do nothing, even if we know this is dangerous. This is the
# historical behaviour for true.
else
# Default concurrency setting: enabled, but safe
unless config.cache_classes && config.eager_load
# Without cache_classes + eager_load, the load interlock
# is required for proper operation
app.executor.to_run(prepend: true) do
ActiveSupport::Dependencies.interlock.start_running
end
app.executor.to_complete(:after) do
ActiveSupport::Dependencies.interlock.done_running
end
end
end
end
# Set routes reload after the finisher hook to ensure routes added in
# the hook are taken into account.
initializer :set_routes_reloader_hook do
initializer :set_routes_reloader_hook do |app|
reloader = routes_reloader
reloader.execute_if_updated
self.reloaders << reloader
ActionDispatch::Reloader.to_prepare do
app.reloader.to_run do
# We configure #execute rather than #execute_if_updated because if
# autoloaded constants are cleared we need to reload routes also in
# case any was used there, as in
......@@ -78,18 +112,27 @@ module Finisher
# This means routes are also reloaded if i18n is updated, which
# might not be necessary, but in order to be more precise we need
# some sort of reloaders dependency support, to be added.
require_unload_lock!
reloader.execute
end
end
# Set clearing dependencies after the finisher hook to ensure paths
# added in the hook are taken into account.
initializer :set_clear_dependencies_hook, group: :all do
initializer :set_clear_dependencies_hook, group: :all do |app|
callback = lambda do
ActiveSupport::Dependencies.interlock.unloading do
ActiveSupport::DescendantsTracker.clear
ActiveSupport::Dependencies.clear
ActiveSupport::DescendantsTracker.clear
ActiveSupport::Dependencies.clear
end
if config.cache_classes
app.reloader.check = lambda { false }
elsif config.reload_classes_only_on_change
app.reloader.check = lambda do
app.reloaders.map(&:updated?).any?
end
else
app.reloader.check = lambda { true }
end
if config.reload_classes_only_on_change
......@@ -99,15 +142,19 @@ module Finisher
# Prepend this callback to have autoloaded constants cleared before
# any other possible reloading, in case they need to autoload fresh
# constants.
ActionDispatch::Reloader.to_prepare(prepend: true) do
app.reloader.to_run(prepend: true) do
# In addition to changes detected by the file watcher, if routes
# or i18n have been updated we also need to clear constants,
# that's why we run #execute rather than #execute_if_updated, this
# callback has to clear autoloaded constants after any update.
reloader.execute
class_unload! do
reloader.execute
end
end
else
ActionDispatch::Reloader.to_cleanup(&callback)
app.reloader.to_complete do
class_unload!(&callback)
end
end
end
......
......@@ -29,8 +29,7 @@ def new_session
# reloads the environment
def reload!(print=true)
puts "Reloading..." if print
ActionDispatch::Reloader.cleanup!
ActionDispatch::Reloader.prepare!
Rails.application.reloader.reload!
true
end
end
......
# This file is used by Rack-based servers to start the application.
require ::File.expand_path('../config/environment', __FILE__)
<%- unless options[:skip_action_cable] -%>
# Action Cable requires that all classes are loaded in advance
Rails.application.eager_load!
<%- end -%>
run Rails.application
......@@ -52,12 +52,11 @@ def test_reload_should_fire_preparation_and_cleanup_callbacks
a = b = c = nil
# TODO: These should be defined on the initializer
ActionDispatch::Reloader.to_cleanup { a = b = c = 1 }
ActionDispatch::Reloader.to_cleanup { b = c = 2 }
ActionDispatch::Reloader.to_prepare { c = 3 }
ActiveSupport::Reloader.to_complete { a = b = c = 1 }
ActiveSupport::Reloader.to_complete { b = c = 2 }
ActiveSupport::Reloader.to_prepare { c = 3 }
# Hide Reloading... output
silence_stream(STDOUT) { irb_context.reload! }
irb_context.reload!(false)
assert_equal 1, a
assert_equal 2, b
......@@ -81,7 +80,7 @@ class User
MODEL
assert !User.new.respond_to?(:age)
silence_stream(STDOUT) { irb_context.reload! }
irb_context.reload!(false)
assert User.new.respond_to?(:age)
end
......
......@@ -26,7 +26,7 @@ def app
assert_equal [
"Rack::Sendfile",
"ActionDispatch::Static",
"ActionDispatch::LoadInterlock",
"ActionDispatch::Executor",
"ActiveSupport::Cache::Strategy::LocalCache",
"Rack::Runtime",
"Rack::MethodOverride",
......@@ -38,8 +38,6 @@ def app
"ActionDispatch::Reloader",
"ActionDispatch::Callbacks",
"ActiveRecord::Migration::CheckPending",
"ActiveRecord::ConnectionAdapters::ConnectionManagement",
"ActiveRecord::QueryCache",
"ActionDispatch::Cookies",
"ActionDispatch::Session::CookieStore",
"ActionDispatch::Flash",
......@@ -57,7 +55,7 @@ def app
assert_equal [
"Rack::Sendfile",
"ActionDispatch::Static",
"ActionDispatch::LoadInterlock",
"ActionDispatch::Executor",
"ActiveSupport::Cache::Strategy::LocalCache",
"Rack::Runtime",
"ActionDispatch::RequestId",
......@@ -67,8 +65,6 @@ def app
"ActionDispatch::RemoteIp",
"ActionDispatch::Reloader",
"ActionDispatch::Callbacks",
"ActiveRecord::ConnectionAdapters::ConnectionManagement",
"ActiveRecord::QueryCache",
"Rack::Head",
"Rack::ConditionalGet",
"Rack::ETag"
......@@ -114,23 +110,12 @@ def app
test "removing Active Record omits its middleware" do
use_frameworks []
boot!
assert !middleware.include?("ActiveRecord::ConnectionAdapters::ConnectionManagement")
assert !middleware.include?("ActiveRecord::QueryCache")
assert !middleware.include?("ActiveRecord::Migration::CheckPending")
end
test "includes interlock if cache_classes is set but eager_load is not" do
add_to_config "config.cache_classes = true"
boot!
assert_not_includes middleware, "Rack::Lock"
assert_includes middleware, "ActionDispatch::LoadInterlock"
end
test "includes interlock if cache_classes is off" do
add_to_config "config.cache_classes = false"
test "includes executor" do
boot!
assert_not_includes middleware, "Rack::Lock"
assert_includes middleware, "ActionDispatch::LoadInterlock"
assert_includes middleware, "ActionDispatch::Executor"
end
test "does not include lock if cache_classes is set and so is eager_load" do
......@@ -138,21 +123,18 @@ def app
add_to_config "config.eager_load = true"
boot!
assert_not_includes middleware, "Rack::Lock"
assert_not_includes middleware, "ActionDispatch::LoadInterlock"
end
test "does not include lock if allow_concurrency is set to :unsafe" do
add_to_config "config.allow_concurrency = :unsafe"
boot!
assert_not_includes middleware, "Rack::Lock"
assert_not_includes middleware, "ActionDispatch::LoadInterlock"
end
test "includes lock if allow_concurrency is disabled" do
add_to_config "config.allow_concurrency = false"
boot!
assert_includes middleware, "Rack::Lock"
assert_not_includes middleware, "ActionDispatch::LoadInterlock"
end
test "removes static asset server if public_file_server.enabled is disabled" do
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册