提交 730d456e 编写于 作者: R Rossen Stoyanchev 提交者: Rossen Stoyanchev

Add early STOMP/reactor support

上级 827e20e3
......@@ -483,11 +483,13 @@ project("spring-websocket") {
optional("org.eclipse.jetty.websocket:websocket-server:9.0.3.v20130506")
optional("org.eclipse.jetty.websocket:websocket-client:9.0.3.v20130506")
optional("com.fasterxml.jackson.core:jackson-databind:2.2.0") // required for SockJS support currently
optional("reactor:reactor-core:1.0.0.BUILD-SNAPSHOT")
}
repositories {
maven { url "https://repository.apache.org/content/repositories/snapshots" } // tomcat-websocket-* snapshots
maven { url "https://maven.java.net/content/repositories/releases" } // javax.websocket, tyrus
maven { url 'http://repo.springsource.org/libs-snapshot' } // reactor
}
}
......
......@@ -101,4 +101,9 @@ public final class BinaryMessage extends WebSocketMessage<ByteBuffer> {
return (getPayload() != null) ? getPayload().remaining() : 0;
}
@Override
protected String toStringPayload() {
return (getPayload() != null) ? getPayload().toString() : null;
}
}
......@@ -49,4 +49,9 @@ public final class TextMessage extends WebSocketMessage<String> {
return getPayload().length();
}
@Override
protected String toStringPayload() {
return (getPayloadSize() > 25) ? getPayload().substring(0, 25) + "..." : getPayload();
}
}
......@@ -79,9 +79,12 @@ public abstract class WebSocketMessage<T> {
@Override
public String toString() {
return getClass().getSimpleName() + " [payload length=" + getPayloadSize() + ", last=" + isLast() + "]";
return getClass().getSimpleName() + " payload= " + toStringPayload()
+ ", length=" + getPayloadSize() + ", last=" + isLast() + "]";
}
protected abstract String toStringPayload();
protected abstract int getPayloadSize();
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.stomp;
/**
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public enum StompCommand {
// client
CONNECT,
STOMP,
SEND,
SUBSCRIBE,
UNSUBSCRIBE,
ACK,
NACK,
BEGIN,
COMMIT,
ABORT,
DISCONNECT,
// server
CONNECTED,
MESSAGE,
RECEIPT,
ERROR
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.stomp;
import org.springframework.core.NestedRuntimeException;
/**
* @author Gary Russell
* @since 4.0
*
*/
@SuppressWarnings("serial")
public class StompException extends NestedRuntimeException {
public StompException(String msg, Throwable cause) {
super(msg, cause);
}
public StompException(String msg) {
super(msg);
}
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.stomp;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.springframework.util.Assert;
import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
/**
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class StompHeaders implements MultiValueMap<String, String>, Serializable {
private static final long serialVersionUID = 1L;
// Client
private static final String ACCEPT_VERSION = "accept-version";
private static final String ID = "id";
private static final String HOST = "host";
// Server
private static final String MESSAGE_ID = "message-id";
private static final String RECEIPT_ID = "receipt-id";
private static final String SUBSCRIPTION = "subscription";
private static final String VERSION = "version";
// Client and Server
private static final String ACK = "ack";
private static final String DESTINATION = "destination";
private static final String HEARTBEAT = "heart-beat";
private final Map<String, List<String>> headers;
/**
* Private constructor that can create read-only {@code StompHeaders} instances.
*/
private StompHeaders(Map<String, List<String>> headers, boolean readOnly) {
Assert.notNull(headers, "'headers' must not be null");
if (readOnly) {
Map<String, List<String>> map = new LinkedHashMap<String, List<String>>(headers.size());
for (Entry<String, List<String>> entry : headers.entrySet()) {
List<String> values = Collections.unmodifiableList(entry.getValue());
map.put(entry.getKey(), values);
}
this.headers = Collections.unmodifiableMap(map);
}
else {
this.headers = headers;
}
}
/**
* Constructs a new, empty instance of the {@code StompHeaders} object.
*/
public StompHeaders() {
this(new LinkedHashMap<String, List<String>>(4), false);
}
/**
* Returns {@code StompHeaders} object that can only be read, not written to.
*/
public static StompHeaders readOnlyStompHeaders(StompHeaders headers) {
return new StompHeaders(headers, true);
}
public Set<String> getAcceptVersion() {
String rawValue = getFirst(ACCEPT_VERSION);
return (rawValue != null) ? StringUtils.commaDelimitedListToSet(rawValue) : Collections.<String>emptySet();
}
public void setAcceptVersion(String acceptVersion) {
set(ACCEPT_VERSION, acceptVersion);
}
public String getVersion() {
return getFirst(VERSION);
}
public void setVersion(String version) {
set(VERSION, version);
}
public String getDestination() {
return getFirst(DESTINATION);
}
public void setDestination(String destination) {
set(DESTINATION, destination);
}
public long[] getHeartbeat() {
String rawValue = getFirst(HEARTBEAT);
if (!StringUtils.hasText(rawValue)) {
return null;
}
String[] rawValues = StringUtils.commaDelimitedListToStringArray(rawValue);
// TODO assertions
return new long[] { Long.valueOf(rawValues[0]), Long.valueOf(rawValues[1])};
}
public void setHeartbeat(long cx, long cy) {
set(HEARTBEAT, StringUtils.arrayToCommaDelimitedString(new Object[] {cx, cy}));
}
public String getId() {
return getFirst(ID);
}
public void setId(String id) {
set(ID, id);
}
public String getMessageId() {
return getFirst(MESSAGE_ID);
}
public void setMessageId(String id) {
set(MESSAGE_ID, id);
}
public String getSubscription() {
return getFirst(SUBSCRIPTION);
}
public void setSubscription(String id) {
set(SUBSCRIPTION, id);
}
// MultiValueMap methods
/**
* Return the first header value for the given header name, if any.
* @param headerName the header name
* @return the first header value; or {@code null}
*/
public String getFirst(String headerName) {
List<String> headerValues = headers.get(headerName);
return headerValues != null ? headerValues.get(0) : null;
}
/**
* Add the given, single header value under the given name.
* @param headerName the header name
* @param headerValue the header value
* @throws UnsupportedOperationException if adding headers is not supported
* @see #put(String, List)
* @see #set(String, String)
*/
public void add(String headerName, String headerValue) {
List<String> headerValues = headers.get(headerName);
if (headerValues == null) {
headerValues = new LinkedList<String>();
this.headers.put(headerName, headerValues);
}
headerValues.add(headerValue);
}
/**
* Set the given, single header value under the given name.
* @param headerName the header name
* @param headerValue the header value
* @throws UnsupportedOperationException if adding headers is not supported
* @see #put(String, List)
* @see #add(String, String)
*/
public void set(String headerName, String headerValue) {
List<String> headerValues = new LinkedList<String>();
headerValues.add(headerValue);
headers.put(headerName, headerValues);
}
public void setAll(Map<String, String> values) {
for (Entry<String, String> entry : values.entrySet()) {
set(entry.getKey(), entry.getValue());
}
}
public Map<String, String> toSingleValueMap() {
LinkedHashMap<String, String> singleValueMap = new LinkedHashMap<String,String>(this.headers.size());
for (Entry<String, List<String>> entry : headers.entrySet()) {
singleValueMap.put(entry.getKey(), entry.getValue().get(0));
}
return singleValueMap;
}
// Map implementation
public int size() {
return this.headers.size();
}
public boolean isEmpty() {
return this.headers.isEmpty();
}
public boolean containsKey(Object key) {
return this.headers.containsKey(key);
}
public boolean containsValue(Object value) {
return this.headers.containsValue(value);
}
public List<String> get(Object key) {
return this.headers.get(key);
}
public List<String> put(String key, List<String> value) {
return this.headers.put(key, value);
}
public List<String> remove(Object key) {
return this.headers.remove(key);
}
public void putAll(Map<? extends String, ? extends List<String>> m) {
this.headers.putAll(m);
}
public void clear() {
this.headers.clear();
}
public Set<String> keySet() {
return this.headers.keySet();
}
public Collection<List<String>> values() {
return this.headers.values();
}
public Set<Entry<String, List<String>>> entrySet() {
return this.headers.entrySet();
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (!(other instanceof StompHeaders)) {
return false;
}
StompHeaders otherHeaders = (StompHeaders) other;
return this.headers.equals(otherHeaders.headers);
}
@Override
public int hashCode() {
return this.headers.hashCode();
}
@Override
public String toString() {
return this.headers.toString();
}
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.stomp;
import java.nio.charset.Charset;
/**
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class StompMessage {
public static final Charset CHARSET = Charset.forName("UTF-8");
private final StompCommand command;
private final StompHeaders headers;
private final byte[] payload;
public StompMessage(StompCommand command, StompHeaders headers, byte[] payload) {
this.command = command;
this.headers = (headers != null) ? headers : new StompHeaders();
this.payload = payload;
}
/**
* Constructor for empty payload message.
*/
public StompMessage(StompCommand command, StompHeaders headers) {
this(command, headers, new byte[0]);
}
public StompCommand getCommand() {
return this.command;
}
public StompHeaders getHeaders() {
return this.headers;
}
public byte[] getPayload() {
return this.payload;
}
@Override
public String toString() {
return "StompMessage [headers=" + this.headers + ", payload=" + new String(this.payload) + "]";
}
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.stomp;
import java.io.IOException;
/**
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface StompSession {
String getId();
void sendMessage(StompMessage message) throws IOException;
void close() throws Exception;
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.stomp.adapter;
import java.io.IOException;
import org.springframework.web.stomp.StompMessage;
import org.springframework.web.stomp.StompSession;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface StompMessageProcessor {
void processMessage(StompSession stompSession, StompMessage message) throws IOException;
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.stomp.adapter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.util.Assert;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.adapter.TextWebSocketHandlerAdapter;
import org.springframework.web.stomp.StompMessage;
import org.springframework.web.stomp.StompSession;
import org.springframework.web.stomp.support.StompMessageConverter;
/**
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class StompWebSocketHandler extends TextWebSocketHandlerAdapter {
private final StompMessageProcessor messageProcessor;
private final StompMessageConverter messageConverter = new StompMessageConverter();
private final Map<String, StompSession> sessions = new ConcurrentHashMap<String, StompSession>();
public StompWebSocketHandler(StompMessageProcessor messageProcessor) {
this.messageProcessor = messageProcessor;
}
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
WebSocketStompSession stompSession = new WebSocketStompSession(session, this.messageConverter);
this.sessions.put(session.getId(), stompSession);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
StompSession stompSession = this.sessions.get(session.getId());
Assert.notNull(stompSession, "No STOMP session for WebSocket session id=" + session.getId());
StompMessage stompMessage = this.messageConverter.toStompMessage(message.getPayload());
this.messageProcessor.processMessage(stompSession, stompMessage);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
this.sessions.remove(session.getId());
}
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.stomp.adapter;
import java.io.IOException;
import org.springframework.util.Assert;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.stomp.StompMessage;
import org.springframework.web.stomp.StompSession;
import org.springframework.web.stomp.support.StompMessageConverter;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class WebSocketStompSession implements StompSession {
private final String id;
private WebSocketSession webSocketSession;
private final StompMessageConverter messageConverter;
public WebSocketStompSession(WebSocketSession webSocketSession, StompMessageConverter messageConverter) {
Assert.notNull(webSocketSession, "webSocketSession is required");
this.id = webSocketSession.getId();
this.webSocketSession = webSocketSession;
this.messageConverter = messageConverter;
}
@Override
public String getId() {
return this.id;
}
@Override
public void sendMessage(StompMessage message) throws IOException {
Assert.notNull(this.webSocketSession, "Cannot send message without active session");
byte[] bytes = this.messageConverter.fromStompMessage(message);
this.webSocketSession.sendMessage(new TextMessage(new String(bytes, StompMessage.CHARSET)));
}
public void sessionClosed() {
this.webSocketSession = null;
}
@Override
public void close() throws Exception {
this.webSocketSession.close();
this.webSocketSession = null;
}
}
\ No newline at end of file
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.stomp.server;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.util.Assert;
import org.springframework.web.stomp.StompCommand;
import org.springframework.web.stomp.StompException;
import org.springframework.web.stomp.StompHeaders;
import org.springframework.web.stomp.StompMessage;
import org.springframework.web.stomp.StompSession;
import org.springframework.web.stomp.adapter.StompMessageProcessor;
import reactor.Fn;
import reactor.core.Reactor;
import reactor.fn.Consumer;
import reactor.fn.Event;
import reactor.fn.Registration;
import reactor.fn.Tuple;
/**
* @author Gary Russell
* @author Rossen Stoyanchev
* @since 4.0
*
*/
public class ReactorServerStompMessageProcessor implements StompMessageProcessor {
private static Log logger = LogFactory.getLog(ReactorServerStompMessageProcessor.class);
private final Reactor reactor;
private Map<String, List<Registration<?>>> subscriptionsBySession = new ConcurrentHashMap<String, List<Registration<?>>>();
public ReactorServerStompMessageProcessor(Reactor reactor) {
this.reactor = reactor;
}
public void processMessage(StompSession session, StompMessage message) throws IOException {
StompCommand command = message.getCommand();
Assert.notNull(command, "STOMP command not found");
if (StompCommand.CONNECT.equals(command) || StompCommand.STOMP.equals(command)) {
connect(session, message);
}
else if (StompCommand.SUBSCRIBE.equals(command)) {
subscribe(session, message);
}
else if (StompCommand.UNSUBSCRIBE.equals(command)) {
unsubscribe(session, message);
}
else if (StompCommand.SEND.equals(command)) {
send(session, message);
}
else if (StompCommand.DISCONNECT.equals(command)) {
disconnect(session);
}
else {
throw new IllegalStateException("Unexpected command: " + command);
}
}
protected void connect(StompSession session, StompMessage connectMessage) throws IOException {
StompHeaders headers = new StompHeaders();
Set<String> acceptVersions = connectMessage.getHeaders().getAcceptVersion();
if (acceptVersions.contains("1.2")) {
headers.setVersion("1.2");
}
else if (acceptVersions.contains("1.1")) {
headers.setVersion("1.1");
}
else if (acceptVersions.isEmpty()) {
// 1.0
}
else {
throw new StompException("Unsupported version '" + acceptVersions + "'");
}
headers.setHeartbeat(0,0); // TODO
headers.setId(session.getId());
// TODO: security
this.reactor.notify(StompCommand.CONNECT, Fn.event(session.getId()));
session.sendMessage(new StompMessage(StompCommand.CONNECTED, headers));
}
protected void subscribe(final StompSession session, StompMessage message) {
final String subscription = message.getHeaders().getId();
String replyToKey = StompCommand.SUBSCRIBE + ":" + session.getId() + ":" + subscription;
if (logger.isTraceEnabled()) {
logger.trace("Adding subscription with replyToKey=" + replyToKey);
}
Registration<?> registration = this.reactor.on(Fn.$(replyToKey), new Consumer<Event<StompMessage>>() {
@Override
public void accept(Event<StompMessage> event) {
event.getData().getHeaders().setSubscription(subscription);
try {
session.sendMessage(event.getData());
}
catch (IOException e) {
// TODO: stomp error, close session, websocket close status
ReactorServerStompMessageProcessor.this.removeSubscriptions(session.getId());
e.printStackTrace();
}
}
});
addSubscription(session.getId(), registration);
this.reactor.notify(StompCommand.SUBSCRIBE, Fn.event(Tuple.of(session.getId(), message), replyToKey));
}
private void addSubscription(String sessionId, Registration<?> registration) {
List<Registration<?>> list = this.subscriptionsBySession.get(sessionId);
if (list == null) {
list = new ArrayList<Registration<?>>();
this.subscriptionsBySession.put(sessionId, list);
}
list.add(registration);
}
protected void unsubscribe(StompSession session, StompMessage message) {
this.reactor.notify(StompCommand.UNSUBSCRIBE, Fn.event(Tuple.of(session.getId(), message)));
}
protected void send(StompSession session, StompMessage message) {
this.reactor.notify(StompCommand.SEND, Fn.event(Tuple.of(session.getId(), message)));
}
protected void disconnect(StompSession session) {
String sessionId = session.getId();
removeSubscriptions(sessionId);
this.reactor.notify(StompCommand.DISCONNECT, Fn.event(sessionId));
}
private void removeSubscriptions(String sessionId) {
List<Registration<?>> registrations = this.subscriptionsBySession.remove(sessionId);
if (logger.isTraceEnabled()) {
logger.trace("Cancelling " + registrations.size() + " subscriptions for session=" + sessionId);
}
for (Registration<?> registration : registrations) {
registration.cancel();
}
}
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.stomp.server;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.web.stomp.StompCommand;
import org.springframework.web.stomp.StompHeaders;
import org.springframework.web.stomp.StompMessage;
import reactor.Fn;
import reactor.core.Reactor;
import reactor.fn.Consumer;
import reactor.fn.Event;
import reactor.fn.Registration;
import reactor.fn.Tuple2;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class SimpleStompReactorService {
private static final Log logger = LogFactory.getLog(SimpleStompReactorService.class);
private final Reactor reactor;
private Map<String, List<Registration<?>>> subscriptionsBySession = new ConcurrentHashMap<String, List<Registration<?>>>();
public SimpleStompReactorService(Reactor reactor) {
this.reactor = reactor;
this.reactor.on(Fn.$(StompCommand.SUBSCRIBE), new SubscribeConsumer());
this.reactor.on(Fn.$(StompCommand.SEND), new SendConsumer());
this.reactor.on(Fn.$(StompCommand.DISCONNECT), new DisconnectConsumer());
}
private void addSubscription(String sessionId, Registration<?> registration) {
List<Registration<?>> list = this.subscriptionsBySession.get(sessionId);
if (list == null) {
list = new ArrayList<Registration<?>>();
this.subscriptionsBySession.put(sessionId, list);
}
list.add(registration);
}
private void removeSubscriptions(String sessionId) {
List<Registration<?>> registrations = this.subscriptionsBySession.remove(sessionId);
if (logger.isTraceEnabled()) {
logger.trace("Cancelling " + registrations.size() + " subscriptions for session=" + sessionId);
}
for (Registration<?> registration : registrations) {
registration.cancel();
}
}
private final class SubscribeConsumer implements Consumer<Event<Tuple2<String, StompMessage>>> {
@Override
public void accept(Event<Tuple2<String, StompMessage>> event) {
String sessionId = event.getData().getT1();
StompMessage message = event.getData().getT2();
final Object replyToKey = event.getReplyTo();
if (logger.isDebugEnabled()) {
logger.debug("Subscribe " + message);
}
Registration<?> registration = SimpleStompReactorService.this.reactor.on(
Fn.$("destination:" + message.getHeaders().getDestination()),
new Consumer<Event<StompMessage>>() {
@Override
public void accept(Event<StompMessage> event) {
StompMessage inMessage = event.getData();
StompHeaders headers = new StompHeaders();
headers.setDestination(inMessage.getHeaders().getDestination());
StompMessage outMessage = new StompMessage(StompCommand.MESSAGE, headers, inMessage.getPayload());
SimpleStompReactorService.this.reactor.notify(replyToKey, Fn.event(outMessage));
}
});
addSubscription(sessionId, registration);
}
}
private final class SendConsumer implements Consumer<Event<Tuple2<String, StompMessage>>> {
@Override
public void accept(Event<Tuple2<String, StompMessage>> event) {
StompMessage message = event.getData().getT2();
logger.debug("Message received: " + message);
String destination = message.getHeaders().getDestination();
SimpleStompReactorService.this.reactor.notify("destination:" + destination, Fn.event(message));
}
}
private final class DisconnectConsumer implements Consumer<Event<String>> {
@Override
public void accept(Event<String> event) {
String sessionId = event.getData();
SimpleStompReactorService.this.removeSubscriptions(sessionId);
}
}
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.stomp.support;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map.Entry;
import org.springframework.util.Assert;
import org.springframework.web.stomp.StompCommand;
import org.springframework.web.stomp.StompException;
import org.springframework.web.stomp.StompHeaders;
import org.springframework.web.stomp.StompMessage;
/**
* @author Gary Russell
* @since 4.0
*
*/
public class StompMessageConverter {
public static final byte LF = 0x0a;
public static final byte CR = 0x0d;
private static final byte COLON = ':';
/**
* @param bytes a complete STOMP message (without the trailing 0x00).
*/
public StompMessage toStompMessage(Object stomp) {
Assert.state(stomp instanceof String || stomp instanceof byte[], "'stomp' must be String or byte[]");
byte[] stompBytes = null;
if (stomp instanceof String) {
stompBytes = ((String) stomp).getBytes(StompMessage.CHARSET);
}
else {
stompBytes = (byte[]) stomp;
}
int totalLength = stompBytes.length;
if (stompBytes[totalLength-1] == 0) {
totalLength--;
}
int payloadIndex = findPayloadStart(stompBytes);
if (payloadIndex == 0) {
throw new StompException("No command found");
}
String headerString = new String(stompBytes, 0, payloadIndex, StompMessage.CHARSET);
Parser parser = new Parser(headerString);
StompHeaders headers = new StompHeaders();
// TODO: validate command and whether a payload is allowed
StompCommand command = StompCommand.valueOf(parser.nextToken(LF).trim());
Assert.notNull(command, "No command found");
while (parser.hasNext()) {
String header = parser.nextToken(COLON);
if (header != null) {
if (parser.hasNext()) {
String value = parser.nextToken(LF);
headers.add(header, value);
}
else {
throw new StompException("Parse exception for " + headerString);
}
}
}
byte[] payload = new byte[totalLength - payloadIndex];
System.arraycopy(stompBytes, payloadIndex, payload, 0, totalLength - payloadIndex);
return new StompMessage(command, headers, payload);
}
public byte[] fromStompMessage(StompMessage message) {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
StompHeaders headers = message.getHeaders();
StompCommand command = message.getCommand();
try {
outputStream.write(command.toString().getBytes("UTF-8"));
outputStream.write(LF);
for (Entry<String, List<String>> entry : headers.entrySet()) {
String key = entry.getKey();
key = replaceAllOutbound(key);
for (String value : entry.getValue()) {
outputStream.write(key.getBytes("UTF-8"));
outputStream.write(COLON);
value = replaceAllOutbound(value);
outputStream.write(value.getBytes("UTF-8"));
outputStream.write(LF);
}
}
outputStream.write(LF);
outputStream.write(message.getPayload());
outputStream.write(0);
return outputStream.toByteArray();
}
catch (IOException e) {
throw new StompException("Failed to serialize " + message, e);
}
}
private String replaceAllOutbound(String key) {
return key.replaceAll("\\\\", "\\\\")
.replaceAll(":", "\\\\c")
.replaceAll("\n", "\\\\n")
.replaceAll("\r", "\\\\r");
}
private int findPayloadStart(byte[] bytes) {
int i;
// ignore any leading EOL from the previous message
for (i = 0; i < bytes.length; i++) {
if (bytes[i] != '\n' && bytes[i] != '\r' ) {
break;
}
bytes[i] = ' ';
}
int payloadOffset = 0;
for (; i < bytes.length - 1; i++) {
if ((bytes[i] == LF && bytes[i+1] == LF)) {
payloadOffset = i + 2;
break;
}
if (i < bytes.length - 3 &&
(bytes[i] == CR && bytes[i+1] == LF &&
bytes[i+2] == CR && bytes[i+3] == LF)) {
payloadOffset = i + 4;
break;
}
}
if (i >= bytes.length) {
throw new StompException("No end of headers found");
}
return payloadOffset;
}
private class Parser {
private final String content;
private int offset;
public Parser(String content) {
this.content = content;
}
public boolean hasNext() {
return this.offset < this.content.length();
}
public String nextToken(byte delimiter) {
if (this.offset >= this.content.length()) {
return null;
}
int delimAt = this.content.indexOf(delimiter, this.offset);
if (delimAt == -1) {
if (this.offset == this.content.length() - 1 && delimiter == COLON &&
this.content.charAt(this.offset) == LF) {
this.offset++;
return null;
}
else if (this.offset == this.content.length() - 2 && delimiter == COLON &&
this.content.charAt(this.offset) == CR &&
this.content.charAt(this.offset + 1) == LF) {
this.offset += 2;
return null;
}
else {
throw new StompException("No delimiter found at offset " + offset + " in " + this.content);
}
}
int escapeAt = this.content.indexOf('\\', this.offset);
String token = this.content.substring(this.offset, delimAt + 1);
this.offset += token.length();
if (escapeAt >= 0 && escapeAt < delimAt) {
char escaped = this.content.charAt(escapeAt + 1);
if (escaped == 'n' || escaped == 'c' || escaped == '\\') {
token = token.replaceAll("\\\\n", "\n")
.replaceAll("\\\\r", "\r")
.replaceAll("\\\\c", ":")
.replaceAll("\\\\\\\\", "\\\\");
}
else {
throw new StompException("Invalid escape sequence \\" + escaped);
}
}
int length = token.length();
if (delimiter == LF && length > 1 && token.charAt(length - 2) == CR) {
return token.substring(0, length - 2);
}
else {
return token.substring(0, length - 1);
}
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册