Revert "Merge pull request #22977 from rails/revert-22934-master"

This reverts commit d0393fcc, reversing
changes made to 3b7ccadf.
上级 f5065ef6
......@@ -31,8 +31,8 @@ PATH
specs:
actioncable (5.0.0.beta1)
actionpack (= 5.0.0.beta1)
celluloid (~> 0.17.2)
coffee-rails (~> 4.1.0)
concurrent-ruby (~> 1.0.0)
em-hiredis (~> 0.3.0)
faye-websocket (~> 0.10.0)
redis (~> 3.0)
......
......@@ -427,7 +427,7 @@ messages back and forth over the WebSocket cable connection. This dependency may
be alleviated in the future, but for the moment that's what it is. So be sure to have
Redis installed and running.
The Ruby side of things is built on top of [faye-websocket](https://github.com/faye/faye-websocket-ruby) and [celluloid](https://github.com/celluloid/celluloid).
The Ruby side of things is built on top of [faye-websocket](https://github.com/faye/faye-websocket-ruby) and [concurrent-ruby](https://github.com/ruby-concurrency/concurrent-ruby).
## Deployment
......
......@@ -23,7 +23,6 @@
s.add_dependency 'coffee-rails', '~> 4.1.0'
s.add_dependency 'faye-websocket', '~> 0.10.0'
s.add_dependency 'websocket-driver', '~> 0.6.1'
s.add_dependency 'celluloid', '~> 0.17.2'
s.add_dependency 'em-hiredis', '~> 0.3.0'
s.add_dependency 'redis', '~> 3.0'
......
......@@ -28,7 +28,7 @@ def active_periodic_timers
def start_periodic_timers
self.class.periodic_timers.each do |callback, options|
active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do
connection.worker_pool.async.run_periodic_timer(self, callback)
connection.worker_pool.async_run_periodic_timer(self, callback)
end
end
end
......
......@@ -103,7 +103,7 @@ def close
# Invoke a method on the connection asynchronously through the pool of thread workers.
def send_async(method, *arguments)
worker_pool.async.invoke(self, method, *arguments)
worker_pool.async_invoke(self, method, *arguments)
end
# Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`.
......
require 'action_cable/server'
require 'eventmachine'
require 'celluloid'
EM.error_handler do |e|
puts "Error raised inside the event loop: #{e.message}"
puts e.backtrace.join("\n")
end
Celluloid.logger = ActionCable.server.logger
# FIXME: Cargo culted fix from https://github.com/celluloid/celluloid-pool/issues/10
require 'celluloid/current'
require 'em-hiredis'
module ActionCable
......
require 'celluloid'
require 'active_support/callbacks'
require 'active_support/core_ext/module/attribute_accessors_per_thread'
require 'concurrent'
module ActionCable
module Server
# Worker used by Server.send_async to do connection work in threads. Only for internal use.
class Worker
include ActiveSupport::Callbacks
include Celluloid
attr_reader :connection
thread_mattr_accessor :connection
define_callbacks :work
include ActiveRecordConnectionManagement
def initialize(max_size: 5)
@pool = Concurrent::ThreadPoolExecutor.new(
min_threads: 1,
max_threads: max_size,
max_queue: 0,
)
end
def async_invoke(receiver, method, *args)
@pool.post do
invoke(receiver, method, *args)
end
end
def invoke(receiver, method, *args)
@connection = receiver
begin
self.connection = receiver
run_callbacks :work do
receiver.send method, *args
run_callbacks :work do
receiver.send method, *args
end
rescue Exception => e
logger.error "There was an exception - #{e.class}(#{e.message})"
logger.error e.backtrace.join("\n")
receiver.handle_exception if receiver.respond_to?(:handle_exception)
ensure
self.connection = nil
end
rescue Exception => e
logger.error "There was an exception - #{e.class}(#{e.message})"
logger.error e.backtrace.join("\n")
end
receiver.handle_exception if receiver.respond_to?(:handle_exception)
def async_run_periodic_timer(channel, callback)
@pool.post do
run_periodic_timer(channel, callback)
end
end
def run_periodic_timer(channel, callback)
@connection = channel.connection
begin
self.connection = channel.connection
run_callbacks :work do
callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
run_callbacks :work do
callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
end
ensure
self.connection = nil
end
end
private
def logger
ActionCable.server.logger
end
......
......@@ -10,7 +10,6 @@ def connect
end
def send_async(method, *args)
# Bypass Celluloid
send method, *args
end
end
......
......@@ -14,7 +14,6 @@ def disconnect
end
def send_async(method, *args)
# Bypass Celluloid
send method, *args
end
end
......
......@@ -6,7 +6,6 @@ class ActionCable::Connection::CrossSiteForgeryTest < ActionCable::TestCase
class Connection < ActionCable::Connection::Base
def send_async(method, *args)
# Bypass Celluloid
send method, *args
end
end
......
......@@ -10,7 +10,6 @@ def connect
end
def send_async(method, *args)
# Bypass Celluloid
send method, *args
end
end
......
......@@ -5,7 +5,6 @@ class Connection < ActionCable::Connection::Base
attr_reader :websocket
def send_async(method, *args)
# Bypass Celluloid
send method, *args
end
end
......
......@@ -14,11 +14,6 @@
# Require all the stubs and models
Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file }
$CELLULOID_DEBUG = false
$CELLULOID_TEST = false
require 'celluloid'
Celluloid.logger = Logger.new(StringIO.new)
require 'faye/websocket'
class << Faye::WebSocket
remove_method :ensure_reactor_running
......
......@@ -17,8 +17,6 @@ def connection
end
setup do
Celluloid.boot
@worker = ActionCable::Server::Worker.new
@receiver = Receiver.new
end
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册