SimpleBrokerMessageHandler.java 8.4 KB
Newer Older
1
/*
J
Juergen Hoeller 已提交
2
 * Copyright 2002-2015 the original author or authors.
3 4 5 6 7
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
9 10 11 12 13 14 15 16
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

17
package org.springframework.messaging.simp.broker;
18

19
import java.util.Collection;
20

21 22
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
23
import org.springframework.messaging.MessageHeaders;
24
import org.springframework.messaging.SubscribableChannel;
25 26
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
27
import org.springframework.messaging.support.MessageBuilder;
28
import org.springframework.messaging.support.MessageHeaderAccessor;
29
import org.springframework.messaging.support.MessageHeaderInitializer;
30
import org.springframework.util.Assert;
31
import org.springframework.util.MultiValueMap;
32
import org.springframework.util.PathMatcher;
33 34

/**
35 36 37 38
 * A "simple" message broker that recognizes the message types defined in
 * {@link SimpMessageType}, keeps track of subscriptions with the help of a
 * {@link SubscriptionRegistry} and sends messages to subscribers.
 *
39 40 41
 * @author Rossen Stoyanchev
 * @since 4.0
 */
42
public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
43

44 45
	private static final byte[] EMPTY_PAYLOAD = new byte[0];

46
	private SubscriptionRegistry subscriptionRegistry;
47

48 49
	private PathMatcher pathMatcher;

50 51
	private MessageHeaderInitializer headerInitializer;

52 53

	/**
54 55
	 * Create a SimpleBrokerMessageHandler instance with the given message channels
	 * and destination prefixes.
56 57
	 * @param clientInboundChannel the channel for receiving messages from clients (e.g. WebSocket clients)
	 * @param clientOutboundChannel the channel for sending messages to clients (e.g. WebSocket clients)
58
	 * @param brokerChannel the channel for the application to send messages to the broker
59
	 * @param destinationPrefixes prefixes to use to filter out messages
60
	 */
61
	public SimpleBrokerMessageHandler(SubscribableChannel clientInboundChannel, MessageChannel clientOutboundChannel,
62 63
			SubscribableChannel brokerChannel, Collection<String> destinationPrefixes) {

64 65
		super(clientInboundChannel, clientOutboundChannel, brokerChannel, destinationPrefixes);
		this.subscriptionRegistry = new DefaultSubscriptionRegistry();
66 67 68
	}


69 70 71 72 73 74 75
	/**
	 * Configure a custom SubscriptionRegistry to use for storing subscriptions.
	 * <p><strong>Note</strong> that when a custom PathMatcher is configured via
	 * {@link #setPathMatcher}, if the custom registry is not an instance of
	 * {@link DefaultSubscriptionRegistry}, the provided PathMatcher is not used
	 * and must be configured directly on the custom registry.
	 */
76
	public void setSubscriptionRegistry(SubscriptionRegistry subscriptionRegistry) {
P
Phillip Webb 已提交
77
		Assert.notNull(subscriptionRegistry, "SubscriptionRegistry must not be null");
78
		this.subscriptionRegistry = subscriptionRegistry;
79 80 81 82 83 84 85 86 87
		initPathMatcherToUse();
	}

	private void initPathMatcherToUse() {
		if (this.pathMatcher != null) {
			if (this.subscriptionRegistry instanceof DefaultSubscriptionRegistry) {
				((DefaultSubscriptionRegistry) this.subscriptionRegistry).setPathMatcher(this.pathMatcher);
			}
		}
88 89
	}

90 91
	public SubscriptionRegistry getSubscriptionRegistry() {
		return this.subscriptionRegistry;
92 93
	}

94 95 96 97 98 99 100 101 102
	/**
	 * When configured, the given PathMatcher is passed down to the
	 * SubscriptionRegistry to use for matching destination to subscriptions.
	 */
	public void setPathMatcher(PathMatcher pathMatcher) {
		this.pathMatcher = pathMatcher;
		initPathMatcherToUse();
	}

103
	/**
J
Juergen Hoeller 已提交
104 105
	 * Configure a {@link MessageHeaderInitializer} to apply to the headers
	 * of all messages sent to the client outbound channel.
106 107 108 109 110 111 112
	 * <p>By default this property is not set.
	 */
	public void setHeaderInitializer(MessageHeaderInitializer headerInitializer) {
		this.headerInitializer = headerInitializer;
	}

	/**
J
Juergen Hoeller 已提交
113
	 * Return the configured header initializer.
114 115 116 117 118
	 */
	public MessageHeaderInitializer getHeaderInitializer() {
		return this.headerInitializer;
	}

119 120 121 122

	@Override
	public void startInternal() {
		publishBrokerAvailableEvent();
123 124
	}

125
	@Override
126 127 128 129 130 131
	public void stopInternal() {
		publishBrokerUnavailableEvent();
	}

	@Override
	protected void handleMessageInternal(Message<?> message) {
132 133 134 135
		MessageHeaders headers = message.getHeaders();
		SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
		String destination = SimpMessageHeaderAccessor.getDestination(headers);
		String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
136 137 138 139

		if (!checkDestinationPrefix(destination)) {
			return;
		}
140

141 142 143 144 145 146
		SimpMessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class);
		if (accessor == null) {
			throw new IllegalStateException(
					"No header accessor (not using the SimpMessagingTemplate?): " + message);
		}

147
		if (SimpMessageType.MESSAGE.equals(messageType)) {
148
			logMessage(message);
149 150
			sendMessageToSubscribers(destination, message);
		}
151
		else if (SimpMessageType.CONNECT.equals(messageType)) {
152
			logMessage(message);
153 154 155
			SimpMessageHeaderAccessor connectAck = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
			initHeaders(connectAck);
			connectAck.setSessionId(sessionId);
156
			connectAck.setUser(SimpMessageHeaderAccessor.getUser(headers));
157 158
			connectAck.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, message);
			Message<byte[]> messageOut = MessageBuilder.createMessage(EMPTY_PAYLOAD, connectAck.getMessageHeaders());
159
			getClientOutboundChannel().send(messageOut);
160
		}
161
		else if (SimpMessageType.DISCONNECT.equals(messageType)) {
162
			logMessage(message);
163
			this.subscriptionRegistry.unregisterAllSubscriptions(sessionId);
164 165 166
			SimpMessageHeaderAccessor disconnectAck = SimpMessageHeaderAccessor.create(SimpMessageType.DISCONNECT_ACK);
			initHeaders(disconnectAck);
			disconnectAck.setSessionId(sessionId);
167
			disconnectAck.setUser(SimpMessageHeaderAccessor.getUser(headers));
168 169
			Message<byte[]> messageOut = MessageBuilder.createMessage(EMPTY_PAYLOAD, disconnectAck.getMessageHeaders());
			getClientOutboundChannel().send(messageOut);
170 171
		}
		else if (SimpMessageType.SUBSCRIBE.equals(messageType)) {
172
			logMessage(message);
173 174 175
			this.subscriptionRegistry.registerSubscription(message);
		}
		else if (SimpMessageType.UNSUBSCRIBE.equals(messageType)) {
176
			logMessage(message);
177 178
			this.subscriptionRegistry.unregisterSubscription(message);
		}
179
	}
180

181 182 183 184 185 186 187 188
	private void logMessage(Message<?> message) {
		if (logger.isDebugEnabled()) {
			SimpMessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class);
			accessor = (accessor != null ? accessor : SimpMessageHeaderAccessor.wrap(message));
			logger.debug("Processing " + accessor.getShortLogMessage(message.getPayload()));
		}
	}

189 190 191 192 193 194
	private void initHeaders(SimpMessageHeaderAccessor accessor) {
		if (getHeaderInitializer() != null) {
			getHeaderInitializer().initHeaders(accessor);
		}
	}

195
	protected void sendMessageToSubscribers(String destination, Message<?> message) {
196
		MultiValueMap<String,String> subscriptions = this.subscriptionRegistry.findSubscriptions(message);
J
Juergen Hoeller 已提交
197
		if (!subscriptions.isEmpty() && logger.isDebugEnabled()) {
198
			logger.debug("Broadcasting to " + subscriptions.size() + " sessions.");
199
		}
200 201
		for (String sessionId : subscriptions.keySet()) {
			for (String subscriptionId : subscriptions.get(sessionId)) {
202
				SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
203
				initHeaders(headerAccessor);
204 205 206
				headerAccessor.setSessionId(sessionId);
				headerAccessor.setSubscriptionId(subscriptionId);
				headerAccessor.copyHeadersIfAbsent(message.getHeaders());
207
				Object payload = message.getPayload();
208
				Message<?> reply = MessageBuilder.createMessage(payload, headerAccessor.getMessageHeaders());
209
				try {
210
					getClientOutboundChannel().send(reply);
211 212
				}
				catch (Throwable ex) {
213
					logger.error("Failed to send " + message, ex);
214 215 216 217
				}
			}
		}
	}
218

219 220 221 222 223
	@Override
	public String toString() {
		return "SimpleBroker[" + this.subscriptionRegistry + "]";
	}

224
}