提交 03a209e9 编写于 作者: P palkan

[Fix #25381] Avoid race condition on subscription confirmation

上级 254db498
......@@ -144,7 +144,9 @@ def initialize(connection, identifier, params = {})
# When a channel is streaming via pubsub, we want to delay the confirmation
# transmission until pubsub subscription is confirmed.
@defer_subscription_confirmation = false
#
# We use atomic fixnum to track the number of waiting tasks to avoid race conditions
@defer_subscription_confirmation_counter = Concurrent::AtomicFixnum.new(1)
@reject_subscription = nil
@subscription_confirmation_sent = nil
......@@ -153,6 +155,14 @@ def initialize(connection, identifier, params = {})
subscribe_to_channel
end
# This method is called after subscription has been added to the channel.
# Send confirmation here to avoid race conditions when client tries to perform actions
# right after receiving confirmation.
def registered!
@defer_subscription_confirmation_counter.decrement
transmit_subscription_confirmation unless defer_subscription_confirmation?
end
# Extract the action name from the passed data and process it via the channel. The process will ensure
# that the action requested is a public method on the channel declared by the user (so not one of the callbacks
# like #subscribed).
......@@ -202,17 +212,21 @@ def transmit(data, via: nil)
end
def defer_subscription_confirmation!
@defer_subscription_confirmation = true
@defer_subscription_confirmation_counter.increment
end
def defer_subscription_confirmation?
@defer_subscription_confirmation
@defer_subscription_confirmation_counter.value.positive?
end
def subscription_confirmation_sent?
@subscription_confirmation_sent
end
def registered?
@registered
end
def reject
@reject_subscription = true
end
......@@ -235,11 +249,7 @@ def subscribe_to_channel
subscribed
end
if subscription_rejected?
reject_subscription
else
transmit_subscription_confirmation unless defer_subscription_confirmation?
end
reject_subscription if subscription_rejected?
end
def extract_action(data)
......
......@@ -84,7 +84,8 @@ def stream_from(broadcasting, callback = nil, coder: nil, &block)
connection.server.event_loop.post do
pubsub.subscribe(broadcasting, handler, lambda do
transmit_subscription_confirmation
@defer_subscription_confirmation_counter.decrement
transmit_subscription_confirmation unless defer_subscription_confirmation?
logger.info "#{self.class.name} is streaming from #{broadcasting}"
end)
end
......
......@@ -29,7 +29,9 @@ def add(data)
subscription_klass = id_options[:channel].safe_constantize
if subscription_klass && ActionCable::Channel::Base >= subscription_klass
subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options)
subscription = subscription_klass.new(connection, id_key, id_options)
subscriptions[id_key] ||= subscription
subscription.registered!
else
logger.error "Subscription class not found: #{id_options[:channel].inspect}"
end
......
......@@ -150,8 +150,13 @@ def rm_rf
assert_equal expected, @connection.last_transmission
end
test "subscription confirmation" do
test "do not send subscription confirmation on initialize" do
assert_nil @connection.last_transmission
end
test "subscription confirmation on registration" do
expected = { "identifier" => "{id: 1}", "type" => "confirm_subscription" }
@channel.registered!
assert_equal expected, @connection.last_transmission
end
......@@ -208,6 +213,8 @@ def rm_rf
test "notification for transmit_subscription_confirmation" do
begin
@channel.registered!
events = []
ActiveSupport::Notifications.subscribe "transmit_subscription_confirmation.action_cable" do |*args|
events << ActiveSupport::Notifications::Event.new(*args)
......
......@@ -84,7 +84,9 @@ class StreamTest < ActionCable::TestCase
run_in_eventmachine do
connection = TestConnection.new
ChatChannel.new connection, "{id: 1}", id: 1
channel = ChatChannel.new connection, "{id: 1}", id: 1
channel.registered!
assert_nil connection.last_transmission
wait_for_async
......@@ -101,6 +103,7 @@ class StreamTest < ActionCable::TestCase
connection = TestConnection.new
channel = ChatChannel.new connection, "test_channel"
channel.registered!
channel.send_confirmation
channel.send_confirmation
......@@ -114,7 +117,7 @@ class StreamTest < ActionCable::TestCase
end
end
require "action_cable/subscription_adapter/inline"
require "action_cable/subscription_adapter/async"
class UserCallbackChannel < ActionCable::Channel::Base
def subscribed
......@@ -124,9 +127,16 @@ def subscribed
end
end
class StreamEncodingTest < ActionCable::TestCase
class MultiChatChannel < ActionCable::Channel::Base
def subscribed
stream_from "main_room"
stream_from "test_all_rooms"
end
end
class StreamFromTest < ActionCable::TestCase
setup do
@server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Inline)
@server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Async)
@server.config.allowed_request_origins = %w( http://rubyonrails.com )
end
......@@ -153,6 +163,17 @@ class StreamEncodingTest < ActionCable::TestCase
end
end
test "subscription confirmation should only be sent out once with muptiple stream_from" do
run_in_eventmachine do
connection = open_connection
expected = { "identifier" => { "channel" => MultiChatChannel.name }.to_json, "type" => "confirm_subscription" }
connection.websocket.expects(:transmit).with(expected.to_json)
receive(connection, command: "subscribe", channel: MultiChatChannel.name, identifiers: {})
wait_for_async
end
end
private
def subscribe_to(connection, identifiers:)
receive connection, command: "subscribe", identifiers: identifiers
......
......@@ -66,8 +66,12 @@ class ActionCable::TestCase < ActiveSupport::TestCase
end
def wait_for_executor(executor)
# do not wait forever, wait 2s
timeout = 2
until executor.completed_task_count == executor.scheduled_task_count
sleep 0.1
timeout -= 0.1
raise "Executor could not complete all tasks in 2 seconds" unless timeout > 0
end
end
end
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册