1. 27 9月, 2013 4 次提交
    • A
      Improve handling of missed heartbeats · 6679feb7
      Andy Wilkinson 提交于
      Previously, when a broker heartbeat was mnissed, the STOMP connection
      would be left in a semi-disconnected state such that, for example, the
      read and write idle callbacks would still be active, even though
      the underlying TCP connection had been nulled out.
      
      As part of disconnecting the STOMP connection, this commit closes the
      underlying TCP connection when a heartbeat's missed which cancels the
      read and write idle callbacks. It also now copes with the underlying
      TCP connection being null when sending a heartbeat to the broker. This
      protects again a race condition between the write idle callback being
      fired, such that a heartbeat needs to be sent, and the connection
      being nulled out due to it being closed.
      6679feb7
    • A
      Add heart-beat support to STOMP broker relay · 496d8321
      Andy Wilkinson 提交于
      Previously, the STOMP broker relay did not support heart-beats. It sent
      0,0 in the heart-beats header for its own CONNECTED message, and set the
      heart-beats header to 0,0 when it was forwarding a CONNECTED from from a
      client to the broker.
      
      The broker relay now supports heart-beats for the system relay session.
      It will send heart-beats at the send interval that's been negotiated
      with the broker and will also expect to receive heart-beats at the
      receive interval that's been negotiated with the broker. The receive
      interval is multiplied by a factor of three to satisfy the STOMP spec's
      suggestion of lenience and ActiveMQ 5.8.0's heart-beat behaviour (see
      AMQ-4710).
      
      The broker relay also supports heart-beats between clients and the
      broker. For any given client's relay session, any heart-beats received
      from the client are forwarded on to the broker and any heart-beats
      received from the broker are sent back to the client.
      
      Internally, a heart-beat is represented as a Message with a byte array
      payload containing the single byte of new line ('\n') character and
      'empty' headers. SubscriptionMethodReturnValueHandler has been updated
      to default the message type to SimpMessageType.MESSAGE. This eases
      the distinction between a heartbeat and a message that's been created
      from a return value from application code.
      496d8321
    • A
      Remove CONNECT-related message buffer from STOMP relay · 8d2a376b
      Andy Wilkinson 提交于
      Before this change, the StompProtocolHandler always responded to
      clients with a CONNECTED frame, while the STOMP broker relay
      independantly forwarded the client CONNECT to the broker and waited
      for the CONNECTED frame back. That meant the relay had to buffer
      client messages until it received the CONNECTED response from
      the message broker.
      
      This change ensures that clients wait for a CONNECTED frame from
      the message broker. The broker relay forwards the CONNECT frame to
      the broker. The broker responds with a CONNECTED frame, which the
      relay then forwards to the client. As a result, a (well-written)
      client will not send any messages to the relay until the connection
      to the broker is fully established.
      
      The StompProtcolHandler can now be configured whether to send CONNECTED
      frame back. By default that is off. So when using the simple broker,
      the StompProtocolHandler can still respond with CONNECTED frames.
      
      The relay's handling of a connection being dropped has also been
      improved. When a connection for a client relay session is dropped
      an ERROR frame will be sent back to the client. If a connection is
      closed as part of a DISCONNECT frame being sent, no ERROR frame
      is sent back to the client. When the connection for the system relay
      session is dropped, an event is published indicating that the broker
      is unavailable. Reactor's TcpClient will then attempt to re-restablish
      the connection.
      8d2a376b
    • A
      Add StompCodec · a489c2cf
      Andy Wilkinson 提交于
      Previously, the broker relay's TCP client used Reactor's built in
      delimited codec as part of its parsing of STOMP frames. \0 was used as
      the delimiter. This worked for most STOMP frames but, crucially,
      not for frames with a body that contained \0: when such a frame was
      received it would be truncated.
      
      This commit adds a custom codec that parses STOMP frames more
      intelligently. It honours the content-length header allowing it to
      correctly parse frames with a body that contains \0. The codec largely
      delegates to two new classes: StompEncoder and StompDecoder. For
      consistency, code that previously used StompMessageConverter has been
      reworked to use these new encoder and decoder classes.
      
      Issue: SPR-10818
      a489c2cf
  2. 28 8月, 2013 1 次提交
  3. 20 8月, 2013 6 次提交
    • R
      Polish StompBrokerRelayMessageHandler · 68c0df83
      Rossen Stoyanchev 提交于
      68c0df83
    • R
      94fefec0
    • A
      Improve broker availability events · 3fb5ff26
      Andy Wilkinson 提交于
      Use a single class for the broker availability event with a boolean
      that indicates whether or not the broker is available, rather than one
      event for an available broker and one event for an unavailable broker.
      
      Publish broker availability events in SimpleBrokerMessageHandler so that
      it can be used as a drop-in replacement for
      StompBrokerRelayMessageHandler.
      3fb5ff26
    • A
      Add reconnect logic to the relay's system session · 131b5de6
      Andy Wilkinson 提交于
      Upgrade to Reactor snapshot builds to take advantage of TcpClient's
      reconnect support that was added post-M1. Now, the system relay session
      will try every 5 seconds to establish a connection with the broker, both
      when first connecting and in the event of subsequently becoming
      disconnected.
      
      A more sophisticated reconnection policy, including back off and
      failover to different brokers, is possible with the Reactor API. We may
      want to enhance the relay's reconnection policy in the future.
      
      Typically, a broken connection is identified by the failure to forward
      a message to the broker. As things stand, the message id then discarded.
      Any further messages that are forwarded before the connection's been
      re-established are queued for forwarding once the CONNECTED frame's been
      received. We may want to consider also queueing the message that failed
      to send, however we would then need to consider the possibility of the
      message itself being what caused the broker to close the connection
      and resending it would simply cause the connection to be closed again.
      131b5de6
    • A
      Publish events about broker's availability · 8b48d8f4
      Andy Wilkinson 提交于
      Components that are using a StompBrokerRelayMessageHandler may want
      to know whether or not the broker's unavailable. If they're sending
      messages to the relay via an asynchronous channel there's currently
      no way for them to find this out.
      
      This commit enhances StompBrokerRelayMessageHandler to publish
      application events when the broker's availability changes:
      BrokerBecameAvailableEvent and BrokerBecameUnavailableEvent.
      Irrespective of the number of relay sessions only a single event is
      published for each change in the broker's availability.
      8b48d8f4
    • A
      Integration tests for the broker relay · be6dbe54
      Andy Wilkinson 提交于
      be6dbe54
  4. 02 8月, 2013 1 次提交
    • A
      Introduce SubProtocolHandler abstraction · 9e20a256
      Andy Wilkinson 提交于
      Add SubProtocolHandler to encapsulate the logic for using a
      sub-protocol.
      
      A SubProtocolWebSocketHandler is also provided to
      delegate to the appropriate SubProtocolHandler based on the
      negotiated sub-protocol value at handshake.
      
      StompSubProtocolHandler provides handling for STOMP messages.
      
      Issue: SPR-10786
      9e20a256
  5. 22 7月, 2013 1 次提交
  6. 18 7月, 2013 3 次提交
  7. 16 7月, 2013 1 次提交
  8. 14 7月, 2013 1 次提交
    • R
      Refactor and polish spring-messaging · 2a48ad88
      Rossen Stoyanchev 提交于
      Remove base class for STOMP-related message handler classes
      (AbstractSimpMessageHandler), polish subclasses and fix issues with
      more significant updates to STOMP broker relay.
      
      Introduce base class for SubscribableChannel implementations providing
      consistent logging for all channel implementations.
      2a48ad88
  9. 13 7月, 2013 2 次提交
  10. 11 7月, 2013 1 次提交
  11. 08 7月, 2013 1 次提交
    • R
      Refactor SubscriptionRegistry · 3a2f5e71
      Rossen Stoyanchev 提交于
      The SubscriptionRegistry and implementations are now in a package
      together with SimpleBrokerWebMessageHandler and primarily support
      with matching subscriptions to messages. Subscriptions can contain
      patterns as supported by AntPathMatcher.
      
      StopmWebSocketHandler no longer keeps track of subscriptions and simply
      ignores messages without a subscription id, since it has no way of
      knowing broker-specific destination semantics for patterns.
      3a2f5e71
  12. 30 6月, 2013 1 次提交
  13. 28 6月, 2013 2 次提交
  14. 26 6月, 2013 2 次提交
    • R
      Remove PubSubChannelRegistry · ee9c46ad
      Rossen Stoyanchev 提交于
      ee9c46ad
    • R
      Introduce MessageHeader accessor types · 486b4101
      Rossen Stoyanchev 提交于
      A new type MessageHeaderAccesssor provides read/write access to
      MessageHeaders along with typed getter/setter methods along the lines
      of the existing MessageBuilder methods (internally MessageBuilder
      merely delegates to MessageHeaderAccessor). This class is extensible
      with sub-classes expected to provide typed getter/setter methods for
      specific categories of message headers.
      
      NativeMessageHeaderAccessor is one specific sub-class that further
      provides read/write access to headers from some external message
      source (e.g. STOMP headers). Native headers are stored in a separate
      MultiValueMap and kept under a specific key.
      486b4101
  15. 24 6月, 2013 1 次提交
  16. 20 6月, 2013 2 次提交
    • R
      Add @MessageExceptionHandler · 55a212d4
      Rossen Stoyanchev 提交于
      Similar to @ExceptionHandler but for message processing. Such a method
      can send messages to both the message broker channel and the client
      channel provided the client is subscribed to the target destination.
      55a212d4
    • R
      Add support for "system" STOMP session · 01c4e458
      Rossen Stoyanchev 提交于
      The "system" STOMP session is established at startup and can be used
      to send messages without a client session, e.g. to support broadcasting
      from a REST/HTTP handler method.
      01c4e458
  17. 19 6月, 2013 4 次提交
    • R
      Refactor PubSubHeaders, StompHeaders, MessageBuilder · 5cfc59d7
      Rossen Stoyanchev 提交于
      Rename to PubSubHeaderAccessor and StompHeaderAccessor
      Move the renamed classes to support packages
      
      Remove fromPayloadAndHeaders from MessageBuilder, just use
      withPayload(..).copyHeaders(..) instead.
      5cfc59d7
    • R
      811bb1b0
    • A
      Fix race when flushing messages · 28174744
      Andy Wilkinson 提交于
      The use of an AtomicBoolean and no lock meant that it was possible
      for a message to be queued and then never be flushed and sent to the
      broker:
      
      1. On t1, a message is received and isConnected is false. The message
         will be queued.
      2. On t2, CONNECTED is received from the broker. isConnected is set
         to true, the queue is drained and the queued messages are forwarded
      3. On t1, the message is added to the queue
      
      To fix this, checking that isConnected is false (step 1 above) and the
      queueing of a message (step 3 above) need to be performed as a unit
      so that the flushing of the queued messages can't be interleaved. This
      is achieved by synchronizing on a monitor and performing steps 1
      and 3 and synchronizing on the same monitor while performing step 2.
      
      The monitor is held while the messages are actually being forwarded
      to the broker. An alternative would be to drain the queue into
      a local variable, release the monitor, and then forward the messages.
      The main advantage of this alternative is that the monitor is held for
      less time. It also reduces the theoretical risk of deadlock by not
      holding the monitor while making an alien call. The downside of the
      alternative is that it may lead to messages being forwarded out of
      order. For this reason the alternative approach was rejected.
      28174744
    • R
      Add generic parameters to MessageHandler impls · 3f9da6f4
      Rossen Stoyanchev 提交于
      3f9da6f4
  18. 18 6月, 2013 1 次提交
  19. 17 6月, 2013 1 次提交
  20. 15 6月, 2013 2 次提交
  21. 14 6月, 2013 2 次提交
    • A
      Make Message type pluggable · 3022f5e3
      Andy Wilkinson 提交于
      To improve compatibility between Spring's messaging classes and
      Spring Integration, the type of Message that is created has been made
      pluggable through the introduction of a factory abstraction;
      MessageFactory.
      
      By default a MessageFactory is provided that will create
      org.springframework.messaging.GenericMessage instances, however this
      can be replaced with an alternative implementation. For example,
      Spring Integration can provide an implementation that creates
      org.springframework.integration.message.GenericMessage instances.
      
      This control over the type of Message that's created allows messages
      to flow from Spring messaging code into Spring Integration code without
      any need for conversion. In further support of this goal,
      MessageChannel, MessageHandler, and SubscribableChannel have been
      genericized to make the Message type that they deal with more
      flexible.
      3022f5e3
    • R
      Use tcp-reactor in StompRelayMessageHandler · 641aaf4b
      Rossen Stoyanchev 提交于
      641aaf4b