diff --git a/Gemfile.lock b/Gemfile.lock index a7b1daaef44d171d2c72128c9884c5c894e94e12..7d2844ae89f330d18f27b08d88ec3153351cb6d3 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -32,7 +32,8 @@ PATH actioncable (5.0.0.beta1.1) actionpack (= 5.0.0.beta1.1) coffee-rails (~> 4.1.0) - nio4r (~> 1.2) + eventmachine (~> 1.0) + faye-websocket (~> 0.10.0) websocket-driver (~> 0.6.1) actionmailer (5.0.0.beta1.1) actionpack (= 5.0.0.beta1.1) @@ -144,6 +145,9 @@ GEM erubis (2.7.0) eventmachine (1.0.9.1) execjs (2.6.0) + faye-websocket (0.10.2) + eventmachine (>= 0.12.0) + websocket-driver (>= 0.5.1) ffi (1.9.10) ffi (1.9.10-x64-mingw32) ffi (1.9.10-x86-mingw32) @@ -181,7 +185,6 @@ GEM mysql2 (0.4.2) mysql2 (0.4.2-x64-mingw32) mysql2 (0.4.2-x86-mingw32) - nio4r (1.2.0) nokogiri (1.6.7.1) mini_portile2 (~> 2.0.0.rc2) nokogiri (1.6.7.1-x64-mingw32) diff --git a/actioncable/actioncable.gemspec b/actioncable/actioncable.gemspec index 14f968f1ef1bb87f4225c0fb2d73a0048dd85125..a36acc8f6f329ff88c831af37e0486ef036dc194 100644 --- a/actioncable/actioncable.gemspec +++ b/actioncable/actioncable.gemspec @@ -21,7 +21,8 @@ s.add_dependency 'actionpack', version s.add_dependency 'coffee-rails', '~> 4.1.0' - s.add_dependency 'nio4r', '~> 1.2' + s.add_dependency 'eventmachine', '~> 1.0' + s.add_dependency 'faye-websocket', '~> 0.10.0' s.add_dependency 'websocket-driver', '~> 0.6.1' s.add_development_dependency 'em-hiredis', '~> 0.3.0' diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb index 56597d02d79d82316a31732a04e8c9b840527ef2..7f0fb37afcb5c8b0d6bc31e34395a65d45d8be93 100644 --- a/actioncable/lib/action_cable/channel/periodic_timers.rb +++ b/actioncable/lib/action_cable/channel/periodic_timers.rb @@ -27,14 +27,14 @@ def active_periodic_timers def start_periodic_timers self.class.periodic_timers.each do |callback, options| - active_periodic_timers << Concurrent::TimerTask.new(execution_interval: options[:every]) do + active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do connection.worker_pool.async_run_periodic_timer(self, callback) end end end def stop_periodic_timers - active_periodic_timers.each { |timer| timer.shutdown } + active_periodic_timers.each { |timer| timer.cancel } end end end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index a26373e38727c2fb84edda8b21ad3d872ac0aaa7..e2876ef6fa188a83e64dab5638efe5d7751f756d 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -75,7 +75,7 @@ def stream_from(broadcasting, callback = nil) callback ||= default_stream_callback(broadcasting) streams << [ broadcasting, callback ] - Concurrent.global_io_executor.post do + EM.next_tick do pubsub.subscribe(broadcasting, callback, lambda do transmit_subscription_confirmation logger.info "#{self.class.name} is streaming from #{broadcasting}" diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb index 902efb07e2baa6e73978f0e9bd551560633963c5..b672e006828e989a0e460a050c5ff5450ac5d8cf 100644 --- a/actioncable/lib/action_cable/connection.rb +++ b/actioncable/lib/action_cable/connection.rb @@ -5,15 +5,12 @@ module Connection eager_autoload do autoload :Authorization autoload :Base - autoload :ClientSocket autoload :Identification autoload :InternalChannel autoload :MessageBuffer - autoload :Stream - autoload :StreamEventLoop + autoload :WebSocket autoload :Subscriptions autoload :TaggedLoggerProxy - autoload :WebSocket end end end diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index 0016d1a1a47d9bb373727a4b62b50c0c0d4feb91..bb8850aaa01700e2aeda4d219773a1c63d8a7d2d 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -49,14 +49,14 @@ class Base include Authorization attr_reader :server, :env, :subscriptions, :logger - delegate :stream_event_loop, :worker_pool, :pubsub, to: :server + delegate :worker_pool, :pubsub, to: :server def initialize(server, env) @server, @env = server, env @logger = new_tagged_logger - @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop) + @websocket = ActionCable::Connection::WebSocket.new(env) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) @@ -70,6 +70,10 @@ def process logger.info started_request_message if websocket.possible? && allow_request_origin? + websocket.on(:open) { |event| send_async :on_open } + websocket.on(:message) { |event| on_message event.data } + websocket.on(:close) { |event| send_async :on_close } + respond_to_successful_request else respond_to_invalid_request @@ -117,22 +121,6 @@ def beat transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i) end - def on_open # :nodoc: - send_async :handle_open - end - - def on_message(message) # :nodoc: - message_buffer.append message - end - - def on_error(message) # :nodoc: - # ignore - end - - def on_close # :nodoc: - send_async :handle_close - end - protected # The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc. def request @@ -151,7 +139,7 @@ def cookies attr_reader :message_buffer private - def handle_open + def on_open connect if respond_to?(:connect) subscribe_to_internal_channel beat @@ -162,7 +150,11 @@ def handle_open respond_to_invalid_request end - def handle_close + def on_message(message) + message_buffer.append message + end + + def on_close logger.info finished_request_message server.remove_connection(self) diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb deleted file mode 100644 index 62dd753646b2eac6d995264a52c54ec37f022afc..0000000000000000000000000000000000000000 --- a/actioncable/lib/action_cable/connection/client_socket.rb +++ /dev/null @@ -1,152 +0,0 @@ -require 'websocket/driver' - -module ActionCable - module Connection - #-- - # This class is heavily based on faye-websocket-ruby - # - # Copyright (c) 2010-2015 James Coglan - class ClientSocket # :nodoc: - def self.determine_url(env) - scheme = secure_request?(env) ? 'wss:' : 'ws:' - "#{ scheme }//#{ env['HTTP_HOST'] }#{ env['REQUEST_URI'] }" - end - - def self.secure_request?(env) - return true if env['HTTPS'] == 'on' - return true if env['HTTP_X_FORWARDED_SSL'] == 'on' - return true if env['HTTP_X_FORWARDED_SCHEME'] == 'https' - return true if env['HTTP_X_FORWARDED_PROTO'] == 'https' - return true if env['rack.url_scheme'] == 'https' - - return false - end - - CONNECTING = 0 - OPEN = 1 - CLOSING = 2 - CLOSED = 3 - - attr_reader :env, :url - - def initialize(env, event_target, stream_event_loop) - @env = env - @event_target = event_target - @stream_event_loop = stream_event_loop - - @url = ClientSocket.determine_url(@env) - - @driver = @driver_started = nil - - @ready_state = CONNECTING - - # The driver calls +env+, +url+, and +write+ - @driver = ::WebSocket::Driver.rack(self) - - @driver.on(:open) { |e| open } - @driver.on(:message) { |e| receive_message(e.data) } - @driver.on(:close) { |e| begin_close(e.reason, e.code) } - @driver.on(:error) { |e| emit_error(e.message) } - - @stream = ActionCable::Connection::Stream.new(@stream_event_loop, self) - - if callback = @env['async.callback'] - callback.call([101, {}, @stream]) - end - end - - def start_driver - return if @driver.nil? || @driver_started - @driver_started = true - @driver.start - end - - def rack_response - start_driver - [ -1, {}, [] ] - end - - def write(data) - @stream.write(data) - end - - def transmit(message) - return false if @ready_state > OPEN - case message - when Numeric then @driver.text(message.to_s) - when String then @driver.text(message) - when Array then @driver.binary(message) - else false - end - end - - def close(code = nil, reason = nil) - code ||= 1000 - reason ||= '' - - unless code == 1000 or (code >= 3000 and code <= 4999) - raise ArgumentError, "Failed to execute 'close' on WebSocket: " + - "The code must be either 1000, or between 3000 and 4999. " + - "#{code} is neither." - end - - @ready_state = CLOSING unless @ready_state == CLOSED - @driver.close(reason, code) - end - - def parse(data) - @driver.parse(data) - end - - def client_gone - finalize_close - end - - def alive? - @ready_state == OPEN - end - - private - def open - return unless @ready_state == CONNECTING - @ready_state = OPEN - - @event_target.on_open - end - - def receive_message(data) - return unless @ready_state == OPEN - - @event_target.on_message(data) - end - - def emit_error(message) - return if @ready_state >= CLOSING - - @event_target.on_error(message) - end - - def begin_close(reason, code) - return if @ready_state == CLOSED - @ready_state = CLOSING - @close_params = [reason, code] - - if @stream - @stream.shutdown - else - finalize_close - end - end - - def finalize_close - return if @ready_state == CLOSED - @ready_state = CLOSED - - reason = @close_params ? @close_params[0] : '' - code = @close_params ? @close_params[1] : 1006 - - @event_target.on_close(code, reason) - end - end - end -end diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb index 27826792b30299b03f517d63f463b57ecddb1e92..54ed7672d28cd8176b7cd077bd19ffcdfc98200c 100644 --- a/actioncable/lib/action_cable/connection/internal_channel.rb +++ b/actioncable/lib/action_cable/connection/internal_channel.rb @@ -15,14 +15,14 @@ def subscribe_to_internal_channel @_internal_subscriptions ||= [] @_internal_subscriptions << [ internal_channel, callback ] - Concurrent.global_io_executor.post { pubsub.subscribe(internal_channel, callback) } + EM.next_tick { pubsub.subscribe(internal_channel, callback) } logger.info "Registered connection (#{connection_identifier})" end end def unsubscribe_from_internal_channel if @_internal_subscriptions.present? - @_internal_subscriptions.each { |channel, callback| Concurrent.global_io_executor.post { pubsub.unsubscribe(channel, callback) } } + @_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe(channel, callback) } } end end diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb deleted file mode 100644 index ace250cd16b27a6be89f70bee2f65df31c4eac8a..0000000000000000000000000000000000000000 --- a/actioncable/lib/action_cable/connection/stream.rb +++ /dev/null @@ -1,59 +0,0 @@ -module ActionCable - module Connection - #-- - # This class is heavily based on faye-websocket-ruby - # - # Copyright (c) 2010-2015 James Coglan - class Stream - def initialize(event_loop, socket) - @event_loop = event_loop - @socket_object = socket - @stream_send = socket.env['stream.send'] - - @rack_hijack_io = nil - - hijack_rack_socket - end - - def each(&callback) - @stream_send ||= callback - end - - def close - shutdown - @socket_object.client_gone - end - - def shutdown - clean_rack_hijack - end - - def write(data) - return @rack_hijack_io.write(data) if @rack_hijack_io - return @stream_send.call(data) if @stream_send - rescue EOFError - @socket_object.client_gone - end - - def receive(data) - @socket_object.parse(data) - end - - private - def hijack_rack_socket - return unless @socket_object.env['rack.hijack'] - - @socket_object.env['rack.hijack'].call - @rack_hijack_io = @socket_object.env['rack.hijack_io'] - - @event_loop.attach(@rack_hijack_io, self) - end - - def clean_rack_hijack - return unless @rack_hijack_io - @event_loop.detach(@rack_hijack_io, self) - @rack_hijack_io = nil - end - end - end -end diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb deleted file mode 100644 index f773814973409ef3dabde103decc3ab7cc263c16..0000000000000000000000000000000000000000 --- a/actioncable/lib/action_cable/connection/stream_event_loop.rb +++ /dev/null @@ -1,68 +0,0 @@ -require 'nio' - -module ActionCable - module Connection - class StreamEventLoop - def initialize - @nio = NIO::Selector.new - @map = {} - @stopping = false - @todo = Queue.new - - Thread.new do - Thread.current.abort_on_exception = true - run - end - end - - def attach(io, stream) - @todo << lambda do - @map[io] = stream - @nio.register(io, :r) - end - @nio.wakeup - end - - def detach(io, stream) - @todo << lambda do - @nio.deregister(io) - @map.delete io - end - @nio.wakeup - end - - def stop - @stopping = true - @nio.wakeup - end - - def run - loop do - if @stopping - @nio.close - break - end - - until @todo.empty? - @todo.pop(true).call - end - - if monitors = @nio.select - monitors.each do |monitor| - io = monitor.io - stream = @map[io] - - begin - stream.receive io.read_nonblock(4096) - rescue IO::WaitReadable - next - rescue EOFError - stream.close - end - end - end - end - end - end - end -end diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb index 5e89fb9b7267a35700020d4cac32eaaa1bacf051..670d5690aecf14d17af8115bbfb40e5f8d847011 100644 --- a/actioncable/lib/action_cable/connection/web_socket.rb +++ b/actioncable/lib/action_cable/connection/web_socket.rb @@ -1,11 +1,13 @@ -require 'websocket/driver' +require 'faye/websocket' module ActionCable module Connection - # Wrap the real socket to minimize the externally-presented API + # Decorate the Faye::WebSocket with helpers we need. class WebSocket - def initialize(env, event_target, stream_event_loop) - @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, stream_event_loop) : nil + delegate :rack_response, :close, :on, to: :websocket + + def initialize(env) + @websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil end def possible? @@ -13,19 +15,11 @@ def possible? end def alive? - websocket && websocket.alive? + websocket && websocket.ready_state == Faye::WebSocket::API::OPEN end def transmit(data) - websocket.transmit data - end - - def close - websocket.close - end - - def rack_response - websocket.rack_response + websocket.send data end protected diff --git a/actioncable/lib/action_cable/process/logging.rb b/actioncable/lib/action_cable/process/logging.rb new file mode 100644 index 0000000000000000000000000000000000000000..dce637b3ca861b12a35ef1b964d5aec2471ed5e0 --- /dev/null +++ b/actioncable/lib/action_cable/process/logging.rb @@ -0,0 +1,7 @@ +require 'action_cable/server' +require 'eventmachine' + +EM.error_handler do |e| + puts "Error raised inside the event loop: #{e.message}" + puts e.backtrace.join("\n") +end diff --git a/actioncable/lib/action_cable/server.rb b/actioncable/lib/action_cable/server.rb index bd6a3826a3e581338c9f2234936fbada93fc5b77..a2a89d5f1e47dc69f3e3d26ced7a1f11393aaba4 100644 --- a/actioncable/lib/action_cable/server.rb +++ b/actioncable/lib/action_cable/server.rb @@ -1,3 +1,7 @@ +require 'eventmachine' +EventMachine.epoll if EventMachine.epoll? +EventMachine.kqueue if EventMachine.kqueue? + module ActionCable module Server extend ActiveSupport::Autoload diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index b00abd208cc891adefc85fc36cb35ee1e0fc11b9..3385a4c9f3b5eb19c565c72df21f8b68126300e0 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -32,10 +32,6 @@ def remote_connections @remote_connections ||= RemoteConnections.new(self) end - def stream_event_loop - @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new - end - # The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size. def worker_pool @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) diff --git a/actioncable/lib/action_cable/server/connections.rb b/actioncable/lib/action_cable/server/connections.rb index 8671dd5ebdb25981c4879310ff62050adea55807..47dcea8c209332df4d5dd6c4b378571e974efdb2 100644 --- a/actioncable/lib/action_cable/server/connections.rb +++ b/actioncable/lib/action_cable/server/connections.rb @@ -22,9 +22,11 @@ def remove_connection(connection) # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically # disconnect. def setup_heartbeat_timer - @heartbeat_timer ||= Concurrent::TimerTask.new(execution_interval: BEAT_INTERVAL) do - Concurrent.global_io_executor.post { connections.map(&:beat) } - end.tap(&:execute) + EM.next_tick do + @heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do + EM.next_tick { connections.map(&:beat) } + end + end end def open_connections_statistics diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb index c88b03947a3df9e61df4811770d713d50f5c7739..85d4892e4cfd0fbcdbb750fe54661053c09e3741 100644 --- a/actioncable/lib/action_cable/subscription_adapter/async.rb +++ b/actioncable/lib/action_cable/subscription_adapter/async.rb @@ -10,11 +10,11 @@ def subscriber_map class AsyncSubscriberMap < SubscriberMap def add_subscriber(*) - Concurrent.global_io_executor.post { super } + ::EM.next_tick { super } end def invoke_callback(*) - Concurrent.global_io_executor.post { super } + ::EM.next_tick { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index 3ce1bbed685e689d49fa4c8898447d82f406c2cc..78f8aeb599823a379ac7a913012ffb5fb5c4c293 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -63,7 +63,7 @@ def listen case action when :listen pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}") - Concurrent.global_io_executor << callback if callback + ::EM.next_tick(&callback) if callback when :unlisten pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}") when :shutdown @@ -93,7 +93,7 @@ def remove_channel(channel) end def invoke_callback(*) - Concurrent.global_io_executor.post { super } + ::EM.next_tick { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index a035e3988da7bde6f64bb5f94205a1abe6f88bea..3b86354621d585e08b7c997cfaae762216501f99 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -1,18 +1,11 @@ -require 'thread' - gem 'em-hiredis', '~> 0.3.0' gem 'redis', '~> 3.0' require 'em-hiredis' require 'redis' -EventMachine.epoll if EventMachine.epoll? -EventMachine.kqueue if EventMachine.kqueue? - module ActionCable module SubscriptionAdapter class Redis < Base # :nodoc: - @@mutex = Mutex.new - def broadcast(channel, payload) redis_connection_for_broadcasts.publish(channel, payload) end @@ -34,7 +27,6 @@ def shutdown private def redis_connection_for_subscriptions - ensure_reactor_running @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| redis.on(:reconnect_failed) do @logger.info "[ActionCable] Redis reconnect failed." @@ -45,14 +37,6 @@ def redis_connection_for_subscriptions def redis_connection_for_broadcasts @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) end - - def ensure_reactor_running - return if EventMachine.reactor_running? - @@mutex.synchronize do - Thread.new { EventMachine.run } unless EventMachine.reactor_running? - Thread.pass until EventMachine.reactor_running? - end - end end end end diff --git a/actioncable/test/channel/periodic_timers_test.rb b/actioncable/test/channel/periodic_timers_test.rb index 64f0247cd6981cc87171289244d44d597987fa50..1590a12f09f0bb6900a0067a23903e9fd51e05c5 100644 --- a/actioncable/test/channel/periodic_timers_test.rb +++ b/actioncable/test/channel/periodic_timers_test.rb @@ -31,7 +31,7 @@ def ping end test "timer start and stop" do - Concurrent::TimerTask.expects(:new).times(2).returns(true) + EventMachine::PeriodicTimer.expects(:new).times(2).returns(true) channel = ChatChannel.new @connection, "{id: 1}", { id: 1 } channel.expects(:stop_periodic_timers).once diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb index 947efd96d40659e622fbc8eb4be6f5dc550ccde4..3fa2b291b718e0d04cc69cb82d2888b25901adfd 100644 --- a/actioncable/test/channel/stream_test.rb +++ b/actioncable/test/channel/stream_test.rb @@ -31,7 +31,9 @@ def send_confirmation test "stream_for" do run_in_eventmachine do connection = TestConnection.new - connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } + EM.next_tick do + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } + end channel = ChatChannel.new connection, "" channel.stream_for Room.new(1) @@ -39,35 +41,39 @@ def send_confirmation end test "stream_from subscription confirmation" do - run_in_eventmachine do + EM.run do connection = TestConnection.new ChatChannel.new connection, "{id: 1}", { id: 1 } assert_nil connection.last_transmission - wait_for_async + EM::Timer.new(0.1) do + expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription" + connection.transmit(expected) - expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription" - connection.transmit(expected) + assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s" - assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s" + EM.run_deferred_callbacks + EM.stop + end end end test "subscription confirmation should only be sent out once" do - run_in_eventmachine do + EM.run do connection = TestConnection.new channel = ChatChannel.new connection, "test_channel" channel.send_confirmation channel.send_confirmation - wait_for_async + EM.run_deferred_callbacks expected = ActiveSupport::JSON.encode "identifier" => "test_channel", "type" => "confirm_subscription" assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation" assert_equal 1, connection.transmissions.size + EM.stop end end diff --git a/actioncable/test/connection/base_test.rb b/actioncable/test/connection/base_test.rb index e2b017a9a19ed72eb90f19731e14e5614eb8eb67..182562db82921ed3b8ae10bd01adefde2a62a9ff 100644 --- a/actioncable/test/connection/base_test.rb +++ b/actioncable/test/connection/base_test.rb @@ -37,8 +37,6 @@ def send_async(method, *args) connection.process assert connection.websocket.possible? - - wait_for_async assert connection.websocket.alive? end end @@ -55,15 +53,16 @@ def send_async(method, *args) test "on connection open" do run_in_eventmachine do connection = open_connection + connection.process connection.websocket.expects(:transmit).with(regexp_matches(/\_ping/)) connection.message_buffer.expects(:process!) - connection.process - wait_for_async - - assert_equal [ connection ], @server.connections - assert connection.connected + # Allow EM to run on_open callback + EM.next_tick do + assert_equal [ connection ], @server.connections + assert connection.connected + end end end @@ -73,12 +72,12 @@ def send_async(method, *args) connection.process # Setup the connection - Concurrent::TimerTask.stubs(:new).returns(true) - connection.send :handle_open + EventMachine.stubs(:add_periodic_timer).returns(true) + connection.send :on_open assert connection.connected connection.subscriptions.expects(:unsubscribe_from_all) - connection.send :handle_close + connection.send :on_close assert ! connection.connected assert_equal [], @server.connections diff --git a/actioncable/test/connection/identifier_test.rb b/actioncable/test/connection/identifier_test.rb index 1019ad541e83317d41da491fb84998d4e524c2ee..a110dfdee0dd479909a8914eb10527c35d7b16c7 100644 --- a/actioncable/test/connection/identifier_test.rb +++ b/actioncable/test/connection/identifier_test.rb @@ -68,10 +68,10 @@ def open_connection(server:) @connection = Connection.new(server, env) @connection.process - @connection.send :handle_open + @connection.send :on_open end def close_connection - @connection.send :handle_close + @connection.send :on_close end end diff --git a/actioncable/test/connection/multiple_identifiers_test.rb b/actioncable/test/connection/multiple_identifiers_test.rb index e9bb4e6d7f10b4ec850df0baede09153bf937a57..55a9f96cb316a555d3c54d59416e4ef076143076 100644 --- a/actioncable/test/connection/multiple_identifiers_test.rb +++ b/actioncable/test/connection/multiple_identifiers_test.rb @@ -32,10 +32,10 @@ def open_connection(server:) @connection = Connection.new(server, env) @connection.process - @connection.send :handle_open + @connection.send :on_open end def close_connection - @connection.send :handle_close + @connection.send :on_close end end diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb index 56d132b30a124708e3cb286cad7d23358edccdb3..6e6541a952668deb169b148fd6b53aef87aa2444 100644 --- a/actioncable/test/stubs/test_server.rb +++ b/actioncable/test/stubs/test_server.rb @@ -14,7 +14,6 @@ def pubsub @config.subscription_adapter.new(self) end - def stream_event_loop - @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new + def send_async end end diff --git a/actioncable/test/subscription_adapter/common.rb b/actioncable/test/subscription_adapter/common.rb index 361858784e60f4914ac5d2a3ec7791256d7f78e3..d4a13be889b0537d1b147b59d40f17b2fa25a730 100644 --- a/actioncable/test/subscription_adapter/common.rb +++ b/actioncable/test/subscription_adapter/common.rb @@ -1,6 +1,7 @@ require 'test_helper' require 'concurrent' +require 'action_cable/process/logging' require 'active_support/core_ext/hash/indifferent_access' require 'pathname' @@ -23,6 +24,8 @@ def setup # and now the "real" setup for our test: + spawn_eventmachine + server.config.cable = cable_config.with_indifferent_access adapter_klass = server.config.pubsub_adapter diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb index 8ddbd4e76447179281f93ed573e495626bf2d299..6636ce078b498d8479507e45fe1bcf8ea0b744f7 100644 --- a/actioncable/test/test_helper.rb +++ b/actioncable/test/test_helper.rb @@ -13,16 +13,28 @@ # Require all the stubs and models Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file } +require 'faye/websocket' +class << Faye::WebSocket + remove_method :ensure_reactor_running + + # We don't want Faye to start the EM reactor in tests because it makes testing much harder. + # We want to be able to start and stop EM loop in tests to make things simpler. + def ensure_reactor_running + # no-op + end +end + class ActionCable::TestCase < ActiveSupport::TestCase - def wait_for_async - e = Concurrent.global_io_executor - until e.completed_task_count == e.scheduled_task_count - sleep 0.1 + def run_in_eventmachine + EM.run do + yield + + EM.run_deferred_callbacks + EM.stop end end - def run_in_eventmachine - yield - wait_for_async + def spawn_eventmachine + Thread.new { EventMachine.run } unless EventMachine.reactor_running? end end diff --git a/railties/lib/rails/generators/rails/app/templates/config.ru.tt b/railties/lib/rails/generators/rails/app/templates/config.ru.tt index 343c0833d7f676bd2a75e0e1527a01dbdd95b117..70556fcc99407aa9fe85bffbb0cbcbdbdd92c5ef 100644 --- a/railties/lib/rails/generators/rails/app/templates/config.ru.tt +++ b/railties/lib/rails/generators/rails/app/templates/config.ru.tt @@ -3,8 +3,9 @@ require ::File.expand_path('../config/environment', __FILE__) <%- unless options[:skip_action_cable] -%> -# Action Cable requires that all classes are loaded in advance +# Action Cable uses EventMachine which requires that all classes are loaded in advance Rails.application.eager_load! +require 'action_cable/process/logging' <%- end -%> run Rails.application