streams.rb 7.2 KB
Newer Older
1 2
module ActionCable
  module Channel
3
    # Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pubsub queue where any data
    # placed into it is automatically sent to the clients that are connected at that time. It's purely an online queue, though. If you're not
    # streaming a broadcasting at the very moment it sends out an update, you will not get that update, even if you connect after it has been sent.
    #
    # Most commonly, the streamed broadcast is sent straight to the subscriber on the client-side. The channel just acts as a connector between
    # the two parties (the broadcaster and the channel subscriber). Here's an example of a channel that allows subscribers to get all new
    # comments on a given page:
    #
    #   class CommentsChannel < ApplicationCable::Channel
    #     def follow(data)
    #       stream_from "comments_for_#{data['recording_id']}"
    #     end
    #
    #     def unfollow
    #       stop_all_streams
    #     end
    #   end
    #
    # Based on the above example, the subscribers of this channel will get whatever data is put into the,
    # let's say, `comments_for_45` broadcasting as soon as it's put there.
    #
    # An example broadcasting for this channel looks like so:
    #
    #   ActionCable.server.broadcast "comments_for_45", author: 'DHH', content: 'Rails is just swell'
    #
    # If you have a stream that is related to a model, then the broadcasting used can be generated from the model and channel.
    # The following example would subscribe to a broadcasting like `comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE`
    #
    #   class CommentsChannel < ApplicationCable::Channel
    #     def subscribed
    #       post = Post.find(params[:id])
    #       stream_for post
    #     end
    #   end
    #
    # You can then broadcast to this channel using:
    #
    #   CommentsChannel.broadcast_to(@post, @comment)
    #
    # If you don't just want to relay the broadcast unfiltered to the subscriber, you can also supply a callback that lets you alter what is sent out.
    # The example below shows how you can use this to provide performance introspection in the process:
    #
    #   class ChatChannel < ApplicationCable::Channel
    #     def subscribed
    #       @room = Chat::Room[params[:room_number]]
    #
    #       stream_for @room, coder: ActiveSupport::JSON do |message|
    #         if message['originated_at'].present?
    #           elapsed_time = (Time.now.to_f - message['originated_at']).round(2)
    #
    #           ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing
    #           logger.info "Message took #{elapsed_time}s to arrive"
    #         end
    #
    #         transmit message
    #       end
    #     end
    #   end
    #
    # You can stop streaming from all broadcasts by calling #stop_all_streams.
63 64 65 66 67 68 69
    module Streams
      # Make this module an ActiveSupport::Concern so the +included+ block
      # below can be declared here.
      extend ActiveSupport::Concern

      included do
        # Pass :stop_all_streams to the including channel's on_unsubscribe hook.
        on_unsubscribe :stop_all_streams
      end

70 71
      # Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt>
      # (positionally or as a block) that'll be used instead of the default of just transmitting the updates straight
      # to the subscriber. When both are given, the positional <tt>callback</tt> wins over the block.
      #
      # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback.
      # Defaults to `coder: nil` which does no decoding and passes raw messages to the callback.
      # Without a callback, the default handler is used, which falls back to JSON decoding when no coder is given.
      #
      # The subscription confirmation is deferred here and only transmitted
      # from the pubsub success callback.
      def stream_from(broadcasting, callback = nil, coder: nil, &block)
        # Accept symbols and other non-String names.
        broadcasting = String(broadcasting)

        # Don't send the confirmation until pubsub#subscribe is successful
        defer_subscription_confirmation!

        # Build a stream handler by wrapping the user-provided callback with
        # a decoder or defaulting to a JSON-decoding retransmitter.
        handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder)

        # Remember the pair so stop_all_streams can unsubscribe the exact handler.
        streams << [ broadcasting, handler ]

        # Subscribe from the server's event loop; the lambda is the success callback.
        connection.server.event_loop.post do
          pubsub.subscribe(broadcasting, handler, lambda do
            transmit_subscription_confirmation
            logger.info "#{self.class.name} is streaming from #{broadcasting}"
          end)
        end
      end

93 94 95
      # Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a
      # <tt>callback</tt> (positionally or as a block) that'll be used instead of the default of just
      # transmitting the updates straight to the subscriber.
      #
      # The broadcasting name is derived from this channel's name and the model via +broadcasting_for+,
      # and the work is then handed to #stream_from.
      #
      # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback.
      # Defaults to `coder: nil` which does no decoding, passes raw messages.
      def stream_for(model, callback = nil, coder: nil, &block)
        stream_from(broadcasting_for([ channel_name, model ]), callback || block, coder: coder)
      end

103
      # Unsubscribes all streams associated with this channel from the pubsub queue.
      #
      # Each recorded [broadcasting, handler] pair is unsubscribed and logged,
      # then the list of streams is emptied. Returns the (now empty) streams array.
      def stop_all_streams
        streams.each do |broadcasting, callback|
          pubsub.unsubscribe broadcasting, callback
          logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
        end.clear
      end

      private
        # Forward +pubsub+ to the connection, so the methods in this module
        # can call +pubsub+ directly.
        delegate :pubsub, to: :connection
113 114 115 116 117

        # Lazily-created list of this channel's active streams. Entries are
        # the [broadcasting, handler] pairs appended by stream_from.
        def streams
          @_streams = [] unless @_streams
          @_streams
        end

118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
        # Builds the outermost handler for a broadcasting. The decorated
        # handler from stream_handler is never called inline; each message is
        # dispatched to the connection's worker pool so the event loop is not
        # blocked by user code.
        def worker_pool_stream_handler(broadcasting, user_handler, coder: nil)
          decorated = stream_handler(broadcasting, user_handler, coder: coder)

          lambda do |message|
            connection.worker_pool.async_invoke(decorated, :call, message, connection: connection)
          end
        end

        # Chooses the handler for a broadcasting: the user's handler wrapped in
        # a decoder when one was supplied, otherwise the default handler.
        #
        # May be overridden to add instrumentation, logging, specialized error
        # handling, or other forms of handler decoration.
        #
        # TODO: Tests demonstrating this.
        def stream_handler(broadcasting, user_handler, coder: nil)
          return default_stream_handler(broadcasting, coder: coder) unless user_handler

          stream_decoder(user_handler, coder: coder)
        end

        # May be overridden to change the default stream handling behavior
        # which decodes JSON and transmits to client.
        #
        # When +coder+ is nil, ActiveSupport::JSON is used, so the default
        # handler always decodes before transmitting.
        #
        # TODO: Tests demonstrating this.
        #
        # TODO: Room for optimization. Update transmit API to be coder-aware
        # so we can no-op when pubsub and connection are both JSON-encoded.
        # Then we can skip decode+encode if we're just proxying messages.
        def default_stream_handler(broadcasting, coder:)
          coder ||= ActiveSupport::JSON
          stream_transmitter stream_decoder(coder: coder), broadcasting: broadcasting
        end

        # Wraps +handler+ so each message is run through +coder.decode+ first.
        # With no coder the handler is returned untouched.
        def stream_decoder(handler = identity_handler, coder:)
          return handler unless coder

          ->(raw) { handler.call(coder.decode(raw)) }
        end

        # Returns a lambda that passes each message through +handler+ and
        # transmits the result, with a +via+ label naming the broadcasting.
        def stream_transmitter(handler = identity_handler, broadcasting:)
          # Build the label once, outside the lambda, rather than per message.
          via = "streamed from #{broadcasting}"

          -> (message) do
            transmit handler.(message), via: via
          end
        end
168 169 170 171

        # Pass-through handler: a one-argument lambda that returns its message unchanged.
        def identity_handler
          lambda { |message| message }
        end
172 173 174
    end
  end
end