Revert "Eliminate the EventMachine dependency"

上级 92039d7c
......@@ -32,7 +32,8 @@ PATH
actioncable (5.0.0.beta1.1)
actionpack (= 5.0.0.beta1.1)
coffee-rails (~> 4.1.0)
nio4r (~> 1.2)
eventmachine (~> 1.0)
faye-websocket (~> 0.10.0)
websocket-driver (~> 0.6.1)
actionmailer (5.0.0.beta1.1)
actionpack (= 5.0.0.beta1.1)
......@@ -144,6 +145,9 @@ GEM
erubis (2.7.0)
eventmachine (1.0.9.1)
execjs (2.6.0)
faye-websocket (0.10.2)
eventmachine (>= 0.12.0)
websocket-driver (>= 0.5.1)
ffi (1.9.10)
ffi (1.9.10-x64-mingw32)
ffi (1.9.10-x86-mingw32)
......@@ -181,7 +185,6 @@ GEM
mysql2 (0.4.2)
mysql2 (0.4.2-x64-mingw32)
mysql2 (0.4.2-x86-mingw32)
nio4r (1.2.0)
nokogiri (1.6.7.1)
mini_portile2 (~> 2.0.0.rc2)
nokogiri (1.6.7.1-x64-mingw32)
......
......@@ -21,7 +21,8 @@
s.add_dependency 'actionpack', version
s.add_dependency 'coffee-rails', '~> 4.1.0'
s.add_dependency 'nio4r', '~> 1.2'
s.add_dependency 'eventmachine', '~> 1.0'
s.add_dependency 'faye-websocket', '~> 0.10.0'
s.add_dependency 'websocket-driver', '~> 0.6.1'
s.add_development_dependency 'em-hiredis', '~> 0.3.0'
......
......@@ -27,14 +27,14 @@ def active_periodic_timers
def start_periodic_timers
self.class.periodic_timers.each do |callback, options|
active_periodic_timers << Concurrent::TimerTask.new(execution_interval: options[:every]) do
active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do
connection.worker_pool.async_run_periodic_timer(self, callback)
end
end
end
def stop_periodic_timers
active_periodic_timers.each { |timer| timer.shutdown }
active_periodic_timers.each { |timer| timer.cancel }
end
end
end
......
......@@ -75,7 +75,7 @@ def stream_from(broadcasting, callback = nil)
callback ||= default_stream_callback(broadcasting)
streams << [ broadcasting, callback ]
Concurrent.global_io_executor.post do
EM.next_tick do
pubsub.subscribe(broadcasting, callback, lambda do
transmit_subscription_confirmation
logger.info "#{self.class.name} is streaming from #{broadcasting}"
......
......@@ -5,15 +5,12 @@ module Connection
eager_autoload do
autoload :Authorization
autoload :Base
autoload :ClientSocket
autoload :Identification
autoload :InternalChannel
autoload :MessageBuffer
autoload :Stream
autoload :StreamEventLoop
autoload :WebSocket
autoload :Subscriptions
autoload :TaggedLoggerProxy
autoload :WebSocket
end
end
end
......@@ -49,14 +49,14 @@ class Base
include Authorization
attr_reader :server, :env, :subscriptions, :logger
delegate :stream_event_loop, :worker_pool, :pubsub, to: :server
delegate :worker_pool, :pubsub, to: :server
def initialize(server, env)
@server, @env = server, env
@logger = new_tagged_logger
@websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop)
@websocket = ActionCable::Connection::WebSocket.new(env)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
......@@ -70,6 +70,10 @@ def process
logger.info started_request_message
if websocket.possible? && allow_request_origin?
websocket.on(:open) { |event| send_async :on_open }
websocket.on(:message) { |event| on_message event.data }
websocket.on(:close) { |event| send_async :on_close }
respond_to_successful_request
else
respond_to_invalid_request
......@@ -117,22 +121,6 @@ def beat
transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i)
end
def on_open # :nodoc:
send_async :handle_open
end
def on_message(message) # :nodoc:
message_buffer.append message
end
def on_error(message) # :nodoc:
# ignore
end
def on_close # :nodoc:
send_async :handle_close
end
protected
# The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc.
def request
......@@ -151,7 +139,7 @@ def cookies
attr_reader :message_buffer
private
def handle_open
def on_open
connect if respond_to?(:connect)
subscribe_to_internal_channel
beat
......@@ -162,7 +150,11 @@ def handle_open
respond_to_invalid_request
end
def handle_close
def on_message(message)
message_buffer.append message
end
def on_close
logger.info finished_request_message
server.remove_connection(self)
......
require 'websocket/driver'
module ActionCable
module Connection
#--
# This class is heavily based on faye-websocket-ruby
#
# Copyright (c) 2010-2015 James Coglan
class ClientSocket # :nodoc:
def self.determine_url(env)
scheme = secure_request?(env) ? 'wss:' : 'ws:'
"#{ scheme }//#{ env['HTTP_HOST'] }#{ env['REQUEST_URI'] }"
end
def self.secure_request?(env)
return true if env['HTTPS'] == 'on'
return true if env['HTTP_X_FORWARDED_SSL'] == 'on'
return true if env['HTTP_X_FORWARDED_SCHEME'] == 'https'
return true if env['HTTP_X_FORWARDED_PROTO'] == 'https'
return true if env['rack.url_scheme'] == 'https'
return false
end
CONNECTING = 0
OPEN = 1
CLOSING = 2
CLOSED = 3
attr_reader :env, :url
def initialize(env, event_target, stream_event_loop)
@env = env
@event_target = event_target
@stream_event_loop = stream_event_loop
@url = ClientSocket.determine_url(@env)
@driver = @driver_started = nil
@ready_state = CONNECTING
# The driver calls +env+, +url+, and +write+
@driver = ::WebSocket::Driver.rack(self)
@driver.on(:open) { |e| open }
@driver.on(:message) { |e| receive_message(e.data) }
@driver.on(:close) { |e| begin_close(e.reason, e.code) }
@driver.on(:error) { |e| emit_error(e.message) }
@stream = ActionCable::Connection::Stream.new(@stream_event_loop, self)
if callback = @env['async.callback']
callback.call([101, {}, @stream])
end
end
def start_driver
return if @driver.nil? || @driver_started
@driver_started = true
@driver.start
end
def rack_response
start_driver
[ -1, {}, [] ]
end
def write(data)
@stream.write(data)
end
def transmit(message)
return false if @ready_state > OPEN
case message
when Numeric then @driver.text(message.to_s)
when String then @driver.text(message)
when Array then @driver.binary(message)
else false
end
end
def close(code = nil, reason = nil)
code ||= 1000
reason ||= ''
unless code == 1000 or (code >= 3000 and code <= 4999)
raise ArgumentError, "Failed to execute 'close' on WebSocket: " +
"The code must be either 1000, or between 3000 and 4999. " +
"#{code} is neither."
end
@ready_state = CLOSING unless @ready_state == CLOSED
@driver.close(reason, code)
end
def parse(data)
@driver.parse(data)
end
def client_gone
finalize_close
end
def alive?
@ready_state == OPEN
end
private
def open
return unless @ready_state == CONNECTING
@ready_state = OPEN
@event_target.on_open
end
def receive_message(data)
return unless @ready_state == OPEN
@event_target.on_message(data)
end
def emit_error(message)
return if @ready_state >= CLOSING
@event_target.on_error(message)
end
def begin_close(reason, code)
return if @ready_state == CLOSED
@ready_state = CLOSING
@close_params = [reason, code]
if @stream
@stream.shutdown
else
finalize_close
end
end
def finalize_close
return if @ready_state == CLOSED
@ready_state = CLOSED
reason = @close_params ? @close_params[0] : ''
code = @close_params ? @close_params[1] : 1006
@event_target.on_close(code, reason)
end
end
end
end
......@@ -15,14 +15,14 @@ def subscribe_to_internal_channel
@_internal_subscriptions ||= []
@_internal_subscriptions << [ internal_channel, callback ]
Concurrent.global_io_executor.post { pubsub.subscribe(internal_channel, callback) }
EM.next_tick { pubsub.subscribe(internal_channel, callback) }
logger.info "Registered connection (#{connection_identifier})"
end
end
def unsubscribe_from_internal_channel
if @_internal_subscriptions.present?
@_internal_subscriptions.each { |channel, callback| Concurrent.global_io_executor.post { pubsub.unsubscribe(channel, callback) } }
@_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe(channel, callback) } }
end
end
......
module ActionCable
module Connection
#--
# This class is heavily based on faye-websocket-ruby
#
# Copyright (c) 2010-2015 James Coglan
class Stream
def initialize(event_loop, socket)
@event_loop = event_loop
@socket_object = socket
@stream_send = socket.env['stream.send']
@rack_hijack_io = nil
hijack_rack_socket
end
def each(&callback)
@stream_send ||= callback
end
def close
shutdown
@socket_object.client_gone
end
def shutdown
clean_rack_hijack
end
def write(data)
return @rack_hijack_io.write(data) if @rack_hijack_io
return @stream_send.call(data) if @stream_send
rescue EOFError
@socket_object.client_gone
end
def receive(data)
@socket_object.parse(data)
end
private
def hijack_rack_socket
return unless @socket_object.env['rack.hijack']
@socket_object.env['rack.hijack'].call
@rack_hijack_io = @socket_object.env['rack.hijack_io']
@event_loop.attach(@rack_hijack_io, self)
end
def clean_rack_hijack
return unless @rack_hijack_io
@event_loop.detach(@rack_hijack_io, self)
@rack_hijack_io = nil
end
end
end
end
require 'nio'
module ActionCable
module Connection
class StreamEventLoop
def initialize
@nio = NIO::Selector.new
@map = {}
@stopping = false
@todo = Queue.new
Thread.new do
Thread.current.abort_on_exception = true
run
end
end
def attach(io, stream)
@todo << lambda do
@map[io] = stream
@nio.register(io, :r)
end
@nio.wakeup
end
def detach(io, stream)
@todo << lambda do
@nio.deregister(io)
@map.delete io
end
@nio.wakeup
end
def stop
@stopping = true
@nio.wakeup
end
def run
loop do
if @stopping
@nio.close
break
end
until @todo.empty?
@todo.pop(true).call
end
if monitors = @nio.select
monitors.each do |monitor|
io = monitor.io
stream = @map[io]
begin
stream.receive io.read_nonblock(4096)
rescue IO::WaitReadable
next
rescue EOFError
stream.close
end
end
end
end
end
end
end
end
require 'websocket/driver'
require 'faye/websocket'
module ActionCable
module Connection
# Wrap the real socket to minimize the externally-presented API
# Decorate the Faye::WebSocket with helpers we need.
class WebSocket
def initialize(env, event_target, stream_event_loop)
@websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, stream_event_loop) : nil
delegate :rack_response, :close, :on, to: :websocket
def initialize(env)
@websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil
end
def possible?
......@@ -13,19 +15,11 @@ def possible?
end
def alive?
websocket && websocket.alive?
websocket && websocket.ready_state == Faye::WebSocket::API::OPEN
end
def transmit(data)
websocket.transmit data
end
def close
websocket.close
end
def rack_response
websocket.rack_response
websocket.send data
end
protected
......
require 'action_cable/server'
require 'eventmachine'
EM.error_handler do |e|
puts "Error raised inside the event loop: #{e.message}"
puts e.backtrace.join("\n")
end
require 'eventmachine'
EventMachine.epoll if EventMachine.epoll?
EventMachine.kqueue if EventMachine.kqueue?
module ActionCable
module Server
extend ActiveSupport::Autoload
......
......@@ -32,10 +32,6 @@ def remote_connections
@remote_connections ||= RemoteConnections.new(self)
end
def stream_event_loop
@stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new
end
# The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size.
def worker_pool
@worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size)
......
......@@ -22,9 +22,11 @@ def remove_connection(connection)
# then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically
# disconnect.
def setup_heartbeat_timer
@heartbeat_timer ||= Concurrent::TimerTask.new(execution_interval: BEAT_INTERVAL) do
Concurrent.global_io_executor.post { connections.map(&:beat) }
end.tap(&:execute)
EM.next_tick do
@heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do
EM.next_tick { connections.map(&:beat) }
end
end
end
def open_connections_statistics
......
......@@ -10,11 +10,11 @@ def subscriber_map
class AsyncSubscriberMap < SubscriberMap
def add_subscriber(*)
Concurrent.global_io_executor.post { super }
::EM.next_tick { super }
end
def invoke_callback(*)
Concurrent.global_io_executor.post { super }
::EM.next_tick { super }
end
end
end
......
......@@ -63,7 +63,7 @@ def listen
case action
when :listen
pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}")
Concurrent.global_io_executor << callback if callback
::EM.next_tick(&callback) if callback
when :unlisten
pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}")
when :shutdown
......@@ -93,7 +93,7 @@ def remove_channel(channel)
end
def invoke_callback(*)
Concurrent.global_io_executor.post { super }
::EM.next_tick { super }
end
end
end
......
require 'thread'
gem 'em-hiredis', '~> 0.3.0'
gem 'redis', '~> 3.0'
require 'em-hiredis'
require 'redis'
EventMachine.epoll if EventMachine.epoll?
EventMachine.kqueue if EventMachine.kqueue?
module ActionCable
module SubscriptionAdapter
class Redis < Base # :nodoc:
@@mutex = Mutex.new
def broadcast(channel, payload)
redis_connection_for_broadcasts.publish(channel, payload)
end
......@@ -34,7 +27,6 @@ def shutdown
private
def redis_connection_for_subscriptions
ensure_reactor_running
@redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
redis.on(:reconnect_failed) do
@logger.info "[ActionCable] Redis reconnect failed."
......@@ -45,14 +37,6 @@ def redis_connection_for_subscriptions
def redis_connection_for_broadcasts
@redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
end
def ensure_reactor_running
return if EventMachine.reactor_running?
@@mutex.synchronize do
Thread.new { EventMachine.run } unless EventMachine.reactor_running?
Thread.pass until EventMachine.reactor_running?
end
end
end
end
end
......@@ -31,7 +31,7 @@ def ping
end
test "timer start and stop" do
Concurrent::TimerTask.expects(:new).times(2).returns(true)
EventMachine::PeriodicTimer.expects(:new).times(2).returns(true)
channel = ChatChannel.new @connection, "{id: 1}", { id: 1 }
channel.expects(:stop_periodic_timers).once
......
......@@ -31,7 +31,9 @@ def send_confirmation
test "stream_for" do
run_in_eventmachine do
connection = TestConnection.new
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
EM.next_tick do
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
end
channel = ChatChannel.new connection, ""
channel.stream_for Room.new(1)
......@@ -39,35 +41,39 @@ def send_confirmation
end
test "stream_from subscription confirmation" do
run_in_eventmachine do
EM.run do
connection = TestConnection.new
ChatChannel.new connection, "{id: 1}", { id: 1 }
assert_nil connection.last_transmission
wait_for_async
EM::Timer.new(0.1) do
expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription"
connection.transmit(expected)
expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription"
connection.transmit(expected)
assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s"
assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s"
EM.run_deferred_callbacks
EM.stop
end
end
end
test "subscription confirmation should only be sent out once" do
run_in_eventmachine do
EM.run do
connection = TestConnection.new
channel = ChatChannel.new connection, "test_channel"
channel.send_confirmation
channel.send_confirmation
wait_for_async
EM.run_deferred_callbacks
expected = ActiveSupport::JSON.encode "identifier" => "test_channel", "type" => "confirm_subscription"
assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation"
assert_equal 1, connection.transmissions.size
EM.stop
end
end
......
......@@ -37,8 +37,6 @@ def send_async(method, *args)
connection.process
assert connection.websocket.possible?
wait_for_async
assert connection.websocket.alive?
end
end
......@@ -55,15 +53,16 @@ def send_async(method, *args)
test "on connection open" do
run_in_eventmachine do
connection = open_connection
connection.process
connection.websocket.expects(:transmit).with(regexp_matches(/\_ping/))
connection.message_buffer.expects(:process!)
connection.process
wait_for_async
assert_equal [ connection ], @server.connections
assert connection.connected
# Allow EM to run on_open callback
EM.next_tick do
assert_equal [ connection ], @server.connections
assert connection.connected
end
end
end
......@@ -73,12 +72,12 @@ def send_async(method, *args)
connection.process
# Setup the connection
Concurrent::TimerTask.stubs(:new).returns(true)
connection.send :handle_open
EventMachine.stubs(:add_periodic_timer).returns(true)
connection.send :on_open
assert connection.connected
connection.subscriptions.expects(:unsubscribe_from_all)
connection.send :handle_close
connection.send :on_close
assert ! connection.connected
assert_equal [], @server.connections
......
......@@ -68,10 +68,10 @@ def open_connection(server:)
@connection = Connection.new(server, env)
@connection.process
@connection.send :handle_open
@connection.send :on_open
end
def close_connection
@connection.send :handle_close
@connection.send :on_close
end
end
......@@ -32,10 +32,10 @@ def open_connection(server:)
@connection = Connection.new(server, env)
@connection.process
@connection.send :handle_open
@connection.send :on_open
end
def close_connection
@connection.send :handle_close
@connection.send :on_close
end
end
......@@ -14,7 +14,6 @@ def pubsub
@config.subscription_adapter.new(self)
end
def stream_event_loop
@stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new
def send_async
end
end
require 'test_helper'
require 'concurrent'
require 'action_cable/process/logging'
require 'active_support/core_ext/hash/indifferent_access'
require 'pathname'
......@@ -23,6 +24,8 @@ def setup
# and now the "real" setup for our test:
spawn_eventmachine
server.config.cable = cable_config.with_indifferent_access
adapter_klass = server.config.pubsub_adapter
......
......@@ -13,16 +13,28 @@
# Require all the stubs and models
Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file }
require 'faye/websocket'
class << Faye::WebSocket
remove_method :ensure_reactor_running
# We don't want Faye to start the EM reactor in tests because it makes testing much harder.
# We want to be able to start and stop EM loop in tests to make things simpler.
def ensure_reactor_running
# no-op
end
end
class ActionCable::TestCase < ActiveSupport::TestCase
def wait_for_async
e = Concurrent.global_io_executor
until e.completed_task_count == e.scheduled_task_count
sleep 0.1
def run_in_eventmachine
EM.run do
yield
EM.run_deferred_callbacks
EM.stop
end
end
def run_in_eventmachine
yield
wait_for_async
def spawn_eventmachine
Thread.new { EventMachine.run } unless EventMachine.reactor_running?
end
end
......@@ -3,8 +3,9 @@
require ::File.expand_path('../config/environment', __FILE__)
<%- unless options[:skip_action_cable] -%>
# Action Cable requires that all classes are loaded in advance
# Action Cable uses EventMachine which requires that all classes are loaded in advance
Rails.application.eager_load!
require 'action_cable/process/logging'
<%- end -%>
run Rails.application
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册