提交 fc1b32f8 编写于 作者: M Matthew Draper

Merge pull request #23976 from danielrhodes/enhancement/ac-ping-to-message-type

ActionCable: Add a "welcome" and "ping" message type
......@@ -73,8 +73,11 @@ class ActionCable.Connection
events:
message: (event) ->
{identifier, message, type} = JSON.parse(event.data)
switch type
when message_types.welcome
@consumer.connectionMonitor.connected()
when message_types.ping
@consumer.connectionMonitor.ping()
when message_types.confirmation
@consumer.subscriptions.notify(identifier, "connected")
when message_types.rejection
......
......@@ -7,10 +7,7 @@ class ActionCable.ConnectionMonitor
@staleThreshold: 6 # Server::Connections::BEAT_INTERVAL * 2 (missed two pings)
identifier: ActionCable.INTERNAL.identifiers.ping
constructor: (@consumer) ->
@consumer.subscriptions.add(this)
@start()
connected: ->
......@@ -22,11 +19,12 @@ class ActionCable.ConnectionMonitor
disconnected: ->
@disconnectedAt = now()
received: ->
ping: ->
@pingedAt = now()
reset: ->
@reconnectAttempts = 0
@consumer.connection.isOpen()
start: ->
@reset()
......
......@@ -58,7 +58,5 @@ class ActionCable.Subscriptions
sendCommand: (subscription, command) ->
{identifier} = subscription
if identifier is ActionCable.INTERNAL.identifiers.ping
@consumer.connection.isOpen()
else
@consumer.send({command, identifier})
@consumer.send({command, identifier})
......@@ -29,10 +29,9 @@ module ActionCable
extend ActiveSupport::Autoload
INTERNAL = {
identifiers: {
ping: '_ping'.freeze
},
message_types: {
welcome: 'welcome'.freeze,
ping: 'ping'.freeze,
confirmation: 'confirm_subscription'.freeze,
rejection: 'reject_subscription'.freeze
}
......
......@@ -115,7 +115,7 @@ def statistics
end
def beat
transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i)
transmit ActiveSupport::JSON.encode(type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i)
end
def on_open # :nodoc:
......@@ -155,7 +155,7 @@ def cookies
def handle_open
connect if respond_to?(:connect)
subscribe_to_internal_channel
confirm_connection_monitor_subscription
send_welcome_message
message_buffer.process!
server.add_connection(self)
......@@ -174,11 +174,11 @@ def handle_close
disconnect if respond_to?(:disconnect)
end
def confirm_connection_monitor_subscription
# Send confirmation message to the internal connection monitor channel.
def send_welcome_message
# Send welcome message to the internal connection monitor channel.
# This ensures the connection monitor state is reset after a successful
# websocket connection.
transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], type: ActionCable::INTERNAL[:message_types][:confirmation])
transmit ActiveSupport::JSON.encode(type: ActionCable::INTERNAL[:message_types][:welcome])
end
def allow_request_origin?
......
......@@ -75,7 +75,7 @@ def initialize(port)
@ws.on(:message) do |event|
hash = JSON.parse(event.data)
if hash['identifier'] == '_ping'
if hash['type'] == 'ping'
@pings += 1
else
@messages << hash
......@@ -146,6 +146,7 @@ def faye_client(port)
def test_single_client
with_puma_server do |port|
c = faye_client(port)
assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
......@@ -162,6 +163,7 @@ def test_interacting_clients
barrier_2 = Concurrent::CyclicBarrier.new(clients.size)
clients.map {|c| Concurrent::Future.execute {
assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message)
c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
......@@ -181,6 +183,7 @@ def test_many_clients
clients = 100.times.map { faye_client(port) }
clients.map {|c| Concurrent::Future.execute {
assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message)
c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
......@@ -194,12 +197,14 @@ def test_many_clients
def test_disappearing_client
with_puma_server do |port|
c = faye_client(port)
assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'delay', message: 'hello')
c.close # disappear before write
c = faye_client(port)
assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
......@@ -214,6 +219,7 @@ def test_unsubscribe_client
identifier = JSON.dump(channel: 'EchoChannel')
c = faye_client(port)
assert_equal({"type" => "welcome"}, c.read_message)
c.send_message command: 'subscribe', identifier: identifier
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
assert_equal(1, app.connections.count)
......@@ -232,6 +238,7 @@ def test_unsubscribe_client
def test_server_restart
with_puma_server do |port|
c = faye_client(port)
assert_equal({"type" => "welcome"}, c.read_message)
c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
......
......@@ -56,7 +56,7 @@ def send_async(method, *args)
run_in_eventmachine do
connection = open_connection
connection.websocket.expects(:transmit).with({ identifier: "_ping", type: "confirm_subscription" }.to_json)
connection.websocket.expects(:transmit).with({ type: "welcome" }.to_json)
connection.message_buffer.expects(:process!)
connection.process
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册